feat: AQE DPP broadcast reuse for Iceberg native scans #4033

Open
mbutrovich wants to merge 5 commits into apache:main from mbutrovich:iceberg_reuse

Which issue does this PR close?

Closes #4022.

Rationale for this change

Under AQE, Spark's PlanAdaptiveDynamicPruningFilters converts SubqueryAdaptiveBroadcastExec (SAB) to SubqueryBroadcastExec for DPP broadcast reuse. However, CometIcebergNativeScanExec wraps BatchScanExec and hides its runtimeFilters from the plan's expression tree. Because Spark's rule can't see the DPP expressions, the SAB stays unconverted and the dim table is executed independently of the join, resulting in a double broadcast execution.

The existing workaround used reflection to set InSubqueryExec's private result field, bypassing executeCollect() (which throws on SAB). This was fragile and didn't achieve broadcast reuse.

What changes are included in this PR?

New rule: CometPlanAdaptiveDynamicPruningFilters

A columnar rule (registered as postColumnarTransitions) that converts SubqueryAdaptiveBroadcastExec to CometSubqueryBroadcastExec inside CometIcebergNativeScanExec.originalPlan.runtimeFilters. The subquery wraps the join's already-materialized BroadcastQueryStageExec, achieving true broadcast reuse with no re-execution of the dim table.

Key design decisions:

  • Registered as postColumnarTransitions, not queryStageOptimizerRule. CometExecRule runs in preColumnarTransitions and recreates scan instances, which would discard earlier modifications.
  • Matches by buildKeys exprIds to disambiguate multiple broadcast joins in the same plan.
  • Searches both CometBroadcastHashJoinExec and BroadcastHashJoinExec to handle Spark fallback (e.g., disabled Comet BHJ config). Uses CometSubqueryBroadcastExec for Comet broadcasts (Arrow data) and SubqueryBroadcastExec for Spark broadcasts (HashedRelation).
  • Falls back to Literal.TrueLiteral when no matching broadcast join exists (e.g., SortMergeJoin). This disables DPP but produces correct results.
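
The matching and fallback decisions above can be illustrated with a minimal sketch. It does not use the real Spark or Comet classes: `AdaptiveBroadcast`, `ReusedBroadcast`, `AlwaysTrue`, `BroadcastJoin`, and `convert` are hypothetical stand-ins introduced only for illustration, and exprIds are modeled as plain `Long` values.

```scala
// Simplified model of the rule's decision logic. None of these types are the
// real Spark/Comet classes; they are hypothetical stand-ins for illustration.
object DppRuleSketch {
  sealed trait Filter
  // Stand-in for an unconverted SubqueryAdaptiveBroadcastExec, identified by
  // the exprIds of its build keys.
  final case class AdaptiveBroadcast(buildKeyIds: Seq[Long]) extends Filter
  // Stand-in for a converted subquery that reuses a materialized broadcast stage.
  // arrowData = true models the Comet variant, false the Spark variant.
  final case class ReusedBroadcast(stageId: Int, arrowData: Boolean) extends Filter
  // Stand-in for Literal.TrueLiteral: prunes nothing, so results stay correct.
  case object AlwaysTrue extends Filter

  // A broadcast hash join found in the plan; isComet separates the Comet
  // operator from the Spark fallback operator.
  final case class BroadcastJoin(buildKeyIds: Seq[Long], stageId: Int, isComet: Boolean)

  // Convert one runtime filter, given every broadcast join visible in the plan.
  def convert(filter: Filter, joins: Seq[BroadcastJoin]): Filter = filter match {
    case AdaptiveBroadcast(keys) =>
      // Match on build-key exprIds so multiple joins are not confused.
      joins.find(_.buildKeyIds == keys) match {
        case Some(j) => ReusedBroadcast(j.stageId, arrowData = j.isComet)
        case None    => AlwaysTrue // e.g. SortMergeJoin: DPP is disabled
      }
    case other => other // already converted, leave untouched
  }
}
```

With two joins in the plan, each filter resolves to the stage whose build keys it shares, and a filter with no matching join degrades to the always-true predicate rather than failing.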

Metrics: LazyIcebergMetric

Replaces the capturedMetricValues -> serializedPartitionData chain with LazyIcebergMetric, whose value getter lazily triggers planning. This decouples construction of the metrics map (accessed by SparkPlanInfo before AQE runs) from DPP resolution, which must happen after the rule converts the SAB.
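
A rough sketch of the lazy-metric idea, assuming planning can be modeled as a function that yields a map of metric values. `Planner`, `LazyMetric`, and `buildMetrics` are hypothetical names, not the actual LazyIcebergMetric implementation; the point is only that building the map does not trigger planning, while reading a value does.

```scala
// Hypothetical model of a metric whose value is resolved on first read.
object LazyMetricSketch {
  // Wraps an expensive planning step and records how many times it ran.
  final class Planner(compute: () => Map[String, Long]) {
    private var runs = 0
    def planningRuns: Int = runs
    // Evaluated at most once, on first access.
    lazy val result: Map[String, Long] = { runs += 1; compute() }
  }

  // The metric holds a reference to the planner but does not touch it
  // until someone asks for the value.
  final class LazyMetric(val name: String, planner: Planner) {
    def value: Long = planner.result.getOrElse(name, 0L)
  }

  // Building the metrics map is cheap: no planning happens here.
  def buildMetrics(names: Seq[String], planner: Planner): Map[String, LazyMetric] =
    names.map(n => n -> new LazyMetric(n, planner)).toMap
}
```

In this model the map can be constructed and inspected by key early, and planning runs once, only when the first metric value is read.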

equals fix

CometIcebergNativeScanExec.equals now includes runtimeFilters. Without this, transformUp can't detect changes when the rule replaces SAB expressions, because the old and new scans are "equal" by the old definition.
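
The effect can be reproduced in a small model, assuming the tree rewrite keeps the original node whenever the rewritten node compares equal to it. `Scan` and `applyRule` are hypothetical stand-ins, and filters are modeled as strings; the `includeFilters` flag switches between the old and new `equals` definitions.

```scala
// Hypothetical model of why equals must cover runtimeFilters.
object EqualsSketch {
  // includeFilters = false models the old equals, true models the fixed one.
  final class Scan(val table: String,
                   val runtimeFilters: Seq[String],
                   val includeFilters: Boolean) {
    override def equals(other: Any): Boolean = other match {
      case s: Scan =>
        table == s.table && (!includeFilters || runtimeFilters == s.runtimeFilters)
      case _ => false
    }
    // Kept consistent with equals in both modes.
    override def hashCode(): Int =
      if (includeFilters) (table, runtimeFilters).hashCode() else table.hashCode()
  }

  // Mimics a rewrite step that discards the result when it equals the input.
  def applyRule(node: Scan)(rule: Scan => Scan): Scan = {
    val rewritten = rule(node)
    if (node == rewritten) node else rewritten
  }

  // A rule that replaces the filters, standing in for the SAB conversion.
  val convertFilters: Scan => Scan =
    s => new Scan(s.table, Seq("converted"), s.includeFilters)
}
```

Under the old definition the rewritten scan is treated as unchanged and dropped, so the unconverted filter survives; with filters included in `equals`, the replacement is kept.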

Shim changes

  • ShimSubqueryBroadcast: adds createSubqueryBroadcastExec (version-safe constructor) and resolveSubqueryAdaptiveBroadcast (reflection fallback for 3.4, unreachable throw for 3.5+).
  • ShimCometSparkSessionExtensions: moved from spark-3.x/ to spark-3.4/ and spark-3.5/ (originally needed for injectQueryStageOptimizerRule shim, kept as-is since the split is harmless).

Reflection hack removal

The inline reflection code in CometIcebergNativeScanExec.serializedPartitionData (setInSubqueryResult, Cast matching, manual column index lookup) is removed on 3.5+. On 3.4, the reflection fallback is preserved in the shim since the rule API (injectColumnar) works on 3.4 but CometPlanAdaptiveDynamicPruningFilters may not convert the SAB in all edge cases.

How are these changes tested?

8 new tests in CometIcebergNativeSuite:

  • AQE DPP - single filter with partition pruning: verifies CometSubqueryBroadcastExec + BroadcastQueryStageExec child + correct results
  • AQE DPP - multiple filters on two partition columns: two DPP filters on the same join reuse the same broadcast
  • AQE DPP - CometSubqueryBroadcastExec replaces SubqueryAdaptiveBroadcastExec: verifies only 1 CometBroadcastExchangeExec in the plan (broadcast reuse)
  • AQE DPP - multiple DPP filters reuse same broadcast: both filters reuse the same BroadcastQueryStageExec
  • AQE DPP - two separate broadcast joins disambiguated by buildKeys: two separate broadcast joins, each SAB maps to the correct one
  • AQE DPP - graceful fallback when broadcast join is not Comet: disabled Comet BHJ, falls back to SubqueryBroadcastExec
  • AQE DPP - empty broadcast result prunes all partitions: empty dim table, query returns empty
  • AQE DPP - no broadcast join (SMJ) disables DPP gracefully: SortMergeJoin with no broadcast, SAB converted to Literal.TrueLiteral


Development

Successfully merging this pull request may close these issues.

perf: Iceberg DPP executes dim table broadcast twice instead of reusing join's broadcast exchange