[SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning #19467
Test build #82607 has finished for PR 19467 at commit
Test build #82609 has finished for PR 19467 at commit
cc @tdas
  }
}

case class SimplePartition(index: Int) extends Partition
nit: EmptyPartition? Isn't that more descriptive than "simple"?
Added a few minor points.
Major point (from offline discussion): the right way to do this is to codify the requirement for a fixed number of partitions as a required child distribution, and let EnsureRequirements take care of it.
// Needs to be transformUp to avoid extra shuffles
override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
  case so: StatefulOperator =>
    val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
    val numPartitions = conf.numShufflePartitions
Why this change? Doesn't the plan have the same context and conf?
@@ -590,10 +590,33 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN
  }

  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().coalesce(numPartitions, shuffle = false)
    if (numPartitions == 1 && child.execute().getNumPartitions < 1) {
Add a test in DatasetSuite that covers this empty-RDD case, maybe in the same test as the existing coalesce test.
The existing tests for the original problem should catch it.
Test build #82689 has finished for PR 19467 at commit
Test build #82690 has finished for PR 19467 at commit
Test build #82699 has finished for PR 19467 at commit
Test build #82704 has finished for PR 19467 at commit
Test build #82705 has finished for PR 19467 at commit
Almost LGTM assuming tests pass. Just a few nits.
    expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
  case ClusteredDistribution(requiredClustering, desiredPartitions) =>
    expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
      desiredPartitions.forall(_ == numPartitions) // if desiredPartition = true, returns true
// if desiredPartitions is None, return true
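The suggested comment wording follows from how Option.forall behaves in Scala: it returns true for None and otherwise applies the predicate to the wrapped value. A minimal sketch of just that check is below; DesiredPartitionsCheck and partitionCountSatisfied are hypothetical names used for illustration, not part of Spark.

```scala
// Hypothetical helper (not Spark's API): models only the partition-count part
// of the ClusteredDistribution check discussed above.
object DesiredPartitionsCheck {
  // Option.forall is true for None, so a distribution that does not state a
  // desired partition count is satisfied by any number of partitions.
  def partitionCountSatisfied(desiredPartitions: Option[Int], numPartitions: Int): Boolean =
    desiredPartitions.forall(_ == numPartitions)

  def main(args: Array[String]): Unit = {
    println(partitionCountSatisfied(None, 200))    // true: no requirement stated
    println(partitionCountSatisfied(Some(5), 5))   // true: counts match
    println(partitionCountSatisfied(Some(5), 200)) // false: counts differ
  }
}
```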
@@ -50,7 +50,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
      numPartitions: Int): Partitioning = {
    requiredDistribution match {
      case AllTuples => SinglePartition
      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
      case ClusteredDistribution(clustering, desiredPartitions) =>
Update the Scaladoc to say that the numPartitions param is used only if the distribution does not specify it.
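The fallback described in this comment could look roughly like the sketch below. It is a simplified, self-contained model: the types reuse Spark's names but are stand-ins defined here (clustering expressions are plain strings), and PartitioningSketch and defaultNumPartitions are hypothetical names.

```scala
// Simplified stand-ins for the Catalyst types named in the diff above.
// These are assumptions for illustration, not Spark's actual classes.
object PartitioningSketch {
  sealed trait Distribution
  case object AllTuples extends Distribution
  final case class ClusteredDistribution(
      clustering: Seq[String],
      desiredPartitions: Option[Int] = None) extends Distribution

  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
    extends Partitioning

  /**
   * @param defaultNumPartitions used only when the required distribution does
   *                             not specify its own partition count
   */
  def createPartitioning(required: Distribution, defaultNumPartitions: Int): Partitioning =
    required match {
      case AllTuples => SinglePartition
      case ClusteredDistribution(clustering, desiredPartitions) =>
        // The distribution's own requirement wins over the default.
        HashPartitioning(clustering, desiredPartitions.getOrElse(defaultNumPartitions))
    }
}
```

Under these assumptions, a stateful operator that requires ClusteredDistribution(keys, Some(5)) would get HashPartitioning(keys, 5) regardless of the default, while a distribution without a desired count falls back to the default.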
@@ -43,10 +43,11 @@ case class StatefulOperatorStateInfo(
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,
    storeVersion: Long) {
    storeVersion: Long,
    numPartitions: Int) {
nice!
@@ -53,7 +53,7 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
  test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") {
    testEnsureStatefulOpPartitioning(
      baseDf.coalesce(1).queryExecution.sparkPlan,
      requiredDistribution = keys => ClusteredDistribution(keys),
This test suite no longer makes sense, since this rule does not exist anymore. If we add tests in the related PlannerSuite to cover the new addition in EnsureRequirements and Partitioning, then here we only need to test whether each stateful operator specifies numPartitions in its required distribution.
@@ -214,7 +214,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
      path: String,
      queryRunId: UUID = UUID.randomUUID,
      version: Int = 0): StatefulOperatorStateInfo = {
    StatefulOperatorStateInfo(path, queryRunId, operatorId = 0, version)
super nit: numPartitions = 5
protected def checkChildOutputPartitioning[T <: StatefulOperator](
    sq: StreamingQuery,
    colNames: Seq[String],
    numPartitions: Option[Int] = None): Boolean = {
numPartitions is never used.
Whoops, I missed one comment. Not LGTM. We need tests in PlannerSuite that check whether EnsureRequirements respects numPartitions in ClusteredDistribution.
LGTM, assuming tests pass.
Test build #82747 has finished for PR 19467 at commit
Test build #82749 has finished for PR 19467 at commit
Merging to master, thanks for fixing this.
What changes were proposed in this pull request?
EnsureStatefulOpPartitioning checks that the input RDD of a SparkPlan has the partitioning expected by streaming stateful operators. The problem is that we are not allowed to access RDD information during planning.
We added that check because CoalesceExec could create RDDs with 0 partitions. The fix is to make sure that when CoalesceExec reports SinglePartition, its input RDD actually has 1 partition rather than 0.
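The intended behavior can be illustrated without Spark by modelling an RDD as a vector of partitions. The sketch below is only a model under that assumption: CoalesceSketch, coalesce, and coalesceFixed are hypothetical names, and the grouping logic is a naive stand-in for a shuffle-free coalesce, not Spark's implementation.

```scala
// A toy model: an "RDD" is a vector of partitions, each a vector of rows.
object CoalesceSketch {
  type Partitions[T] = Vector[Vector[T]]

  // Naive shuffle-free coalesce: merges neighbouring partitions into at most
  // n groups. Zero input partitions yield zero output partitions, which is
  // the case that contradicts a SinglePartition claim.
  def coalesce[T](parts: Partitions[T], n: Int): Partitions[T] = {
    require(n > 0, "number of partitions must be positive")
    if (parts.isEmpty) Vector.empty
    else {
      val groupSize = math.ceil(parts.size.toDouble / n).toInt
      parts.grouped(groupSize).map(_.flatten).toVector
    }
  }

  // Guarded variant mirroring the check in the diff: when one partition is
  // requested and the input has none, return a single empty partition.
  def coalesceFixed[T](parts: Partitions[T], n: Int): Partitions[T] =
    if (n == 1 && parts.isEmpty) Vector(Vector.empty[T])
    else coalesce(parts, n)
}
```

In this model, coalesce on an empty input with n = 1 yields zero partitions, while coalesceFixed yields exactly one empty partition, so the reported partitioning matches what is actually produced.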
How was this patch tested?
Regression test in StreamingQuerySuite