Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/src/main/java/org/apache/iceberg/DistributionMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
public enum DistributionMode {
NONE("none"),
HASH("hash"),
RANGE("range");
RANGE("range"),
ROUND_ROBIN("round-robin");

private final String modeName;

Expand All @@ -54,7 +55,7 @@ public String modeName() {
public static DistributionMode fromName(String modeName) {
Preconditions.checkArgument(null != modeName, "Invalid distribution mode: null");
try {
return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH));
return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH).replace('-', '_'));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Invalid distribution mode: %s", modeName));
}
Expand Down
14 changes: 13 additions & 1 deletion docs/docs/flink-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ We need the following information (DynamicRecord) for every record:
| `Schema` | The schema of the record. |
| `Spec` | The expected partitioning specification for the record. |
| `RowData` | The actual row data to be written. |
| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
| `DistributionMode` | The distribution mode for writing the record (supports NONE, ROUND_ROBIN, or HASH). See [Distribution Modes](#distribution-modes-1) below. |
| `Parallelism` | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
| `UpsertMode` | Overrides this table's write.upsert.enabled (optional). |
| `EqualityFields` | The equality fields for the table (optional). |
Expand Down Expand Up @@ -547,6 +547,18 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
| `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. |
| `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). |

### Distribution Modes

The `DistributionMode` set on each `DynamicRecord` controls how that record is routed from the processor to the writer:

| Mode | Behavior |
|------|----------|
| `NONE` | True forward/no-redistribution. Records are sent directly from the processor to the writer using a forward edge, which enables Flink operator chaining. This avoids serialization and network shuffle overhead. Table metadata updates are always performed immediately inside the processor (regardless of `immediateTableUpdate` setting). Best for high-throughput pipelines where each subtask writes independently. |
| `ROUND_ROBIN` | Records are distributed across writer subtasks in a round-robin fashion (or by equality fields if set). This is the default when no distribution mode is set on the `DynamicRecord`. |
| `HASH` | Records are distributed by partition key (partitioned tables) or equality fields (unpartitioned tables). Ensures that records for the same partition are handled by the same writer subtask. |

Records with different distribution modes can be mixed in the same pipeline. The sink internally maintains two write paths: a forward path for `NONE` records (chainable, no shuffle) and a shuffle path for `ROUND_ROBIN`/`HASH` records (keyed by writer key).

### Notes

- **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode; if set, it will fall back to `HASH`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,23 @@ public class DynamicIcebergSink
private final Map<String, String> writeProperties;
private final Configuration flinkConfig;
private final int cacheMaximumSize;
private final boolean forwardOnly;

DynamicIcebergSink(
CatalogLoader catalogLoader,
Map<String, String> snapshotProperties,
String uidPrefix,
Map<String, String> writeProperties,
Configuration flinkConfig,
int cacheMaximumSize) {
int cacheMaximumSize,
boolean forwardOnly) {
this.catalogLoader = catalogLoader;
this.snapshotProperties = snapshotProperties;
this.uidPrefix = uidPrefix;
this.writeProperties = writeProperties;
this.flinkConfig = flinkConfig;
this.cacheMaximumSize = cacheMaximumSize;
this.forwardOnly = forwardOnly;
// We generate a random UUID every time when a sink is created.
// This is used to separate files generated by different sinks writing the same table.
// Also used to generate the aggregator operator name
Expand Down Expand Up @@ -135,6 +138,9 @@ public void addPostCommitTopology(
@Override
public DataStream<DynamicRecordInternal> addPreWriteTopology(
    DataStream<DynamicRecordInternal> inputDataStream) {
  // Forward-only sinks carry DistributionMode.NONE records on a chainable forward
  // edge, so no pre-write redistribution (shuffle) is applied; otherwise the
  // records are redistributed according to their writer keys.
  return forwardOnly ? inputDataStream : distributeDataStream(inputDataStream);
}

Expand Down Expand Up @@ -357,7 +363,7 @@ private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}

private DynamicIcebergSink build() {
private DynamicIcebergSink build(boolean forwardOnly) {

Preconditions.checkArgument(
generator != null, "Please use withGenerator() to convert the input DataStream.");
Expand All @@ -370,19 +376,20 @@ private DynamicIcebergSink build() {
? (Configuration) readableConfig
: Configuration.fromMap(readableConfig.toMap());

return instantiateSink(writeOptions, flinkConfig);
return instantiateSink(writeOptions, flinkConfig, forwardOnly);
}

@VisibleForTesting
DynamicIcebergSink instantiateSink(
Map<String, String> writeProperties, Configuration flinkWriteConf) {
Map<String, String> writeProperties, Configuration flinkWriteConf, boolean forwardOnly) {
return new DynamicIcebergSink(
catalogLoader,
snapshotSummary,
uidPrefix,
writeProperties,
flinkWriteConf,
cacheMaximumSize);
cacheMaximumSize,
forwardOnly);
}

/**
Expand All @@ -393,7 +400,6 @@ DynamicIcebergSink instantiateSink(
public DataStreamSink<DynamicRecordInternal> append() {
DynamicRecordInternalType type =
new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize);
DynamicIcebergSink sink = build();
SingleOutputStreamOperator<DynamicRecordInternal> converted =
input
.process(
Expand All @@ -411,7 +417,9 @@ public DataStreamSink<DynamicRecordInternal> append() {
.name(operatorName("generator"))
.returns(type);

DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
// Shuffle sink: handles non-NONE distribution records (HASH, ROUND_ROBIN, RANGE)
DynamicIcebergSink shuffleSink = build(false);
DataStream<DynamicRecordInternal> shuffleInput =
converted
.getSideOutput(
new OutputTag<>(
Expand All @@ -430,16 +438,29 @@ public DataStreamSink<DynamicRecordInternal> append() {
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
.union(converted)
.sinkTo(sink)
.uid(prefixIfNotNull(uidPrefix, "-sink"));
.union(converted);

DataStreamSink<DynamicRecordInternal> shuffleSinkResult =
shuffleInput.sinkTo(shuffleSink).uid(prefixIfNotNull(uidPrefix, "-shuffle-sink"));

FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
if (flinkWriteConf.writeParallelism() != null) {
rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
shuffleSinkResult.setParallelism(flinkWriteConf.writeParallelism());
}

return rowDataDataStreamSink;
// Forward sink: handles NONE distribution records (forward edge, chainable)
DynamicIcebergSink forwardSink = build(true);
OutputTag<DynamicRecordInternal> forwardTag =
new OutputTag<>(
DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM,
new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize));

converted
.getSideOutput(forwardTag)
.sinkTo(forwardSink)
.uid(prefixIfNotNull(uidPrefix, "-forward-sink"));

return shuffleSinkResult;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

@Internal
class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal>
implements Collector<DynamicRecord> {
@VisibleForTesting
static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream";

@VisibleForTesting static final String DYNAMIC_FORWARD_STREAM = "dynamic-forward-stream";

private final DynamicRecordGenerator<T> generator;
private final CatalogLoader catalogLoader;
private final boolean immediateUpdate;
Expand All @@ -51,6 +55,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private transient HashKeyGenerator hashKeyGenerator;
private transient TableUpdater updater;
private transient OutputTag<DynamicRecordInternal> updateStream;
private transient OutputTag<DynamicRecordInternal> forwardStream;
private transient Collector<DynamicRecordInternal> collector;
private transient Context context;

Expand Down Expand Up @@ -90,9 +95,14 @@ public void open(OpenContext openContext) throws Exception {
this.hashKeyGenerator =
new HashKeyGenerator(
cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
if (immediateUpdate) {
updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns);
} else {
// Always create updater — needed for forced immediate updates on NONE records
this.updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns);
// Always create forward stream tag for NONE distribution records
this.forwardStream =
new OutputTag<>(
DYNAMIC_FORWARD_STREAM,
new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {};
if (!immediateUpdate) {
updateStream =
new OutputTag<>(
DYNAMIC_TABLE_UPDATE_STREAM,
Expand All @@ -112,6 +122,10 @@ public void processElement(T element, Context ctx, Collector<DynamicRecordIntern

@Override
public void collect(DynamicRecord data) {
DistributionMode mode =
MoreObjects.firstNonNull(data.distributionMode(), DistributionMode.ROUND_ROBIN);
boolean isForward = (mode == DistributionMode.NONE);

boolean exists = tableCache.exists(data.tableIdentifier()).f0;
String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null;

Expand All @@ -122,21 +136,26 @@ public void collect(DynamicRecord data) {

PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null;

if (!exists
|| foundBranch == null
|| foundSpec == null
|| foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
if (immediateUpdate) {
boolean needsUpdate =
!exists
|| foundBranch == null
|| foundSpec == null
|| foundSchema.compareResult() == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED;

if (needsUpdate) {
if (isForward || immediateUpdate) {
// NONE records always force immediate update; non-NONE with immediateUpdate=true also
Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> newData =
updater.update(
data.tableIdentifier(), data.branch(), data.schema(), data.spec(), tableCreator);
emit(
collector,
data,
newData.f0.resolvedTableSchema(),
newData.f0.recordConverter(),
newData.f1);
newData.f1,
isForward);
} else {
// Non-NONE records with immediateUpdate=false go to update side output
int writerKey =
hashKeyGenerator.generateKey(
data,
Expand All @@ -159,33 +178,47 @@ public void collect(DynamicRecord data) {
}
} else {
emit(
collector,
data,
foundSchema.resolvedTableSchema(),
foundSchema.recordConverter(),
foundSpec);
foundSpec,
isForward);
}
}

private void emit(
Collector<DynamicRecordInternal> out,
DynamicRecord data,
Schema schema,
DataConverter recordConverter,
PartitionSpec spec) {
PartitionSpec spec,
boolean forward) {
RowData rowData = (RowData) recordConverter.convert(data.rowData());
int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
String tableName = data.tableIdentifier().toString();
out.collect(
new DynamicRecordInternal(
tableName,
data.branch(),
schema,
rowData,
spec,
writerKey,
data.upsertMode(),
DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)));
if (forward) {
DynamicRecordInternal record =
new DynamicRecordInternal(
tableName,
data.branch(),
schema,
rowData,
spec,
0,
data.upsertMode(),
DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema));
context.output(forwardStream, record);
} else {
int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData);
collector.collect(
new DynamicRecordInternal(
tableName,
data.branch(),
schema,
rowData,
spec,
writerKey,
data.upsertMode(),
DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ int generateKey(
MoreObjects.firstNonNull(tableSchema, dynamicRecord.schema()),
MoreObjects.firstNonNull(tableSpec, dynamicRecord.spec()),
MoreObjects.firstNonNull(
dynamicRecord.distributionMode(), DistributionMode.NONE),
dynamicRecord.distributionMode(), DistributionMode.ROUND_ROBIN),
MoreObjects.firstNonNull(
dynamicRecord.equalityFields(), Collections.emptySet()),
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)));
Expand All @@ -120,6 +120,12 @@ private KeySelector<RowData, Integer> getKeySelector(
"Creating new KeySelector for table '{}' with distribution mode '{}'", tableName, mode);
switch (mode) {
case NONE:
return row -> {
throw new IllegalStateException(
"Records with DistributionMode.NONE are routed via the forward path. They should not reach the key generator.");
};

case ROUND_ROBIN:
if (equalityFields.isEmpty()) {
return tableKeySelector(tableName, writeParallelism, maxWriteParallelism);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public void generate(RowData inputRecord, Collector<DynamicRecord> out) throws E
schema,
inputRecord,
PartitionSpec.unpartitioned(),
DistributionMode.NONE,
DistributionMode.ROUND_ROBIN,
1);
out.collect(dynamicRecord);
}
Expand Down
Loading