
[SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution #32875

Closed
wants to merge 29 commits

Conversation

@sunchao (Member) commented Jun 11, 2021

What changes were proposed in this pull request?

This PR proposes the following:

  1. Introduce a new trait ShuffleSpec, used by EnsureRequirements when a node has more than one child. It serves two purposes: 1) compare all children and check whether they are compatible w.r.t. partitioning and distribution, and 2) create a new partitioning to re-shuffle the other side in case they are not compatible.
  2. Remove HashClusteredDistribution and replace its usages with ClusteredDistribution.

Under the new mechanism, when EnsureRequirements checks whether shuffles are required for a plan node with more than one child, it does the following (sketched in code below):

  1. check each child of the node to see whether it can satisfy the corresponding required distribution.
  2. check whether all children of the node are compatible with each other w.r.t. their partitioning and distribution.
  3. if step 2 fails, choose the best shuffle spec (in terms of shuffle parallelism) that can be used to repartition the other children, so that they all end up with compatible partitioning.
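For readers unfamiliar with the rule, the decision flow can be summarized roughly as follows. This is a self-contained sketch with made-up names (Spec, ensureCoPartitioned, reshuffle); it is not the actual EnsureRequirements implementation, which also handles required distributions and single-child nodes.

```scala
// A rough, self-contained sketch of the three-step flow above; all names here
// are hypothetical and this is NOT Spark's actual EnsureRequirements code.
object CoPartitionSketch {
  trait Spec {
    def numPartitions: Int
    def isCompatibleWith(other: Spec): Boolean
  }

  def ensureCoPartitioned[Plan](
      children: Seq[Plan],
      specOf: Plan => Spec,            // step 1: derive each child's shuffle spec
      reshuffle: (Plan, Spec) => Plan  // re-shuffle a child to match a given spec
    ): Seq[Plan] = {
    val specs = children.map(specOf)
    // Step 2: every pair of children must be mutually compatible.
    val allCompatible = specs.forall(a => specs.forall(b => a.isCompatibleWith(b)))
    if (allCompatible) {
      children
    } else {
      // Step 3: pick the spec with the highest parallelism and re-shuffle the
      // children that are not compatible with it.
      val best = specs.maxBy(_.numPartitions)
      children.zip(specs).map { case (child, spec) =>
        if (spec.isCompatibleWith(best)) child else reshuffle(child, best)
      }
    }
  }
}
```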

Why are the changes needed?

Spark currently only allows bucket join when the set of cluster keys from the output partitioning exactly matches the set of join keys from the required distribution. For instance, in the following:

SELECT * FROM A JOIN B ON A.c1 = B.c1 AND A.c2 = B.c2

bucket join will only be triggered if both A and B are bucketed on columns c1 and c2, in which case Spark will avoid shuffling both sides of the join.

The above requirement, however, is too strict: the shuffle can also be avoided if both A and B are bucketed on just column c1 (or just c2). That is, if all rows that have the same value in column c1 are clustered into the same partition, then all rows that have the same values in both columns c1 and c2 are also clustered into the same partition, since the (c1, c2) groups are simply a refinement of the c1 groups.

In order to allow this, we'll need to change the logic for deciding whether the two sides of a join operator are "co-partitioned". Currently, this is done by checking each side's output partitioning against its required distribution separately, using the Partitioning.satisfies method. Since HashClusteredDistribution requires a HashPartitioning to match the cluster keys exactly, this check can be done in isolation, without looking at the other side's output partitioning and required distribution.

However, this approach is no longer valid if we relax the above constraint, because we then need to compare the output partitioning and required distribution on both sides. For instance, in the above example, if A is bucketed on c1 while B is bucketed on c2, we may need to do the following check:

  1. identify where A.c1 and B.c2 are used in the join keys (e.g., positions 0 and 1, respectively)
  2. check whether the positions derived from both sides exactly match each other (this becomes more complicated if a key appears in multiple positions within the join keys.)

In order to achieve the above, this PR proposes the following:

trait ShuffleSpec {
  // Used as a cost indicator to shuffle children
  def numPartitions: Int

  // Used to check whether this spec is compatible with `other`
  def isCompatibleWith(other: ShuffleSpec): Boolean

  // Used to create a new partitioning for the other `distribution` in case `isCompatibleWith` fails.
  def createPartitioning(distribution: Distribution): Partitioning
}
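To make the compatibility idea concrete, here is a small, hypothetical sketch of the position-matching check described earlier (find the positions of each side's partitioning keys among the join keys and compare them). The object and method names are made up; this is not the implementation merged in this PR.

```scala
// Hypothetical illustration of position-based compatibility between the two sides
// of a join; not Spark's actual ShuffleSpec implementation.
object KeyPositionCompatibility {
  // For each partitioning (bucket) key, the positions where it appears among the join keys.
  def positions(partitionKeys: Seq[String], joinKeys: Seq[String]): Seq[Set[Int]] =
    partitionKeys.map { k =>
      joinKeys.zipWithIndex.collect { case (j, i) if j == k => i }.toSet
    }

  // The two sides are compatible if their partitioning keys line up on common
  // join-key positions, pairwise.
  def compatible(
      leftPartKeys: Seq[String], leftJoinKeys: Seq[String],
      rightPartKeys: Seq[String], rightJoinKeys: Seq[String]): Boolean = {
    val l = positions(leftPartKeys, leftJoinKeys)
    val r = positions(rightPartKeys, rightJoinKeys)
    l.length == r.length && l.zip(r).forall { case (a, b) => (a intersect b).nonEmpty }
  }

  def main(args: Array[String]): Unit = {
    // A bucketed on c1, B bucketed on c2, join keys A.c1 = B.c1 AND A.c2 = B.c2:
    // A.c1 sits at position 0, B.c2 at position 1, so the sides are not compatible.
    println(compatible(Seq("c1"), Seq("c1", "c2"), Seq("c2"), Seq("c1", "c2"))) // false
    // Both sides bucketed on c1: compatible, so no shuffle is needed.
    println(compatible(Seq("c1"), Seq("c1", "c2"), Seq("c1"), Seq("c1", "c2"))) // true
  }
}
```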

A similar API is also required if we are going to support DSv2 DataSourcePartitioning as the output partitioning in the bucket join scenario, or support custom hash functions such as HiveHash for bucketing. With the former, even if both A and B are partitioned on columns c1 and c2 in the above example, they could be partitioned via different transform expressions, e.g., A on (bucket(32, c1), day(c2)) while B is on (bucket(32, c1), hour(c2)). This means we'll need to compare the partitioning from both sides of the join, which makes the current approach with Partitioning.satisfies insufficient. The same isCompatibleWith API can potentially be reused for this purpose.
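For context, that kind of transform-based DSv2 partitioning can be declared with the DataFrameWriterV2 API. A hedged sketch, assuming `spark` is an active SparkSession and `cat` is a DSv2 catalog that supports partition transforms (table and column names are made up; the transform functions in Spark are spelled `days`/`hours`):

```scala
// Sketch: two tables partitioned on the same columns but via different transform
// expressions, which is exactly the case where a spec-based comparison is needed.
import org.apache.spark.sql.functions.{bucket, col, days, hours}

val df = spark.table("cat.db.source")

// Table A: bucketed by c1, partitioned by day(c2)
df.writeTo("cat.db.A")
  .partitionedBy(bucket(32, col("c1")), days(col("c2")))
  .create()

// Table B: bucketed by c1, partitioned by hour(c2)
df.writeTo("cat.db.B")
  .partitionedBy(bucket(32, col("c1")), hours(col("c2")))
  .create()
```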

Does this PR introduce any user-facing change?

Yes, now bucket join will be enabled for more cases as mentioned above.

How was this patch tested?

  1. Added a new test suite ShuffleSpecSuite
  2. Added additional tests in EnsureRequirementsSuite.

@SparkQA commented Jun 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44209/

@SparkQA commented Jun 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44209/

@SparkQA commented Jun 11, 2021

Test build #139680 has finished for PR 32875 at commit 3f3ad6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 29, 2021

Test build #140377 has finished for PR 32875 at commit 6c441b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait Requirement extends Ordered[Requirement]
  • case class HashRequirement(
  • case class RequirementCollection(requirements: Seq[Requirement]) extends Requirement

@rdblue (Contributor) commented Jul 9, 2021

@sunchao, do both changes in this PR need to be done together? If not, then you may want to separate them into different PRs. If they are tied together, then noting why this needs to be one PR would be helpful.

@sunchao (Member, Author) commented Jul 13, 2021

Thanks @rdblue for taking a look. Yes, IMO these two are correlated, and I've just updated the "Why are the changes needed?" section above to clarify the motivation. Let me know if you think otherwise or have any comments on this.

The current WIP PR is somewhat messy, and I plan to come back to it soon after finishing the item currently on my plate.

@sunchao sunchao marked this pull request as ready for review August 31, 2021 16:59
@sunchao sunchao marked this pull request as draft August 31, 2021 16:59
@SparkQA commented Sep 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47932/

@SparkQA commented Sep 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47932/

@SparkQA commented Sep 17, 2021

Test build #143425 has finished for PR 32875 at commit 7190faa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao sunchao changed the title [WIP][SPARK-35703] Remove HashClusteredDistribution and relax constraint for bucket join [WIP][SPARK-35703] Relax constraint for bucket join and remove HashClusteredDistribution Sep 21, 2021
@sunchao sunchao changed the title [WIP][SPARK-35703] Relax constraint for bucket join and remove HashClusteredDistribution [SPARK-35703] Relax constraint for bucket join and remove HashClusteredDistribution Sep 21, 2021
@SparkQA commented Sep 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47987/

@SparkQA commented Sep 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47987/

@SparkQA commented Sep 21, 2021

Test build #143476 has finished for PR 32875 at commit 622467b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48261/

@SparkQA commented Sep 30, 2021

Test build #143750 has finished for PR 32875 at commit fb32460.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48261/

@SparkQA commented Oct 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48288/

@SparkQA commented Oct 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48289/

cloud-fan added a commit that referenced this pull request Jan 13, 2022
…s contain all the join keys

### What changes were proposed in this pull request?

This is a followup of #32875. Basically, #32875 made two improvements:
1. allow bucket join even if the bucket hash function is different from Spark's shuffle hash function
2. allow bucket join even if the hash partition keys are a subset of the join keys.

The first improvement is the major target for implementing the SPIP "storage partition join". The second improvement is more of a consequence of the framework refactor and was not planned.

This PR disables the second improvement by default, since it may introduce perf regressions when there is data skew and no shuffle to spread it out. We need more design work (e.g. checking the NDV) before enabling this improvement.

### Why are the changes needed?

Avoid perf regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Closes #35138 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dchvn pushed a commit to dchvn/spark that referenced this pull request Jan 19, 2022
cloud-fan added a commit that referenced this pull request Jan 19, 2022
…-partitioning requirement

### What changes were proposed in this pull request?

This is a followup of #32875. This PR updates `ValidateRequirements` to match the recent changes in the partitioning-distribution framework and to check the co-partitioning requirement for join nodes.

### Why are the changes needed?

Fix bugs in `ValidateRequirements`

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new test suite

Closes #35225 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@HeartSaVioR (Contributor) commented Jan 27, 2022

@sunchao

Sorry for the post-review. I didn't realize this PR might affect streaming queries, and only noticed it later.

I discussed this change with @cloud-fan, and we are concerned about any possibility of skipping the shuffle on grouping keys in stateful operators, including stream-stream join.

In Structured Streaming, state is partitioned on the grouping keys using Spark's internal hash function, and the number of partitions is static. Consequently, if Spark does not respect the state's distribution in a stateful operator, it leads to a correctness problem.

So please keep in mind that the same key must be co-located across three aspects (left, right, state) in stream-stream join. The same applies to the non-join case, e.g. aggregation over a bucketed table; other stateful operators have two aspects (key, state). In short, state partitioning must be considered.

@cloud-fan (Contributor)

@sunchao can we add back HashClusteredDistribution and use it for streaming join/aggregate?

@sunchao (Member, Author) commented Jan 27, 2022

@HeartSaVioR no worries, I should have pinged you too :)

In Structured Streaming, state is partitioned on the grouping keys using Spark's internal hash function, and the number of partitions is static. Consequently, if Spark does not respect the state's distribution in a stateful operator, it leads to a correctness problem.

Could you give me a concrete example of this? Currently the rule only skips shuffle in join if both sides report the same distribution. Also, with the first follow-up by @cloud-fan I think we've already restored the previous behavior.

I'm no Spark streaming expert, so I'm still trying to learn more about the problem here. :)

@HeartSaVioR (Contributor) commented Jan 27, 2022

Could you give me a concrete example of this?

Actually it's a hypothetical one; I tried to reproduce it, but as you can see in #35341, I failed to reproduce it with built-in sources.

Currently the rule only skips shuffle in join if both sides report the same distribution

If there is any chance for both sides to change their distributions together, the rule will still skip the shuffle in the join since they still have the same distribution; but stream-stream join must also read from state, which may be partitioned in a different way, and that partitioning is not flexible.

That said, state partitioning cannot be changed during the lifetime of a query, at least for now. And we don't have a way to track the state's partitioning (we only pin the number of state partitions, by sticking to the config value for the number of "shuffle" partitions), so the safest approach is to constrain the state partitioning to Spark's hash partitioning.

The crux is the hash function: even if the source is partitioned/bucketed on the same criteria (columns), the hash function and the number of partitions must also match the state's for stateful operators. In other words, at least as of now, stateful operators cannot leverage the source-side distribution. There is room to improve this, e.g. let the state follow the distribution of the inputs but also store the distribution info in metadata, so that the stateful operator can force the state's distribution (triggering a shuffle) if they differ in future runs.

with the first follow-up by @cloud-fan I think we've already restored the previous behavior.

Could you please point me to the commit/PR for this?

@sunchao (Member, Author) commented Jan 27, 2022

Thanks @HeartSaVioR. The PR I mentioned is #35138. To my understanding, Spark also skips shuffle for stream-stream join before this PR, when the distribution (bucket) keys exactly match join keys, so it seems the potential issue you mentioned above could occur before this PR too?

@HeartSaVioR (Contributor)

Oh well... I may need to try hacking the file stream source to support bucketing (regardless of whether it works correctly or not) and check the physical plan.

cc @cloud-fan Could you please help check whether the problem may exist even before this PR? Would using HashClusteredDistribution "force" using Spark's internal hash function on distribution?

@c21 (Contributor) commented Jan 27, 2022

I think this PR introduced two things (the two improvements listed in the follow-up commit message above).

So if we speak of the current master branch, I feel it should not break anything for streaming use cases.

when the distribution (bucket) keys exactly match join keys, so it seems the potential issue you mentioned above could occur before this PR too?

Would using HashClusteredDistribution "force" using Spark's internal hash function on distribution?

HashClusteredDistribution forces the use of Spark's internal hash function, Murmur3Hash. Before this PR, the only way to represent a bucketed table was HashPartitioning, which has the hash function hardcoded in HashPartitioning.partitionIdExpression. So I think that if a data source was bucketed exactly on the join keys, stream-stream join was already able to avoid the shuffle before this PR. After this PR, the stream-stream join behavior is not changed; it should still be able to avoid the shuffle.
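To illustrate the point about the hardcoded hash function: the partition id that HashPartitioning (and therefore the streaming state layout) relies on is Spark's Murmur3 hash of the keys modulo the number of partitions. A small runnable sketch of that computation using the public DataFrame API (not the internal partitionIdExpression itself), assuming `spark` is an active SparkSession:

```scala
// Shows which partition each (a, b) key maps to under Spark's Murmur3-based hash
// partitioning; functions.hash uses the same Murmur3 hash expression that
// HashPartitioning.partitionIdExpression is built on.
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val numPartitions = 5
spark.range(10)
  .selectExpr("id AS a", "id * 2 AS b")
  .select(col("a"), col("b"),
    pmod(hash(col("a"), col("b")), lit(numPartitions)).as("partition_id"))
  .show()
```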

@HeartSaVioR (Contributor) commented Jan 28, 2022

Avoid shuffle when joining DSv2(s) bucketed in the way different than Spark bucketing.

This already assumes that the DSv2 bucketing algorithm can differ from Spark's, and that Spark avoids the shuffle in that case. It is of course a great improvement in general, but in the streaming context, state partitioning is not considered here.

Given that state partitioning is not flexible and state metadata does not yet record the partitioning, our major assumption is that state is partitioned using Spark's internal hash function, with the number of shuffle partitions as the number of partitions. If there is any case where that assumption can be broken, we cannot yet allow that case for streaming queries. That said, there is room for improvement.

So I think that if a data source was bucketed exactly on the join keys, stream-stream join was already able to avoid the shuffle before this PR. After this PR, the stream-stream join behavior is not changed; it should still be able to avoid the shuffle.

I meant that the shuffle must be performed for stream-stream join if the source doesn't follow Spark's internal hash function, in order to retain the major assumption. The same goes for other stateful operators.

That said, we may have already broken these cases, since we didn't change these operators in this PR. I assume that was due to the interchangeability between ClusteredDistribution and HashClusteredDistribution; in older versions of Spark I found no difference between the two.

@HeartSaVioR (Contributor)

cc. @tdas @zsxwing @brkyvz Could you please help double check my theory? Thanks in advance.

@cloud-fan (Contributor)

I think this is kind of a potential bug. Let's say we have 2 tables that can report hash partitioning optionally (e.g. controlled by a flag). Assume a streaming query is first run with the flag off, which means the tables do not report hash partitioning; Spark will then add shuffles before the stream-stream join, and the join state (streaming checkpoint) is partitioned by Spark's murmur3 hash function. Then we restart the streaming query with the flag on, and the 2 tables report hash partitioning (not the same as Spark's murmur3). Spark will not add shuffles before the stream-stream join this time, which leads to a wrong result, because the left/right join child is no longer co-partitioned with the join state from the previous run.

@HeartSaVioR (Contributor)

Yes. Since we can't change the state partitioning, state partitioning must be treated as the first priority for stateful operators. Even if we make some improvements here (recording the physical partitioning of the state) and feel safe storing state with the same partitioning as the source, the operator must still request the state's partitioning so that nothing breaks when the source changes.

@sunchao (Member, Author) commented Jan 28, 2022

Thanks! Yes, this makes sense. I spoke with @viirya offline today and he also pointed out this potential issue.

Let me work on a fix. I'm wondering whether there is any way to treat the streaming case specially in EnsureRequirements instead of reintroducing HashClusteredDistribution, though.

@HeartSaVioR (Contributor) commented Jan 28, 2022

I suspect we will make the same mistake unintentionally if we don't call it out explicitly. Please bring back a way for "operators" to explicitly require Spark's internal hash partitioning when specifying their required distribution. This would also be future-proof for when state partitioning becomes flexible; at that point the operators would require Spark to match the partitioning of the state (even if it's not Spark's internal hash partitioning).

@HeartSaVioR (Contributor) commented Jan 28, 2022

When we specify HashClusteredDistribution on a stateful operator, there are major assumptions that:

  1. HashClusteredDistribution creates HashPartitioning, and we will never ever change that in the future.
  2. We will never ever change the implementation of partitionIdExpression in HashPartitioning in the future, so that the Partitioner behaves consistently across Spark versions.
  3. No partitioning except HashPartitioning can satisfy HashClusteredDistribution.

(I think we had better leave a code comment about the above to prevent future changes to HashClusteredDistribution.)

Let's say the child operator is range partitioned and we add a stateful operator with ClusteredDistribution as its required distribution. The range partitioning can satisfy ClusteredDistribution, but the physical partitioning of the child is totally different from the state's, and that leads to a correctness issue (even silently).

It seems DataSourcePartitioning doesn't allow the partitioning from a data source to satisfy HashClusteredDistribution; it only checks against ClusteredDistribution. This must not be changed unless the data source's partitioning guarantees the same physical partitioning as Spark's internal hash partitioning, which we have no way to guarantee through the Partitioning interface.

EDIT: If we want a kind of HashClusteredDistribution that allows partitioning with any hash function, we will have to explicitly separate the two.

@HeartSaVioR (Contributor) commented Jan 28, 2022

That said, I see other spots where stateful operators (besides stream-stream join) use ClusteredDistribution, so unfortunately this seems to be a long-standing issue. Once we revive HashClusteredDistribution, I'll fix the stateful operators to use it.

@sunchao (Member, Author) commented Jan 28, 2022

Then we restart the streaming query with the flag on, and the 2 tables report hash partitioning (not the same as Spark's murmur3).

One question @cloud-fan: was this already a correctness issue previously? Say one side of the join reports HashPartitioning with a non-murmur3 hash while the other side reports HashPartitioning with the murmur3 hash (for instance, there's a Spark shuffle operator between the data source scan and the join). I wonder if the issue can happen even if data sources report HashPartitioning with Spark's murmur3 hash.

Thanks @HeartSaVioR for your comments, duly noted. Let me bring back HashClusteredDistribution then. I'll also add more comments to make it more future-proof and to ensure that no partitioning other than HashPartitioning can satisfy it. Would you please help me come up with a test for this potential issue?

Seems like DataSourcePartitioning doesn't allow the partitioning from data source to be satisfy HashClusteredDistribution - it only checks with ClusteredDistribution.

That's correct.

@HeartSaVioR (Contributor) commented Jan 28, 2022

If we feel the name HashClusteredDistribution is too generic to convey that it's tightly coupled with Spark's hash partitioning, it might not be a crazy idea to add a prefix to the class name to make it clear.
(Not 100% sure we would want a generic version of HashClusteredDistribution later, though.)

The same may apply to HashPartitioning.

@@ -87,31 +89,6 @@ case class ClusteredDistribution(
}
}

/**
* Represents data where tuples have been clustered according to the hash of the given
* `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
Contributor (inline review comment on the diff above):

I realized we even documented the characteristic I mentioned.

That said, I'd be slightly in favor of making it very clear that HashClusteredDistribution & HashPartitioning are specialized to Spark's internal hash, via a prefix in the naming. The names seem too general, and no one would know about this characteristic without reading the classdoc carefully. It would also be very confusing if someone needed a "general" HashClusteredDistribution & HashPartitioning and somehow found these classes.

@HeartSaVioR (Contributor) commented Feb 4, 2022

I confirmed that streaming aggregation has the same problem as the stream-stream join problem described in SPARK-24588.

Test code:

  test("simple count, update mode, check plan") {
    val inputData = MemoryStream[Int]

    val aggregated =
      inputData.toDF()
        .select('value as 'a, 'value * 2 as 'b)
        .repartition('b)
        .groupBy('a, 'b)
        .agg(count("*"))
        .as[(Int, Int, Long)]

    testStream(aggregated, Update)(
      AddData(inputData, 3),
      CheckLastBatch((3, 6, 1)),
      AddData(inputData, 3, 2),
      CheckLastBatch((3, 6, 2), (2, 4, 1)),
      StopStream,
      StartStream(),
      AddData(inputData, 3, 2, 1),
      CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
      // By default we run in new tuple mode.
      AddData(inputData, 4, 4, 4, 4),
      CheckLastBatch((4, 8, 4)),
      Execute { query =>
        logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
      }
    )
  }

Output:

16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x0000000800fcc040@650b39f1
+- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
   +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
      +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
         +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
            +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                  +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                     +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                        +- MicroBatchScan[value#1] MemoryStreamDataSource

Note that only a single shuffle was performed, via .repartition('b), and it satisfies the child distribution on a & b since the required distribution is ClusteredDistribution.

While this seems OK and produces correct output, we can modify the query in various ways that break it on a further run: 1) remove .repartition('b), 2) replace it with .repartition('a), 3) replace it with .repartition('a, 'b), 4) replace it with .repartition('b, 'a). All of these cases will satisfy ClusteredDistribution("a", "b") (if my understanding is correct) and will not trigger a shuffle with hash partitioning. (Actually, Spark should ensure hash partitioning on 'b only for the lifetime of this query, since state has already been stored; that is another problem we have no solution for right now.)
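To see why those variants are all accepted, here is a small sketch against catalyst's physical-plan classes (illustrative only, not the suite's test code; it assumes spark-catalyst on the classpath and default configuration):

```scala
// Hash partitioning on a single column `b`, or on the columns in a different order,
// satisfies ClusteredDistribution(a, b), so EnsureRequirements adds no extra shuffle.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

HashPartitioning(Seq(b), 5).satisfies(ClusteredDistribution(Seq(a, b)))     // true
HashPartitioning(Seq(b, a), 5).satisfies(ClusteredDistribution(Seq(a, b)))  // true
```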

The problem exists across all stateful operators (otherwise this PR would have had to touch more places). Since HashClusteredDistribution was introduced in SPARK-21865 (2.3.0), Spark 2.3.0+ would have this problem.

@HeartSaVioR (Contributor)

I just took the first step on this work; please review #35419.

HyukjinKwon pushed a commit that referenced this pull request Jan 26, 2023
…atMapCoGroupsInPandas (as in CoGroup)

### What changes were proposed in this pull request?
Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child distribution as `HashClusteredDistribution`, rather than `ClusteredDistribution`. That is the same distribution as reported by `CoGroup` (used by Scala).

### Why are the changes needed?
This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, while `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.

```Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to workaround bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()  //.withColumnRenamed("id", "id2")

// note the column order is different to the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```

Output was
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```

Output now is
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```

Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution`, and is not sensitive to using `ClusteredDistribution`: #32875

### Does this PR introduce _any_ user-facing change?
This fixes correctness.

### How was this patch tested?
A unit test in `EnsureRequirementsSuite`.

Closes #39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 26, 2023
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023