From 70d26adfd77cdedffb042cedfe2a16dfe88e06ea Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Sun, 13 Jul 2025 19:05:47 +0200
Subject: [PATCH] New Iceberg SQL Sink example

---
 java/Iceberg/IcebergSQLSink/README.md | 109 ++++++++++
 java/Iceberg/IcebergSQLSink/pom.xml | 193 ++++++++++++++++++
 .../services/msf/IcebergSQLSinkJob.java | 163 +++++++++++++++
 .../services/msf/domain/StockPrice.java | 60 ++++++
 .../source/StockPriceGeneratorFunction.java | 27 +++
 .../flink/runtime/util/HadoopUtils.java | 120 +++++++++++
 .../flink-application-properties-dev.json | 16 ++
 .../src/main/resources/log4j2.properties | 13 ++
 java/Iceberg/README.md | 7 +-
 java/pom.xml | 1 +
 10 files changed, 706 insertions(+), 3 deletions(-)
 create mode 100644 java/Iceberg/IcebergSQLSink/README.md
 create mode 100644 java/Iceberg/IcebergSQLSink/pom.xml
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json
 create mode 100644 java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties

diff --git a/java/Iceberg/IcebergSQLSink/README.md b/java/Iceberg/IcebergSQLSink/README.md
new file mode 100644
index 0000000..1cfd8f3
--- /dev/null
+++ b/java/Iceberg/IcebergSQLSink/README.md
@@ -0,0 +1,109 @@
+## Iceberg Sink (Glue Data Catalog) using SQL
+
+* Flink version: 1.20.0
+* Flink API: SQL API
+* Iceberg 1.9.1
+* Language: Java (11)
+* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
+  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) sink
+
+This example demonstrates how to use
+[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Glue Data Catalog.
+
+For simplicity, the application internally generates synthetic data: random stock prices.
+Data is generated as POJO objects, simulating a real source, for example a Kafka source, whose records
+can then be converted to a table for SQL operations.
+
+### Prerequisites
+
+The application expects the following resources:
+* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
+  The application creates the table, but the catalog must already exist.
+* An S3 bucket to write the Iceberg table to.
+
+#### IAM Permissions
+
+The application must have IAM permissions to:
+* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
+  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
+* Read from and write to the S3 bucket.
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.
+
+When running locally, the configuration is read from the
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
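+
+For reference, when running locally you can adapt the included
+[flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json); a minimal
+configuration is sketched below. The bucket name is a placeholder you must replace with your own bucket and path;
+the other values shown are the defaults described in the table below.
+
+```json
+[
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "records.per.sec": 10.0
+    }
+  },
+  {
+    "PropertyGroupId": "Iceberg",
+    "PropertyMap": {
+      "bucket.prefix": "s3://<your-bucket>/iceberg",
+      "catalog.db": "default",
+      "catalog.table": "prices_iceberg"
+    }
+  }
+]
+```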
+
+Runtime parameters:
+
+| Group ID  | Key               | Default          | Description                                                                                |
+|-----------|-------------------|------------------|--------------------------------------------------------------------------------------------|
+| `DataGen` | `records.per.sec` | `10.0`           | Records per second generated.                                                              |
+| `Iceberg` | `bucket.prefix`   | (mandatory)      | S3 bucket and path URL prefix, starting with `s3://`. For example `s3://mybucket/iceberg`. |
+| `Iceberg` | `catalog.db`      | `default`        | Name of the Glue Data Catalog database.                                                    |
+| `Iceberg` | `catalog.table`   | `prices_iceberg` | Name of the Glue Data Catalog table.                                                       |
+
+### Running locally, in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
+
+### Checkpoints
+
+Checkpointing must be enabled. Iceberg commits writes on checkpoint.
+
+When running locally, the application enables checkpoints programmatically, every 30 seconds.
+When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+
+### Sample Data Schema
+
+The application uses a predefined schema for the stock price data, with the following fields:
+* `timestamp`: STRING - ISO timestamp of the record
+* `symbol`: STRING - Stock symbol (e.g., AAPL, AMZN)
+* `price`: FLOAT - Stock price (0-10 range)
+* `volumes`: INT - Trade volumes (0-1000000 range)
+
+### Known limitations of the Flink Iceberg sink
+
+There are currently some limitations in the Flink Iceberg integration:
+* It does not support Iceberg tables with hidden partitioning.
+* It does not support adding, removing, renaming, or changing columns.
+
+---
+
+### Known Flink issue: Hadoop library clash
+
+When integrating Flink with Iceberg, there is a common issue that affects most Flink setups.
+
+When using Flink SQL's `CREATE CATALOG` statements, Hadoop libraries must be available on the system classpath.
+However, standard Flink distributions use shaded dependencies that can create class loading conflicts with Hadoop's
+expectations.
+Flink's default classloading, when running in Application mode, prevents using some Hadoop classes even if they are
+included in the application uber-jar.
+
+#### Solution
+
+This example shows a simple workaround to prevent the Hadoop class clash:
+1. Include a modified version of the Flink class `org.apache.flink.runtime.util.HadoopUtils`
+2. Use the Maven Shade Plugin to prevent class conflicts
+
+The modified [`org.apache.flink.runtime.util.HadoopUtils`](src/main/java/org/apache/flink/runtime/util/HadoopUtils.java)
+class is included in the source code of this project. You can include it as-is in your project, using the same package name.
+
+The shading is configured in the [`pom.xml`](pom.xml). In your project, you can copy the `<relocations>...</relocations>` configuration
+into the `maven-shade-plugin` configuration.
+ +```xml + + + org.apache.hadoop.conf + shaded.org.apache.hadoop.conf + + + org.apache.flink.runtime.util.HadoopUtils + shadow.org.apache.flink.runtime.util.HadoopUtils + + +``` \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLSink/pom.xml b/java/Iceberg/IcebergSQLSink/pom.xml new file mode 100644 index 0000000..8579036 --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/pom.xml @@ -0,0 +1,193 @@ + + + 4.0.0 + + com.amazonaws + iceberg-sql-sink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + 1.20 + 1.20.0 + 2.12 + 1.9.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + + com.amazonaws + aws-java-sdk-bom + + 1.12.782 + pom + import + + + software.amazon.awssdk + bom + 2.28.29 + pom + import + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-connector-datagen + ${flink.version} + provided + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + org.apache.iceberg + iceberg-flink-runtime-${flink.major.version} + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + + + org.apache.flink + flink-s3-fs-hadoop + ${flink.version} + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.0 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.IcebergSQLSinkJob + + + + + + org.apache.hadoop.conf + shaded.org.apache.hadoop.conf + + + org.apache.flink.runtime.util.HadoopUtils + shadow.org.apache.flink.runtime.util.HadoopUtils + + + + + + + + + \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java new file mode 100644 index 0000000..0c2962f --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/IcebergSQLSinkJob.java @@ -0,0 +1,163 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.domain.StockPrice; +import com.amazonaws.services.msf.source.StockPriceGeneratorFunction; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class IcebergSQLSinkJob { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSQLSinkJob.class); + + // Constants + private static final String CATALOG_NAME = "glue"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + public static final String DEFAULT_DATABASE = "default"; + public static final String DEFAULT_TABLE = "prices_iceberg"; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + private static void validateURI(String uri) { + String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; + Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), + "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + } + + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(IcebergSQLSinkJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + private static String createCatalogStatement(String s3BucketPrefix) { + return "CREATE CATALOG " + CATALOG_NAME + " WITH (" + + "'type' = 'iceberg', " + + "'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog'," + + "'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'," + + "'warehouse' = '" + s3BucketPrefix + "')"; + } + + private static String createTableStatement(String sinkTableName) { + return "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" + + "`timestamp` STRING, " + + "symbol STRING," + + "price FLOAT," + + "volumes INT" + + ") PARTITIONED BY (symbol) "; + } + + private static IcebergConfig setupIcebergProperties(Properties icebergProperties) { + String s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); + String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_DATABASE); + String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_TABLE); + + Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket prefix for the warehouse."); + Preconditions.checkNotNull(glueDatabase, "You must supply a database name"); + Preconditions.checkNotNull(glueTable, "You must supply a table name"); + + // Validate S3 URI format + validateURI(s3BucketPrefix); + + LOG.info("Iceberg configuration: bucket={}, database={}, table={}", + 
s3BucketPrefix, glueDatabase, glueTable); + + return new IcebergConfig(s3BucketPrefix, glueDatabase, glueTable); + } + + private static class IcebergConfig { + final String s3BucketPrefix; + final String glueDatabase; + final String glueTable; + + IcebergConfig(String s3BucketPrefix, String glueDatabase, String glueTable) { + this.s3BucketPrefix = s3BucketPrefix; + this.glueDatabase = glueDatabase; + this.glueTable = glueTable; + } + } + + public static void main(String[] args) throws Exception { + // 1. Initialize environments + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. If running local, we need to enable Checkpoints. Iceberg commits data with every checkpoint + if (isLocal(env)) { + // For development, we are checkpointing every 30 second to have data commited faster. + env.enableCheckpointing(30000); + } + + // 3. Parse and validate the configuration for the Iceberg sink + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + IcebergConfig config = setupIcebergProperties(icebergProperties); + + // 4. Create data generator source, using DataStream API + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to a Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + String sinkTableName = CATALOG_NAME + "." + config.glueDatabase + "." + config.glueTable; + + // Create catalog and configure it + tableEnv.executeSql(createCatalogStatement(config.s3BucketPrefix)); + tableEnv.executeSql("USE CATALOG " + CATALOG_NAME); + tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + config.glueDatabase); + tableEnv.executeSql("USE " + config.glueDatabase); + + // Create table + String createTableStatement = createTableStatement(sinkTableName); + LOG.info("Creating table with statement: {}", createTableStatement); + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations - Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + " " + + "SELECT `timestamp`, symbol, price, volumes " + + "FROM default_catalog.default_database.stockPriceTable"; + TableResult insertResult = tableEnv.executeSql(insertQuery); + + // Keep the job running to continuously insert data + LOG.info("Application started successfully. 
Inserting data into Iceberg table: {}", sinkTableName); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java new file mode 100644 index 0000000..082693a --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java @@ -0,0 +1,60 @@ +package com.amazonaws.services.msf.domain; + +public class StockPrice { + private String timestamp; + private String symbol; + private Float price; + private Integer volumes; + + public StockPrice() { + } + + public StockPrice(String timestamp, String symbol, Float price, Integer volumes) { + this.timestamp = timestamp; + this.symbol = symbol; + this.price = price; + this.volumes = volumes; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public Float getPrice() { + return price; + } + + public void setPrice(Float price) { + this.price = price; + } + + public Integer getVolumes() { + return volumes; + } + + public void setVolumes(Integer volumes) { + this.volumes = volumes; + } + + @Override + public String toString() { + return "com.amazonaws.services.msf.pojo.StockPrice{" + + "timestamp='" + timestamp + '\'' + + ", symbol='" + symbol + '\'' + + ", price=" + price + + ", volumes=" + volumes + + '}'; + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..650fed6 --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java @@ -0,0 +1,27 @@ +package com.amazonaws.services.msf.source; + +import com.amazonaws.services.msf.domain.StockPrice; +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as com.amazonaws.services.msf.pojo.StockPrice POJOs. + * + * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction + * from the IcebergDataStreamSink example. 
+ */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + @Override + public StockPrice map(Long sequence) throws Exception { + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + return new StockPrice(timestamp, symbol, price, volumes); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java new file mode 100644 index 0000000..bc8cd8c --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java @@ -0,0 +1,120 @@ +package org.apache.flink.runtime.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + + +/** + * This class is a copy of org.apache.flink.runtime.util.HadoopUtils with the getHadoopConfiguration() method replaced to + * return an org.apache.hadoop.conf.Configuration instead of org.apache.hadoop.hdfs.HdfsConfiguration. + * + * This class is then shaded, along with org.apache.hadoop.conf.*, to avoid conflicts with the same classes provided by + * org.apache.flink:flink-s3-fs-hadoop, which is normally installed as plugin in Flink when S3. + * + * Other methods are copied from the original class. + */ +public class HadoopUtils { + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + /** + * This method has been re-implemented to always return a org.apache.hadoop.conf.Configuration + */ + public static Configuration getHadoopConfiguration( + org.apache.flink.configuration.Configuration flinkConfiguration) { + return new Configuration(false); + } + + public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi) { + return UserGroupInformation.isSecurityEnabled() + && ugi.getAuthenticationMethod() + == UserGroupInformation.AuthenticationMethod.KERBEROS; + } + + + public static boolean areKerberosCredentialsValid( + UserGroupInformation ugi, boolean useTicketCache) { + Preconditions.checkState(isKerberosSecurityEnabled(ugi)); + + // note: UGI::hasKerberosCredentials inaccurately reports false + // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786), + // so we check only in ticket cache scenario. + if (useTicketCache && !ugi.hasKerberosCredentials()) { + if (hasHDFSDelegationToken(ugi)) { + LOG.warn( + "Hadoop security is enabled but current login user does not have Kerberos credentials, " + + "use delegation token instead. 
Flink application will terminate after token expires."); + return true; + } else { + LOG.error( + "Hadoop security is enabled, but current login user has neither Kerberos credentials " + + "nor delegation tokens!"); + return false; + } + } + + return true; + } + + /** + * Indicates whether the user has an HDFS delegation token. + */ + public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) { + Collection> usrTok = ugi.getTokens(); + for (Token token : usrTok) { + if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) { + return true; + } + } + return false; + } + + /** + * Checks if the Hadoop dependency is at least the given version. + */ + public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException { + final Tuple2 hadoopVersion = getMajorMinorBundledHadoopVersion(); + int maj = hadoopVersion.f0; + int min = hadoopVersion.f1; + + return maj > major || (maj == major && min >= minor); + } + + /** + * Checks if the Hadoop dependency is at most the given version. + */ + public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException { + final Tuple2 hadoopVersion = getMajorMinorBundledHadoopVersion(); + int maj = hadoopVersion.f0; + int min = hadoopVersion.f1; + + return maj < major || (maj == major && min < minor); + } + + private static Tuple2 getMajorMinorBundledHadoopVersion() { + String versionString = VersionInfo.getVersion(); + String[] versionParts = versionString.split("\\."); + + if (versionParts.length < 2) { + throw new FlinkRuntimeException( + "Cannot determine version of Hadoop, unexpected version string: " + + versionString); + } + + int maj = Integer.parseInt(versionParts[0]); + int min = Integer.parseInt(versionParts[1]); + return Tuple2.of(maj, min); + } +} diff --git a/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..e24669e --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,16 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 10.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "bucket.prefix": "s3:///iceberg", + "catalog.db": "iceberg", + "catalog.table": "sqlsink_prices" + } + } +] diff --git a/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties b/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..9ba6bb3 --- /dev/null +++ b/java/Iceberg/IcebergSQLSink/src/main/resources/log4j2.properties @@ -0,0 +1,13 @@ +# Log4j2 configuration +status = warn +name = PropertiesConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = ConsoleAppender +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Root logger configuration +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender diff --git a/java/Iceberg/README.md b/java/Iceberg/README.md index 640a382..b84ccfe 100644 --- a/java/Iceberg/README.md +++ b/java/Iceberg/README.md @@ -5,8 +5,9 @@ Examples demonstrating how to work with Apache Iceberg tables in Amazon Managed ## Table of Contents ### Iceberg Sinks -- [**Iceberg DataStream Sink**](./IcebergDataStreamSink) - Writing data to Iceberg tables using AWS Glue Data Catalog -- [**S3 Table Sink**](./S3TableSink) - Writing data to Iceberg tables 
stored directly in S3
+- [**Iceberg DataStream Sink**](./IcebergDataStreamSink) - Writing data to Iceberg tables on S3 with DataStream API, using AWS Glue Data Catalog
+- [**S3 Table Sink**](./S3TableSink) - Writing data to S3 Tables with DataStream API
+- [**Iceberg SQL Sink**](./IcebergSQLSink) - Writing data to Iceberg tables on S3 with SQL, using AWS Glue Data Catalog
 
 ### Iceberg Sources
-- [**Iceberg DataStream Source**](./IcebergDataStreamSource) - Reading data from Iceberg tables using AWS Glue Data Catalog
+- [**Iceberg DataStream Source**](./IcebergDataStreamSource) - Reading data from Iceberg tables with DataStream API, using AWS Glue Data Catalog
diff --git a/java/pom.xml b/java/pom.xml
index 344a44d..0cba628 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -24,6 +24,7 @@
         <module>Iceberg/IcebergDataStreamSink</module>
         <module>Iceberg/IcebergDataStreamSource</module>
         <module>Iceberg/S3TableSink</module>
+        <module>Iceberg/IcebergSQLSink</module>
         <module>KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders</module>
         <module>KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders</module>
         <module>KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders</module>