[SPARK-54022][SQL] Make DSv2 table resolution aware of cached tables #52764
Conversation
newRelation.copyTagsFrom(multi)
newRelation
}
val r = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
I have a separate PR that refactors this rule a bit; I'll rebase before merging.
There is a bit of duplicated logic here now.
Here is the PR for refactoring: #52781
}
CacheManager.logCacheOperation(
  log"Relation cache hit for table ${MDC(TABLE_NAME, nameWithTimeTravel)}")
Some(cachedRelation)
Hmm, we just return the first match? Shall we use the scan with the latest table version?
Well, we shouldn't have multiple matching relations after this change. That said, cachedData is an IndexedSeq to which we always prepend entries, so newer entries are at the beginning of the sequence. We can't tell which table version is newer because versions are opaque strings; in Iceberg, for instance, they are random UUIDs. So this code always takes the newest matching entry, even though we expect to only have one.
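To make the ordering point concrete, here is a minimal sketch, using hypothetical names (`CachedEntrySketch`, `RelationLookupSketch`) rather than the actual CacheManager internals, of how prepending entries makes the first match also the newest one:

```scala
// Hypothetical stand-in for the cache manager's bookkeeping; the real fields differ.
case class CachedEntrySketch(nameParts: Seq[String], plan: String)

class RelationLookupSketch {
  // Newer entries are always prepended, so they sit at the front of the sequence.
  @volatile private var cachedData: IndexedSeq[CachedEntrySketch] = IndexedSeq.empty

  def add(entry: CachedEntrySketch): Unit = synchronized {
    cachedData = entry +: cachedData
  }

  // `find` scans from the front, so the first hit is also the most recently cached match.
  def lookup(
      nameParts: Seq[String],
      resolver: (String, String) => Boolean): Option[CachedEntrySketch] = {
    cachedData.find { entry =>
      entry.nameParts.length == nameParts.length &&
        entry.nameParts.zip(nameParts).forall { case (a, b) => resolver(a, b) }
    }
  }
}
```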
Force-pushed 58fd24d to 17c2c55.
}
}

private[sql] def lookupCachedTable(
Let's make it more straightforward: lookupCachedTableByName.
@aokolnychyi This is my last comment for this PR
@gengliangwang, I tested the rename locally, but I'm not sure it would make the code any clearer. In fact, it only creates inconsistencies with other methods that accept a name in their arguments but don't have an xxxByName suffix.
I feel lookupCachedTable(name, resolver) is already pretty clear. What do you think?
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
Force-pushed 17c2c55 to 4e9df65.
@gengliangwang @cloud-fan, I made some updates to this PR. Please take another look.
}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

private[sql] trait RelationCache {
This can be any relation cache in the future.
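For context, a rough sketch of what such a trait could look like, inferred from the `sharedRelationCache.lookup(nameParts, conf.resolver)` call site below; the exact signature in the PR may differ:

```scala
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical shape; only the lookup-by-name-parts idea is taken from this diff.
private[sql] trait RelationCacheSketch {
  /** Returns the cached relation registered under the given multi-part name, if any. */
  def lookup(nameParts: Seq[String], resolver: Resolver): Option[LogicalPlan]
}
```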
val tableName = V2TableUtil.toQualifiedName(catalog, ident)
logDebug(s"Refreshing table metadata for $tableName")
catalog.loadTable(ident)
val cachedTable = sharedRelationCache.lookup(nameParts, conf.resolver).collect {
nit: indent
logDebug(s"Refreshing table metadata for $tableName")
catalog.loadTable(ident)
val cachedTable = sharedRelationCache.lookup(nameParts, conf.resolver).collect {
  case cached: DataSourceV2Relation
}
}

test("SPARK-54022: caching table via Dataset API should pin table state")
Let's mention the behavior change before / after this PR in the PR description:
- caching table via Dataset API
- caching a query via Dataset API
- caching table via CACHE TABLE
Done.
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = {
  // RTAS may drop and recreate table before query execution, breaking self-references
  // refresh and pin versions here to read from original table versions instead of
  // newly created empty table that is meant to serve as target for append/overwrite
Why can we remove this now?
We simply moved this to the exec node, as I no longer have access to SparkSession in catalyst. We can't do this refresh without checking cached relations, as it may otherwise hit the metastore and move the version inconsistently.
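A hedged sketch of the flow being described: consult the shared relation cache first and only fall back to the catalog when nothing is cached, so a metastore load cannot move a pinned version. `RelationCacheSketch` is the hypothetical trait sketched earlier in this thread, and `resolvePinnedTable` is an illustrative name, not the method in this PR.

```scala
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object PinnedResolutionSketch {
  // Sketch only: "check cached relations before hitting the metastore".
  def resolvePinnedTable(
      cache: RelationCacheSketch,
      catalog: TableCatalog,
      ident: Identifier,
      nameParts: Seq[String],
      resolver: Resolver): Table = {
    cache.lookup(nameParts, resolver) match {
      // Reuse the cached DSv2 relation's table so the pinned state is preserved.
      case Some(cached: DataSourceV2Relation) => cached.table
      // No cached DSv2 relation: nothing is pinned, so loading from the catalog is safe.
      case _ => catalog.loadTable(ident)
    }
  }
}
```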
Force-pushed 4e9df65 to 939f877.
// 2. Writing to the new table fails,
// 3. The table returned by catalog.createTable doesn't support writing.
//
// RTAS must refresh and pin versions in query to read from original table versions instead of
@cloud-fan, this is the new place for this RTAS refresh.
gengliangwang left a comment
+1 for the proposal.
dongjoon-hyun left a comment
Could you resolve the conflict, @aokolnychyi?
dongjoon-hyun left a comment
I've been monitoring this PR for the last month as a kind of bug fix, as described in the PR description (and JIRA). Thank you, @aokolnychyi, @cloud-fan, @gengliangwang.
Merged to master/4.1 for Apache Spark 4.1.0.
### What changes were proposed in this pull request?
This PR makes DSv2 table resolution aware of cached tables via `CACHE TABLE t` or `spark.table("t").cache()` commands.
### Why are the changes needed?
These changes are needed to avoid silent cache misses for DSv2 tables. Cache lookups depend on DSv2 Table instance equality. If each query is allowed to load a new Table instance from the metastore, connectors may pick up external changes between queries, leading to unexpected cache misses. This contradicts the behavior we had for built-in tables and some DSv1 connectors such as Delta. Historically, the expected behavior of `CACHE TABLE t` and `spark.table("t").cache()` is to cache the table state.
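To make the failure mode concrete, here is a minimal sketch, assuming a hypothetical `VersionedTable` class instead of a real connector's Table implementation, of why an equality-keyed cache misses once a fresh instance with a newer version is loaded:

```scala
import scala.collection.mutable

// Hypothetical model of a versioned DSv2 table; real connectors define equality themselves.
case class VersionedTable(name: String, version: String)

object CacheMissSketch extends App {
  // Equality-keyed map as a stand-in for plan-fragment matching in the cache manager.
  val cache = mutable.Map.empty[VersionedTable, String]

  // The table state captured when the user cached it.
  cache(VersionedTable("t", version = "v1")) = "cached scan"

  // A later query reloads the table from the metastore and picks up an external change,
  // so the lookup key is no longer equal to the cached one: a silent cache miss.
  val reloaded = VersionedTable("t", version = "v2")
  assert(cache.get(reloaded).isEmpty)
}
```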
### Does this PR introduce _any_ user-facing change?
Yes. The PR fixes the resolution for DSv2 so that `CACHE TABLE t` behaves correctly and reliably (see the usage sketch after the list below).
- caching table via Dataset API will now pin table state
- caching table via CACHE TABLE will now pin table state
- caching a query via Dataset API will continue to simply cache the query plan as before
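For illustration, the affected caching paths look roughly like this in spark-shell style; `spark` is assumed to be an existing SparkSession and `t` a placeholder DSv2 table name:

```scala
// Dataset API: the cached plan now pins the table state as of cache time.
spark.table("t").cache()

// SQL: CACHE TABLE now pins the table state as well.
spark.sql("CACHE TABLE t")

// Caching a query (rather than a table) keeps the previous behavior:
// only the query plan is cached.
spark.table("t").filter("id > 0").cache()
```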
### How was this patch tested?
This PR comes with tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52764 from aokolnychyi/spark-54022.
Lead-authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 92c948f)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>