
[SPARK-32656][SQL] Repartition bucketed tables for sort merge join / shuffled hash join if applicable #29473

Closed
wants to merge 11 commits

Conversation

imback82
Contributor

@imback82 imback82 commented Aug 19, 2020

What changes were proposed in this pull request?

#28123 and #29079 introduced coalescing bucketed tables for sort merge join / shuffled hash join.

This PR proposes to introduce repartitioning bucketed tables to increase parallelism, at the cost of reading some source data more than once. It is applied if all of the following conditions are met (a small eligibility sketch follows the list):

  • Join is sort merge join or shuffled hash join.
  • Join keys match the output partitioning expressions on their respective sides.
  • The larger bucket count is divisible by the smaller bucket count.
  • spark.sql.sources.bucketing.readStrategyInJoin is set to repartition.
  • The ratio of the number of buckets should be less than the value set in spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio.
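
For illustration, a minimal sketch of the bucket-count part of these conditions (the helper name and signature are hypothetical, not code from this PR; the actual rule also checks the join type, join keys, and the configured read strategy):

// Hypothetical check mirroring the divisibility and ratio conditions listed above.
def bucketCountsEligible(leftBuckets: Int, rightBuckets: Int, maxBucketRatio: Int): Boolean = {
  val large = math.max(leftBuckets, rightBuckets)
  val small = math.min(leftBuckets, rightBuckets)
  // The larger bucket count must be divisible by the smaller one, and the
  // ratio must be less than the configured maximum.
  large % small == 0 && large / small < maxBucketRatio
}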

Why are the changes needed?

Coalescing buckets is useful, but depending on the workload, repartitioning can also help by increasing parallelism.

Does this PR introduce any user-facing change?

Yes. If the bucket repartitioning conditions explained above are met, a full shuffle can be eliminated (also note that you will see SelectedBucketsCount: 4 out of 4 (Repartitioned to 8) in the physical plan):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin", "repartition")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

== Physical Plan ==
*(3) SortMergeJoin [i#38], [i#44], Inner
:- *(1) Sort [i#38 ASC NULLS FIRST], false, 0
:  +- *(1) Filter isnotnull(i#38)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[i#38,j#39,k#40] Batched: true, DataFilters: [isnotnull(i#38)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
   +- *(2) Filter isnotnull(i#44)
      +- FileScan parquet default.t2[i#44,j#45,k#46] Batched: false, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4 (Repartitioned to 8)

How was this patch tested?

Added new tests.

@imback82 imback82 marked this pull request as draft August 19, 2020 01:15
@imback82 imback82 changed the title [WIP][SPARK-32656][SQL] Repartition bucketed tables for sort merge join / shuffled hash join if applicable [SPARK-32656][SQL] Repartition bucketed tables for sort merge join / shuffled hash join if applicable Aug 19, 2020
@SparkQA

SparkQA commented Aug 19, 2020

Test build #127609 has finished for PR 29473 at commit 21882ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor

c21 commented Aug 22, 2020

@imback82 - thanks for working on this.

I see this is still marked as a draft; please change it once it's ready for review. Thanks.

@maropu
Member

maropu commented Aug 24, 2020

Thanks for the work, @imback82. Just a question: could we set a simpler rule to select which strategy (repartitioning or coalescing) to use when reading buckets? I think it is a bit annoying to set repartitionBucketsInJoin.enabled or coalesceBucketsInJoin.enabled to true on a per-dataset basis. What factor makes the difference between the two strategies, e.g., bucket size? For example, if one has very small buckets (an extreme case, admittedly), repartitioning to a larger number of buckets might not improve performance.

if (conf.coalesceBucketsInJoinEnabled && conf.repartitionBucketsInJoinEnabled) {
  throw new AnalysisException("Both 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' and " +
    "'spark.sql.bucketing.repartitionBucketsInJoin.enabled' cannot be set to true at the " +
    "same time")
Member

Could we use Enumeration and checkValues instead? I think this check should be done in SQLConf.

Member

For example;

  object BucketReadStrategyMode extends Enumeration {
    val COALESCING, REPARTITIONING, AUTOMATIC, OFF  = Value
  }

  val BUCKET_READ_STRATEGY_MODE =
    buildConf("...")
      .version("3.1.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(BucketReadStrategyMode.values.map(_.toString))
      .createWithDefault(BucketReadStrategyMode.OFF.toString)
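
A consumer of this entry would then parse the stored string back into the enum; for example (a sketch, assuming a SQLConf getter for the entry above):

  // Sketch: turn the stored config string back into the enum value.
  val readStrategy = BucketReadStrategyMode.withName(conf.getConf(BUCKET_READ_STRATEGY_MODE))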

Contributor Author

Thanks for the suggestion! I think the new config makes more sense. I renamed a few of the configs; let me know if they don't make sense.

Btw, do you think I can introduce AUTOMATIC as a follow-up, since this PR is already sizable? Let me know if you want to see it in this PR. Thanks.

// `RepartitioningBucketRDD` converts columnar batches to rows to calculate the bucket id for
// each row; thus, columnar execution is not supported when `RepartitioningBucketRDD` is used,
// to avoid converting batches to rows and back to batches.
relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
Member

I'm not sure how much performance gain this columnar execution provides, but is the proposed idea to give up that gain and use bucket repartitioning instead?

Contributor Author
@imback82 imback82 Aug 27, 2020

Note that the datasource will still be read as batches in this case (if whole stage codegen is enabled).

I see that physical plans operate on rows, so batches are converted to rows via ColumnarToRow anyway. So, I think perf impact would be minimal here; the difference could be the code-gen conversion from columnar to row vs. iterating batch.rowIterator() in BucketRepartitioningRDD.
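
For context, a sketch of what iterating a batch's rows directly looks like (assuming Spark's ColumnarBatch API; this is illustrative, not code from the PR):

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Pull rows out of a columnar batch the way a row-based repartitioning RDD would,
// instead of relying on the code-generated ColumnarToRow conversion.
def rowsOf(batch: ColumnarBatch): Iterator[InternalRow] = batch.rowIterator().asScala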

relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
}

@transient private lazy val isRepartitioningBuckets: Boolean = {
Member

nit: we don't need : Boolean ?

Contributor Author

I followed the same style from override lazy val supportsColumnar: Boolean, etc. Is this still not needed?

@imback82
Contributor Author

Just a question: could we set a simpler rule to select which strategy (repartitioning or coalescing) to use when reading buckets? I think it is a bit annoying to set repartitionBucketsInJoin.enabled or coalesceBucketsInJoin.enabled to true on a per-dataset basis.

Good point. One use case for repartition over coalesce is when there are enough cores available in the cluster, so coalescing would needlessly reduce parallelism. @c21, did you observe any patterns or heuristics on your workloads where repartition is preferred?

For example, if one has very small buckets (an extreme case, admittedly), repartitioning to a larger number of buckets might not improve performance.

This is still guarded by spark.sql.bucketing.coalesceOrRepartitionBucketsInJoin.maxBucketRatio, so this scenario is a little less concerning?

@maropu
Member

maropu commented Aug 28, 2020

btw, still Draft?

@maropu
Member

maropu commented Aug 28, 2020

Also, I think it's better to include performance numbers for this proposed idea in the PR description above.

@imback82
Contributor Author

btw, still Draft?

I still need to add a few more tests.

Also, I think it's better to include performance numbers for this proposed idea in the PR description above.

Yes, will update.

@SparkQA

SparkQA commented Aug 29, 2020

Test build #128003 has finished for PR 29473 at commit 5665bc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82 imback82 marked this pull request as ready for review August 29, 2020 03:25
@SparkQA

SparkQA commented Aug 29, 2020

Test build #128011 has finished for PR 29473 at commit e2374ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
val BUCKET_READ_STRATEGY_IN_JOIN =
buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
Contributor

nit: shall we have a name consistent with the existing "spark.sql.sources.bucketing" configs, e.g. "spark.sql.sources.bucketing.readStrategyInJoin"? No big deal, but "bucketing.bucket..." seems a little verbose. Pointing this out because users may depend on this config for bucketing optimization and raise questions to developers about it.

Contributor Author

Makes sense. I changed this to spark.sql.sources.bucketing.readStrategyInJoin.

.createWithDefault(BucketReadStrategyInJoin.OFF.toString)

val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio")
Contributor

nit: same as above, might be just "spark.sql.sources.bucketing.readStrategyInJoinMaxBucketRatio"?

Contributor Author

I changed this to spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio, but I don't have a strong opinion; let me know if spark.sql.sources.bucketing.readStrategyInJoinMaxBucketRatio is better.

@@ -314,7 +324,7 @@ case class FileSourceScanExec(
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

// TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) {
Contributor

we don't need `|| isRepartitioningBuckets`, right?

Contributor Author

Repartitioning can still maintain the sort order, whereas coalescing cannot, so this check is needed.

// There are now more files to be read.
val filesNum = filePartitions.map(_.files.size.toLong).sum
val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
driverMetrics("numFiles") = filesNum
Contributor

Per setFilesNumAndSizeMetric, should we set staticFilesNum here, or numFiles?

Contributor Author

I think staticFilesNum is used only for dynamic partition pruning:

/** SQL metrics generated only for scans using dynamic partition pruning. */
private lazy val staticMetrics = if (partitionFilters.filter(isDynamicPruningFilter).nonEmpty) {
Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
"staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))

@c21
Contributor

c21 commented Aug 29, 2020

did you observe any patterns or heuristics on your workloads where repartition is preferred?

From our side, honestly, we don't currently have any automation for deciding between coalesce and repartition. We provide configs similar to the ones here so that users can control coalesce vs. repartition themselves.

I think a rule of thumb can be that we don't want to:
  • coalesce: if the coalesced table is too big and the number of coalesced buckets is too small, then each task has too much data and will take longer.
  • repartition: if the repartitioned table is too big and the number of repartitioned buckets is too large, then too much duplicated data is read, incurring much more CPU/IO cost (possibly worse than just shuffling this table).
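
Until such automation exists, the choice stays per workload via the configs proposed in this PR; for example (the values below are illustrative, and the names are the ones proposed in this PR, so they may still change):

// Prefer repartitioning when the cluster has spare cores and the bucket-count ratio is small;
// switch to "coalesce" (or leave the default "off") otherwise.
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin", "repartition")
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio", "4")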

@SparkQA

SparkQA commented Aug 29, 2020

Test build #128012 has finished for PR 29473 at commit 7481e36.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2020

Test build #128028 has finished for PR 29473 at commit 366c9c3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2020

Test build #128027 has finished for PR 29473 at commit 2c4925b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

github-actions bot commented Dec 9, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 9, 2020
@github-actions github-actions bot closed this Dec 10, 2020