Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark-3.4: Harmonize RewriteDataFilesSparkAction #7630

Merged
merged 3 commits into from
May 19, 2023

Conversation

ajantha-bhat
Copy link
Member

@ajantha-bhat ajantha-bhat commented May 17, 2023

some code style fixups to RewriteDeleteFileSparkAction that can be applied as well to RewriteDataFileSparkAction #7565 (review)

Fixes #7625

@@ -159,7 +159,7 @@ public RewriteDataFiles.Result execute() {

validateAndInitOptions();

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add EMPTY_RESULT static variable like in the action for position deletes? There are two return statements in this method (not this line).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. Added.

@@ -235,6 +199,37 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
}
}

private StructLikeMap<List<FileScanTask>> groupByPartition(
StructType partitionType, Iterable<FileScanTask> tasks) {
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should continue to use table.spec() and we don't need coercePartition. The old behavior of grouping all files from an old spec should be preserved. The two actions behave differently w.r.t. output partitioning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats strange. So, we don't have enough testcases to catch this behaviour?

I have reverted it back now. Also, I didn't use transform in the end of planFileGroups because it needs a special check of fileGroups.size() > 0 which is not handled in transform due to generics

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

if (partialProgressEnabled) {
return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
Copy link
Collaborator

@szehon-ho szehon-ho May 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess i'd personally put the variable back, as in the RewriteDeleteFilesSparkAction case it was just a call without arguments, but here there are arguments so probably is more clearer if we assign it separately, but not a big deal

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I too thought about it during the change.
Since it is in if-else, I don't think it will make a huge difference.

I will leave it as it is (because the style looks similar to RewriteDeleteFilesSparkAction)

Copy link
Contributor

@dramaticlly dramaticlly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I took another pass, and realize we can still do the refactoring while preserving the logic as @aokolnychyi suggested

}
}

Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
Copy link
Collaborator

@szehon-ho szehon-ho May 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't comment on the exact line , but on line 196-304 below , initially you had done some refactoring there. I see it was reverted as @aokolnychyi said we can't coerce partition to change the behavior in this discussion: #7630 (comment)

That is correct, but we can still do some refactoring without coerce. I took a brief look and came up with this to maintain the logic:

First add the methods:

private StructLikeMap<List<FileScanTask>> groupByPartition(
      StructType partitionType, Iterable<FileScanTask> tasks) {
    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
    StructLike emptyStruct = GenericRecord.create(partitionType);

    for (FileScanTask task : tasks) {
      // If a task uses an incompatible partition spec the data inside could contain values
      // which belong to multiple partitions in the current spec. Treating all such files as
      // un-partitioned and grouping them together helps to minimize new files made.
      StructLike taskPartition =
          task.file().specId() == table.spec().specId()
              ? task.file().partition()
              : emptyStruct;

      List<FileScanTask> files = filesByPartition.get(taskPartition);
      if (files == null) {
        files = Lists.newArrayList();
      }

      files.add(task);
      filesByPartition.put(taskPartition, files);
    }
    return filesByPartition;
  }

  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(StructLikeMap<List<FileScanTask>> filesByPartition) {
    return filesByPartition.transformValues(this::planFileGroups);
  }

  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
  }

and then we can make this method like:

  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
    CloseableIterable<FileScanTask> fileScanTasks =
        table
            .newScan()
            .useSnapshot(startingSnapshotId)
            .filter(filter)
            .ignoreResiduals()
            .planFiles();

    try {
      StructType partitionType = table.spec().partitionType();
      StructLikeMap<List<FileScanTask>> filesByPartition = groupByPartition(partitionType, fileScanTasks);
      return fileGroupsByPartition(filesByPartition);
    } finally {
      try {
        fileScanTasks.close();
      } catch (IOException io) {
        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
      }
    }
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I tried it before. Have you checked this comment?
#7630 (comment)

If I use transformValues, it cannot check the existing fileGroups.size() > 0 check due to generics. Are we ok with that? That's why I didn't do

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I tried it before. Have you checked this comment?
#7630 (comment)

Yea I saw that, so hence my suggestion did not use coerce, but rather the table's latest partitionType. It should be exactly the same as the existing code now?

If I use transformValues, it cannot check the existing fileGroups.size() > 0 check due to generics. Are we ok with that? That's why I didn't do

Yea , in RewritePositionDeleteFilesSparkAction, we've moved that check as part of toGroupStream, which I think you also copied here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've moved that check as part of toGroupStream

I can see this now. SGTM.
I have addressed the suggestion.

@szehon-ho szehon-ho merged commit 8ace681 into apache:master May 19, 2023
41 checks passed
@szehon-ho
Copy link
Collaborator

Merged, thanks @ajantha-bhat

@ajantha-bhat
Copy link
Member Author

@szehon-ho: Thanks for the review. I will backport to other spark versions later today.

Also, I can see that new changes like RewritePositionDeleteFilesSparkAction.java is not backported. Do we need to handle this before 1.3 release?

szehon-ho pushed a commit that referenced this pull request May 23, 2023
This commit backports PR #7630 to Spark 3.3.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Harmonize RewriteData, DeleteFilesSparkAction
4 participants