Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method org.apache.iceberg.view.ViewBuilder org.apache.iceberg.view.ViewBuilder::withQueryColumnNames(java.util.List<java.lang.String>)"
justification: "Acceptable break due to updating View APIs and the View Spec"
org.apache.iceberg:iceberg-core:
- code: "java.method.abstractMethodAdded"
new: "method org.apache.iceberg.io.FileAppender<org.apache.iceberg.ManifestEntry<F>>\
\ org.apache.iceberg.ManifestWriter<F extends org.apache.iceberg.ContentFile<F\
\ extends org.apache.iceberg.ContentFile<F>>>::newAppender(org.apache.iceberg.PartitionSpec,\
\ org.apache.iceberg.io.OutputFile, java.util.Map<java.lang.String, java.lang.String>)"
justification: "Adding a new method to support setting the file appender config\
\ for ManifestWriter"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ project(':iceberg-core') {
testImplementation libs.esotericsoftware.kryo
testImplementation libs.guava.testlib
testImplementation libs.awaitility
testImplementation libs.snappy
testImplementation libs.zstd
}
}

Expand Down
30 changes: 22 additions & 8 deletions core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -41,6 +43,7 @@
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
Expand All @@ -59,14 +62,20 @@ public class ManifestReadBenchmark {
private static final int NUM_ROWS = 100000;
private static final int NUM_COLS = 10;

private String baseDir;
private String rootPath;
private String manifestListFile;

@Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
private String codec;

@Setup
public void before() {
baseDir =
String tmpDir =
Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
File baseDirectory = new File(tmpDir, codec);
baseDirectory.mkdir();
rootPath = baseDirectory.getAbsolutePath();
manifestListFile = String.format("%s/%s.avro", rootPath, UUID.randomUUID());

Random random = new Random(System.currentTimeMillis());
ManifestListWriter listWriter =
Expand All @@ -76,10 +85,15 @@ public void before() {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
org.apache.iceberg.Files.localOutput(
String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
String.format("%s/%s.avro", rootPath, UUID.randomUUID()));

ManifestWriter<DataFile> writer =
ManifestFiles.write(1, PartitionSpec.unpartitioned(), manifestFile, 1L);
ManifestFiles.write(
1,
PartitionSpec.unpartitioned(),
manifestFile,
1L,
ImmutableMap.of(AVRO_COMPRESSION, codec));
try (ManifestWriter<DataFile> finalWriter = writer) {
for (int j = 0; j < NUM_ROWS; j++) {
DataFile dataFile =
Expand Down Expand Up @@ -107,11 +121,11 @@ public void before() {

@TearDown
public void after() throws IOException {
if (baseDir != null) {
try (Stream<Path> walk = Files.walk(Paths.get(baseDir))) {
if (rootPath != null) {
try (Stream<Path> walk = Files.walk(Paths.get(rootPath))) {
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
}
baseDir = null;
rootPath = null;
}

manifestListFile = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {
specsById,
newFile,
snapshotId(),
summaryBuilder);
summaryBuilder,
ops.current().properties());
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.specsById(),
newManifestPath,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties());
}

@Override
Expand Down
62 changes: 53 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,30 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return write(formatVersion, spec, outputFile, snapshotId, ImmutableMap.of());
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param config config for manifest writer
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
Map<String, String> config) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId, config);
case 2:
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId, config);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -198,11 +217,30 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, ImmutableMap.of());
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param config config for manifest writer
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
Map<String, String> config) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, config);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -256,7 +294,8 @@ static ManifestFile copyAppendManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
Map<String, String> config) {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
try (ManifestReader<DataFile> reader =
Expand All @@ -267,7 +306,8 @@ static ManifestFile copyAppendManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.ADDED);
ManifestEntry.Status.ADDED,
config);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -280,7 +320,8 @@ static ManifestFile copyRewriteManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
Map<String, String> config) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
// exception if it is not
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
Expand All @@ -292,7 +333,8 @@ static ManifestFile copyRewriteManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.EXISTING);
ManifestEntry.Status.EXISTING,
config);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -305,8 +347,10 @@ private static ManifestFile copyManifestInternal(
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
ManifestEntry.Status allowedEntryStatus,
Map<String, String> config) {
ManifestWriter<DataFile> writer =
write(formatVersion, reader.spec(), outputFile, snapshotId, config);
boolean threw = true;
try {
for (ManifestEntry<DataFile> entry : reader.entries()) {
Expand Down
55 changes: 44 additions & 11 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/**
* Writer for manifest files.
Expand All @@ -38,6 +40,8 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp

private final OutputFile file;
private final int specId;
// Config for file appender, such as for compression codec
private final Map<String, String> config;
private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
private final GenericManifestEntry<F> reused;
Expand All @@ -52,20 +56,27 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
private long deletedRows = 0L;
private Long minDataSequenceNumber = null;

private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
private ManifestWriter(
PartitionSpec spec, OutputFile file, Long snapshotId, Map<String, String> config) {
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(spec, file);
this.config = config;
this.writer = newAppender(spec, file, config);
this.snapshotId = snapshotId;
this.reused = new GenericManifestEntry<>(spec.partitionType());
this.stats = new PartitionSummary(spec);
}

protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);

/**
 * Creates the appender that manifest entries are written to, without any appender config.
 *
 * @param spec the partition spec of the manifest being written
 * @param outputFile the file the appender writes to
 * @return a new appender for manifest entries
 * @deprecated since 1.4.0, will be removed in 1.5.0; use {@link #newAppender(PartitionSpec,
 *     OutputFile, Map)} instead
 */
@Deprecated
protected abstract FileAppender<ManifestEntry<F>> newAppender(
PartitionSpec spec, OutputFile outputFile);

/**
 * Creates the appender that manifest entries are written to.
 *
 * <p>This is invoked from the {@code ManifestWriter} constructor, so it runs before a subclass
 * constructor body has executed; implementations should rely only on the arguments passed in.
 *
 * @param spec the partition spec of the manifest being written
 * @param outputFile the file the appender writes to
 * @param configs config for the file appender, such as for compression codec
 * @return a new appender for manifest entries
 */
protected abstract FileAppender<ManifestEntry<F>> newAppender(
PartitionSpec spec, OutputFile outputFile, Map<String, String> configs);

protected ManifestContent content() {
return ManifestContent.DATA;
}
Expand Down Expand Up @@ -216,8 +227,8 @@ public void close() throws IOException {
static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;

V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Map<String, String> config) {
super(spec, file, snapshotId, config);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}

Expand All @@ -228,7 +239,13 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {

@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file) {
PartitionSpec spec, OutputFile outputFile) {
return newAppender(spec, outputFile, ImmutableMap.of());
}

@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file, Map<String, String> configs) {
Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
Expand All @@ -239,6 +256,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "data")
.setAll(configs)
.overwrite()
.build();
} catch (IOException e) {
Expand All @@ -250,8 +268,9 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;

V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
V2DeleteWriter(
PartitionSpec spec, OutputFile file, Long snapshotId, Map<String, String> config) {
super(spec, file, snapshotId, config);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}

Expand All @@ -262,7 +281,13 @@ protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {

@Override
protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
PartitionSpec spec, OutputFile file) {
PartitionSpec spec, OutputFile outputFile) {
return newAppender(spec, outputFile, ImmutableMap.of());
}

@Override
protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
PartitionSpec spec, OutputFile file, Map<String, String> config) {
Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
Expand All @@ -273,6 +298,7 @@ protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "deletes")
.setAll(config)
.overwrite()
.build();
} catch (IOException e) {
Expand All @@ -289,8 +315,8 @@ protected ManifestContent content() {
static class V1Writer extends ManifestWriter<DataFile> {
private final V1Metadata.IndexedManifestEntry entryWrapper;

V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Map<String, String> config) {
super(spec, file, snapshotId, config);
this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
}

Expand All @@ -301,7 +327,13 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {

@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file) {
PartitionSpec spec, OutputFile outputFile) {
return newAppender(spec, outputFile, ImmutableMap.of());
}

@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file, Map<String, String> config) {
Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
Expand All @@ -311,6 +343,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "1")
.setAll(config)
.overwrite()
.build();
} catch (IOException e) {
Expand Down
Loading