-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23614][SQL] Fix incorrect reuse exchange when caching is used #20831
Conversation
Test build #88256 has finished for PR 20831 at commit
|
retest this please. |
@@ -68,6 +69,15 @@ case class InMemoryRelation( | |||
|
|||
override protected def innerChildren: Seq[SparkPlan] = Seq(child) | |||
|
|||
override def doCanonicalize(): logical.LogicalPlan = | |||
copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), | |||
storageLevel = new StorageLevel(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageLevel.NONE
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Test build #88258 has finished for PR 20831 at commit
|
Test build #88262 has finished for PR 20831 at commit
|
tableName = None)( | ||
_cachedColumnBuffers, | ||
sizeInBytesStats, | ||
statsOfPlanToCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to copy these cached data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_cachedColumnBuffers
can't be null. If it is null, copy
will trigger buildBuffers
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
statsOfPlanToCache
and sizeInBytesStats
, too? For instance, ResolveHint
drops hints in canonicalization:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
Line 44 in 3675af7
override def doCanonicalize(): LogicalPlan = child.canonicalized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cachedColumnBuffers
, sizeInBytesStats
, statsOfPlanToCache
won't be considered when comparing two InMemoryRelation
. So instead of create empty instances of statistics, I just use the original values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aha, ok. Thanks!
Test build #88266 has finished for PR 20831 at commit
|
cc @cloud-fan |
@@ -68,6 +69,15 @@ case class InMemoryRelation( | |||
|
|||
override protected def innerChildren: Seq[SparkPlan] = Seq(child) | |||
|
|||
override def doCanonicalize(): logical.LogicalPlan = | |||
copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), | |||
storageLevel = StorageLevel.NONE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we follow the parameter order in the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is followed. I just ignored useCompression
, batchSize
as they are just primitives and don't need to be canonicalized here.
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec( | |||
override def outputOrdering: Seq[SortOrder] = | |||
relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder]) | |||
|
|||
private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a) | |||
// When we make canonicalized plan, we can't find a normalized attribute in this map. | |||
// We return a `ColumnStatisticsSchema` for normalized attribute in this case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks weird to call statsFor
on a canonicalized InMemoryTableScanExec
, can we just make buildFilter
a lazy val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried that at beginning. However, partitionFilters
uses buildFilter
. Making partitionFilters
a lazy doesn't work because when do copy
, the initialization of InMemoryTableScanExec
will try to materialize partitionFilters
for coping it value.
Making partitionFilters
, buildFilter
as methods is not enough too, we also need to remove @transient
from relation
and InMemoryRelation.partitionStatistics
. So I think it isn't worth and leave it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get it. Regardless how copy is implemented in scala, ideally we can just mark buildFilter
and partitionFilters
as lazy, and in doCanonicalize
, create a new InMemoryTableScanExec
, which won't materialize partitionFilters
in either the current InMemoryTableScanExec
or the new InMemoryTableScanExec
.
One problem I can think of is to serialize a canonicalized InMemoryTableScanExec
, but it should never happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry, I get it wrongly. The reason why it doesn't work is because relation
is @transient
. partitionFilters
needs to be non-lazy, otherwise when we need to access relation
in executor, we will get a NullPointerException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I think it isn't worth removing @transient
from relation
and InMemoryRelation.partitionStatistics
just for this. So I leave it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be solved if we add a val stats = relation.partitionStatistics
, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think so. Updated.
good catch! LGTM except a few comments |
LGTM, pending jenkins |
Thanks! |
Test build #88493 has finished for PR 20831 at commit
|
retest this please. |
Test build #88497 has finished for PR 20831 at commit
|
## What changes were proposed in this pull request? We should provide customized canonicalize plan for `InMemoryRelation` and `InMemoryTableScanExec`. Otherwise, we can wrongly treat two different cached plans as same result. It causes wrongly reused exchange then. For a test query like this: ```scala val cached = spark.createDataset(Seq(TestDataUnion(1, 2, 3), TestDataUnion(4, 5, 6))).cache() val group1 = cached.groupBy("x").agg(min(col("y")) as "value") val group2 = cached.groupBy("x").agg(min(col("z")) as "value") group1.union(group2) ``` Canonicalized plans before: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` You can find that they have the canonicalized plans are the same, although we use different columns in two `InMemoryTableScan`s. Canonicalized plan after: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#2] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #20831 from viirya/SPARK-23614. (cherry picked from commit b2edc30) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/2.3! |
Thanks! @cloud-fan |
## What changes were proposed in this pull request? We should provide customized canonicalize plan for `InMemoryRelation` and `InMemoryTableScanExec`. Otherwise, we can wrongly treat two different cached plans as same result. It causes wrongly reused exchange then. For a test query like this: ```scala val cached = spark.createDataset(Seq(TestDataUnion(1, 2, 3), TestDataUnion(4, 5, 6))).cache() val group1 = cached.groupBy("x").agg(min(col("y")) as "value") val group2 = cached.groupBy("x").agg(min(col("z")) as "value") group1.union(group2) ``` Canonicalized plans before: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` You can find that they have the canonicalized plans are the same, although we use different columns in two `InMemoryTableScan`s. Canonicalized plan after: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#2] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes apache#20831 from viirya/SPARK-23614. (cherry picked from commit b2edc30) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
We should provide customized canonicalize plan for `InMemoryRelation` and `InMemoryTableScanExec`. Otherwise, we can wrongly treat two different cached plans as same result. It causes wrongly reused exchange then. For a test query like this: ```scala val cached = spark.createDataset(Seq(TestDataUnion(1, 2, 3), TestDataUnion(4, 5, 6))).cache() val group1 = cached.groupBy("x").agg(min(col("y")) as "value") val group2 = cached.groupBy("x").agg(min(col("z")) as "value") group1.union(group2) ``` Canonicalized plans before: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [x#4253, y#4254, z#4255], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [x#4253, y#4254, z#4255] ``` You can find that they have the canonicalized plans are the same, although we use different columns in two `InMemoryTableScan`s. Canonicalized plan after: First exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(1) InMemoryTableScan [none#0, none#1] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` Second exchange: ``` Exchange hashpartitioning(none#0, 5) +- *(3) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4]) +- *(3) InMemoryTableScan [none#0, none#2] +- InMemoryRelation [none#0, none#1, none#2], true, 10000, StorageLevel(memory, 1 replicas) +- LocalTableScan [none#0, none#1, none#2] ``` Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes apache#20831 from viirya/SPARK-23614. (cherry picked from commit b2edc30) Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 1d0d0a5) RB=3620847 BUG=APA-69078 G=spark-reviewers R=mmuralid,ekrogen A=ekrogen
What changes were proposed in this pull request?
We should provide customized canonicalize plan for
InMemoryRelation
andInMemoryTableScanExec
. Otherwise, we can wrongly treat two different cached plans as same result. It causes wrongly reused exchange then.For a test query like this:
Canonicalized plans before:
First exchange:
Second exchange:
You can find that they have the canonicalized plans are the same, although we use different columns in two
InMemoryTableScan
s.Canonicalized plan after:
First exchange:
Second exchange:
How was this patch tested?
Added unit test.