[SPARK-28156][SQL] Self-join should not miss cached view#24960
[SPARK-28156][SQL] Self-join should not miss cached view#24960viirya wants to merge 5 commits intoapache:masterfrom
Conversation
|
Test build #106868 has finished for PR 24960 at commit
|
|
retest this please. |
|
Test build #106883 has finished for PR 24960 at commit
|
|
How about we add a |
|
In |
|
Then we should revisit #24236 which adds Project to do dedup for Union. I don't know if there are more places like this. The cache manager uses |
|
A project node just aliasing child's output in same output order, looks like can be a candidate to be removed in canonicalized query plan. But we should be careful about If removing a project node like that is safe when canonicalization, #24236 and this issue should be fine. |
|
|
|
I read |
|
Another idea: shall we apply |
|
Does it mean moving |
|
I think we can even merge |
|
Deferring Shall we make |
|
Can we make |
|
It sounds not safe but I did try it locally. A rule, e.g., |
This is easy to fix, just overwrite |
|
Ok. Let's try this approach and see Jenkins results. Thanks. |
|
(This might be off-topic and this is just a question though...) btw, any reason not to use optimized plans instead of analyzed plans for plan caches? Just to avoid overheads? In case that we do so, it seems self-joins in the description uses the cached view; |
|
Test build #107097 has finished for PR 24960 at commit
|
|
retest this please |
|
Test build #107100 has finished for PR 24960 at commit
|
|
retest this please |
|
Test build #107108 has finished for PR 24960 at commit
|
| */ | ||
| case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { | ||
| override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { | ||
| object EliminateView extends Rule[LogicalPlan] with CastSupport { |
There was a problem hiding this comment.
We still need a rule in analyzer to make sure the view is valid... We can put it in CheckAnalysis
There was a problem hiding this comment.
Yeah, I noticed that. Will add it later.
| failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}") | ||
| } | ||
|
|
||
| case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) => |
There was a problem hiding this comment.
Basically, I moved analysis exception check from AliasViewChild. In EliminateView rule, it doesn't need to check this.
|
Test build #107122 has finished for PR 24960 at commit
|
|
Test build #107123 has finished for PR 24960 at commit
|
|
oh, the CRAN problem happens again... I will contact CRAN sysadmin to solve it. |
|
retest this please. |
|
Test build #107155 has finished for PR 24960 at commit
|
|
retest this please |
|
@maropu from my understanding, optimization can change query plan structure largely. A cached query fragment can be transformed to very different form and miss the cache. |
|
Test build #107167 has finished for PR 24960 at commit
|
|
thanks, merging to master! |
|
Thanks @cloud-fan I also visited #24236 as you mentioned. I think it is fine from missing caches. Because when you cache an union with duplicate children, you actually cache the deduplicated plan processed by #24236. In later you have this query fragment in other queries, the deduplicated union will match cached version. |
|
@viirya ur, I see. thanks for your kind explanation. |
The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching.
The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan.
```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
s"""select leftside.a, leftside.b
|from cachedview leftside
|join cachedview rightside
|on leftside.a = rightside.a
""".stripMargin)
```
Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF
ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc
t<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [a#12660]
+- *(1) Filter isnotnull(a#12660)
+- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil
e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc
t<a:int>
```
Query plan after this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *(1) Filter isnotnull(a#12692)
+- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
+- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```
Added test.
Closes apache#24960 from viirya/SPARK-28156.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching.
The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan.
```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
s"""select leftside.a, leftside.b
|from cachedview leftside
|join cachedview rightside
|on leftside.a = rightside.a
""".stripMargin)
```
Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF
ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc
t<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [a#12660]
+- *(1) Filter isnotnull(a#12660)
+- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil
e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc
t<a:int>
```
Query plan after this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *(1) Filter isnotnull(a#12692)
+- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
+- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```
Added test.
Closes apache#24960 from viirya/SPARK-28156.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of #24960 to branch-2.4. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes #25068 from bersprockets/selfjoin_24. Lead-authored-by: Bruce Robbins <bersprockets@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of apache#24960 to branch-2.3. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes apache#25068 from bersprockets/selfjoin_24. Lead-authored-by: Bruce Robbins <bersprockets@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of #24960 to branch-2.3. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes #25293 from bersprockets/selfjoin_23_noextra. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of apache#24960 to branch-2.4. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes apache#25068 from bersprockets/selfjoin_24. Lead-authored-by: Bruce Robbins <bersprockets@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of apache#24960 to branch-2.4. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes apache#25068 from bersprockets/selfjoin_24. Lead-authored-by: Bruce Robbins <bersprockets@gmail.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Back-port of apache#24960 to branch-2.3. The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in `ResolveReferences` we do deduplicate for a view to have new output attributes. Then in `AliasViewChild`, the rule adds extra project under a view. So it breaks cache matching. The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan. ```scala val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d") df.write.mode("overwrite").format("orc").saveAsTable("table1") sql("drop view if exists table1_vw") sql("create view table1_vw as select * from table1") val cachedView = sql("select a, b, c, d from table1_vw") cachedView.createOrReplaceTempView("cachedview") cachedView.persist() val queryDf = sql( s"""select leftside.a, leftside.b |from cachedview leftside |join cachedview rightside |on leftside.a = rightside.a """.stripMargin) ``` Query plan before this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryF ileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struc t<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) +- *(1) Project [a#12660] +- *(1) Filter isnotnull(a#12660) +- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[fil e:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struc t<a:int> ``` Query plan after this PR: ```scala == Physical Plan == *(2) Project [a#12664, b#12665] +- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight :- *(2) Filter isnotnull(a#12664) : +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)] : +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) +- *(1) Filter isnotnull(a#12692) +- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)] +- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int> ``` Added test. Closes apache#25293 from bersprockets/selfjoin_23_noextra. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 998ac04) RB=2946325 BUG=LIHADOOP-62459 G=spark-reviewers R=apatnam,ekrogen,mmuralid A=ekrogen
What changes were proposed in this pull request?
The issue is when self-join a cached view, only one side of join uses cached relation. The cause is in
ResolveReferenceswe do deduplicate for a view to have new output attributes. Then inAliasViewChild, the rule adds extra project under a view. So it breaks cache matching.The fix is when dedup, we only dedup a view which has output different to its child plan. Otherwise, we dedup on the view's child plan.
Query plan before this PR:
Query plan after this PR:
How was this patch tested?
Added test.