diff --git a/java/Iceberg/IcebergSQLSink/README.md b/java/Iceberg/IcebergSQLSink/README.md
new file mode 100644
index 0000000..1cfd8f3
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/README.md
@@ -0,0 +1,109 @@
+## Iceberg Sink (Glue Data Catalog) using SQL
+
+* Flink version: 1.20.0
+* Flink API: SQL API
+* Iceberg version: 1.9.1
+* Language: Java (11)
+* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
+ and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) sink
+
+This example demonstrates how to use
+[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Glue Data Catalog.
+
+For simplicity, the application generates synthetic data (random stock prices) internally.
+Data is generated as POJO objects, simulating a real source, for example a Kafka source, whose records
+can be converted into a table for SQL operations. A sketch of the source wiring is shown below.
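+
+The following sketch mirrors how the job wires the synthetic source, using the `DataGeneratorSource`
+and the `StockPriceGeneratorFunction` included in this project (variable names are illustrative; the rate shown is the default):
+
+```java
+// Build a DataGen source emitting StockPrice POJOs at a fixed rate
+DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
+        new StockPriceGeneratorFunction(),     // generates one random StockPrice per invocation
+        Long.MAX_VALUE,                        // unbounded number of records
+        RateLimiterStrategy.perSecond(10.0),   // records per second (DataGen `records.per.sec`)
+        TypeInformation.of(StockPrice.class));
+
+DataStream<StockPrice> stockPrices = env.fromSource(
+        source, WatermarkStrategy.noWatermarks(), "DataGen");
+```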
+
+### Prerequisites
+
+The application expects the following resources:
+* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
+ The application creates the table, but the database must already exist.
+* An S3 bucket to write the Iceberg table.
+
+#### IAM Permissions
+
+The application must have IAM permissions to:
+* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables.
+ See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
+* Read and Write from the S3 bucket.
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from the application's Runtime properties.
+
+When running locally, the configuration is read from the
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+
+Runtime parameters:
+
+| Group ID | Key | Default | Description |
+|-----------|--------------------------|-------------------|--------------------------------------------------------------------------------------------|
+| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. |
+| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket and path URL prefix, starting with `s3://`. For example `s3://mybucket/iceberg`. |
+| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. |
+| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. |
+
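+The `Iceberg` parameters above are used to register the Glue Data Catalog as an Iceberg catalog.
+The following sketch shows the statement the job issues, with `bucket.prefix` used as the warehouse location
+(`tableEnv` and `s3BucketPrefix` as in the job code):
+
+```java
+// Register the Glue Data Catalog as an Iceberg catalog named "glue"
+tableEnv.executeSql(
+        "CREATE CATALOG glue WITH (" +
+        "'type' = 'iceberg', " +
+        "'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog'," +
+        "'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'," +
+        "'warehouse' = '" + s3BucketPrefix + "')");  // value of the `bucket.prefix` runtime property
+tableEnv.executeSql("USE CATALOG glue");
+```
+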
+### Running locally, in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
+
+### Checkpoints
+
+Checkpointing must be enabled. Iceberg commits writes on checkpoint.
+
+When running locally, the application enables checkpoints programmatically, every 30 seconds.
+When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
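+
+A minimal sketch of what the job does for local runs (matching the code in this project):
+
+```java
+// Enable checkpointing only when running locally; on Managed Service for Apache Flink
+// checkpointing is controlled by the application configuration
+if (env instanceof LocalStreamEnvironment) {
+    env.enableCheckpointing(30000); // 30 seconds, so data is committed to Iceberg faster during development
+}
+```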
+
+### Sample Data Schema
+
+The application uses a predefined schema for the stock price data, with the following fields (see the DDL sketch below):
+* `timestamp`: STRING - ISO timestamp of the record
+* `symbol`: STRING - Stock symbol (e.g., AAPL, AMZN)
+* `price`: FLOAT - Stock price (0-10 range)
+* `volumes`: INT - Trade volumes (0-1000000 range)
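+
+The fields above correspond to the DDL the job executes when creating the table (sketch; `sinkTableName` is built
+from the configured catalog, database, and table names):
+
+```java
+// Create the Iceberg sink table if it does not exist, partitioned by symbol
+tableEnv.executeSql(
+        "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" +
+        "`timestamp` STRING, " +
+        "symbol STRING," +
+        "price FLOAT," +
+        "volumes INT" +
+        ") PARTITIONED BY (symbol)");
+```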
+
+### Known limitations of the Flink Iceberg sink
+
+The Flink Iceberg integration currently has the following limitations:
+* Iceberg tables with hidden partitioning are not supported
+* Adding, removing, renaming, or changing columns is not supported
+
+---
+
+### Known Flink issue: Hadoop library clash
+
+When integrating Flink with Iceberg, there is a common issue affecting most Flink setups.
+
+When using Flink SQL's `CREATE CATALOG` statements, Hadoop libraries must be available on the system classpath.
+However, standard Flink distributions use shaded dependencies that can create class loading conflicts with Hadoop's
+expectations.
+Flink's default classloading, when running in Application mode, prevents the application from using some Hadoop classes,
+even if they are included in the application uber-jar.
+
+#### Solution
+
+This example shows a simple workaround to prevent the Hadoop class clash:
+1. Include a modified version of the Flink class `org.apache.flink.runtime.util.HadoopUtils`
+2. Use the Maven Shade Plugin to relocate the conflicting classes
+
+The modified [`org.apache.flink.runtime.util.HadoopUtils`](src/main/java/org/apache/flink/runtime/util/HadoopUtils.java)
+class is included in the source code of this project. You can include it as-is in your project, using the same package name.
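+
+The only functional change is in `getHadoopConfiguration()`, which returns a plain Hadoop `Configuration`
+instead of an `HdfsConfiguration` (excerpt from the class included in this project):
+
+```java
+// Re-implemented to always return an org.apache.hadoop.conf.Configuration,
+// so no HDFS-specific classes are required on the classpath
+public static Configuration getHadoopConfiguration(
+        org.apache.flink.configuration.Configuration flinkConfiguration) {
+    return new Configuration(false);
+}
+```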
+
+The shading is configured in the [`pom.xml`](pom.xml). In your project, you can copy the `<relocations>` configuration
+into the `maven-shade-plugin` configuration.
+
+```xml
+<relocations>
+    <relocation>
+        <pattern>org.apache.hadoop.conf</pattern>
+        <shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
+    </relocation>
+    <relocation>
+        <pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
+        <shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
+    </relocation>
+</relocations>
+```
\ No newline at end of file
diff --git a/java/Iceberg/IcebergSQLSink/pom.xml b/java/Iceberg/IcebergSQLSink/pom.xml
new file mode 100644
index 0000000..8579036
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/pom.xml
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>iceberg-sql-sink</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <flink.major.version>1.20</flink.major.version>
+        <flink.version>1.20.0</flink.version>
+        <scala.version>2.12</scala.version>
+        <iceberg.version>1.9.1</iceberg.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <log4j.version>2.23.1</log4j.version>
+        <junit5.version>5.8.1</junit5.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>1.12.782</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.28.29</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-datagen</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws-bundle</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-s3-fs-hadoop</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.6.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.IcebergSQLSinkJob</mainClass>
+                                </transformer>
+                            </transformers>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.hadoop.conf</pattern>
+                                    <shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
+                                    <shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java
new file mode 100644
index 0000000..0c2962f
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java
@@ -0,0 +1,163 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.domain.StockPrice;
+import com.amazonaws.services.msf.source.StockPriceGeneratorFunction;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class IcebergSQLSinkJob {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergSQLSinkJob.class);
+
+ // Constants
+ private static final String CATALOG_NAME = "glue";
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+ public static final String DEFAULT_DATABASE = "default";
+ public static final String DEFAULT_TABLE = "prices_iceberg";
+
+ private static boolean isLocal(StreamExecutionEnvironment env) {
+ return env instanceof LocalStreamEnvironment;
+ }
+
+ private static void validateURI(String uri) {
+ String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$";
+ Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern),
+ "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri);
+ }
+
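+ /**
+ * Loads the application configuration from the local JSON resource when running in the IDE,
+ * or from the Managed Service for Apache Flink runtime otherwise.
+ */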
+ private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+ if (isLocal(env)) {
+ LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ Objects.requireNonNull(IcebergSQLSinkJob.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+ } else {
+ LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+ private static DataGeneratorSource<StockPrice> createDataGenerator(Properties dataGeneratorProperties) {
+ double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0"));
+ Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0");
+
+ LOG.info("Data generator: {} record/sec", recordsPerSecond);
+ return new DataGeneratorSource<>(new StockPriceGeneratorFunction(),
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(recordsPerSecond),
+ TypeInformation.of(StockPrice.class));
+ }
+
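+ /**
+ * Builds the CREATE CATALOG statement registering the Glue Data Catalog as an Iceberg catalog,
+ * using the provided S3 prefix as warehouse location.
+ */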
+ private static String createCatalogStatement(String s3BucketPrefix) {
+ return "CREATE CATALOG " + CATALOG_NAME + " WITH (" +
+ "'type' = 'iceberg', " +
+ "'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog'," +
+ "'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'," +
+ "'warehouse' = '" + s3BucketPrefix + "')";
+ }
+
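+ /**
+ * Builds the CREATE TABLE statement for the Iceberg sink table, partitioned by symbol.
+ */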
+ private static String createTableStatement(String sinkTableName) {
+ return "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" +
+ "`timestamp` STRING, " +
+ "symbol STRING," +
+ "price FLOAT," +
+ "volumes INT" +
+ ") PARTITIONED BY (symbol) ";
+ }
+
+ private static IcebergConfig setupIcebergProperties(Properties icebergProperties) {
+ String s3BucketPrefix = icebergProperties.getProperty("bucket.prefix");
+ String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_DATABASE);
+ String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_TABLE);
+
+ Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket prefix for the warehouse.");
+ Preconditions.checkNotNull(glueDatabase, "You must supply a database name");
+ Preconditions.checkNotNull(glueTable, "You must supply a table name");
+
+ // Validate S3 URI format
+ validateURI(s3BucketPrefix);
+
+ LOG.info("Iceberg configuration: bucket={}, database={}, table={}",
+ s3BucketPrefix, glueDatabase, glueTable);
+
+ return new IcebergConfig(s3BucketPrefix, glueDatabase, glueTable);
+ }
+
+ private static class IcebergConfig {
+ final String s3BucketPrefix;
+ final String glueDatabase;
+ final String glueTable;
+
+ IcebergConfig(String s3BucketPrefix, String glueDatabase, String glueTable) {
+ this.s3BucketPrefix = s3BucketPrefix;
+ this.glueDatabase = glueDatabase;
+ this.glueTable = glueTable;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ // 1. Initialize environments
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ // 2. If running locally, we need to enable checkpointing. Iceberg commits data on every checkpoint
+ if (isLocal(env)) {
+ // For development, we checkpoint every 30 seconds to have data committed faster.
+ env.enableCheckpointing(30000);
+ }
+
+ // 3. Parse and validate the configuration for the Iceberg sink
+ Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+ Properties icebergProperties = applicationProperties.get("Iceberg");
+ IcebergConfig config = setupIcebergProperties(icebergProperties);
+
+ // 4. Create data generator source, using DataStream API
+ Properties dataGenProperties = applicationProperties.get("DataGen");
+ DataStream<StockPrice> stockPriceDataStream = env.fromSource(
+ createDataGenerator(dataGenProperties),
+ WatermarkStrategy.noWatermarks(),
+ "DataGen");
+
+ // 5. Convert DataStream to a Table and create view
+ Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream);
+ tableEnv.createTemporaryView("stockPriceTable", stockPriceTable);
+
+ String sinkTableName = CATALOG_NAME + "." + config.glueDatabase + "." + config.glueTable;
+
+ // 6. Create the Iceberg catalog and use the target database
+ tableEnv.executeSql(createCatalogStatement(config.s3BucketPrefix));
+ tableEnv.executeSql("USE CATALOG " + CATALOG_NAME);
+ tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + config.glueDatabase);
+ tableEnv.executeSql("USE " + config.glueDatabase);
+
+ // Create table
+ String createTableStatement = createTableStatement(sinkTableName);
+ LOG.info("Creating table with statement: {}", createTableStatement);
+ tableEnv.executeSql(createTableStatement);
+
+ // 7. Execute SQL operations - Insert data from stock price stream
+ String insertQuery = "INSERT INTO " + sinkTableName + " " +
+ "SELECT `timestamp`, symbol, price, volumes " +
+ "FROM default_catalog.default_database.stockPriceTable";
+ TableResult insertResult = tableEnv.executeSql(insertQuery);
+
+ // Keep the job running to continuously insert data
+ LOG.info("Application started successfully. Inserting data into Iceberg table: {}", sinkTableName);
+ }
+}
\ No newline at end of file
diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
new file mode 100644
index 0000000..082693a
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
@@ -0,0 +1,60 @@
+package com.amazonaws.services.msf.domain;
+
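+/**
+ * POJO representing a synthetic stock price record generated by the application.
+ */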
+public class StockPrice {
+ private String timestamp;
+ private String symbol;
+ private Float price;
+ private Integer volumes;
+
+ public StockPrice() {
+ }
+
+ public StockPrice(String timestamp, String symbol, Float price, Integer volumes) {
+ this.timestamp = timestamp;
+ this.symbol = symbol;
+ this.price = price;
+ this.volumes = volumes;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getSymbol() {
+ return symbol;
+ }
+
+ public void setSymbol(String symbol) {
+ this.symbol = symbol;
+ }
+
+ public Float getPrice() {
+ return price;
+ }
+
+ public void setPrice(Float price) {
+ this.price = price;
+ }
+
+ public Integer getVolumes() {
+ return volumes;
+ }
+
+ public void setVolumes(Integer volumes) {
+ this.volumes = volumes;
+ }
+
+ @Override
+ public String toString() {
+ return "StockPrice{" +
+ "timestamp='" + timestamp + '\'' +
+ ", symbol='" + symbol + '\'' +
+ ", price=" + price +
+ ", volumes=" + volumes +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java
new file mode 100644
index 0000000..650fed6
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java
@@ -0,0 +1,27 @@
+package com.amazonaws.services.msf.source;
+
+import com.amazonaws.services.msf.domain.StockPrice;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import java.time.Instant;
+
+/**
+ * Function used by the DataGen source to generate random records as {@link StockPrice} POJOs.
+ * <p>
+ * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction
+ * from the IcebergDataStreamSink example.
+ */
+public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> {
+
+ private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"};
+
+ @Override
+ public StockPrice map(Long sequence) throws Exception {
+ String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)];
+ float price = RandomUtils.nextFloat(0, 10);
+ int volumes = RandomUtils.nextInt(0, 1000000);
+ String timestamp = Instant.now().toString();
+
+ return new StockPrice(timestamp, symbol, price, volumes);
+ }
+}
\ No newline at end of file
diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
new file mode 100644
index 0000000..bc8cd8c
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -0,0 +1,120 @@
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+
+/**
+ * This class is a copy of org.apache.flink.runtime.util.HadoopUtils with the getHadoopConfiguration() method replaced to
+ * return an org.apache.hadoop.conf.Configuration instead of org.apache.hadoop.hdfs.HdfsConfiguration.
+ *
+ * This class is then shaded, along with org.apache.hadoop.conf.*, to avoid conflicts with the same classes provided by
+ * org.apache.flink:flink-s3-fs-hadoop, which is normally installed as a plugin in Flink when using S3.
+ *
+ * Other methods are copied from the original class.
+ */
+public class HadoopUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+ static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+ /**
+ * This method has been re-implemented to always return an org.apache.hadoop.conf.Configuration
+ */
+ public static Configuration getHadoopConfiguration(
+ org.apache.flink.configuration.Configuration flinkConfiguration) {
+ return new Configuration(false);
+ }
+
+ public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi) {
+ return UserGroupInformation.isSecurityEnabled()
+ && ugi.getAuthenticationMethod()
+ == UserGroupInformation.AuthenticationMethod.KERBEROS;
+ }
+
+
+ public static boolean areKerberosCredentialsValid(
+ UserGroupInformation ugi, boolean useTicketCache) {
+ Preconditions.checkState(isKerberosSecurityEnabled(ugi));
+
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ if (useTicketCache && !ugi.hasKerberosCredentials()) {
+ if (hasHDFSDelegationToken(ugi)) {
+ LOG.warn(
+ "Hadoop security is enabled but current login user does not have Kerberos credentials, "
+ + "use delegation token instead. Flink application will terminate after token expires.");
+ return true;
+ } else {
+ LOG.error(
+ "Hadoop security is enabled, but current login user has neither Kerberos credentials "
+ + "nor delegation tokens!");
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Indicates whether the user has an HDFS delegation token.
+ */
+ public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) {
+ Collection<Token<? extends TokenIdentifier>> usrTok = ugi.getTokens();
+ for (Token<? extends TokenIdentifier> token : usrTok) {
+ if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Checks if the Hadoop dependency is at least the given version.
+ */
+ public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+ final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+ int maj = hadoopVersion.f0;
+ int min = hadoopVersion.f1;
+
+ return maj > major || (maj == major && min >= minor);
+ }
+
+ /**
+ * Checks if the Hadoop dependency is at most the given version.
+ */
+ public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+ final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+ int maj = hadoopVersion.f0;
+ int min = hadoopVersion.f1;
+
+ return maj < major || (maj == major && min < minor);
+ }
+
+ private static Tuple2<Integer, Integer> getMajorMinorBundledHadoopVersion() {
+ String versionString = VersionInfo.getVersion();
+ String[] versionParts = versionString.split("\\.");
+
+ if (versionParts.length < 2) {
+ throw new FlinkRuntimeException(
+ "Cannot determine version of Hadoop, unexpected version string: "
+ + versionString);
+ }
+
+ int maj = Integer.parseInt(versionParts[0]);
+ int min = Integer.parseInt(versionParts[1]);
+ return Tuple2.of(maj, min);
+ }
+}
diff --git a/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 0000000..e24669e
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,16 @@
+[
+ {
+ "PropertyGroupId": "DataGen",
+ "PropertyMap": {
+ "records.per.sec": 10.0
+ }
+ },
+ {
+ "PropertyGroupId": "Iceberg",
+ "PropertyMap": {
+ "bucket.prefix": "s3:///iceberg",
+ "catalog.db": "iceberg",
+ "catalog.table": "sqlsink_prices"
+ }
+ }
+]
diff --git a/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties b/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..9ba6bb3
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties
@@ -0,0 +1,13 @@
+# Log4j2 configuration
+status = warn
+name = PropertiesConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = ConsoleAppender
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Root logger configuration
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
diff --git a/java/Iceberg/README.md b/java/Iceberg/README.md
index 640a382..b84ccfe 100644
--- a/java/Iceberg/README.md
+++ b/java/Iceberg/README.md
@@ -5,8 +5,9 @@ Examples demonstrating how to work with Apache Iceberg tables in Amazon Managed
## Table of Contents
### Iceberg Sinks
-- [**Iceberg DataStream Sink**](./IcebergDataStreamSink) - Writing data to Iceberg tables using AWS Glue Data Catalog
-- [**S3 Table Sink**](./S3TableSink) - Writing data to Iceberg tables stored directly in S3
+- [**Iceberg DataStream Sink**](./IcebergDataStreamSink) - Writing data to Iceberg tables on S3 with DataStream API, using AWS Glue Data Catalog
+- [**S3 Table Sink**](./S3TableSink) - Writing data to S3 Tables with DataStream API
+- [**Iceberg SQL Sink**](./IcebergSQLSink) - Writing data to Iceberg tables on S3 with the SQL API, using AWS Glue Data Catalog
### Iceberg Sources
-- [**Iceberg DataStream Source**](./IcebergDataStreamSource) - Reading data from Iceberg tables using AWS Glue Data Catalog
+- [**Iceberg DataStream Source**](./IcebergDataStreamSource) - Reading data from Iceberg tables with DataStream API, using AWS Glue Data Catalog
diff --git a/java/pom.xml b/java/pom.xml
index 344a44d..0cba628 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -24,6 +24,7 @@
Iceberg/IcebergDataStreamSink
Iceberg/IcebergDataStreamSource
Iceberg/S3TableSink
+ Iceberg/IcebergSQLSink
KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders
KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders
KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders