Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ManifestFile Stats in snapshot summary. #10246

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,14 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
"1.5.0":
org.apache.iceberg:iceberg-core:
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.metrics.CounterResult org.apache.iceberg.metrics.CommitMetricsResult::totalDataManifestFiles()"
justification: "Added new parameters for manifest stats"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.metrics.CounterResult org.apache.iceberg.metrics.CommitMetricsResult::totalDeleteManifestFiles()"
justification: "Added new parameters for manifest stats"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> apply = Lists.newArrayList();
Iterables.addAll(apply, newManifestsWithMetadata);
apply.addAll(keptManifests);
apply.forEach(summaryBuilder::addedManifestStats);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add manifestStats with any newManifest.


return apply;
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
manifests.addAll(snapshot.allManifests(ops.io()));
}

manifests.forEach(summaryBuilder::addedManifestStats);
Copy link
Member

@jbonofre jbonofre Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stupid question: do we have a rough estimate on this change in terms of manifest append performance ? I guess it's not important, but just wondering.

Copy link
Contributor

@Fokko Fokko Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also my main question.

My train of thought: You will need to read the manifest-list in any situation. The number of manifest can vary widely:

  • If FastAppends are used frequently, there will be many small manifests that you want to bundle into batches.
  • If MergeAppends are used, the manifests are rather hefty (8 megabytes by default, set using commit.manifest.target-size-bytes).

With the knowledge from the summary, you could spin up executors before reading the manifest-list, but this can be difficult since you would also need to know the sizes of the manifest to do some effective planning.

The downside is that we add extra information to the metadata-JSON, which can also grow in size when there are many commits.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Fokko for feedback.

With the knowledge from the summary, you could spin up executors before reading the manifest-list, but this can be difficult since you would also need to know the sizes of the manifest to do some effective planning.

With manifest stats, It can help with better planning in terms of manifest file scans. Without these stats in metadata json either someone will have to assume or do one more IO with manifestList.

The downside is that we add extra information to the metadata-JSON, which can also grow in size when there are many commits.

I agree it adds some extra bytes to metadataJson. But I think here we have options to optimize with expire snapshots. But to avoid an extra IO for planning I dont find any other way.

WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stupid question: do we have a rough estimate on this change in terms of manifest append performance ? I guess it's not important, but just wondering.

It is only going to add cost with metadata json size. Adding it to snapshot summary is inline operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nk1506 I think the main question (at least from me) is essentially:

Is information on just the total number of data or delete manifests actually helpful for being able to help with whatever planning estimation you're trying to do?

As @Fokko said you'll probably want to know manifest sizes (in bytes) of the files involved in planning since there can be variance depending on the strategy used for the append. I think with just determining based on number of files (and not including sizes, which means you have to read the manifest anyways) there would be a lot of overprovisioning and underprovisioning (if you're trying to a distributed planning).

But definitely open to hearing more, and especially if you are able to share any data points on how this metric helped ; that would give us some more confidence that this is useful and can generalize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But definitely open to hearing more, and especially if you are able to share any data points on how this metric helped ; that would give us some more confidence that this is useful and can generalize.

+1 on this, how would knowing these 2 new stats help with planning?

  • Knowing how many can help you skip any of the manifests during planning?
  • or knowing such will help make manifest rewrite decision?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback.

Regarding the usages of manifest counts for planning here is my feedback:

  1. Having Manifest counts in advance helps to plan the parallelism. Like spark is doing after reading from ManifestList.
  2. How it will help with SnapshotSummary ?

Engine like Spark doesn't get any benefits from these stats. Since it's parallelism is dynamic with runtime in nature.

But other engines like Dremio which decides it's parallelism(during compiletime) in advance. Providing these stats will help for better parallelism.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a discussion with @nk1506 to understand it better.

Dremio (or query engines that uses CBO) need to estimate the cost of the query plan.
parallelism is one of the factor for cost estimation.

Query has planning and execution phase.
So, during planning phase we would like to know how many manifests exist to estimate the parallelism required for reading manifests without doing an actual IO of manifest list (as we want planning phase to be as fast as possible).
We currently estimate the parallelism of data files IO by reading the stats that exist in the snapshot summary. But the stats related to manifest count is missing in snapshot summary and we are unable to estimate. Hence, the PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @Fokko, @amogh-jahagirdar, @dramaticlly: Can we conclude (move forward) on this based on my above comment? Having these stats will leads to better estimate based on parallelism. Agree that Having size based cost estimation will be more accurate. But count based estimation is still better than no stats.

Is other engines that use CBO like Trino is intersted in these stats?
cc: @findepi, @findinpath thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return manifests;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> manifests = Lists.newArrayList();
Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));
manifests.forEach(summaryBuilder::addedManifestStats);

return manifests;
}
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SnapshotSummary {
public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
public static final String TOTAL_DATA_MANIFEST_FILES = "total-data-manifest-files";
public static final String TOTAL_DELETE_MANIFEST_FILES = "total-delete-manifest-files";

public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");

Expand Down Expand Up @@ -144,6 +146,10 @@ public void addedManifest(ManifestFile manifest) {
metrics.addedManifest(manifest);
}

public void addedManifestStats(ManifestFile manifest) {
metrics.addedManifestStats(manifest);
}

public void set(String property, String value) {
properties.put(property, value);
}
Expand Down Expand Up @@ -229,6 +235,8 @@ private static class UpdateMetrics {
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long removedEqDeletes = 0L;
private long totalDataManifestFiles = 0L;
private long totalDeleteManifestFiles = 0L;
private boolean trustSizeAndDeleteCounts = true;

void clear() {
Expand All @@ -248,6 +256,8 @@ void clear() {
this.removedPosDeletes = 0L;
this.addedEqDeletes = 0L;
this.removedEqDeletes = 0L;
this.totalDataManifestFiles = 0L;
this.totalDeleteManifestFiles = 0L;
this.trustSizeAndDeleteCounts = true;
}

Expand All @@ -263,6 +273,12 @@ void addTo(ImmutableMap.Builder<String, String> builder) {
setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles);
setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
setIf(totalDataManifestFiles > 0, builder, TOTAL_DATA_MANIFEST_FILES, totalDataManifestFiles);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are zero counts these stats will be not added to summary.

setIf(
totalDeleteManifestFiles > 0,
builder,
TOTAL_DELETE_MANIFEST_FILES,
totalDeleteManifestFiles);

if (trustSizeAndDeleteCounts) {
setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize);
Expand Down Expand Up @@ -336,6 +352,16 @@ void addedManifest(ManifestFile manifest) {
}
}

void addedManifestStats(ManifestFile manifest) {
switch (manifest.content()) {
case DATA:
this.totalDataManifestFiles++;
break;
case DELETES:
this.totalDeleteManifestFiles++;
}
}

void merge(UpdateMetrics other) {
this.addedFiles += other.addedFiles;
this.removedFiles += other.removedFiles;
Expand All @@ -353,6 +379,8 @@ void merge(UpdateMetrics other) {
this.removedPosDeletes += other.removedPosDeletes;
this.addedEqDeletes += other.addedEqDeletes;
this.removedEqDeletes += other.removedEqDeletes;
this.totalDataManifestFiles += other.totalDataManifestFiles;
this.totalDeleteManifestFiles += other.totalDeleteManifestFiles;
this.trustSizeAndDeleteCounts = trustSizeAndDeleteCounts && other.trustSizeAndDeleteCounts;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface CommitMetricsResult {
String ADDED_EQ_DELETES = "added-equality-deletes";
String REMOVED_EQ_DELETES = "removed-equality-deletes";
String TOTAL_EQ_DELETES = "total-equality-deletes";
String TOTAL_DATA_MANIFEST_FILES = "total-data-manifest-files";
String TOTAL_DELETE_MANIFEST_FILES = "total-delete-manifest-files";

@Nullable
TimerResult totalDuration();
Expand Down Expand Up @@ -123,6 +125,12 @@ public interface CommitMetricsResult {
@Nullable
CounterResult totalEqualityDeletes();

@Nullable
CounterResult totalDataManifestFiles();

@Nullable
CounterResult totalDeleteManifestFiles();

static CommitMetricsResult from(
CommitMetrics commitMetrics, Map<String, String> snapshotSummary) {
Preconditions.checkArgument(null != commitMetrics, "Invalid commit metrics: null");
Expand Down Expand Up @@ -163,6 +171,10 @@ static CommitMetricsResult from(
.removedEqualityDeletes(
counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETES_PROP))
.totalEqualityDeletes(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP))
.totalDataManifestFiles(
counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DATA_MANIFEST_FILES))
.totalDeleteManifestFiles(
counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DELETE_MANIFEST_FILES))
.build();
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCommitReporting.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public void addAndDeleteDataFiles() {

assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(20L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);

assertThat(metrics.totalDataManifestFiles().value()).isEqualTo(1L);
assertThat(metrics.totalDeleteManifestFiles()).isNull();
}

@TestTemplate
Expand Down Expand Up @@ -122,6 +125,9 @@ public void addAndDeleteDeleteFiles() {
assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(30L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(30L);

assertThat(metrics.totalDataManifestFiles()).isNull();
assertThat(metrics.totalDeleteManifestFiles().value()).isEqualTo(1L);

// now remove those 2 positional + 1 equality delete files
table
.newRewrite()
Expand Down Expand Up @@ -153,6 +159,9 @@ public void addAndDeleteDeleteFiles() {

assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(30L);
assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);

assertThat(metrics.totalDataManifestFiles()).isNull();
assertThat(metrics.totalDeleteManifestFiles().value()).isEqualTo(1L);
}

@TestTemplate
Expand Down Expand Up @@ -191,5 +200,7 @@ public void addAndDeleteManifests() throws IOException {
assertThat(metrics.addedDataFiles().value()).isEqualTo(1L);
assertThat(metrics.addedRecords().value()).isEqualTo(1L);
assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(10L);
assertThat(metrics.totalDataManifestFiles().value()).isEqualTo(2L);
assertThat(metrics.totalDeleteManifestFiles()).isNull();
}
}
34 changes: 33 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TestSnapshot.testManifestStats;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
Expand Down Expand Up @@ -62,6 +63,8 @@ public void testRewriteManifestsAppendedDirectly() throws IOException {
"manifest-file-1.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A));

table.newFastAppend().appendManifest(newManifest).commit();

testManifestStats(table);
long appendId = table.currentSnapshot().snapshotId();

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);
Expand All @@ -71,6 +74,7 @@ public void testRewriteManifestsAppendedDirectly() throws IOException {
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
validateManifestEntries(
manifests.get(0), ids(appendId), files(FILE_A), statuses(ManifestEntry.Status.EXISTING));
}
Expand All @@ -87,6 +91,8 @@ public void testRewriteManifestsWithScanExecutor() throws IOException {

table.newFastAppend().appendManifest(newManifest).commit();

testManifestStats(table);

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);
AtomicInteger scanThreadsIndex = new AtomicInteger(0);
table
Expand All @@ -106,6 +112,7 @@ public void testRewriteManifestsWithScanExecutor() throws IOException {

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);
testManifestStats(table);
assertThat(scanThreadsIndex.get())
.as("Thread should be created in provided pool")
.isGreaterThan(0);
Expand All @@ -129,11 +136,13 @@ public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOExceptio

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

testManifestStats(table);
table.rewriteManifests().clusterBy(file -> "").commit();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
// get the correct file order
List<DataFile> files;
List<Long> ids;
Expand Down Expand Up @@ -162,12 +171,14 @@ public void testReplaceManifestsSeparate() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

// cluster by path will split the manifest into two
testManifestStats(table);

// cluster by path will split the manifest into two
table.rewriteManifests().clusterBy(file -> file.path()).commit();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(2);
testManifestStats(table);
manifests.sort(Comparator.comparing(ManifestFile::path));

validateManifestEntries(
Expand All @@ -185,6 +196,7 @@ public void testReplaceManifestsConsolidate() throws IOException {
table.newFastAppend().appendFile(FILE_B).commit();
long appendIdB = table.currentSnapshot().snapshotId();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// cluster by constant will combine manifests into one
Expand All @@ -194,6 +206,7 @@ public void testReplaceManifestsConsolidate() throws IOException {
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(1);

testManifestStats(table);
// get the file order correct
List<DataFile> files;
List<Long> ids;
Expand Down Expand Up @@ -277,6 +290,8 @@ public void testReplaceManifestsMaxSize() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

testManifestStats(table);

// cluster by constant will combine manifests into one but small target size will create one per
// entry
BaseRewriteManifests rewriteManifests = spy((BaseRewriteManifests) table.rewriteManifests());
Expand Down Expand Up @@ -319,6 +334,7 @@ public void testConcurrentRewriteManifest() throws IOException {
})
.commit();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// commit the rewrite manifests in progress - this should perform a full rewrite as the manifest
Expand Down Expand Up @@ -362,6 +378,7 @@ public void testAppendDuringRewriteManifest() {
table.newFastAppend().appendFile(FILE_B).commit();
long appendIdB = table.currentSnapshot().snapshotId();

testManifestStats(table);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);

// commit the rewrite manifests in progress
Expand Down Expand Up @@ -395,6 +412,7 @@ public void testRewriteManifestDuringAppend() {

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);

testManifestStats(table);
// commit the append in progress
append.commit();
long appendIdB = table.currentSnapshot().snapshotId();
Expand Down Expand Up @@ -422,6 +440,9 @@ public void testBasicManifestReplacement() throws IOException {
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);

table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();

testManifestStats(table);

Snapshot secondSnapshot = table.currentSnapshot();

ManifestFile firstNewManifest =
Expand All @@ -439,6 +460,8 @@ public void testBasicManifestReplacement() throws IOException {
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();

testManifestStats(table);

Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests(table.io());
assertThat(manifests).hasSize(3);
Expand Down Expand Up @@ -480,12 +503,17 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc

table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

testManifestStats(table);

Snapshot firstSnapshot = table.currentSnapshot();
List<ManifestFile> firstSnapshotManifests = firstSnapshot.allManifests(table.io());
assertThat(firstSnapshotManifests).hasSize(1);
ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);

table.newFastAppend().appendFile(FILE_C).appendFile(FILE_D).commit();

testManifestStats(table);

Snapshot secondSnapshot = table.currentSnapshot();

ManifestFile firstNewManifest =
Expand All @@ -503,6 +531,8 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc
rewriteManifests.addManifest(secondNewManifest);
rewriteManifests.commit();

testManifestStats(table);

Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = snapshot.allManifests(table.io());
assertThat(manifests).hasSize(3);
Expand Down Expand Up @@ -573,6 +603,8 @@ public void testWithMultiplePartitionSpec() throws IOException {

table.newAppend().appendFile(newFileZ).commit();

testManifestStats(table);

assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(3);

RewriteManifests rewriteManifests = table.rewriteManifests();
Expand Down