Extend RewriteManifests with a way to add/delete manifests #512

Merged
rdblue merged 1 commit into apache:master on Nov 12, 2019

Conversation

aokolnychyi
Contributor

This PR extends RewriteManifests with a way to directly delete/add manifests, which enables us to rewrite manifests using an external process.

@aokolnychyi
Contributor Author

@bryanck could you take a look?

@bryanck
Contributor

bryanck commented Oct 4, 2019

I feel this new functionality might be better suited to a separate class, something like ReplaceManifests? The current rewrite functionality and add/delete don't share much in common. If we did want to share some code, maybe create a base class.

OutputFile newManifestPath = manifestPath(manifestSuffix.getAndIncrement());
Set<ManifestEntry.Status> allowedStatuses = Sets.newHashSet(ManifestEntry.Status.EXISTING);
newManifests.add(ManifestWriter.copyManifest(
reader, newManifestPath, snapshotId(), summaryBuilder, allowedStatuses));
Contributor

This shouldn't pass summaryBuilder because the files are not actually added or deleted. The set of files should be the same.

Side note: should we check that the number of data files in deleted manifests matches the number of data files in added manifests before committing?
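The check raised in the side note could look roughly like the following. This is a minimal sketch, not Iceberg code: `ManifestInfo`, `totalLiveFiles`, and `checkSameFileCount` are hypothetical names, and each manifest is reduced to a path plus the number of live data files it tracks.

```java
import java.util.List;

final class ManifestCountCheck {
    /** Hypothetical stand-in for a manifest: a path plus the number of live data files it tracks. */
    static final class ManifestInfo {
        final String path;
        final int liveFileCount;

        ManifestInfo(String path, int liveFileCount) {
            this.path = path;
            this.liveFileCount = liveFileCount;
        }
    }

    /** Sums the live data file counts over a list of manifests. */
    static long totalLiveFiles(List<ManifestInfo> manifests) {
        long total = 0;
        for (ManifestInfo manifest : manifests) {
            total += manifest.liveFileCount;
        }
        return total;
    }

    /** Throws if the added manifests do not track exactly as many files as the deleted ones. */
    static void checkSameFileCount(List<ManifestInfo> deleted, List<ManifestInfo> added) {
        long deletedFiles = totalLiveFiles(deleted);
        long addedFiles = totalLiveFiles(added);
        if (deletedFiles != addedFiles) {
            throw new IllegalStateException(String.format(
                "Deleted manifests track %d files but added manifests track %d",
                deletedFiles, addedFiles));
        }
    }
}
```

Matching counts are a necessary condition only: the check catches dropped or duplicated files cheaply, but it does not prove that the two sets of files are identical.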

Contributor Author

Similarly to BaseRewriteManifests, our utility reads only live entries via liveEntries in FilteredManifests and produces staged manifests that contain entries with status EXISTING. ManifestWriter$copyManifest is changed to work for arbitrary use cases. In this particular one, it won't report any files as added or removed because all manifest entries have one status: EXISTING (validated in addManifest).

// the manifest must be rewritten with this update's snapshot ID
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
OutputFile newManifestPath = manifestPath(manifestSuffix.getAndIncrement());
Set<ManifestEntry.Status> allowedStatuses = Sets.newHashSet(ManifestEntry.Status.EXISTING);
Contributor

In this context, manifests that are copied are allowed to have any entries. The validation for the copy in AppendFiles that all entries were ADDED was because the entries are for files that were being appended to the dataset. But here, any manifest could be rewritten, so any status is allowed.

Contributor Author

I followed the logic in BaseRewriteManifests and MergingSnapshotProducer: rewriting manifests produces a new snapshot where files that were present before are added with status EXISTING while keeping their original snapshot id.

By restricting manifest entries to EXISTING, we validate that RewriteManifests is not used to add/delete files.

Contributor

That sounds good to me, but I don't think that the logic in the copy does this. We just need to restrict entries to live entries and convert ADDED entries to EXISTING.
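The two policies under discussion in this thread can be contrasted in a small model. This is only a sketch under stated assumptions: `Entry`, `requireAllExisting`, and `toExistingLiveEntries` are hypothetical names rather than Iceberg classes or methods, and an entry is reduced to a status, a snapshot ID, and a file path.

```java
import java.util.ArrayList;
import java.util.List;

final class EntryStatusSketch {
    enum Status { EXISTING, ADDED, DELETED }

    /** Hypothetical stand-in for a manifest entry. */
    static final class Entry {
        final Status status;
        final long snapshotId;
        final String filePath;

        Entry(Status status, long snapshotId, String filePath) {
            this.status = status;
            this.snapshotId = snapshotId;
            this.filePath = filePath;
        }
    }

    /** Strict policy: reject a manifest if any entry has a status other than EXISTING. */
    static void requireAllExisting(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.status != Status.EXISTING) {
                throw new IllegalArgumentException(
                    "Cannot add manifest with " + entry.status + " entry: " + entry.filePath);
            }
        }
    }

    /** Converting policy: drop DELETED entries and rewrite the rest as EXISTING, keeping the original snapshot ID. */
    static List<Entry> toExistingLiveEntries(List<Entry> entries) {
        List<Entry> result = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.status != Status.DELETED) {
                result.add(new Entry(Status.EXISTING, entry.snapshotId, entry.filePath));
            }
        }
        return result;
    }
}
```

Under the strict policy the caller must produce manifests that already contain only EXISTING entries; under the converting policy the copy step normalizes them.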

Contributor Author

Left as is according to this comment.

@aokolnychyi
Contributor Author

aokolnychyi commented Oct 6, 2019

@bryanck I also thought about having a separate class, but I was a bit worried about adding a new method with a similar purpose to the Table API, as it already contains quite a lot of logic. The change didn't feel too big to me, as we still reuse a couple of methods.

@aokolnychyi
Contributor Author

@bryanck @rdblue thanks for the initial review! There are a couple of questions I want to clarify before updating the PR. Let me know what you think.

@aokolnychyi
Contributor Author

@rdblue I am not sure that changing the status of entries from ADDED to EXISTING while copying manifests would give us any benefit. Instead, I would leave that to the utilities that produce staged manifests. Later, we want to get rid of copying manifests altogether.

Here is how we read metadata per file:

    val manifestPathDS = spark.sparkContext.parallelize(manifestPaths, numPartitions.toInt).toDS()
    manifestPathDS.flatMap { path =>
      val reader = ManifestReader.read(io.newInputFile(path), specLookup)
      val spec = reader.spec
      try {
        val filteredManifest = reader.select(ImmutableList.of("*"))
        filteredManifest.liveEntries.asScala.map { entry =>
          val file = entry.file.copy()
          val snapshotId = entry.snapshotId
          (spec.specId, snapshotId, strategy.clusterBy(spec, file), file)
        }
      } finally {
        reader.close()
      }
    }

While writing staged manifests, we call writer.existing(file, snapshotId) so that each entry has the correct status.

@aokolnychyi
Contributor Author

@bryanck @rdblue I've updated the PR and put the logic into a separate class as it seems reasonable to me. I still assume utilities will convert manifest entry statuses from ADDED to EXISTING themselves as it will allow us to avoid copying manifests in the future.

@aokolnychyi
Contributor Author

@rdblue @bryanck Ready for another review round.

performRewrite(currentManifests);
} else {
// just keep any new manifests that were added since the last apply(), don't rerun
addExistingFromNewCommit(currentManifests);
Contributor

rdblue commented Nov 11, 2019

I think this comment and method name were useful to help understand what's happening here. We at least need to document that this function is responsible for adding manifests that were added since the snapshot that was originally rewritten. I can see how using "keep" in the method name makes sense since it adds manifests to keptManifests.

Contributor

What about keepNewActiveManifests?

Contributor Author

The reason I modified this part is that the comment is no longer always true after this change. For example, when we only add/delete manifests, there is no rewrite and there may be no retry, but we still call keepActiveManifests to keep all active manifests.

Contributor

Got it, I think that works.
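The behavior described in this thread can be modeled as a simple filter. This is a minimal sketch, assuming manifests are identified by path strings and that "active" means present in the current snapshot and neither rewritten nor deleted by this operation; the class and parameter names are illustrative and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class KeepActiveSketch {
    /**
     * Returns the manifests in the current snapshot that this operation neither rewrote nor deleted.
     * Manifests committed concurrently since the original snapshot pass through unchanged,
     * because they appear in neither set.
     */
    static List<String> keepActiveManifests(
            List<String> currentManifests, Set<String> rewritten, Set<String> deleted) {
        List<String> kept = new ArrayList<>();
        for (String manifest : currentManifests) {
            if (!rewritten.contains(manifest) && !deleted.contains(manifest)) {
                kept.add(manifest);
            }
        }
        return kept;
    }
}
```

With empty `rewritten` and `deleted` sets the method keeps every current manifest, which matches the case where the operation does no rewrite at all.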

for (ManifestFile manifest : newManifests) {
if (!committed.contains(manifest)) {
cleanUncommitted(newManifests, committed);
cleanUncommitted(addedManifests, committed);
Contributor

Should this clean up added manifest files?

I think that is a responsibility of the caller because the caller may be able to reuse some of them if the commit fails because a deleted manifest is no longer in the table. This is similar to a write operation, where if Iceberg can't commit the write, it doesn't delete the data files that were added in the failed operation. Instead, the writer has abort functions that are called outside of the snapshot commit: https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java#L183

Contributor Author

This cleans up copies of the added manifests, not the original manifests that were added. I believe the caller doesn't have access to the copied manifests.

Contributor

Ah, you're right. Thanks for the explanation!
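The cleanup pattern visible in the diff above (delete every manifest the operation wrote that did not end up in the committed set) might be sketched as follows. The signature is an assumption for illustration: manifests are path strings and the `deleteFile` callback stands in for whatever file deletion the table's IO layer provides.

```java
import java.util.Set;
import java.util.function.Consumer;

final class CleanupSketch {
    /**
     * Deletes every manifest that is not part of the committed snapshot.
     * Returns the number of manifests passed to the delete callback.
     */
    static int cleanUncommitted(
            Iterable<String> manifests, Set<String> committed, Consumer<String> deleteFile) {
        int deletedCount = 0;
        for (String manifest : manifests) {
            if (!committed.contains(manifest)) {
                deleteFile.accept(manifest);
                deletedCount++;
            }
        }
        return deletedCount;
    }
}
```

As the thread points out, this would be applied only to the copies the operation itself created, since the caller has no handle on those; the caller's original staged manifests are left alone.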

@rdblue
Contributor

rdblue commented Nov 11, 2019

@aokolnychyi, there is just one problem with cleanUncommitted. Everything else is minor, though it would be nice to fix the nits.

@rdblue
Contributor

rdblue commented Nov 12, 2019

@aokolnychyi, looks like there's a test failure, but I think this is ready to merge when that's fixed.

@aokolnychyi
Contributor Author

@rdblue, thanks for the review. I modified an error message and forgot to update a test. Updated the PR now.

@aokolnychyi
Contributor Author

Seems like some Python tests failed this time.

@rdblue
Contributor

rdblue commented Nov 12, 2019

Yeah, looks like Python is having trouble. We'll look into it, but it shouldn't block this commit.

@rdblue rdblue merged commit 70c1940 into apache:master Nov 12, 2019