No More Silos - CDC demo

The slides that accompany this demo can be found here: https://talks.rmoff.net/

Running the test rig

  1. Bring up the stack

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/no-more-silos
    docker-compose up -d

    This brings up the stack ready for use.

  2. Wait for Kafka Connect to start

    bash -c '
    echo "Waiting for Kafka Connect to start listening on localhost ⏳"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) " Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
    '
  3. Make sure the connector plugins are available

    curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'debezium.*mysql|JdbcSink'
    "io.confluent.connect.jdbc.JdbcSinkConnector"
    "io.debezium.connector.mysql.MySqlConnector"
  4. Make sure that the MySQL JDBC driver has been successfully downloaded

    docker exec kafka-connect ls -l /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/ | grep mysql
    drwxr-xr-x 3 appuser appuser 4096 Dec  1  2020 mysql-connector-java-8.0.23
  5. Launch MySQL CLI

    docker exec -it mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'
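
The wait loop in step 2 polls forever if Kafka Connect never comes up. A minimal sketch of a bounded variant is shown here; the function names retry_until and connect_is_ready are made up for illustration and are not part of the demo repository.

```shell
# retry_until MAX_TRIES DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds, giving up after MAX_TRIES attempts.
retry_until() {
  max_tries=$1
  delay=$2
  shift 2
  attempt=1
  while ! "$@"; do
    if [ "$attempt" -ge "$max_tries" ]; then
      echo "Gave up after $attempt attempts: $*" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
  return 0
}

# Hypothetical probe: succeeds only when the Kafka Connect REST API
# (same URL as in step 2) answers with HTTP 200.
connect_is_ready() {
  [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" = "200" ]
}
```

With these defined, running retry_until 60 5 connect_is_ready would probe every five seconds and give up after 60 attempts, returning a non-zero status that a calling script can act on.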

Part 01 - Query-Based CDC

MySQL to Kafka using JDBC Source connector

  1. In MySQL, examine the data

    DESCRIBE customers;
    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
    +----+------------+-----------+----------------------------+---------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                      | UPDATE_TS           |
    +----+------------+-----------+----------------------------+---------------------+
    |  1 | Bibby      | Argabrite | bargabrite0@google.com.hk  | 2021-09-29 10:30:22 |
    |  2 | Auberon    | Sulland   | asulland1@slideshare.net   | 2021-09-29 10:30:22 |
    |  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com | 2021-09-29 10:30:22 |
    |  4 | Nolana     | Yeeles    | nyeeles3@drupal.org        | 2021-09-29 10:30:22 |
    |  5 | Modestia   | Coltart   | mcoltart4@scribd.com       | 2021-09-29 10:30:22 |
    +----+------------+-----------+----------------------------+---------------------+
    5 rows in set (0.00 sec)
  2. Create the connector

    curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql-00/config \
             -H "Content-Type: application/json" -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "connection.url": "jdbc:mysql://mysql:3306/demo",
              "connection.user": "connect_user",
              "connection.password": "asgard",
              "topic.prefix": "mysql-00-",
              "poll.interval.ms": 1000,
              "tasks.max":1,
              "mode":"timestamp",
              "table.whitelist" : "demo.customers",
              "timestamp.column.name": "UPDATE_TS",
              "validate.non.null": false
              }'
  3. Check it’s running

    curl -s "http://localhost:8083/connectors?expand=info&expand=status" |
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' |
           column -s : -t| sed 's/\"//g'| sort
    source  |  source-datagen-item_details_01  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
    source  |  source-jdbc-mysql-00            |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector
  4. Examine the data

    docker exec kcat kcat                  \
            -b broker:29092                \
            -r http://schema-registry:8081 \
            -s avro                        \
            -t mysql-00-customers          \
            -C -o beginning -u -q | jq  '.'
  5. Split the screen to show Kafka topic output along with MySQL.

  6. Make changes in MySQL and observe that the Kafka topic (as shown by kcat) updates automatically

    • Insert a new row in MySQL:

      INSERT INTO customers (ID, FIRST_NAME, LAST_NAME, EMAIL, GENDER, COMMENTS) VALUES (42, 'Rick', 'Astley', '', 'Male', '');
    • Update a row in MySQL:

      UPDATE customers SET EMAIL = 'Never.gonna.give.you@up.com' WHERE ID = 42;
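
As a rough illustration of what "mode": "timestamp" means, each poll amounts to selecting the rows whose UPDATE_TS is later than the newest timestamp already seen. The sketch below simulates that over comma-separated text rather than a real database; poll_since and the three-field row layout are assumptions made for this example only.

```shell
# poll_since LAST_TS
# Reads "ID,EMAIL,UPDATE_TS" rows on stdin (a hypothetical stand-in for the
# customers table) and prints the rows changed after LAST_TS, oldest first.
# Roughly: SELECT ... WHERE UPDATE_TS > ? ORDER BY UPDATE_TS
poll_since() {
  # Appending "" forces awk to compare the timestamps as strings.
  awk -F, -v last="$1" '($3 "") > (last "")' | sort -t, -k3,3
}
```

One consequence of this approach is that a deleted row is never returned by such a query, so query-based CDC in this mode emits nothing for a DELETE.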

Part 02 - Log-Based CDC

MySQL to Kafka using the Debezium MySQL connector

  1. In MySQL, examine the data

    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
    +----+------------+-----------+-----------------------------+---------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                       | UPDATE_TS           |
    +----+------------+-----------+-----------------------------+---------------------+
    |  1 | Bibby      | Argabrite | bargabrite0@google.com.hk   | 2021-09-29 10:30:22 |
    |  2 | Auberon    | Sulland   | asulland1@slideshare.net    | 2021-09-29 10:30:22 |
    |  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com  | 2021-09-29 10:30:22 |
    |  4 | Nolana     | Yeeles    | nyeeles3@drupal.org         | 2021-09-29 10:30:22 |
    |  5 | Modestia   | Coltart   | mcoltart4@scribd.com        | 2021-09-29 10:30:22 |
    | 42 | Rick       | Astley    | Never.gonna.give.you@up.com | 2021-09-29 10:35:40 |
    +----+------------+-----------+-----------------------------+---------------------+
    6 rows in set (0.00 sec)
  2. Create the connector

    curl -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
        -d '{
              "connector.class": "io.debezium.connector.mysql.MySqlConnector",
              "database.hostname": "mysql",
              "database.port": "3306",
              "database.user": "debezium",
              "database.password": "dbz",
              "database.server.id": "42",
              "database.allowPublicKeyRetrieval":"true",
              "database.server.name": "asgard",
              "table.whitelist": "demo.customers",
              "database.history.kafka.bootstrap.servers": "broker:29092",
              "database.history.kafka.topic": "asgard.dbhistory.demo" ,
              "include.schema.changes": "true"
        }'
  3. Check it’s running

    curl -s "http://localhost:8083/connectors?expand=info&expand=status" |
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' |
           column -s : -t| sed 's/\"//g'| sort
    source  |  source-datagen-item_details_01  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
    source  |  source-debezium-mysql-00        |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector
    source  |  source-jdbc-mysql-00            |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector
  4. Examine the data with kcat

    docker exec kcat kcat                  \
            -b broker:29092                \
            -r http://schema-registry:8081 \
            -s avro                        \
            -t asgard.demo.customers       \
            -C -o beginning -u -q -J | jq '.'
    {
      "topic": "asgard.demo.customers",
      "partition": 0,
      "offset": 5,
      "tstype": "create",
      "ts": 1632911781668,
      "broker": 1,
      "key": {
        "id": 42
      },
      "key_schema_id": 5,
      "payload": {
        "before": null,
        "after": {
          "Value": {
            "id": 42,
            "first_name": {
              "string": "Rick"
            },
            "last_name": {
              "string": "Astley"
            },
            "email": {
              "string": "Never.gonna.give.you@up.com"
            },
            "gender": {
              "string": "Male"
            },
            "comments": {
              "string": ""
            },
            "UPDATE_TS": {
              "string": "2021-09-29T10:35:40Z"
            }
          }
        },
        "source": {
          "version": "1.6.0.Final",
          "connector": "mysql",
          "name": "asgard",
          "ts_ms": 1632911781203,
          "snapshot": {
            "string": "last"
          },
          "db": "demo",
          "sequence": null,
          "table": {
            "string": "customers"
          },
          "server_id": 0,
          "gtid": null,
          "file": "binlog.000002",
          "pos": 874,
          "row": 0,
          "thread": null,
          "query": null
        },
        "op": "r",
        "ts_ms": {
          "long": 1632911781203
        },
        "transaction": null
      },
      "value_schema_id": 6
    }
  5. Split the screen to show Kafka topic output along with MySQL.

  6. Rerun kcat to show compact output

    docker exec kcat kcat                  \
            -b broker:29092                \
            -r http://schema-registry:8081 \
            -s avro                        \
            -t asgard.demo.customers       \
            -C -o beginning -u -q -J | jq '.payload | del(.source)'
  7. Make changes in MySQL and observe that the Kafka topic (as shown by kcat) updates automatically

    • Update a row in MySQL:

      UPDATE customers SET EMAIL = 'r.astley@example.com' WHERE ID = 42;
      UPDATE customers SET FIRST_NAME = 'BOB' WHERE ID = 42;
    • Delete a row in MySQL:

      DELETE FROM customers WHERE ID=2;
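
Each Debezium change event carries an op field, as seen in the kcat output above ("op": "r" for the snapshot record). A small lookup such as the following may help when reading the topic; describe_op is a hypothetical helper, and it covers only the four codes relevant to this demo.

```shell
# describe_op CODE
# Prints a short description of a Debezium "op" code.
describe_op() {
  case "$1" in
    c) echo "create (row inserted)" ;;
    u) echo "update (row changed)" ;;
    d) echo "delete (row removed)" ;;
    r) echo "read (row captured by the initial snapshot)" ;;
    *) echo "unknown op: $1" ;;
  esac
}
```

Unlike the query-based approach in Part 01, the DELETE above shows up on the topic as an event with op d.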

Optional - Stream/Table duality in ksqlDB

docker exec -it ksqldb bash -c '
  echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"
  while : ; do
    curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info)
    echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)"
    if [ $curl_status -eq 200 ] ; then break ; fi
    sleep 5
  done
  ksql http://ksqldb:8088'
⏳ Waiting for ksqlDB to be available before launching CLI

Wed Sep 29 10:44:33 UTC 2021  ksqlDB server listener HTTP state:  200  (waiting for 200)
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v0.21.0, Server v0.21.0 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_CDC_STREAM WITH (KAFKA_TOPIC='asgard.demo.customers', VALUE_FORMAT='AVRO');

CREATE TABLE CUSTOMERS AS
  SELECT AFTER->ID AS ID,
         LATEST_BY_OFFSET(AFTER->FIRST_NAME) AS FIRST_NAME,
         LATEST_BY_OFFSET(AFTER->LAST_NAME) AS LAST_NAME,
         LATEST_BY_OFFSET(AFTER->EMAIL) AS EMAIL,
         LATEST_BY_OFFSET(AFTER->GENDER) AS GENDER,
         LATEST_BY_OFFSET(AFTER->COMMENTS) AS COMMENTS
    FROM CUSTOMERS_CDC_STREAM
    GROUP BY AFTER->ID;
  • In MySQL, query the state:

    mysql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM customers WHERE ID=42;
    +----+------------+-----------+-----------------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                       |
    +----+------------+-----------+-----------------------------+
    | 42 | BOB        | Astley    | Never.gonna.give.you@up.com |
    +----+------------+-----------+-----------------------------+
    1 row in set (0.00 sec)
  • In ksqlDB, query the table:

    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, GENDER, COMMENTS FROM CUSTOMERS WHERE ID=42;
    +-------------+-------------+-------------+-------------+-------------+-------------+
    |ID           |FIRST_NAME   |LAST_NAME    |EMAIL        |GENDER       |COMMENTS     |
    +-------------+-------------+-------------+-------------+-------------+-------------+
    |42           |BOB          |Astley       |r.astley@exam|Male         |             |
    |             |             |             |ple.com      |             |             |
  • In ksqlDB, query the stream:

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT OP, AFTER->ID, AFTER->FIRST_NAME, AFTER->LAST_NAME, AFTER->EMAIL
      FROM CUSTOMERS_CDC_STREAM
      WHERE AFTER->ID=42
      EMIT CHANGES;
    
    +----------------+----------------+----------------+----------------+----------------+
    |OP              |ID              |FIRST_NAME      |LAST_NAME       |EMAIL           |
    +----------------+----------------+----------------+----------------+----------------+
    |c               |42              |Rick            |Astley          |Never.gonna.give|
    |                |                |                |                |.you@up.com     |
    |u               |42              |Rick            |Astley          |r.astley@example|
    |                |                |                |                |.com            |
    |u               |42              |BOB             |Astley          |r.astley@example|
    |                |                |                |                |.com            |
    |u               |42              |Rick            |Astley          |r.astley@example|
    |                |                |                |                |.com            |
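
The CUSTOMERS table is, in effect, the change stream folded down to the most recent value for each key, which is what LATEST_BY_OFFSET with GROUP BY expresses. The idea can be sketched outside ksqlDB as follows; latest_by_key and the "KEY,VALUE" event layout are hypothetical and stand in for the real Avro records.

```shell
# latest_by_key
# Reads "KEY,VALUE" events on stdin in offset order and prints the last
# value seen for each key, sorted by key.
latest_by_key() {
  awk -F, '{ latest[$1] = $2 } END { for (k in latest) print k "," latest[k] }' | sort
}
```

Feeding it the events 42,Rick then 42,BOB leaves a single row for key 42 holding BOB, while the stream itself still contains both events.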

Optional - Stream/table joins

  • Join to a stream of events

    CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
    SELECT MESSAGE, STARS, USER_ID FROM RATINGS EMIT CHANGES;
    SELECT R.RATING_ID, R.MESSAGE, R.STARS,
          C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, C.EMAIL AS EMAIL
          FROM RATINGS R
            INNER JOIN CUSTOMERS C
            ON R.USER_ID = C.ID
          EMIT CHANGES;
    CREATE STREAM RATINGS_ENRICHED AS
    SELECT R.RATING_ID, R.MESSAGE, R.STARS,
          C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, C.EMAIL AS EMAIL
          FROM RATINGS R
            INNER JOIN CUSTOMERS C
            ON R.USER_ID = C.ID
          EMIT CHANGES;
    PRINT 'RATINGS_ENRICHED';