From 792e21ce0f2d09e43728d6cfe1691afba4b9a059 Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 14 Jul 2025 11:36:37 +0200 Subject: [PATCH 1/4] Updating DataStream to use same dependencies as SQL --- java/Iceberg/IcebergDataStreamSink/pom.xml | 111 +++++++----------- .../AvroGenericRecordToRowDataMapper.java | 31 +++++ .../msf/iceberg/IcebergSinkBuilder.java | 8 +- .../flink-application-properties-dev.json | 6 +- 4 files changed, 79 insertions(+), 77 deletions(-) create mode 100644 java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java diff --git a/java/Iceberg/IcebergDataStreamSink/pom.xml b/java/Iceberg/IcebergDataStreamSink/pom.xml index 215cff5..96f5ec6 100644 --- a/java/Iceberg/IcebergDataStreamSink/pom.xml +++ b/java/Iceberg/IcebergDataStreamSink/pom.xml @@ -14,16 +14,36 @@ 11 ${target.java.version} ${target.java.version} - + 1.20 1.20.0 1.11.3 - 3.4.0 - 1.8.1 + 1.9.1 1.2.0 2.23.1 5.8.1 + + + + com.amazonaws + aws-java-sdk-bom + + 1.12.782 + pom + import + + + software.amazon.awssdk + bom + 2.28.29 + pom + import + + + + + @@ -44,12 +64,7 @@ ${flink.version} provided - - org.apache.flink - flink-connector-files - ${flink.version} - provided - + org.apache.flink flink-table-runtime @@ -57,13 +72,6 @@ provided - - - org.apache.flink - flink-metrics-dropwizard - ${flink.version} - - com.amazonaws @@ -79,62 +87,26 @@ ${flink.version} - - - - org.apache.iceberg - iceberg-core - ${iceberg.version} - - - org.apache.iceberg - iceberg-flink - ${iceberg.version} - - - org.apache.iceberg - iceberg-flink-1.20 - ${iceberg.version} - - - org.apache.iceberg - iceberg-aws-bundle - ${iceberg.version} - - - org.apache.iceberg - iceberg-aws - ${iceberg.version} - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - - - org.apache.avro - avro - - - - org.slf4j - slf4j-reload4j - - - + + org.apache.iceberg + iceberg-flink-runtime-${flink.major.version} + ${iceberg.version} + + + org.apache.iceberg + 
iceberg-aws-bundle + ${iceberg.version} + + + + org.apache.flink + flink-s3-fs-hadoop + ${flink.version} + - - - org.junit.jupiter - junit-jupiter - ${junit5.version} - test - - - - + org.apache.logging.log4j log4j-slf4j-impl @@ -149,7 +121,6 @@ org.apache.logging.log4j log4j-core ${log4j.version} - runtime diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java new file mode 100644 index 0000000..d692448 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java @@ -0,0 +1,31 @@ +package com.amazonaws.services.msf.iceberg; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +public class AvroGenericRecordToRowDataMapper implements MapFunction { + private final AvroToRowDataConverters.AvroToRowDataConverter converter; + + AvroGenericRecordToRowDataMapper(RowType rowType) { + this.converter = AvroToRowDataConverters.createRowConverter(rowType); + } + + public RowData map(GenericRecord genericRecord) throws Exception { + return (RowData)this.converter.convert(genericRecord); + } + + public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) { + DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString()); + LogicalType logicalType = 
TypeConversions.fromDataToLogicalType(dataType); + RowType rowType = (RowType) logicalType; + return new AvroGenericRecordToRowDataMapper(rowType); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java index c851a76..b846e8c 100644 --- a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java +++ b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -8,7 +8,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.iceberg.BaseTable; -import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -19,9 +18,9 @@ import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; -import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.shaded.org.apache.avro.Schema; import java.util.Arrays; import java.util.HashMap; @@ -88,11 +87,12 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS); List equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+")); + Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString()); // Convert Avro Schema to Iceberg Schema, this will be used for creating the table - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + org.apache.iceberg.Schema icebergSchema = 
AvroSchemaUtil.toIceberg(shadedAvroSchema); // Avro Generic Record to Row Data Mapper - MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); + AvroGenericRecordToRowDataMapper avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); // Catalog properties for using Glue Data Catalog diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json index 30a6ac0..7000272 100644 --- a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json @@ -8,9 +8,9 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "s3:///iceberg", - "catalog.db": "default", - "catalog.table": "prices_iceberg", + "bucket.prefix": "s3://iceberg-performance-026090544291/iceberg", + "catalog.db": "iceberg", + "catalog.table": "prices_iceberg_datastream2", "partition.fields": "symbol", "operation": "append", "upsert.equality.fields": "symbol" From 34621e6bbf6565838cd513ff1f7cce3ba0c956dd Mon Sep 17 00:00:00 2001 From: FranMorilloAWS <85679820+FranMorilloAWS@users.noreply.github.com> Date: Mon, 14 Jul 2025 11:57:39 +0200 Subject: [PATCH 2/4] Update README.md --- java/Iceberg/IcebergDataStreamSink/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/Iceberg/IcebergDataStreamSink/README.md b/java/Iceberg/IcebergDataStreamSink/README.md index e1b0f63..4cd71c4 100644 --- a/java/Iceberg/IcebergDataStreamSink/README.md +++ b/java/Iceberg/IcebergDataStreamSink/README.md @@ -2,7 +2,7 @@ * Flink version: 1.20.0 * Flink API: DataStream API -* Iceberg 1.8.1 +* Iceberg 1.9.1 * Language: Java (11) * Flink connectors: 
[DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) From f934e997a010477700fe9788d85943f0797f7dfc Mon Sep 17 00:00:00 2001 From: FranMorilloAWS <85679820+FranMorilloAWS@users.noreply.github.com> Date: Mon, 14 Jul 2025 12:07:05 +0200 Subject: [PATCH 3/4] Update flink-application-properties-dev.json --- .../main/resources/flink-application-properties-dev.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json index 7000272..30de5f0 100644 --- a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json @@ -8,12 +8,12 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "s3://iceberg-performance-026090544291/iceberg", + "bucket.prefix": "s3:///iceberg", "catalog.db": "iceberg", - "catalog.table": "prices_iceberg_datastream2", + "catalog.table": "prices_iceberg_datastream", "partition.fields": "symbol", "operation": "append", "upsert.equality.fields": "symbol" } } -] \ No newline at end of file +] From f5b44906367c78334e62750d7eb1207cc61204e8 Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 14 Jul 2025 14:40:43 +0200 Subject: [PATCH 4/4] removing data gen test --- ...enericStockTradeGeneratorFunctionTest.java | 26 ------------------- 1 file changed, 26 deletions(-) delete mode 100644 java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java diff --git a/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java 
b/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java deleted file mode 100644 index 5593ca1..0000000 --- a/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.amazonaws.services.msf.datagen; - -import com.amazonaws.services.msf.avro.AvroSchemaUtils; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - -class AvroGenericStockTradeGeneratorFunctionTest { - - @Test - void generateRecord() throws Exception { - Schema avroSchema = AvroSchemaUtils.loadSchema(); - AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema); - - GenericRecord record = generatorFunction.map(42L); - - - assertInstanceOf(String.class, record.get("timestamp")); - assertInstanceOf(String.class, record.get("symbol")); - assertInstanceOf(Float.class, record.get("price")); - assertInstanceOf(Integer.class, record.get("volumes")); - } - -} \ No newline at end of file