`java/Iceberg/IcebergSQLSink/README.md` (new file, 109 additions)
## Iceberg Sink (Glue Data Catalog) using SQL

* Flink version: 1.20.0
* Flink API: SQL API
* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) sink

This example demonstrates how to use
[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Glue Data Catalog.

For simplicity, the application internally generates synthetic data: random stock prices.
Data is generated as POJO objects, simulating a real source such as a Kafka source. The records are then
converted to a table so they can be used in SQL operations, as shown in the sketch below.
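
As an illustration of this pattern, here is a minimal, self-contained sketch (not the project's actual code: the `StockPrice` POJO and the generator logic are assumptions) of a DataGen source producing POJOs that are then registered as a table and queried with SQL:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Instant;

public class DataGenToTableSketch {

    /** Simple POJO carrying one synthetic stock price record. */
    public static class StockPrice {
        public String timestamp;
        public String symbol;
        public float price;
        public int volumes;

        public StockPrice() {}

        public StockPrice(String timestamp, String symbol, float price, int volumes) {
            this.timestamp = timestamp;
            this.symbol = symbol;
            this.price = price;
            this.volumes = volumes;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String[] symbols = {"AAPL", "AMZN", "MSFT", "INTC"};

        // Generate random prices at a fixed rate (10 records/sec, matching the default)
        GeneratorFunction<Long, StockPrice> generator = index -> new StockPrice(
                Instant.now().toString(),
                symbols[(int) (index % symbols.length)],
                (float) (Math.random() * 10),
                (int) (Math.random() * 1_000_000));

        DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
                generator, Long.MAX_VALUE, RateLimiterStrategy.perSecond(10.0),
                TypeInformation.of(StockPrice.class));

        DataStream<StockPrice> prices =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "DataGen");

        // Register the POJO stream as a view, making it queryable with SQL
        tableEnv.createTemporaryView("prices", prices);

        tableEnv.executeSql("SELECT symbol, price FROM prices").print();
    }
}
```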

### Prerequisites

The application expects the following resources:
* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
The application creates the table, but the database must already exist.
* An S3 bucket where the Iceberg table data is written.

#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables.
See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read from and write to the S3 bucket, as sketched in the example policy below.
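
For illustration only, a hypothetical policy sketch covering these permissions might look as follows (the account ID, database, table, and bucket names are placeholders; refer to the AWS documentation linked above for the authoritative list of actions):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:UpdateDatabase",
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": [
        "arn:aws:glue:*:111122223333:catalog",
        "arn:aws:glue:*:111122223333:database/default",
        "arn:aws:glue:*:111122223333:table/default/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::mybucket",
        "arn:aws:s3:::mybucket/*"
      ]
    }
  ]
}
```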

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID | Key | Default | Description |
|-----------|--------------------------|-------------------|--------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. |
| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket and path URL prefix, starting with `s3://`. For example `s3://mybucket/iceberg`. |
| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. |
| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. |
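
When running locally, the same parameters are defined in the JSON file mentioned above. A sketch of its content, assuming the usual Managed Service for Apache Flink local-configuration format and illustrative values:

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  },
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "bucket.prefix": "s3://mybucket/iceberg",
      "catalog.db": "default",
      "catalog.table": "prices_iceberg"
    }
  }
]
```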

### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.

### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 30 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
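
A minimal sketch of that conditional setup (the `LocalStreamEnvironment` check is a common pattern; the actual project code may detect the local environment differently):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing programmatically only when running locally.
// On Managed Service for Apache Flink, checkpointing is configured externally.
if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(30_000L); // Iceberg commits data files on each checkpoint
}
```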

### Sample Data Schema

The application uses a predefined schema for the stock price data with the following fields:
* `timestamp`: STRING - ISO timestamp of the record
* `symbol`: STRING - Stock symbol (e.g., AAPL, AMZN)
* `price`: FLOAT - Stock price (0-10 range)
* `volumes`: INT - Trade volumes (0-1000000 range)
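
Expressed as a Flink SQL DDL that the application might execute (a sketch assuming a `StreamTableEnvironment` named `tableEnv`, the default database and table names, and an Iceberg catalog `glue_catalog` created as shown later in this README; the actual DDL in the code may differ):

```java
// `timestamp` is escaped because it is a reserved keyword in Flink SQL
tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS glue_catalog.`default`.prices_iceberg (" +
        "  `timestamp` STRING," +
        "  symbol STRING," +
        "  price FLOAT," +
        "  volumes INT" +
        ")");
```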

### Known limitations of the Flink Iceberg sink

The Flink Iceberg integration currently has the following limitations:
* It does not support Iceberg tables with hidden partitioning.
* It does not support schema evolution: adding, removing, renaming, or changing columns.

---

### Known Flink issue: Hadoop library clash

When integrating Flink with Iceberg, there is a common issue that affects most Flink setups.

When using Flink SQL's `CREATE CATALOG` statement, Hadoop libraries must be available on the system classpath.
However, standard Flink distributions use shaded dependencies that can create class loading conflicts with Hadoop's
expectations.
Flink's default classloading, when running in Application mode, prevents the application from using some Hadoop
classes even if they are included in the application uber-jar.
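
For reference, this is the kind of statement that triggers the Hadoop dependency. A sketch using the documented Iceberg catalog options for Glue and S3, assuming a `StreamTableEnvironment` named `tableEnv` and a placeholder warehouse path:

```java
// Creating an Iceberg catalog backed by the Glue Data Catalog and S3.
// Executing this statement requires Hadoop classes on the classpath.
tableEnv.executeSql(
        "CREATE CATALOG glue_catalog WITH (" +
        "  'type' = 'iceberg'," +
        "  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog'," +
        "  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'," +
        "  'warehouse' = 's3://mybucket/iceberg'" +
        ")");
```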

#### Solution

This example shows a simple workaround that prevents the Hadoop class clash:
1. Include a modified version of the Flink class `org.apache.flink.runtime.util.HadoopUtils`
2. Use the Maven Shade Plugin to relocate the conflicting classes

The modified [`org.apache.flink.runtime.util.HadoopUtils`](src/main/java/org/apache/flink/runtime/util/HadoopUtils.java)
class is included in the source code of this project. You can include it as-is in your project, using the same package name.
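
The essence of the modification is to return a plain Hadoop `Configuration` instead of resolving one from the cluster classpath. A simplified sketch of the idea (the class in this repository may contain additional logic):

```java
package org.apache.flink.runtime.util;

import org.apache.hadoop.conf.Configuration;

/**
 * Drop-in replacement for Flink's HadoopUtils: returns an empty Hadoop
 * Configuration instead of loading one from the cluster classpath,
 * avoiding the class loading clash described above.
 */
public class HadoopUtils {

    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {
        return new Configuration(false);
    }
}
```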

The shading is configured in the [`pom.xml`](pom.xml). In your project you can copy the `<relocations>...</relocations>` configuration
into the `maven-shade-plugin` configuration.

```xml
<relocations>
<relocation>
<pattern>org.apache.hadoop.conf</pattern>
<shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
<shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
</relocation>
</relocations>
```
`java/Iceberg/IcebergSQLSink/pom.xml` (new file, 193 additions)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>iceberg-sql-sink</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.major.version>1.20</flink.major.version>
<flink.version>1.20.0</flink.version>
<scala.version>2.12</scala.version>
<iceberg.version>1.9.1</iceberg.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
</properties>


<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.12.782</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.29</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.runtime.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- S3 File System Support -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Logging Dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>

<!-- Shade plugin to build the fat-jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.amazonaws.services.msf.IcebergSQLSinkJob</mainClass>
</transformer>
</transformers>
<!-- We relocate Hadoop-conf classes packaged with the application,
along with the modified HadoopUtils class -->
<relocations>
<relocation>
<pattern>org.apache.hadoop.conf</pattern>
<shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
<shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>