From d83ad26be6dd8ddc9bb44ddbb2c012ef760cf747 Mon Sep 17 00:00:00 2001 From: xianyangliu Date: Thu, 1 Dec 2022 21:37:31 +0800 Subject: [PATCH] avoid generating large manifest file --- .../java/org/apache/iceberg/FastAppend.java | 47 ++-- .../iceberg/MergingSnapshotProducer.java | 44 ++-- .../apache/iceberg/RollingManifestWriter.java | 167 ++++++++++++++ .../org/apache/iceberg/SnapshotProducer.java | 16 ++ .../apache/iceberg/TestManifestWriter.java | 203 ++++++++++++++++++ 5 files changed, 439 insertions(+), 38 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/RollingManifestWriter.java diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 5e5e5128411a..3079757392cd 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -49,7 +49,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final List newFiles = Lists.newArrayList(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); - private ManifestFile newManifest = null; + private List newManifests = null; private boolean hasNewFiles = false; FastAppend(String tableName, TableOperations ops) { @@ -143,12 +143,12 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override public List apply(TableMetadata base, Snapshot snapshot) { - List newManifests = Lists.newArrayList(); + List manifests = Lists.newArrayList(); try { - ManifestFile manifest = writeManifest(); - if (manifest != null) { - newManifests.add(manifest); + List newWrittenManifests = writeNewManifests(); + if (newWrittenManifests != null) { + manifests.addAll(newWrittenManifests); } } catch (IOException e) { throw new RuntimeIOException(e, "Failed to write manifest"); @@ -158,13 +158,13 @@ public List apply(TableMetadata base, Snapshot snapshot) { Iterables.transform( Iterables.concat(appendManifests, rewrittenAppendManifests), manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); - Iterables.addAll(newManifests, appendManifestsWithMetadata); + Iterables.addAll(manifests, appendManifestsWithMetadata); if (snapshot != null) { - newManifests.addAll(snapshot.allManifests(ops.io())); + manifests.addAll(snapshot.allManifests(ops.io())); } - return newManifests; + return manifests; } @Override @@ -178,8 +178,17 @@ public Object updateEvent() { @Override protected void cleanUncommitted(Set committed) { - if (newManifest != null && !committed.contains(newManifest)) { - deleteFile(newManifest.path()); + if (newManifests != null) { + List committedNewManifests = Lists.newArrayList(); + for (ManifestFile manifest : newManifests) { + if (committed.contains(manifest)) { + committedNewManifests.add(manifest); + } else { + deleteFile(manifest.path()); + } + } + + this.newManifests = committedNewManifests; } // clean up only rewrittenAppendManifests as they are always owned by the table @@ -191,24 +200,24 @@ protected void cleanUncommitted(Set committed) { } } - private ManifestFile writeManifest() throws IOException { - if (hasNewFiles && newManifest != null) { - deleteFile(newManifest.path()); - newManifest = null; + private List writeNewManifests() throws IOException { + if (hasNewFiles && newManifests != null) { + newManifests.forEach(file -> deleteFile(file.path())); + newManifests = null; } - if (newManifest == null && newFiles.size() > 0) { - ManifestWriter writer = newManifestWriter(spec); + if (newManifests == null && newFiles.size() > 0) { + RollingManifestWriter writer = newRollingManifestWriter(spec); try { - writer.addAll(newFiles); + newFiles.forEach(writer::add); } finally { writer.close(); } - this.newManifest = writer.toManifestFile(); + this.newManifests = writer.toManifestFiles(); hasNewFiles = false; } - return newManifest; + return newManifests; } } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 50270a05f3dd..fc3486366e5f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -94,7 +94,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private PartitionSpec dataSpec; // cache new data manifests after writing - private ManifestFile cachedNewDataManifest = null; + private List cachedNewDataManifests = null; private boolean hasNewDataFiles = false; // cache new manifests for delete files @@ -907,9 +907,17 @@ public Object updateEvent() { } private void cleanUncommittedAppends(Set committed) { - if (cachedNewDataManifest != null && !committed.contains(cachedNewDataManifest)) { - deleteFile(cachedNewDataManifest.path()); - this.cachedNewDataManifest = null; + if (cachedNewDataManifests != null) { + List committedNewManifests = Lists.newArrayList(); + for (ManifestFile manifest : cachedNewDataManifests) { + if (committed.contains(manifest)) { + committedNewManifests.add(manifest); + } else { + deleteFile(manifest.path()); + } + } + + this.cachedNewDataManifests = committedNewManifests; } ListIterator deleteManifestsIterator = cachedNewDeleteManifests.listIterator(); @@ -952,10 +960,8 @@ protected void cleanUncommitted(Set committed) { private Iterable prepareNewDataManifests() { Iterable newManifests; if (newDataFiles.size() > 0) { - ManifestFile newManifest = newDataFilesAsManifest(); - newManifests = - Iterables.concat( - ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests); + List dataFileManifests = newDataFilesAsManifests(); + newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests); } else { newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests); } @@ -965,18 +971,18 @@ private Iterable prepareNewDataManifests() { manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); } - private ManifestFile newDataFilesAsManifest() { - if (hasNewDataFiles && cachedNewDataManifest != null) { - deleteFile(cachedNewDataManifest.path()); - cachedNewDataManifest = null; + private List newDataFilesAsManifests() { + if (hasNewDataFiles && cachedNewDataManifests != null) { + cachedNewDataManifests.forEach(file -> deleteFile(file.path())); + cachedNewDataManifests = null; } - if (cachedNewDataManifest == null) { + if (cachedNewDataManifests == null) { try { - ManifestWriter writer = newManifestWriter(dataSpec()); + RollingManifestWriter writer = newRollingManifestWriter(dataSpec()); try { if (newDataFilesDataSequenceNumber == null) { - writer.addAll(newDataFiles); + newDataFiles.forEach(writer::add); } else { newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber)); } @@ -984,14 +990,14 @@ private ManifestFile newDataFilesAsManifest() { writer.close(); } - this.cachedNewDataManifest = writer.toManifestFile(); + this.cachedNewDataManifests = writer.toManifestFiles(); this.hasNewDataFiles = false; } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest writer"); } } - return cachedNewDataManifest; + return cachedNewDataManifests; } private Iterable prepareDeleteManifests() { @@ -1017,7 +1023,7 @@ private List newDeleteFilesAsManifests() { (specId, deleteFiles) -> { PartitionSpec spec = ops.current().spec(specId); try { - ManifestWriter writer = newDeleteManifestWriter(spec); + RollingManifestWriter writer = newRollingDeleteManifestWriter(spec); try { deleteFiles.forEach( df -> { @@ -1030,7 +1036,7 @@ private List newDeleteFilesAsManifests() { } finally { writer.close(); } - cachedNewDeleteManifests.add(writer.toManifestFile()); + cachedNewDeleteManifests.addAll(writer.toManifestFiles()); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest writer"); } diff --git a/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java new file mode 100644 index 000000000000..0a0dad8e341d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.function.Supplier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** As opposed to {@link ManifestWriter}, a rolling writer could produce multiple manifest files. */ +public class RollingManifestWriter> implements Closeable { + private static final int ROWS_DIVISOR = 250; + + private final FileIO fileIO; + private final Supplier> manifestWriterSupplier; + private final long targetFileSizeInBytes; + private final List manifestFiles; + + private long currentFileRows = 0; + private ManifestWriter currentWriter = null; + + private boolean closed = false; + + public RollingManifestWriter( + FileIO fileIO, + Supplier> manifestWriterSupplier, + long targetFileSizeInBytes) { + this.fileIO = fileIO; + this.manifestWriterSupplier = manifestWriterSupplier; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.manifestFiles = Lists.newArrayList(); + } + + /** + * Add an added entry for a file. + * + *

The entry's snapshot ID will be this manifest's snapshot ID. The data and file sequence + * numbers will be assigned at commit. + * + * @param addedFile a data file + */ + public void add(F addedFile) { + currentWriter().add(addedFile); + currentFileRows++; + } + + /** + * Add an added entry for a file with a specific sequence number. + * + *

The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + * number will be the provided data sequence number. The entry's file sequence number will be + * assigned at commit. + * + * @param addedFile a data file + * @param dataSequenceNumber a data sequence number for the file + */ + public void add(F addedFile, long dataSequenceNumber) { + currentWriter().add(addedFile, dataSequenceNumber); + currentFileRows++; + } + + /** + * Add an existing entry for a file. + * + *

The original data and file sequence numbers, snapshot ID, which were assigned at commit, + * must be preserved when adding an existing entry. + * + * @param existingFile a file + * @param fileSnapshotId snapshot ID when the data file was added to the table + * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added) + * @param fileSequenceNumber a file sequence number (assigned when the file was added) + */ + public void existing( + F existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) { + currentWriter().existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber); + currentFileRows++; + } + + /** + * Add a delete entry for a file. + * + *

The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and + * file sequence numbers of the file must be preserved when the file is marked as deleted. + * + * @param deletedFile a file + * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added) + * @param fileSequenceNumber a file sequence number (assigned when the file was added) + */ + public void delete(F deletedFile, long dataSequenceNumber, Long fileSequenceNumber) { + currentWriter().delete(deletedFile, dataSequenceNumber, fileSequenceNumber); + currentFileRows++; + } + + private ManifestWriter currentWriter() { + if (currentWriter == null) { + this.currentWriter = manifestWriterSupplier.get(); + } else if (shouldRollToNewFile()) { + closeCurrentWriter(); + this.currentWriter = manifestWriterSupplier.get(); + } + + return currentWriter; + } + + private boolean shouldRollToNewFile() { + return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes; + } + + private void closeCurrentWriter() { + if (currentWriter != null) { + ManifestFile currentFile; + try { + currentWriter.close(); + currentFile = currentWriter.toManifestFile(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close current writer", e); + } + + if (currentFileRows == 0L) { + try { + fileIO.deleteFile(currentFile.path()); + } catch (UncheckedIOException e) { + // the file may not have been created, and it isn't worth failing the job to clean up, + // skip deleting + } + } else { + manifestFiles.add(currentFile); + } + + this.currentFileRows = 0; + this.currentWriter = null; + } + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrentWriter(); + this.closed = true; + } + } + + public List toManifestFiles() { + Preconditions.checkState(closed, "Cannot get ManifestFile list from unclosed writer"); + return manifestFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 226388a2b028..c955382e0b77 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -26,6 +26,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -85,6 +87,7 @@ public void accept(String file) { private final AtomicInteger manifestCount = new AtomicInteger(0); private final AtomicInteger attempt = new AtomicInteger(0); private final List manifestLists = Lists.newArrayList(); + private final long targetManifestSizeBytes; private MetricsReporter reporter = LoggingMetricsReporter.instance(); private volatile Long snapshotId = null; private TableMetadata base; @@ -107,6 +110,9 @@ protected SnapshotProducer(TableOperations ops) { } return addMetadata(ops, file); }); + this.targetManifestSizeBytes = + ops.current() + .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); } protected abstract ThisT self(); @@ -494,6 +500,16 @@ protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); } + protected RollingManifestWriter newRollingManifestWriter(PartitionSpec spec) { + return new RollingManifestWriter<>( + ops.io(), () -> newManifestWriter(spec), targetManifestSizeBytes); + } + + protected RollingManifestWriter newRollingDeleteManifestWriter(PartitionSpec spec) { + return new RollingManifestWriter<>( + ops.io(), () -> newDeleteManifestWriter(spec), targetManifestSizeBytes); + } + protected ManifestReader newManifestReader(ManifestFile manifest) { return ManifestFiles.read(manifest, ops.io(), ops.current().specsById()); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java index 245ad1b8176c..bd833c02f9bf 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java @@ -20,12 +20,15 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.UUID; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -43,6 +46,9 @@ public TestManifestWriter(int formatVersion) { super(formatVersion); } + private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 250; + private static final long SMALL_FILE_SIZE = 10L; + @Test public void testManifestStats() throws IOException { ManifestFile manifest = @@ -218,6 +224,166 @@ public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() thr statuses(Status.EXISTING, Status.EXISTING)); } + @Test + public void testRollingManifestWriterNoRecords() throws IOException { + RollingManifestWriter writer = newRollingWriteManifest(SMALL_FILE_SIZE); + + writer.close(); + Assertions.assertThat(writer.toManifestFiles()).isEmpty(); + + writer.close(); + Assertions.assertThat(writer.toManifestFiles()).isEmpty(); + } + + @Test + public void testRollingDeleteManifestWriterNoRecords() throws IOException { + Assumptions.assumeThat(formatVersion).isGreaterThan(1); + RollingManifestWriter writer = newRollingWriteDeleteManifest(SMALL_FILE_SIZE); + + writer.close(); + Assertions.assertThat(writer.toManifestFiles()).isEmpty(); + + writer.close(); + Assertions.assertThat(writer.toManifestFiles()).isEmpty(); + } + + @Test + public void testRollingManifestWriterSplitFiles() throws IOException { + RollingManifestWriter writer = newRollingWriteManifest(SMALL_FILE_SIZE); + + int[] addedFileCounts = new int[3]; + int[] existingFileCounts = new int[3]; + int[] deletedFileCounts = new int[3]; + long[] addedRowCounts = new long[3]; + long[] existingRowCounts = new long[3]; + long[] deletedRowCounts = new long[3]; + + for (int i = 0; i < FILE_SIZE_CHECK_ROWS_DIVISOR * 3; i++) { + int type = i % 3; + int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR; + if (type == 0) { + writer.add(newFile(i)); + addedFileCounts[fileIndex] += 1; + addedRowCounts[fileIndex] += i; + } else if (type == 1) { + writer.existing(newFile(i), 1, 1, null); + existingFileCounts[fileIndex] += 1; + existingRowCounts[fileIndex] += i; + } else { + writer.delete(newFile(i), 1, null); + deletedFileCounts[fileIndex] += 1; + deletedRowCounts[fileIndex] += i; + } + } + + writer.close(); + List manifestFiles = writer.toManifestFiles(); + Assertions.assertThat(manifestFiles.size()).isEqualTo(3); + + checkManifests( + manifestFiles, + addedFileCounts, + existingFileCounts, + deletedFileCounts, + addedRowCounts, + existingRowCounts, + deletedRowCounts); + + writer.close(); + manifestFiles = writer.toManifestFiles(); + Assertions.assertThat(manifestFiles.size()).isEqualTo(3); + + checkManifests( + manifestFiles, + addedFileCounts, + existingFileCounts, + deletedFileCounts, + addedRowCounts, + existingRowCounts, + deletedRowCounts); + } + + @Test + public void testRollingDeleteManifestWriterSplitFiles() throws IOException { + Assumptions.assumeThat(formatVersion).isGreaterThan(1); + RollingManifestWriter writer = newRollingWriteDeleteManifest(SMALL_FILE_SIZE); + + int[] addedFileCounts = new int[3]; + int[] existingFileCounts = new int[3]; + int[] deletedFileCounts = new int[3]; + long[] addedRowCounts = new long[3]; + long[] existingRowCounts = new long[3]; + long[] deletedRowCounts = new long[3]; + for (int i = 0; i < 3 * FILE_SIZE_CHECK_ROWS_DIVISOR; i++) { + int type = i % 3; + int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR; + if (type == 0) { + writer.add(newPosDeleteFile(i)); + addedFileCounts[fileIndex] += 1; + addedRowCounts[fileIndex] += i; + } else if (type == 1) { + writer.existing(newPosDeleteFile(i), 1, 1, null); + existingFileCounts[fileIndex] += 1; + existingRowCounts[fileIndex] += i; + } else { + writer.delete(newPosDeleteFile(i), 1, null); + deletedFileCounts[fileIndex] += 1; + deletedRowCounts[fileIndex] += i; + } + } + + writer.close(); + List manifestFiles = writer.toManifestFiles(); + Assertions.assertThat(manifestFiles.size()).isEqualTo(3); + + checkManifests( + manifestFiles, + addedFileCounts, + existingFileCounts, + deletedFileCounts, + addedRowCounts, + existingRowCounts, + deletedRowCounts); + + writer.close(); + manifestFiles = writer.toManifestFiles(); + Assertions.assertThat(manifestFiles.size()).isEqualTo(3); + + checkManifests( + manifestFiles, + addedFileCounts, + existingFileCounts, + deletedFileCounts, + addedRowCounts, + existingRowCounts, + deletedRowCounts); + } + + private void checkManifests( + List manifests, + int[] addedFileCounts, + int[] existingFileCounts, + int[] deletedFileCounts, + long[] addedRowCounts, + long[] existingRowCounts, + long[] deletedRowCounts) { + for (int i = 0; i < manifests.size(); i++) { + ManifestFile manifest = manifests.get(i); + + Assertions.assertThat(manifest.hasAddedFiles()).isTrue(); + Assertions.assertThat(manifest.addedFilesCount()).isEqualTo(addedFileCounts[i]); + Assertions.assertThat(manifest.addedRowsCount()).isEqualTo(addedRowCounts[i]); + + Assertions.assertThat(manifest.hasExistingFiles()).isTrue(); + Assertions.assertThat(manifest.existingFilesCount()).isEqualTo(existingFileCounts[i]); + Assertions.assertThat(manifest.existingRowsCount()).isEqualTo(existingRowCounts[i]); + + Assertions.assertThat(manifest.hasDeletedFiles()).isTrue(); + Assertions.assertThat(manifest.deletedFilesCount()).isEqualTo(deletedFileCounts[i]); + Assertions.assertThat(manifest.deletedRowsCount()).isEqualTo(deletedRowCounts[i]); + } + } + private DataFile newFile(long recordCount) { return newFile(recordCount, null); } @@ -234,4 +400,41 @@ private DataFile newFile(long recordCount, StructLike partition) { } return builder.build(); } + + private DeleteFile newPosDeleteFile(long recordCount) { + return FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet") + .withFileSizeInBytes(10) + .withRecordCount(recordCount) + .build(); + } + + private RollingManifestWriter newRollingWriteManifest(long targetFileSize) { + return new RollingManifestWriter<>( + table.io(), + () -> { + OutputFile newManifestFile = newManifestFile(); + return ManifestFiles.write(formatVersion, SPEC, newManifestFile, null); + }, + targetFileSize); + } + + private RollingManifestWriter newRollingWriteDeleteManifest(long targetFileSize) { + return new RollingManifestWriter<>( + table.io(), + () -> { + OutputFile newManifestFile = newManifestFile(); + return ManifestFiles.writeDeleteManifest(formatVersion, SPEC, newManifestFile, null); + }, + targetFileSize); + } + + private OutputFile newManifestFile() { + try { + return Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } }