mbutrovich commented Feb 8, 2026

Claude helped me with the PR description. After fact-checking a couple of hallucinations about tests that it thought were added, I think this is accurate now:

Which issue does this PR close?

Closes #3442.

Rationale for this change

Dynamic Partition Pruning (DPP) is an important optimization for star schema queries. Previously, native_datafusion scans fell back to Spark when DPP was present. This PR adds full DPP support by deferring partition serialization to execution time, after DPP subqueries resolve.

Even absent DPP, this change also reduces the serialization overhead of scanning large amounts of Parquet data: previously, every Spark partition received metadata about every other partition's Parquet files to read.

What changes are included in this PR?

Architecture:

  • CometNativeScanExec now defers partition serialization to execution time via lazy serializedPartitionData
  • At planning time, CometNativeScan.convert() creates a placeholder operator with just a scan_id
  • At execution time, serializePartitions() resolves DPP subqueries and serializes the filtered partitions
  • Uses originalPlan.partitionFilters instead of partitionFilters, because AQE's PlanDynamicPruningFilters transforms InSubqueryExec into Literal.TrueLiteral via makeCopy; originalPlan is not in the active plan tree, so it retains the original filters
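The deferral described in the first bullets boils down to a lazy field that is only computed on first access, at execution time. A minimal self-contained sketch of the pattern, not the actual Comet code (`PlaceholderScan`, `scanId`, and `serializationCount` are illustrative stand-ins):

```scala
// Sketch of deferring expensive partition serialization with a lazy val.
// Planning time only records a scan id; serialization runs once, on demand,
// at "execution" time (where DPP subqueries would have resolved by then).
object LazySerializationSketch {
  var serializationCount = 0 // track how many times we actually serialize

  case class PlaceholderScan(scanId: Int, partitions: Seq[String]) {
    // Not computed at construction (planning) time; computed once, on demand.
    lazy val serializedPartitionData: Array[Byte] = {
      serializationCount += 1
      // Imagine DPP filters being applied to `partitions` right here.
      partitions.mkString(",").getBytes("UTF-8")
    }
  }

  def main(args: Array[String]): Unit = {
    val scan = PlaceholderScan(scanId = 1, partitions = Seq("p=1", "p=2"))
    assert(serializationCount == 0)      // nothing serialized at "planning"
    val a = scan.serializedPartitionData // first access triggers it
    val b = scan.serializedPartitionData // cached thereafter
    assert(serializationCount == 1 && (a eq b))
    println(new String(a, "UTF-8"))
  }
}
```

The same caching behavior is what makes it safe to reference `serializedPartitionData` from multiple places without re-serializing.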

Config:

  • New spark.comet.scan.dpp.enabled (default: true) replaces spark.comet.dppFallback.enabled
| Scan Mode | COMET_DPP_ENABLED | Result |
| --- | --- | --- |
| native_datafusion | true (default) | CometNativeScanExec with DPP (this PR) |
| native_datafusion | false | Fall back to Spark (existing behavior) |
| Iceberg native | true (default) | CometIcebergNativeScanExec with DPP (#3349) |
| Iceberg native | false | Fall back to Spark (this PR) |
| auto | N/A | Fall back to Spark (existing behavior) |
| native_iceberg_compat | N/A | Fall back to Spark (existing behavior) |
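For reference, the new flag can be toggled like any other Spark config. The config key is from this PR; the rest of this spark-submit invocation is illustrative:

```shell
# Disable DPP for Comet native scans, forcing the Spark fallback path.
# (spark.comet.scan.dpp.enabled defaults to true.)
spark-submit \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.scan.dpp.enabled=false \
  my_job.py
```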

Shims:

  • Added getDppFilteredFilePartitions() and getDppFilteredBucketedFilePartitions() to ShimCometScanExec for Spark 3.4/3.5/4.0
  • Added resolveSubqueryAdaptiveBroadcast() to ShimSubqueryBroadcast for DPP subquery resolution
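To make the shim surface concrete, here is a hypothetical sketch. The method names come from this PR, but the parameter and return types are simplified stand-ins (the real shims work with Spark's FilePartition, PartitionedFile, and expression types):

```scala
// Hypothetical sketch of the shim methods named above; signatures are
// illustrative only, not the actual ShimCometScanExec API.
object ShimSketch {
  case class FilePartition(index: Int, files: Seq[String])

  trait ShimCometScanExec {
    // Returns only the partitions that survive the resolved DPP filters.
    def getDppFilteredFilePartitions(
        all: Seq[FilePartition],
        keep: FilePartition => Boolean): Seq[FilePartition] =
      all.filter(keep)

    // Bucketed variant: same idea, but partitions are keyed by bucket id.
    def getDppFilteredBucketedFilePartitions(
        byBucket: Map[Int, FilePartition],
        keepBucket: Int => Boolean): Seq[FilePartition] =
      byBucket.collect { case (b, p) if keepBucket(b) => p }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val shim = new ShimCometScanExec {}
    val parts = Seq(FilePartition(0, Seq("a")), FilePartition(1, Seq("b")))
    val kept = shim.getDppFilteredFilePartitions(parts, _.index == 1)
    println(kept.map(_.index).mkString(","))
  }
}
```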

Other:

  • Removed custom equals/hashCode from CometNativeScanExec in favor of case class defaults to prevent incorrect AQE exchange reuse between scans with different projections
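The equals/hashCode issue is easy to reproduce in miniature: a hand-written equals that ignores a field makes two logically different operators compare equal, which is exactly what AQE's exchange reuse keys on. A self-contained illustration (`BadScan`/`GoodScan` are hypothetical, not Comet classes):

```scala
// Why custom equals/hashCode can cause incorrect AQE reuse: if equals
// ignores the projection, two scans with different output columns look
// identical to plan-reuse caching.
object EqualsSketch {
  class BadScan(val table: String, val projection: Seq[String]) {
    override def equals(other: Any): Boolean = other match {
      case o: BadScan => o.table == table // bug: projection ignored
      case _          => false
    }
    override def hashCode: Int = table.hashCode
  }

  // Case class defaults compare every field, including the projection.
  case class GoodScan(table: String, projection: Seq[String])

  def main(args: Array[String]): Unit = {
    val bad1 = new BadScan("t", Seq("a"))
    val bad2 = new BadScan("t", Seq("a", "b"))
    assert(bad1 == bad2) // wrongly considered interchangeable
    val good1 = GoodScan("t", Seq("a"))
    val good2 = GoodScan("t", Seq("a", "b"))
    assert(good1 != good2) // correctly distinguished
    println("ok")
  }
}
```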

How are these changes tested?

Comet tests:
New tests in CometExecSuite:

  • DPP with native_datafusion scan - join with dynamic partition pruning - verifies basic DPP with partition pruning
  • DPP with native_datafusion scan - multiple partition columns - verifies DPP with two partition columns
  • DPP with native_datafusion scan - SubqueryExec (non-broadcast DPP) - verifies DPP works with non-broadcast subqueries
  • DPP with native_datafusion scan - ReusedSubqueryExec (subquery reuse) - verifies DPP works with reused subqueries

New test in CometIcebergNativeSuite:

  • runtime filtering - DPP with non-broadcast join - verifies Iceberg DPP works with SubqueryExec

Spark SQL tests:
After implementing this, we had 24 Spark SQL test failures related to DPP. The updated Spark 3.5.8 diff now looks for CometNativeScanExec, and those tests pass.

We also had 22 test failures related to bucketed scans with DPP. After modifying the partitioning logic those tests pass as well, so I am confident that we are getting good coverage from the Spark SQL tests.

mbutrovich added this to the 0.14.0 milestone Feb 8, 2026