
[SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys #35138

Closed
wants to merge 5 commits into master from cloud-fan/join

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

This is a followup of #32875 . Basically #32875 did two improvements:

  1. allow bucket join even if the bucket hash function is different from Spark's shuffle hash function
  2. allow bucket join even if the hash partition keys are a subset of the join keys.

The first improvement is the major target of the SPIP "storage partition join". The second improvement is a consequence of the framework refactor and was not planned.

This PR disables the second improvement by default, as it may introduce a perf regression when there is data skew and no shuffle to spread it. We need more design work, such as checking the NDV (number of distinct values), before enabling this improvement.
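The skew risk can be illustrated without Spark. The following is a hypothetical sketch (plain Scala; `part` stands in for a hash partitioner, and the data and names are made up, not from the patch) showing that clustering on only a subset of the join keys concentrates rows on one partition when that key is skewed:

```scala
object SkewSketch {
  // Toy stand-in for a hash partitioner: maps any key to [0, n).
  private def part(key: Any, n: Int): Int = ((key.hashCode % n) + n) % n

  // Returns (max partition size when clustered by `a` only,
  //          max partition size when clustered by all join keys (a, b)).
  def maxPartitionSizes(rows: Seq[(Int, Int)], n: Int): (Int, Int) = {
    val byA  = rows.groupBy { case (a, _) => part(a, n) }
    val byAB = rows.groupBy { case (a, b) => part((a, b), n) }
    (byA.values.map(_.size).max, byAB.values.map(_.size).max)
  }

  def main(args: Array[String]): Unit = {
    // Column `a` is heavily skewed toward 1; column `b` varies freely.
    val rows = Seq.tabulate(1000)(i => (if (i < 990) 1 else i, i))
    val (skewed, spread) = maxPartitionSizes(rows, 4)
    println(s"clustered by a alone: max=$skewed; by (a, b): max=$spread")
  }
}
```

With a shuffle on all the join keys, the skewed `a` values are spread across partitions by `b`; without it, one task processes almost all the rows.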

Why are the changes needed?

Avoid a perf regression.

Does this PR introduce any user-facing change?

no

How was this patch tested?

@github-actions github-actions bot added the SQL label Jan 7, 2022
@@ -1,90 +1,97 @@
TakeOrderedAndProject [i_item_id,i_item_desc,s_state,store_sales_quantitycount,store_sales_quantityave,store_sales_quantitystdev,store_sales_quantitycov,as_store_returns_quantitycount,as_store_returns_quantityave,as_store_returns_quantitystdev,store_returns_quantitycov,catalog_sales_quantitycount,catalog_sales_quantityave,catalog_sales_quantitystdev,catalog_sales_quantitycov]
@cloud-fan (Contributor Author) commented Jan 7, 2022

I checked it locally. Now the plan golden files are exactly the same as the ones before #32875.

@cloud-fan (Contributor Author)

cc @sunchao @c21

// will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
// the join keys.
if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS)) {
distribution.clustering.forall(x => partitioning.expressions.exists(_.semanticEquals(x)))
c21 (Contributor)

Do we need to require partitioning.expressions to be exactly the same as distribution.clustering as well? E.g. for the following cases:

  1. partitioning.expressions: [a, b]; distribution.clustering: [b, a]
  2. partitioning.expressions: [a, b, a]; distribution.clustering: [a, b]
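For illustration, a tiny standalone sketch (plain Scala, with `==` on strings standing in for `semanticEquals` on expressions; not the actual Catalyst code) confirms that the subset-style `forall`/`exists` check accepts both cases above:

```scala
object SubsetCheckSketch {
  // Mirrors the shape of the check under discussion: every clustering key
  // must appear somewhere among the partition expressions.
  def subsetCheck(clustering: Seq[String], partExprs: Seq[String]): Boolean =
    clustering.forall(c => partExprs.exists(_ == c))

  def main(args: Array[String]): Unit = {
    // Case 1: reordered keys still pass.
    println(subsetCheck(Seq("b", "a"), Seq("a", "b")))
    // Case 2: duplicated partition expressions still pass.
    println(subsetCheck(Seq("a", "b"), Seq("a", "b", "a")))
  }
}
```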

cloud-fan (Contributor Author)

Good point. To fully restore the previous behavior, we should require an exact match, though I think the current change already covers the data skew issues.

I'll make the change to be conservative.
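A conservative exact-match check might look like the following sketch (again plain Scala with `==` in place of `semanticEquals`; an illustration, not the merged code). It rejects both the reordered and the duplicated cases:

```scala
object ExactMatchSketch {
  // Requires the partition expressions to match the clustering keys
  // pairwise and in order, with no extra or repeated expressions.
  def exactMatch(clustering: Seq[String], partExprs: Seq[String]): Boolean =
    partExprs.length == clustering.length &&
      partExprs.zip(clustering).forall { case (p, c) => p == c }

  def main(args: Array[String]): Unit = {
    println(exactMatch(Seq("a", "b"), Seq("a", "b")))      // true
    println(exactMatch(Seq("b", "a"), Seq("a", "b")))      // false: reordered
    println(exactMatch(Seq("a", "b"), Seq("a", "b", "a"))) // false: duplicated
  }
}
```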

sunchao (Member)

Yes, the case where the partitioning expressions are a superset of the distribution clustering should already be rejected by HashPartitioning#satisfies, but it may be better to make it more explicit here.

@@ -396,6 +396,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS =
buildConf("spark.sql.join.requireAllJoinKeysAsPartitionKeys")
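For context, a conf declared at this spot in `SQLConf` typically carries a doc string, a version, and a default. A hedged sketch of how the full declaration might read, extrapolated from the visible `.booleanConf` / `.createWithDefault(true)` lines (the doc text and version here are illustrative, not quoted from the patch):

```scala
val REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS =
  buildConf("spark.sql.join.requireAllJoinKeysAsPartitionKeys")
    .doc("When true, only eliminate shuffles if the partition keys of the " +
      "children contain all the join keys, to avoid skew when joining on " +
      "more keys than the data is partitioned by.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(true)
```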
c21 (Contributor)

Would this config take effect for all physical operators that have 2 children and require ClusteredDistribution, e.g. CoGroupExec?

cloud-fan (Contributor Author)

Yes, and they used HashClusteredDistribution before.

c21 (Contributor)

Then should we make the config name more general?

cloud-fan (Contributor Author)

How about spark.sql.requireAllClusterKeysAsPartitionKeysToCoPartition?

c21 (Contributor)

SGTM!

sunchao (Member)

I'm slightly inclined toward a shorter name like spark.sql.join.requireAllJoinKeysAsPartitionKeys, but that's just personal flavor. I'd also suggest something like spark.sql.enableStrictShuffleKeysCheck, but it's up to you.

@sunchao (Member) left a comment

Looks good to me. @cloud-fan did you actually observe the perf regression in real Spark jobs? Just curious whether it'll be very common when this config is disabled.


@cloud-fan (Contributor Author)

@sunchao I haven't tried it on real workloads yet, but it's pretty obvious that we can construct a query with certain input data to expose this regression.

@c21 (Contributor) left a comment

LGTM

@sunchao (Member) left a comment

@cloud-fan could you fix the test failure? It looks like org.apache.spark.sql.catalyst.ShuffleSpecSuite is failing. LGTM after the test failures are addressed. I also compared the golden files with this PR and #32875 combined, and there are no changes, which is expected.


c21 commented Jan 11, 2022

I support disabling this feature by default to be safe, but two cents from our production experience: we have enabled the same feature (avoid shuffle if bucket keys are a subset of the join keys) by default in production for several years and didn't see many data skew issues. Our workload is not representative of the industry, but it offers an observation from one large-scale environment.


sunchao commented Jan 11, 2022

Thanks @c21! This is a good data point. We're also planning to evaluate this feature in production jobs.

cloud-fan (Contributor Author)

I think we can still roll out this optimization later, with some heuristics to avoid bad cases. We just need more time to evaluate and do experiments.


sunchao commented Jan 11, 2022

Hmm org.apache.spark.sql.sources.BucketedReadWithHiveSupportSuite also failed.


sunchao commented Jan 12, 2022

Oops, we missed another one: "SPARK-27485: EnsureRequirements.reorder should handle duplicate expressions" in PlannerSuite.

cloud-fan (Contributor Author)

Thanks for the review, merging to master!

@cloud-fan cloud-fan closed this in 4b4ff4b Jan 13, 2022
dchvn pushed a commit to dchvn/spark that referenced this pull request Jan 19, 2022
[SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys

Closes apache#35138 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>