Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40429][SQL] Only set KeyGroupedPartitioning when the referenced column is in the output #37886

Closed
wants to merge 3 commits into from

Conversation

huaxingao
Copy link
Contributor

What changes were proposed in this pull request?

Only set KeyGroupedPartitioning when the referenced column is in the output

Why are the changes needed?

bug fixing

Does this PR introduce any user-facing change?

no

How was this patch tested?

new test

@github-actions github-actions bot added the SQL label Sep 14, 2022
@huaxingao
Copy link
Contributor Author

cc @cloud-fan @sunchao

None
} else {
val ref = AttributeSet.fromAttributeSets(partitioning.get.map(_.references))
if (ref.subsetOf(AttributeSet(d.output))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (ref.subsetOf(AttributeSet(d.output))) {
if (ref.subsetOf(d.outputSet)) {

if (partitioning.isEmpty) {
None
} else {
val ref = AttributeSet.fromAttributeSets(partitioning.get.map(_.references))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about partitioning.get.forall(p => p.references.subsetOf(...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! Changed.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @huaxingao and @cloud-fan .
Merged to master.

@dongjoon-hyun
Copy link
Member

Could you make a backporting PR, @huaxingao ?

@huaxingao
Copy link
Contributor Author

Thanks @cloud-fan @dongjoon-hyun

@huaxingao huaxingao deleted the keyGroupedPartitioning branch September 15, 2022 16:05
dongjoon-hyun pushed a commit that referenced this pull request Sep 15, 2022
…renced column is in the output

### What changes were proposed in this pull request?
back porting [PR](#37886) to 3.3.
Only set `KeyGroupedPartitioning` when the referenced column is in the output

### Why are the changes needed?
bug fixing

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New test

Closes #37901 from huaxingao/3.3.

Authored-by: huaxingao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Aug 24, 2023
…keys

  ### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case.
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys (previously grouped only by partition values)
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well)
  - add an additional sort in the end of partitions based on join key, as when we group the non-replicate side, partition ordering becomes out of order.

  ### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

  ### Does this PR introduce _any_ user-facing change?
No

  ### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Found two problems, will address in separate PR:
- apache#37886  made another change so that we have to select all join keys, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Aug 24, 2023
…keys

  ### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case.
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys (previously grouped only by partition values)
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well)
  - add an additional sort in the end of partitions based on join key, as when we group the non-replicate side, partition ordering becomes out of order.

  ### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

  ### Does this PR introduce _any_ user-facing change?
No

  ### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Found two problems, will address in separate PR:
- apache#37886  made another change so that we have to select all join keys, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Sep 8, 2023
…keys

 ### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- "Project" partitions by join keys in KeyGroupedPartitioning/KeyGroupedShuffleSpec
- Add join key grouping to the partition grouping in BatchScanExec

   ### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

    ### Does this PR introduce _any_ user-facing change?
No

    ### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this in separate PR.
dongjoon-hyun pushed a commit that referenced this pull request Sep 11, 2023
…keys

### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of #37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes #42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
szehon-ho added a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…keys

- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

No

-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes apache#42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Mar 26, 2024
…keys

- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

No

-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes apache#42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants