From 7b04d8f1527868770f6bcab96537320596190933 Mon Sep 17 00:00:00 2001
From: Vinod K C
Date: Mon, 13 Jul 2015 12:50:42 +0530
Subject: [PATCH] Added synchronization in removeApplication

---
 .../apache/spark/deploy/master/Master.scala   | 66 ++++++++++---------
 1 file changed, 35 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 48070768f6edb..f6909be93d67d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -727,40 +727,44 @@ private[master] class Master(
   def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
     if (apps.contains(app)) {
-      logInfo("Removing app " + app.id)
-      apps -= app
-      idToApp -= app.id
-      endpointToApp -= app.driver
-      addressToApp -= app.driver.address
-      if (completedApps.size >= RETAINED_APPLICATIONS) {
-        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
-        completedApps.take(toRemove).foreach( a => {
-          appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
-          applicationMetricsSystem.removeSource(a.appSource)
-        })
-        completedApps.trimStart(toRemove)
-      }
-      completedApps += app // Remember it in our history
-      waitingApps -= app
+      synchronized{
+        if (apps.contains(app)) {
+          logInfo("Removing app " + app.id)
+          apps -= app
+          idToApp -= app.id
+          endpointToApp -= app.driver
+          addressToApp -= app.driver.address
+          if (completedApps.size >= RETAINED_APPLICATIONS) {
+            val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+            completedApps.take(toRemove).foreach(a => {
+              appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
+              applicationMetricsSystem.removeSource(a.appSource)
+            })
+            completedApps.trimStart(toRemove)
+          }
+          completedApps += app // Remember it in our history
+          waitingApps -= app

-      // If application events are logged, use them to rebuild the UI
-      rebuildSparkUI(app)
+          // If application events are logged, use them to rebuild the UI
+          rebuildSparkUI(app)

-      for (exec <- app.executors.values) {
-        exec.worker.removeExecutor(exec)
-        exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
-        exec.state = ExecutorState.KILLED
-      }
-      app.markFinished(state)
-      if (state != ApplicationState.FINISHED) {
-        app.driver.send(ApplicationRemoved(state.toString))
-      }
-      persistenceEngine.removeApplication(app)
-      schedule()
+          for (exec <- app.executors.values) {
+            exec.worker.removeExecutor(exec)
+            exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
+            exec.state = ExecutorState.KILLED
+          }
+          app.markFinished(state)
+          if (state != ApplicationState.FINISHED) {
+            app.driver.send(ApplicationRemoved(state.toString))
+          }
+          persistenceEngine.removeApplication(app)
+          schedule()

-      // Tell all workers that the application has finished, so they can clean up any app state.
-      workers.foreach { w =>
-        w.endpoint.send(ApplicationFinished(app.id))
+          // Tell all workers that the application has finished, so they can clean up any app state.
+          workers.foreach { w =>
+            w.endpoint.send(ApplicationFinished(app.id))
+          }
+        }
       }
     }
   }