[SPARK-34947][SQL] Streaming write to a V2 table should invalidate its associated cache #32039
sunchao wants to merge 4 commits into apache:master
Conversation
  WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
case WriteToDataSourceV2(relationOpt, writer, query) =>
  val refreshCacheFunc: () => Unit = relationOpt match {
    case Some(r) => refreshCache(r)
I'm not sure refreshing the cache is the best choice in the context of streaming - perhaps we should invalidate it instead?

If you have a concern, why don't we add a config for that? That would give us control over whether to invalidate or to refresh.

I think refreshing the cache every second (assuming the streaming trigger is one second) doesn't make sense. Invalidating seems more reasonable.

Hmm, I can't think of a use case where we would want to refresh the cache of a table written by streaming.

Thanks all! Let me switch to invalidating the cache. IMO a config isn't necessary since people may never want the refresh behavior (it could get very expensive).
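A minimal sketch of the two options discussed in this thread, assuming Spark's internal CacheManager API (recacheByPlan re-runs and re-caches the plan, uncacheQuery simply drops the entry); the helper name and the direct use of sharedState.cacheManager are illustrative, not part of this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative helper: contrast refreshing vs invalidating a cached relation.
def refreshOrInvalidate(spark: SparkSession, relation: LogicalPlan, invalidate: Boolean): Unit = {
  val cacheManager = spark.sharedState.cacheManager
  if (invalidate) {
    // Drop the cache entry (and dependent entries); cheap to do once per micro-batch.
    cacheManager.uncacheQuery(spark, relation, cascade = true)
  } else {
    // Recompute and re-cache the relation; potentially very expensive per trigger.
    cacheManager.recacheByPlan(spark, relation)
  }
}
```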
I think the test failures are not related. cc @HeartSaVioR @cloud-fan and @aokolnychyi
  override protected def run(): Seq[InternalRow] = {
-   writeWithV2(batchWrite)
+   val writtenRows = writeWithV2(batchWrite)
+   refreshCache()
+   writtenRows
Instead of refreshing/invalidating the table per trigger, why don't we just invalidate the cache before we start the streaming query that writes to the table?

Yes, that would work too and would require fewer code changes. I went this way to be consistent with the other V2 write commands. Also, in the future we may introduce DataStreamWriterV2, which could pass a write node with UnresolvedRelation to the analyzer and be converted to an execution plan, and this approach may fit better in that case.
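For reference, a hypothetical sketch of the alternative raised here, i.e. dropping the cache entry once before the streaming query starts rather than on every trigger; the helper name and the use of the public Catalog API are assumptions for illustration only:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: invalidate the target table's cache entry up front,
// then start the streaming query that writes into it.
def uncacheBeforeStart(spark: SparkSession, tableName: String): Unit = {
  if (spark.catalog.isCached(tableName)) {
    spark.catalog.uncacheTable(tableName)
  }
  // ...then build and start the StreamingQuery that writes into tableName.
}
```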
(Resolved review comments on sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala and ...core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala.)
    extraOptions: Map[String, String],
-   plan: WriteToStream)
+   plan: WriteToStream,
+   catalogAndIdent: Option[(CatalogPlugin, Identifier)] = None)
Shall we put this info in WriteToStream? It's very weird to see catalogAndIdent as a parameter of MicroBatchExecution.

Good point. WriteToStream is a better place for this information.
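A rough sketch of the agreed direction, with illustrative class, field, and helper names (this is not the final API): the logical streaming-write node carries the optional (catalog, identifier) pair, and the execution side derives a cache-invalidation callback from it instead of taking the pair as a constructor parameter.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}

// Illustrative stand-in for the logical streaming-write node carrying the target info.
case class StreamWriteTarget(
    queryName: String,
    catalogAndIdent: Option[(CatalogPlugin, Identifier)] = None)

// Derive a per-batch invalidation callback from the plan-level info.
def invalidationFunc(spark: SparkSession, target: StreamWriteTarget): () => Unit = () => {
  target.catalogAndIdent.foreach { case (catalog, ident) =>
    // Qualified name like "testcat.ns.tbl"; identifier quoting is omitted for brevity.
    val qualified = (catalog.name +: ident.namespace :+ ident.name).mkString(".")
    spark.sql(s"UNCACHE TABLE IF EXISTS $qualified")
  }
}
```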
Thanks, merging to master!
What changes were proposed in this pull request?
Populate the table catalog and identifier from DataStreamWriter to WriteToMicroBatchDataSource so that we can invalidate the cache for tables that are updated by a streaming write.

This is somewhat related to SPARK-27484 and SPARK-34183 (#31700), as ideally we may want to replace WriteToMicroBatchDataSource and WriteToDataSourceV2 with logical write nodes and feed them to the analyzer. That will potentially change the code path involved in this PR.

Why are the changes needed?
Currently WriteToDataSourceV2 doesn't have cache invalidation logic, so when the target table of a micro-batch streaming job is cached, the cache entry won't be removed when the table is updated.

Does this PR introduce any user-facing change?
Yes. Now when a DSv2 table that supports streaming write is updated by a streaming job, its cache will also be invalidated.
How was this patch tested?
Added a new UT.
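For illustration only (this is not the test added by the PR), a sketch of the user-visible behavior, assuming a DSv2 catalog registered as testcat whose tables support streaming writes (similar to the in-memory catalog used in Spark's connector test suites), a provider named foo, and a SparkSession in scope as spark:

```scala
// Create and cache a V2 table in the hypothetical "testcat" catalog.
spark.sql("CREATE TABLE testcat.ns.t (id BIGINT) USING foo")
spark.sql("CACHE TABLE testcat.ns.t")

// Stream rows from the built-in rate source into the cached table.
val query = spark.readStream
  .format("rate")
  .load()
  .selectExpr("value AS id")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-ckpt")  // illustrative path
  .toTable("testcat.ns.t")

query.processAllAvailable()
query.stop()

// With this PR, each committed micro-batch invalidates the cache entry for
// testcat.ns.t, so this read reflects the newly written rows rather than stale data.
spark.table("testcat.ns.t").show()
```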