From 7bb3a438eff8c8ab9567a50ca59da2e179939e76 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 2 Feb 2017 23:54:08 +0800 Subject: [PATCH] [SPARK-19438] Both reading and updating executorDataMap should be guarded by CoarseGrainedSchedulerBackend.this.synchronized when handling RegisterExecutor. --- .../CoarseGrainedSchedulerBackend.scala | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 31575c0ca0d15..038e8e9572635 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,40 +145,47 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => - if (executorDataMap.contains(executorId)) { - executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) - context.reply(true) - } else { - // If the executor's rpc env is not listening for incoming connections, `hostPort` - // will be null, and the client connection should be used to contact the executor. 
- val executorAddress = if (executorRef.address != null) { - executorRef.address - } else { - context.senderAddress - } - logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId") - addressToExecutorId(executorAddress) = executorId - totalCoreCount.addAndGet(cores) - totalRegisteredExecutors.addAndGet(1) - val data = new ExecutorData(executorRef, executorRef.address, hostname, - cores, cores, logUrls) - // This must be synchronized because variables mutated - // in this block are read when requesting executors - CoarseGrainedSchedulerBackend.this.synchronized { - executorDataMap.put(executorId, data) - if (currentExecutorIdCounter < executorId.toInt) { - currentExecutorIdCounter = executorId.toInt - } - if (numPendingExecutors > 0) { - numPendingExecutors -= 1 - logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") + var newRegisteredExecutor = false + var executorData: ExecutorData = null + CoarseGrainedSchedulerBackend.this.synchronized { + if (executorDataMap.contains(executorId)) { + executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) + context.reply(true) + } else { + // If the executor's rpc env is not listening for incoming connections, `hostPort` + // will be null, and the client connection should be used to contact the executor. 
+ val executorAddress = if (executorRef.address != null) { + executorRef.address + } else { + context.senderAddress + } + logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId") + addressToExecutorId(executorAddress) = executorId + totalCoreCount.addAndGet(cores) + totalRegisteredExecutors.addAndGet(1) + executorData = new ExecutorData(executorRef, executorRef.address, hostname, + cores, cores, logUrls) + // This must be synchronized because variables mutated + // in this block are read when requesting executors + CoarseGrainedSchedulerBackend.this.synchronized { + executorDataMap.put(executorId, executorData) + if (currentExecutorIdCounter < executorId.toInt) { + currentExecutorIdCounter = executorId.toInt + } + if (numPendingExecutors > 0) { + numPendingExecutors -= 1 + logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") + } } + newRegisteredExecutor = true } + } + if(newRegisteredExecutor) { executorRef.send(RegisteredExecutor) // Note: some tests expect the reply to come after we put the executor in the map context.reply(true) listenerBus.post( - SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) + SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, executorData)) makeOffers() }