[SPARK-56669][SQL] Implement group filtering for WriteDelta row level operations #55612

Closed
aokolnychyi wants to merge 1 commit into apache:master from aokolnychyi:spark-56669

@aokolnychyi
Contributor

What changes were proposed in this pull request?

This PR implements group filtering for WriteDelta row level operations.

Why are the changes needed?

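These changes are needed to close the gap in WriteDelta plans: ReplaceData already carries a group filter condition for runtime group filtering, while WriteDelta did not.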

Does this PR introduce any user-facing change?

Changes are backward compatible.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

Claude Code v2.1.123.

@aokolnychyi
Contributor Author

@gengliangwang @juliuszsompolski @cloud-fan, can you please review this one?

Member

@gengliangwang left a comment

Thanks for closing this gap — WriteDelta carrying a groupFilterCondition symmetric to ReplaceData makes a lot of sense, and routing both through a shared injectGroupFilters keeps the rule clean.

One thing I think is missing: RewriteDeleteFromTable.buildWriteDeltaPlan (line 118) still constructs WriteDelta(writeRelation, cond, project, relation, projections) without groupFilterCondition, while its sibling buildReplaceDataPlan (line 92) sets one. As a result, DELETE on SupportsDelta connectors won't benefit from this PR even though RewriteUpdateTable.buildWriteDeltaPlan and RewriteMergeIntoTable.buildWriteDeltaPlan do. Was this intentional, or should it also do val groupFilterCond = if (groupFilterEnabled) Some(cond) else None and pass it in? If so, a delta-based DELETE test analogous to the new update runtime group filtering (...) cases would be worth adding to DeltaBasedDeleteFromTableSuite.
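
The pattern quoted in this comment (carry a group filter condition only when the feature is enabled) can be sketched as below. This is a simplified, self-contained model and not the Spark code: `Expr`, `WriteDeltaSketch`, and this `buildWriteDeltaPlan` are hypothetical stand-ins for a Catalyst expression, the `WriteDelta` node, and the rewrite method, and the optional constructor parameter is an assumption based on the discussion here.

```scala
// Hypothetical stand-in for a Catalyst expression.
final case class Expr(sql: String)

// Hypothetical stand-in for WriteDelta; the real node takes more arguments.
final case class WriteDeltaSketch(
    condition: Expr,
    groupFilterCondition: Option[Expr] = None)

object GroupFilterSketch {
  // Mirrors the expression quoted in the review: the condition is only
  // attached when runtime group filtering is enabled.
  def buildWriteDeltaPlan(cond: Expr, groupFilterEnabled: Boolean): WriteDeltaSketch = {
    val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
    WriteDeltaSketch(cond, groupFilterCond)
  }
}
```

With the flag off, the node is built exactly as before, which is one way such a change can stay backward compatible.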

A few smaller comments inline.

protected def groupFilterEnabled: Boolean = conf.runtimeRowLevelOperationGroupFilterEnabled

// converts a MERGE condition into an EXISTS subquery for runtime filtering
protected def toMergeGroupFilterCondition(
Member

Both call sites of toMergeGroupFilterCondition are inside RewriteMergeIntoTable.scala (the MERGE buildReplaceDataPlan and buildWriteDeltaPlan). Lifting it into RewriteRowLevelCommand widens the trait's surface for RewriteUpdateTable and RewriteDeleteFromTable, which don't use it. Could it stay as a private def inside RewriteMergeIntoTable instead?

Contributor Author

Oops, forgot to revert. Fixed.

@@ -93,7 +103,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
// this rule assigns runtime filters to both scan relations (will be shared at runtime)
// and must transform the runtime filter condition to use correct expr IDs for each relation
// see RewriteUpdateTable for more details
Member

Now that this branch fires for both ReplaceData and WriteDelta, the comment block above only describes the group-based path: the UNION rewrite with two identical scan relations only applies to ReplaceData UPDATE (via RewriteUpdateTable.buildReplaceDataWithUnionPlan); the new WriteDelta UPDATE path has a single scan and the attrMap/transform step is effectively a no-op. A small clarification that delta-based UPDATE goes through the same code with a single scan would help future readers.

Contributor Author

Fixed

None
}

private def findReadRelation(
Member

The body of DeltaBasedRowLevelOperation.findReadRelation is essentially a copy of GroupBasedRowLevelOperation.findReadRelation, differing only in the absence of the allowMultipleReads cases. Worth pulling the shared collection + size-1 + error logic into a small helper (e.g., into RewriteRowLevelCommand or a private object here) so the two extractors stay aligned over time?
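
A shared helper of the kind suggested here might look roughly like the sketch below. It uses a toy plan model rather than Catalyst: `Plan`, `Relation`, `Node`, `collectRelations`, and `findSingleReadRelation` are all hypothetical names, and the behavior for zero or multiple matches (return `None`, or throw) is an assumption about what the two extractors share, not a copy of the Spark implementation.

```scala
object ReadRelationSketch {
  // Toy plan model: a tree whose leaves may be scan relations.
  sealed trait Plan { def children: Seq[Plan] }
  final case class Relation(table: String) extends Plan { val children: Seq[Plan] = Nil }
  final case class Node(children: Seq[Plan]) extends Plan

  // Collects every relation matching the predicate, in pre-order.
  def collectRelations(plan: Plan)(matches: Relation => Boolean): Seq[Relation] = plan match {
    case r: Relation => if (matches(r)) Seq(r) else Nil
    case other => other.children.flatMap(collectRelations(_)(matches))
  }

  // Shared "collect + exactly one + error" logic both extractors could call.
  def findSingleReadRelation(plan: Plan, table: String): Option[Relation] =
    collectRelations(plan)(_.table == table) match {
      case Seq() => None
      case Seq(single) => Some(single)
      case many => throw new IllegalStateException(
        s"Expected only one row-level read relation: ${many.size} found")
    }
}
```

A group-based extractor that tolerates two identical reads could handle that case first and delegate everything else to the helper, which keeps the error path in one place.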

Contributor Author

Refactored.

@aokolnychyi
Contributor Author

@gengliangwang, this does not apply to DELETE because delta-based DELETE plans project only row ID columns, so this optimization doesn't help there.

@gengliangwang
Member

Thanks, merging to master

@LuciferYang
Contributor

LuciferYang commented Apr 30, 2026

Hi @aokolnychyi, this looks to have broken the "sql - other tests" job on master. Parent commit 68b936a9c36 is green; merge commit 5ef2e1ba174 and HEAD 384cd0d3e83 both have 25 failures, all in the 7 Delta-based row-level operation suites.

The failing tests are the "transactional checks" ones added in this PR (MergeIntoTableSuiteBase.scala:72 and several around UpdateTableSuiteBase.scala:952). Sample failure from DeltaBasedMergeIntoTableSuite.self merge with transactional checks:

ArrayBuffer([IsNotNull(dep), EqualTo(dep,hr)], [IsNotNull(salary), EqualTo(salary,100)],
            [IsNotNull(dep), EqualTo(dep,hr)], [IsNotNull(salary), EqualTo(salary,100)])
had size 4 instead of expected size 2

The doubling matches the new group-filter scan: matchingRowsPlan re-scans the target, and for MERGE RewritePredicateSubquery also re-scans the source. The hard-coded counts in the new base-class tests still assume the old delta behavior:

val expectedNumScans = if (deltaMerge) 2 else 4   // delta now 4
val expectedNumTargetScans = if (deltaMerge) 1 else 2   // delta now 2
val expectedNumSourceScans = if (deltaMerge) 1 else 2   // delta now 2

The DeltaBased*Suite.scala files were updated in this PR but the assertions in MergeIntoTableSuiteBase / UpdateTableSuiteBase were missed. CI: check-run 73714758514 on 5ef2e1ba174.
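
To make the arithmetic concrete, here is a small sketch of what the MERGE counts quoted above imply, assuming the "delta now" values in the inline comments are the intended new expectations. `ExpectedScans`, `before`, and `after` are hypothetical names used only for illustration; they are not part of the test suites.

```scala
object ScanCountSketch {
  final case class ExpectedScans(total: Int, target: Int, source: Int)

  // Expectations hard-coded in the base-class tests, as quoted above.
  def before(deltaMerge: Boolean): ExpectedScans =
    if (deltaMerge) ExpectedScans(total = 2, target = 1, source = 1)
    else ExpectedScans(total = 4, target = 2, source = 2)

  // With the group-filter scan, the delta counts double to 4 / 2 / 2,
  // which equals the non-delta values, so the flag no longer matters.
  def after(deltaMerge: Boolean): ExpectedScans =
    ExpectedScans(total = 4, target = 2, source = 2)
}
```

Under that assumption the `if (deltaMerge)` branches in the MERGE assertions become redundant. The UPDATE counts are not modeled here.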

Local repro:

build/sbt "sql/testOnly org.apache.spark.sql.connector.DeltaBased*MergeIntoTable*Suite org.apache.spark.sql.connector.DeltaBased*UpdateTable*Suite"

Do you have time to push a follow-up?

@LuciferYang
Contributor

I will revert this one first to restore CI. Also cc @gengliangwang @cloud-fan

gengliangwang pushed a commit that referenced this pull request May 1, 2026
… operations

### What changes were proposed in this pull request?

This PR implements group filtering for WriteDelta row level operations.

It re-applies #55612 (commit `5ef2e1ba174`, reverted in `8e8fee2692f`) and resolves the test failures reported in #55612 (comment) by updating the scan-count assertions in the transactional check tests in `MergeIntoTableSuiteBase` and `UpdateTableSuiteBase`. With group filtering, `matchingRowsPlan` re-scans the target, and for MERGE `RewritePredicateSubquery` also re-scans the source. For MERGE the delta scan counts now match the non-delta values, so the `deltaMerge` conditionals collapse. For UPDATE the delta counts double but remain under the non-delta values because `ReplaceData` still adds further scans.

### Why are the changes needed?

These changes are needed to close the gap in WriteDelta plans.

### Does this PR introduce _any_ user-facing change?

Changes are backward compatible.

### How was this patch tested?

This PR comes with tests. Locally verified all 9 affected suites are green (517 tests):

```
build/sbt 'sql/testOnly \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableSuite \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableWithDeletionVectorsSuite \
  org.apache.spark.sql.connector.DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateTableSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateTableWithDeletionVectorsSuite \
  org.apache.spark.sql.connector.DeltaBasedUpdateAsDeleteAndInsertTableSuite \
  org.apache.spark.sql.connector.DeltaBasedNoMetadataDeleteFromTableSuite \
  org.apache.spark.sql.connector.GroupBasedMergeIntoTableSuite \
  org.apache.spark.sql.connector.GroupBasedUpdateTableSuite'
```

### Was this patch authored or co-authored using generative AI tooling?

Claude Code v2.1.123.

Closes #55635 from gengliangwang/spark-56669-redo.

Lead-authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Co-authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>