Conversation

@chirag-s-db
Contributor

What changes were proposed in this pull request?

When a KeyGroupedShuffleSpec is used to shuffle another child of a JOIN, we must be able to push the JOIN key positions or partition values down into the scan to ensure that both children end up with matching partitioning. If one child reports a KeyGroupedPartitioning but these values cannot be pushed down to its scan (for example, because the child is a key-grouped scan that was checkpointed), we should avoid using that shuffle spec to shuffle the other children.
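At a high level, the fix adds one more filter when EnsureRequirements selects a shuffle spec for the join. Below is a condensed sketch assembled from the diff hunks quoted later in this thread; the surrounding structure and the trailing catch-all case are assumptions rather than the exact patch.

// Choose all the specs that can be used to shuffle other children.
val candidateSpecs = specs
  .filter(_._2.canCreatePartitioning)
  .filter {
    // To choose a KeyGroupedShuffleSpec, we must be able to push down SPJ parameters
    // into the scan (for join key positions). If these parameters can't be pushed
    // down, this spec can't be used to shuffle other children.
    case (idx, _: KeyGroupedShuffleSpec) => canPushDownSPJParamsToScan(children(idx))
    // Assumed fall-through: other shuffle specs stay candidates as before.
    case _ => true
  }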

Why are the changes needed?

Prevents a potential correctness issue when key-grouped partitioning is used on a checkpointed RDD.
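For context, this plan shape only shows up when storage-partitioned join (SPJ) and its one-sided shuffle are enabled. The config names below are from recent Spark releases and should be verified against the version in use:

// Enable SPJ, partition-value pushdown, and shuffling one side to match a
// key-grouped child (config names assumed from recent releases).
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.shuffle.enabled", "true")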

Does this PR introduce any user-facing change?

No.

How was this patch tested?

See test changes.

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the SQL label Nov 17, 2025
@chirag-s-db
Contributor Author

@cloud-fan @szehon-ho Could you take a look at this PR when you get the chance?

Member

@szehon-ho left a comment


Nice catch! @sunchao FYI

// To choose a KeyGroupedShuffleSpec, we must be able to push down SPJ parameters into
// the scan (for join key positions). If these parameters can't be pushed down, this
// spec can't be used to shuffle other children.
case (idx, _: KeyGroupedShuffleSpec) => canPushDownSPJParamsToScan(children(idx))
Member


nit: extra space after '=>'

Contributor Author


fixed

// Choose all the specs that can be used to shuffle other children
val candidateSpecs = specs
  .filter(_._2.canCreatePartitioning)
  .filter {
Member


Just for reference, were both checks needed? i.e. this one and the other check in checkKeyGroupCompatible?

Contributor Author


Yeah, both checks are needed. We need the check in checkKeyGroupCompatible for the case where both children are key-grouped partitionings, while this check handles the case where only one child is a key-grouped partitioning and is shuffling a non-KGP plan.

Member


Trying to understand this too. In checkKeyGroupCompatible we already make sure that both children use KeyGroupedPartitioning. Does this new check additionally ensure that the leaf nodes of both children are all KeyGroupedPartitionedScans?

Contributor Author


checkKeyGroupCompatible applies to the case where we have 2 KeyGroupedPartitioned scans that are being joined against each other. For example, something like:

SortMergeJoinExec ...
  +- BatchScanExec tbl1 ... -> reporting KeyGroupedPartitioning
  +- BatchScanExec tbl2 ... -> reporting KeyGroupedPartitioning

If one child is not key-grouped partitioned, we can (in general) still avoid the shuffle for the key-grouped child:

SortMergeJoinExec ...
  +- BatchScanExec tbl1 ... -> reporting KeyGroupedPartitioning
  +- ShuffleExchangeExec KeyGroupedPartitioning
    +- BatchScanExec tbl2 ... -> reporting UnknownPartitioning

However, if the child reporting the KeyGroupedPartitioning is not a BatchScanExec, we can't push down the JOIN keys, so doing this is unsafe. This can arise if we call .checkpoint() on a BatchScanExec:

SortMergeJoinExec ...
  +- RDDScanExec ... -> reporting KeyGroupedPartitioning (coming from ckpt of tbl1 scan)
  +- ShuffleExchangeExec KeyGroupedPartitioning
    +- BatchScanExec tbl2 ... -> reporting UnknownPartitioning

This extra check is for this second case, where we want to make sure that we're not using a KeyGroupedPartitioning to shuffle another child of a JOIN without being able to push down JOIN keys. The test "SPARK-53322: checkpointed scans can't shuffle other children on SPJ" is for this case, and will fail without this change.
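A rough end-to-end sketch of this second scenario (not the actual suite code; tbl1, tbl2, and the checkpoint path are hypothetical, with both sources reporting KeyGroupedPartitioning on id):

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

spark.sparkContext.setCheckpointDir("/tmp/spj-ckpt")  // hypothetical path
val left = spark.table("tbl1").checkpoint()           // key-grouped scan becomes an RDD scan
val right = spark.table("tbl2")
val joined = left.join(right, "id")

// Count the exchanges in the physical plan. Before the fix, only tbl2's side is
// shuffled, using partition values that can't be pushed into the checkpointed scan
// (the unsafe plan above); with the fix, both sides are expected to be shuffled.
val numShuffles = joined.queryExecution.executedPlan.collect {
  case s: ShuffleExchangeExec => s
}.size
joined.explain()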

Member


I see, thanks for the explanation!


private val EnsureRequirements = new EnsureRequirements()

/** Helper to add dummy BatchScanExec child to a dummy plan (to ensure SPJ can kick in). */
Member


Actually, why not have another case class altogether (that inherits from DummySparkPlan) and use that in the tests?

Contributor Author


done

Contributor Author


Can't be a case class (no case-to-case inheritance), but we can just do a normal class
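For context on the Scala rule being referenced, here is a tiny standalone illustration with made-up names (unrelated to DummySparkPlan):

case class Base(x: Int)

// case class Derived(x: Int, y: Int) extends Base(x)
//   ^ does not compile: case-to-case inheritance is prohibited

// A plain (non-case) subclass compiles, hence the normal class in the test.
class Derived(x: Int, val y: Int) extends Base(x)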

test("SPARK-53322: checkpointed scans avoid shuffles for aggregates") {
withTempDir { dir =>
spark.sparkContext.setCheckpointDir(dir.getPath)
val items_partitions = Array(identity("id"))
Member


nit: use camelCase?

Contributor Author


done

@chirag-s-db requested a review from sunchao November 18, 2025 15:52
Member

@sunchao left a comment


LGTM pending CI

@cloud-fan
Contributor

@chirag-s-db can you re-trigger the CI jobs? Seems flaky

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan closed this in dff0620 Nov 19, 2025
@cloud-fan
Contributor

@chirag-s-db can you open a new PR against branch-4.1 as this is for correctness?

chirag-s-db added a commit to chirag-s-db/spark that referenced this pull request Nov 19, 2025
…positions can be fully pushed down


Closes apache#53098 from chirag-s-db/checkpoint-pushdown.

Lead-authored-by: Chirag Singh <chirag.singh@databricks.com>
Co-authored-by: Chirag Singh <137233133+chirag-s-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@chirag-s-db
Contributor Author

@cloud-fan Opened #53132

huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…positions can be fully pushed down
