diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java index 408c3e9a9d5f..6fcfdd46a4d2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java @@ -20,7 +20,10 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; +import java.util.Map; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** @@ -36,12 +39,23 @@ class IcebergCommittable implements Serializable { private final String jobId; private final String operatorId; private final long checkpointId; + private final Map observerMetadata; IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { + this(manifest, jobId, operatorId, checkpointId, null); + } + + IcebergCommittable( + byte[] manifest, + String jobId, + String operatorId, + long checkpointId, + @Nullable Map observerMetadata) { this.manifest = manifest; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; + this.observerMetadata = observerMetadata != null ? 
observerMetadata : Collections.emptyMap(); } byte[] manifest() { @@ -60,6 +74,10 @@ Long checkpointId() { return checkpointId; } + Map observerMetadata() { + return observerMetadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index 1d83c211e001..9e65450677eb 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -20,9 +20,11 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** * This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer @@ -31,7 +33,7 @@ *
<p>
In both cases only the respective part is serialized. */ public class IcebergCommittableSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION = 2; @Override public int getVersion() { @@ -47,22 +49,62 @@ public byte[] serialize(IcebergCommittable committable) throws IOException { view.writeLong(committable.checkpointId()); view.writeInt(committable.manifest().length); view.write(committable.manifest()); + + Map metadata = committable.observerMetadata(); + if (metadata != null && !metadata.isEmpty()) { + view.writeBoolean(true); + view.writeInt(metadata.size()); + for (Map.Entry entry : metadata.entrySet()) { + view.writeUTF(entry.getKey()); + view.writeUTF(entry.getValue()); + } + } else { + view.writeBoolean(false); + } + return out.toByteArray(); } @Override public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - String jobId = view.readUTF(); - String operatorId = view.readUTF(); - long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + return deserializeV1(serialized); + } else if (version == 2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private IcebergCommittable deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); + } + + private 
IcebergCommittable deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + + Map metadata = null; + boolean hasMetadata = view.readBoolean(); + if (hasMetadata) { + int mapSize = view.readInt(); + metadata = Maps.newHashMapWithExpectedSize(mapSize); + for (int i = 0; i < mapSize; i++) { + metadata.put(view.readUTF(), view.readUTF()); + } + } + + return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java index 8e45a2db30b2..f9686eea0696 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java @@ -26,6 +26,7 @@ import java.util.NavigableMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.iceberg.AppendFiles; @@ -157,13 +158,20 @@ private void commitPendingRequests( long checkpointId = commitRequestMap.lastKey(); List manifests = Lists.newArrayList(); NavigableMap pendingResults = Maps.newTreeMap(); + + // Merge observer metadata from all committables + Map mergedObserverMetadata = Maps.newHashMap(); for (Map.Entry> e : commitRequestMap.entrySet()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue().getCommittable().manifest())) { + IcebergCommittable committable = e.getValue().getCommittable(); + if (!committable.observerMetadata().isEmpty()) 
{ + mergedObserverMetadata.putAll(committable.observerMetadata()); + } + if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.manifest())) { pendingResults.put(e.getKey(), EMPTY_WRITE_RESULT); } else { DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, e.getValue().getCommittable().manifest()); + DeltaManifestsSerializer.INSTANCE, committable.manifest()); pendingResults.put( e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); @@ -171,8 +179,10 @@ private void commitPendingRequests( } } + Map observerMetadata = + mergedObserverMetadata.isEmpty() ? null : mergedObserverMetadata; CommitSummary summary = new CommitSummary(pendingResults); - commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId); + commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); if (committerMetrics != null) { committerMetrics.updateCommitSummary(summary); } @@ -195,14 +205,15 @@ private void commitPendingResult( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); continuousEmptyCheckpoints = totalFiles == 0 ? 
continuousEmptyCheckpoints + 1 : 0; if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { if (replacePartitions) { - replacePartitions(pendingResults, summary, newFlinkJobId, operatorId); + replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); } else { - commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId); + commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata); } continuousEmptyCheckpoints = 0; } else { @@ -215,7 +226,8 @@ private void replacePartitions( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long checkpointId = pendingResults.lastKey(); Preconditions.checkState( summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files."); @@ -229,14 +241,16 @@ private void replacePartitions( String description = "dynamic partition overwrite"; logCommitSummary(summary, description); - commitOperation(dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId); + commitOperation( + dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId, observerMetadata); } private void commitDeltaTxn( NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, - String operatorId) { + String operatorId, + @Nullable Map observerMetadata) { long checkpointId = pendingResults.lastKey(); if (summary.deleteFilesCount() == 0) { // To be compatible with iceberg format V1. @@ -250,7 +264,8 @@ private void commitDeltaTxn( String description = "append"; logCommitSummary(summary, description); // fail all commits as really its only one - commitOperation(appendFiles, description, newFlinkJobId, operatorId, checkpointId); + commitOperation( + appendFiles, description, newFlinkJobId, operatorId, checkpointId, observerMetadata); } else { // To be compatible with iceberg format V2. 
for (Map.Entry e : pendingResults.entrySet()) { @@ -274,7 +289,8 @@ private void commitDeltaTxn( String description = "rowDelta"; logCommitSummary(summary, description); - commitOperation(rowDelta, description, newFlinkJobId, operatorId, e.getKey()); + commitOperation( + rowDelta, description, newFlinkJobId, operatorId, e.getKey(), observerMetadata); } } } @@ -284,9 +300,13 @@ private void commitOperation( String description, String newFlinkJobId, String operatorId, - long checkpointId) { + long checkpointId, + @Nullable Map observerMetadata) { snapshotProperties.forEach(operation::set); + if (observerMetadata != null) { + observerMetadata.forEach(operation::set); + } // custom snapshot metadata properties will be overridden if they conflict with internal ones // used by the sink. operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index eaaf4ea6e4e3..eab54435ad23 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; @@ -174,6 +175,8 @@ public class IcebergSink private final transient List> maintenanceTasks; private final transient FlinkMaintenanceConfig flinkMaintenanceConfig; + @Nullable private final WriteObserver writeObserver; + private IcebergSink( TableLoader tableLoader, Table table, @@ -188,7 +191,8 @@ private IcebergSink( boolean overwriteMode, List> maintenanceTasks, FlinkMaintenanceConfig flinkMaintenanceConfig, - Set equalityFieldColumns) { + 
Set equalityFieldColumns, + @Nullable WriteObserver writeObserver) { this.tableLoader = tableLoader; this.snapshotProperties = snapshotProperties; this.uidSuffix = uidSuffix; @@ -212,6 +216,7 @@ private IcebergSink( this.maintenanceTasks = maintenanceTasks; this.flinkMaintenanceConfig = flinkMaintenanceConfig; this.equalityFieldColumns = equalityFieldColumns; + this.writeObserver = writeObserver; } @Override @@ -232,7 +237,8 @@ public SinkWriter createWriter(WriterInitContext context) { taskWriterFactory, metrics, context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + writeObserver); } @Override @@ -349,6 +355,7 @@ public static class Builder implements IcebergSinkBuilder { private ReadableConfig readableConfig = new Configuration(); private List equalityFieldColumns = null; private final List> maintenanceTasks = Lists.newArrayList(); + @Nullable private WriteObserver writeObserver; private Builder() {} @@ -716,6 +723,11 @@ public Builder toBranch(String branch) { return this; } + public Builder writeObserver(WriteObserver observer) { + this.writeObserver = observer; + return this; + } + IcebergSink build() { Preconditions.checkArgument( @@ -806,7 +818,8 @@ IcebergSink build() { overwriteMode, maintenanceTasks, flinkMaintenanceConfig, - equalityFieldColumnsSet); + equalityFieldColumnsSet, + writeObserver); } /** diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java index 7234cf74020e..7228b356f3bc 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkWriter.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import 
org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.table.data.RowData; @@ -46,6 +48,7 @@ class IcebergSinkWriter implements CommittingSinkWriter { private TaskWriter writer; private final int subTaskId; private final int attemptId; + @Nullable private final WriteObserver writeObserver; IcebergSinkWriter( String fullTableName, @@ -53,6 +56,16 @@ class IcebergSinkWriter implements CommittingSinkWriter { IcebergStreamWriterMetrics metrics, int subTaskId, int attemptId) { + this(fullTableName, taskWriterFactory, metrics, subTaskId, attemptId, null); + } + + IcebergSinkWriter( + String fullTableName, + TaskWriterFactory taskWriterFactory, + IcebergStreamWriterMetrics metrics, + int subTaskId, + int attemptId, + @Nullable WriteObserver writeObserver) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; // Initialize the task writer factory. @@ -62,6 +75,7 @@ class IcebergSinkWriter implements CommittingSinkWriter { this.metrics = metrics; this.subTaskId = subTaskId; this.attemptId = attemptId; + this.writeObserver = writeObserver; LOG.debug( "Created Stream Writer for table {} subtask {} attemptId {}", fullTableName, @@ -71,6 +85,9 @@ class IcebergSinkWriter implements CommittingSinkWriter { @Override public void write(RowData element, Context context) throws IOException, InterruptedException { + if (writeObserver != null) { + writeObserver.observe(element, context); + } writer.write(element); } @@ -108,6 +125,14 @@ public Collection prepareCommit() throws IOException { attemptId, result.dataFiles().length, result.deleteFiles().length); + + if (writeObserver != null) { + Map metadata = writeObserver.snapshotMetadata(); + if (metadata != null && !metadata.isEmpty()) { + WriteObserverMetadataHolder.set(metadata); + } + } + return Lists.newArrayList(result); } } diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java index 4a4a789bf9ef..a22870b42e42 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Map; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.state.StateInitializationContext; @@ -34,6 +35,7 @@ import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ class IcebergWriteAggregator extends AbstractStreamOperator results; private final TableLoader tableLoader; + private final Map accumulatedObserverMetadata = Maps.newHashMap(); private long lastCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; @@ -106,12 +109,16 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { this.lastCheckpointId = checkpointId; + Map metadata = + accumulatedObserverMetadata.isEmpty() ? 
null : Maps.newHashMap(accumulatedObserverMetadata); + IcebergCommittable committable = new IcebergCommittable( writeToManifest(results, checkpointId), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), - checkpointId); + checkpointId, + metadata); CommittableMessage summary = new CommittableSummary<>(0, 1, checkpointId, 1, 1, 0); output.collect(new StreamRecord<>(summary)); @@ -120,6 +127,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { output.collect(new StreamRecord<>(message)); LOG.info("Emitted commit message to downstream committer operator"); results.clear(); + accumulatedObserverMetadata.clear(); } /** @@ -150,6 +158,13 @@ public void processElement(StreamRecord> element if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { results.add(((CommittableWithLineage) element.getValue()).getCommittable()); + + // Read observer metadata set by WriteResultSerializer.deserialize() on this thread. + // Merge across parallel writer subtasks — last value wins for duplicate keys. + Map metadata = WriteObserverMetadataHolder.getAndClear(); + if (metadata != null && !metadata.isEmpty()) { + accumulatedObserverMetadata.putAll(metadata); + } } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java new file mode 100644 index 000000000000..9d629ce48b82 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserver.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.table.data.RowData; + +/** + * Observer that is notified for each record written by {@link IcebergSinkWriter} and produces + * per-checkpoint metadata that flows through the sink pipeline to the Iceberg snapshot summary. + * + *
<p>
Use cases include per-record watermark extraction, data quality score tracking, or custom + * metadata that should be attached to each committed Iceberg snapshot. + * + *
<p>
The observer is called on the writer's task thread. {@link #observe(RowData, + * SinkWriter.Context)} is called for each record. At checkpoint time, {@link #snapshotMetadata()} + * is called to collect accumulated metadata, which is then carried through the aggregator to the + * committer and applied as additional Iceberg snapshot properties. The returned metadata map is + * merged across parallel writer subtasks in the aggregator. + * + *
<p>
Implementations must be {@link Serializable} because the observer travels through the Flink + * job graph (client → TaskManager) as part of {@link IcebergSink}. + */ +public interface WriteObserver extends Serializable { + + /** + * Called for each record written by the sink writer. + * + * @param element the record being written + * @param context the sink writer context, providing access to the current Flink watermark + */ + void observe(RowData element, SinkWriter.Context context); + + /** + * Called at checkpoint time to collect accumulated metadata for this checkpoint interval. + * + *
<p>
The returned map entries are applied as additional Iceberg snapshot properties alongside the + * static {@code snapshotProperties} configured on the sink builder. Implementations should reset + * their internal accumulators after this call so the next checkpoint interval starts fresh. + * + *
<p>
When multiple writer subtasks produce metadata, the aggregator merges them by taking the + * last value for each key. For watermark-style metadata, implementations should use keys that + * include a subtask identifier, or the aggregator's merge logic should be considered. + * + * @return metadata key-value pairs to include in the snapshot summary, or an empty map + */ + default Map snapshotMetadata() { + return Collections.emptyMap(); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java new file mode 100644 index 000000000000..17ebc53a47f4 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteObserverMetadataHolder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Thread-local holder for passing {@link WriteObserver} metadata through the WriteResult + * serialization boundary. + * + *
<p>
On the writer side: {@link IcebergSinkWriter#prepareCommit()} stores metadata here, then + * {@link WriteResultSerializer#serialize} reads and clears it. Both run on the writer's task + * thread. + * + *
<p>
On the aggregator side: {@link WriteResultSerializer#deserialize} stores metadata here, then + * {@link IcebergWriteAggregator#processElement} reads and clears it. Both run on the aggregator's + * task thread. + * + *
<p>
This is safe because Flink processes elements sequentially on each task thread -- serialize + * and prepareCommit share one thread, deserialize and processElement share another. + */ +final class WriteObserverMetadataHolder { + private static final ThreadLocal> METADATA = new ThreadLocal<>(); + + private WriteObserverMetadataHolder() {} + + static void set(@Nullable Map metadata) { + METADATA.set(metadata); + } + + @Nullable + static Map getAndClear() { + Map metadata = METADATA.get(); + METADATA.remove(); + return metadata; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index 40a3ce0cb846..d94d00c86c27 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -20,16 +20,18 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; @Internal public class WriteResultSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION = 2; @Override public int getVersion() { @@ -40,24 +42,73 @@ public int getVersion() { public byte[] serialize(WriteResult writeResult) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + byte[] result = InstantiationUtil.serializeObject(writeResult); + view.writeInt(result.length); 
view.write(result); + + Map metadata = WriteObserverMetadataHolder.getAndClear(); + if (metadata != null && !metadata.isEmpty()) { + view.writeBoolean(true); + view.writeInt(metadata.size()); + for (Map.Entry entry : metadata.entrySet()) { + view.writeUTF(entry.getKey()); + view.writeUTF(entry.getValue()); + } + } else { + view.writeBoolean(false); + } + return out.toByteArray(); } @Override public WriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - byte[] resultBuf = new byte[serialized.length]; - view.read(resultBuf); - try { - return InstantiationUtil.deserializeObject( - resultBuf, IcebergCommittableSerializer.class.getClassLoader()); - } catch (ClassNotFoundException cnc) { - throw new IOException("Could not deserialize the WriteResult object", cnc); - } + return deserializeV1(serialized); + } else if (version == 2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private WriteResult deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + byte[] resultBuf = new byte[serialized.length]; + view.read(resultBuf); + try { + return InstantiationUtil.deserializeObject( + resultBuf, WriteResultSerializer.class.getClassLoader()); + } catch (ClassNotFoundException cnc) { + throw new IOException("Could not deserialize the WriteResult object", cnc); + } + } + + private WriteResult deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + + int resultLen = view.readInt(); + byte[] resultBuf = new byte[resultLen]; + view.read(resultBuf); + WriteResult writeResult; + try { + writeResult = + InstantiationUtil.deserializeObject( + resultBuf, WriteResultSerializer.class.getClassLoader()); + } catch (ClassNotFoundException cnc) { + throw new 
IOException("Could not deserialize the WriteResult object", cnc); + } + + boolean hasMetadata = view.readBoolean(); + if (hasMetadata) { + int mapSize = view.readInt(); + Map metadata = Maps.newHashMapWithExpectedSize(mapSize); + for (int i = 0; i < mapSize; i++) { + metadata.put(view.readUTF(), view.readUTF()); + } + WriteObserverMetadataHolder.set(metadata); + } + + return writeResult; + } }