[SPARK-24556][SQL] Always rewrite output partitioning in ReusedExchangeExec and InMemoryTableScanExec #21564
Conversation
…also when child's partitioning is RangePartitioning
@cloud-fan @viirya @gatorsmile, could you help review this?
```diff
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
```
Not sure why `RangePartitioning` wasn't included at first.
```diff
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
```
Why not just `updateAttribute(r)`?

Moreover, in order to avoid the same issue in the future with other cases, have you considered doing something like `updateAttribute(relation.cachedPlan.outputPartitioning)`?
Not all `Partitioning` are `Expression`. Only `HashPartitioning` and `RangePartitioning` are.
Yes, you're right @viirya, thanks. Then I'd propose something like:

```scala
relation.cachedPlan.outputPartitioning match {
  case e: Expression => updateAttribute(e)
  case other => other
}
```

What do you think?
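To make the proposed `case e: Expression` pattern concrete, here is a minimal, self-contained sketch. The types below are hypothetical mocks (plain strings stand in for Catalyst attributes, and these are not the real Spark classes): any partitioning that is also an expression gets its attributes rewritten, everything else passes through unchanged.

```scala
// Hypothetical, simplified stand-ins for the Catalyst types involved.
sealed trait Partitioning
trait Expression { def rewriteAttrs(f: String => String): Expression }

case class HashPartitioning(cols: Seq[String]) extends Partitioning with Expression {
  def rewriteAttrs(f: String => String): Expression = HashPartitioning(cols.map(f))
}
case class RangePartitioning(cols: Seq[String]) extends Partitioning with Expression {
  def rewriteAttrs(f: String => String): Expression = RangePartitioning(cols.map(f))
}
// SinglePartition is a real example of a Partitioning that is not an Expression.
case object SinglePartition extends Partitioning

// The proposed rewrite: match on Expression, update attributes, pass others through.
def updateAttribute(p: Partitioning, attrMap: Map[String, String]): Partitioning =
  p match {
    case e: Expression =>
      e.rewriteAttrs(a => attrMap.getOrElse(a, a)).asInstanceOf[Partitioning]
    case other => other
  }
```

With these mock types, `updateAttribute(HashPartitioning(Seq("i#5")), Map("i#5" -> "i#54"))` yields `HashPartitioning(Seq("i#54"))`, while `SinglePartition` is returned untouched.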
I think `PartitioningCollection` is for an operator that has multiple children. `BroadcastPartitioning` is not `Expression`.
Hmm, `HashPartitioning` and `RangePartitioning` can affect later sorting and shuffling, but for `BroadcastPartitioning` there seems to be not much benefit.
`PartitioningCollection` should be considered. Like the case below:

```scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.codegen.wholeStage", false)
val df1 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j").as("t1")
val df2 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("m", "n").as("t2")
val d = df1.join(df2, $"t1.i" === $"t2.m")
d.cache
val d1 = d.as("t3")
val d2 = d.as("t4")
d1.join(d2, $"t3.i" === $"t4.i").explain
```

```
SortMergeJoin [i#5], [i#54], Inner
:- InMemoryTableScan [i#5, j#6, m#15, n#16]
:     +- InMemoryRelation [i#5, j#6, m#15, n#16], CachedRDDBuilder
:           +- SortMergeJoin [i#5], [m#15], Inner
:              :- Sort [i#5 ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(i#5, 10)
:              :     +- LocalTableScan [i#5, j#6]
:              +- Sort [m#15 ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(m#15, 10)
:                    +- LocalTableScan [m#15, n#16]
+- Sort [i#54 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#54, 10)
      +- InMemoryTableScan [i#54, j#55, m#58, n#59]
            +- InMemoryRelation [i#54, j#55, m#58, n#59], CachedRDDBuilder
                  +- SortMergeJoin [i#5], [m#15], Inner
                     :- Sort [i#5 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(i#5, 10)
                     :     +- LocalTableScan [i#5, j#6]
                     +- Sort [m#15 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(m#15, 10)
                           +- LocalTableScan [m#15, n#16]
```

`Exchange hashpartitioning(i#54, 10)` is an extra shuffle.

What do you think?
For `PartitioningCollection`, I think it is harder to treat it like `HashPartitioning` and `RangePartitioning` when replacing attributes.

In the above example, the `PartitioningCollection` contains `HashPartitioning(i#5)` and `HashPartitioning(m#15)`, and the output of `InMemoryRelation` is `[i#54, j#55, m#58, n#59]`. Can we still replace attributes based on the location of the attribute in the output?
@viirya From `updateAttribute`, `relation.cachedPlan.output` and `relation.output` map one to one:

```scala
private def updateAttribute(expr: Expression): Expression = {
  ...
  val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
  ...
}
```

It means `[i#54, j#55, m#58, n#59]` corresponds to `[i#5, j#6, m#15, n#16]`, so we can always replace `HashPartitioning(i#5)` with `HashPartitioning(i#54)`.

Any idea?
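The positional mapping described above can be sketched in a few lines of self-contained Scala. This is a hypothetical illustration using plain strings in place of Catalyst attributes; the real code builds an `AttributeMap` from `relation.cachedPlan.output.zip(relation.output)`.

```scala
// The cached (child) plan's output and the InMemoryRelation's output,
// as attribute names from the example plan above.
val cachedPlanOutput = Seq("i#5", "j#6", "m#15", "n#16")
val relationOutput   = Seq("i#54", "j#55", "m#58", "n#59")

// A positional zip gives the substitution map, the analogue of
// AttributeMap(relation.cachedPlan.output.zip(relation.output)).
val attrMap: Map[String, String] = cachedPlanOutput.zip(relationOutput).toMap

// Rewriting a partitioning's attributes: anything found in the map is
// substituted, unknown attributes are left untouched.
def rewrite(attrs: Seq[String]): Seq[String] = attrs.map(a => attrMap.getOrElse(a, a))
```

For example, `rewrite(Seq("i#5"))` gives `Seq("i#54")`, which is why `HashPartitioning(i#5)` can always be rewritten to `HashPartitioning(i#54)` regardless of which `Partitioning` subclass carries the attribute.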
Looks correct.
Test build #91829 has finished for PR 21564 at commit
@viirya I think
@mgaido91 I updated the code as per your suggestion, thanks!
@yucai thanks, can you please also add more UTs in order to cover all the possible cases? Thanks.
Test build #91855 has finished for PR 21564 at commit
Test build #91856 has finished for PR 21564 at commit
```diff
@@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
     checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
   }
+
+  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
```
this is not an end-to-end test, let's put it in `PlannerSuite` and also test cached table.
please also mention cached table in PR title
LGTM except the test
```diff
@@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   }

   override def outputPartitioning: Partitioning = child.outputPartitioning match {
-    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
+    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
     case other => other
```
LGTM
```scala
// ReusedExchange is RangePartitioning
val df9 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
val df10 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
```
Seems this test can be simplified. For example, the only difference between df3, df4 and df9, df10 is `persist`. You can just define the dataframes once and reuse them.
```scala
checkInMemoryTableScanOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])

// InMemoryTableScan is PartitioningCollection
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
```
nit: please use `SQLConf` instead of the plain string (and the value here I think should be `-1`)
```scala
checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])

// InMemoryTableScan is RangePartitioning
val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
```
probably a `spark.range` is enough instead of creating a df and ordering it
I want `RangePartitioning` here, so I'm using `orderBy`.
I see, but if you use `spark.range` you get `RangePartitioning` as well, without the need for a sort operation.
I just have an update of tests, feel free to let me know if you are OK with the new version.
I am OK apart from this comment, which is still unresolved in the new version. Instead of doing an unneeded sort, we can simply have a `Range` operation, which has `RangePartitioning` as its output partitioning.
ok, updated.
why didn't you just set:

```scala
val df3 = spark.range ...
val df4 = spark.range ...
```

but instead left them as before and then changed the other place where they were used?
They are different: in the `ReusedExchange` case we need a shuffle, so we need `orderBy`, while for `InMemoryTableScan` we can use `spark.range` directly, right?
```diff
+// ReusedExchange is RangePartitioning
+val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
+val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
+checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
+
+// InMemoryTableScan is RangePartitioning
+val df7 = spark.range(1, 100, 1, 10).toDF().persist()
+val df8 = spark.range(1, 100, 1, 10).toDF().persist()
+checkInMemoryTableScanOutputPartitioningRewrite(df7.union(df8), classOf[RangePartitioning])
```
oh, sure, sorry, thanks.
Test build #91968 has finished for PR 21564 at commit
LGTM
Thanks for fixing this! LGTM
Test build #91970 has finished for PR 21564 at commit
Test build #91972 has finished for PR 21564 at commit
```scala
// InMemoryTableScan is HashPartitioning
val df5 = df1.persist()
val df6 = df2.persist()
checkInMemoryTableScanOutputPartitioningRewrite(df5.union(df6), classOf[HashPartitioning])
```
why do we need to test table cache with union?
I want to make sure both caches have the right output partitioning. So should we test only the second cached table?
union is used to trigger exchange reuse, but it's unnecessary to test cache.
@cloud-fan thanks for reviewing, tests have been updated.
```scala
test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    "and InMemoryTableScanExec") {
  def checkOutputPartitioningRewrite(
      plans: Seq[SparkPlan],
```
now we can take a single spark plan
What do you think about merging the `check*OutputPartitioningRewrite` helpers together?

```scala
def checkPlanAndOutputPartitioningRewrite(
    df: DataFrame,
    expectedPlanClass: Class[_],
    expectedPartitioningClass: Class[_]): Unit = {
  val plans = df.queryExecution.executedPlan.collect {
    case r: ReusedExchangeExec => r
    case m: InMemoryTableScanExec => m
  }
  assert(plans.size == 1)
  val plan = plans.head
  assert(plan.getClass == expectedPlanClass)
  val partitioning = plan.outputPartitioning
  assert(partitioning.getClass == expectedPartitioningClass)
  val partitionedAttrs = partitioning.asInstanceOf[Expression].references
  assert(partitionedAttrs.subsetOf(plan.outputSet))
}
```
@cloud-fan I still use `Seq`, so I can make `checkReusedExchangeOutputPartitioningRewrite` and `checkInMemoryTableScanOutputPartitioningRewrite` simpler. Kindly let me know if you have a better idea.
```scala
checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])

// InMemoryTableScan is HashPartitioning
df1.persist()
```
I feel it's better to not reuse the dataframe that were used to test ReuseExchange
Agree, I also like a new one :).
Test build #92068 has finished for PR 21564 at commit
retest this please.
Test build #92074 has finished for PR 21564 at commit
retest this please.
Test build #92080 has finished for PR 21564 at commit
Test build #92082 has finished for PR 21564 at commit
thanks, merging to master!
What changes were proposed in this pull request?
Currently, ReusedExchange and InMemoryTableScanExec only rewrite output partitioning if the child's partitioning is HashPartitioning, and do nothing for other partitionings, e.g., RangePartitioning. We should always rewrite it; otherwise, an unnecessary shuffle could be introduced, as in https://issues.apache.org/jira/browse/SPARK-24556.
How was this patch tested?
Add new tests.