Conversation

@aokolnychyi (Contributor) commented Nov 18, 2025:

What changes were proposed in this pull request?

This PR fixes recaching of DSv2 tables.

Why are the changes needed?

These changes are needed to restore correct caching behavior for DSv2 tables if a connector doesn't reuse table instances. Currently, the following use case is broken:

```
// create and populate table
sql("CREATE TABLE testcat.ns.tbl (id bigint, data string) USING foo")
Seq((1L, "a"), (2L, "b")).toDF("id", "data").write.insertInto("testcat.ns.tbl")

// cache table
val df1 = spark.table("testcat.ns.tbl")
df1.cache()
df1.show() // 1 -> a, 2 -> b

// insert more data, refreshing cache entry
Seq((3L, "c"), (4L, "d")).toDF("id", "data").write.insertInto("testcat.ns.tbl")

// query
val df2 = spark.table("testcat.ns.tbl")
df2.show() // CACHE MISS BEFORE CHANGE!
```

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing + new tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Nov 18, 2025

```
override def sparkConf: SparkConf = super.sparkConf
  .set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
  .set("spark.sql.catalog.testcat.copyOnLoad", "true")
```
@aokolnychyi (Contributor, Author) commented:
This config is important. A bunch of tests in this suite worked only because we reused the same table instance everywhere. With this config, there were test failures before this PR.
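For readers unfamiliar with the flag: as I understand it, `copyOnLoad` makes the test catalog hand out a fresh `Table` instance on every load, mimicking connectors that don't reuse instances. A minimal, self-contained sketch of that behavior (hypothetical `CopyOnLoadCatalog` and `TableData`, not the actual `InMemoryCatalog` code):

```scala
// Hypothetical sketch, not the real InMemoryCatalog: with copy-on-load
// semantics, every loadTable call returns a distinct Table instance, so
// any caching logic that relies on instance reuse silently breaks.
case class TableData(name: String, rows: Seq[(Long, String)])

class CopyOnLoadCatalog(tables: Map[String, TableData]) {
  def loadTable(name: String): TableData =
    tables(name).copy() // fresh instance on every load
}

object CopyOnLoadDemo extends App {
  val cat = new CopyOnLoadCatalog(Map("tbl" -> TableData("tbl", Seq(1L -> "a"))))
  val t1 = cat.loadTable("tbl")
  val t2 = cat.loadTable("tbl")
  assert(t1 == t2)    // equal by value
  assert(!(t1 eq t2)) // but never the same instance
}
```

This is why the tests only passed by accident before: they depended on reference equality that a copy-on-load connector never provides.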

@aokolnychyi (Contributor, Author) commented:

@cloud-fan @viirya @gengliangwang @dongjoon-hyun @szehon-ho @huaxingao, could you please take a look? Found this while testing the 4.1 release candidate.

@manuzhang (Member) commented:

@aokolnychyi there is a typo ("reaching") in the commit title.

@aokolnychyi changed the title from "[SPARK-54387][SQL] Fix reaching of DSv2 tables" to "[SPARK-54387][SQL] Fix recaching of DSv2 tables" on Nov 18, 2025
@aokolnychyi (Contributor, Author) commented:

@manuzhang, oops, fixed, thanks!

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Thank you, @aokolnychyi .

Merged to master/4.1 for Apache Spark 4.1.0.

dongjoon-hyun pushed a commit that referenced this pull request Nov 18, 2025
Closes #53109 from aokolnychyi/spark-54387.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit dce992b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@aokolnychyi (Contributor, Author) commented:

Thank you, @dongjoon-hyun!

Comment on lines +409 to +414
```
val newCachedPlan = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
  serializer.convertToColumnarPlanIfPossible(qe.executedPlan)
} else {
  qe.executedPlan
}
val newBuilder = cacheBuilder.copy(cachedPlan = newCachedPlan, logicalPlan = qe.logical)
```
@viirya (Member) commented Nov 18, 2025:

Is this related to refreshing the cache of DSv2 tables?

This looks no different from the previous newCachedPlan except for `logicalPlan = qe.logical`. Is that what fixes the issue?

@aokolnychyi (Contributor, Author) commented:
Yeah, the rest is just formatting. The real change is `logicalPlan = qe.logical`.
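To spell out why that one-liner matters, here is a simplified sketch with stand-in types (not Spark's actual CacheManager/CachedData API): cache lookups match on the logical plan, so a rebuilt cache entry must carry the logical plan of the refreshed query; keeping the stale plan makes every later lookup miss.

```scala
// Simplified stand-ins for Spark's CachedData / CacheManager; the names
// and the matching rule are illustrative assumptions, not the real API.
case class LogicalPlan(table: String, version: Int)
case class CacheEntry(logicalPlan: LogicalPlan, cachedPlan: String)

def lookup(cache: Seq[CacheEntry], plan: LogicalPlan): Option[CacheEntry] =
  cache.find(_.logicalPlan == plan) // lookups match on the logical plan

object RecacheDemo extends App {
  val stale = LogicalPlan("tbl", version = 1) // captured when df1 was cached
  val fresh = LogicalPlan("tbl", version = 2) // built after the insert reloads the table

  // Before the fix: recaching kept the stale logical plan -> cache miss.
  assert(lookup(Seq(CacheEntry(stale, "recomputed rows")), fresh).isEmpty)

  // After the fix: the rebuilt entry stores qe.logical (the fresh plan) -> cache hit.
  assert(lookup(Seq(CacheEntry(fresh, "recomputed rows")), fresh).isDefined)
}
```

With a connector that returns a new table instance per load, the freshly loaded plan never matches a stale stored plan, which reproduces the cache miss in the PR description.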

@viirya (Member) left a comment:

Looks good with one question only.
