@@ -296,8 +296,15 @@ private[spark] class CoarseGrainedExecutorBackend(
  logInfo(msg)
  try {
    decommissioned = true
-   if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+   val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
+     (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
+       env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
Comment on lines +299 to +301
Contributor:
Based on the above lines, does STORAGE_DECOMMISSION_ENABLED have any role now? So what about removing this config?

Member (Author):
That's my concern too. But removal involves an API change, which I intentionally tried to avoid in this refactoring PR. I still think it's worth a try in a separate PR, especially since 3.1 hasn't been released yet. cc @holdenk, what's your opinion?

Member (Author):
kindly ping @holdenk

+   if (migrationEnabled) {
      env.blockManager.decommissionBlockManager()
+   } else if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
Contributor:

By removing the STORAGE_DECOMMISSION_ENABLED config, this part would simply disappear.

logError(s"Storage decommissioning attempted but neither " +
s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key} or " +
s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key} is enabled ")
}
if (executor != null) {
executor.decommission()
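The new gating reduces to a single boolean predicate over the three configs. Here is a minimal sketch of that predicate, with plain booleans standing in for the SparkConf lookups (the object and parameter names are illustrative, not Spark API):

```scala
// Sketch of the gating above; plain booleans stand in for SparkConf lookups.
object DecommissionGate {
  def migrationEnabled(
      storageDecommissionEnabled: Boolean,
      rddBlocksEnabled: Boolean,
      shuffleBlocksEnabled: Boolean): Boolean =
    storageDecommissionEnabled && (rddBlocksEnabled || shuffleBlocksEnabled)

  def main(args: Array[String]): Unit = {
    // Decommissioning on but no block type enabled: migration is skipped
    // and the backend logs the error above instead.
    assert(!migrationEnabled(true, false, false))
    // Either block type alone is enough to start block-manager migration.
    assert(migrationEnabled(true, true, false))
    assert(migrationEnabled(true, false, true))
    // With the parent flag off, the sub-flags are ignored entirely.
    assert(!migrationEnabled(false, true, true))
  }
}
```

This is also why the review thread above questions STORAGE_DECOMMISSION_ENABLED: once both sub-flags exist, the parent flag only changes behavior in the error-logging branch.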
@@ -324,7 +331,7 @@
  while (true) {
    logInfo("Checking to see if we can shutdown.")
    if (executor == null || executor.numRunningTasks == 0) {
-     if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+     if (migrationEnabled) {
        logInfo("No running tasks, checking migrations")
        val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
        // We can only trust allBlocksMigrated boolean value if there were no tasks running
@@ -340,7 +347,7 @@
        exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
      }
    } else {
-     logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
+     logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
      // If there is a running task it could store blocks, so make sure we wait for a
      // migration loop to complete after the last task is done.
      // Note: this is only advanced if there is a running task, if there
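The loop above keeps re-evaluating one shutdown predicate. Below is a hedged, standalone sketch of just that predicate; the object, function, and parameter names are illustrative, and only the shape of the check, namely that allBlocksMigrated is trusted only when the migration snapshot postdates the last running task, comes from the diff:

```scala
// Illustrative standalone model of the shutdown decision; not Spark's API.
object ShutdownCheck {
  def canShutdown(
      numRunningTasks: Int,
      migrationEnabled: Boolean,
      allBlocksMigrated: Boolean,
      migrationTimeNs: Long,
      lastTaskRunningTimeNs: Long): Boolean = {
    if (numRunningTasks > 0) {
      false // a running task may still produce blocks, so keep waiting
    } else if (!migrationEnabled) {
      true // nothing to migrate, shut down as soon as tasks drain
    } else {
      // Trust the allBlocksMigrated snapshot only if it was computed after
      // the last task finished; a late task could have added new blocks.
      allBlocksMigrated && migrationTimeNs > lastTaskRunningTimeNs
    }
  }

  def main(args: Array[String]): Unit = {
    // args: (numRunningTasks, migrationEnabled, allBlocksMigrated,
    //        migrationTimeNs, lastTaskRunningTimeNs)
    assert(!canShutdown(2, true, true, 10L, 5L)) // blocked by running tasks
    assert(canShutdown(0, false, false, 0L, 0L)) // no migration configured
    assert(!canShutdown(0, true, true, 5L, 10L)) // stale migration snapshot
    assert(canShutdown(0, true, true, 10L, 5L))  // migrated, snapshot fresh
  }
}
```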
@@ -25,4 +25,6 @@ import org.apache.spark.annotation.Experimental
* type safe way.
*/
@Experimental
-case class ShuffleBlockInfo(shuffleId: Int, mapId: Long)
+case class ShuffleBlockInfo(shuffleId: Int, mapId: Long) {
+  override def toString: String = s"migrate_shuffle_${shuffleId}_$mapId"
+}
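A quick illustrative check of the new toString; the case class is repeated so the snippet is self-contained, and 3 and 42L are arbitrary example values:

```scala
case class ShuffleBlockInfo(shuffleId: Int, mapId: Long) {
  override def toString: String = s"migrate_shuffle_${shuffleId}_$mapId"
}

object ShuffleBlockInfoDemo extends App {
  val info = ShuffleBlockInfo(shuffleId = 3, mapId = 42L)
  // The override gives a stable, log-friendly identifier for migration.
  assert(info.toString == "migrate_shuffle_3_42")
  println(info) // prints: migrate_shuffle_3_42
}
```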