Skip to content

Commit

Permalink
feat: Support Kafka, CDC Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
1ambda committed Sep 9, 2023
1 parent 08b3c43 commit 8064cf8
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
__pycache__/
notebook/.ipynb_checkpoints/
.DS_Store
.mypy_cache/
.pytest_cache
.ruff_cache/
logs/

# Created by https://www.toptal.com/developers/gitignore/api/maven,java,intellij+all,intellij+iml,git
# Edit at https://www.toptal.com/developers/gitignore?templates=maven,java,intellij+all,intellij+iml,git
Expand Down
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ compose.trino:
compose.dbt:
COMPOSE_PROFILES=trino,airflow docker-compose up

# Bring up the CDC demo stack (ZooKeeper, Kafka, Kafka UI, MySQL, Debezium Connect)
# from its dedicated compose file.
# NOTE(review): "dokcer-compose-cdc.yml" contains a typo ("dokcer"), but it matches
# the actual file name committed at the repo root — rename both together if fixed.
.PHONY: compose.cdc
compose.cdc:
	COMPOSE_PROFILES=kafka docker-compose -f dokcer-compose-cdc.yml up

# Delete all locally persisted container state (ZK/Kafka/MySQL data under docker/volume).
# Destructive: next compose up starts from empty volumes and re-runs MySQL seed scripts.
.PHONY: compose.clean
compose.clean:
	@ echo ""
	@ echo ""
	@ echo "[$(TAG)] ($(shell date '+%H:%M:%S')) - Cleaning container volumes ('docker/volume')"
	@ rm -rf docker/volume
	@ echo ""
	@ echo ""

.PHONY:test
test:
@ echo ""
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Supported Data Pipeline Components
| [Hudi](https://hudi.apache.org/) | 0.13.1+ | Table Format (Lakehouse) |
| [Airflow](https://airflow.apache.org/) | 2.7+ | Scheduler |
| [Jupyterlab](https://jupyter.org/) | 3+ | Notebook |
| [Kafka](https://kafka.apache.org/) | 3.5+ | Messaging Broker |
| [Debezium](https://debezium.io/) | 2.3+ | CDC Connector |

<br/>

Expand All @@ -30,6 +32,9 @@ COMPOSE_PROFILES=airflow docker-compose up;

# Combine multiple profiles
COMPOSE_PROFILES=trino,spark docker-compose up;

# for CDC environment (Kafka, ZK, Debezium) — services live in a separate compose file
COMPOSE_PROFILES=kafka docker-compose -f dokcer-compose-cdc.yml up;
```

Then access the lakehouse services.
Expand Down
16 changes: 16 additions & 0 deletions docker/debezium/register-cdc-inventory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "admin",
"database.server.id": "180000",
"topic.prefix": "cdc.inventory.data",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:29092",
"schema.history.internal.kafka.topic": "cdc.inventory.connect.schema-changes"
}
}
109 changes: 109 additions & 0 deletions docker/mysql/entrypoint.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# ---------------------------------------------------------------------------
# Seed script for the Debezium CDC demo.
# Grants the binlog/replication privileges the connector needs, then creates
# and populates the "inventory" sample database (products, products_on_hand,
# customers, addresses, orders, geom).
# Mounted into /docker-entrypoint-initdb.d, so it runs once on first start
# of the mysql container (only when the data volume is empty).
# NOTE(review): "GRANT ... IDENTIFIED BY" is MySQL 5.7 syntax (removed in
# MySQL 8.0); acceptable here because the compose file pins mysql:5.7.
# ---------------------------------------------------------------------------

# Privileges required to read the binlog and take consistent snapshots
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root' IDENTIFIED BY 'admin';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root' IDENTIFIED BY 'admin';

CREATE DATABASE inventory;
GRANT ALL PRIVILEGES ON inventory.* TO 'root'@'%';

# Switch to this database
USE inventory;

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);
# Product ids start at 101 so they line up with the products_on_hand rows below
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
       (default,"car battery","12V car battery",8.1),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
       (default,"hammer","12oz carpenter's hammer",0.75),
       (default,"hammer","14oz carpenter's hammer",0.875),
       (default,"hammer","16oz carpenter's hammer",1.0),
       (default,"rocks","box of assorted rocks",5.3),
       (default,"jacket","water resistent black wind breaker",0.1),
       (default,"spare tire","24 inch spare tire",22.2);

# Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
  product_id INTEGER NOT NULL PRIMARY KEY,
  quantity INTEGER NOT NULL,
  FOREIGN KEY (product_id) REFERENCES products(id)
);

INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);

# Create some customers ...
CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;


INSERT INTO customers
VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
       (default,"George","Bailey","gbailey@foobar.com"),
       (default,"Edward","Walker","ed@walker.com"),
       (default,"Anne","Kretchmar","annek@noanswer.org");

# Create some fake addresses
CREATE TABLE addresses (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  customer_id INTEGER NOT NULL,
  street VARCHAR(255) NOT NULL,
  city VARCHAR(255) NOT NULL,
  state VARCHAR(255) NOT NULL,
  zip VARCHAR(255) NOT NULL,
  type enum('SHIPPING','BILLING','LIVING') NOT NULL,
  FOREIGN KEY address_customer (customer_id) REFERENCES customers(id)
) AUTO_INCREMENT = 10;

INSERT INTO addresses
VALUES (default,1001,'3183 Moore Avenue','Euless','Texas','76036','SHIPPING'),
       (default,1001,'2389 Hidden Valley Road','Harrisburg','Pennsylvania','17116','BILLING'),
       (default,1002,'281 Riverside Drive','Augusta','Georgia','30901','BILLING'),
       (default,1003,'3787 Brownton Road','Columbus','Mississippi','39701','SHIPPING'),
       (default,1003,'2458 Lost Creek Road','Bethlehem','Pennsylvania','18018','SHIPPING'),
       (default,1003,'4800 Simpson Square','Hillsdale','Oklahoma','73743','BILLING'),
       (default,1004,'1289 University Hill Road','Canehill','Arkansas','72717','LIVING');

# Create some very simple orders
CREATE TABLE orders (
  order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATE NOT NULL,
  purchaser INTEGER NOT NULL,
  quantity INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
  FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2016-01-16', 1001, 1, 102),
       (default, '2016-01-17', 1002, 2, 105),
       (default, '2016-02-19', 1002, 2, 106),
       (default, '2016-02-21', 1003, 1, 107);

# Create table with Spatial/Geometry type
CREATE TABLE geom (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  g GEOMETRY NOT NULL,
  h GEOMETRY);

INSERT INTO geom
VALUES(default, ST_GeomFromText('POINT(1 1)'), NULL),
      (default, ST_GeomFromText('LINESTRING(2 1, 6 6)'), NULL),
      (default, ST_GeomFromText('POLYGON((0 5, 2 5, 2 7, 0 7, 0 5))'), NULL);
15 changes: 15 additions & 0 deletions docker/mysql/my.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# MySQL server config for the CDC demo, mounted at /etc/mysql/conf.d/my.cnf.
[mysqld]
# This server's replication id. Debezium registers with its own
# database.server.id (180000 in the connector config), so 1 does not clash.
server-id= 1
# Container-friendly: skip the host cache and reverse-DNS lookups on connect
skip-host-cache
skip-name-resolve
user=mysql
symbolic-links=0 # Disabling symbolic-links is recommended to prevent assorted security risks

# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------

# Row-based binlog — the format Debezium's MySQL connector requires
binlog-format=row
log_bin=mysql-bin
# Bound retained binlog size for the demo: purge after 7 days, 100M per file
expire_logs_days=7
max_binlog_size=100M
Empty file removed docker/volume/.gitkeep
Empty file.
160 changes: 160 additions & 0 deletions dokcer-compose-cdc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# ---------------------------------------------------------------------------
# CDC demo stack: ZooKeeper + single Kafka broker + Kafka UI + MySQL source
# + Debezium Connect worker. Shared settings are factored into YAML anchors
# (x-kafka-common / x-zk-common) and merged into services via "<<:".
# NOTE(review): this file's own name ("dokcer-...") contains a typo; the
# Makefile references the same misspelled name, so rename both together.
# ---------------------------------------------------------------------------

x-kafka-common:
  &kafka-common
  # profiles: [ "kafka" ]
  # NOTE(review): with "profiles" commented out on every service, the whole
  # stack starts unconditionally and COMPOSE_PROFILES=kafka (README/Makefile)
  # is a no-op for this file — confirm whether profiles were meant to be on.
  image: confluentinc/cp-kafka:7.5.0 # OSS Kafka 3.5.X
  environment:
    &kafka-common-env
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
    # Open ACL fallback — local development only, do not use in production
    KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    # Single-broker cluster: internal topics cannot have replication factor > 1
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT

  depends_on:
    # Anchor is a sequence — reuse with "depends_on: *kafka-common-depends"
    &kafka-common-depends
    - zk

x-zk-common:
  &zk-common
  image: confluentinc/cp-zookeeper:7.5.0
  # profiles: [ "kafka" ]
  environment:
    &zk-common-env
    # Single-node ensemble; 2888/3888 are only used between ZK peers
    ZOOKEEPER_SERVERS: zk:2888:3888

services:
  ####################################################################################################
  # Kafka
  ####################################################################################################
  zk:
    <<: *zk-common
    hostname: zk
    container_name: zk
    ports:
      - "2181:2181"
    environment:
      <<: *zk-common-env
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
    volumes:
      - ./docker/volume/zookeeper/data:/var/lib/zookeeper/data
      - ./docker/volume/zookeeper/log:/var/lib/zookeeper/log

  kafka1:
    <<: *kafka-common
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      <<: *kafka-common-env
      KAFKA_BROKER_ID: 1
      # INTERNAL: broker-to-broker, EXTERNAL: clients on the host, DOCKER: other containers.
      # NOTE(review): host.docker.internal resolves on Docker Desktop; on native
      # Linux it may require an extra_hosts entry — confirm target platform.
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
    depends_on:
      *kafka-common-depends
    volumes:
      - ./docker/volume/kafka/kafka1-data:/var/lib/kafka/data


  kafka-ui:
    # profiles: [ "kafka" ]
    # NOTE(review): untagged image resolves to :latest — consider pinning a version
    image: provectuslabs/kafka-ui
    hostname: kafka-ui
    container_name: kafka-ui
    ports:
      - "8085:8080"
    # Environment variable configuration (cluster connection details for the UI)
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zk:2181
    depends_on:
      - zk
      - kafka1

  # Additional brokers, disabled for the single-node demo.
  # NOTE(review): if re-enabled, "<<: *kafka-common-depends" below is invalid —
  # the anchor is a sequence and "<<" merges mappings only; write
  # "depends_on: *kafka-common-depends" exactly as kafka1 does above.
  # kafka2:
  #   <<: *kafka-common
  #   hostname: kafka2
  #   container_name: kafka2
  #   ports:
  #     - "9093:9093"
  #     - "29093:29093"
  #   environment:
  #     <<: *kafka-common-env
  #     KAFKA_BROKER_ID: 2
  #     KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
  #   depends_on:
  #     <<: *kafka-common-depends
  #   volumes:
  #     - ./docker/volume/kafka/kafka2-data:/var/lib/kafka/data
  #
  # kafka3:
  #   <<: *kafka-common
  #   hostname: kafka3
  #   container_name: kafka3
  #   ports:
  #     - "9094:9094"
  #     - "29094:29094"
  #   environment:
  #     <<: *kafka-common-env
  #     KAFKA_BROKER_ID: 3
  #     KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
  #   depends_on:
  #     <<: *kafka-common-depends
  #   volumes:
  #     - ./docker/volume/kafka/kafka3-data:/var/lib/kafka/data


  ####################################################################################################
  # MySQL
  ####################################################################################################
  mysql:
    # profiles: [ "kafka" ]
    image: mysql:5.7
    # mysql:5.7 has no arm64 image; force amd64 (emulated on Apple Silicon)
    platform: linux/amd64
    container_name: mysql
    hostname: mysql
    ports:
      - "3306:3306"
    environment:
      TZ: UTC
      # Local-development credentials only; matches entrypoint.sql grants
      MYSQL_ROOT_PASSWORD: "admin"
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
      # MYSQL_USER: "mysqluser"
      # MYSQL_PASSWORD: "mysqlpw"

    restart: always
    volumes:
      # Binlog/server config, one-shot seed script, and persisted data/logs
      - ./docker/mysql/my.cnf:/etc/mysql/conf.d/my.cnf
      - ./docker/mysql/entrypoint.sql:/docker-entrypoint-initdb.d/entrypoint.sql
      - ./docker/volume/mysql/data:/var/lib/mysql
      - ./docker/volume/mysql/logs/:/var/log/mysql


  ####################################################################################################
  # Kafka Producer: Debezium MySQL Connector
  ####################################################################################################
  kafka-producer:
    # profiles: [ "kafka" ]
    image: debezium/connect:2.3.3.Final
    container_name: kafka-producer
    hostname: kafka-producer
    ports:
      - "8083:8083"
    environment:
      # Connect worker settings: broker + internal state topics (configs/offsets/statuses)
      - BOOTSTRAP_SERVERS=kafka1:29092
      - GROUP_ID=cdc.inventory
      - CONFIG_STORAGE_TOPIC=cdc.inventory.connect.configs
      - OFFSET_STORAGE_TOPIC=cdc.inventory.connect.offsets
      - STATUS_STORAGE_TOPIC=cdc.inventory.connect.statuses
    depends_on:
      - kafka1
      - mysql

# Configure Network
networks:
  default:
    name: lakehouse

0 comments on commit 8064cf8

Please sign in to comment.