feat: support AQE DPP broadcast reuse for Iceberg native scans#4215
Open
mbutrovich wants to merge 6 commits intoapache:mainfrom
Open
feat: support AQE DPP broadcast reuse for Iceberg native scans#4215mbutrovich wants to merge 6 commits intoapache:mainfrom
mbutrovich wants to merge 6 commits intoapache:mainfrom
Conversation
…still lots of instrumentation to remove.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #4022.
Rationale for this change
Under AQE, Spark's
PlanAdaptiveDynamicPruningFiltersrewritesSubqueryAdaptiveBroadcastExec(SAB) toSubqueryBroadcastExecso DPP filters reuse the join's already-materialized broadcast. For Iceberg native scans this rewrite was a no-op:CometIcebergNativeScanExeckeptruntimeFiltersinside its@transient originalPlan, where neither Spark's expression walks nor our owntransformExpressionsUppasses could see them. The SAB stayed unconverted and the dim table executed a second time as a standalone broadcast.#4112 fixed the equivalent problem for V1 native Parquet by lifting
runtimeFiltersto a top-level constructor field and using Spark's standardprepare/waitForSubqueriesflow. This PR applies the same design to V2 Iceberg, replacing the earlier prototype in #4033, and alignsCometNativeScanExecandCometIcebergNativeScanExecso both scans go through the same DPP and subquery resolution path.What changes are included in this PR?
runtimeFiltersto a top-level constructor field onCometIcebergNativeScanExecso Spark'sproductIterator-based expression walks (and ourtransformExpressionsUppasses) see and rewrite it directly. MirrorsBatchScanExecand matches the V1 design from feat: AQE DPP for native Parquet scans with broadcast reuse #4112.CometLeafExec.ensureSubqueriesResolved(), bridging Comet's customfindAllPlanDatadata-collection path with Spark's standardprepare->waitForSubqueriesflow. Removes the deadlock-prone reflection hack from feat: AQE DPP broadcast reuse for Iceberg native scans #4033 and eliminates ad-hoc double-checked locking.CometNativeScanExecto use the same flow (dropped its redundantdoPrepareoverride and outer DPP filter loop) so V1 and V2 stay in sync.CometPlanAdaptiveDynamicPruningFilters(3.5+) that converts the SAB insideruntimeFilterstoCometSubqueryBroadcastExec(orSubqueryBroadcastExecif the join fell back to vanilla Spark). Matches bybuildKeysexprIds to disambiguate multiple broadcast joins, and rewrites toLiteral.TrueLiteralwhen no matching broadcast join exists (e.g., SMJ) so DPP is disabled but results stay correct. On 3.4 Iceberg falls back without reuse:CometSpark34AqeDppFallbackRulewalks scanpartitionFilters, whichBatchScanExecdoesn't have.LazyIcebergMetricdefers metric value resolution.SparkPlanInfo.fromSparkPlanreads the metrics map for SQL UI events at planning time, before AQE's queryStageOptimizerRules run; without deferral that read would triggerserializedPartitionDataagainst an unconverted SAB.serializedPartitionDatarebuildsoriginalPlanwith the current top-levelruntimeFiltersbefore serializing. Otherwise Spark'sPlanAdaptiveDynamicPruningFiltersrewrite is invisible to the@transient originalPlanandserializePartitionsre-translates the unresolvedInSubqueryExec.ParallelCollectionRDDshape thatBatchScanExec.inputRDDreturns when DPP prunes all partitions (matched by class name since it isprivate[spark]).How are these changes tested?
CometIcebergNativeSuitecovering broadcast reuse, multiple DPP filters sharing a broadcast, buildKeys-based disambiguation across joins, BHJ fallback to vanilla Spark, SMJ (no broadcast) graceful disable, empty broadcast pruning all partitions, cross-stage scalar subqueries, and the V2 SPJ shape variations across Spark versions.CometIcebergNativeSuite,CometExecSuite,CometDppFallbackRepro3949Suite, andCometShuffleFallbackStickinessSuite.