
Conversation

@andygrove
Member

Summary

This PR introduces an experimental optimization that allows the native shuffle writer to directly execute the child native plan instead of reading intermediate batches via JNI. This avoids the JNI round-trip for single-source native plans.

Current flow:

Native Plan → ColumnarBatch → JNI → ScanExec → ShuffleWriterExec

Optimized flow:

Native Plan → (directly in native) → ShuffleWriterExec

Configuration

The optimization is controlled by a new config option:

  • spark.comet.exec.shuffle.directNative.enabled (default: false)
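
A minimal sketch of enabling it at runtime, assuming an existing SparkSession named `spark` (only the config key itself comes from this PR):

```scala
// Minimal sketch, assuming an existing SparkSession `spark`.
// The key below is the new option added in this PR; it defaults to "false".
spark.conf.set("spark.comet.exec.shuffle.directNative.enabled", "true")
// Native shuffle mode (spark.comet.shuffle.mode=native, per the Scope section) must also be in effect.
```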

Scope

The optimization currently applies when all of the following hold (see the sketch after this list):

  • Native shuffle mode is enabled (spark.comet.shuffle.mode=native)
  • The shuffle's child is a single-source native plan with CometNativeScanExec
  • The partitioning is not RangePartitioning (which requires sampling)
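
A minimal sketch of that eligibility check, assuming a simplified shape; the actual logic in CometShuffleExchangeExec may differ, and the class-name comparison below stands in for a direct match on CometNativeScanExec:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning}
import org.apache.spark.sql.execution.SparkPlan

// Illustrative only: a plan qualifies when its sole leaf is the Comet native scan operator.
def isSingleSourceNativeScan(plan: SparkPlan): Boolean =
  plan.collectLeaves() match {
    case Seq(leaf) => leaf.getClass.getSimpleName == "CometNativeScanExec"
    case _         => false
  }

// Illustrative only: combines the three conditions listed above.
def canUseDirectNativeShuffle(
    child: SparkPlan,
    partitioning: Partitioning,
    directNativeEnabled: Boolean, // spark.comet.exec.shuffle.directNative.enabled
    nativeShuffleMode: Boolean    // spark.comet.shuffle.mode=native
): Boolean =
  directNativeEnabled &&
    nativeShuffleMode &&
    !partitioning.isInstanceOf[RangePartitioning] && // RangePartitioning requires sampling
    isSingleSourceNativeScan(child)
```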

Changes

  • CometShuffleDependency.scala: Added a childNativePlan field to pass the native plan to the writer
  • CometShuffleExchangeExec.scala: Added detection logic for single-source native plans
  • CometShuffleManager.scala: Pass the native plan to the shuffle writer
  • CometNativeShuffleWriter.scala: Use the child native plan directly when available (see the sketch after this list)
  • CometConf.scala: Added the COMET_SHUFFLE_DIRECT_NATIVE_ENABLED config option
  • CometDirectNativeShuffleSuite.scala: Comprehensive test suite with 15 tests
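
A hedged sketch of how the writer-side change could branch on the new field; apart from the childNativePlan name, everything here is illustrative, including the assumption that the plan travels as serialized bytes:

```scala
// Not the actual CometNativeShuffleWriter: a simplified stand-in showing the branch
// between the direct-native path and the existing JNI-fed path.
def writePartition(
    childNativePlan: Option[Array[Byte]],  // assumed representation: serialized child plan
    runPlanNatively: Array[Byte] => Unit,  // hypothetical: execute the child plan inside native code
    writeBatchesViaJni: () => Unit         // hypothetical: existing ScanExec/JNI batch path
): Unit =
  childNativePlan match {
    case Some(plan) => runPlanNatively(plan) // no ColumnarBatch round-trip over JNI
    case None       => writeBatchesViaJni()  // fall back to the standard flow
  }
```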

Test plan

  • Added CometDirectNativeShuffleSuite with 15 tests covering:
    • Basic optimization with single scan source
    • Filter/project operators
    • Single partition and multi-partition cases
    • Config enable/disable behavior
    • Range partitioning fallback
    • JVM shuffle mode fallback
    • Various data types
    • Edge cases (empty tables, filtered results)
    • Correctness comparison between the optimized and non-optimized paths (an illustrative shape is sketched after this list)
  • Verified existing CometNativeShuffleSuite tests still pass (16/16)
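
As a rough illustration of the correctness comparison, a test could run the same query with the flag off and on and compare results; helper names and structure here are assumptions, not the actual suite:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: compare results of the same query with the optimization disabled and enabled.
def compareDirectNativeShuffle(spark: SparkSession, query: String): Unit = {
  spark.conf.set("spark.comet.exec.shuffle.directNative.enabled", "false")
  val expected = spark.sql(query).collect().map(_.toString).sorted.toSeq

  spark.conf.set("spark.comet.exec.shuffle.directNative.enabled", "true")
  val actual = spark.sql(query).collect().map(_.toString).sorted.toSeq

  assert(actual == expected, "direct native shuffle produced different results than the standard path")
}
```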

🤖 Generated with Claude Code

andygrove and others added 2 commits January 20, 2026 19:45

codecov-commenter commented Jan 21, 2026

Codecov Report

❌ Patch coverage is 63.43284% with 49 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.96%. Comparing base (f09f8af) to head (6132098).
⚠️ Report is 863 commits behind head on main.

Files with missing lines:
  • ...t/execution/shuffle/CometShuffleExchangeExec.scala: 20.51% patch coverage (27 missing, 4 partials) ⚠️
  • ...t/execution/shuffle/CometNativeShuffleWriter.scala: 79.06% patch coverage (9 missing, 9 partials) ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3230      +/-   ##
============================================
+ Coverage     56.12%   59.96%   +3.83%     
- Complexity      976     1433     +457     
============================================
  Files           119      170      +51     
  Lines         11743    15819    +4076     
  Branches       2251     2616     +365     
============================================
+ Hits           6591     9486    +2895     
- Misses         4012     5008     +996     
- Partials       1140     1325     +185     

andygrove and others added 2 commits January 21, 2026 08:26
Subqueries (e.g., bloom filters with might_contain) are registered with
the parent execution context ID. Direct native shuffle creates a new
execution context with a different ID, causing subquery lookup to fail
with "Subquery X not found for plan Y" errors.

This change detects ScalarSubquery expressions in the child plan and
falls back to the standard execution path when present.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
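
A minimal sketch of such a check, assuming Spark's physical ScalarSubquery expression from the execution package; the actual traversal in this change may differ:

```scala
import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}

// Illustrative only: true if any operator in the plan carries a ScalarSubquery expression
// (e.g. a bloom-filter might_contain probe). When true, the direct-native path is skipped
// and the standard execution path is used instead.
def containsScalarSubquery(plan: SparkPlan): Boolean =
  plan.collect {
    case p if p.expressions.exists(_.find(_.isInstanceOf[ScalarSubquery]).isDefined) => p
  }.nonEmpty
```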