2 changes: 1 addition & 1 deletion java/Iceberg/IcebergDataStreamSink/README.md
@@ -2,7 +2,7 @@

* Flink version: 1.20.0
* Flink API: DataStream API
* Iceberg 1.8.1
* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
111 changes: 41 additions & 70 deletions java/Iceberg/IcebergDataStreamSink/pom.xml
@@ -14,16 +14,36 @@
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>

<flink.major.version>1.20</flink.major.version>
<flink.version>1.20.0</flink.version>
<avro.version>1.11.3</avro.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.8.1</iceberg.version>
<iceberg.version>1.9.1</iceberg.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.12.782</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.29</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>


<dependencies>
<!-- Flink Core dependencies -->
<dependency>
@@ -44,26 +64,14 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink Iceberg uses DropWizard metrics -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
<dependency>
<groupId>com.amazonaws</groupId>
@@ -79,62 +87,26 @@
<version>${flink.version}</version>
</dependency>

<!--Iceberg dependencies -->
<!-- DO NOT include the iceberg-flink-runtime-* dependency, because it contains a shaded version of Avro -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.20</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<!-- exclude to prevent multiple of SLF4j binding conflict -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- S3 File System Support -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<!-- Logging Dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
@@ -149,7 +121,6 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

31 changes: 31 additions & 0 deletions java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/AvroGenericRecordToRowDataMapper.java
@@ -0,0 +1,31 @@
package com.amazonaws.services.msf.iceberg;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.avro.AvroToRowDataConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

/**
 * Maps an Avro GenericRecord to a Flink RowData.
 * Local replacement for org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper,
 * whose import this PR removes from the sink builder.
 */
public class AvroGenericRecordToRowDataMapper implements MapFunction<GenericRecord, RowData> {
    private final AvroToRowDataConverters.AvroToRowDataConverter converter;

    AvroGenericRecordToRowDataMapper(RowType rowType) {
        this.converter = AvroToRowDataConverters.createRowConverter(rowType);
    }

    @Override
    public RowData map(GenericRecord genericRecord) throws Exception {
        return (RowData) this.converter.convert(genericRecord);
    }

    // Builds a mapper from an Avro schema by converting it to the equivalent Flink RowType.
    public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) {
        DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
        RowType rowType = (RowType) logicalType;
        return new AvroGenericRecordToRowDataMapper(rowType);
    }
}
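For context (not part of this PR), a minimal sketch of how such a mapper is typically wired into the pipeline. The records stream, avroSchema, and the RowDataConversionSketch wrapper are assumptions for illustration; the explicit TypeInformation keeps Flink from falling back to generic Kryo serialization for RowData.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;

public class RowDataConversionSketch {
    // Convert the Avro record stream to RowData, passing explicit type information
    // so the Iceberg FlinkSink receives properly typed RowData records.
    public static DataStream<RowData> toRowData(DataStream<GenericRecord> records, Schema avroSchema) {
        RowType rowType = (RowType) TypeConversions.fromDataToLogicalType(
                AvroSchemaConverter.convertToDataType(avroSchema.toString()));
        TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(rowType);
        return records.map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema), typeInfo);
    }
}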
java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java
@@ -8,7 +8,6 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -19,9 +18,9 @@
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.shaded.org.apache.avro.Schema;

import java.util.Arrays;
import java.util.HashMap;
@@ -88,11 +87,12 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS);
List<String> equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+"));

Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
@nicusX (Contributor) commented on Jul 14, 2025:

It is worth adding a comment here highlighting that this is using the shaded implementation of the Avro Schema. One may guess from the name, but it is better to be explicit.


// Convert Avro Schema to Iceberg Schema, this will be used for creating the table
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);
// Avro Generic Record to Row Data Mapper
MapFunction<GenericRecord, RowData> avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
AvroGenericRecordToRowDataMapper avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);


// Catalog properties for using Glue Data Catalog
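Following up on the review note above, a hedged sketch of the clarifying comment the parse line could carry (identifiers as in the diff; the wording is a suggestion, not part of the merged change):

// The iceberg-flink-runtime JAR contains a shaded copy of Avro, so AvroSchemaUtil.toIceberg()
// expects the shaded org.apache.iceberg.shaded.org.apache.avro.Schema. Round-tripping the
// unshaded schema through its JSON representation yields the shaded equivalent.
Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);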
java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json
@@ -8,12 +8,12 @@
{
"PropertyGroupId": "Iceberg",
"PropertyMap": {
"bucket.prefix": "s3://<my-bucket>/iceberg",
"catalog.db": "default",
"catalog.table": "prices_iceberg",
"bucket.prefix": "s3://<bucket-name>/iceberg",
"catalog.db": "iceberg",
"catalog.table": "prices_iceberg_datastream",
"partition.fields": "symbol",
"operation": "append",
"upsert.equality.fields": "symbol"
}
}
]
]
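
For reference (not part of this PR), a minimal sketch of how a property group such as "Iceberg" is read inside the application. KinesisAnalyticsRuntime.getApplicationProperties() is the Managed Service for Apache Flink runtime API; the RuntimePropertiesSketch wrapper is an assumption for illustration, and when running locally the JSON file above supplies the same groups.

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class RuntimePropertiesSketch {
    // Fetch the "Iceberg" property group from the application's runtime configuration.
    public static Properties icebergProperties() throws IOException {
        Map<String, Properties> groups = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties props = groups.getOrDefault("Iceberg", new Properties());
        // e.g. props.getProperty("catalog.table") -> "prices_iceberg_datastream"
        return props;
    }
}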

One file was deleted in this change (its contents are collapsed in this view).