[SPARK-28156][SQL][BACKPORT-2.4] Self-join should not miss cached view #25068
bersprockets wants to merge 3 commits into apache:branch-2.4 from bersprockets:selfjoin_24
Conversation
The issue is that when self-joining a cached view, only one side of the join uses the cached relation. The cause: in `ResolveReferences` we deduplicate a view by giving it new output attributes, and then in `AliasViewChild` the rule adds an extra project under the view, which breaks cache matching.
The fix: when deduplicating, only dedup a view whose output differs from its child plan's output. Otherwise, dedup the view's child plan instead.
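A minimal sketch of that decision, assuming Spark 2.4's `View(desc, output, child)` logical plan node; this only illustrates the rule described above and is not the exact patch (the `dedup` parameter is a hypothetical stand-in for the analyzer's existing attribute-rewriting step):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}

// Illustration only: when deduplicating one side of a self-join, keep a View
// node intact if its output already matches its child's output, and recurse
// into the child instead. Rewriting the View's own attributes would add an
// extra project under the view and break matching against the cached relation.
def dedupView(plan: LogicalPlan, dedup: LogicalPlan => LogicalPlan): LogicalPlan =
  plan match {
    case v @ View(_, output, child) if output == child.output =>
      v.copy(child = dedup(child)) // dedup below the View, preserving its output
    case other =>
      dedup(other) // the view's output differs, so dedup the node as before
  }
```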
```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
  s"""select leftside.a, leftside.b
     |from cachedview leftside
     |join cachedview rightside
     |on leftside.a = rightside.a
   """.stripMargin)
```
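One way to sanity-check the behavior (a sketch, reusing `queryDf` from the repro above; the node-name match is an assumption about how the cached scan prints):

```scala
// Count InMemoryTableScan nodes in the executed plan. Before the fix only one
// side of the join reads from the cache; after the fix both sides do.
val cachedScans = queryDf.queryExecution.executedPlan.collect {
  case scan if scan.nodeName.contains("InMemoryTableScan") => scan
}
assert(cachedScans.size == 2)
```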
Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [a#12660]
+- *(1) Filter isnotnull(a#12660)
+- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
```
Query plan after this PR (note that both sides of the join now scan the cached `InMemoryRelation`):
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *(1) Filter isnotnull(a#12692)
+- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
+- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```
Added test.
Closes apache#24960 from viirya/SPARK-28156.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cc @cloud-fan and @viirya
Thanks for pinging me. Will take a look tonight.
Test build #107322 has finished for PR 25068 at commit

Retest this please.
```scala
 * precision lose or possible runtime failures. For example, long -> int, string -> int are not
 * up-cast.
 */
def canUpCast(from: DataType, to: DataType): Boolean = (from, to) match {
```
Where is this from? @bersprockets Can you note the original PR of this in the PR description?
I think this had some tests in its original PR. Should we backport them together? @cloud-fan @dongjoon-hyun
Usually, we don't backport the original PR when this kind of requirement arises. What do you think about this backport, @cloud-fan and @gatorsmile?
Additionally, this PR needs canUpCast (here) and sameOutput (https://github.com/apache/spark/pull/25068/files#diff-27c76f96a7b2733ecfd6f46a1716e153R140).
The rule AliasViewChild already existed before we had canUpCast and sameOutput. We just need to adapt the corresponding code to the old version, without canUpCast and sameOutput.
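For reference, the doc comment in the diff above implies behavior along these lines (an illustrative sketch, assuming the backported `Cast.canUpCast` keeps the SPARK-24586 semantics):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

// Widening casts are safe up-casts; narrowing or parsing casts are not.
Cast.canUpCast(IntegerType, LongType)   // true: no truncation or precision loss
Cast.canUpCast(LongType, IntegerType)   // false: long -> int can lose data
Cast.canUpCast(StringType, IntegerType) // false: string -> int can fail at runtime
```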
```scala
test("SPARK-25691: AliasViewChild with different nullabilities")
```
Hmm, this test actually comes from another PR, SPARK-25691. So that PR wasn't backported either?
It got pulled in by the cherry-pick because your original PR patched it, and I didn't see harm in the added test. However, its title references a patch that was not pulled in, which is weird, so I should probably remove it.
Yep. Please remove this `test("SPARK-25691...")` since this is a SPARK-28156 backport.
viirya left a comment:
This backport looks good. It contains more than the original PR, though; I'm not sure if it is OK to include changes from other PRs in a backport PR.
Test build #107332 has finished for PR 25068 at commit
Thanks for the comments. I was a little perplexed about the case where a back-ported PR needs a few small utility methods from other PRs: should I completely back-port the other PRs, or just include the two needed utility methods? I chose to copy in the two utility methods, but I will follow whatever the standard is. (The two other PRs are not included in the title since they are not actually back-ported.)

- Cast::canUpCast comes from SPARK-24586 ("Upcast should not allow casting from string to other types") and is referenced by SPARK-28156 in CheckAnalysis.
- LogicalPlan::sameOutput comes from SPARK-25691 ("Use semantic equality in AliasViewChild in order to compare attributes") and is also referenced by SPARK-28156 in CheckAnalysis.
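As a sketch of what `sameOutput` provides (a hypothetical standalone version; the real method lives on `LogicalPlan` and compares the outputs of two plans):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical standalone version: two output lists are the "same" if they
// match pairwise under semanticEquals, which ignores cosmetic differences
// such as attribute names and qualifiers.
def sameOutput(a: Seq[Attribute], b: Seq[Attribute]): Boolean =
  a.size == b.size && a.zip(b).forall { case (x, y) => x.semanticEquals(y) }
```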
Test build #107365 has finished for PR 25068 at commit

Retest this please.

The R failure is irrelevant to this PR.

Test build #107377 has finished for PR 25068 at commit

Hi, @bersprockets.
@dongjoon-hyun @cloud-fan will do. Thanks.

Gentle ping, @bersprockets.

@dongjoon-hyun Thanks :). I am away but will return to it next week. Edit: Oops, I mean return to it tomorrow.
I did the following:

In the process, I broke a test in SQLViewSuite. I will hunt down the cause tomorrow and hopefully post the changes.

Thank you for informing that, @bersprockets!

Test build #108138 has finished for PR 25068 at commit

Could you review this please, @cloud-fan?
thanks, merging to 2.4!

Thank you, @cloud-fan and @bersprockets!

Thanks for the help everyone.
@maropu I got pinged on this PR but now can't find the pinging comment. I hope I didn't break anything.

NVM, that's my mistake.