Skip to content

Commit

Permalink
Add Manifest Stats in snapshot summary.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Apr 29, 2024
1 parent 01bc864 commit f2b85d3
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 4 deletions.
8 changes: 8 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,14 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
"1.5.0":
org.apache.iceberg:iceberg-core:
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.metrics.CounterResult org.apache.iceberg.metrics.CommitMetricsResult::totalDataManifestFiles()"
justification: "Added new parameters for manifest stats"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.metrics.CounterResult org.apache.iceberg.metrics.CommitMetricsResult::totalDeleteManifestFiles()"
justification: "Added new parameters for manifest stats"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> apply = Lists.newArrayList();
Iterables.addAll(apply, newManifestsWithMetadata);
apply.addAll(keptManifests);
apply.forEach(summaryBuilder::addedManifestStats);

return apply;
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
manifests.addAll(snapshot.allManifests(ops.io()));
}

manifests.forEach(summaryBuilder::addedManifestStats);

return manifests;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> manifests = Lists.newArrayList();
Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));
manifests.forEach(summaryBuilder::addedManifestStats);

return manifests;
}
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SnapshotSummary {
public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
public static final String TOTAL_DATA_MANIFEST_FILES = "total-data-manifest-files";
public static final String TOTAL_DELETE_MANIFEST_FILES = "total-delete-manifest-files";

public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");

Expand Down Expand Up @@ -144,6 +146,10 @@ public void addedManifest(ManifestFile manifest) {
metrics.addedManifest(manifest);
}

public void addedManifestStats(ManifestFile manifest) {
metrics.addedManifestStats(manifest);
}

public void set(String property, String value) {
properties.put(property, value);
}
Expand Down Expand Up @@ -229,6 +235,8 @@ private static class UpdateMetrics {
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long removedEqDeletes = 0L;
private long totalDataManifestFiles = 0L;
private long totalDeleteManifestFiles = 0L;
private boolean trustSizeAndDeleteCounts = true;

void clear() {
Expand All @@ -248,6 +256,8 @@ void clear() {
this.removedPosDeletes = 0L;
this.addedEqDeletes = 0L;
this.removedEqDeletes = 0L;
this.totalDataManifestFiles = 0L;
this.totalDeleteManifestFiles = 0L;
this.trustSizeAndDeleteCounts = true;
}

Expand All @@ -263,6 +273,12 @@ void addTo(ImmutableMap.Builder<String, String> builder) {
setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles);
setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
setIf(totalDataManifestFiles > 0, builder, TOTAL_DATA_MANIFEST_FILES, totalDataManifestFiles);
setIf(
totalDeleteManifestFiles > 0,
builder,
TOTAL_DELETE_MANIFEST_FILES,
totalDeleteManifestFiles);

if (trustSizeAndDeleteCounts) {
setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize);
Expand Down Expand Up @@ -334,6 +350,18 @@ void addedManifest(ManifestFile manifest) {
this.trustSizeAndDeleteCounts = false;
break;
}

addedManifestStats(manifest);
}

void addedManifestStats(ManifestFile manifest) {
switch (manifest.content()) {
case DATA:
this.totalDataManifestFiles++;
break;
case DELETES:
this.totalDeleteManifestFiles++;
}
}

void merge(UpdateMetrics other) {
Expand All @@ -353,6 +381,8 @@ void merge(UpdateMetrics other) {
this.removedPosDeletes += other.removedPosDeletes;
this.addedEqDeletes += other.addedEqDeletes;
this.removedEqDeletes += other.removedEqDeletes;
this.totalDataManifestFiles += other.totalDataManifestFiles;
this.totalDeleteManifestFiles += other.totalDeleteManifestFiles;
this.trustSizeAndDeleteCounts = trustSizeAndDeleteCounts && other.trustSizeAndDeleteCounts;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface CommitMetricsResult {
String ADDED_EQ_DELETES = "added-equality-deletes";
String REMOVED_EQ_DELETES = "removed-equality-deletes";
String TOTAL_EQ_DELETES = "total-equality-deletes";
String TOTAL_DATA_MANIFEST_FILES = "total-data-manifest-files";
String TOTAL_DELETE_MANIFEST_FILES = "total-delete-manifest-files";

@Nullable
TimerResult totalDuration();
Expand Down Expand Up @@ -123,6 +125,12 @@ public interface CommitMetricsResult {
@Nullable
CounterResult totalEqualityDeletes();

@Nullable
CounterResult totalDataManifestFiles();

@Nullable
CounterResult totalDeleteManifestFiles();

static CommitMetricsResult from(
CommitMetrics commitMetrics, Map<String, String> snapshotSummary) {
Preconditions.checkArgument(null != commitMetrics, "Invalid commit metrics: null");
Expand Down Expand Up @@ -163,6 +171,10 @@ static CommitMetricsResult from(
.removedEqualityDeletes(
counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETES_PROP))
.totalEqualityDeletes(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP))
.totalDataManifestFiles(
counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DATA_MANIFEST_FILES))
.totalDeleteManifestFiles(
counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DELETE_MANIFEST_FILES))
.build();
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCommitReporting.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public void addAndDeleteDataFiles() {

assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(20L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);

assertThat(metrics.totalDataManifestFiles().value()).isEqualTo(1L);
assertThat(metrics.totalDeleteManifestFiles()).isNull();
}

@TestTemplate
Expand Down Expand Up @@ -122,6 +125,9 @@ public void addAndDeleteDeleteFiles() {
assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(30L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(30L);

assertThat(metrics.totalDataManifestFiles()).isNull();
assertThat(metrics.totalDeleteManifestFiles().value()).isEqualTo(1L);

// now remove those 2 positional + 1 equality delete files
table
.newRewrite()
Expand Down Expand Up @@ -153,6 +159,9 @@ public void addAndDeleteDeleteFiles() {

assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(30L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);

assertThat(metrics.totalDataManifestFiles()).isNull();
assertThat(metrics.totalDeleteManifestFiles().value()).isEqualTo(1L);
}

@TestTemplate
Expand Down Expand Up @@ -191,5 +200,7 @@ public void addAndDeleteManifests() throws IOException {
assertThat(metrics.addedDataFiles().value()).isEqualTo(1L);
assertThat(metrics.addedRecords().value()).isEqualTo(1L);
assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(10L);
assertThat(metrics.totalDataManifestFiles().value()).isEqualTo(2L);
assertThat(metrics.totalDeleteManifestFiles()).isNull();
}
}
34 changes: 33 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TestSnapshot.testManifestStats;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
Expand Down Expand Up @@ -62,6 +63,8 @@ public void testRewriteManifestsAppendedDirectly() throws IOException {
"manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));

table.newFastAppend().appendManifest(newManifest).commit();

testManifestStats(table);
long appendId = table.currentSnapshot().snapshotId();

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);
Expand All @@ -71,6 +74,7 @@ public void testRewriteManifestsAppendedDirectly() throws IOException {
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
validateManifestEntries(
manifests.get(0), ids(appendId), files(FILE_A), statuses(ManifestEntry.Status.EXISTING));
}
Expand All @@ -87,6 +91,8 @@ public void testRewriteManifestsWithScanExecutor() throws IOException {

table.newFastAppend().appendManifest(newManifest).commit();

testManifestStats(table);

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);
AtomicInteger scanThreadsIndex = new AtomicInteger(0);
table
Expand All @@ -106,6 +112,7 @@ public void testRewriteManifestsWithScanExecutor() throws IOException {

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);
testManifestStats(table);
assertThat(scanThreadsIndex.get())
.as("Thread should be created in provided pool")
.isGreaterThan(0);
Expand All @@ -129,11 +136,13 @@ public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOExceptio

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

testManifestStats(table);
table.rewriteManifests().clusterBy(file -> "").commit();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
// get the correct file order
List<DataFile> files;
List<Long> ids;
Expand Down Expand Up @@ -162,12 +171,14 @@ public void testReplaceManifestsSeparate() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

// cluster by path will split the manifest into two
testManifestStats(table);

// cluster by path will split the manifest into two
table.rewriteManifests().clusterBy(file -> file.path()).commit();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(2);
testManifestStats(table);
manifests.sort(Comparator.comparing(ManifestFile::path));

validateManifestEntries(
Expand All @@ -185,6 +196,7 @@ public void testReplaceManifestsConsolidate() throws IOException {
table.newFastAppend().appendFile(FILE_B).commit();
long appendIdB = table.currentSnapshot().snapshotId();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// cluster by constant will combine manifests into one
Expand All @@ -194,6 +206,7 @@ public void testReplaceManifestsConsolidate() throws IOException {
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
// get the file order correct
List<DataFile> files;
List<Long> ids;
Expand Down Expand Up @@ -277,6 +290,8 @@ public void testReplaceManifestsMaxSize() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

testManifestStats(table);

// cluster by constant will combine manifests into one but small target size will create one per
// entry
BaseRewriteManifests rewriteManifests = spy((BaseRewriteManifests) table.rewriteManifests());
Expand Down Expand Up @@ -319,6 +334,7 @@ public void testConcurrentRewriteManifest() throws IOException {
})
.commit();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// commit the rewrite manifests in progress - this should perform a full rewrite as the manifest
Expand Down Expand Up @@ -362,6 +378,7 @@ public void testAppendDuringRewriteManifest() {
table.newFastAppend().appendFile(FILE_B).commit();
long appendIdB = table.currentSnapshot().snapshotId();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// commit the rewrite manifests in progress
Expand Down Expand Up @@ -395,6 +412,7 @@ public void testRewriteManifestDuringAppend() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

testManifestStats(table);
// commit the append in progress
append.commit();
long appendIdB = table.currentSnapshot().snapshotId();
Expand Down Expand Up @@ -422,6 +440,9 @@ public void testBasicManifestReplacement() throws IOException {
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);

table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();

testManifestStats(table);

Snapshot secondSnapshot = table.currentSnapshot();

ManifestFile firstNewManifest =
Expand All @@ -439,6 +460,8 @@ public void testBasicManifestReplacement() throws IOException {
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();

testManifestStats(table);

Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests(table.io());
assertThat(manifests).hasSize(3);
Expand Down Expand Up @@ -480,12 +503,17 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc

table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

testManifestStats(table);

Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests(table.io());
assertThat(firstSnapshotManifests).hasSize(1);
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);

table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();

testManifestStats(table);

Snapshot secondSnapshot = table.currentSnapshot();

ManifestFile firstNewManifest =
Expand All @@ -503,6 +531,8 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();

testManifestStats(table);

Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests(table.io());
assertThat(manifests).hasSize(3);
Expand Down Expand Up @@ -573,6 +603,8 @@ public void testWithMultiplePartitionSpec() throws IOException {

table.newAppend().appendFile(newFileZ).commit();

testManifestStats(table);

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(3);

RewriteManifests rewriteManifests = table.rewriteManifests();
Expand Down

0 comments on commit f2b85d3

Please sign in to comment.