Core, Spark 3.4: Add filter to Rewrite position deletes #7582

Merged: 4 commits into apache:master on Aug 7, 2023

Conversation

szehon-ho (Collaborator):

This adds support for a filter in RewritePositionDeleteFiles.

Logic: RewritePositionDeleteFiles is based on PositionDeletesTable (a metadata table representing position deletes). Like all metadata tables, it performs partition predicate pushdown by transforming the partition spec so that the partition predicate can be evaluated against the metadata table (i.e., my_table.position_deletes.partition.part_col instead of my_table.part_col).

But here the RewritePositionDeleteFiles action actually receives a filter on the original table, not on the PositionDeletesTable metadata table. So we short-circuit this partition-spec transformation in this case.

This is done by adding a new method, baseTableFilter(), to the PositionDeletesTableScan, which takes a filter expressed against the base table rather than the position_deletes table. Checks are added to ensure it is set exclusively of the filter based on the position_deletes table.
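The "exclusively set" check described above can be sketched as follows. This is a hypothetical, simplified illustration, not the actual Iceberg code: String stands in for org.apache.iceberg.expressions.Expression, and the class name only mirrors the scan discussed in this PR.

```java
// Hypothetical sketch of mutually exclusive filter setters (not Iceberg's
// actual implementation). String stands in for an Expression.
public class PositionDeletesScanSketch {
  private String metadataTableFilter = null; // filter on the position_deletes schema
  private String baseTableFilter = null;     // filter on the base table schema

  // filter(): rejected if baseTableFilter() was already called
  public PositionDeletesScanSketch filter(String expr) {
    if (baseTableFilter != null) {
      throw new IllegalStateException("Cannot set filter: baseTableFilter is already set");
    }
    this.metadataTableFilter = expr;
    return this;
  }

  // baseTableFilter(): rejected if filter() was already called
  public PositionDeletesScanSketch baseTableFilter(String expr) {
    if (metadataTableFilter != null) {
      throw new IllegalStateException("Cannot set baseTableFilter: filter is already set");
    }
    this.baseTableFilter = expr;
    return this;
  }

  // whichever filter was set drives planning; null means always-true
  public String effectiveFilter() {
    return baseTableFilter != null ? baseTableFilter : metadataTableFilter;
  }
}
```

Setting one filter and then attempting to set the other fails fast, which is the invariant the filterSet flags below are guarding.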

@@ -130,18 +132,38 @@ private Schema calculateSchema() {
public static class PositionDeletesBatchScan
extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {

private boolean filterSet = false;
szehon-ho (Collaborator, Author):

This is messy, but the overall idea is: we either have a filter set on the metadata table or on the base table, and we can only handle one of these.

szehon-ho (Collaborator, Author):

Rebased

@Override
protected CloseableIterable<ScanTask> doPlanFiles() {
String schemaString = SchemaParser.toJson(tableSchema());

// prepare transformed partition specs and caches
Map<Integer, PartitionSpec> transformedSpecs = transformSpecs(tableSchema(), table().specs());
Map<Integer, PartitionSpec> transformedSpecs = transformSpecsIfNecessary();
Contributor:

Hm, can't they actually work together? There seems to be quite a bit of logic that decides whether to use a filter on the base table or a filter on the metadata table.

Suppose we add baseTableFilter as in this PR. Can we do something like this later?

Expressions.and(filter(), Projections.inclusive(spec, isCaseSensitive()).project(baseTableRowFilter))

Whenever we compute evalCache?

szehon-ho (Collaborator, Author):

OK, the latest change supports both filters now. I use another ManifestEvaluator, obtained via ManifestEvaluator.forPartitionFilter(), which internally does the projection.
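ManifestEvaluator.forPartitionFilter() prunes whole delete manifests using the partition summaries each manifest carries. As a rough, hypothetical illustration of that idea (not Iceberg's actual evaluator, which handles null/NaN counts, non-identity transforms, and arbitrary expression trees), a manifest storing integer lower/upper bounds for a partition field can be skipped when an equality predicate's value falls outside those bounds:

```java
// Hypothetical sketch of manifest-level pruning via partition bounds.
// Real Iceberg manifests expose ManifestFile.partitions() summaries and the
// check is done by ManifestEvaluator; this only models the bounds test.
public class ManifestBoundsSketch {
  // true if a manifest whose partition field spans [lower, upper] might
  // contain rows matching "field = value"; false means it can be skipped
  public static boolean mayContainEq(int lower, int upper, int value) {
    return value >= lower && value <= upper;
  }
}
```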


LoadingCache<Integer, ResidualEvaluator> residualCache =
partitionCacheOf(
transformedSpecs,
spec ->
ResidualEvaluator.of(
spec,
shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter(),
shouldIgnoreResiduals() ? Expressions.alwaysTrue() : effectiveFilter(),
Contributor:

Hm, it seems a bit suspicious to use the base table filter as the residual. It will be propagated to tasks, and I am not sure those filters will even be resolvable against the metadata table schema.

I need to take a closer look with fresh eyes.

szehon-ho (Collaborator, Author):

yea, made this back to filter()

* @return this for method chaining
*/
@Override
public BatchScan filter(Expression expr) {
Contributor:

Is this actually needed? Won't the base implementation work given the current version of newRefinedScan?

partitionCacheOf(
transformedSpecs,
spec -> ManifestEvaluator.forRowFilter(filter(), spec, isCaseSensitive()));

// iterate through delete manifests
List<ManifestFile> manifests = snapshot().deleteManifests(table().io());

CloseableIterable<ManifestFile> matchingManifests =
Contributor:

Shall we do this filter only if either of the filter expression is non-trivial? Otherwise, what's the point of doing this work?

Contributor:

Well, this is for manifests, so shouldn't matter that much. Never mind.

@@ -223,12 +289,16 @@ public void close() throws IOException {

@Override
public CloseableIterator<ScanTask> iterator() {
Expression partitionFilter =
Projections.inclusive(spec, isCaseSensitive()).project(baseTableFilter);
Contributor:

You could have cached this and used ManifestEvaluator.forPartitionFilter but it is probably not worth it.
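Projections.inclusive(spec, caseSensitive).project(rowFilter), used in the hunk above, turns a row filter on the base table into a filter on partition tuples while staying inclusive: predicates on non-partition columns relax to always-true, so the projection can over-select but never miss a matching file. A simplified, hypothetical model of that behavior for identity-transformed partition columns (the real API works on Expression trees and arbitrary transforms):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of an inclusive projection for identity partitions.
// A filter is modeled as column -> required value (a conjunction of equalities).
public class InclusiveProjectionSketch {
  // keep only predicates on partition source columns; dropping the rest is
  // what makes the projection inclusive (it can only over-select)
  public static Map<String, Object> project(Map<String, Object> rowFilter, Set<String> partitionCols) {
    Map<String, Object> projected = new HashMap<>();
    for (Map.Entry<String, Object> e : rowFilter.entrySet()) {
      if (partitionCols.contains(e.getKey())) {
        projected.put(e.getKey(), e.getValue());
      }
    }
    return projected;
  }

  // evaluate the projected filter against a file's partition tuple;
  // an empty projection behaves like alwaysTrue
  public static boolean matches(Map<String, Object> projected, Map<String, Object> partition) {
    for (Map.Entry<String, Object> e : projected.entrySet()) {
      if (!Objects.equals(partition.get(e.getKey()), e.getValue())) {
        return false;
      }
    }
    return true;
  }
}
```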

@@ -133,16 +137,23 @@ public RewritePositionDeleteFiles.Result execute() {
}

private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
Table deletesTable =
Contributor:

Why add it here and why rename fileTasks? Don't we have to modify planFiles instead?

private CloseableIterable<PositionDeletesScanTask> planFiles() {
  Table deletesTable =
      MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);

  PositionDeletesBatchScan scan = (PositionDeletesBatchScan) deletesTable.newBatchScan();

  return CloseableIterable.transform(
      scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
      task -> (PositionDeletesScanTask) task);
}

CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
PositionDeletesTable.PositionDeletesBatchScan deletesScan =
Contributor:

Can we use a direct import to shorten the lines, like in the sample snippet I mentioned above?

@@ -421,7 +516,6 @@ public void testPartitionEvolutionRemove() throws Exception {
.rewritePositionDeletes(table)
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();

Contributor:

Needed?

aokolnychyi (Contributor) left a review:

This seems correct to me overall. I left a few suggestions.

szehon-ho merged commit 51782d3 into apache:master on Aug 7, 2023. 41 checks passed.
szehon-ho (Collaborator, Author):

Thanks @aokolnychyi for review!
