[SPARK-55097][SQL] Fix re-adding cached artifacts drops blocks silently issue #53852
Conversation
JIRA Issue Information: Bug SPARK-55097. (This comment was automatically generated by GitHub Actions.)
The branch was force-pushed from 279991e to 27e1369, and again from 27e1369 to ee93b21.
```
    tellMaster = false)
  updater.save()
  val oldBlock = hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
  if (oldBlock != null) {
```
oldBlock is the same as existingBlock, right? It might be better to use existingBlock...
Yes, fixed.
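For illustration, a minimal sketch of the hunk after the rename, assuming `existingBlock` is the value looked up from `hashToCachedIdMap` earlier in the method (the surrounding code is not shown in this diff):

```
// put() returns the previous mapping, which is the same reference as the
// existingBlock looked up earlier, so reuse that name instead of oldBlock.
updater.save()
hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
if (existingBlock != null) {
  existingBlock.release(blockManager)
}
```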
```
    oldBlock.release(blockManager)
  }
} else {
  logDebug(s"Cache artifact with hash $hash already exists in this session, skipping.")
```
Please elevate this to Warning. It means that the client is sort of misbehaving...
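For example (a sketch of the elevated log call, assuming `hash` is in scope as in the diff above):

```
logWarning(s"Cache artifact with hash $hash already exists in this session, skipping.")
```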
```
def isBlockRegistered(id: CacheId): Boolean = {
  sparkContext.env.blockManager.getStatus(id).isDefined
}
```
NIT: messed up spacing.
```
  super.afterEach()
}

def isBlockRegistered(id: CacheId): Boolean = {
```
Make this private?
```
  sparkContext.env.blockManager.getStatus(id).isDefined
}

def addCachedArtifact(session: SparkSession, name: String, data: String): CacheId = {
```
Make this private.
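A sketch of the two suite helpers with the suggested access modifier applied; the body of `addCachedArtifact` is elided here, since it is not shown in this diff:

```
// Both helpers are test-suite internals, so they can be private.
private def isBlockRegistered(id: CacheId): Boolean = {
  sparkContext.env.blockManager.getStatus(id).isDefined
}

// Signature from the diff; body elided in this sketch.
private def addCachedArtifact(session: SparkSession, name: String, data: String): CacheId = ???
```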
hvanhovell left a comment:
LGTM
Closes #53852 from pranavdev022/fix-artifacts-duplicate-add-issue.

Authored-by: pranavdev022 <pranavdev022@gmail.com>
Signed-off-by: Herman van Hövell <herman@databricks.com>
(cherry picked from commit dfea036)
Signed-off-by: Herman van Hövell <herman@databricks.com>

Merged to master/4.1

What changes were proposed in this pull request?
After the introduction of the ref-counting logic for cloning sessions in #52651, re-adding an identical cached artifact (same session, same hash) incorrectly leads to deletion of the existing block.

Verified this bug locally using:

```
test("re-adding the same cache artifact should not remove the block") {
  val blockManager = spark.sparkContext.env.blockManager
  val remotePath = Paths.get("cache/duplicate_hash")
  val blockId = CacheId(spark.sessionUUID, "duplicate_hash")
  try {
    // First addition
    withTempPath { path =>
      Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
      artifactManager.addArtifact(remotePath, path.toPath, None)
    }
    assert(blockManager.getLocalBytes(blockId).isDefined)
    blockManager.releaseLock(blockId)

    // Second addition with same hash - block should still exist
    withTempPath { path =>
      Files.write(path.toPath, "test".getBytes(StandardCharsets.UTF_8))
      artifactManager.addArtifact(remotePath, path.toPath, None)
    }
    assert(blockManager.getLocalBytes(blockId).isDefined,
      "Block should still exist after re-adding the same cache artifact")
  } finally {
    blockManager.releaseLock(blockId)
    blockManager.removeCache(spark.sessionUUID)
  }
}
```

which fails the `assert(blockManager.getLocalBytes(blockId).isDefined)` check after the second addition with the same hash.
Proposed solution:
1. Add an early-exit check: if `existingBlock.id == blockId`, skip the addition since the block already exists (see the sketch after this list).
2. The fix preserves the intended replacement behavior for cloned sessions (different session UUID, same hash).
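A minimal sketch of that early-exit check, assuming the names visible in the diff hunks above (`hashToCachedIdMap`, `existingBlock`, `blockId`, `updater`, `blockManager`); the surrounding ArtifactManager code is abbreviated, so this is illustrative rather than the exact patch:

```
val existingBlock = hashToCachedIdMap.get(blockId.hash)
if (existingBlock == null || existingBlock.id != blockId) {
  // New block for this session, or a cloned session replacing an inherited
  // artifact with the same hash: store it and release the previous holder.
  updater.save()
  hashToCachedIdMap.put(blockId.hash, new RefCountedCacheId(blockId))
  if (existingBlock != null) {
    existingBlock.release(blockManager)
  }
} else {
  // Same session re-adding the same hash: the block is already registered,
  // so skip the addition instead of releasing (and thereby deleting) it.
  logWarning(s"Cache artifact with hash ${blockId.hash} already exists in this session, skipping.")
}
```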
Why are the changes needed?
When the same cached artifact is added twice to the same session:
- BlockManager.save() detects the block exists and returns without re-adding
- hashToCachedIdMap.put() returns the existing RefCountedCacheId
- release() decrements the ref count to 0 and deletes the block
Does this PR introduce any user-facing change?
No. This is an internal bug fix. Previously, duplicate artifact additions could silently delete cached data, causing runtime errors.
How was this patch tested?
Added unit test `cache artifact deduplication and replacement across sessions` in ArtifactManagerSuite that validates:
- Duplicate addition in the same session is skipped (the block survives)
- A cloned session can replace inherited artifacts (different CacheId; see the sketch below)
- Reference counting works correctly across session cleanup
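For illustration, the cloned-session part of the test might look roughly like this, using the `addCachedArtifact` and `isBlockRegistered` helpers from the diff above and SparkSession's `cloneSession()`; the assertions here are an assumption about the suite's contents, not a copy of it:

```
// Adding the same artifact in a cloned session yields a different CacheId
// (different session UUID) and must not delete the original session's block.
val originalId = addCachedArtifact(spark, "artifact", "payload")
val clone = spark.cloneSession()
val clonedId = addCachedArtifact(clone, "artifact", "payload")
assert(originalId != clonedId)
assert(isBlockRegistered(originalId))
assert(isBlockRegistered(clonedId))
```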
Was this patch authored or co-authored using generative AI tooling?
Co-authored using Cursor.