From 5dc08d5eb0a828fc49f0087fe429837c40433e68 Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 20:37:54 +0000 Subject: [PATCH 1/6] Flink Sink: Add WriteObserver plugin interface for per-record metadata --- .../flink/sink/IcebergCommittable.java | 19 +++++ .../sink/IcebergCommittableSerializer.java | 62 +++++++++++++--- .../iceberg/flink/sink/IcebergCommitter.java | 52 +++++++++++--- .../iceberg/flink/sink/IcebergSink.java | 19 ++++- .../iceberg/flink/sink/IcebergSinkWriter.java | 25 +++++++ .../flink/sink/IcebergWriteAggregator.java | 19 ++++- .../iceberg/flink/sink/WriteObserver.java | 69 ++++++++++++++++++ .../sink/WriteObserverMetadataHolder.java | 53 ++++++++++++++ .../flink/sink/WriteResultSerializer.java | 71 ++++++++++++++++--- 9 files changed, 354 insertions(+), 35 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java index 408c3e9a9d5f..5f3c8c3fdda4 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java @@ -20,7 +20,10 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; +import java.util.Map; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** @@ -36,12 +39,24 @@ class IcebergCommittable implements Serializable { private final String jobId; private final String operatorId; private final long checkpointId; + private final Map observerMetadata; IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + this(manifest, jobId, operatorId, checkpointId, null); + } + + IcebergCommittable( + byte[] manifest, + String jobId, + String operatorId, + long checkpointId, + @Nullable Map observerMetadata) { this.manifest = manifest; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; + this.observerMetadata = + observerMetadata != null ? observerMetadata : Collections.emptyMap(); } byte[] manifest() { @@ -60,6 +75,10 @@ Long checkpointId() { return checkpointId; } + Map observerMetadata() { + return observerMetadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index 1d83c211e001..5127f58026ab 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -31,7 +33,7 @@ *

In both cases only the respective part is serialized. */ public class IcebergCommittableSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION = 2; @Override public int getVersion() { @@ -47,22 +49,62 @@ public byte[] serialize(IcebergCommittable committable) throws IOException { view.writeLong(committable.checkpointId()); view.writeInt(committable.manifest().length); view.write(committable.manifest()); + + Map metadata = committable.observerMetadata(); + if (metadata != null && !metadata.isEmpty()) { + view.writeBoolean(true); + view.writeInt(metadata.size()); + for (Map.Entry entry : metadata.entrySet()) { + view.writeUTF(entry.getKey()); + view.writeUTF(entry.getValue()); + } + } else { + view.writeBoolean(false); + } + return out.toByteArray(); } @Override public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - String jobId = view.readUTF(); - String operatorId = view.readUTF(); - long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + return deserializeV1(serialized); + } else if (version == 2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private IcebergCommittable deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + } + + private IcebergCommittable deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + + Map metadata = null; + boolean hasMetadata = view.readBoolean(); + if (hasMetadata) { + int mapSize = view.readInt(); + metadata = new HashMap<>(mapSize); + for (int i = 0; i < mapSize; i++) { + metadata.put(view.readUTF(), view.readUTF()); + } + } + + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index 8e45a2db30b2..ae3e977becd0 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -21,11 +21,13 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.iceberg.AppendFiles; @@ -157,13 +159,20 @@ private void commitPendingRequests( long checkpointId = commitRequestMap.lastKey(); List manifests = Lists.newArrayList(); NavigableMap pendingResults = Maps.newTreeMap(); + + // Merge observer metadata from all committables + Map mergedObserverMetadata = new HashMap<>(); for (Map.Entry> e : commitRequestMap.entrySet()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue().getCommittable().manifest())) { + IcebergCommittable committable = e.getValue().getCommittable(); + if (!committable.observerMetadata().isEmpty()) { + mergedObserverMetadata.putAll(committable.observerMetadata()); + } + if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.manifest())) { pendingResults.put(e.getKey(), EMPTY_WRITE_RESULT); } else { DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, e.getValue().getCommittable().manifest()); + DeltaManifestsSerializer.INSTANCE, committable.manifest()); pendingResults.put( e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); @@ -171,8 +180,10 @@ private void commitPendingRequests( } } + Map observerMetadata = + mergedObserverMetadata.isEmpty() ? null : mergedObserverMetadata; CommitSummary summary = new CommitSummary(pendingResults); - commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId); + commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); if (committerMetrics != null) { committerMetrics.updateCommitSummary(summary); } @@ -195,14 +206,15 @@ private void commitPendingResult( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0; if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { if (replacePartitions) { - replacePartitions(pendingResults, summary, newFlinkJobId, operatorId); + replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); } else { - commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId); + commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); } continuousEmptyCheckpoints = 0; } else { @@ -215,7 +227,8 @@ private void replacePartitions( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long checkpointId = pendingResults.lastKey(); Preconditions.checkState( summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files."); @@ -229,14 +242,16 @@ private void replacePartitions( String description = "dynamic partition overwrite"; logCommitSummary(summary, description); - commitOperation(dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId); + commitOperation( + dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId, observerMetadata); } private void commitDeltaTxn( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long checkpointId = pendingResults.lastKey(); if (summary.deleteFilesCount() == 0) { // To be compatible with iceberg format V1. @@ -250,7 +265,8 @@ private void commitDeltaTxn( String description = "append"; logCommitSummary(summary, description); // fail all commits as really its only one - commitOperation(appendFiles, description, newFlinkJobId, operatorId, checkpointId); + commitOperation( + appendFiles, description, newFlinkJobId, operatorId, checkpointId, observerMetadata); } else { // To be compatible with iceberg format V2. for (Map.Entry e : pendingResults.entrySet()) { @@ -274,7 +290,8 @@ private void commitDeltaTxn( String description = "rowDelta"; logCommitSummary(summary, description); - commitOperation(rowDelta, description, newFlinkJobId, operatorId, e.getKey()); + commitOperation( + rowDelta, description, newFlinkJobId, operatorId, e.getKey(), observerMetadata); } } } @@ -285,8 +302,21 @@ private void commitOperation( String newFlinkJobId, String operatorId, long checkpointId) { + commitOperation(operation, description, newFlinkJobId, operatorId, checkpointId, null); + } + + private void commitOperation( + SnapshotUpdate operation, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId, + @Nullable Map observerMetadata) { snapshotProperties.forEach(operation::set); + if (observerMetadata != null) { + observerMetadata.forEach(operation::set); + } // custom snapshot metadata properties will be overridden if they conflict with internal ones // used by the sink. operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index eaaf4ea6e4e3..eab54435ad23 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; @@ -174,6 +175,8 @@ public class IcebergSink private final transient List> maintenanceTasks; private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; + @Nullable private final WriteObserver writeObserver; + private IcebergSink( TableLoader tableLoader, Table table, @@ -188,7 +191,8 @@ private IcebergSink( boolean overwriteMode, List> maintenanceTasks, FlinkMaintenanceConfig flinkMaintenanceConfig, - Set equalityFieldColumns) { + Set equalityFieldColumns, + @Nullable WriteObserver writeObserver) { this.tableLoader = tableLoader; this.snapshotProperties = snapshotProperties; this.uidSuffix = uidSuffix; @@ -212,6 +216,7 @@ private IcebergSink( this.maintenanceTasks = maintenanceTasks; this.flinkMaintenanceConfig = flinkMaintenanceConfig; this.equalityFieldColumns = equalityFieldColumns; + this.writeObserver = writeObserver; } @Override @@ -232,7 +237,8 @@ public SinkWriter createWriter(WriterInitContext context) { taskWriterFactory, metrics, context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + writeObserver); } @Override @@ -349,6 +355,7 @@ public static class Builder implements IcebergSinkBuilder { private ReadableConfig readableConfig = new Configuration(); private List equalityFieldColumns = null; private final List> maintenanceTasks = Lists.newArrayList(); + @Nullable private WriteObserver writeObserver; private Builder() {} @@ -716,6 +723,11 @@ public Builder toBranch(String branch) { return this; } + public Builder writeObserver(WriteObserver observer) { + this.writeObserver = observer; + return this; + } + IcebergSink build() { Preconditions.checkArgument( @@ -806,7 +818,8 @@ IcebergSink build() { overwriteMode, maintenanceTasks, flinkMaintenanceConfig, - equalityFieldColumnsSet); + equalityFieldColumnsSet, + writeObserver); } /** diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java index 7234cf74020e..7228b356f3bc 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.table.data.RowData; @@ -46,6 +48,7 @@ class IcebergSinkWriter implements CommittingSinkWriter { private TaskWriter writer; private final int subTaskId; private final int attemptId; + @Nullable private final WriteObserver writeObserver; IcebergSinkWriter( String fullTableName, @@ -53,6 +56,16 @@ class IcebergSinkWriter implements CommittingSinkWriter { IcebergStreamWriterMetrics metrics, int subTaskId, int attemptId) { + this(fullTableName, taskWriterFactory, metrics, subTaskId, attemptId, null); + } + + IcebergSinkWriter( + String fullTableName, + TaskWriterFactory taskWriterFactory, + IcebergStreamWriterMetrics metrics, + int subTaskId, + int attemptId, + @Nullable WriteObserver writeObserver) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; // Initialize the task writer factory. @@ -62,6 +75,7 @@ class IcebergSinkWriter implements CommittingSinkWriter { this.metrics = metrics; this.subTaskId = subTaskId; this.attemptId = attemptId; + this.writeObserver = writeObserver; LOG.debug( "Created Stream Writer for table {} subtask {} attemptId {}", fullTableName, @@ -71,6 +85,9 @@ class IcebergSinkWriter implements CommittingSinkWriter { @Override public void write(RowData element, Context context) throws IOException, InterruptedException { + if (writeObserver != null) { + writeObserver.observe(element, context); + } writer.write(element); } @@ -108,6 +125,14 @@ public Collection prepareCommit() throws IOException { attemptId, result.dataFiles().length, result.deleteFiles().length); + + if (writeObserver != null) { + Map metadata = writeObserver.snapshotMetadata(); + if (metadata != null && !metadata.isEmpty()) { + WriteObserverMetadataHolder.set(metadata); + } + } + return Lists.newArrayList(result); } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 4a4a789bf9ef..7b4d8af87eb3 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.state.StateInitializationContext; @@ -52,6 +54,7 @@ class IcebergWriteAggregator extends AbstractStreamOperator results; private final TableLoader tableLoader; + private final Map accumulatedObserverMetadata = new HashMap<>(); private long lastCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; @@ -106,12 +109,18 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { this.lastCheckpointId = checkpointId; + Map metadata = + accumulatedObserverMetadata.isEmpty() + ? null + : new HashMap<>(accumulatedObserverMetadata); + IcebergCommittable committable = new IcebergCommittable( writeToManifest(results, checkpointId), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), - checkpointId); + checkpointId, + metadata); CommittableMessage summary = new CommittableSummary<>(0, 1, checkpointId, 1, 1, 0); output.collect(new StreamRecord<>(summary)); @@ -120,6 +129,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { output.collect(new StreamRecord<>(message)); LOG.info("Emitted commit message to downstream committer operator"); results.clear(); + accumulatedObserverMetadata.clear(); } /** @@ -150,6 +160,13 @@ public void processElement(StreamRecord> element if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { results.add(((CommittableWithLineage) element.getValue()).getCommittable()); + + // Read observer metadata set by WriteResultSerializer.deserialize() on this thread. + // Merge across parallel writer subtasks — last value wins for duplicate keys. + Map metadata = WriteObserverMetadataHolder.getAndClear(); + if (metadata != null && !metadata.isEmpty()) { + accumulatedObserverMetadata.putAll(metadata); + } } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java new file mode 100644 index 000000000000..d56dffd2d2d1 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.table.data.RowData; + +/** + * Observer that is notified for each record written by {@link IcebergSinkWriter} and produces + * per-checkpoint metadata that flows through the sink pipeline to the Iceberg snapshot summary. + * + *

Use cases include per-record watermark extraction, data quality score tracking, or custom + * metadata that should be attached to each committed Iceberg snapshot. + * + *

The observer is called on the writer's task thread. {@link #observe(RowData, SinkWriter.Context)} + * is called for each record. At checkpoint time, {@link #snapshotMetadata()} is called to collect + * accumulated metadata, which is then carried through the aggregator to the committer and applied + * as additional Iceberg snapshot properties. The returned metadata map is merged across parallel + * writer subtasks in the aggregator. + * + *

Implementations must be {@link Serializable} because the observer travels through the Flink + * job graph (client → TaskManager) as part of {@link IcebergSink}. + */ +public interface WriteObserver extends Serializable { + + /** + * Called for each record written by the sink writer. + * + * @param element the record being written + * @param context the sink writer context, providing access to the current Flink watermark + */ + void observe(RowData element, SinkWriter.Context context); + + /** + * Called at checkpoint time to collect accumulated metadata for this checkpoint interval. + * + *

The returned map entries are applied as additional Iceberg snapshot properties alongside + * the static {@code snapshotProperties} configured on the sink builder. Implementations should + * reset their internal accumulators after this call so the next checkpoint interval starts fresh. + * + *

When multiple writer subtasks produce metadata, the aggregator merges them by taking the + * last value for each key. For watermark-style metadata, implementations should use keys that + * include a subtask identifier, or the aggregator's merge logic should be considered. + * + * @return metadata key-value pairs to include in the snapshot summary, or an empty map + */ + default Map snapshotMetadata() { + return Collections.emptyMap(); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java new file mode 100644 index 000000000000..bfc24d7e3128 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Thread-local holder for passing {@link WriteObserver} metadata through the WriteResult + * serialization boundary. + * + *

On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then + * {@link WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task thread. + * + *

On the aggregator side: {@link WriteResultSerializer#deserialize} stores metadata here, then + * {@link IcebergWriteAggregator#processElement} reads and clears it. Both run on the aggregator's + * task thread. + * + *

This is safe because Flink processes elements sequentially on each task thread — serialize and + * prepareCommit share one thread, deserialize and processElement share another. + */ +final class WriteObserverMetadataHolder { + private static final ThreadLocal> METADATA = new ThreadLocal<>(); + + private WriteObserverMetadataHolder() {} + + static void set(@Nullable Map metadata) { + METADATA.set(metadata); + } + + @Nullable + static Map getAndClear() { + Map metadata = METADATA.get(); + METADATA.remove(); + return metadata; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index 40a3ce0cb846..e57e771a0ad9 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; @@ -29,7 +31,7 @@ @Internal public class WriteResultSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION = 2; @Override public int getVersion() { @@ -40,24 +42,73 @@ public int getVersion() { public byte[] serialize(WriteResult writeResult) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + byte[] result = InstantiationUtil.serializeObject(writeResult); + view.writeInt(result.length); view.write(result); + + Map metadata = WriteObserverMetadataHolder.getAndClear(); + if (metadata != null && !metadata.isEmpty()) { + view.writeBoolean(true); + view.writeInt(metadata.size()); + for (Map.Entry entry : metadata.entrySet()) { + view.writeUTF(entry.getKey()); + view.writeUTF(entry.getValue()); + } + } else { + view.writeBoolean(false); + } + return out.toByteArray(); } @Override public WriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - byte[] resultBuf = new byte[serialized.length]; - view.read(resultBuf); - try { - return InstantiationUtil.deserializeObject( - resultBuf, IcebergCommittableSerializer.class.getClassLoader()); - } catch (ClassNotFoundException cnc) { - throw new IOException("Could not deserialize the WriteResult object", cnc); - } + return deserializeV1(serialized); + } else if (version == 2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private WriteResult deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + byte[] resultBuf = new byte[serialized.length]; + view.read(resultBuf); + try { + return InstantiationUtil.deserializeObject( + resultBuf, WriteResultSerializer.class.getClassLoader()); + } catch (ClassNotFoundException cnc) { + throw new IOException("Could not deserialize the WriteResult object", cnc); + } + } + + private WriteResult deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + + int resultLen = view.readInt(); + byte[] resultBuf = new byte[resultLen]; + view.read(resultBuf); + WriteResult writeResult; + try { + writeResult = + InstantiationUtil.deserializeObject( + resultBuf, WriteResultSerializer.class.getClassLoader()); + } catch (ClassNotFoundException cnc) { + throw new IOException("Could not deserialize the WriteResult object", cnc); + } + + boolean hasMetadata = view.readBoolean(); + if (hasMetadata) { + int mapSize = view.readInt(); + Map metadata = new HashMap<>(mapSize); + for (int i = 0; i < mapSize; i++) { + metadata.put(view.readUTF(), view.readUTF()); + } + WriteObserverMetadataHolder.set(metadata); + } + + return writeResult; + } } From 95486479b4f1ff8d72349e08a7e67b26678162ba Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 21:06:58 +0000 Subject: [PATCH 2/6] Fix spotless formatting violations --- .../iceberg/flink/sink/IcebergCommittable.java | 3 +-- .../flink/sink/IcebergWriteAggregator.java | 4 +--- .../apache/iceberg/flink/sink/WriteObserver.java | 16 ++++++++-------- .../flink/sink/WriteObserverMetadataHolder.java | 6 +++--- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java index 5f3c8c3fdda4..6fcfdd46a4d2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java @@ -55,8 +55,7 @@ class IcebergCommittable implements Serializable { this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; - this.observerMetadata = - observerMetadata != null ? observerMetadata : Collections.emptyMap(); + this.observerMetadata = observerMetadata != null ? observerMetadata : Collections.emptyMap(); } byte[] manifest() { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 7b4d8af87eb3..b09bbbe0f3ea 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -110,9 +110,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { this.lastCheckpointId = checkpointId; Map metadata = - accumulatedObserverMetadata.isEmpty() - ? null - : new HashMap<>(accumulatedObserverMetadata); + accumulatedObserverMetadata.isEmpty() ? null : new HashMap<>(accumulatedObserverMetadata); IcebergCommittable committable = new IcebergCommittable( diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java index d56dffd2d2d1..9d629ce48b82 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java @@ -31,11 +31,11 @@ *

Use cases include per-record watermark extraction, data quality score tracking, or custom * metadata that should be attached to each committed Iceberg snapshot. * - *

The observer is called on the writer's task thread. {@link #observe(RowData, SinkWriter.Context)} - * is called for each record. At checkpoint time, {@link #snapshotMetadata()} is called to collect - * accumulated metadata, which is then carried through the aggregator to the committer and applied - * as additional Iceberg snapshot properties. The returned metadata map is merged across parallel - * writer subtasks in the aggregator. + *

The observer is called on the writer's task thread. {@link #observe(RowData, + * SinkWriter.Context)} is called for each record. At checkpoint time, {@link #snapshotMetadata()} + * is called to collect accumulated metadata, which is then carried through the aggregator to the + * committer and applied as additional Iceberg snapshot properties. The returned metadata map is + * merged across parallel writer subtasks in the aggregator. * *

Implementations must be {@link Serializable} because the observer travels through the Flink * job graph (client → TaskManager) as part of {@link IcebergSink}. @@ -53,9 +53,9 @@ public interface WriteObserver extends Serializable { /** * Called at checkpoint time to collect accumulated metadata for this checkpoint interval. * - *

The returned map entries are applied as additional Iceberg snapshot properties alongside - * the static {@code snapshotProperties} configured on the sink builder. Implementations should - * reset their internal accumulators after this call so the next checkpoint interval starts fresh. + *

The returned map entries are applied as additional Iceberg snapshot properties alongside the + * static {@code snapshotProperties} configured on the sink builder. Implementations should reset + * their internal accumulators after this call so the next checkpoint interval starts fresh. * *

When multiple writer subtasks produce metadata, the aggregator merges them by taking the * last value for each key. For watermark-style metadata, implementations should use keys that diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java index bfc24d7e3128..9617fec43626 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java @@ -25,14 +25,14 @@ * Thread-local holder for passing {@link WriteObserver} metadata through the WriteResult * serialization boundary. * - *

On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then - * {@link WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task thread. + *

On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then {@link + * WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task thread. * *

On the aggregator side: {@link WriteResultSerializer#deserialize} stores metadata here, then * {@link IcebergWriteAggregator#processElement} reads and clears it. Both run on the aggregator's * task thread. * - *

This is safe because Flink processes elements sequentially on each task thread — serialize and + *

This is safe because Flink processes elements sequentially on each task thread -- serialize and * prepareCommit share one thread, deserialize and processElement share another. */ final class WriteObserverMetadataHolder { From 4f2c8b59e160729f9d7f0b44c0e37d96b1cab859 Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 21:11:01 +0000 Subject: [PATCH 3/6] Fix spotless line length in WriteObserverMetadataHolder --- .../iceberg/flink/sink/WriteObserverMetadataHolder.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java index 9617fec43626..17ebc53a47f4 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java @@ -25,15 +25,16 @@ * Thread-local holder for passing {@link WriteObserver} metadata through the WriteResult * serialization boundary. * - *

On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then {@link - * WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task thread. + *

On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then + * {@link WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task + * thread. * *

On the aggregator side: {@link WriteResultSerializer#deserialize} stores metadata here, then * {@link IcebergWriteAggregator#processElement} reads and clears it. Both run on the aggregator's * task thread. * - *

This is safe because Flink processes elements sequentially on each task thread -- serialize and - * prepareCommit share one thread, deserialize and processElement share another. + *

This is safe because Flink processes elements sequentially on each task thread -- serialize + * and prepareCommit share one thread, deserialize and processElement share another. */ final class WriteObserverMetadataHolder { private static final ThreadLocal> METADATA = new ThreadLocal<>(); From e7b0289f3293411b61b84c46a3b7a55e5992ae1e Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 21:35:19 +0000 Subject: [PATCH 4/6] Remove unused commitOperation overload flagged by Error Prone --- .../org/apache/iceberg/flink/sink/IcebergCommitter.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index ae3e977becd0..aafb27102136 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -296,15 +296,6 @@ private void commitDeltaTxn( } } - private void commitOperation( - SnapshotUpdate operation, - String description, - String newFlinkJobId, - String operatorId, - long checkpointId) { - commitOperation(operation, description, newFlinkJobId, operatorId, checkpointId, null); - } - private void commitOperation( SnapshotUpdate operation, String description, From d6ce7569ee3f6612d2bbdc0398151b0aea3a4a0c Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 21:59:01 +0000 Subject: [PATCH 5/6] Replace new HashMap with Maps.newHashMap per checkstyle rules --- .../iceberg/flink/sink/IcebergCommittableSerializer.java | 4 ++-- .../org/apache/iceberg/flink/sink/IcebergCommitter.java | 3 +-- .../apache/iceberg/flink/sink/IcebergWriteAggregator.java | 6 +++--- .../apache/iceberg/flink/sink/WriteResultSerializer.java | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index 5127f58026ab..9e65450677eb 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -20,11 +20,11 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** * This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer @@ -99,7 +99,7 @@ private IcebergCommittable deserializeV2(byte[] serialized) throws IOException { boolean hasMetadata = view.readBoolean(); if (hasMetadata) { int mapSize = view.readInt(); - metadata = new HashMap<>(mapSize); + metadata = Maps.newHashMapWithExpectedSize(mapSize); for (int i = 0; i < mapSize; i++) { metadata.put(view.readUTF(), view.readUTF()); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index aafb27102136..f9686eea0696 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -161,7 +160,7 @@ private void commitPendingRequests( NavigableMap pendingResults = Maps.newTreeMap(); // Merge observer metadata from all committables - Map mergedObserverMetadata = new HashMap<>(); + Map mergedObserverMetadata = Maps.newHashMap(); for (Map.Entry> e : commitRequestMap.entrySet()) { IcebergCommittable committable = e.getValue().getCommittable(); if (!committable.observerMetadata().isEmpty()) { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index b09bbbe0f3ea..2c6f8255ce17 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -54,7 +54,7 @@ class IcebergWriteAggregator extends AbstractStreamOperator results; private final TableLoader tableLoader; - private final Map accumulatedObserverMetadata = new HashMap<>(); + private final Map accumulatedObserverMetadata = Maps.newHashMap(); private long lastCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; @@ -110,7 +110,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { this.lastCheckpointId = checkpointId; Map metadata = - accumulatedObserverMetadata.isEmpty() ? null : new HashMap<>(accumulatedObserverMetadata); + accumulatedObserverMetadata.isEmpty() ? null : Maps.newHashMap(accumulatedObserverMetadata); IcebergCommittable committable = new IcebergCommittable( diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index e57e771a0ad9..383ba3e5bf98 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -20,13 +20,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.io.WriteResult; @Internal @@ -102,7 +102,7 @@ private WriteResult deserializeV2(byte[] serialized) throws IOException { boolean hasMetadata = view.readBoolean(); if (hasMetadata) { int mapSize = view.readInt(); - Map metadata = new HashMap<>(mapSize); + Map metadata = Maps.newHashMapWithExpectedSize(mapSize); for (int i = 0; i < mapSize; i++) { metadata.put(view.readUTF(), view.readUTF()); } From 8f50f5690bf94e798ae5a25ecac37c6fa9c1e67f Mon Sep 17 00:00:00 2001 From: Herbert Wang Date: Thu, 26 Mar 2026 22:05:52 +0000 Subject: [PATCH 6/6] Fix import ordering for spotless --- .../org/apache/iceberg/flink/sink/IcebergWriteAggregator.java | 2 +- .../org/apache/iceberg/flink/sink/WriteResultSerializer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 2c6f8255ce17..a22870b42e42 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Collection; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -36,6 +35,7 @@ import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index 383ba3e5bf98..d94d00c86c27 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -26,8 +26,8 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; @Internal public class WriteResultSerializer implements SimpleVersionedSerializer {