Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
Expand All @@ -36,12 +39,23 @@ class IcebergCommittable implements Serializable {
private final String jobId;
private final String operatorId;
private final long checkpointId;
private final Map<String, String> observerMetadata;

/**
 * Creates a committable without observer metadata.
 *
 * <p>Delegates to the full constructor with a {@code null} metadata map, which is normalized
 * to an empty map there.
 */
IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
  this(manifest, jobId, operatorId, checkpointId, null);
}

/**
 * Creates a committable carrying the serialized manifest plus optional observer metadata.
 *
 * @param manifest serialized manifest bytes produced by the writer
 * @param jobId Flink job id that produced this committable
 * @param operatorId id of the producing operator
 * @param checkpointId checkpoint this committable belongs to
 * @param observerMetadata optional metadata to attach; {@code null} is treated as empty
 */
IcebergCommittable(
    byte[] manifest,
    String jobId,
    String operatorId,
    long checkpointId,
    @Nullable Map<String, String> observerMetadata) {
  this.manifest = manifest;
  this.jobId = jobId;
  this.operatorId = operatorId;
  this.checkpointId = checkpointId;
  // Defensive, unmodifiable copy: this class is Serializable and treated as an immutable
  // value between sink stages, so it must not alias a map the caller can still mutate.
  // (The null case already exposed an immutable emptyMap, so callers cannot rely on
  // mutating the returned map.)
  this.observerMetadata =
      observerMetadata != null
          ? Collections.unmodifiableMap(new HashMap<>(observerMetadata))
          : Collections.emptyMap();
}

byte[] manifest() {
Expand All @@ -60,6 +74,10 @@ Long checkpointId() {
return checkpointId;
}

/**
 * Returns the observer metadata attached to this committable.
 *
 * <p>Never {@code null}: the constructor substitutes an empty map when none was provided.
 */
Map<String, String> observerMetadata() {
  return observerMetadata;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
* This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer
Expand All @@ -31,7 +33,7 @@
* <p>In both cases only the respective part is serialized.
*/
public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
private static final int VERSION = 1;
private static final int VERSION = 2;

@Override
public int getVersion() {
Expand All @@ -47,22 +49,62 @@ public byte[] serialize(IcebergCommittable committable) throws IOException {
view.writeLong(committable.checkpointId());
view.writeInt(committable.manifest().length);
view.write(committable.manifest());

Map<String, String> metadata = committable.observerMetadata();
if (metadata != null && !metadata.isEmpty()) {
view.writeBoolean(true);
view.writeInt(metadata.size());
for (Map.Entry<String, String> entry : metadata.entrySet()) {
view.writeUTF(entry.getKey());
view.writeUTF(entry.getValue());
}
} else {
view.writeBoolean(false);
}

return out.toByteArray();
}

@Override
public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException {
  // Dispatch on the serializer version written by getVersion() at serialization time.
  switch (version) {
    case 1:
      return deserializeV1(serialized);
    case 2:
      return deserializeV2(serialized);
    default:
      throw new IOException("Unrecognized version or corrupt state: " + version);
  }
}

/**
 * Deserializes the V1 layout: jobId, operatorId, checkpointId, then manifest length + bytes.
 *
 * @throws IOException if the underlying buffer is truncated or corrupt
 */
private IcebergCommittable deserializeV1(byte[] serialized) throws IOException {
  DataInputDeserializer view = new DataInputDeserializer(serialized);
  String jobId = view.readUTF();
  String operatorId = view.readUTF();
  long checkpointId = view.readLong();
  int manifestLen = view.readInt();
  byte[] manifestBuf = new byte[manifestLen];
  // readFully guarantees all manifestLen bytes are consumed (or throws EOFException);
  // read(byte[]) may stop short and its return value was being ignored, which could
  // silently yield a truncated manifest.
  view.readFully(manifestBuf);
  return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId);
}

/**
 * Deserializes the V2 layout: the V1 fields followed by a boolean marker and, when the
 * marker is {@code true}, an int map size and that many UTF key/value pairs of observer
 * metadata.
 *
 * @throws IOException if the underlying buffer is truncated or corrupt
 */
private IcebergCommittable deserializeV2(byte[] serialized) throws IOException {
  DataInputDeserializer view = new DataInputDeserializer(serialized);
  String jobId = view.readUTF();
  String operatorId = view.readUTF();
  long checkpointId = view.readLong();
  int manifestLen = view.readInt();
  byte[] manifestBuf = new byte[manifestLen];
  // readFully guarantees all manifestLen bytes are consumed (or throws EOFException);
  // read(byte[]) may stop short and its return value was being ignored, which could
  // silently yield a truncated manifest.
  view.readFully(manifestBuf);

  Map<String, String> metadata = null;
  if (view.readBoolean()) {
    int mapSize = view.readInt();
    metadata = Maps.newHashMapWithExpectedSize(mapSize);
    for (int i = 0; i < mapSize; i++) {
      // Read the key first so the key/value read order matches serialize().
      String key = view.readUTF();
      metadata.put(key, view.readUTF());
    }
  }

  return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.iceberg.AppendFiles;
Expand Down Expand Up @@ -157,22 +158,31 @@ private void commitPendingRequests(
long checkpointId = commitRequestMap.lastKey();
List<ManifestFile> manifests = Lists.newArrayList();
NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

// Merge observer metadata from all committables
Map<String, String> mergedObserverMetadata = Maps.newHashMap();
for (Map.Entry<Long, CommitRequest<IcebergCommittable>> e : commitRequestMap.entrySet()) {
if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue().getCommittable().manifest())) {
IcebergCommittable committable = e.getValue().getCommittable();
if (!committable.observerMetadata().isEmpty()) {
mergedObserverMetadata.putAll(committable.observerMetadata());
}
if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.manifest())) {
pendingResults.put(e.getKey(), EMPTY_WRITE_RESULT);
} else {
DeltaManifests deltaManifests =
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE, e.getValue().getCommittable().manifest());
DeltaManifestsSerializer.INSTANCE, committable.manifest());
pendingResults.put(
e.getKey(),
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
manifests.addAll(deltaManifests.manifests());
}
}

Map<String, String> observerMetadata =
mergedObserverMetadata.isEmpty() ? null : mergedObserverMetadata;
CommitSummary summary = new CommitSummary(pendingResults);
commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId);
commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata);
if (committerMetrics != null) {
committerMetrics.updateCommitSummary(summary);
}
Expand All @@ -195,14 +205,15 @@ private void commitPendingResult(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
String operatorId,
@Nullable Map<String, String> observerMetadata) {
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
replacePartitions(pendingResults, summary, newFlinkJobId, operatorId);
replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata);
} else {
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId);
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, observerMetadata);
}
continuousEmptyCheckpoints = 0;
} else {
Expand All @@ -215,7 +226,8 @@ private void replacePartitions(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
String operatorId,
@Nullable Map<String, String> observerMetadata) {
long checkpointId = pendingResults.lastKey();
Preconditions.checkState(
summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
Expand All @@ -229,14 +241,16 @@ private void replacePartitions(
String description = "dynamic partition overwrite";

logCommitSummary(summary, description);
commitOperation(dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId);
commitOperation(
dynamicOverwrite, description, newFlinkJobId, operatorId, checkpointId, observerMetadata);
}

private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
String operatorId,
@Nullable Map<String, String> observerMetadata) {
long checkpointId = pendingResults.lastKey();
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
Expand All @@ -250,7 +264,8 @@ private void commitDeltaTxn(
String description = "append";
logCommitSummary(summary, description);
// fail all commits as really its only one
commitOperation(appendFiles, description, newFlinkJobId, operatorId, checkpointId);
commitOperation(
appendFiles, description, newFlinkJobId, operatorId, checkpointId, observerMetadata);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
Expand All @@ -274,7 +289,8 @@ private void commitDeltaTxn(

String description = "rowDelta";
logCommitSummary(summary, description);
commitOperation(rowDelta, description, newFlinkJobId, operatorId, e.getKey());
commitOperation(
rowDelta, description, newFlinkJobId, operatorId, e.getKey(), observerMetadata);
}
}
}
Expand All @@ -284,9 +300,13 @@ private void commitOperation(
String description,
String newFlinkJobId,
String operatorId,
long checkpointId) {
long checkpointId,
@Nullable Map<String, String> observerMetadata) {

snapshotProperties.forEach(operation::set);
if (observerMetadata != null) {
observerMetadata.forEach(operation::set);
}
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
Expand Down Expand Up @@ -174,6 +175,8 @@ public class IcebergSink
private final transient List<MaintenanceTaskBuilder<?>> maintenanceTasks;
private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;

@Nullable private final WriteObserver writeObserver;

private IcebergSink(
TableLoader tableLoader,
Table table,
Expand All @@ -188,7 +191,8 @@ private IcebergSink(
boolean overwriteMode,
List<MaintenanceTaskBuilder<?>> maintenanceTasks,
FlinkMaintenanceConfig flinkMaintenanceConfig,
Set<String> equalityFieldColumns) {
Set<String> equalityFieldColumns,
@Nullable WriteObserver writeObserver) {
this.tableLoader = tableLoader;
this.snapshotProperties = snapshotProperties;
this.uidSuffix = uidSuffix;
Expand All @@ -212,6 +216,7 @@ private IcebergSink(
this.maintenanceTasks = maintenanceTasks;
this.flinkMaintenanceConfig = flinkMaintenanceConfig;
this.equalityFieldColumns = equalityFieldColumns;
this.writeObserver = writeObserver;
}

@Override
Expand All @@ -232,7 +237,8 @@ public SinkWriter<RowData> createWriter(WriterInitContext context) {
taskWriterFactory,
metrics,
context.getTaskInfo().getIndexOfThisSubtask(),
context.getTaskInfo().getAttemptNumber());
context.getTaskInfo().getAttemptNumber(),
writeObserver);
}

@Override
Expand Down Expand Up @@ -349,6 +355,7 @@ public static class Builder implements IcebergSinkBuilder<Builder> {
private ReadableConfig readableConfig = new Configuration();
private List<String> equalityFieldColumns = null;
private final List<MaintenanceTaskBuilder<?>> maintenanceTasks = Lists.newArrayList();
@Nullable private WriteObserver writeObserver;

private Builder() {}

Expand Down Expand Up @@ -716,6 +723,11 @@ public Builder toBranch(String branch) {
return this;
}

/**
 * Sets the {@link WriteObserver} that will be handed to the sink's writers at creation
 * time. Optional; when unset the sink is built with a {@code null} observer.
 *
 * @param observer the observer to attach
 * @return this builder for chaining
 */
public Builder writeObserver(WriteObserver observer) {
  this.writeObserver = observer;
  return this;
}

IcebergSink build() {

Preconditions.checkArgument(
Expand Down Expand Up @@ -806,7 +818,8 @@ IcebergSink build() {
overwriteMode,
maintenanceTasks,
flinkMaintenanceConfig,
equalityFieldColumnsSet);
equalityFieldColumnsSet,
writeObserver);
}

/**
Expand Down
Loading
Loading