New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression #16493
Conversation
Although the test cases can be improved, the code fix looks good to me. cc @JoshRosen @hvanhovell |
@dilipbiswal Could you post the nested subquery and the plan in the PR description? It can help the other reviewers understand the fix. Thanks! |
spark.catalog.uncacheTable("t1") | ||
spark.catalog.uncacheTable("t2") | ||
spark.catalog.uncacheTable("t3") | ||
spark.catalog.uncacheTable("t4") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can call clearCache()
and then no need to uncache each table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
override def afterEach(): Unit = {
try {
clearCache()
} finally {
super.afterEach()
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this? @dilipbiswal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile sorry.. missed this one .. Will make the change.
Test build #70999 has finished for PR 16493 at commit
|
In the test suite, we can have such a helper function to count private def getNumInMemoryRelations(plan: LogicalPlan): Int = {
var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
plan.transformAllExpressions {
case e: SubqueryExpression =>
sum += getNumInMemoryRelations(e.plan)
e
}
sum
} |
case e: SubqueryExpression => cachedRelations += getCachedPlans(e.plan) | ||
e | ||
} | ||
assert(cachedRelations.flatten.size == 4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, this can be simplified to
assert (getNumInMemoryRelations(cachedPlan2) == 4)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thanks... I will make the change
assert( | ||
cachedPlan.collect { | ||
case i: InMemoryRelation => i | ||
}.size == 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, this can be simplified to
assert (getNumInMemoryRelations(cachedPlan) == 3)
} | ||
} | ||
|
||
private def useCachedDataInternal(plan: LogicalPlan): LogicalPlan = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After rethinking about it, we do not need to add a new function. We can combine them into a single function, like:
/** Replaces segments of the given logical plan with cached versions where possible. */
def useCachedData(plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transformDown {
case currentFragment =>
lookupCachedData(currentFragment)
.map(_.cachedRepresentation.withOutput(currentFragment.output))
.getOrElse(currentFragment)
}
newPlan transformAllExpressions {
case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thank you very much. I have addressed your comments.
assert( | ||
cachedPlan.collect { | ||
case i: InMemoryRelation => i | ||
}.size == 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here.
|
||
test("SPARK-19093 scalar and nested predicate query") { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove these two lines
Test build #71004 has finished for PR 16493 at commit
|
Test build #71005 has finished for PR 16493 at commit
|
retest this please |
Test build #71006 has started for PR 16493 at commit |
test this please |
Test build #71027 has finished for PR 16493 at commit
|
LGTM |
Also cc @rxin @cloud-fan |
LGTM |
Merging to master. Thanks! |
Thank you very much @gatorsmile @hvanhovell |
## What changes were proposed in this pull request? Consider the plans inside subquery expressions while looking up cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan. SQL ``` select * from rows where not exists (select * from rows) ``` Before the fix ``` == Optimized Logical Plan == Join LeftAnti :- InMemoryRelation [_1#3775, _2#3776], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> +- Project [_1#3775 AS _1#3775#4001, _2#3776 AS _2#3776#4002] +- Relation[_1#3775,_2#3776] parquet ``` After ``` == Optimized Logical Plan == Join LeftAnti :- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> +- Project [_1#256 AS _1#256#298, _2#257 AS _2#257#299] +- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> ``` Query2 ``` SELECT * FROM t1 WHERE c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1)) ``` Before ``` == Analyzed Logical Plan == c1: int Project [c1#3] +- Filter predicate-subquery#47 [(c1#3 = c1#10)] : +- Project [c1#10] : +- Filter predicate-subquery#46 [(c1#10 = c1#17)] : : +- Project [c1#17] : : +- Filter (c1#17 = 1) : : +- SubqueryAlias t3, `t3` : : +- Project [value#15 AS c1#17] : : +- LocalRelation [value#15] : +- SubqueryAlias t2, `t2` : +- Project [value#8 AS c1#10] : +- LocalRelation [value#8] +- SubqueryAlias t1, `t1` +- Project [value#1 AS c1#3] +- LocalRelation [value#1] == Optimized Logical Plan == Join LeftSemi, (c1#3 = c1#10) :- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 : +- LocalTableScan [c1#3] +- Project [value#8 AS c1#10] +- Join LeftSemi, (value#8 = c1#17) :- LocalRelation [value#8] +- Project [value#15 AS c1#17] +- Filter (value#15 = 1) +- LocalRelation [value#15] ``` After ``` == Analyzed Logical Plan == c1: int Project [c1#3] +- Filter predicate-subquery#47 [(c1#3 = c1#10)] : +- Project [c1#10] : +- Filter predicate-subquery#46 [(c1#10 = c1#17)] : : +- Project [c1#17] : : +- Filter (c1#17 = 1) : : +- SubqueryAlias t3, `t3` : : +- Project [value#15 AS c1#17] : : +- LocalRelation [value#15] : +- SubqueryAlias t2, `t2` : +- Project [value#8 AS c1#10] : +- LocalRelation [value#8] +- SubqueryAlias t1, `t1` +- Project [value#1 AS c1#3] +- LocalRelation [value#1] == Optimized Logical Plan == Join LeftSemi, (c1#3 = c1#10) :- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 : +- LocalTableScan [c1#3] +- Join LeftSemi, (c1#10 = c1#17) :- InMemoryRelation [c1#10], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t2 : +- LocalTableScan [c1#10] +- Filter (c1#17 = 1) +- InMemoryRelation [c1#17], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 +- LocalTableScan [c1#3] ``` ## How was this patch tested? Added new tests in CachedTableSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes apache#16493 from dilipbiswal/SPARK-19093.
## What changes were proposed in this pull request? Consider the plans inside subquery expressions while looking up cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan. SQL ``` select * from rows where not exists (select * from rows) ``` Before the fix ``` == Optimized Logical Plan == Join LeftAnti :- InMemoryRelation [_1#3775, _2#3776], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> +- Project [_1#3775 AS _1#3775#4001, _2#3776 AS _2#3776#4002] +- Relation[_1#3775,_2#3776] parquet ``` After ``` == Optimized Logical Plan == Join LeftAnti :- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> +- Project [_1#256 AS _1#256#298, _2#257 AS _2#257#299] +- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string> ``` Query2 ``` SELECT * FROM t1 WHERE c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1)) ``` Before ``` == Analyzed Logical Plan == c1: int Project [c1#3] +- Filter predicate-subquery#47 [(c1#3 = c1#10)] : +- Project [c1#10] : +- Filter predicate-subquery#46 [(c1#10 = c1#17)] : : +- Project [c1#17] : : +- Filter (c1#17 = 1) : : +- SubqueryAlias t3, `t3` : : +- Project [value#15 AS c1#17] : : +- LocalRelation [value#15] : +- SubqueryAlias t2, `t2` : +- Project [value#8 AS c1#10] : +- LocalRelation [value#8] +- SubqueryAlias t1, `t1` +- Project [value#1 AS c1#3] +- LocalRelation [value#1] == Optimized Logical Plan == Join LeftSemi, (c1#3 = c1#10) :- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 : +- LocalTableScan [c1#3] +- Project [value#8 AS c1#10] +- Join LeftSemi, (value#8 = c1#17) :- LocalRelation [value#8] +- Project [value#15 AS c1#17] +- Filter (value#15 = 1) +- LocalRelation [value#15] ``` After ``` == Analyzed Logical Plan == c1: int Project [c1#3] +- Filter predicate-subquery#47 [(c1#3 = c1#10)] : +- Project [c1#10] : +- Filter predicate-subquery#46 [(c1#10 = c1#17)] : : +- Project [c1#17] : : +- Filter (c1#17 = 1) : : +- SubqueryAlias t3, `t3` : : +- Project [value#15 AS c1#17] : : +- LocalRelation [value#15] : +- SubqueryAlias t2, `t2` : +- Project [value#8 AS c1#10] : +- LocalRelation [value#8] +- SubqueryAlias t1, `t1` +- Project [value#1 AS c1#3] +- LocalRelation [value#1] == Optimized Logical Plan == Join LeftSemi, (c1#3 = c1#10) :- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 : +- LocalTableScan [c1#3] +- Join LeftSemi, (c1#10 = c1#17) :- InMemoryRelation [c1#10], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t2 : +- LocalTableScan [c1#10] +- Filter (c1#17 = 1) +- InMemoryRelation [c1#17], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1 +- LocalTableScan [c1#3] ``` ## How was this patch tested? Added new tests in CachedTableSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes apache#16493 from dilipbiswal/SPARK-19093.
…L] Backport Three Cache-related PRs to Spark 2.1 ### What changes were proposed in this pull request? Backport a few cache related PRs: --- [[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression](#16493) Consider the plans inside subquery expressions while looking up cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan. --- [[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path](#17064) Catalog.refreshByPath can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path. However, CacheManager.invalidateCachedPath doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678. --- [[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table](#17097) When un-cache a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table. The following commands trigger the table uncache: `DropTableCommand`, `TruncateTableCommand`, `AlterTableRenameCommand`, `UncacheTableCommand`, `RefreshTable` and `InsertIntoHiveTable` This PR also includes some refactors: - use java.util.LinkedList to store the cache entries, so that it's safer to remove elements while iterating - rename invalidateCache to recacheByPlan, which is more obvious about what it does. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17319 from gatorsmile/backport-17097.
What changes were proposed in this pull request?
Consider the plans inside subquery expressions while looking up cache manager to make
use of cached data. Currently CacheManager.useCachedData does not consider the
subquery expressions in the plan.
SQL
Before the fix
After
Query2
Before
After
How was this patch tested?
Added new tests in CachedTableSuite.