[SPARK-33202][CORE] Fix BlockManagerDecommissioner to return the correct migration status #30116
cc @holdenk
@@ -268,7 +268,7 @@ private[storage] class BlockManagerDecommissioner(
       stoppedShuffle = true
     }
     // If we found any new shuffles to migrate or otherwise have not migrated everything.
-    newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get()
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
This is the fix.
cc @HyukjinKwon since you are interested in this area, too.
Kubernetes integration test starting
Thanks for cc'ing me.
Kubernetes integration test status success
Test build #130071 has finished for PR 30116 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130077 has finished for PR 30116 at commit
Kubernetes integration test starting
Kubernetes integration test status success
cc: @Ngone51
LGTM pending integration testing.
@@ -183,7 +183,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
     bmDecomManager.migratingShuffles += ShuffleBlockInfo(10, 10)

-    validateDecommissionTimestampsOnManager(bmDecomManager)
+    validateDecommissionTimestampsOnManager(bmDecomManager, fail = false, assertDone = false)
We could also increment migratedShuffles and then we would expect it to finish.
Test build #130098 has finished for PR 30116 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130102 has finished for PR 30116 at commit
Since this is a bug fix, if no one has any concerns I'll merge this tonight.
Thank you for review and approval, @holdenk . I'll merge this with your
Nice, +1. Seems like properly merged to master.
Good catch! late LGTM.
…ect migration status

What changes were proposed in this pull request?

This PR changes `<` into `>` in the following to fix data loss during storage migrations.

```scala
// If we found any new shuffles to migrate or otherwise have not migrated everything.
- newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get()
+ newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
```

Why are the changes needed?

`refreshOffloadingShuffleBlocks` should return `true` when the migration is still on-going. Since `migratingShuffles` is defined like the following, `migratingShuffles.size > numMigratedShuffles.get()` means the migration is not finished.

```scala
// Shuffles which are either in queue for migrations or migrated
protected[storage] val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
```

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Pass the CI with the updated test cases.

Closes apache#30116 from dongjoon-hyun/SPARK-33202.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
This PR changes `<` into `>` in the migration-status check of `BlockManagerDecommissioner` to fix data loss during storage migrations.
Why are the changes needed?
`refreshOffloadingShuffleBlocks` should return `true` when the migration is still on-going. Since `migratingShuffles` holds the shuffles that are either queued for migration or already migrated, `migratingShuffles.size > numMigratedShuffles.get()` means the migration is not finished.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Pass the CI with the updated test cases.
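
To illustrate why the direction of the comparison matters, here is a minimal, self-contained sketch of the bookkeeping described above. It is not Spark's actual implementation: `MigrationTracker`, `stillMigratingBeforeFix`, and `stillMigratingAfterFix` are hypothetical names introduced for this example, and `ShuffleBlockInfo` is a simplified stand-in for the real class. Only `migratingShuffles` and `numMigratedShuffles` mirror names from the diff.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for Spark's ShuffleBlockInfo; only equality matters here.
final case class ShuffleBlockInfo(shuffleId: Int, mapId: Long)

// Simplified model of the counters discussed in this PR (an assumption, not Spark code).
final class MigrationTracker {
  // Shuffles which are either in queue for migration or already migrated.
  val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
  // Number of shuffles whose migration has completed.
  val numMigratedShuffles = new AtomicInteger(0)

  // Comparison as it was before the fix: reports "still migrating" only if
  // more shuffles were migrated than were ever tracked.
  def stillMigratingBeforeFix(newShuffles: Seq[ShuffleBlockInfo]): Boolean =
    newShuffles.nonEmpty || migratingShuffles.size < numMigratedShuffles.get()

  // Comparison after the fix: tracked shuffles outnumber migrated ones,
  // so at least one migration is still pending.
  def stillMigratingAfterFix(newShuffles: Seq[ShuffleBlockInfo]): Boolean =
    newShuffles.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
}

object MigrationStatusSketch {
  def main(args: Array[String]): Unit = {
    val tracker = new MigrationTracker
    // One shuffle is queued and nothing has been migrated yet.
    tracker.migratingShuffles += ShuffleBlockInfo(10, 10L)

    // The old comparison claims the migration is finished.
    assert(!tracker.stillMigratingBeforeFix(Seq.empty))
    // The fixed comparison reports that work remains.
    assert(tracker.stillMigratingAfterFix(Seq.empty))

    // Once the migrated counter catches up, the fixed check reports completion.
    tracker.numMigratedShuffles.incrementAndGet()
    assert(!tracker.stillMigratingAfterFix(Seq.empty))
  }
}
```

The last step mirrors the review suggestion in the thread: once the migrated counter is incremented to match the tracked set, the check would be expected to report that the migration has finished.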