diff --git a/Makefile b/Makefile
index 460dcc5..73cdd80 100644
--- a/Makefile
+++ b/Makefile
@@ -57,7 +57,7 @@ compose.cdc:
.PHONY: compose.stream
compose.stream:
- COMPOSE_PROFILES=flink,kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up
+ COMPOSE_PROFILES=kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up
.PHONY: compose.clean
compose.clean:
diff --git a/docker-compose.yml b/docker-compose.yml
index ac2d2b0..28600a1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -244,7 +244,7 @@ services:
working_dir: /opt/flink
ports:
- - "8081:8081"
+ - "8082:8081"
- "6123:6123"
environment:
diff --git a/project-flink/.gitignore b/project-flink/.gitignore
new file mode 100644
index 0000000..d18d270
--- /dev/null
+++ b/project-flink/.gitignore
@@ -0,0 +1,44 @@
+### Java ###
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+replay_pid*
+
+### Maven ###
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+# https://github.com/takari/maven-wrapper#usage-without-binary-jar
+.mvn/wrapper/maven-wrapper.jar
+
+# Eclipse m2e generated files
+# Eclipse Core
+.project
+# JDT-specific (Eclipse Java Development Tools)
+.classpath
diff --git a/project-flink/.idea/icon.png b/project-flink/.idea/icon.png
new file mode 100644
index 0000000..37c980e
Binary files /dev/null and b/project-flink/.idea/icon.png differ
diff --git a/project-flink/flink-sql-runner/pom.xml b/project-flink/flink-sql-runner/pom.xml
new file mode 100644
index 0000000..1222ea7
--- /dev/null
+++ b/project-flink/flink-sql-runner/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.github.lambda.lakehouse</groupId>
+    <artifactId>project-flink</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>flink-sql-runner</artifactId>
+
+  <dependencies>
+    <!-- Flink -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.12</artifactId>
+      <version>${dep.version.flink}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-files</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka</artifactId>
+      <version>${dep.version.flink}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-sql-connector-kafka</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${dep.version.kafka}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-s3-fs-hadoop</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-parquet</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Iceberg -->
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-flink-runtime-${dep.version.flinkShort}</artifactId>
+      <version>${dep.version.iceberg}</version>
+    </dependency>
+
+    <!-- AWS SDK -->
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>bundle</artifactId>
+      <version>${dep.version.awssdk}</version>
+    </dependency>
+
+    <!-- Hadoop -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${dep.version.hadoop}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${dep.version.hadoop}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>${dep.version.hadoop}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${dep.version.hadoop}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${dep.version.hadoop}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.4.1</version>
+        <!-- element names assumed: attach the shaded jar with a "combined" classifier -->
+        <configuration>
+          <shadedArtifactAttached>true</shadedArtifactAttached>
+          <shadedClassifierName>combined</shadedClassifierName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <!-- element name assumed -->
+          <release>8</release>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToIceberg.java b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToIceberg.java
new file mode 100644
index 0000000..1eb7e39
--- /dev/null
+++ b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToIceberg.java
@@ -0,0 +1,98 @@
+package com.github.lambda.lakehouse;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
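+/**
+ * Reads Debezium CDC records for the inventory customers table from Kafka and
+ * writes the (id, weight) projection into an Iceberg table via the Table API.
+ */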
+public class FlinkAppKafkaToIceberg {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkAppKafkaToIceberg.class);
+
+ public static void main(String[] args) throws Exception {
+
+ TableEnvironment tableEnv = buildTableEnvironment();
+
+ Table tableRawCustomers = buildSourceTable("raw_customers", tableEnv);
+ Table tableAggrCustomers = buildSinkTable("aggr_customers", tableEnv);
+
+ tableEnv.executeSql("INSERT INTO aggr_customers SELECT id, weight FROM raw_customers");
+ }
+
+ public static Table buildSinkTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " id BIGINT,\n"
+ + " weight DECIMAL(38, 10),\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ") "
+ + "WITH (\n"
+ + " 'connector' = 'iceberg',\n"
+ + " 'topic' = 'aggregation.customers',\n"
+ + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ + " 'properties.allow.auto.create.topics' = 'true',\n"
+ + " 'value.format' = 'json',\n"
+ + " 'key.format' = 'json'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static Table buildSourceTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n"
+ + " event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"
+ + " origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,\n"
+ + " origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,\n"
+ + " origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,\n"
+ + " origin_properties MAP METADATA FROM 'value.source.properties' VIRTUAL,\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + " description STRING,\n"
+ + " weight DECIMAL(38, 10)\n" + ") "
+ + "WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = 'cdc-json.inventory.data.inventory.customers',\n"
+ + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ + " 'properties.group.id' = 'testGroup',\n"
+ + " 'properties.auto.offset.reset' = 'earliest',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'format' = 'debezium-json',\n"
+ + " 'debezium-json.schema-include' = 'true',\n"
+ + " 'debezium-json.ignore-parse-errors' = 'false'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static StreamTableEnvironment buildTableEnvironment() {
+ // TODO (Kun): Handle Parameters
+ // - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
+ Configuration conf = new Configuration();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironmentWithWebUI(conf);
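+ // Exactly-once checkpoints every 30s on the RocksDB state backend, stored on the local filesystem.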
+ env.getCheckpointConfig().setCheckpointInterval(30000L);
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
+ env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoint");
+ env.setDefaultSavepointDirectory("file:///tmp/flink-savepoint");
+
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ return tableEnv;
+ }
+}
diff --git a/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToKafka.java b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToKafka.java
new file mode 100644
index 0000000..69d75ee
--- /dev/null
+++ b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToKafka.java
@@ -0,0 +1,98 @@
+package com.github.lambda.lakehouse;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
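+/**
+ * Reads Debezium CDC records for the inventory customers table from Kafka and
+ * upserts the (id, weight) projection into the aggregation.customers topic
+ * through the upsert-kafka connector.
+ */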
+public class FlinkAppKafkaToKafka {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkAppKafkaToKafka.class);
+
+ public static void main(String[] args) throws Exception {
+
+ TableEnvironment tableEnv = buildTableEnvironment();
+
+ Table tableRawCustomers = buildSourceTable("raw_customers", tableEnv);
+ Table tableAggrCustomers = buildSinkTable("aggr_customers", tableEnv);
+
+ tableEnv.executeSql("INSERT INTO aggr_customers SELECT id, weight FROM raw_customers");
+ }
+
+ public static Table buildSinkTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " id BIGINT,\n"
+ + " weight DECIMAL(38, 10),\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ") "
+ + "WITH (\n"
+ + " 'connector' = 'upsert-kafka',\n"
+ + " 'topic' = 'aggregation.customers',\n"
+ + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ + " 'properties.allow.auto.create.topics' = 'true',\n"
+ + " 'value.format' = 'json',\n"
+ + " 'key.format' = 'json'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static Table buildSourceTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n"
+ + " event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"
+ + " origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,\n"
+ + " origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,\n"
+ + " origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,\n"
+ + " origin_properties MAP METADATA FROM 'value.source.properties' VIRTUAL,\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + " description STRING,\n"
+ + " weight DECIMAL(38, 10)\n" + ") "
+ + "WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = 'cdc-json.inventory.data.inventory.customers',\n"
+ + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ + " 'properties.group.id' = 'testGroup',\n"
+ + " 'properties.auto.offset.reset' = 'earliest',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'format' = 'debezium-json',\n"
+ + " 'debezium-json.schema-include' = 'true',\n"
+ + " 'debezium-json.ignore-parse-errors' = 'false'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static StreamTableEnvironment buildTableEnvironment() {
+ // TODO (Kun): Handle Parameters
+ // - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
+ Configuration conf = new Configuration();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironmentWithWebUI(conf);
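+ // Checkpoint every 30s with exactly-once semantics; RocksDB state backend with local checkpoint/savepoint dirs.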
+ env.getCheckpointConfig().setCheckpointInterval(30000L);
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
+ env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoint");
+ env.setDefaultSavepointDirectory("file:///tmp/flink-savepoint");
+
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ return tableEnv;
+ }
+}
diff --git a/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToS3.java b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToS3.java
new file mode 100644
index 0000000..cc5854a
--- /dev/null
+++ b/project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToS3.java
@@ -0,0 +1,90 @@
+package com.github.lambda.lakehouse;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
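+/**
+ * Reads Kafka Connect status events (state, trace, worker_id) from a Kafka topic
+ * and writes them to S3/MinIO as Parquet files partitioned by worker_id.
+ */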
+public class FlinkAppKafkaToS3 {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkAppKafkaToS3.class);
+
+ public static void main(String[] args) throws Exception {
+
+ TableEnvironment tableEnv = buildTableEnvironment();
+
+ Table tableRawCustomers = buildSourceTable("raw_customers", tableEnv);
+ Table tableAggrCustomers = buildSinkTable("aggr_customers", tableEnv);
+
+ tableEnv.executeSql("INSERT INTO aggr_customers SELECT state, trace, worker_id FROM raw_customers");
+ }
+
+ public static Table buildSinkTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " state STRING,\n"
+ + " trace STRING,\n"
+ + " worker_id STRING\n"
+ + ") PARTITIONED BY (worker_id) "
+ + "WITH (\n"
+ + " 'connector' = 'filesystem',\n"
+ + " 'format' = 'parquet',\n"
+ + " 'path' = 's3://datalake/parquet/test'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static Table buildSourceTable(String tableName, TableEnvironment tableEnv) {
+ String query = ""
+ + "CREATE TABLE " + tableName + " (\n"
+ + " state STRING,\n"
+ + " trace STRING,\n"
+ + " worker_id STRING\n" + ") "
+ + "WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = 'connect-cluster.inventory.status',\n"
+ + " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ + " 'properties.group.id' = 'testGroup',\n"
+ + " 'properties.auto.offset.reset' = 'earliest',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'format' = 'json',"
+ + " 'json.ignore-parse-errors' = 'true',\n"
+ + " 'json.fail-on-missing-field' = 'false'\n"
+ + ");\n";
+ tableEnv.executeSql(query);
+ tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();
+
+ Table table = tableEnv.from(tableName);
+
+ return table;
+ }
+
+ public static StreamTableEnvironment buildTableEnvironment() {
+ // TODO (Kun): Handle Parameters
+ // - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
+
+ Configuration conf = new Configuration();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironmentWithWebUI(conf);
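+ // The Parquet/filesystem sink only commits files when a checkpoint completes, so checkpointing stays enabled.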
+ env.getCheckpointConfig().setCheckpointInterval(10000L);
+// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+// env.setStateBackend(new EmbeddedRocksDBStateBackend());
+// env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoint");
+// env.setDefaultSavepointDirectory("file:///tmp/flink-savepoint");
+
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ return tableEnv;
+ }
+}
diff --git a/project-flink/flink-sql-runner/src/main/resources/core-site.xml b/project-flink/flink-sql-runner/src/main/resources/core-site.xml
new file mode 100644
index 0000000..8b71e6d
--- /dev/null
+++ b/project-flink/flink-sql-runner/src/main/resources/core-site.xml
@@ -0,0 +1,18 @@
+<configuration>
+  <property>
+    <name>fs.s3a.endpoint</name>
+    <value>http://localhost:9000</value>
+  </property>
+  <property>
+    <name>fs.s3a.access.key</name>
+    <value>minio</value>
+  </property>
+  <property>
+    <name>fs.s3a.secret.key</name>
+    <value>minio123</value>
+  </property>
+  <property>
+    <name>fs.s3a.path.style.access</name>
+    <value>true</value>
+  </property>
+</configuration>
diff --git a/project-flink/flink-sql-runner/src/main/resources/log4j.properties b/project-flink/flink-sql-runner/src/main/resources/log4j.properties
new file mode 100644
index 0000000..1ab559e
--- /dev/null
+++ b/project-flink/flink-sql-runner/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/project-flink/pom.xml b/project-flink/pom.xml
new file mode 100644
index 0000000..9e8259b
--- /dev/null
+++ b/project-flink/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.github.lambda.lakehouse</groupId>
+  <artifactId>project-flink</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>flink-sql-runner</module>
+  </modules>
+
+  <properties>
+    <!-- Java 11 toolchain; property names assumed -->
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+    <maven.compiler.release>11</maven.compiler.release>
+
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+    <dep.version.flink>1.16.1</dep.version.flink>
+    <dep.version.flinkShort>1.16</dep.version.flinkShort>
+    <dep.version.iceberg>1.3.1</dep.version.iceberg>
+    <dep.version.kafka>3.4.0</dep.version.kafka>
+    <dep.version.hadoop>3.3.1</dep.version.hadoop>
+    <dep.version.awssdk>2.20.18</dep.version.awssdk>
+
+    <dep.version.slf4j>1.7.36</dep.version.slf4j>
+    <dep.version.log4j>2.17.1</dep.version.log4j>
+
+    <!-- Checkstyle checks to suppress; property name assumed -->
+    <checkstyle.violation.ignore>
+      NonEmptyAtclauseDescription,
+      JavadocMethod,
+      MissingJavadocType,
+      MissingJavadocMethod
+    </checkstyle.violation.ignore>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- State backend -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-statebackend-rocksdb</artifactId>
+      <version>${dep.version.flink}</version>
+    </dependency>
+
+    <!-- Web UI for local runs -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime-web</artifactId>
+      <version>${dep.version.flink}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${dep.version.slf4j}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${dep.version.log4j}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${dep.version.log4j}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${dep.version.log4j}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.fasterxml.jackson</groupId>
+        <artifactId>jackson-bom</artifactId>
+        <type>pom</type>
+        <scope>import</scope>
+        <version>2.15.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.junit</groupId>
+        <artifactId>junit-bom</artifactId>
+        <version>5.9.3</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+</project>