Skip to content

Commit

Permalink
Simplify intermediate data in Iceberg sink; use manifest files (#31090)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Apr 26, 2024
1 parent fe00df6 commit d07cc62
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 346 deletions.
1 change: 0 additions & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ dependencies {
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.avro
implementation library.java.hadoop_common

testImplementation library.java.hadoop_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void processElement(
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendFile(writtenFile.getDataFile());
update.appendManifest(writtenFile.getManifestFile());
}
update.commit();
out.outputWithTimestamp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,197 +17,69 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@AutoValue
@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
@DefaultSchema(AutoValueSchema.class)
abstract class FileWriteResult {
public abstract TableIdentifier getTableIdentifier();

public abstract PartitionSpec getPartitionSpec();
private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
private transient @MonotonicNonNull ManifestFile cachedManifestFile;

public abstract DataFile getDataFile();
abstract String getTableIdentifierString();

public static Builder builder() {
return new AutoValue_FileWriteResult.Builder();
}
@SuppressWarnings("mutable")
abstract byte[] getManifestFileBytes();

@AutoValue.Builder
abstract static class Builder {
public abstract Builder setTableIdentifier(TableIdentifier tableId);

public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);

public abstract Builder setDataFile(DataFile dataFiles);

public abstract FileWriteResult build();
}

public static class FileWriteResultCoder extends StructuredCoder<FileWriteResult> {
static final int VERSION = 0;
private static final FileWriteResultCoder SINGLETON = new FileWriteResultCoder();

private static final Coder<String> tableIdentifierCoder = StringUtf8Coder.of();
private static final Coder<PartitionSpec> partitionSpecCoder =
SerializableCoder.of(PartitionSpec.class);
private static final Coder<byte[]> dataFileBytesCoder = ByteArrayCoder.of();

private static Schema getDataFileAvroSchema(FileWriteResult fileWriteResult) {
Types.StructType partitionType = fileWriteResult.getPartitionSpec().partitionType();
Types.StructType dataFileStruct = DataFile.getType(partitionType);
Map<Types.StructType, String> dataFileNames =
ImmutableMap.of(
dataFileStruct, "org.apache.iceberg.GenericDataFile",
partitionType, "org.apache.iceberg.PartitionData");
return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
}

@Override
public void encode(FileWriteResult value, OutputStream outStream)
throws CoderException, IOException {
// "version" of this coder.
// If breaking changes are introduced (e.g. from Beam, Iceberg, Avro, etc..),
// then update this version and create a fork in decode() below for the new decode logic.
// This helps keep the pipeline update-compatible
outStream.write(VERSION);

tableIdentifierCoder.encode(value.getTableIdentifier().toString(), outStream);
partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
dataFileBytesCoder.encode(
AvroEncoderUtil.encode(value.getDataFile(), getDataFileAvroSchema(value)), outStream);
}

@Override
public FileWriteResult decode(InputStream inStream) throws CoderException, IOException {
// Forking logic can be added here depending on the version of this coder
assert inStream.read() == 0;

TableIdentifier tableId = TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
DataFile dataFile =
checkArgumentNotNull(
AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
"Decoding of dataFile resulted in null");
return FileWriteResult.builder()
.setTableIdentifier(tableId)
.setDataFile(dataFile)
.setPartitionSpec(partitionSpec)
.build();
}

@Override
public List<? extends Coder<?>> getCoderArguments() {
return Collections.emptyList();
}

@Override
public Object structuralValue(FileWriteResult fileWriteResult) {
return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
}

@Override
public void verifyDeterministic() throws NonDeterministicException {}

@Override
public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
return TypeDescriptor.of(FileWriteResult.class);
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
if (cachedTableIdentifier == null) {
cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
}
return cachedTableIdentifier;
}

public static FileWriteResultCoder of() {
return SINGLETON;
@SchemaIgnore
public ManifestFile getManifestFile() {
if (cachedManifestFile == null) {
try {
cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
} catch (IOException exc) {
throw new RuntimeException("Error decoding manifest file bytes");
}
}
return cachedManifestFile;
}

@SuppressWarnings("unused") // used via `DefaultCoder` annotation
public static CoderProvider getCoderProvider() {
return CoderProviders.forCoder(
TypeDescriptor.of(FileWriteResult.class), FileWriteResultCoder.of());
}
public static Builder builder() {
return new AutoValue_FileWriteResult.Builder();
}

private static class FileWriteResultDeepEqualityWrapper {
private final FileWriteResult fileWriteResult;
@AutoValue.Builder
abstract static class Builder {

private FileWriteResultDeepEqualityWrapper(FileWriteResult fileWriteResult) {
this.fileWriteResult = fileWriteResult;
}
abstract Builder setTableIdentifierString(String tableIdString);

@Override
public boolean equals(@Nullable Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof FileWriteResultDeepEqualityWrapper)) {
return false;
}
FileWriteResultDeepEqualityWrapper other = (FileWriteResultDeepEqualityWrapper) obj;
abstract Builder setManifestFileBytes(byte[] manifestFileBytes);

return Objects.equals(
fileWriteResult.getTableIdentifier(), other.fileWriteResult.getTableIdentifier())
&& Objects.equals(
fileWriteResult.getPartitionSpec(), other.fileWriteResult.getPartitionSpec())
&& dataFilesEqual(fileWriteResult.getDataFile(), other.fileWriteResult.getDataFile());
@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
return setTableIdentifierString(tableId.toString());
}

private boolean dataFilesEqual(DataFile first, DataFile second) {
return Objects.equals(first.pos(), second.pos())
&& first.specId() == second.specId()
&& Objects.equals(first.content(), second.content())
&& Objects.equals(first.path(), second.path())
&& Objects.equals(first.format(), second.format())
&& Objects.equals(first.partition(), second.partition())
&& first.recordCount() == second.recordCount()
&& first.fileSizeInBytes() == second.fileSizeInBytes()
&& Objects.equals(first.columnSizes(), second.columnSizes())
&& Objects.equals(first.valueCounts(), second.valueCounts())
&& Objects.equals(first.nullValueCounts(), second.nullValueCounts())
&& Objects.equals(first.nanValueCounts(), second.nanValueCounts())
&& Objects.equals(first.lowerBounds(), second.lowerBounds())
&& Objects.equals(first.upperBounds(), second.upperBounds())
&& Objects.equals(first.keyMetadata(), second.keyMetadata())
&& Objects.equals(first.splitOffsets(), second.splitOffsets())
&& Objects.equals(first.equalityFieldIds(), second.equalityFieldIds())
&& Objects.equals(first.sortOrderId(), second.sortOrderId())
&& Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber())
&& Objects.equals(first.fileSequenceNumber(), second.fileSequenceNumber());
@SchemaIgnore
public Builder setManifestFile(ManifestFile manifestFile) throws IOException {
return setManifestFileBytes(ManifestFiles.encode(manifestFile));
}

@Override
public int hashCode() {
return Objects.hash(
fileWriteResult.getTableIdentifier(),
fileWriteResult.getPartitionSpec(),
fileWriteResult.getDataFile());
}
public abstract FileWriteResult build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -37,6 +40,7 @@ class RecordWriter {
private final DataWriter<Record> icebergDataWriter;

private final Table table;
private final String absoluteFilename;

RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
throws IOException {
Expand All @@ -46,9 +50,9 @@ class RecordWriter {

RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
this.table = table;

String absoluteFilename = table.location() + "/" + filename;
this.absoluteFilename = table.location() + "/" + filename;
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);

switch (fileFormat) {
case AVRO:
icebergDataWriter =
Expand Down Expand Up @@ -92,7 +96,15 @@ public long bytesWritten() {
return icebergDataWriter.length();
}

public DataFile dataFile() {
return icebergDataWriter.toDataFile();
public ManifestFile getManifestFile() throws IOException {
String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
OutputFile outputFile = table.io().newOutputFile(manifestFilename);
ManifestWriter<DataFile> manifestWriter;
try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
openWriter.add(icebergDataWriter.toDataFile());
manifestWriter = openWriter;
}

return manifestWriter.toManifestFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ public void processElement(
c.output(
FileWriteResult.builder()
.setTableIdentifier(destination.getTableIdentifier())
.setDataFile(writer.dataFile())
.setPartitionSpec(writer.getTable().spec())
.setManifestFile(writer.getManifestFile())
.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void processElement(@Element Row element, BoundedWindow window, MultiOutp
out.get(WRITTEN_FILES_TAG)
.output(
FileWriteResult.builder()
.setDataFile(writer.dataFile())
.setManifestFile(writer.getManifestFile())
.setTableIdentifier(destination.getTableIdentifier())
.build());
writer = createAndInsertWriter(destination, window);
Expand Down Expand Up @@ -307,9 +307,8 @@ private void outputFinalWrittenFiles(DoFn<Row, FileWriteResult>.FinishBundleCont
getWindows().get(destination), "internal error: no windows for destination");
c.output(
FileWriteResult.builder()
.setDataFile(writer.dataFile())
.setManifestFile(writer.getManifestFile())
.setTableIdentifier(destination.getTableIdentifier())
.setPartitionSpec(writer.getTable().spec())
.build(),
window.maxTimestamp(),
window);
Expand Down

0 comments on commit d07cc62

Please sign in to comment.