
[SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command #29797

Closed
wants to merge 7 commits

Conversation


@manuzhang manuzhang commented Sep 18, 2020

What changes were proposed in this pull request?

Do not use the local shuffle reader at the final stage if the root node is a write command.

Why are the changes needed?

Users usually repartition by the partition column before a dynamic partition overwrite. AQE can break this by removing the physical shuffle in favor of a local shuffle reader, which can lead to a large number of output files, even exceeding the file system limit.
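A typical write pattern behind this report, as a rough sketch (the DataFrame, the column name dt, and the output path are illustrative only):

    import org.apache.spark.sql.functions.col

    // Dynamic partition overwrite with a user-added repartition on the partition column,
    // so that each task writes to only a few dynamic partitions.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.repartition(col("dt"))
      .write
      .mode("overwrite")
      .partitionBy("dt")
      .parquet("/path/to/table")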

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Add test.

@manuzhang manuzhang changed the title [SPARK-32932][SQL] Do not change number of partitions on RepartitionByExpression when coalescing disabled [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled Sep 18, 2020

@SparkQA

SparkQA commented Sep 18, 2020

Test build #128851 has finished for PR 29797 at commit fc831f6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 18, 2020

Test build #128858 has finished for PR 29797 at commit 93775e8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

AQE could break it by removing shuffle with local shuffle reader.

Yeah, this is an issue, thanks for reporting! BTW, why is it OK when partition coalescing is enabled?

@manuzhang
Contributor Author

manuzhang commented Sep 18, 2020

@cloud-fan
It's not OK, but I haven't thought of a (simple) way to disable the local shuffle reader on dynamic partition overwrite (and bucket table overwrite as well). Any ideas?

@cloud-fan
Contributor

Seems like we should be stricter about applying the local shuffle reader, to make sure the expected parallelism can be satisfied.

Comment on lines 708 to 709
noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
r.optNumPartitions.isEmpty) :: Nil
Member

@viirya viirya Sep 18, 2020


Two questions so far.

  1. It seems to me that RepartitionByExpression here is not always for the dynamic partition overwrite case. With this change, all RepartitionByExpression nodes are affected by the coalesceShufflePartitionsEnabled config.

  2. So users of dynamic partition overwrite need to set conf.coalesceShufflePartitionsEnabled to false to avoid that? What if the user needs to coalesce shuffle partitions in the query, but also needs dynamic partition overwrite?

Contributor Author

@manuzhang manuzhang Sep 19, 2020


@viirya Thanks for the good questions.

This solution is not ideal until we find a way to avoid applying the local shuffle reader when the partitioning doesn't match that of the dynamic partitions. It's also not ideal that any AQE config is global.

With coalesceShufflePartitionsEnabled and localShuffleReaderEnabled, the output partitioning of the shuffle is uncertain, which arguably contradicts the purpose of RepartitionByExpression.

What if the user needs to coalesce shuffle partitions in the query, but also needs dynamic partition overwrite?

Another option is to introduce a new config specific to repartition, e.g. spark.sql.adaptive.repartition.canChangeNumPartitions.
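A rough sketch of how such a config could be declared, following the SQLConf.buildConf pattern (the name, wording, and default below are illustrative; this config is not part of the PR):

    // Hypothetical config sketch, not part of this change.
    val REPARTITION_CAN_CHANGE_NUM_PARTITIONS =
      buildConf("spark.sql.adaptive.repartition.canChangeNumPartitions")
        .doc("When true, adaptive execution may coalesce shuffle partitions or apply the " +
          "local shuffle reader on shuffles introduced by user-specified repartition " +
          "operations, which can change their output partitioning.")
        .booleanConf
        .createWithDefault(false)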

@cloud-fan
Contributor

I think the root cause is that we give the local shuffle reader an expected parallelism, but it's not always satisfied. For example, too many mappers can lead to over-parallelism and many small files.

We should add a check and only apply the local shuffle reader if the expected parallelism can be satisfied.

@manuzhang
Contributor Author

@cloud-fan

we give local shuffle reader expected parallelism

Could you please elaborate on this? How will it solve the small-files issue caused by the removed repartition when writing to a dynamic partition or bucket table?

@cloud-fan
Contributor

See OptimizeLocalShuffleReader.getPartitionSpecs:

val expectedParallelism = advisoryParallelism.getOrElse(numReducers)

If we skip local shuffle reader, we fix the small files issue.

@cloud-fan
Contributor

the small files issue due to removed repartition

Are you talking about a different issue? IIUC the repartition (shuffle) is still there, and the only problem is the local shuffle reader.

@manuzhang
Contributor Author

I mean the physical shuffle doesn't happen, so each task will generate at most numReducers files. The overall number can be up to numMappers * numReducers.

If we add such a check, I'm not sure whether the local shuffle reader will ever be applied in practice. In our use cases, the target bucket tables usually have more than 1000 buckets.
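A rough illustration of that file-count arithmetic, with hypothetical numbers:

    // Illustrative only: with the local shuffle reader, the writing tasks follow the
    // map-side output, so each mapper can emit up to one file per dynamic partition
    // or bucket it touches.
    val numMappers  = 2000                                // hypothetical upstream map tasks
    val numReducers = 1000                                // hypothetical partitions/buckets
    val worstCaseFiles = numMappers.toLong * numReducers  // up to 2,000,000 small files
    // With the repartition shuffle kept, each partition/bucket is written by a single
    // reducer, for roughly numReducers files in total.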

@cloud-fan
Contributor

OK, I get your point. The file write node works better with a certain partitioning, and the local shuffle reader breaks it.

Then how about we update InsertAdaptiveSparkPlan and have a way to disable the local shuffle reader if the root node is a data writing node?

@manuzhang
Contributor Author

Then how about we update InsertAdaptiveSparkPlan, and have a way to disable local shuffle reader if the root node is data writing node?

That will also disable the local shuffle reader for broadcast joins. How about adding a variable isDataWritingCommand to the context and bypassing the local shuffle reader for shuffle exchanges when it's true?

@cloud-fan
Contributor

That will also disable local shuffle reader for broadcast join

We can just disable the local shuffle reader in the final stage if the root node is a data writing node. What do you think?
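A minimal sketch of that idea (illustrative only; the PR's actual diff fragments appear further down in this thread):

    private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
      context.qe.sparkPlan match {
        case _: DataWritingCommandExec =>
          // Preserve the partitioning the write command relies on: skip the local reader.
          queryStageOptimizerRules.filterNot(_.isInstanceOf[OptimizeLocalShuffleReader])
        case _ =>
          queryStageOptimizerRules
      }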

@manuzhang
Contributor Author

@cloud-fan I've redone the PR as you suggested. Please help review.

@SparkQA

SparkQA commented Sep 25, 2020

Test build #129110 has finished for PR 29797 at commit 7e0d766.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@manuzhang manuzhang changed the title [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand Sep 25, 2020
@SparkQA

SparkQA commented Sep 25, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33731/

@SparkQA

SparkQA commented Sep 25, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33732/

@SparkQA

SparkQA commented Sep 25, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33731/

@SparkQA

SparkQA commented Sep 25, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33732/

@SparkQA

SparkQA commented Sep 25, 2020

Test build #129111 has finished for PR 29797 at commit 84134b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
OptimizeLocalShuffleReader(conf)
)

@transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
context.qe.sparkPlan match {
case _: DataWritingCommandExec =>
Contributor

We need to match all writing commands, including DS v1, v2, and file source. Maybe we can create a tagging trait, like UserDefinedExpression, to tag these writing commands.

Contributor Author

It seems DS v2 is not ready for writes, as per https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L988-L994.

Meanwhile, would it be too big a change for those interfaces to extend the tagging trait?

Contributor

File source v2 is not ready yet, but that doesn't mean DS v2 is not ready for writing. Please follow InsertAdaptiveSparkPlan.applyInternal.

Contributor Author

Sure, is there a UT for DS v2 writes? I find only V1 is used for writes no matter what format I specify.

Contributor

See DataSourceV2Suite; we have test v2 sources that support writing.

Contributor Author

I've added a match for V2TableWriteExec and a UT. Not sure I've got it right, as the v2 API is quite new to me. Please help review. Thanks.

@SparkQA

SparkQA commented Oct 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34297/

@SparkQA

SparkQA commented Oct 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34297/

@@ -102,6 +104,16 @@ case class AdaptiveSparkPlanExec(
OptimizeLocalShuffleReader(conf)
)

@transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
Contributor

It's only called once, so it can be a def.

// Test DataSource v2
withTempPath { f =>
  val path = f.getCanonicalPath
  val format = classOf[V2Source].getName
Contributor

Can we just use NoopDataSource?

Contributor Author

@manuzhang manuzhang Oct 14, 2020


Using NoopDataSource will fail:

java.lang.IllegalArgumentException: requirement failed: The provided partitioning does not match of the table.
 - provided: j
 - table: 
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.sql.DataFrameWriter.checkPartitioningMatchesV2Table(DataFrameWriter.scala:772)

Contributor

Partitioned or not doesn't matter; we can test df.write.format(...).save().
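A rough sketch of the suggested noop-source test write (the DataFrame below is illustrative; no path or withTempPath is needed since the noop source discards all rows):

    import org.apache.spark.sql.functions.col

    val df = spark.range(0, 100).selectExpr("id % 10 AS key", "id AS value")
    df.repartition(col("key"))
      .write
      .format("noop")
      .mode("overwrite")
      .save()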

@SparkQA

SparkQA commented Oct 12, 2020

Test build #129691 has finished for PR 29797 at commit 4fdee62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case _: DataWritingCommandExec | _: V2TableWriteExec =>
// SPARK-32932: Local shuffle reader could break partitioning that works best
// for the following writing command
queryStageOptimizerRules.filterNot(_.isInstanceOf[OptimizeLocalShuffleReader])
Contributor

nit: indentation is one space off.


// Test DataSource v2
withTempPath { f =>
  val path = f.getCanonicalPath
Contributor

path is not needed for the noop source. We can just call save().

Contributor

and we can remove withTempPath

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34373/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34373/

@SparkQA

SparkQA commented Oct 15, 2020

Test build #129767 has finished for PR 29797 at commit d391269.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 77a8efb Oct 15, 2020
@maryannxue
Contributor

maryannxue commented Nov 12, 2020

@manuzhang This change seems incomplete. The problem here is not about the writer, but rather about the repartition Exchange. We should avoid letting the LSR rule work on such shuffles across the board, not just in the writer case.
All you need to do is: 1) move the LSR rule into postStageCreationRules; and 2) make the LSR rule match an Exchange first:

    plan match {
      case e: Exchange if canUseLocalShuffleReader(e.child) =>
        e.withNewChildren(Seq(createLocalReader(e.child)))
      case s: SparkPlan =>
        createProbeSideLocalReader(s)
    }

So that it'll only modify the child node of an Exchange, which is the purpose of this rule in the first place.

@manuzhang
Contributor Author

manuzhang commented Nov 13, 2020

@maryannxue This could miss an LSR optimization on the build side of a leaf BHJ in multiple joins, as follows.

*(6) BroadcastHashJoin [b#24], [a#33], Inner, BuildLeft, false
:- BroadcastQueryStage 7
:  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, false] as bigint)),false), [id=#390]
:     +- CustomShuffleReader local
:        +- ShuffleQueryStage 6
:           +- Exchange hashpartitioning(b#24, 5), true, [id=#364]
:              +- *(5) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
:                 :- BroadcastQueryStage 4
:                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#264]
:                 :     +- CustomShuffleReader local
:                 :        +- ShuffleQueryStage 0
:                 :           +- Exchange hashpartitioning(key#13, 5), true, [id=#172]
:                 :              +- *(1) Filter (isnotnull(value#14) AND (cast(value#14 as int) = 1))
:                 :                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
:                 :                    +- Scan[obj#12]
:                 +- ShuffleQueryStage 1
:                    +- Exchange hashpartitioning(a#23, 5), true, [id=#179]
:                       +- *(2) Filter (b#24 = 1)
:                          +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
:                             +- Scan[obj#22]
+- *(6) BroadcastHashJoin [n#93], [a#33], Inner, BuildRight, false
   :- CustomShuffleReader local
   :  +- ShuffleQueryStage 2
   :     +- Exchange hashpartitioning(n#93, 5), true, [id=#196]
   :        +- *(3) Filter (n#93 = 1)
   :           +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).l, true, false) AS l#94]
   :              +- Scan[obj#92]
   +- BroadcastQueryStage 5
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#310]
         +- CustomShuffleReader local
            +- ShuffleQueryStage 3
               +- Exchange hashpartitioning(a#33, 5), true, [id=#210]
                  +- *(4) Filter (a#33 = 1)
                     +- *(4) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33, unwrapoption(IntegerType, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).b) AS b#34]
                        +- Scan[obj#32]

@cloud-fan
Contributor

Can you highlight the problem? I do see CustomShuffleReader local in all the build sides.

@manuzhang
Contributor Author

manuzhang commented Nov 13, 2020

@cloud-fan Sorry, I meant the probe side: ShuffleQueryStage 1, which currently has a CustomShuffleReader local parent.

@maryannxue
Contributor

I assume in this case the repartition node has been optimized out. Then this fix still won't cover the repartition case without a writer, right?

@maryannxue
Contributor

We have some fundamental problems hanging around in our optimizations. In general, when we optimize a Shuffle over another Shuffle that is semantically equivalent (almost equivalent in this case), we should get rid of the child shuffle instead of the parent one, yet that is a little hard to implement right now.
But again, this is a "repartition" issue rather than a write command issue. Introducing this kind of hack in the AQE plan does not make sense.

@manuzhang
Contributor Author

@maryannxue thanks for pointing out the fundamental problems; I'm not familiar with the design behind this. When starting out, I was trying to solve the most urgent issue in our use cases: too many output files caused by LSR, which broke the downstream jobs in the pipeline. Meanwhile, disabling LSR entirely degrades the performance of BHJs that were switched from SMJs. Hence this PR, with the help of @cloud-fan.

But again, this is a "repartition" issue rather than the write command issue. Introducing this kind of hack in AQE plan does not make sense.

  1. Is it OK, or even good, if the repartition is optimized away in cases other than a write command?
  2. Moving forward, what's your suggestion? Do you want me to revert this PR?

@cloud-fan
Contributor

I'm reading the classdoc of OptimizeLocalShuffleReader, and I do feel the design is a bit hacky. We add the local shuffle reader if:

  1. the shuffle is the root node of a query stage, or
  2. the shuffle is the BHJ build side.

The reason for condition 1 is that it will never introduce a shuffle. This is true, but it may change the final output partitioning, which can be bad for cases like write commands.

I like the idea from @maryannxue, which is more general: 1) move the LSR rule into postStageCreationRules; and 2) make the LSR rule match an Exchange first (so condition 1 becomes: the shuffle is a direct child of an exchange). By doing this we can skip the LSR rule in the last stage, as the last stage's root node is not an exchange.

I'm not sure why the current approach can add LSR to the BHJ probe side. This seems like an accident to me, as it's not mentioned in the classdoc.

@maryannxue
Contributor

In LSR, we have a sanity check to make sure that the output partitioning requirement is not broken after this rule. For example, suppose there's a parent join sitting above the current BHJ whose probe side we are trying to optimize with LSR, and that parent join takes advantage of the output partitioning of the current BHJ; when applying the LSR rule, the check would fail and we would back out of it.
It's a similar situation here with repartition, only that the repartition has been optimized away. So if we can introduce an idea like a "required partitioning" (and in the future maybe even a required sort order) of the query, then when applying the LSR rule we would know it could break the required partitioning. Hope it makes sense, @manuzhang

@manuzhang
Contributor Author

@maryannxue thanks for the thorough explanation. Let me rework this.

@maryannxue
Contributor

Thank you, @manuzhang! I have a local fix already, and I'll submit a PR shortly. I assume the coalescing rule has a similar issue: a repartition shuffle with a specific number is optimized out because of another shuffle introduced by a join or agg, and later that shuffle gets coalesced, which means the specified repartition number is ignored.

@manuzhang
Contributor Author

@maryannxue Great to know. I've been testing a draft fix, which doesn't look good without optimizing the probe side of BHJ. I can help test your PR. Coalescing won't be applied to a repartition if the user specifies a repartition number.

@maryannxue
Contributor

maryannxue commented Nov 19, 2020

@manuzhang

Coalescing won't be applied to a repartition if the user specifies a repartition number.

It does apply when the specified number happens to be the default partition number, right?
