Last commit: 332e3f8 ("tweaks") by @basil0, Feb 11, 2020
Contributors: @rmoff, @basil0

Streaming ETL demo - Enriching event stream data with CDC data from MySQL, stream into Elasticsearch

Kafka Connect / ksqlDB / Elasticsearch

This is designed to be run as a step-by-step demo. The statements in ksql-statements.sql should match those run in this doc end to end, so in theory you can just run the file, but I have not tested that. PRs welcome for a one-click script that runs the whole demo end to end :)

Pre-reqs

Local:

  • curl

  • jq

  • Docker

Pre-Flight Setup

Start the environment

docker-compose up -d

Run ksqlDB CLI and MySQL CLI

Optionally, use something like screen or tmux to have these both easily to hand. Or multiple Terminal tabs. Whatever works for you :)

  • ksqlDB CLI:

    docker exec -it ksqldb-cli bash -c '
      echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"
      while : ; do
        curl_status=$(curl -s -o /dev/null -w "%{http_code}" http://ksqldb-server:8088/info)
        echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)"
        if [ "$curl_status" -eq 200 ] ; then break ; fi
        sleep 5
      done
      ksql http://ksqldb-server:8088'

Pre-flight checklist

Demo


Part 01 - ksqlDB for filtering streams

Inspect topics

SHOW TOPICS;
ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 confluent_rmoff_02ksql_processing_log | 1          | 1
 ratings                               | 1          | 1
-------------------------------------------------------------------------

Inspect ratings & define stream

CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');

Select columns from live stream of data

SELECT STARS, CHANNEL, MESSAGE FROM RATINGS EMIT CHANGES;

Filter live stream of data

SELECT STARS, CHANNEL, MESSAGE FROM RATINGS WHERE STARS<3  EMIT CHANGES;

Create a derived stream

CREATE STREAM POOR_RATINGS AS
SELECT STARS, CHANNEL, MESSAGE FROM RATINGS WHERE STARS<3;

SELECT * FROM POOR_RATINGS EMIT CHANGES LIMIT 5;

DESCRIBE EXTENDED POOR_RATINGS;
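
Conceptually, POOR_RATINGS is a filter that keeps running over RATINGS and writes matching rows to a new topic. A minimal Python sketch of that idea, assuming ratings arrive as dictionaries carrying the same STARS, CHANNEL and MESSAGE fields (the sample events and the function name are made up for illustration, they are not part of the demo):

```python
from typing import Dict, Iterable, Iterator

Rating = Dict[str, object]


def poor_ratings(ratings: Iterable[Rating]) -> Iterator[Rating]:
    """Yield the projected columns of every rating with fewer than 3 stars."""
    for rating in ratings:
        if rating["STARS"] < 3:
            # Same projection as the SELECT: STARS, CHANNEL, MESSAGE only.
            yield {k: rating[k] for k in ("STARS", "CHANNEL", "MESSAGE")}


# Hypothetical sample events, for illustration only.
sample = [
    {"RATING_ID": 1, "STARS": 1, "CHANNEL": "web", "MESSAGE": "meh"},
    {"RATING_ID": 2, "STARS": 4, "CHANNEL": "ios", "MESSAGE": "more peanuts please"},
    {"RATING_ID": 3, "STARS": 2, "CHANNEL": "android", "MESSAGE": "meh"},
]

filtered = list(poor_ratings(sample))
for row in filtered:
    print(row)
```

Because the function is a generator, it processes events one at a time as they arrive, which is roughly how the persistent query behaves on an unbounded topic.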

Optionally, bring up a second ksqlDB prompt and show live ratings / live filtered ratings:

-- Live stream of ratings data
SET 'auto.offset.reset' = 'latest';
PRINT 'ratings';

-- You can use SELECT too, but PRINT makes it clearer that it's coming from a topic
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIMESTAMP, STARS, CHANNEL FROM RATINGS EMIT CHANGES;
-- Just the ratings with star rating less than 3:
SET 'auto.offset.reset' = 'latest';
PRINT 'POOR_RATINGS';

-- You can use SELECT too, but PRINT makes it clearer that it's coming from a topic
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIMESTAMP, STARS, CHANNEL FROM POOR_RATINGS EMIT CHANGES;

Return to slides


Part 02 - Kafka Connect

Kafka to Elasticsearch

See also kafka-to-es-demo.adoc

Show MySQL table + contents

Launch the MySQL CLI:

docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'
SHOW TABLES;
+----------------+
| Tables_in_demo |
+----------------+
| CUSTOMERS      |
+----------------+
1 row in set (0.00 sec)
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS LIMIT 5;
+----+-------------+------------+------------------------+-------------+
| ID | FIRST_NAME  | LAST_NAME  | EMAIL                  | CLUB_STATUS |
+----+-------------+------------+------------------------+-------------+
|  1 | Rica        | Blaisdell  | rblaisdell0@rambler.ru | bronze      |
|  2 | Ruthie      | Brockherst | rbrockherst1@ow.ly     | platinum    |
|  3 | Mariejeanne | Cocci      | mcocci2@techcrunch.com | bronze      |
|  4 | Hashim      | Rumke      | hrumke3@sohu.com       | platinum    |
|  5 | Hansiain    | Coda       | hcoda4@senate.gov      | platinum    |
+----+-------------+------------+------------------------+-------------+
5 rows in set (0.00 sec)

Ingest the data (plus any new changes) into Kafka

In ksqlDB:

CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.server.id' = '42',
    'database.server.name' = 'asgard',
    'table.whitelist' = 'demo.customers',
    'database.history.kafka.bootstrap.servers' = 'kafka:29092',
    'database.history.kafka.topic' = 'dbhistory.demo' ,
    'include.schema.changes' = 'false',
    'transforms'= 'unwrap,extractkey',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'= 'id',
    'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
    );
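
The two single message transforms in this config reshape each Debezium change event before it lands on the topic: `unwrap` keeps only the row state after the change, and `extractkey` pulls the `id` field out of the key struct so that the key becomes a plain value. A rough Python sketch of that reshaping follows. The envelope here is a simplified, hand-written stand-in for a Debezium event (real events carry more fields), and the function names are hypothetical:

```python
def unwrap(value: dict) -> dict:
    """Mimic the effect of ExtractNewRecordState: keep only the 'after' row state."""
    return value["after"]


def extract_key(key: dict, field: str = "id") -> object:
    """Mimic the effect of ExtractField$Key: replace the key struct with one field."""
    return key[field]


# Simplified stand-in for a Debezium change event (illustrative, not real output).
change_event_key = {"id": 42}
change_event_value = {
    "before": None,
    "after": {"id": 42, "first_name": "Rick", "last_name": "Astley"},
    "op": "c",
}

# The StringConverter then serialises the extracted key as a string.
record_key = str(extract_key(change_event_key))
record_value = unwrap(change_event_value)

print(record_key, record_value)
```

A flat value and a simple string key are what let the later steps declare a table directly over the topic and join on the key.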

Check that it’s running:

ksql> SHOW CONNECTORS;

 Connector Name    | Type   | Class                                               | Status
----------------------------------------------------------------------------------------------------------------
 source-datagen-01 | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector | RUNNING (1/1 tasks RUNNING)
 SOURCE_MYSQL_01   | SOURCE | io.debezium.connector.mysql.MySqlConnector          | RUNNING (1/1 tasks RUNNING)
----------------------------------------------------------------------------------------------------------------

Show Kafka topic has been created & populated

SHOW TOPICS;
ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 POOR_RATINGS                          | 1          | 1
 asgard.demo.CUSTOMERS                 | 1          | 1
 confluent_rmoff_02ksql_processing_log | 1          | 1
 dbhistory.demo                        | 1          | 1
 ratings                               | 1          | 1
-------------------------------------------------------------------------

Show topic contents

PRINT 'asgard.demo.CUSTOMERS' FROM BEGINNING;
Format:AVRO
2/5/20 12:54:07 PM UTC, , {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "email": "rblaisdell0@rambler.ru", "gender": "Female", "club_status": "bronze", "comments": "Universal optimal hierarchy", "create_ts": "2020-02-05T12:48:46Z", "update_ts": "2020-02-05T12:48:46Z", "messagetopic": "asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}
2/5/20 12:54:07 PM UTC, , {"id": 2, "first_name": "Ruthie", "last_name": "Brockherst", "email": "rbrockherst1@ow.ly", "gender": "Female", "club_status": "platinum", "comments": "Reverse-engineered tangible interface", "create_ts": "2020-02-05T12:48:46Z", "update_ts": "2020-02-05T12:48:46Z", "messagetopic": "asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}
    …

Create ksqlDB stream and table

CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

Query the ksqlDB table:

SET 'auto.offset.reset' = 'earliest';
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS EMIT CHANGES;

Make changes in MySQL, observe them in Kafka

MySQL terminal:

INSERT INTO CUSTOMERS (ID,FIRST_NAME,LAST_NAME) VALUES (42,'Rick','Astley');
UPDATE CUSTOMERS SET EMAIL = 'rick@example.com' where ID=42;
UPDATE CUSTOMERS SET CLUB_STATUS = 'bronze' where ID=42;
UPDATE CUSTOMERS SET CLUB_STATUS = 'platinum' where ID=42;

[Optional] Demonstrate Stream / Table difference

Check the data in ksqlDB:

Here’s the table - the latest value for a given key

SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
       ID,
       FIRST_NAME,
       LAST_NAME,
       EMAIL,
       CLUB_STATUS
  FROM CUSTOMERS WHERE ID=42
  EMIT CHANGES;
+----------+----+-----------+----------+-----------------+------------+
|EVENT_TS  |ID  |FIRST_NAME |LAST_NAME |EMAIL            |CLUB_STATUS |
+----------+----+-----------+----------+-----------------+------------+
|15:43:58  |42  |Rick       |Astley    |rick@example.com |platinum    |
^CQuery terminated

Here’s the stream - every event, which in this context is every change event on the source database:

CREATE STREAM CUSTOMERS_STREAM WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

SET 'auto.offset.reset' = 'earliest';

SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
       ID,
       FIRST_NAME,
       LAST_NAME,
       EMAIL,
       CLUB_STATUS
  FROM CUSTOMERS_STREAM WHERE ID=42
  EMIT CHANGES;
+---------+----+-----------+----------+------------------+------------+
|EVENT_TS |ID  |FIRST_NAME |LAST_NAME |EMAIL             |CLUB_STATUS |
+---------+----+-----------+----------+------------------+------------+
|16:08:49 |42  |Rick       |Astley    |null              |null        |
|16:09:30 |42  |Rick       |Astley    |rick@example.com  |null        |
|16:09:32 |42  |Rick       |Astley    |rick@example.com  |bronze      |
|16:09:35 |42  |Rick       |Astley    |rick@example.com  |platinum    |
^CQuery terminated
ksql>
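
The difference can be sketched in a few lines of Python by replaying the four change events for ID 42 shown above: a stream behaves like an append-only list, while a table behaves like a dictionary keyed on ID in which each new event overwrites the previous value. This is only a mental model of the semantics, not how ksqlDB stores state:

```python
from typing import Dict, List

Row = Dict[str, object]

# The four change events for ID 42, in the order they were made in MySQL.
events: List[Row] = [
    {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": None, "CLUB_STATUS": None},
    {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": "rick@example.com", "CLUB_STATUS": None},
    {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": "rick@example.com", "CLUB_STATUS": "bronze"},
    {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": "rick@example.com", "CLUB_STATUS": "platinum"},
]

stream: List[Row] = []      # every event is kept
table: Dict[int, Row] = {}  # only the latest value per key is kept

for event in events:
    stream.append(event)
    table[event["ID"]] = event

print(len(stream), "events in the stream")
print("table state for 42:", table[42]["CLUB_STATUS"])
```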

Return to slides


Part 03 - ksqlDB for joining streams

Join live stream of ratings to customer data

SELECT R.RATING_ID, R.MESSAGE, R.CHANNEL,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.CLUB_STATUS
FROM   RATINGS R
       LEFT JOIN CUSTOMERS C
         ON CAST(R.USER_ID AS STRING) = C.ROWKEY
WHERE  C.FIRST_NAME IS NOT NULL
EMIT CHANGES;
+------------+-----------------------------------+-------+--------------------+-------------+
|RATING_ID   |MESSAGE                            |ID     |FULL_NAME           |CLUB_STATUS  |
+------------+-----------------------------------+-------+--------------------+-------------+
|1           |more peanuts please                |9      |Even Tinham         |silver       |
|2           |Exceeded all my expectations. Thank|8      |Patti Rosten        |silver       |
|            | you !                             |       |                    |             |
|3           |meh                                |17     |Brianna Paradise    |bronze       |
|4           |is this as good as it gets? really |14     |Isabelita Talboy    |gold         |
|            |?                                  |       |                    |             |
|5           |why is it so difficult to keep the |19     |Josiah Brockett     |gold         |
|            |bathrooms clean ?                  |       |                    |             |
…

Persist this stream of data

SET 'auto.offset.reset' = 'earliest';
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA
       WITH (KAFKA_TOPIC='ratings-enriched')
       AS
SELECT R.RATING_ID, R.MESSAGE, R.STARS, R.CHANNEL,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.CLUB_STATUS, C.EMAIL
FROM   RATINGS R
       LEFT JOIN CUSTOMERS C
         ON CAST(R.USER_ID AS STRING) = C.ROWKEY
WHERE  C.FIRST_NAME IS NOT NULL
EMIT CHANGES;

[Optional] Examine changing reference data

CUSTOMERS is a ksqlDB table, which means that we have the latest value for a given key.

Check out the ratings for customer id 2 only:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
        FULL_NAME, CLUB_STATUS, STARS, MESSAGE, CHANNEL
  FROM RATINGS_WITH_CUSTOMER_DATA
  WHERE ID=2
  EMIT CHANGES;

In mysql, make a change to ID 2

UPDATE CUSTOMERS SET CLUB_STATUS = 'bronze' WHERE ID=2;

Observe in the continuous ksqlDB query that the club status shown for customer 2 has now changed.
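
One way to picture the join when reference data changes: each rating is enriched with whatever the CUSTOMERS table holds for that key at the moment the rating arrives, so a change made in MySQL shows up only on ratings that arrive after it. A minimal Python sketch under that assumption (the rating events and function names are hypothetical; the customer row is the one for ID 2 shown earlier):

```python
from typing import Dict, List

customers: Dict[str, Dict[str, str]] = {}  # table: latest row per key
enriched: List[Dict[str, object]] = []     # output stream of joined rows


def upsert_customer(key: str, row: Dict[str, str]) -> None:
    """Apply a change event to the table state."""
    customers[key] = row


def on_rating(rating: Dict[str, object]) -> None:
    """Join one rating against the current table state."""
    customer = customers.get(str(rating["USER_ID"]))
    # LEFT JOIN plus WHERE C.FIRST_NAME IS NOT NULL drops unmatched ratings.
    if customer is None or customer.get("FIRST_NAME") is None:
        return
    enriched.append({
        "RATING_ID": rating["RATING_ID"],
        "STARS": rating["STARS"],
        "FULL_NAME": customer["FIRST_NAME"] + " " + customer["LAST_NAME"],
        "CLUB_STATUS": customer["CLUB_STATUS"],
    })


upsert_customer("2", {"FIRST_NAME": "Ruthie", "LAST_NAME": "Brockherst", "CLUB_STATUS": "platinum"})
on_rating({"RATING_ID": 1, "USER_ID": 2, "STARS": 1})   # sees platinum
upsert_customer("2", {"FIRST_NAME": "Ruthie", "LAST_NAME": "Brockherst", "CLUB_STATUS": "bronze"})
on_rating({"RATING_ID": 2, "USER_ID": 2, "STARS": 2})   # sees bronze
on_rating({"RATING_ID": 3, "USER_ID": 99, "STARS": 5})  # no such customer, dropped

for row in enriched:
    print(row)
```

Rows already written to the enriched stream are not rewritten when the customer changes, which is why the earlier rating still carries the old status.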

Create stream of unhappy VIPs

CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS AS
SELECT FULL_NAME, CLUB_STATUS, EMAIL, STARS, MESSAGE
FROM   RATINGS_WITH_CUSTOMER_DATA
WHERE  STARS < 3
  AND  CLUB_STATUS = 'platinum';

Stream to Elasticsearch

CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://elasticsearch:9200',
  'type.name' = '',
  'behavior.on.malformed.documents' = 'warn',
  'errors.tolerance' = 'all',
  'errors.log.enable' = 'true',
  'errors.log.include.messages' = 'true',
  'topics' = 'ratings-enriched,UNHAPPY_PLATINUM_CUSTOMERS',
  'key.ignore' = 'true',
  'schema.ignore' = 'true',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
  'transforms'= 'ExtractTimestamp',
  'transforms.ExtractTimestamp.type'= 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.ExtractTimestamp.timestamp.field' = 'EXTRACT_TS'
);

Check status

ksql> SHOW CONNECTORS;

 Connector Name    | Type   | Class                                                         | Status
--------------------------------------------------------------------------------------------------------------------------
 source-datagen-01 | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector           | RUNNING (1/1 tasks RUNNING)
 SOURCE_MYSQL_01   | SOURCE | io.debezium.connector.mysql.MySqlConnector                    | RUNNING (1/1 tasks RUNNING)
 SINK_ELASTIC_01   | SINK   | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------------------------

Check data in Elasticsearch:

docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount"

unhappy_platinum_customers        1
.kibana_task_manager_1            2
.apm-agent-configuration          0
kafka-ratings-enriched-2018-08    1
.kibana_1                        11
ratings-enriched               3699

View in Kibana

Tested on Elasticsearch 7.5.0

Kibana


Return to slides


#EOF

Optional

Aggregations

Simple aggregation - count of ratings per person, per minute:

SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       FULL_NAME,COUNT(*) AS RATINGS_COUNT
  FROM RATINGS_WITH_CUSTOMER_DATA
        WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY FULL_NAME
  EMIT CHANGES;

Persist this and show the timestamp:

CREATE TABLE RATINGS_PER_CUSTOMER_PER_MINUTE AS
SELECT FULL_NAME,COUNT(*) AS RATINGS_COUNT
  FROM RATINGS_WITH_CUSTOMER_DATA
        WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY FULL_NAME
  EMIT CHANGES;
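
As a rough illustration of what a one-minute tumbling window does, the Python sketch below assigns each event to the window containing its timestamp, by rounding the timestamp down to a whole minute, and counts events per (name, window start). The events and helper names are hypothetical, and the sketch ignores late data and retention:

```python
from collections import Counter
from typing import Iterable, Tuple

WINDOW_MS = 60_000  # SIZE 1 MINUTE


def window_start(ts_ms: int) -> int:
    """Round an epoch-millisecond timestamp down to the start of its window."""
    return ts_ms - (ts_ms % WINDOW_MS)


def ratings_per_customer_per_minute(events: Iterable[Tuple[int, str]]) -> Counter:
    """Count events per (FULL_NAME, window start)."""
    counts: Counter = Counter()
    for ts_ms, full_name in events:
        counts[(full_name, window_start(ts_ms))] += 1
    return counts


# Hypothetical (timestamp in epoch ms, FULL_NAME) pairs, for illustration only.
events = [
    (0, "Rica Blaisdell"),
    (59_999, "Rica Blaisdell"),
    (60_000, "Rica Blaisdell"),
    (61_000, "Patti Rosten"),
]

counts = ratings_per_customer_per_minute(events)
for (name, start), n in sorted(counts.items()):
    print(name, start, n)
```

Tumbling windows do not overlap, so every event lands in exactly one bucket; the event at 59,999 ms and the one at 60,000 ms fall into different windows.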

Push Query

SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       FULL_NAME,
       RATINGS_COUNT
  FROM RATINGS_PER_CUSTOMER_PER_MINUTE
  WHERE ROWKEY='Rica Blaisdell'
  EMIT CHANGES;

Pull Query

SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       FULL_NAME,
       RATINGS_COUNT
FROM   RATINGS_PER_CUSTOMER_PER_MINUTE
WHERE  ROWKEY='Rica Blaisdell'
  AND  WINDOWSTART > '2020-02-07T17:08:00.000';

Try the REST API out in bash:

docker exec -it ksqldb-cli bash

Copy and paste:

# Store the epoch (milliseconds) five minutes ago
PREDICATE=$(date --date '-5 min' +%s)000

# Pull from ksqlDB the aggregate-by-minute for the last five minutes for a given user:
curl -X "POST" "http://ksqldb-server:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT TIMESTAMPTOSTRING(WINDOWSTART, '\''yyyy-MM-dd HH:mm:ss'\'') AS WINDOW_START_TS,        FULL_NAME,       RATINGS_COUNT FROM   RATINGS_PER_CUSTOMER_PER_MINUTE WHERE  ROWKEY='\''Rica Blaisdell'\''   AND  WINDOWSTART > '$PREDICATE';"}'

Press Ctrl-D to exit the Docker container
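
The shell quoting in the curl call is fiddly, so here is a hedged Python sketch of how the same request body could be assembled. It computes the epoch-millisecond predicate for "five minutes ago" the same way the bash snippet does (whole seconds with 000 appended) and lets `json.dumps` handle the escaping. The function name and the `now_s` parameter are assumptions added for illustration, and nothing is sent over the network:

```python
import json
import time
from typing import Optional


def build_pull_query(full_name: str, minutes: int = 5, now_s: Optional[float] = None) -> str:
    """Return the JSON body for a pull query over the last `minutes` minutes."""
    now_s = time.time() if now_s is None else now_s
    # Whole seconds converted to milliseconds, like $(date ... +%s)000 in bash.
    predicate_ms = int(now_s - minutes * 60) * 1000
    # Plain string interpolation: fine for a demo, not safe for untrusted input.
    ksql = (
        "SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, "
        "FULL_NAME, RATINGS_COUNT "
        "FROM RATINGS_PER_CUSTOMER_PER_MINUTE "
        f"WHERE ROWKEY='{full_name}' AND WINDOWSTART > {predicate_ms};"
    )
    return json.dumps({"ksql": ksql})


# A fixed clock value keeps the example reproducible.
body = build_pull_query("Rica Blaisdell", now_s=1_581_095_580)
print(body)
```

The resulting string could then be sent as the request body to the same /query endpoint with the same Content-Type header used in the curl example.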
