[SPARK-28156][SQL][BACKPORT-2.3] Self-join should not miss cached view #25066
bersprockets wants to merge 1 commit into apache:branch-2.3
Conversation
The issue is that when self-joining a cached view, only one side of the join uses the cached relation. The cause is that in `ResolveReferences` we deduplicate a view so that it has new output attributes. Then in `AliasViewChild`, the rule adds an extra Project under the view, which breaks cache matching.
The fix: when deduplicating, we only dedup a view whose output differs from its child plan's output. Otherwise, we dedup on the view's child plan.
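The decision the fix makes can be modeled as a sketch (simplified, hypothetical types; this is not the actual analyzer code): deduplicate the view node itself only when its output differs from its child's, otherwise deduplicate the child so the view's plan stays byte-for-byte identical to the cached one.

```scala
// Hypothetical, simplified model of the dedup decision described above.
// Attr stands in for Spark's Attribute (name + expression id).
object DedupSketch {
  case class Attr(name: String, exprId: Int)

  sealed trait Plan { def output: Seq[Attr] }
  case class Relation(output: Seq[Attr]) extends Plan
  case class View(output: Seq[Attr], child: Plan) extends Plan

  // Before the fix, the view was always the dedup target, which inserted an
  // extra Project under it and broke cache matching. After the fix, the view
  // itself is deduplicated only when its output differs from its child's.
  def dedupTarget(view: View): Plan =
    if (view.output != view.child.output) view  // outputs differ: dedup the view
    else view.child                             // same outputs: dedup the child

  def main(args: Array[String]): Unit = {
    val rel = Relation(Seq(Attr("a", 1), Attr("b", 2)))
    // Same output as the child: the child is the dedup target,
    // so the view's plan still matches the cached plan.
    println(dedupTarget(View(rel.output, rel)) == rel)
    // Output differs from the child: the view itself is deduplicated.
    println(dedupTarget(View(Seq(Attr("a", 3), Attr("b", 4)), rel)).isInstanceOf[View])
  }
}
```

Both `println` calls above print `true`: the first shows the cache-friendly path, the second the path where fresh attributes force dedup on the view itself.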
```scala
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")
val cachedView = sql("select a, b, c, d from table1_vw")
cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()
val queryDf = sql(
  s"""select leftside.a, leftside.b
     |from cachedview leftside
     |join cachedview rightside
     |on leftside.a = rightside.a
     |""".stripMargin)
```
Query plan before this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12660], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
:           +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [a#12660]
+- *(1) Filter isnotnull(a#12660)
+- *(1) FileScan orc default.table1[a#12660] Batched: true, DataFilters: [isnotnull(a#12660)], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
```
Query plan after this PR:
```scala
== Physical Plan ==
*(2) Project [a#12664, b#12665]
+- *(2) BroadcastHashJoin [a#12664], [a#12692], Inner, BuildRight
:- *(2) Filter isnotnull(a#12664)
: +- *(2) InMemoryTableScan [a#12664, b#12665], [isnotnull(a#12664)]
: +- InMemoryRelation [a#12664, b#12665, c#12666, d#12667], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *(1) Filter isnotnull(a#12692)
+- *(1) InMemoryTableScan [a#12692], [isnotnull(a#12692)]
+- InMemoryRelation [a#12692, b#12693, c#12694, d#12695], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) FileScan orc default.table1[a#12660,b#12661,c#12662,d#12663] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/Users/viirya/repos/spark-1/sql/core/spark-warehouse/org.apache.spark.sql...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int,c:int,d:int>
```
Added test.
Closes apache#24960 from viirya/SPARK-28156.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Hi, @bersprockets .
- Do you really want to bypass `branch-2.4` and merge this to `branch-2.3` directly?
- According to SPARK-28156, the issue is reported against 2.3.3 and 2.4.3. In this case, the backporting should not bypass `branch-2.4`.

Did I miss some background? (cc @viirya and @cloud-fan )
In addition, I removed the following from the PR description. Please clean up this information the next time you make a PR.
Closes #24960 from viirya/SPARK-28156.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@dongjoon-hyun I left the "Closes #24960" so it could be traced back to the original pull request. I had seen that in other back-ports, so I thought it was standard (I have the trace-back already, but this also gives attribution). I don't plan on bypassing 2.4, but the user that suffered from the issue is a 2.3 user, so I started there first.
@dongjoon-hyun Hi Dongjoon. Sorry, I am not sure what change you are requesting. If you are requesting a back-port to 2.4, that will come after this one (as I mentioned above, I did this one first because the affected user was using 2.3).
Let me be clear.
Test build #107312 has finished for PR 25066 at commit
Got it. I will close this one now and re-open after the 2.4 back-port is done.
Thank you~ Yes. It was just a procedure issue. |
Back-port of #24960 to branch-2.3.