diff --git a/java/Iceberg/IcebergDataStreamSink/README.md b/java/Iceberg/IcebergDataStreamSink/README.md
index e1b0f630..4cd71c41 100644
--- a/java/Iceberg/IcebergDataStreamSink/README.md
+++ b/java/Iceberg/IcebergDataStreamSink/README.md
@@ -2,7 +2,7 @@
* Flink version: 1.20.0
* Flink API: DataStream API
-* Iceberg 1.8.1
+* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
diff --git a/java/Iceberg/IcebergDataStreamSink/pom.xml b/java/Iceberg/IcebergDataStreamSink/pom.xml
index 215cff5a..96f5ec63 100644
--- a/java/Iceberg/IcebergDataStreamSink/pom.xml
+++ b/java/Iceberg/IcebergDataStreamSink/pom.xml
@@ -14,16 +14,36 @@
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
-
+        <flink.major.version>1.20</flink.major.version>
        <flink.version>1.20.0</flink.version>
        <avro.version>1.11.3</avro.version>
-        <hadoop.version>3.4.0</hadoop.version>
-        <iceberg.version>1.8.1</iceberg.version>
+        <iceberg.version>1.9.1</iceberg.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>1.12.782</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.28.29</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
@@ -44,12 +64,7 @@
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
+
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
@@ -57,13 +72,6 @@
            <scope>provided</scope>
        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-metrics-dropwizard</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
        <dependency>
            <groupId>com.amazonaws</groupId>
@@ -79,62 +87,26 @@
            <version>${flink.version}</version>
        </dependency>
-
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-core</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink-1.20</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-aws-bundle</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-aws</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-reload4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws-bundle</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-s3-fs-hadoop</artifactId>
+            <version>${flink.version}</version>
+        </dependency>

-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>${junit5.version}</version>
-            <scope>test</scope>
-        </dependency>
-
+
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
@@ -149,7 +121,6 @@
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
-            <scope>runtime</scope>
        </dependency>
diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java
new file mode 100644
index 00000000..d692448b
--- /dev/null
+++ b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java
@@ -0,0 +1,31 @@
+package com.amazonaws.services.msf.iceberg;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
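+/**
+ * MapFunction that converts Avro GenericRecords into Flink's internal RowData,
+ * using the schema and record converters provided by the flink-avro format library.
+ */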
+public class AvroGenericRecordToRowDataMapper implements MapFunction<GenericRecord, RowData> {
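+    // Runtime converter derived from the Flink RowType equivalent of the Avro schema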
+ private final AvroToRowDataConverters.AvroToRowDataConverter converter;
+
+ AvroGenericRecordToRowDataMapper(RowType rowType) {
+ this.converter = AvroToRowDataConverters.createRowConverter(rowType);
+ }
+
+    @Override
+    public RowData map(GenericRecord genericRecord) throws Exception {
+        return (RowData) this.converter.convert(genericRecord);
+    }
+
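+    /** Builds a mapper for the given Avro schema by deriving the equivalent Flink RowType. */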
+ public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) {
+ DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
+ LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+ RowType rowType = (RowType) logicalType;
+ return new AvroGenericRecordToRowDataMapper(rowType);
+ }
+}
\ No newline at end of file
diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
index c851a763..b846e8cc 100644
--- a/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
+++ b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
@@ -8,7 +8,6 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -19,9 +18,9 @@
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.shaded.org.apache.avro.Schema;
import java.util.Arrays;
import java.util.HashMap;
@@ -88,11 +87,12 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS);
        List<String> equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+"));
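+        // AvroSchemaUtil in iceberg-flink-runtime expects the Avro classes shaded into the runtime jar,
+        // so re-parse the schema with the shaded Schema.Parser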
+ Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
// Convert Avro Schema to Iceberg Schema, this will be used for creating the table
- org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
+ org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);
// Avro Generic Record to Row Data Mapper
-        MapFunction<GenericRecord, RowData> avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
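+        // Use the local copy of AvroGenericRecordToRowDataMapper: the mapper bundled in
+        // iceberg-flink-runtime operates on the shaded Avro classes, not the application's GenericRecords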
+ AvroGenericRecordToRowDataMapper avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
// Catalog properties for using Glue Data Catalog
diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json
index 30a6ac03..30de5f0e 100644
--- a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json
+++ b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json
@@ -8,12 +8,12 @@
{
"PropertyGroupId": "Iceberg",
"PropertyMap": {
- "bucket.prefix": "s3:///iceberg",
- "catalog.db": "default",
- "catalog.table": "prices_iceberg",
+ "bucket.prefix": "s3:///iceberg",
+ "catalog.db": "iceberg",
+ "catalog.table": "prices_iceberg_datastream",
"partition.fields": "symbol",
"operation": "append",
"upsert.equality.fields": "symbol"
}
}
-]
\ No newline at end of file
+]
diff --git a/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java
deleted file mode 100644
index 5593ca1d..00000000
--- a/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.amazonaws.services.msf.datagen;
-
-import com.amazonaws.services.msf.avro.AvroSchemaUtils;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
-class AvroGenericStockTradeGeneratorFunctionTest {
-
- @Test
- void generateRecord() throws Exception {
- Schema avroSchema = AvroSchemaUtils.loadSchema();
- AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema);
-
- GenericRecord record = generatorFunction.map(42L);
-
-
- assertInstanceOf(String.class, record.get("timestamp"));
- assertInstanceOf(String.class, record.get("symbol"));
- assertInstanceOf(Float.class, record.get("price"));
- assertInstanceOf(Integer.class, record.get("volumes"));
- }
-
-}
\ No newline at end of file