Skip to content

[AURON #2378] Support runtime filters in native Iceberg scan#2379

Open
lyne7-sc wants to merge 7 commits into
apache:masterfrom
lyne7-sc:feat/iceberg_runtime_filter
Open

[AURON #2378] Support runtime filters in native Iceberg scan#2379
lyne7-sc wants to merge 7 commits into
apache:masterfrom
lyne7-sc:feat/iceberg_runtime_filter

Conversation

@lyne7-sc

@lyne7-sc lyne7-sc commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2378

Rationale for this change

Spark's BatchScanExec can carry runtime filters for dynamic partition pruning. These filters are resolved after planning and can reduce the input partitions/files that a scan should read.

Auron's native Iceberg scan should preserve and apply these runtime filters so dynamic partition pruning can reduce Iceberg scan work on the native path.

What changes are included in this PR?

  • Pass BatchScanExec.runtimeFilters into NativeIcebergTableScanExec.
  • Re-plan the Iceberg scan with runtime filters before native execution.
  • Cache runtime-filtered Iceberg scan plans separately from static scan plans.
  • Add a shim method to copy BatchScanExec with runtime filters across supported Spark versions.
  • Add Iceberg integration tests for:
    • Parquet native scan with dynamic partition pruning;
    • ORC native scan with dynamic partition pruning;
    • dynamic pruning to empty partitions;
    • changelog scan with dynamic partition pruning.

Are there any user-facing changes?

Queries using native Iceberg scan can read fewer partitions/files when Spark dynamic partition pruning applies. No API change.

How was this patch tested?

Added AuronIcebergIntegrationSuite cases.

def withRuntimeFilters(
exec: BatchScanExec,
runtimeFilters: Seq[SparkExpression]): BatchScanExec = {
if (exec.runtimeFilters == runtimeFilters) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard exec.runtimeFilters == runtimeFilters looks like it's always true, so the Shims.get.copyBatchScanExecWithRuntimeFilters(...) else-branch is never taken. NativeIcebergTableScanExec is only ever constructed at IcebergConvertProvider.scala:59 as NativeIcebergTableScanExec(e, plan, e.runtimeFilters), so its runtimeFilters field is always the same object as basedScan.runtimeFilters (same e). Both call sites of withRuntimeFiltersNativeIcebergTableScanExec.scala:68 and :250 — pass withRuntimeFilters(basedScan, runtimeFilters) with runtimeFilters eq basedScan.runtimeFilters, and the node (a LeafExecNode) is never rebuilt with different filters anywhere in the tree.

Two things follow from that. First, all five @sparkver overloads of the new shim in ShimsImpl.scala (plus the abstract method in Shims.scala) are never invoked at runtime, so the new integration tests can't exercise them — a wrong version-specific copy(...) argument list would surface only as a compile error on that profile, never as a test failure. Second, doCanonicalize at NativeIcebergTableScanExec.scala:249-250 reduces to the previous basedScan.canonicalized (the wrapper returns basedScan unchanged), so the new comment there — "first make sure it sees the top-level runtime filters" — describes a transformation that doesn't currently happen.

Is this intentional groundwork for a future path that builds the node with filters different from basedScan (in which case a comment saying so, plus a test that takes the copy branch, would make the shim's ~40 version-specific lines defensible), or could withRuntimeFilters and the shim be dropped in favor of using basedScan directly? Since the field is always basedScan.runtimeFilters, I'm curious which direction you had in mind.

exec.applyPartialClustering,
exec.replicatePartitions)

@sparkver("3.5 / 4.0 / 4.1")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This groups 4.1 with 3.5/4.0 on the assumption that Spark 4.1's BatchScanExec constructor is still (output, scan, runtimeFilters, ordering, table, spjParams). The shims module compiles for 4.1 even though iceberg doesn't build there, so if 4.1 changed that constructor the 4.1 profile would fail to compile rather than fail a test. Was the 4.1 branch actually built against a 4.1 profile, or is this optimistic grouping ahead of 4.1 GA? If it hasn't been compiled against 4.1 yet, would it be safer to split 4.1 into its own branch (or drop it from the group) until the signature is confirmed?

private lazy val plan: IcebergScanPlan = {
if (runtimeFilters.nonEmpty) {
val filteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters)
IcebergScanSupport.plan(filteredScan, useRuntimeFilters = true).getOrElse(staticPlan)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When runtime-filtered planning returns None, this falls back to staticPlan — the unfiltered plan over all partitions — with no log line. Correctness is safe here (the enclosing join re-applies the predicate, and prune-to-empty returns Some(Seq.empty) rather than None, so it doesn't hit this branch), so this is really an observability question: someone debugging "did DPP actually apply?" gets no signal that the query quietly scanned everything. Would a logWarning on the getOrElse(staticPlan) branch help — something noting that runtime-filtered planning was unavailable and all partitions are being scanned?

None
}
} catch {
case NonFatal(t) =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This NonFatal catch wraps prepare / waitForSubqueries / filteredPartitions and returns None, which flows up to inputPartitions and full-scans all partitions. That also swallows a genuine DPP subquery or broadcast execution failure and turns it into a silent full-table scan, where vanilla Spark's BatchScanExec would have surfaced the failure. Partitioning errors here isn't wrong for correctness (the join re-filters), but it changes error semantics — a real failure in waitForSubqueries gets masked. Should a waitForSubqueries failure be swallowed and fall through to a full scan at all, or would you rather let it propagate so a real subquery/broadcast failure stays visible? (It does log a warning, so this is about whether full-scanning past a genuine failure is the right default, not about missing logs.)

}

try {
MethodUtils.invokeMethod(exec, true, "prepare")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prepare() is a public SparkPlan method and exec is a typed BatchScanExec, so exec.prepare() compiles directly — only waitForSubqueries (protected) and filteredPartitions genuinely need reflective access. Calling exec.prepare() here would drop one reflection call and leave a single reflection helper (invokeDeclaredMethod) for the two methods that actually need it. Minor.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support runtime filters for native Iceberg scan

2 participants