[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively #19611
srowen
left a comment
Just a few initial style comments
var lastAvailableVersion = version
var lastAvailableMap: Option[MapType] = None
while (lastAvailableMap.isEmpty) {
  lastAvailableVersion = lastAvailableVersion - 1
Nit: -= 1?
// Shortcut if the map for this version is already there to avoid a redundant put.
val currentVersionMap =
  synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
if (currentVersionMap.isDefined) return currentVersionMap.get
I'd unroll this onto two lines with braces for consistency
// Load all the deltas from the version after the last available one up to the target version.
// The last available version is the one with a full snapshot, so it doesn't need deltas.
var resultMap = lastAvailableMap.get
val?
}

loadedMaps.put(version, resultMap)
return resultMap
Don't need return
One issue I want to explicitly bring up: this new unit test takes a very long time, almost 2 minutes on my computer. Creating 10k files isn't going to be super fast no matter what we do, but is there something that can mitigate the problem? Maybe it'd be better in a different suite or something; I don't know what the typical practice is.

I think we can remove the unit test. It's obviously that

Test build #83225 has started for PR 19611 at commit

Test build #83217 has finished for PR 19611 at commit

Test build #83220 has finished for PR 19611 at commit
zsxwing
left a comment
Overall looks good. Left some comments
if (currentVersionMap.isDefined) {
  return currentVersionMap.get
}

nit: extra empty line
// Load all the deltas from the version after the last available one up to the target version.
// The last available version is the one with a full snapshot, so it doesn't need deltas.
val resultMap = lastAvailableMap.get
We should create a new map here to not change the previous map in case the maintenance task is using it.
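To illustrate the concern, here is a minimal sketch, not the code from this PR. It assumes `MapType` is a `java.util.concurrent.ConcurrentHashMap` and uses `String` keys and values as stand-ins for the real row types; `applyDelta` and `loadWithCopy` are hypothetical names. Copying the last available map before applying deltas leaves the cached instance untouched for any other reader.

```scala
import java.util.concurrent.ConcurrentHashMap

object CopyBeforeUpdateSketch {
  // Assumption: a ConcurrentHashMap stand-in for the provider's MapType.
  type MapType = ConcurrentHashMap[String, String]

  // Hypothetical stand-in for applying one delta file: mutates `map` in place.
  def applyDelta(map: MapType, key: String, value: String): Unit = {
    map.put(key, value)
  }

  // Copy first, then mutate only the copy.
  def loadWithCopy(lastAvailableMap: MapType): MapType = {
    val resultMap = new MapType(lastAvailableMap)
    applyDelta(resultMap, "k", "v2")
    resultMap
  }
}
```

With this shape, a map that another thread obtained from the cache earlier keeps its old contents while the new version is being built.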
// Shortcut if the map for this version is already there to avoid a redundant put.
val currentVersionMap =
  synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
We should put the map read from the snapshot file into loadedMaps.
  updateFromDeltaFile(deltaVersion, resultMap)
}

loadedMaps.put(version, resultMap)
loadedMaps.put(version, resultMap) -> synchronized { loadedMaps.put(version, resultMap) }
This is a different issue but since you are touching this, it's better to fix it as well.
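A minimal sketch of the suggested locking, under the assumption that `loadedMaps` is a plain (non-thread-safe) `mutable.HashMap` shared between the query thread and a maintenance thread. The class name `LoadedMapsSketch` and the simplified value type are hypothetical. Every read and write goes through the same monitor:

```scala
import scala.collection.mutable

class LoadedMapsSketch {
  // Assumption: a non-thread-safe map, so all access must hold the same lock.
  private val loadedMaps = new mutable.HashMap[Long, Map[String, String]]

  // Reads take the lock, matching `synchronized { loadedMaps.get(version) }`.
  def get(version: Long): Option[Map[String, String]] =
    synchronized { loadedMaps.get(version) }

  // Writes take the same lock, so a concurrent reader never sees a map mid-update.
  def put(version: Long, map: Map[String, String]): Unit =
    synchronized { loadedMaps(version) = map }
}
```

Guarding only the reads would not be enough: an unsynchronized `put` can still race with a synchronized `get` on the same underlying hash table.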
LGTM pending tests

Test build #83229 has finished for PR 19611 at commit

Thanks! Merging to master.

Is there any chance of this change getting merged into 2.2.1?

@BriggsW This is pretty hard to hit. I could not make it happen in a real query. So I don't think it's worth backporting.
What changes were proposed in this pull request?
Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low memory environment.
How was this patch tested?
Existing unit tests for functional equivalence, new unit test to check for stack overflow.
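The general shape of the non-recursive approach can be sketched as follows. This is a simplified illustration, not the merged implementation: snapshots and deltas are modelled as in-memory maps in a hypothetical `Store` case class, deltas contain only puts (no removals), and version 0 is assumed to be the empty map. The loop walks back to the nearest snapshot, then replays deltas forward, so stack depth stays constant no matter how many deltas have accumulated.

```scala
import scala.collection.mutable

object LoadMapSketch {
  type MapType = mutable.HashMap[String, Int]

  // Hypothetical in-memory stand-ins for the snapshot and delta files.
  final case class Store(
      snapshots: Map[Long, Map[String, Int]],
      deltas: Map[Long, Map[String, Int]])

  def loadMap(store: Store, version: Long): MapType = {
    require(version >= 0, "version must be non-negative")

    // Walk backwards until a full snapshot is found; version 0 is the empty map.
    var lastAvailableVersion = version
    var lastAvailableMap: Option[Map[String, Int]] = None
    while (lastAvailableMap.isEmpty) {
      if (lastAvailableVersion <= 0) {
        lastAvailableMap = Some(Map.empty)
      } else {
        lastAvailableMap = store.snapshots.get(lastAvailableVersion)
        if (lastAvailableMap.isEmpty) lastAvailableVersion -= 1
      }
    }

    // Replay deltas forward into a fresh map, from the version after the
    // snapshot up to the target version.
    val resultMap = new MapType()
    resultMap ++= lastAvailableMap.get
    for (deltaVersion <- lastAvailableVersion + 1 to version) {
      resultMap ++= store.deltas.getOrElse(
        deltaVersion,
        throw new IllegalStateException(s"Missing delta for version $deltaVersion"))
    }
    resultMap
  }
}
```

A recursive formulation would add one stack frame per missing snapshot, which is what makes a long chain of deltas risky when the thread stack is small.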