Skip to content

Commit

Permalink
feat: Add Iceberg CDC Playbook
Browse files Browse the repository at this point in the history
  • Loading branch information
1ambda committed Sep 10, 2023
1 parent 79b0b91 commit 09ca127
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
1 change: 1 addition & 0 deletions docker-compose-cdc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ services:
ports:
- "8083:8083"
environment:
# https://hub.docker.com/r/debezium/connect-base
- BOOTSTRAP_SERVERS=kafka1:29092
- GROUP_ID=cdc.inventory
- CONFIG_STORAGE_TOPIC=cdc.inventory.connect.configs
Expand Down
57 changes: 57 additions & 0 deletions playbook/flink-sql-iceberg-cdc.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- Source table over the Kafka topic `cdc.inventory.data.inventory.products`,
-- decoded with the `debezium-json` format.
CREATE TABLE raw_products (
    -- Change-event metadata exposed by the value format (VIRTUAL metadata columns).
    origin_ts         TIMESTAMP(3)        METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    event_time        TIMESTAMP(3)        METADATA FROM 'value.source.timestamp' VIRTUAL,
    origin_database   STRING              METADATA FROM 'value.source.database' VIRTUAL,
    origin_schema     STRING              METADATA FROM 'value.source.schema' VIRTUAL,
    origin_table      STRING              METADATA FROM 'value.source.table' VIRTUAL,
    origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,

    -- Physical columns of the products table.
    id                BIGINT,
    name              STRING,
    description       STRING,
    weight            DECIMAL(38, 10)
) WITH (
    -- Kafka connection and consumption: start from the earliest offset.
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka1:29092',
    'topic' = 'cdc.inventory.data.inventory.products',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',

    -- Value decoding. NOTE(review): 'schema-include' = 'true' presumably has to
    -- match the Kafka Connect converter setting on the producer side -- confirm.
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true',
    -- Parse errors are not ignored.
    'debezium-json.ignore-parse-errors' = 'false'
);

-- Iceberg sink table for the products change feed.
-- IF NOT EXISTS keeps the playbook re-runnable: the table is registered in the
-- `iceberg` catalog (Hive metastore at thrift://hive-metastore:9083 per the
-- options below), so a plain CREATE TABLE would fail on a second run.
CREATE TABLE IF NOT EXISTS iceberg.flink.inventory_products (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(38, 10),
    -- Declared key for the upsert writes enabled below; Flink does not enforce it.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    -- Catalog coordinates.
    'catalog-name'='iceberg',
    'catalog-database'='flink',
    'catalog-type'='hive',
    'uri'='thrift://hive-metastore:9083',
    'warehouse'='s3a://datalake/iceberg',

    -- Table format version 2 together with upsert writes.
    'format-version'='2',
    'write.upsert.enabled'='true',

    -- Parquet data files compressed with zstd at level 9.
    'write.parquet.compression-codec'='zstd',
    'write.parquet.compression-level'='9',

    -- Write distribution and row-level operation modes.
    'write.distribution-mode'='none',
    'write.update.mode'='merge-on-read',
    'write.delete.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
);

-- Session settings for the continuous CDC -> Iceberg job.
SET 'execution.checkpointing.interval' = '30s';
SET 'parallelism.default' = '2';
SET 'pipeline.max-parallelism' = '12';
SET 'execution.runtime-mode' = 'streaming';
-- Quoted key/value form, consistent with the other SET statements in this file.
SET 'pipeline.name' = 'INSERT_inventory_products';

-- Copy the products change feed into the Iceberg table.
INSERT INTO iceberg.flink.inventory_products
SELECT
    id,
    name,
    description,
    weight
FROM raw_products;

-- Clear the job name set for the INSERT so it does not apply to the query below.
-- Quoted key form, consistent with the SET statements in this file.
RESET 'pipeline.name';

-- Read the Iceberg table back in batch mode and print it in tableau format.
-- NOTE(review): rows presumably appear only after the streaming job above has
-- committed at least once -- confirm before treating an empty result as a failure.
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT
    id,
    name,
    description,
    weight
FROM iceberg.flink.inventory_products;

0 comments on commit 09ca127

Please sign in to comment.