[SPARK-56868][SQL] Extract V2 runtime-filter + partition planning into a shared helper #55887

Closed
vitaliili-db wants to merge 2 commits into apache:master from vitaliili-db:spark-unify-v2-runtime-filter-helper

@vitaliili-db
Contributor

What changes were proposed in this pull request?

Lift the body of BatchScanExec.filteredPartitions (runtime filter pushdown, re-planning, and KeyedPartitioning validation + None-padding) into a new PushDownUtils.filterAndPlanPartitions helper so the logic can be reused by alternative DataSourceV2 physical scan operators.

Why are the changes needed?

The runtime-filter pushdown pipeline in BatchScanExec.filteredPartitions contains non-trivial logic: V2 predicate translation (DPP + scalar subqueries via SPARK-56467), iterative PartitionPredicate pushdown (SPARK-55596), KeyedPartitioning validation, and None-padding to preserve SPJ key alignment. This refactoring allows for reuse of this logic by alternative scan operators.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Covered by existing tests; no new logic was added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude (Anthropic), via Claude Code

…o a shared `PushDownUtils.filterAndPlanPartitions` helper

### What changes were proposed in this pull request?

Lift the body of `BatchScanExec.filteredPartitions` (runtime filter pushdown,
re-planning, and `KeyedPartitioning` validation + `None`-padding) into a new
`PushDownUtils.filterAndPlanPartitions` helper so the logic can be reused by
alternative DataSourceV2 physical scan operators in downstream forks
without duplicating ~60 lines of subtle SPJ-alignment code.

Three mechanical pieces:

1. `PushDownUtils.pushRuntimeFilters` now takes a by-name
   `Option[Seq[PartitionPredicateField]]` instead of `Table`. The only
   thing it needed from `Table` was `table.partitioning` for the
   iterative-pushdown branch; passing the precomputed schema lets callers
   that have a `Scan` but not a `Table` (e.g., a connector that holds the
   partition transforms separately) reuse this code path. By-name avoids
   any cost for scans that don't opt into iterative pushdown.
2. New `getPartitionPredicateSchema(transforms: Array[Transform], output)`
   overload that the existing `(table, output)` overload now delegates to.
3. New `filterAndPlanPartitions(scan, runtimeFilters, partitionPredicateSchema,
   output, outputPartitioning, inputPartitions): Seq[Option[InputPartition]]`
   helper. `BatchScanExec.filteredPartitions` is now a one-line call.
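
The by-name idea in item 1 can be illustrated with a minimal sketch. It uses hypothetical stand-ins (`Field`, `deriveSchema`, `pushRuntimeFiltersSketch`) rather than Spark's real classes, and only shows that a by-name argument is not computed unless the iterative-pushdown branch actually references it:

```scala
object ByNameSchemaSketch {
  // Hypothetical stand-in for PartitionPredicateField; not Spark's real class.
  final case class Field(name: String)

  // Counts how many times the (pretend-expensive) schema derivation runs.
  var derivations = 0

  def deriveSchema(): Option[Seq[Field]] = {
    derivations += 1
    Some(Seq(Field("day"), Field("region")))
  }

  // `schema` is by-name (`=>`), so the argument expression is evaluated
  // only if this method body references it.
  def pushRuntimeFiltersSketch(
      supportsIterativePushdown: Boolean,
      schema: => Option[Seq[Field]]): Int = {
    if (supportsIterativePushdown) schema.map(_.size).getOrElse(0)
    else 0 // the schema expression is never evaluated on this path
  }
}
```

Calling `pushRuntimeFiltersSketch(false, deriveSchema())` leaves `derivations` at zero, which is the cost saving the description refers to.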

No behavior changes for existing callers. The only caller of
`pushRuntimeFilters` in `apache/spark` was `BatchScanExec.filteredPartitions`,
which is now replaced. The `getPartitionPredicateSchema(relation)` and
`(table, output)` overloads remain for other call sites
(`V2ScanRelationPushDown`, `GroupBasedRowLevelOperationScanPlanning`,
`OptimizeMetadataOnlyDeleteFromTable`).

### Why are the changes needed?

The runtime-filter pushdown pipeline in `BatchScanExec.filteredPartitions`
contains non-trivial logic: V2 predicate translation (DPP + scalar
subqueries via SPARK-56467), iterative `PartitionPredicate` pushdown
(SPARK-55596), `KeyedPartitioning` validation, and `None`-padding to
preserve SPJ key alignment. As more pushdown rounds get added (e.g., the
proposed `SupportsPushDownCatalystRuntimeFiltering` for Catalyst-runtime
filter pushdown), this logic gets harder to keep in sync across
alternative DSv2 physical operators.

Extracting the helper:
- Keeps the partition-pruning pipeline as a single source of truth.
- Lets downstream forks (e.g., the Databricks Photon scan operator) call
  the same helper instead of inlining a subset, avoiding silent drift.
- Decouples `pushRuntimeFilters` from `Table`, so callers that hold a
  `Scan` and a `Transform[]` (but no `Table`) can reuse it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests cover the helper path end-to-end through `BatchScanExec`:
- `DataSourceV2EnhancedRuntimePartitionFilterSuite` exercises rounds 1
  (translatable V2 predicates) and 2 (iterative `PartitionPredicate`).
- `DataSourceV2SQLSuiteV2Filter.SPARK-56467` exercises scalar-subquery
  partition pruning.

No new tests added; the helper is a direct extraction with no behavior
change.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude (Anthropic), via Claude Code
Member

@gengliangwang left a comment

This PR is a pure refactor: it lifts the body of BatchScanExec.filteredPartitions (runtime-filter pushdown + re-planning + KeyedPartitioning validation + None-padding) into a new PushDownUtils.filterAndPlanPartitions helper. The extracted block is byte-identical to the original and existing tests (KeyGroupedPartitioningSuite, DataSourceV2EnhancedRuntimePartitionFilterSuite, etc.) continue to cover the path.

Two related signature changes happen in PushDownUtils:

  • pushRuntimeFilters drops its table: Table parameter in favor of partitionPredicateSchema: => Option[Seq[PartitionPredicateField]] (by-name). The by-name design lets callers skip schema derivation when iterative pushdown isn't supported.
  • A new getPartitionPredicateSchema(transforms: Array[Transform], output) overload is added so callers without a full Table can supply transforms directly.

Main feedback: the motivating "alternative DataSourceV2 physical scan operators" mentioned in the description aren't in this diff and no second caller exists in the tree. That makes the API shape (by-name thunks, new transforms-based overload, removal of the Table-accepting pushRuntimeFilters signature) hard to evaluate. Linking the planned consumer PR/JIRA — or including the consumer here — would make the abstraction concrete. A couple of smaller notes inline.

* @return one entry per original input partition: `Some(part)` for surviving partitions and
* `None` for partition keys whose splits were entirely pruned (SPJ alignment)
*/
def filterAndPlanPartitions(
Member

Could you link the follow-up PR/JIRA for the alternative scan operator that motivates this extraction? Without seeing the second caller it's hard to validate the parameter shape (by-name partitionPredicateSchema, by-name inputPartitions, and the new transforms-based getPartitionPredicateSchema overload) — they look defensive for BatchScanExec but are presumably load-bearing for the upcoming caller.

Contributor Author

The second caller is future work; this refactoring simplifies BatchScanExec and centralizes the logic.

scan: Scan,
runtimeFilters: Seq[Expression],
table: Table,
partitionPredicateSchema: => Option[Seq[PartitionPredicateField]],
Member

Previously this method took table: Table and computed the schema internally; now every caller has to call getPartitionPredicateSchema(table, output) themselves and pass the result. For callers that have a Table (the only existing caller today), that's two calls where there used to be one. Consider keeping a thin Table-accepting overload as the ergonomic default, with this partitionPredicateSchema-accepting form for callers that don't have a Table.

Member

@szehon-ho May 15, 2026

Agreed, taking Table is more convenient. Since there's only one caller here, let's just have the method take Table and do the extraction inside.

Contributor Author

done
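
The shape agreed on in this thread (the method takes a `Table` and derives the schema itself) might look roughly like the sketch below. All types and names here (`Field`, `Table`, `Scan`, `partitionSchema`, `pushRuntimeFiltersSketch`) are hypothetical stand-ins, not Spark's real signatures:

```scala
object TableAcceptingSketch {
  // Hypothetical stand-ins; Spark's real Table, Scan and
  // PartitionPredicateField are different and richer.
  final case class Field(name: String)
  final case class Table(partitionColumns: Seq[String])
  final case class Scan(supportsIterativePushdown: Boolean)

  // Plays the role of getPartitionPredicateSchema(table, output) in spirit only.
  def partitionSchema(table: Table): Option[Seq[Field]] =
    if (table.partitionColumns.isEmpty) None
    else Some(table.partitionColumns.map(Field(_)))

  // The caller passes the Table; the schema is derived here, and only for
  // scans that opt into iterative pushdown.
  def pushRuntimeFiltersSketch(scan: Scan, table: Table): Seq[String] =
    if (scan.supportsIterativePushdown) {
      partitionSchema(table).getOrElse(Seq.empty).map(_.name)
    } else {
      Seq.empty
    }
}
```

With this shape the single existing caller makes one call instead of two, and scans that do not support iterative pushdown still skip the schema derivation.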

} else {
(outputPartitioning match {
case k: KeyedPartitioning =>
inputPartitions.sortBy(_.asInstanceOf[HasPartitionKey].partitionKey())(k.keyRowOrdering)
Member

When outputPartitioning is KeyedPartitioning, this branch unconditionally casts each input partition to HasPartitionKey. For BatchScanExec this is safe because DataSourceV2ScanExecBase.outputPartitioning only produces KeyedPartitioning when every input partition already implements HasPartitionKey. The helper itself doesn't document or enforce this invariant, though — a future caller pairing a KeyedPartitioning with non-HasPartitionKey partitions would hit a cryptic ClassCastException. Worth a sentence in the Scaladoc, or an explicit precondition check.

Contributor Author

Done, the condition is now enforced.
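
An explicit precondition check of the kind requested above could be sketched as follows. `Partition`, `Keyed`, `KeyedSplit` and `PlainSplit` are hypothetical stand-ins for `InputPartition` and `HasPartitionKey`, and `IllegalStateException` stands in for the `SparkException` used in the actual change:

```scala
object KeyedPreconditionSketch {
  // Hypothetical stand-ins for InputPartition and HasPartitionKey.
  trait Partition
  trait Keyed extends Partition { def key: Int }
  final case class KeyedSplit(key: Int) extends Keyed
  case object PlainSplit extends Partition

  // Validate before sorting so a mismatched caller gets a descriptive error
  // instead of a ClassCastException from a blind asInstanceOf.
  def sortByKey(partitions: Seq[Partition]): Seq[Keyed] = {
    val keyed = partitions.map {
      case k: Keyed => k
      case other =>
        throw new IllegalStateException(
          s"Keyed partitioning requires partitions with a key, but got: $other")
    }
    keyed.sortBy(_.key)
  }
}
```

The pattern match replaces the unconditional cast, so the failure message names the offending partition rather than only the class mismatch.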

Member

@szehon-ho left a comment

This makes sense to me. I'm OK with not linking another JIRA for some other potential use case; it seems like a good refactor that reduces the size of BatchScanExec, if done right.

*
* Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
* scalar-subquery references whose values are only resolved after their broadcast/subquery
* side completes. Callers should wrap the result in a `lazy val` so the mutating
Member

nit: we don't need to mention `lazy val` explicitly, but we can keep the recommendation that callers ensure it's called once per V2 scan instance.

Contributor Author

done

* @param partitionPredicateSchema by-name schema for iterative [[PartitionPredicate]] pushdown
* @param output scan output attributes
* @param outputPartitioning Spark-side output partitioning (used for SPJ validation)
* @param inputPartitions by-name original (unfiltered) partitions; consulted only when
Member

@szehon-ho May 15, 2026

Should we rename this to `originalPartitions`? It's a bit awkward to work out what the comment implies for the caller; I would leave those comments out and just explain what the parameter is.

Member

Also, I would skip the Scaladoc note about 'by-name'; it's already apparent from the argument definition.

Contributor Author

done

* @return one entry per original input partition: `Some(part)` for surviving partitions and
* `None` for partition keys whose splits were entirely pruned (SPJ alignment)
*/
def filterAndPlanPartitions(
Member

What do you think of replanWithRuntimeFilters() as the name? cc @gengliangwang

Contributor Author

good idea, renamed

val newPartitions = scan.toBatch.planInputPartitions()

outputPartitioning match {
case k: KeyedPartitioning =>
Member

Let's add a small comment here (or above) noting that this block pads the expected partitions with empty entries for the SPJ case.

Contributor Author

done
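
For readers unfamiliar with the padding step discussed in this thread, here is a simplified sketch of the idea. It assumes one split per partition key and uses a hypothetical `Split` type and `padToOriginalKeys` name; the real helper works on `InputPartition` and `KeyedPartitioning`:

```scala
object SpjPaddingSketch {
  // Hypothetical stand-in for an input partition that carries a partition key.
  final case class Split(key: Int, file: String)

  // One output slot per original key, in the original order: Some(split) if a
  // split with that key survived re-planning, None if the key was fully pruned.
  // Keeping a slot for every key preserves alignment with the other join side.
  def padToOriginalKeys(
      originalKeys: Seq[Int],
      survivors: Seq[Split]): Seq[Option[Split]] = {
    // Assumed validation: filtering may drop keys but must not introduce new ones.
    require(
      survivors.forall(s => originalKeys.contains(s.key)),
      "re-planned partitions contain a key that was not in the original plan")
    val byKey = survivors.map(s => s.key -> s).toMap
    originalKeys.map(byKey.get)
  }
}
```

The output always has the same length as `originalKeys`, which matches the documented return contract of one entry per original input partition.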

Address review feedback on `PushDownUtils.filterAndPlanPartitions`:

- Rename `filterAndPlanPartitions` -> `replanWithRuntimeFilters` for clarity.
- `pushRuntimeFilters` and `replanWithRuntimeFilters` now take `table: Table`
  directly; the partition-predicate schema is derived inside, lazily, only
  when the scan opts into iterative pushdown. Simplifies the single caller
  and drops the by-name `Option[Seq[PartitionPredicateField]]` parameter.
- Rename `inputPartitions` -> `originalPartitions` and bind it to a local
  in the no-filter branch so the by-name expression is evaluated once.
- Document the precondition that `KeyedPartitioning` implies every input
  partition implements `HasPartitionKey`; add an explicit
  `SparkException` in the un-filtered branch to match the filtered branch.
- Drop the prescriptive "wrap in a `lazy val`" Scaladoc; keep the
  substantive "must run at most once per scan instance" guidance.
- Drop the by-name prose from Scaladoc; the `=>` in the signature is
  self-explanatory.
- Add a short comment over the SPJ key-padding block explaining its role.
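
The "evaluated once" point in the list above follows from how Scala by-name parameters behave: the argument expression runs again on every reference. A small sketch with hypothetical names (`planPartitions`, `sizeAndSortedNaive`, `sizeAndSortedBound`) shows the difference:

```scala
object ByNameOnceSketch {
  // Counts how often the (pretend-expensive) planning expression runs.
  var evaluations = 0

  def planPartitions(): Seq[Int] = {
    evaluations += 1
    Seq(3, 1, 2)
  }

  // References the by-name parameter twice, so the argument runs twice.
  def sizeAndSortedNaive(parts: => Seq[Int]): (Int, Seq[Int]) =
    (parts.size, parts.sorted)

  // Binds the parameter to a local first, so the argument runs once.
  def sizeAndSortedBound(parts: => Seq[Int]): (Int, Seq[Int]) = {
    val original = parts
    (original.size, original.sorted)
  }
}
```

Binding to a local keeps the lazy call-site semantics while avoiding repeated planning work inside the branch that does use the value.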

No behavior changes. The existing single caller (`BatchScanExec`) is
updated to the new signature.

Generated-by: Claude (Anthropic), via Claude Code
@gengliangwang
Member

Thanks, merging to master/4.x

gengliangwang pushed a commit that referenced this pull request May 15, 2026
…o a shared helper

Closes #55887 from vitaliili-db/spark-unify-v2-runtime-filter-helper.

Authored-by: Vitalii Li <vitalii.li@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 6a855fd)
Signed-off-by: Gengliang Wang <gengliang@apache.org>