Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -79,13 +80,17 @@ public class DynamicIcebergSink
private final Configuration flinkConfig;
private final int cacheMaximumSize;

// Set by the builder before sinkTo() — forward writer results to union into pre-commit topology
private final transient DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults;

DynamicIcebergSink(
CatalogLoader catalogLoader,
Map<String, String> snapshotProperties,
String uidPrefix,
Map<String, String> writeProperties,
Configuration flinkConfig,
int cacheMaximumSize) {
int cacheMaximumSize,
DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults) {
this.catalogLoader = catalogLoader;
this.snapshotProperties = snapshotProperties;
this.uidPrefix = uidPrefix;
Expand All @@ -96,6 +101,7 @@ public class DynamicIcebergSink
// This is used to separate files generated by different sinks writing the same table.
// Also used to generate the aggregator operator name
this.sinkId = UUID.randomUUID().toString();
this.forwardWriteResults = forwardWriteResults;
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -145,7 +151,11 @@ public DataStream<CommittableMessage<DynamicCommittable>> addPreCommitTopology(
TypeInformation<CommittableMessage<DynamicCommittable>> typeInformation =
CommittableMessageTypeInfo.of(this::getCommittableSerializer);

return writeResults
// Union forward writer results with the shuffle writer results
DataStream<CommittableMessage<DynamicWriteResult>> allResults =
writeResults.union(forwardWriteResults);

return allResults
.keyBy(
committable -> {
if (committable instanceof CommittableSummary) {
Expand All @@ -168,6 +178,56 @@ public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer()
return new DynamicWriteResultSerializer();
}

/**
 * A lightweight {@link Sink} used with {@link SinkWriterOperatorFactory} for the forward (no
 * shuffle) write path. Implements {@link SupportsCommitter} so that {@code SinkWriterOperator}
 * emits committables downstream. The committer is never invoked — committing is handled by the
 * main sink, which unions the forwarded write results into its pre-commit topology.
 */
@VisibleForTesting
static class ForwardWriterSink
    implements Sink<DynamicRecordInternal>, SupportsCommitter<DynamicWriteResult> {

  private final CatalogLoader catalogLoader;
  private final Map<String, String> writeProperties;
  private final Configuration flinkConfig;
  private final int cacheMaximumSize;

  /**
   * @param catalogLoader loads the Iceberg catalog used to resolve target tables
   * @param writeProperties write properties forwarded to the underlying {@link DynamicWriter}
   * @param flinkConfig Flink configuration forwarded to the writer
   * @param cacheMaximumSize maximum size of the writer's internal table metadata cache
   */
  ForwardWriterSink(
      CatalogLoader catalogLoader,
      Map<String, String> writeProperties,
      Configuration flinkConfig,
      int cacheMaximumSize) {
    this.catalogLoader = catalogLoader;
    this.writeProperties = writeProperties;
    this.flinkConfig = flinkConfig;
    this.cacheMaximumSize = cacheMaximumSize;
  }

  /** Creates the {@link DynamicWriter} that produces the forwarded write results. */
  @SuppressWarnings("deprecation")
  @Override
  public SinkWriter<DynamicRecordInternal> createWriter(InitContext context) throws IOException {
    return new DynamicWriter(
        catalogLoader.loadCatalog(),
        writeProperties,
        flinkConfig,
        cacheMaximumSize,
        new DynamicWriterMetrics(context.metricGroup()),
        context.getSubtaskId(),
        context.getAttemptNumber());
  }

  /**
   * Never called: the forward writer's committables are routed to the main sink's committer
   * instead of this sink's own committer.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public Committer<DynamicWriteResult> createCommitter(CommitterInitContext context) {
    // Fixed message: this class is ForwardWriterSink, not "WriterSink".
    throw new UnsupportedOperationException(
        "ForwardWriterSink is used only for writing; committing is handled by the main sink");
  }

  @Override
  public SimpleVersionedSerializer<DynamicWriteResult> getCommittableSerializer() {
    return new DynamicWriteResultSerializer();
  }
}

public static class Builder<T> {
private DataStream<T> input;
private DynamicRecordGenerator<T> generator;
Expand Down Expand Up @@ -358,43 +418,79 @@ private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}

private DynamicIcebergSink build() {
private DynamicIcebergSink build(
SingleOutputStreamOperator<DynamicRecordInternal> converted,
DynamicRecordInternalType sideOutputType) {

Preconditions.checkArgument(
generator != null, "Please use withGenerator() to convert the input DataStream.");
Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null");

uidPrefix = Optional.ofNullable(uidPrefix).orElse("");

Configuration flinkConfig =
readableConfig instanceof Configuration
? (Configuration) readableConfig
: Configuration.fromMap(readableConfig.toMap());

return instantiateSink(writeOptions, flinkConfig);
// Forward writer: chained with generator via forward edge, no data shuffle
ForwardWriterSink forwardWriterSink =
new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig, cacheMaximumSize);
TypeInformation<CommittableMessage<DynamicWriteResult>> writeResultTypeInfo =
CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);

DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
converted
.getSideOutput(
new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
.transform(
operatorName("Forward-Writer"),
writeResultTypeInfo,
new SinkWriterOperatorFactory<>(forwardWriterSink))
.uid(prefixIfNotNull(uidPrefix, "-forward-writer"));

// Inject forward write results into sink — they'll be unioned in addPreCommitTopology
return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
}

@VisibleForTesting
DynamicIcebergSink instantiateSink(
Map<String, String> writeProperties, Configuration flinkWriteConf) {
Map<String, String> writeProperties,
Configuration flinkWriteConf,
DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults) {
return new DynamicIcebergSink(
catalogLoader,
snapshotSummary,
uidPrefix,
writeProperties,
flinkWriteConf,
cacheMaximumSize);
cacheMaximumSize,
forwardWriteResults);
}

/**
* Append the iceberg sink operators to write records to iceberg table.
*
* <p>The topology splits records by distribution mode:
*
* <ul>
* <li>Forward records ({@code null} distributionMode) go through a forward edge to a chained
* writer, avoiding any data shuffle.
* <li>Shuffle records (non-null distributionMode) go through the standard Sink2 pipeline with
* hash/round-robin distribution.
* </ul>
*
* Both writers feed into a single shared pre-commit aggregator and committer, ensuring atomic
* commits across both paths.
*
* @return {@link DataStreamSink} for sink.
*/
public DataStreamSink<DynamicRecordInternal> append() {
uidPrefix = Optional.ofNullable(uidPrefix).orElse("");

DynamicRecordInternalType type =
new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize);
DynamicIcebergSink sink = build();
DynamicRecordInternalType sideOutputType =
new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize);

SingleOutputStreamOperator<DynamicRecordInternal> converted =
input
.process(
Expand All @@ -412,12 +508,14 @@ public DataStreamSink<DynamicRecordInternal> append() {
.name(operatorName("generator"))
.returns(type);

DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
DynamicIcebergSink sink = build(converted, sideOutputType);

// Shuffle path: table update side output + main output → sinkTo()
DataStream<DynamicRecordInternal> shuffleInput =
converted
.getSideOutput(
new OutputTag<>(
DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, sideOutputType))
.keyBy((KeySelector<DynamicRecordInternal, String>) DynamicRecordInternal::tableName)
.map(
new DynamicTableUpdateOperator(
Expand All @@ -431,16 +529,19 @@ public DataStreamSink<DynamicRecordInternal> append() {
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
.union(converted)
.sinkTo(sink)
.union(converted);

DataStreamSink<DynamicRecordInternal> result =
shuffleInput
.sinkTo(sink) // Forward write results are implicitly injected here
.uid(prefixIfNotNull(uidPrefix, "-sink"));

FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
if (flinkWriteConf.writeParallelism() != null) {
rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism());
result.setParallelism(flinkWriteConf.writeParallelism());
}

return rowDataDataStreamSink;
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,40 @@ public class DynamicRecord {
private Schema schema;
private RowData rowData;
private PartitionSpec partitionSpec;
private DistributionMode distributionMode;
@Nullable private DistributionMode distributionMode;
private int writeParallelism;
private boolean upsertMode;
@Nullable private Set<String> equalityFields;

/**
 * Constructs a new DynamicRecord that is written with forward (no shuffle) distribution.
 *
 * <p>Delegates to the full constructor with a {@code null} {@link DistributionMode} (which
 * indicates forward writes) and a write parallelism of {@code -1}.
 *
 * @param tableIdentifier The target table identifier.
 * @param branch The target table branch.
 * @param schema The target table schema.
 * @param rowData The data matching the provided schema.
 * @param partitionSpec The target table {@link PartitionSpec}.
 */
public DynamicRecord(
    TableIdentifier tableIdentifier,
    String branch,
    Schema schema,
    RowData rowData,
    PartitionSpec partitionSpec) {
  this(tableIdentifier, branch, schema, rowData, partitionSpec, null, -1);
}

/**
* Constructs a new DynamicRecord. This record will be shuffled as specified by {@code
* distributionMode}.
*
* @param tableIdentifier The target table identifier.
* @param branch The target table branch.
* @param schema The target table schema.
* @param rowData The data matching the provided schema.
* @param partitionSpec The target table {@link PartitionSpec}.
* @param distributionMode The {@link DistributionMode}. {@code null} indicates forward (no
* shuffle) writes.
* @param writeParallelism The number of parallel writers. Can be set to any value {@literal > 0},
* but will always be automatically capped by the maximum write parallelism, which is the
* parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available
Expand Down
Loading
Loading