Skip to content

Microservice CDC(Change Data Capture) implementation for Mysql using Debezium Source and Confluent Sink Connector

Notifications You must be signed in to change notification settings


Repository files navigation


Start tutorial demo

docker-compose -f docker-compose-mysql.yaml up -d

Check connector plugins list


List connectors


Register source connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Consume messages from a Debezium topic

docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/ \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

Register sink connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-sink.json

Register multiple sink connector for different tables, if primary key or the configuration differs

To check status of a connector: replace name of the connector in "jdbc-sink-connector"


To delete a connector: replace name of the connector in "jdbc-sink-connector"

curl -X DELETE localhost:8083/connectors/jdbc-sink-connector

To restart a connector

curl -X POST localhost:8083/connectors/jdbc-sink-connector/restart

Connect to Mysql source:

docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

Connect to Mysql Destination:

docker-compose -f docker-compose-mysql.yaml exec mysqldestination bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

Verify records in source

select * from customers;

Update records in source

update customers set first_name='dip';

Insert records in source

insert into customers(id,first_name,last_name,email) values(1005,'1','1','1');

Verify records in destination

select * from kafka_customers;

Delete a record in source

delete from customers where id=1005;

Add column in source : Column will only be added if any new insert sql statement is found with the new column

ALTER TABLE customers ADD phone varchar(40) NULL AFTER email;

Rename column name in source : Column will only be added if any new insert sql statement is found with the new column,but old data in not available in new column. So only rename columns if data are not added in old field

ALTER TABLE customers CHANGE COLUMN phone mobile varchar(20) NULL;

Remove a column in source : Column is not removed : Remove is not yet supported

ALTER TABLE customers DROP COLUMN mobile;

Index/Unique index/Auto increment column/Foreign Keys is not added in destination, but due to it no adverse effect is found yet

Check whether log bin is enabled in mysql


Add multiple comma seperated tables name that needs to be replicated should be placed in topics key of sink connector configuration

"topics": "customers,addresses,products"

Add prefix to the destination tables

Eg: For customers table, kafka_customers will be created in the destination.


Add multiple parallel workers for the tasks

The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism. Eg: "tasks.max" : 3

Optional : Get mysql Server id to be placed in :

SELECT @@server_id

Refer debezium config for more

End the demo

docker-compose -f docker-compose-mysql.yaml down


Thanks to the below contributors

  1. Microservices - Database per service architecture
  2. Use CQRS to overcome Database per service pitfalls
  3. Inspired by Eventuate Tram
  4. Debezium Mysql Source Connector
  5. Confluent JDBC Sink Connector configuration
  6. Manage Confluent Kafka connector
  7. Inplace of Debezium, Confluent JDBC Source Connector configuration can also be used.
  8. Other Debezium source connector examples


Microservice CDC(Change Data Capture) implementation for Mysql using Debezium Source and Confluent Sink Connector






No releases published


No packages published