[SPARK-53924] Reload DSv2 tables in views created using plans on each view access #52876
Conversation
Force-pushed from f002dd1 to ccd330c.
  extends LeafNode with MultiInstanceRelation with NamedRelation {

  override def name: String = {
    s"${catalog.name()}.${identifier.quoted}"
let's follow DataSourceV2RelationBase or create a base trait/util function
override def name: String = {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
  (catalog, identifier) match {
    case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
    case _ => table.name()
  }
}
Updated.
object TableReference {

case class TableInfo(columns: Seq[Column])
shall we just use StructType?
I actually need Columns here to compare original metadata like types (e.g. char/varchar).
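For context, a tiny standalone illustration (not code from this PR; the column name and types are made up) of the metadata at stake: a DSv2 Column keeps the declared CHAR/VARCHAR type, whereas a schema flattened to a plain StructType may already have it normalized to StringType.

```scala
// Illustrative sketch only: a DSv2 Column retains CharType(10), while a StructField that was
// normalized to StringType no longer carries the declared length.
import org.apache.spark.sql.connector.catalog.{Column => V2Column}
import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType}

val declared = V2Column.create("c", CharType(10))             // original column metadata
val flattened = StructType(Seq(StructField("c", StringType))) // char length no longer visible here

assert(declared.dataType() == CharType(10))   // the reference can still compare the exact type
assert(flattened("c").dataType == StringType)
```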
Force-pushed from ccd330c to ac9e49e.
Force-pushed from 706ade0 to a9251b6.
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

case class TableReference private (
let's add code comments
Also, since it is DSv2 only, shall we rename it to DSV2TableReference?
| test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp views with plans") { | ||
| val t = "testcat.tbl" | ||
| withTable(t, "v") { | ||
| withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") { |
Let's also test when the config is false
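For reference, a rough sketch of what the suggested companion case could look like, assuming the suite's existing helpers (withTable, withTempView, withSQLConf, checkAnswer) and the same made-up testcat provider as the test above:

```scala
// Sketch only: mirrors the existing test but with STORE_ANALYZED_PLAN_FOR_VIEW disabled,
// so the temp view keeps its text definition and is re-analyzed on access.
test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp views without plans") {
  val t = "testcat.tbl"
  withTable(t) {
    withTempView("v") {
      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "false") {
        sql(s"CREATE TABLE $t (id INT) USING foo")
        sql(s"CREATE TEMPORARY VIEW v AS SELECT * FROM $t")
        sql("CACHE TABLE v")
        sql(s"INSERT INTO $t VALUES (1)")
        // the cached view should be refreshed by the insert into the underlying table
        checkAnswer(sql("SELECT * FROM v"), Seq(Row(1)))
      }
    }
  }
}
```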
| test("SPARK-53924: uncache DSv2 table using SQL uncaches SQL temp views with plans") { | ||
| val t = "testcat.tbl" | ||
| withTable(t, "v") { | ||
| withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") { |
Let's also test when the config is false
private def adaptCachedRelation(cached: LogicalPlan, ref: TableReference): LogicalPlan = {
  cached transform {
    case r: DataSourceV2Relation if matchesReference(r, ref) =>
      r.copy(output = ref.output, options = ref.options)
Can you explain why this is needed? When resolveReference is called, all the cached relations will be updated.
Not sure I got the question. This code exists because the cached output attributes may differ from those of the reference. This method replaces TableReference (which is resolved and has its own output) with a relation from the cache, but makes it use the output and options from TableReference.
Does that make sense?
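To make the attribute mismatch concrete, here is a tiny standalone sketch (Catalyst classes, made-up column name; not code from this PR): two attributes for the same column still carry distinct exprIds, which is why the cached relation is reused but copied with the reference's output.

```scala
// Conceptual sketch: operators above the relation were analyzed against the reference's
// attributes, so they would not resolve against the cached relation's attributes without
// the output rebinding shown in adaptCachedRelation above.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

val refAttr = AttributeReference("id", IntegerType)()     // attribute bound into the view plan
val cachedAttr = AttributeReference("id", IntegerType)()  // attribute produced by the cached relation

assert(refAttr.name == cachedAttr.name)
assert(refAttr.exprId != cachedAttr.exprId)               // same column, different identity
```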
QQ: do we need to call TableReferenceUtils.validateLoadedTable if it is cached?
Actually, that's a good call. Let me add it.
 * For instance, temporary views with fully resolved logical plans don't allow schema changes
 * in underlying tables.
 */
case class V2TableReference private(
Let's add private[sql] to make it internal
I thought everything in catalyst is considered internal, but let me update.
  }
}

object V2TableReference {
Let's add private[sql] to make it internal
  }
}

object V2TableReferenceUtils extends SQLConfHelper {
Let's add private[sql] to make it internal
  },
  "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" : {
    "message" : [
      "View <viewName> plan references table <tableName> whose <colType> columns changed since the view plan was initially captured.",
nit. Maybe, <colType> columns -> <colType> column?
The rest of the error message says "column changes" in plural, so I think using columns here should be OK.
Force-pushed from f5c8a68 to e8a9d1a.
@dongjoon-hyun @gengliangwang, could you folks help me merge this into master/4.1? All tests are passing.
dongjoon-hyun left a comment:
+1, LGTM again. Thank you, @aokolnychyi.
Merged commit: [SPARK-53924] Reload DSv2 tables in views created using plans on each view access (full description below). Closes #52876 from aokolnychyi/spark-53924. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>. Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>. (cherry picked from commit 407e79c)
Merged to master/4.1 for Apache Spark 4.1.0.
Thanks, @dongjoon-hyun @gengliangwang!
Follow-up commit (add tests for cached temp view detecting schema changes): a follow-up of #52876 that adds a test case for cached temp views detecting schema changes, since there was no coverage after comment #52876 (comment) was addressed. No user-facing change; verified by the new test case; no generative AI tooling used. Closes #53103 from gengliangwang/SPARK-53924-test. Authored-by: Gengliang Wang <gengliang@apache.org>. Signed-off-by: Gengliang Wang <gengliang@apache.org>. (cherry picked from commit fd683ce)
@aokolnychyi I'm testing Spark 4.1 support for Iceberg in apache/iceberg#14155. It looks like this PR has broken this Iceberg test with the following error. Can you help check?
@manuzhang, ack, let me check.
Fix commit (resolve V2TableReference for table in InsertIntoStatement): #52876 brought in V2TableReference, which broke relation resolution for INSERT INTO a temp view on a DSv2 table; this change resolves V2TableReference for the table in InsertIntoStatement. No user-facing change; verified by a new UT; no generative AI tooling used. Closes #53196 from manuzhang/FIX-SPARK-54491. Authored-by: manuzhang <owenzhang1990@gmail.com>. Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>. (cherry picked from commit 3f5a2b9)
What changes were proposed in this pull request?
This PR makes Spark reload DSv2 tables in views created using plans on each view access.
Why are the changes needed?
The current problem is that the view definition in the session catalog captures the analyzed plan that references `Table` (which is supposed to pin the version). If a connector doesn't have an internal cache and produces a new `Table` object on each load, the table referenced in the view will become orphaned and there will be no way to refresh it unless that `Table` instance auto-refreshes on each scan (super dangerous).
Does this PR introduce any user-facing change?
Yes, but it restores the correct behavior without requiring hacks in connectors.
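For illustration, a rough spark-shell style sketch of the restored behavior, assuming a DSv2 catalog registered as testcat whose implementation returns a fresh Table object on each load (all names here are made up):

```scala
// Sketch only: create a temp view that stores the analyzed plan, then write to the table.
import org.apache.spark.sql.internal.SQLConf

sql("CREATE TABLE testcat.ns.tbl (id INT)")
spark.conf.set(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key, "true")
sql("CREATE TEMPORARY VIEW v AS SELECT * FROM testcat.ns.tbl")  // plan captures the table
spark.conf.unset(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key)

sql("INSERT INTO testcat.ns.tbl VALUES (1)")
// Previously the view could keep scanning the Table instance pinned at creation time;
// with this change the table is reloaded on each view access, so the new row is visible.
sql("SELECT * FROM v").show()
```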
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.