Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
458 lines (418 sloc) 14 KB

No More Silos - CDC demo

The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/no-more-silos-integrating-databases-and-apache-kafka

Running the test rig

  1. Bring up the stack

    git clone https://github.com/confluentinc/demo-scene.git
    cd no-more-silos
    docker-compose up -d

    This brings up the stack ready for use.

  2. Wait for Kafka Connect to be started

    bash -c ' \
    echo "Waiting for Kafka Connect to start listening on localhost ⏳"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -eq 000 ] ; do
      echo -e $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) " Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
    '
  3. Launch the KSQL CLI:

    docker-compose exec ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while [ $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) -eq 000 ] ; do echo -e $(date) "KSQL Server HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) " (waiting for 200)" ; sleep 5 ; done; ksql http://ksql-server:8088'
  4. Launch MySQL CLI

    docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'

Part 01 - Query-Based CDC

MySQL to Kafka using JDBC Source connector

  1. In MySQL, examine the data

    DESCRIBE customers;
    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
    +----+------------+-----------+----------------------------+---------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                      | UPDATE_TS           |
    +----+------------+-----------+----------------------------+---------------------+
    |  1 | Bibby      | Argabrite | bargabrite0@google.com.hk  | 2019-04-01 16:51:18 |
    |  2 | Auberon    | Sulland   | asulland1@slideshare.net   | 2019-04-01 16:51:18 |
    |  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com | 2019-04-01 16:51:18 |
    |  4 | Nolana     | Yeeles    | nyeeles3@drupal.org        | 2019-04-01 16:51:18 |
    |  5 | Modestia   | Coltart   | mcoltart4@scribd.com       | 2019-04-01 16:51:18 |
    +----+------------+-----------+----------------------------+---------------------+
    5 rows in set (0.00 sec)
  2. Create the connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_mysql_00",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:mysql://mysql:3306/demo",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "mysql-00-",
                      "poll.interval.ms": 1000,
                      "tasks.max":1,
                      "mode":"timestamp",
                      "table.whitelist" : "demo.customers",
                      "timestamp.column.name": "UPDATE_TS",
                      "validate.non.null": false
                      }
              }'
  3. Check it’s running

    curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
    jdbc_source_mysql_00  |  RUNNING  |  RUNNING
  4. Examine the data with KSQL (doesn’t have to be; it’s just an easy tool to use)

    • List topics:

      SHOW TOPICS;
       Kafka Topic              | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
      ------------------------------------------------------------------------------------------------------
       _confluent-metrics       | false      | 12         | 1                  | 0         | 0
       _schemas                 | false      | 1          | 1                  | 0         | 0
       connect-debezium-configs | false      | 1          | 1                  | 0         | 0
       connect-debezium-offsets | false      | 25         | 1                  | 0         | 0
       connect-debezium-status  | false      | 5          | 1                  | 0         | 0
       kafka-connect-configs    | false      | 1          | 1                  | 0         | 0
       kafka-connect-offsets    | false      | 25         | 1                  | 0         | 0
       kafka-connect-status     | false      | 5          | 1                  | 0         | 0
       mysql-00-customers       | false      | 1          | 1                  | 0         | 0
      ------------------------------------------------------------------------------------------------------
    • Show the contents of the topic:

      PRINT 'mysql-00-customers' FROM BEGINNING;
      Format:AVRO
      4/1/19 5:01:52 PM UTC, null, {"id": 1, "first_name": "Bibby", "last_name": "Argabrite", "email": "bargabrite0@google.com.hk", "gender": "Female", "comments": "Reactive exuding productivity", "UPDATE_TS": 1554137478000}
      4/1/19 5:01:52 PM UTC, null, {"id": 2, "first_name": "Auberon", "last_name": "Sulland", "email": "asulland1@slideshare.net", "gender": "Male", "comments": "Organized context-sensitive Graphical User Interface", "UPDATE_TS": 1554137478000}
      4/1/19 5:01:52 PM UTC, null, {"id": 3, "first_name": "Marv", "last_name": "Dalrymple", "email": "mdalrymple2@macromedia.com", "gender": "Male", "comments": "Versatile didactic pricing structure", "UPDATE_TS": 1554137478000}
      4/1/19 5:01:52 PM UTC, null, {"id": 4, "first_name": "Nolana", "last_name": "Yeeles", "email": "nyeeles3@drupal.org", "gender": "Female", "comments": "Adaptive real-time archive", "UPDATE_TS": 1554137478000}
      4/1/19 5:01:52 PM UTC, null, {"id": 5, "first_name": "Modestia", "last_name": "Coltart", "email": "mcoltart4@scribd.com", "gender": "Female", "comments": "Reverse-engineered non-volatile success", "UPDATE_TS": 1554137478000}
      Note
      Instead of KSQL you can use a standard Kafka consumer
      docker-compose exec -T kafka-connect \
                kafka-avro-console-consumer \
                --bootstrap-server kafka:29092 \
                --property schema.registry.url=http://schema-registry:8081 \
                --topic mysql-00-customers --from-beginning | jq -c '.'
  5. Split the screen to show Kafka topic output along with MySQL.

  6. Make changes in MySQL and observe that the Kafka topic (as shown by KSQL) updates automatically

    • Insert a new row in MySQL:

      INSERT INTO customers (ID, FIRST_NAME, LAST_NAME, EMAIL, GENDER, COMMENTS) VALUES (42, 'Rick', 'Astley', '', 'Male', '');
    • Insert a new row in MySQL:

      UPDATE customers SET EMAIL = 'Never.gonna.give.you@up.com' WHERE ID = 42;

Part 02 - Log-Based CDC

MySQL to Kafka using JDBC Source connector

  1. In MySQL, examine the data

    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
    +----+------------+-----------+----------------------------+---------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                      | UPDATE_TS           |
    +----+------------+-----------+----------------------------+---------------------+
    |  1 | Bibby      | Argabrite | bargabrite0@google.com.hk  | 2019-04-01 16:51:18 |
    |  2 | Auberon    | Sulland   | asulland1@slideshare.net   | 2019-04-01 16:51:18 |
    |  3 | Marv       | Dalrymple | mdalrymple2@macromedia.com | 2019-04-01 16:51:18 |
    |  4 | Nolana     | Yeeles    | nyeeles3@drupal.org        | 2019-04-01 16:51:18 |
    |  5 | Modestia   | Coltart   | mcoltart4@scribd.com       | 2019-04-01 16:51:18 |
    | 42 | Rick       | Astley    | Never.gonna.give.you@up.com| 2019-04-01 17:59:43 |
    +----+------------+-----------+----------------------------+---------------------+
    5 rows in set (0.00 sec)
  2. Create the connector

    curl -i -X POST -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:18083/connectors/ \
        -d '{
          "name": "debezium-source-customers-00",
          "config": {
                "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                "database.hostname": "mysql",
                "database.port": "3306",
                "database.user": "debezium",
                "database.password": "dbz",
                "database.server.id": "42",
                "database.server.name": "asgard",
                "table.whitelist": "demo.customers",
                "database.history.kafka.bootstrap.servers": "kafka:29092",
                "database.history.kafka.topic": "dbhistory.demo" ,
                "include.schema.changes": "true"
           }
        }'
  3. Check it’s running

    curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
    debezium-source-customers-00  |  RUNNING  |  RUNNING
  4. Examine the data with console consumer

    docker-compose exec -T kafka-connect \
              kafka-avro-console-consumer \
              --bootstrap-server kafka:29092 \
              --property schema.registry.url=http://schema-registry:8081 \
              --topic asgard.demo.customers --from-beginning | jq '.op, .before, .after'
    "u"
    {
      "asgard.demo.customers.Value": {
        "id": 42,
        "first_name": {
          "string": "Rick"
        },
        "last_name": {
          "string": "Astley"
        },
        "email": {
          "string": ""
        },
        "gender": {
          "string": "Male"
        },
        "comments": {
          "string": ""
        },
        "UPDATE_TS": "2019-04-01T23:53:31Z"
      }
    }
    {
      "asgard.demo.customers.Value": {
        "id": 42,
        "first_name": {
          "string": "Rick"
        },
        "last_name": {
          "string": "Astley"
        },
        "email": {
          "string": "Never.gonna.give.you@up.com"
        },
        "gender": {
          "string": "Male"
        },
        "comments": {
          "string": ""
        },
        "UPDATE_TS": "2019-04-01T23:53:38Z"
      }
    }
    Note
    You can use KSQL here too: PRINT 'asgard.demo.customers' FROM BEGINNING;
  5. Split the screen to show Kafka topic output along with MySQL.

  6. Rerun the console consumer to show compact output

    docker-compose exec -T kafka-connect \
              kafka-avro-console-consumer \
              --bootstrap-server kafka:29092 \
              --property schema.registry.url=http://schema-registry:8081 \
              --topic asgard.demo.customers --from-beginning | jq -c '.'
  7. Make changes in MySQL and observe that the Kafka topic (as shown by KSQL) updates automatically

    • Update a new row in MySQL:

      UPDATE customers SET EMAIL = 'r.astley@example.com' WHERE ID = 42;
      UPDATE customers SET FIRST_NAME = 'BOB' WHERE ID = 42;
    • Delete a row in MySQL:

      DELETE FROM customers WHERE ID=2;

Optional - Stream/Table duality in KSQL

SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_CDC_STREAM WITH (KAFKA_TOPIC='asgard.demo.customers', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_AFTER AS
  SELECT AFTER->ID AS ID,
         AFTER->FIRST_NAME AS FIRST_NAME,
         AFTER->LAST_NAME AS LAST_NAME,
         AFTER->EMAIL AS EMAIL,
         AFTER->GENDER AS GENDER,
         AFTER->COMMENTS AS COMMENTS
    FROM CUSTOMERS_CDC_STREAM;
CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_AFTER PARTITION BY ID;
SELECT ROWKEY, ID FROM CUSTOMERS_STREAM LIMIT 1;
CREATE TABLE CUSTOMERS_TABLE WITH (KAFKA_TOPIC='CUSTOMERS_STREAM', VALUE_FORMAT='AVRO', KEY='ID');
  • In MySQL, query the state:

    mysql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM customers WHERE ID=42;
    +----+------------+-----------+-----------------------------+
    | ID | FIRST_NAME | LAST_NAME | EMAIL                       |
    +----+------------+-----------+-----------------------------+
    | 42 | Rick       | Astley    | Never.gonna.give.you@up.com |
    +----+------------+-----------+-----------------------------+
    1 rows in set (0.00 sec)
  • In KSQL query the table:

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM CUSTOMERS_TABLE WHERE ID=42;
    42 | Rick | Astley | Never.gonna.give.you@up.com | 2019-04-01T22:42:58Z
  • In KSQL query the stream:

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM CUSTOMERS_STREAM WHERE ID=42;
    42 | Rick | Astley |
    42 | Rick | Astley | Never.gonna.give.you@up.com
    42 | Rick | Astley | r.astley@example.com
  • Show before/after records:

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT OP, BEFORE->EMAIL, AFTER->EMAIL FROM CUSTOMERS_CDC_STREAM WHERE AFTER->ID=42;
    [source,sql]
    c | null |
    u |  | Never.gonna.give.you@up.com
    u | Never.gonna.give.you@up.com | r.astley@example.com
    u | r.astley@example.com | r.astley@example.com

Option - Stream/table joins

  • Join to a stream of events

    CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
    SELECT MESSAGE, STARS, USER_ID FROM RATINGS;
    SELECT R.RATING_ID, R.MESSAGE, R.STARS, \
          C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, C.EMAIL AS EMAIL
          FROM RATINGS R \
            LEFT JOIN CUSTOMERS_TABLE C \
            ON R.USER_ID = C.ID \
          WHERE C.FIRST_NAME IS NOT NULL;
    CREATE STREAM RATINGS_ENRICHED AS \
    SELECT R.RATING_ID, R.MESSAGE, R.STARS, \
          C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, C.EMAIL AS EMAIL
          FROM RATINGS R \
            LEFT JOIN CUSTOMERS_TABLE C \
            ON R.USER_ID = C.ID \
          WHERE C.FIRST_NAME IS NOT NULL;
    PRINT 'RATINGS_ENRICHED';
You can’t perform that action at this time.