feat: Add Flink SQL Application
1ambda committed Sep 17, 2023
1 parent 964763d commit d095074
Showing 11 changed files with 665 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -57,7 +57,7 @@ compose.cdc:

.PHONY: compose.stream
compose.stream:
-	COMPOSE_PROFILES=flink,kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up
+	COMPOSE_PROFILES=kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up

.PHONY: compose.clean
compose.clean:
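The flink profile is dropped from compose.stream, presumably because the new Flink SQL application runs in a local execution environment (see FlinkAppKafkaToIceberg below) rather than against the Dockerized Flink cluster.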
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -244,7 +244,7 @@ services:
working_dir: /opt/flink
ports:
- "8081:8081"
- "8082:8081"
- "6123:6123"

environment:
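This remaps the containerized JobManager's web UI (container port 8081) to host port 8082, presumably freeing host port 8081 for the local web UI that the application starts via createLocalEnvironmentWithWebUI; 6123 is Flink's default JobManager RPC port.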
44 changes: 44 additions & 0 deletions project-flink/.gitignore
@@ -0,0 +1,44 @@
### Java ###
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
replay_pid*

### Maven ###
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar

# Eclipse m2e generated files
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath
Binary file added project-flink/.idea/icon.png
189 changes: 189 additions & 0 deletions project-flink/flink-sql-runner/pom.xml
@@ -0,0 +1,189 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.github.lambda.lakehouse</groupId>
<artifactId>project-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>flink-sql-runner</artifactId>

<properties>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${dep.version.flink}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${dep.version.flink}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${dep.version.kafka}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${dep.version.flink}</version>
<scope>provided</scope>
</dependency>

<!-- Iceberg Dependency -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-${dep.version.flinkShort}</artifactId>
<version>${dep.version.iceberg}</version>
</dependency>

<!-- Extra Dependency -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>${dep.version.awssdk}</version>
</dependency>

<!-- Hadoop Dependency -->
<!-- mapreduce -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${dep.version.hadoop}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${dep.version.hadoop}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${dep.version.hadoop}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${dep.version.hadoop}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${dep.version.hadoop}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<plugins>
<!-- The maven-shade plugin creates a fat jar that contains all
dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>combined</shadedClassifierName>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
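Since shadedArtifactAttached is true with the combined classifier, mvn clean package should produce a fat jar named flink-sql-runner-1.0-SNAPSHOT-combined.jar alongside the thin jar, bundling the non-provided dependencies (Kafka connector, Iceberg runtime, Hadoop, AWS SDK) for submission to a cluster.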
98 changes: 98 additions & 0 deletions project-flink/flink-sql-runner/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToIceberg.java
@@ -0,0 +1,98 @@
package com.github.lambda.lakehouse;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkAppKafkaToIceberg {

private static final Logger LOG = LoggerFactory.getLogger(FlinkAppKafkaToIceberg.class);

public static void main(String[] args) throws Exception {

TableEnvironment tableEnv = buildTableEnvironment();

        buildSourceTable("raw_customers", tableEnv);
        buildSinkTable("aggr_customers", tableEnv);

tableEnv.executeSql("INSERT INTO aggr_customers SELECT id, weight FROM raw_customers");
}

    public static Table buildSinkTable(String tableName, TableEnvironment tableEnv) {
        // The key/value formats and PRIMARY KEY ... NOT ENFORCED below are
        // upsert-kafka options, so the connector must be 'upsert-kafka'; writing
        // to an actual Iceberg table would use the 'iceberg' connector with
        // catalog options instead.
        String query = ""
                + "CREATE TABLE " + tableName + " (\n"
                + "  id BIGINT,\n"
                + "  weight DECIMAL(38, 10),\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") "
                + "WITH (\n"
                + "  'connector' = 'upsert-kafka',\n"
                + "  'topic' = 'aggregation.customers',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'properties.allow.auto.create.topics' = 'true',\n"
                + "  'value.format' = 'json',\n"
                + "  'key.format' = 'json'\n"
                + ")";
        tableEnv.executeSql(query);
        tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();

        return tableEnv.from(tableName);
    }

    public static Table buildSourceTable(String tableName, TableEnvironment tableEnv) {
        // Debezium CDC topic; messages embed the schema payload, hence
        // 'debezium-json.schema-include' = 'true'.
        String query = ""
                + "CREATE TABLE " + tableName + " (\n"
                + "  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n"
                + "  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"
                + "  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,\n"
                + "  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,\n"
                + "  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,\n"
                + "  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,\n"
                + "  id BIGINT,\n"
                + "  name STRING,\n"
                + "  description STRING,\n"
                + "  weight DECIMAL(38, 10)\n"
                + ") "
                + "WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'cdc-json.inventory.data.inventory.customers',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'properties.group.id' = 'testGroup',\n"
                + "  'properties.auto.offset.reset' = 'earliest',\n"
                + "  'scan.startup.mode' = 'earliest-offset',\n"
                + "  'format' = 'debezium-json',\n"
                + "  'debezium-json.schema-include' = 'true',\n"
                + "  'debezium-json.ignore-parse-errors' = 'false'\n"
                + ")";
        tableEnv.executeSql(query);
        tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print();

        return tableEnv.from(tableName);
    }

    public static StreamTableEnvironment buildTableEnvironment() {
        // TODO (Kun): Handle Parameters
        // - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(conf);

        // Checkpoint every 30s with exactly-once semantics, RocksDB state
        // backend, and local filesystem checkpoint/savepoint storage.
        env.getCheckpointConfig().setCheckpointInterval(30000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoint");
        env.setDefaultSavepointDirectory("file:///tmp/flink-savepoint");

        return StreamTableEnvironment.create(env);
    }
}
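One way to tackle the parameter-handling TODO above, as a minimal sketch: parse the CLI arguments with Flink's ParameterTool and substitute the value into the DDL strings. The --bootstrap-servers flag and the helper method are hypothetical, not part of this commit.

import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical helper: parse CLI args once and fall back to the
// docker-compose default instead of hard-coding 'localhost:9092'.
public static String resolveBootstrapServers(String[] args) {
    ParameterTool params = ParameterTool.fromArgs(args);
    return params.get("bootstrap-servers", "localhost:9092");
}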