
[SPARK-29544] [SQL] optimize skewed partition based on data size #26434

Closed
wants to merge 24 commits into master from JkSelf:skewedPartitionBasedSize

Conversation

@JkSelf
Copy link
Contributor

JkSelf commented Nov 8, 2019

What changes were proposed in this pull request?

Data skew is common and can severely degrade the performance of queries, especially those with joins. This PR aims to optimize skew joins at runtime based on the map output statistics by adding an "OptimizeSkewedPartitions" rule. The detailed design doc is here. Currently we support the "Inner, Cross, LeftSemi, LeftAnti, LeftOuter, RightOuter" join types.
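For context, a minimal sketch of the skew-detection criterion described above (the method name and signature are illustrative, not the merged code; the two thresholds correspond to the spark.sql.adaptive.skewedPartitionFactor and spark.sql.adaptive.skewedPartitionSizeThreshold configs discussed further down):

// Illustrative sketch only: a reduce partition is treated as skewed when its size
// exceeds both the configured size threshold and `factor` times the median
// partition size of that shuffle.
def isSkewed(size: Long, medianSize: Long, factor: Int, sizeThreshold: Long): Boolean =
  size > sizeThreshold && size > factor * medianSize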

Why are the changes needed?

To optimize skewed partitions at runtime based on AQE.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@JkSelf

This comment has been minimized.

Copy link
Contributor Author

JkSelf commented Nov 8, 2019

@cloud-fan Please help me review. Thanks for your help.

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Nov 8, 2019

Test build #113434 has finished for PR 26434 at commit c64df90.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan]
  • case class SkewedShuffleReaderExec(
  • class SkewedShuffledRowRDD(
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Nov 8, 2019

Test build #113435 has finished for PR 26434 at commit f0f03c5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Nov 8, 2019

Test build #113444 has finished for PR 26434 at commit 5f56f89.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@JkSelf

This comment has been minimized.

Copy link
Contributor Author

JkSelf commented Nov 8, 2019

Please help to retest. Thanks.

@maropu

This comment has been minimized.

Copy link
Member

maropu commented Nov 9, 2019

retest this please

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Nov 9, 2019

Test build #113484 has finished for PR 26434 at commit 5f56f89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@dongjoon-hyun dongjoon-hyun added the SQL label Nov 9, 2019
* Get a reader for the specific partitionIndex in map output statistics that are
* produced by range mappers. Called on executors by reduce tasks.
*/
def getReaderForRangeMapper[K, C](

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 11, 2019

Contributor

we can probably merge this method with getReaderForOneMapper. One mapper is just a special case of a range of mappers.

}
}

case class SkewedShuffleReaderExec(

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 11, 2019

Contributor

This is different from local/coalesced shuffle reader as it reads only one reduce partition. Maybe better to call it PostShufflePartitionReader

val size = metrics.bytesByPartitionId(partitionId)
val factor = size / medianSize
val numMappers = getShuffleStage(stage).
plan.shuffleDependency.rdd.partitions.length

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 11, 2019

Contributor

we can easily get the data size of each mapper, shall we split mapper ranges based on data size?

@@ -279,6 +279,24 @@ private[sql] trait SQLTestData { self =>
df
}

protected lazy val skewData1: DataFrame = {

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 11, 2019

Contributor

let's define the data where it is used. This file should only contain dataframes that are used by multiple test suites.

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Nov 12, 2019

Test build #113622 has finished for PR 26434 at commit bbf585d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PostShufflePartitionReader(
* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
*/
def convertMapStatuses(

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 12, 2019

Contributor

can we merge the two convertMapStatuses?

@@ -116,7 +116,8 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
class ShuffledRowRDD(
var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
metrics: Map[String, SQLMetric],
specifiedPartitionStartIndices: Option[Array[Int]] = None)
specifiedPartitionStartIndices: Option[Array[Int]] = None,

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 12, 2019

Contributor

how about Option[Array[(Int, Int)]]?

This comment has been minimized.

Copy link
@JkSelf

JkSelf Nov 13, 2019

Author Contributor

maybe separate definitions of specifiedPartitionStartIndices and specifiedPartitionEndIndices are clearer?

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 13, 2019

Contributor

Option[Array[(Int, Int)]] is more type-safe (see the sketch below). It eliminates problems like

  1. specifiedPartitionStartIndices is specified but specifiedPartitionEndIndices is not.
  2. these 2 have different lengths.
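A small self-contained sketch of the difference (parameter names are hypothetical, only to illustrate the type-safety argument):

// Two parallel optional arrays: nothing stops a caller from passing one without the
// other, or passing arrays of different lengths.
def readerWithTwoArrays(
    specifiedPartitionStartIndices: Option[Array[Int]],
    specifiedPartitionEndIndices: Option[Array[Int]]): Unit = ()

// A single optional array of (start, end) pairs makes those invalid states unrepresentable.
def readerWithRanges(specifiedPartitionRanges: Option[Array[(Int, Int)]]): Unit = ()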
partitionId, rightMapIdStartIndices(j), rightEndMapId)

subJoins +=
SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 12, 2019

Contributor

just for brainstorming: here we are joining 2 partitions, not 2 RDDs, so there will be no shuffle. Is it better to run a hash join than an SMJ? cc @maryannxue

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Nov 12, 2019

Contributor

Furthermore, if only one side is skewed, maybe it's better to plan a broadcast hash join instead of a union of many SMJs?

@JkSelf JkSelf force-pushed the JkSelf:skewedPartitionBasedSize branch from bbf585d to 18cdcd9 Dec 3, 2019
@JkSelf

This comment has been minimized.

Copy link
Contributor Author

JkSelf commented Dec 3, 2019

@cloud-fan fixed the conflicts and resolved the comments. Please help review again. Thanks for your help.

var postMapPartitionSize: Long = mapPartitionSize(i)
partitionStartIndices += i
while (i < numMappers && i + 1 < numMappers) {
val nextIndex = if (i + 1 < numMappers) {

This comment has been minimized.

Copy link
@manuzhang

manuzhang Dec 3, 2019

Contributor

Isn't this always true?

This comment has been minimized.

Copy link
@JkSelf

JkSelf Dec 9, 2019

Author Contributor

@manuzhang Thanks for your review. After an offline discussion with Wenchen, we decided to remove this method and split the skewed partition by the number of mappers.

i + 1
} else numMappers -1

if (postMapPartitionSize + mapPartitionSize(nextIndex) > advisoryTargetPostShuffleInputSize) {

This comment has been minimized.

Copy link
@manuzhang

manuzhang Dec 3, 2019

Contributor

What if this never comes true when adaptiveSkewedSizeThreshold is smaller than targetPostShuffleInputSize? I'm also wondering whether targetPostShuffleInputSize can be reused for the threshold.

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Dec 3, 2019

Test build #114751 has finished for PR 26434 at commit 18cdcd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
}
case None =>
Iterator.empty
}
}



This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

nit: unnecessary change

@@ -410,6 +410,27 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED = buildConf("spark.sql.adaptive.skewedJoin.enabled")

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

nit: spark.sql.adaptive.optimizeSkewedJoin.enabled

buildConf("spark.sql.adaptive.skewedPartitionFactor")
.doc("A partition is considered as a skewed partition if its size is larger than" +
" this factor multiple the median partition size and also larger than " +
"spark.sql.adaptive.skewedPartitionSizeThreshold.")

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

nit: ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key} instead of hardcode
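As a usage sketch only (the key names are as they appear in this revision of the PR; the skewed-join flag was suggested above to be renamed to spark.sql.adaptive.optimizeSkewedJoin.enabled, and the values below are assumed, not defaults), the feature would be toggled like any other SQL conf, e.g. in spark-shell:

// Illustrative only; config names/values as discussed in this PR revision.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewedJoin.enabled", "true")
// A partition is skewed only if it is larger than both of the following:
spark.conf.set("spark.sql.adaptive.skewedPartitionFactor", "10")              // times the median size (value assumed)
spark.conf.set("spark.sql.adaptive.skewedPartitionSizeThreshold", "67108864") // absolute size in bytes (value assumed)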


def adaptiveSkewedFactor: Int = getConf(ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR)

def adaptiveSkewedSizeThreshold: Long =

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

nit: we don't need these 3 methods if they are only called 1 or 2 times.

val shuffleStageCheck = ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
ShuffleQueryStageExec.isShuffleQueryStageExec(rightStage)
val statisticsReady: Boolean = if (shuffleStageCheck) {
getStatistics(leftStage) != null && getStatistics(rightStage) != null

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

how can the stats be null?

This comment has been minimized.

Copy link
@JkSelf

JkSelf Dec 5, 2019

Author Contributor

It seems it can't be null. I had a wrong understanding that leftStage and rightStage might not be materialized at the same time.

joinTypeSupported && statisticsReady
}

private def supportSplitOnLeftPartition(joinType: JoinType) = joinType != RightOuter

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Dec 3, 2019

Contributor

nit: it's more robust to list the supported join types.
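A sketch of what listing them explicitly could look like, based on the join types named in the PR description (illustrative, not the committed code; assumes the usual catalyst JoinType imports):

// Illustrative: enumerate the join types for which the left side may be split,
// per the types listed in the PR description (RightOuter excluded on the left side).
private def supportSplitOnLeftPartition(joinType: JoinType) = joinType match {
  case Inner | Cross | LeftSemi | LeftAnti | LeftOuter => true
  case _ => false
}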

@JkSelf

This comment has been minimized.

Copy link
Contributor Author

JkSelf commented Dec 9, 2019

@cloud-fan I have addressed the comments from the online and offline discussions. Please help review again. Thanks.

@cloud-fan

This comment has been minimized.

Copy link
Contributor

cloud-fan commented Jan 13, 2020

retest this please

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #116593 has finished for PR 26434 at commit cee1c8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@JkSelf

This comment has been minimized.

Copy link
Contributor Author

JkSelf commented Jan 13, 2020

retest this please

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #116610 has finished for PR 26434 at commit cee1c8c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

This comment has been minimized.

Copy link
Contributor

cloud-fan commented Jan 13, 2020

retest this please

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #116616 has finished for PR 26434 at commit cee1c8c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #4991 has finished for PR 26434 at commit cee1c8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #4992 has finished for PR 26434 at commit cee1c8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

This comment has been minimized.

Copy link
Contributor

cloud-fan commented Jan 13, 2020

retest this please

@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 13, 2020

Test build #116654 has finished for PR 26434 at commit cee1c8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
cloud-fan and others added 2 commits Jan 14, 2020
create partial shuffle reader
@SparkQA

This comment has been minimized.

Copy link

SparkQA commented Jan 14, 2020

Test build #116695 has finished for PR 26434 at commit ac17a7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@cloud-fan

This comment has been minimized.

Copy link
Contributor

cloud-fan commented Jan 14, 2020

thanks, merging to master!

@cloud-fan cloud-fan closed this in a2aa966 Jan 14, 2020
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
val maxSplits = math.min(conf.getConf(
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

Why do we need this config? It seems a bit weird that we try to use the actual size everywhere else, and we are not doing it here. I think that just using the SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD for the target partition size by itself should yield good results.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

According to feedback from @manuzhang's environment, in some use cases the split number can be very large (more than 1000). That leads to too many SMJs, the job may take a long time to launch after optimizing the skewed join, and the UI is choked by the huge graph. So we added this configuration as an upper limit on the split number.

val partitionStartIndices = ArrayBuffer[Int]()
var postMapPartitionSize = mapPartitionSizes(0)
partitionStartIndices += 0
partitionIndices.drop(1).foreach { nextPartitionIndex =>

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

In some cases writing a while loop is actually easier to understand, e.g.:

// Start above the advisory size so the very first index is always recorded.
var postMapPartitionSize = advisoryPartitionSize + 1
var nextPartitionIndex = 0
while (nextPartitionIndex < mapPartitionSizes.length) {
  val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
  // Adding this mapper would exceed the advisory size: start a new split here.
  if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
    partitionStartIndices += nextPartitionIndex
    postMapPartitionSize = 0
  }
  postMapPartitionSize += nextMapPartitionSize
  nextPartitionIndex += 1
}

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Ok. I will update later.

}
}

if (partitionStartIndices.size > maxSplits) {

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

This makes the last partition larger right? Isn't that adding some skew?

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Yes, it may cause the last partition to be skewed. This approach cannot fully handle extremely skewed use cases.
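A self-contained sketch of the capping behavior being discussed (illustrative, not the PR's code): once the number of splits exceeds maxSplits, only the first maxSplits start indices are kept, so everything after the last kept index lands in one final, possibly still skewed, split.

// Illustrative: keep at most `maxSplits` split boundaries; the tail of the mapper
// range is folded into the last split, which is why it can remain large.
def capSplits(partitionStartIndices: Seq[Int], maxSplits: Int): Seq[Int] =
  partitionStartIndices.take(maxSplits)

// e.g. capSplits(Seq(0, 3, 7, 12, 20), 3) == Seq(0, 3, 7); the last split now
// covers mappers 7 until the end.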

* This RDD takes a [[ShuffleDependency]] (`dependency`), a partitionIndex
* and the range of startMapIndex to endMapIndex.
*/
class SkewedShuffledRowRDD(

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

Why do we need a separate implementation of the shuffled row RDD? I am wondering if we can combine them all, and have a couple of partition implementations depending on which (mapper/reducer) coordinate we need to read from.

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

BTW bifurcation of the query plan (part hash join, part smj) is slightly orthogonal to this.

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Jan 15, 2020

Contributor

This sounds like a good idea.

private def medianSize(stats: MapOutputStatistics): Long = {
val numPartitions = stats.bytesByPartitionId.length
val bytes = stats.bytesByPartitionId.sorted
if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

math.max(bytes(numPartitions / 2), 1)?

OCD: you could argue that this median calculation is incorrect for an even number of elements. In that case it should be (bytes(numPartitions / 2 - 1) + bytes(numPartitions / 2)) / 2
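A minimal sketch of a medianSize that incorporates both points (illustrative, not the committed code):

// Illustrative: median of the per-partition sizes, averaging the two middle
// elements when the count is even, and never returning less than 1 byte.
def medianSize(bytesByPartitionId: Array[Long]): Long = {
  val sorted = bytesByPartitionId.sorted
  val n = sorted.length
  val median =
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2
    else sorted(n / 2)
  math.max(median, 1L)
}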

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Good catch. I will update later.


val leftMedSize = medianSize(leftStats)
val rightMedSize = medianSize(rightStats)
val leftSizeInfo = s"median size: $leftMedSize, max size: ${leftStats.bytesByPartitionId.max}"

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

NIT: you can avoid materializing the string when debug logging is not enabled by making these defs.
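A tiny sketch of the suggestion (illustrative): logDebug takes its message by name, so with a def the string is only interpolated when debug logging actually evaluates the message.

// Illustrative: `def` defers building the string until it is used inside logDebug.
def leftSizeInfo = s"median size: $leftMedSize, max size: ${leftStats.bytesByPartitionId.max}"
logDebug(s"left stats: $leftSizeInfo")   // message text here is hypothetical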

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Ok. I will update later.

val shuffleStages = collectShuffleStages(plan)

if (shuffleStages.length == 2) {
// Currently we only support handling skewed join for 2 table join.

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

Is this being planned?

This comment has been minimized.

Copy link
@maryannxue

maryannxue Jan 14, 2020

Contributor

Is it that we want to avoid things like:

SMJ
  SMJ (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
  Shuffle (w/ or w/o Sort)

i.e., an SMJ that contains a child SMJ without a Shuffle on top, so we don't want to optimize the child SMJ because it changes the outputPartitioning? If so, can we make that clearer in the comment?

This comment has been minimized.

Copy link
@maryannxue

maryannxue Jan 14, 2020

Contributor

and since it affects the outputPartitioning, we need to check if it breaks the operators above.

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

There are scenarios where we can do this for multiple joins, right? INNER for example.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

It may be too complex to optimize multiple joins, so we optimize the 2-table join first. We can further optimize multiple joins later.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

@maryannxue yes, we need to consider the effect on outputPartitioning. I will address it in the following PRs.

stage.shuffle.shuffleDependency.rdd.partitions.length
}

def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

In general it would be good to have some documentation for this method. What you are doing here is not completely trivial :). The same applies for the code inside the method.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Does the design doc in the "Idea" section explain it?

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Jan 15, 2020

Contributor

Can we do a summary and put it as a code comment?

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 16, 2020

Author Contributor

Yes. I will add later.
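A possible shape for such a summary, pieced together from the PR description and this review thread (hedged; the wording in the merged code may differ):

/*
 * handleSkewJoin (sketch of a summary comment):
 * 1. Match a SortMergeJoinExec whose two children are shuffle query stages
 *    (currently only a single two-table join is handled).
 * 2. From the runtime MapOutputStatistics, compute each side's median partition
 *    size; a partition is skewed if it exceeds the skewed-partition factor times
 *    the median and the configured size threshold.
 * 3. For each skewed partition, split the skewed side's mapper range into groups
 *    (capped by the max-splits config) and read each group with a dedicated
 *    skewed-partition reader.
 * 4. Union the sort-merge joins built for those splits with the join over the
 *    remaining non-skewed partitions.
 */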

}
logDebug(s"number of skewed partitions is ${skewedPartitions.size}")
if (skewedPartitions.nonEmpty) {
val optimizedSmj = smj.transformDown {

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

You can just rewrite the SMJ directly, right? The alternative is that you use transformDown and rewrite the ShuffleQueryStageExecs directly.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Yes. I will update later.

@@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, context.subqueryCache),
// Here the 'OptimizeSkewedPartitions' rule should be executed

This comment has been minimized.

Copy link
@maryannxue

maryannxue Jan 14, 2020

Contributor

comment out-of-date

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Ok. I will update later.

* @param startMapIndex The start map index.
* @param endMapIndex The end map index.
*/
case class SkewedPartitionReaderExec(

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

Like with the RDDs: in general I would be in favor of creating one reader node that can deal with the different kinds of shuffle reads. That avoids a sprawl of readers, and it also allows us to create a much simpler plan if we can just use 1 reader with a join instead of using a union.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Yes. We also plan to further optimize the skew reader to make the plan simpler later.

@@ -579,6 +579,153 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-29544: adaptive skew join with different join types") {

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

Can we do something to increase coverage for different join types?


def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),

This comment has been minimized.

Copy link
@maryannxue

maryannxue Jan 14, 2020

Contributor

we don't necessarily have a Sort in SMJ.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Yes. We may also be able to optimize the following use case:

SMJ 
    Shuffle 
    Shuffle
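A hypothetical sketch of covering both plan shapes in the pattern match (the committed pattern may differ; optimize is a placeholder helper):

// Illustrative only: match an SMJ whose children are shuffle stages, with or
// without an intervening SortExec on each side.
def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(_, _, _, _,
      SortExec(_, _, left: ShuffleQueryStageExec, _),
      SortExec(_, _, right: ShuffleQueryStageExec, _)) =>
    optimize(smj, left, right)   // hypothetical helper
  case smj @ SortMergeJoinExec(_, _, _, _,
      left: ShuffleQueryStageExec,
      right: ShuffleQueryStageExec) =>
    optimize(smj, left, right)
}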
}

def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
case _: LocalShuffleReaderExec => Nil

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

We could lose an optimization opportunity here. You could have a situation where we converted an SMJ->BHJ and where there is a shuffled join on top of this. Anyway this will require some more changes.

This comment has been minimized.

Copy link
@hvanhovell

hvanhovell Jan 14, 2020

Contributor

This might even be a correctness problem if the LocalShuffleReader produces a partitioning that is actually leveraged later in the stage.

This comment has been minimized.

Copy link
@JkSelf

JkSelf Jan 15, 2020

Author Contributor

Good idea. We can do further optimization later.

This comment has been minimized.

Copy link
@maryannxue

maryannxue Jan 16, 2020

Contributor

We can remove these "reader" cases here:

  1. this rule applies first, so we won't see these readers anyway.
  2. even if this rule is not the first to apply, we should not skip them, otherwise we could miscount. We are not matching them in transform anyway, so we are safe to ignore them here.
@hvanhovell

This comment has been minimized.

Copy link
Contributor

hvanhovell commented Jan 14, 2020

@JkSelf I am very excited about this work; it will improve the end-user experience for a lot of users. I have left some additional comments; I hope you don't mind that I am late to the party.
