
[SPARK-43021][SQL] CoalesceBucketsInJoin not work when using AQE #40688

Closed
wants to merge 4 commits

Conversation

Contributor

@zzzzming95 commented Apr 6, 2023

What changes were proposed in this pull request?

Add CoalesceBucketsInJoin to AQE preprocessingRules.

Why are the changes needed?

Bucket joins were previously optimized by `CoalesceBucketsInJoin`: #28123

But when using AQE, `CoalesceBucketsInJoin` cannot match because the top of the Spark plan is `AdaptiveSparkPlan`.

The code to reproduce:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("BucketJoin")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", true)
  .config("spark.driver.memory", "4g")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(4, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(2, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain()

Before the PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#50], [i#56], Inner
   :- Sort [i#50 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#50)
   :     +- FileScan parquet spark_catalog.default.t1[i#50,j#51,k#52] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
   +- Sort [i#56 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i#56, 4), ENSURE_REQUIREMENTS, [plan_id=78]
         +- Filter isnotnull(i#56)
            +- FileScan parquet spark_catalog.default.t2[i#56,j#57,k#58] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(i#56)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>

After the PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#50], [i#56], Inner
   :- Sort [i#50 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#50)
   :     +- FileScan parquet spark_catalog.default.t1[i#50,j#51,k#52] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4 (Coalesced to 2)
   +- Sort [i#56 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(i#56)
         +- FileScan parquet spark_catalog.default.t2[i#56,j#57,k#58] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#56)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 2 out of 2

Additional Notes:

We don't add CoalesceBucketsInJoin to AdaptiveSparkPlanExec#queryStageOptimizerRules because queryStageOptimizerRules is not applied to the initial plan; those rules are applied in the createQueryStages() method. Since createQueryStages() works bottom-up, the exchange that should be eliminated is wrapped in a ShuffleQueryStage first, which makes the join shape unrecognizable to CoalesceBucketsInJoin.
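To make this concrete, here is a self-contained toy model in plain Scala (illustrative only; these case classes are not Spark's, and the real rule matches much richer plans) showing how bottom-up stage wrapping hides the shape the rule looks for:

sealed trait Plan
case class Scan(numBuckets: Int) extends Plan
case class Exchange(child: Plan) extends Plan
case class QueryStage(child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

// A rule in the spirit of CoalesceBucketsInJoin: it only fires on the
// pre-stage shape, i.e. a join over two bucketed scans where one side
// still carries a bare exchange.
def coalesceBuckets(plan: Plan): Plan = plan match {
  case Join(Scan(l), Exchange(Scan(r))) if l != r =>
    val n = math.min(l, r)
    Join(Scan(n), Scan(n)) // coalesce buckets; the shuffle becomes unnecessary
  case other => other
}

val initialPlan = Join(Scan(4), Exchange(Scan(2)))
val afterStageCreation = Join(Scan(4), QueryStage(Exchange(Scan(2))))

assert(coalesceBuckets(initialPlan) == Join(Scan(2), Scan(2))) // rule fires
assert(coalesceBuckets(afterStageCreation) == afterStageCreation) // shape hidden, no match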

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a UT.
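As a rough illustration (a sketch only, assuming the repro above has run; optionalNumCoalescedBuckets is the FileSourceScanExec field the suite's assertion quoted later in this thread also uses), the property the UT checks looks like this:

import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// With AQE enabled the root node is AdaptiveSparkPlanExec, so unwrap it
// before collecting the file scans.
val scans = joined.queryExecution.executedPlan match {
  case a: AdaptiveSparkPlanExec =>
    a.executedPlan.collect { case s: FileSourceScanExec => s }
  case p =>
    p.collect { case s: FileSourceScanExec => s }
}

// The 4-bucket side should now report its buckets coalesced down to 2.
assert(scans.exists(_.optionalNumCoalescedBuckets == Some(2)))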

@github-actions bot added the SQL label Apr 6, 2023
@dongjoon-hyun changed the title from [SPARK-43021] CoalesceBucketsInJoin not work when using AQE to [SPARK-43021][SQL] CoalesceBucketsInJoin not work when using AQE Apr 7, 2023
@dongjoon-hyun
Member

cc @imback82, @cloud-fan, @viirya, @sunchao

Member

@viirya left a comment

We may need to add a test case.

@zzzzming95
Contributor Author

We may need to add a test case.

Yeah, I will add a UT later.

@zzzzming95
Contributor Author

One more question: is it time to make the default value of SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED true?

@dongjoon-hyun
Member

Maybe, no? If this was not working properly before, we cannot enable this configuration in Apache Spark 3.5.0. Since we need to wait for one release cycle, we may be able to do that in Apache Spark 3.6.0 if we want.

One more question: is it time to make the default value of SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED true?

@zzzzming95
Contributor Author

Maybe, no? If this was not working properly before, we cannot enable this configuration in Apache Spark 3.5.0. Since we need to wait for one release cycle, we may be able to do that in Apache Spark 3.6.0 if we want.

One more question: is it time to make the default value of SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED true?

Yes, this is the more logical way.

@zzzzming95
Contributor Author

The CI build failure doesn't seem to be caused by this patch; can you take a look?

@dongjoon-hyun @viirya

@dongjoon-hyun
Member

Please rebase to the master branch once more, @zzzzming95 .

The CI build failure doesn't seem to be caused by this patch; can you take a look?

@@ -118,6 +118,7 @@ case class AdaptiveSparkPlanExec(
     val ensureRequirements =
       EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
     Seq(
+      CoalesceBucketsInJoin,
Contributor

Shall we put it in queryStageOptimizerRules?

Contributor

Rules in queryStageOptimizerRules are invoked less often, which is more efficient. CoalesceBucketsInJoin does not change the plan's partitioning, so it seems it could be put in queryStageOptimizerRules.

Contributor Author

In my test, the UT failed if CoalesceBucketsInJoin was added to queryStageOptimizerRules.

Contributor

@cloud-fan commented Apr 10, 2023

Can we spend a bit of time understanding why? Then we can write a code comment to explain it, so future developers won't ever try to move this rule to queryStageOptimizerRules.

Contributor Author

Yeah, I will provide detailed information and add it to the PR description.

Contributor Author

Because queryStageOptimizerRules is not applied to the initial plan; those rules are applied in the createQueryStages() method. Since createQueryStages() works bottom-up, the exchange that should be eliminated is wrapped in a ShuffleQueryStage first, which makes the join shape unrecognizable to CoalesceBucketsInJoin. I have added this to the notes at the top. Thanks @cloud-fan

Member

CoalesceBucketsInJoin should run before EnsureRequirements.

@zzzzming95
Contributor Author

@cloud-fan @dongjoon-hyun @viirya Please merge to master. Thanks ~

@dongjoon-hyun
Member

To be clear, this PR didn't get any approval yet, @zzzzming95.

Please merge to master. Thanks ~

      assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
    } else {
      assert(scans.isEmpty)

      query: String,
Contributor

nit: the indentation is wrong now, can we restore to 4 spaces as before?

Contributor Author

https://github.com/apache/spark/pull/40731/files#diff-1dd0d5a38f73f2993e5852f759a3934396c083d4fc4cc334e73ccc8eb929a717R1013

The original DisableAdaptiveExecution logic of this UT is removed here; the current implementation covers both the AQE and non-AQE cases.
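For illustration, a sketch of that "covers both" pattern (runAndVerify is a hypothetical stand-in for the suite's real assertions; withSQLConf comes from Spark's test helpers): instead of tagging the test with DisableAdaptiveExecution, the same verification runs under both AQE settings.

import org.apache.spark.sql.internal.SQLConf

// Run the same assertions with AQE on and off rather than maintaining a
// separate DisableAdaptiveExecution-tagged test.
Seq("true", "false").foreach { aqe =>
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe) {
    runAndVerify(query, expectedNumShuffles, expectedCoalescedNumBuckets) // hypothetical helper
  }
}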

Comment on lines 1022 to 1024

      query: String,
      expectedNumShuffles: Int,
      expectedCoalescedNumBuckets: Option[Int]): Unit = {

Member

Suggested change:

-      query: String,
-      expectedNumShuffles: Int,
-      expectedCoalescedNumBuckets: Option[Int]): Unit = {
+    query: String,
+    expectedNumShuffles: Int,
+    expectedCoalescedNumBuckets: Option[Int]): Unit = {

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in a43a6b3 Apr 14, 2023
@neshkeev
Contributor

I was the OP of the issue in JIRA.

Thank you for the fix, but I discovered weird behavior when hints are applied, and I don't know how to interpret it. Please check SPARK-43326, which I filed.

@zzzzming95
Contributor Author

I was the OP of the issue in JIRA.

Thank you for the fix, but I discovered weird behavior when hints are applied, and I don't know how to interpret it. Please check SPARK-43326, which I filed.

Okay, I will follow up on this issue.
