Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to use different ShuffleOrigin for the shuffle required from stateful operators.

Spark has been using ENSURE_REQUIREMENTS as the ShuffleOrigin, which is open to optimization, e.g. AQE can adjust the shuffle spec. Quoting the code of ENSURE_REQUIREMENTS:

```
// Indicates that the shuffle operator was added by the internal `EnsureRequirements` rule. It
// means that the shuffle operator is used to ensure internal data partitioning requirements and
// Spark is free to optimize it as long as the requirements are still ensured.
case object ENSURE_REQUIREMENTS extends ShuffleOrigin
```
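
To illustrate the point, here is a simplified sketch under assumed names (not Spark's actual AQE code): optimization rules consult the shuffle's origin to decide whether they may rewrite it, and ENSURE_REQUIREMENTS is the origin that grants them that freedom.

```
// Simplified sketch, not Spark's actual implementation: an AQE-style rule
// consults the ShuffleOrigin before deciding whether it may rewrite a shuffle.
sealed trait ShuffleOrigin
case object ENSURE_REQUIREMENTS extends ShuffleOrigin

final case class Shuffle(numPartitions: Int, origin: ShuffleOrigin)

def maybeCoalesce(shuffle: Shuffle, targetPartitions: Int): Shuffle =
  shuffle.origin match {
    // Added only to satisfy internal requirements, so AQE is free to adjust it.
    case ENSURE_REQUIREMENTS => shuffle.copy(numPartitions = targetPartitions)
    // Any other origin is left untouched.
    case _ => shuffle
  }
```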

But the distribution requirement for stateful operators is a lot stricter: the hash for partitioning has to be calculated over all of the expressions, and the number of shuffle partitions must be the same as in the spec. This is because a stateful operator assumes a 1:1 mapping between its partitions and the "physical" partitions of the checkpointed state. Given that, it is fragile to allow any optimization to be made against the shuffle for a stateful operator.
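
As a purely illustrative sketch (the class names here are assumptions, not the PR's code), the stateful requirement only accepts a partitioning whose expressions and partition count both match the checkpointed state exactly:

```
// Illustrative sketch with assumed names: the stateful requirement is
// satisfied only by hash partitioning over exactly the same expressions
// and with exactly the same number of partitions as the checkpointed state.
final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)

final case class StatefulRequirement(expressions: Seq[String], numPartitions: Int) {
  def isSatisfiedBy(p: HashPartitioning): Boolean =
    p.expressions == expressions && p.numPartitions == numPartitions
}

// State checkpointed with 200 partitions hashed on (key1, key2): a shuffle
// coalesced by AQE down to 50 partitions no longer lines up with that state.
val requirement = StatefulRequirement(Seq("key1", "key2"), numPartitions = 200)
requirement.isSatisfiedBy(HashPartitioning(Seq("key1", "key2"), 200)) // true
requirement.isSatisfiedBy(HashPartitioning(Seq("key1", "key2"), 50))  // false
```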

To prevent this, this PR introduces a new ShuffleOrigin with a note that the shuffle is not expected to be "modified".
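
A minimal sketch of the shape of such an origin (the exact identifier and comment wording in the PR may differ):

```
// Sketch only; the actual identifier and comment added by the PR may differ.
// Indicates that the shuffle was added to satisfy the partitioning requirement
// of a stateful operator. Spark must not modify this shuffle, because the
// operator relies on a fixed 1:1 mapping to the checkpointed state partitions.
case object REQUIRED_BY_STATEFUL_OPERATOR extends ShuffleOrigin
```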

Why are the changes needed?

The current behavior opens up the possibility of broken state, given the contract above. We introduced StatefulOpClusteredDistribution for a similar reason.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT added.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR
Contributor Author

cc @cloud-fan Please take a look, thanks!

@HeartSaVioR
Contributor Author

I see no further feedback from others. Thanks! Merging to master.

himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
…quired from stateful operators

Closes apache#48382 from HeartSaVioR/SPARK-49905.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>