From 8067a1ec64858a5baa004c42d57e993073fb52c7 Mon Sep 17 00:00:00 2001
From: huangsheng
Date: Tue, 26 Sep 2023 19:05:28 +0800
Subject: [PATCH] AL-8879 Cherry-pick for release ke-4.6.16.0 GA (#676) (#681)

AL-8879 Fixed a bug where a newly registered executor holds expired
delegation tokens when its registration window overlaps with the driver's
delegation token update.
---
 .../CoarseGrainedExecutorBackend.scala        |  5 ++-
 .../cluster/CoarseGrainedClusterMessage.scala |  3 +-
 .../CoarseGrainedSchedulerBackend.scala       | 45 +++++++++++++++++--
 3 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a94e63656e1a1..b07e0035c6ca4 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -449,7 +449,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
 
-    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
+    logInfo(s"Executor ${arguments.executorId} asking driver for the Spark app config")
+    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId,
+      arguments.executorId))
     val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
     fetcher.shutdown()
 
@@ -477,6 +479,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
       )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
+      logInfo(s"Executor ${arguments.executorId} finished constructing the RpcEndpoint backend")
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher",
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 61ee865c0fcb4..3950eeafcd99c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -29,7 +29,8 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 
 private[spark] object CoarseGrainedClusterMessages {
 
-  case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage
+  case class RetrieveSparkAppConfig(resourceProfileId: Int, executorId: String)
+    extends CoarseGrainedClusterMessage
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
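Reviewer note (not part of the patch): the race this commit closes reduces to
three timestamps. The sketch below is a minimal, self-contained Scala
illustration of the staleness predicate the driver now evaluates at
registration time; the names are illustrative, not Spark's.

object TokenRaceSketch {
  // An executor fetches tokens at tFetch (via RetrieveSparkAppConfig), the
  // driver rotates tokens at tUpdate, and the executor finishes registering
  // at tRegister. If tFetch < tUpdate < tRegister, the executor missed the
  // UpdateDelegationTokens broadcast (it was not yet in executorDataMap) and
  // the tokens it fetched are already stale.
  def holdsStaleTokens(tFetch: Long, tUpdate: Long, tRegister: Long): Boolean =
    tFetch < tUpdate && tUpdate < tRegister

  def main(args: Array[String]): Unit = {
    println(holdsStaleTokens(100L, 150L, 200L)) // true: forced resend needed
    println(holdsStaleTokens(160L, 150L, 200L)) // false: fetched after rotation
  }
}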
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 13a7183a29dd6..4239b94358fcc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{HashMap, HashSet}
@@ -110,6 +111,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Current set of delegation tokens to send to executors.
   private val delegationTokens = new AtomicReference[Array[Byte]]()
 
+  // Last time the driver sent up-to-date delegation tokens to executors.
+  private val driverLastUpdateTokensTimestamp = new AtomicLong(0L)
+  // DriverEndpoint is not thread-safe, so track fetch times in a concurrent map.
+  private val executorLastGetTokenTimestampMap = new ConcurrentHashMap[String, Long]
+
   // The token manager used to create security tokens.
   private var delegationTokenManager: Option[HadoopDelegationTokenManager] = None
 
@@ -265,12 +270,39 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
           // This must be synchronized because variables mutated
          // in this block are read when requesting executors
+          var registeredTime = -1L
           CoarseGrainedSchedulerBackend.this.synchronized {
             executorDataMap.put(executorId, data)
+            registeredTime = System.currentTimeMillis()
             if (currentExecutorIdCounter < executorId.toInt) {
               currentExecutorIdCounter = executorId.toInt
             }
           }
+          // Check whether the tokens held by the newly registered executor have expired
+          if (UserGroupInformation.isSecurityEnabled) {
+            logInfo(s"Checking whether the tokens held by newly registered " +
+              s"executor $executorId have expired; " +
+              s"the timeline is: \n" +
+              s"Time executor last fetched tokens: ${
+                executorLastGetTokenTimestampMap
+                  .getOrDefault(executorId, 0L)
+              }\n" +
+              s"Time driver last updated tokens: ${driverLastUpdateTokensTimestamp.get()}\n" +
+              s"Time new executor registered: $registeredTime\n")
+            val shouldForceUpdateToken =
+              executorLastGetTokenTimestampMap.getOrDefault(executorId, 0L) <
+                driverLastUpdateTokensTimestamp.get() &&
+                driverLastUpdateTokensTimestamp.get() < registeredTime
+            if (shouldForceUpdateToken) {
+              logInfo(s"The delegation tokens obtained by executor $executorId " +
+                s"have expired and will be forcibly updated")
+              data.executorEndpoint.send(UpdateDelegationTokens(delegationTokens.get()))
+            } else {
+              logInfo(s"Executor $executorId's tokens are already up to date; " +
+                s"no update is needed")
+            }
+          }
+
           listenerBus.post(
             SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           // Note: some tests expect the reply to come after we put the executor in the map
@@ -302,13 +334,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             adjustTargetNumExecutors = false,
             triggeredByExecutor = true))
 
-    case RetrieveSparkAppConfig(resourceProfileId) =>
+    case RetrieveSparkAppConfig(resourceProfileId, executorId) =>
       val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
       val reply = SparkAppConfig(
         sparkProperties,
         SparkEnv.get.securityManager.getIOEncryptionKey(),
         Option(delegationTokens.get()),
         rp)
+      executorLastGetTokenTimestampMap.put(executorId, System.currentTimeMillis())
       context.reply(reply)
 
     case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))
@@ -429,6 +462,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             reason
           }
         }
+        executorLastGetTokenTimestampMap.remove(executorId)
         totalCoreCount.addAndGet(-executorInfo.totalCores)
         totalRegisteredExecutors.addAndGet(-1)
         scheduler.executorLost(executorId, lossReason)
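Reviewer note (illustrative only, not Spark code): the bookkeeping this hunk
adds can be summarized in isolation. The class below is a self-contained
sketch with hypothetical names (TokenBookkeeping, onConfigFetched, etc.) of
the state involved and where each piece is touched; a ConcurrentHashMap is
assumed because the config-fetch reply and executor-removal cleanup are not
serialized through a single lock.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class TokenBookkeeping {
  private val lastFetch = new ConcurrentHashMap[String, Long]
  private val lastDriverUpdate = new AtomicLong(0L)

  // Called when the driver answers RetrieveSparkAppConfig for an executor.
  def onConfigFetched(executorId: String): Unit =
    lastFetch.put(executorId, System.currentTimeMillis())

  // Called after the driver broadcasts UpdateDelegationTokens to all executors.
  def onTokensBroadcast(): Unit =
    lastDriverUpdate.set(System.currentTimeMillis())

  // Called when an executor is removed, so the map does not grow unbounded.
  def onExecutorRemoved(executorId: String): Unit =
    lastFetch.remove(executorId)

  // The registration-time check: fetched before the last rotation, and the
  // rotation happened before this executor finished registering.
  def needsForcedUpdate(executorId: String, registeredAt: Long): Boolean = {
    val fetched = lastFetch.getOrDefault(executorId, 0L)
    fetched < lastDriverUpdate.get() && lastDriverUpdate.get() < registeredAt
  }
}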
@@ -557,6 +591,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
+      logInfo("Driver starting to create the delegationTokenManager")
       delegationTokenManager = createTokenManager()
       delegationTokenManager.foreach { dtm =>
         val ugi = UserGroupInformation.getCurrentUser()
@@ -945,8 +980,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     SparkHadoopUtil.get.addDelegationTokens(tokens, conf)
     delegationTokens.set(tokens)
     executorDataMap.values.foreach { ed =>
+      logInfo(s"Driver sending an UpdateDelegationTokens message to ${ed.executorAddress}")
       ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
     }
+    logInfo(s"Driver finished sending new tokens to ${executorDataMap.size} executors " +
+      s"at ${System.currentTimeMillis()}")
+    driverLastUpdateTokensTimestamp.set(System.currentTimeMillis())
   }
 
   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
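Reviewer note (illustrative only): two zero-valued defaults matter at the
boundaries of the staleness predicate. An executor with no recorded fetch
reads 0 from the map, and a driver that has never rotated tokens leaves its
update timestamp at 0. The self-contained check below exercises both; the
helper name is hypothetical.

object PredicateEdgeCases {
  private def stale(fetch: Long, update: Long, registered: Long): Boolean =
    fetch < update && update < registered

  def main(args: Array[String]): Unit = {
    // Never-fetched executor (fetch = 0): the forced update fires whenever
    // the driver rotated tokens at least once before registration.
    assert(stale(0L, 150L, 200L))
    // Driver never rotated (update = 0): predicate is false, nothing to resend.
    assert(!stale(100L, 0L, 200L))
    // Fetch happened after the rotation: executor already has fresh tokens.
    assert(!stale(160L, 150L, 200L))
    println("all edge cases pass")
  }
}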