From bf44fc2cd8c09cfe8f3889328bc2d17f4df9f047 Mon Sep 17 00:00:00 2001
From: Kun <1amb4a@gmail.com>
Date: Sat, 9 Sep 2023 16:17:16 +0900
Subject: [PATCH] feat: Support Kafka, CDC Connector
---
.gitignore | 4 +
Makefile | 13 ++
README.md | 17 +++
docker/debezium/register-cdc-inventory.json | 16 ++
docker/mysql/entrypoint.sql | 109 +++++++++++++
docker/mysql/my.cnf | 15 ++
docker/volume/.gitkeep | 0
dokcer-compose-cdc.yml | 160 ++++++++++++++++++++
8 files changed, 334 insertions(+)
create mode 100644 docker/debezium/register-cdc-inventory.json
create mode 100644 docker/mysql/entrypoint.sql
create mode 100644 docker/mysql/my.cnf
delete mode 100755 docker/volume/.gitkeep
create mode 100644 dokcer-compose-cdc.yml
diff --git a/.gitignore b/.gitignore
index 68cb4e7..ca43213 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,10 @@
__pycache__/
notebook/.ipynb_checkpoints/
.DS_Store
+.mypy_cache/
+.pytest_cache
+.ruff_cache/
+logs/
# Created by https://www.toptal.com/developers/gitignore/api/maven,java,intellij+all,intellij+iml,git
# Edit at https://www.toptal.com/developers/gitignore?templates=maven,java,intellij+all,intellij+iml,git
diff --git a/Makefile b/Makefile
index 75527ab..d1808c5 100644
--- a/Makefile
+++ b/Makefile
@@ -34,6 +34,19 @@ compose.trino:
compose.dbt:
COMPOSE_PROFILES=trino,airflow docker-compose up
+.PHONY: compose.cdc
+compose.cdc:
+ COMPOSE_PROFILES=kafka docker-compose -f dokcer-compose-cdc.yml up
+
+.PHONY: compose.clean
+compose.clean:
+ @ echo ""
+ @ echo ""
+ @ echo "[$(TAG)] ($(shell date '+%H:%M:%S')) - Cleaning container volumes ('docker/volume')"
+ @ rm -rf docker/volume
+ @ echo ""
+ @ echo ""
+
.PHONY:test
test:
@ echo ""
diff --git a/README.md b/README.md
index 91e6feb..6935976 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,8 @@ Supported Data Pipeline Components
| [Hudi](https://hudi.apache.org/) | 0.13.1+ | Table Format (Lakehouse) |
| [Airflow](https://airflow.apache.org/) | 2.7+ | Scheduler |
| [Jupyterlab](https://jupyter.org/) | 3+ | Notebook |
+| [Kafka](https://kafka.apache.org/) | 3.5+ | Messaging Broker |
+| [Debezium](https://debezium.io/) | 2.3+ | CDC Connector |
@@ -30,6 +32,9 @@ COMPOSE_PROFILES=airflow docker-compose up;
# Combine multiple profiles
COMPOSE_PROFILES=trino,spark docker-compose up;
+
+# for CDC environment (Kafka, ZK, Debezium)
+COMPOSE_PROFILES=kafka docker-compose up;
```
Then access the lakehouse services.
@@ -46,6 +51,18 @@ Then access the lakehouse services.
+## CDC Starter kit
+
+```bash
+# Run cdc-related containers
+make compose.cdc;
+
+# Register debezium mysql connector
+curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
+ http://localhost:8083/connectors/ \
+ -d @docker/debezium/register-cdc-inventory.json
+```
+
## DBT Starter kit
```bash
diff --git a/docker/debezium/register-cdc-inventory.json b/docker/debezium/register-cdc-inventory.json
new file mode 100644
index 0000000..7eb6bea
--- /dev/null
+++ b/docker/debezium/register-cdc-inventory.json
@@ -0,0 +1,16 @@
+{
+ "name": "inventory-connector",
+ "config": {
+ "connector.class": "io.debezium.connector.mysql.MySqlConnector",
+ "tasks.max": "1",
+ "database.hostname": "mysql",
+ "database.port": "3306",
+ "database.user": "root",
+ "database.password": "admin",
+ "database.server.id": "180000",
+ "topic.prefix": "cdc.inventory.data",
+ "database.include.list": "inventory",
+ "schema.history.internal.kafka.bootstrap.servers": "kafka1:29092",
+ "schema.history.internal.kafka.topic": "cdc.inventory.connect.schema-changes"
+ }
+}
diff --git a/docker/mysql/entrypoint.sql b/docker/mysql/entrypoint.sql
new file mode 100644
index 0000000..afe6548
--- /dev/null
+++ b/docker/mysql/entrypoint.sql
@@ -0,0 +1,109 @@
+GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root' IDENTIFIED BY 'admin';
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root' IDENTIFIED BY 'admin';
+
+CREATE DATABASE inventory;
+GRANT ALL PRIVILEGES ON inventory.* TO 'root'@'%';
+
+# Switch to this database
+USE inventory;
+
+# Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL,
+ description VARCHAR(512),
+ weight FLOAT
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+ (default,"car battery","12V car battery",8.1),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
+ (default,"hammer","12oz carpenter's hammer",0.75),
+ (default,"hammer","14oz carpenter's hammer",0.875),
+ (default,"hammer","16oz carpenter's hammer",1.0),
+ (default,"rocks","box of assorted rocks",5.3),
+ (default,"jacket","water resistent black wind breaker",0.1),
+ (default,"spare tire","24 inch spare tire",22.2);
+
+# Create and populate the products on hand using multiple inserts
+CREATE TABLE products_on_hand (
+ product_id INTEGER NOT NULL PRIMARY KEY,
+ quantity INTEGER NOT NULL,
+ FOREIGN KEY (product_id) REFERENCES products(id)
+);
+
+INSERT INTO products_on_hand VALUES (101,3);
+INSERT INTO products_on_hand VALUES (102,8);
+INSERT INTO products_on_hand VALUES (103,18);
+INSERT INTO products_on_hand VALUES (104,4);
+INSERT INTO products_on_hand VALUES (105,5);
+INSERT INTO products_on_hand VALUES (106,0);
+INSERT INTO products_on_hand VALUES (107,44);
+INSERT INTO products_on_hand VALUES (108,2);
+INSERT INTO products_on_hand VALUES (109,5);
+
+# Create some customers ...
+CREATE TABLE customers (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ first_name VARCHAR(255) NOT NULL,
+ last_name VARCHAR(255) NOT NULL,
+ email VARCHAR(255) NOT NULL UNIQUE KEY
+) AUTO_INCREMENT=1001;
+
+
+INSERT INTO customers
+VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
+ (default,"George","Bailey","gbailey@foobar.com"),
+ (default,"Edward","Walker","ed@walker.com"),
+ (default,"Anne","Kretchmar","annek@noanswer.org");
+
+# Create some fake addresses
+CREATE TABLE addresses (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ customer_id INTEGER NOT NULL,
+ street VARCHAR(255) NOT NULL,
+ city VARCHAR(255) NOT NULL,
+ state VARCHAR(255) NOT NULL,
+ zip VARCHAR(255) NOT NULL,
+ type enum('SHIPPING','BILLING','LIVING') NOT NULL,
+ FOREIGN KEY address_customer (customer_id) REFERENCES customers(id)
+) AUTO_INCREMENT = 10;
+
+INSERT INTO addresses
+VALUES (default,1001,'3183 Moore Avenue','Euless','Texas','76036','SHIPPING'),
+ (default,1001,'2389 Hidden Valley Road','Harrisburg','Pennsylvania','17116','BILLING'),
+ (default,1002,'281 Riverside Drive','Augusta','Georgia','30901','BILLING'),
+ (default,1003,'3787 Brownton Road','Columbus','Mississippi','39701','SHIPPING'),
+ (default,1003,'2458 Lost Creek Road','Bethlehem','Pennsylvania','18018','SHIPPING'),
+ (default,1003,'4800 Simpson Square','Hillsdale','Oklahoma','73743','BILLING'),
+ (default,1004,'1289 University Hill Road','Canehill','Arkansas','72717','LIVING');
+
+# Create some very simple orders
+CREATE TABLE orders (
+ order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ order_date DATE NOT NULL,
+ purchaser INTEGER NOT NULL,
+ quantity INTEGER NOT NULL,
+ product_id INTEGER NOT NULL,
+ FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
+ FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
+) AUTO_INCREMENT = 10001;
+
+INSERT INTO orders
+VALUES (default, '2016-01-16', 1001, 1, 102),
+ (default, '2016-01-17', 1002, 2, 105),
+ (default, '2016-02-19', 1002, 2, 106),
+ (default, '2016-02-21', 1003, 1, 107);
+
+# Create table with Spatial/Geometry type
+CREATE TABLE geom (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ g GEOMETRY NOT NULL,
+ h GEOMETRY);
+
+INSERT INTO geom
+VALUES(default, ST_GeomFromText('POINT(1 1)'), NULL),
+ (default, ST_GeomFromText('LINESTRING(2 1, 6 6)'), NULL),
+ (default, ST_GeomFromText('POLYGON((0 5, 2 5, 2 7, 0 7, 0 5))'), NULL);
diff --git a/docker/mysql/my.cnf b/docker/mysql/my.cnf
new file mode 100644
index 0000000..0b918a0
--- /dev/null
+++ b/docker/mysql/my.cnf
@@ -0,0 +1,15 @@
+[mysqld]
+server-id= 1
+skip-host-cache
+skip-name-resolve
+user=mysql
+symbolic-links=0 # Disabling symbolic-links is recommended to prevent assorted security risks
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+binlog-format=row
+log_bin=mysql-bin
+expire_logs_days=7
+max_binlog_size=100M
diff --git a/docker/volume/.gitkeep b/docker/volume/.gitkeep
deleted file mode 100755
index e69de29..0000000
diff --git a/dokcer-compose-cdc.yml b/dokcer-compose-cdc.yml
new file mode 100644
index 0000000..bad13e7
--- /dev/null
+++ b/dokcer-compose-cdc.yml
@@ -0,0 +1,160 @@
+x-kafka-common:
+ &kafka-common
+ # profiles: [ "kafka" ]
+ image: confluentinc/cp-kafka:7.5.0 # OSS Kafka 3.5.X
+ environment:
+ &kafka-common-env
+ KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
+ KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+ KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
+ KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+
+ depends_on:
+ &kafka-common-depends
+ - zk
+
+x-zk-common:
+ &zk-common
+ image: confluentinc/cp-zookeeper:7.5.0
+ # profiles: [ "kafka" ]
+ environment:
+ &zk-common-env
+ ZOOKEEPER_SERVERS: zk:2888:3888
+
+services:
+ ####################################################################################################
+ # Kafka
+ ####################################################################################################
+ zk:
+ <<: *zk-common
+ hostname: zk
+ container_name: zk
+ ports:
+ - "2181:2181"
+ environment:
+ <<: *zk-common-env
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_SERVER_ID: 1
+ volumes:
+ - ./docker/volume/zookeeper/data:/var/lib/zookeeper/data
+ - ./docker/volume/zookeeper/log:/var/lib/zookeeper/log
+
+ kafka1:
+ <<: *kafka-common
+ hostname: kafka1
+ container_name: kafka1
+ ports:
+ - "9092:9092"
+ - "29092:29092"
+ environment:
+ <<: *kafka-common-env
+ KAFKA_BROKER_ID: 1
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+ depends_on:
+ *kafka-common-depends
+ volumes:
+ - ./docker/volume/kafka/kafka1-data:/var/lib/kafka/data
+
+
+ kafka-ui:
+ # profiles: [ "kafka" ]
+ image: provectuslabs/kafka-ui
+ hostname: kafka-ui
+ container_name: kafka-ui
+ ports:
+ - "8085:8080"
+ # Environment variable configuration
+ environment:
+ - KAFKA_CLUSTERS_0_NAME=local
+ - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:29092
+ - KAFKA_CLUSTERS_0_ZOOKEEPER=zk:2181
+ depends_on:
+ - zk
+ - kafka1
+
+ # kafka2:
+# <<: *kafka-common
+# hostname: kafka2
+# container_name: kafka2
+# ports:
+# - "9093:9093"
+# - "29093:29093"
+# environment:
+# <<: *kafka-common-env
+# KAFKA_BROKER_ID: 2
+# KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
+# depends_on:
+# <<: *kafka-common-depends
+# volumes:
+# - ./docker/volume/kafka/kafka2-data:/var/lib/kafka/data
+#
+# kafka3:
+# <<: *kafka-common
+# hostname: kafka3
+# container_name: kafka3
+# ports:
+# - "9094:9094"
+# - "29094:29094"
+# environment:
+# <<: *kafka-common-env
+# KAFKA_BROKER_ID: 3
+# KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
+# depends_on:
+# <<: *kafka-common-depends
+# volumes:
+# - ./docker/volume/kafka/kafka3-data:/var/lib/kafka/data
+
+
+ ####################################################################################################
+ # MySQL
+ ####################################################################################################
+ mysql:
+ # profiles: [ "kafka" ]
+ image: mysql:5.7
+ platform: linux/amd64
+ container_name: mysql
+ hostname: mysql
+ ports:
+ - "3306:3306"
+ environment:
+ TZ: UTC
+ MYSQL_ROOT_PASSWORD: "admin"
+ MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
+ # MYSQL_USER: "mysqluser"
+ # MYSQL_PASSWORD: "mysqlpw"
+
+ restart: always
+ volumes:
+ - ./docker/mysql/my.cnf:/etc/mysql/conf.d/my.cnf
+ - ./docker/mysql/entrypoint.sql:/docker-entrypoint-initdb.d/entrypoint.sql
+ - ./docker/volume/mysql/data:/var/lib/mysql
+ - ./docker/volume/mysql/logs/:/var/log/mysql
+
+
+ ####################################################################################################
+ # Kafka Producer: Debezium MySQL Connector
+ ####################################################################################################
+ kafka-producer:
+ # profiles: [ "kafka" ]
+ image: debezium/connect:2.3.3.Final
+ container_name: kafka-producer
+ hostname: kafka-producer
+ ports:
+ - "8083:8083"
+ environment:
+ - BOOTSTRAP_SERVERS=kafka1:29092
+ - GROUP_ID=cdc.inventory
+ - CONFIG_STORAGE_TOPIC=cdc.inventory.connect.configs
+ - OFFSET_STORAGE_TOPIC=cdc.inventory.connect.offsets
+ - STATUS_STORAGE_TOPIC=cdc.inventory.connect.statuses
+ depends_on:
+ - kafka1
+ - mysql
+
+# Configure Network
+networks:
+ default:
+ name: lakehouse