Skip to content

Commit

Permalink
AL-8879 Cheery-pick for release ke-4.6.16.0 GA(apache#676) (apache#681)
Browse files Browse the repository at this point in the history
AL-8879 Fixed bug where the new executor gets expired tokens when the new executor registration time overlaps with the driver delegation token update time
  • Loading branch information
Mrhs121 authored and frearb committed Oct 10, 2023
1 parent ea955d4 commit 8067a1e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}

val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
logInfo(s"executor-${arguments.executorId} ask driver for retrieve spark app config")
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId,
arguments.executorId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
fetcher.shutdown()

Expand Down Expand Up @@ -477,6 +479,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
)
val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
logInfo(s"Executor-${arguments.executorId} finished construct rpcendpoint backend")
env.rpcEnv.setupEndpoint("Executor", backend)
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage
case class RetrieveSparkAppConfig(resourceProfileId: Int, executorId: String)
extends CoarseGrainedClusterMessage

case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.scheduler.cluster

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{HashMap, HashSet}
Expand Down Expand Up @@ -110,6 +110,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Current set of delegation tokens to send to executors.
private val delegationTokens = new AtomicReference[Array[Byte]]()

// Last time of send up to date delegation tokens to executors.
private val driverLastUpdateTokensTimestamp = new AtomicLong(0L)
// DriverEndPoint is not thread-safety
private val executorLastGetTokenTimestampMap = new ConcurrentHashMap[String, Long]

// The token manager used to create security tokens.
private var delegationTokenManager: Option[HadoopDelegationTokenManager] = None

Expand Down Expand Up @@ -265,12 +270,39 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
resourcesInfo, resourceProfileId, registrationTs = System.currentTimeMillis())
// This must be synchronized because variables mutated
// in this block are read when requesting executors
var registeredTime = -1L
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
registeredTime = System.currentTimeMillis()
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
}
// Check whether the token held by the newly registered executor has expired
if (UserGroupInformation.isSecurityEnabled) {
logInfo(s"Check whether the token held by the newly registered " +
s"executor:$executorId has expired, " +
s"the timeline is: \n" +
s"Time of executor first get token: ${
executorLastGetTokenTimestampMap
.getOrDefault(executorId, 0)
}\n" +
s"Time of driver last update token: ${driverLastUpdateTokensTimestamp.get()}\n" +
s"Time of new executor registered : $registeredTime\n")
val shouldForceUpdateToken =
executorLastGetTokenTimestampMap.getOrDefault(executorId, 0) <
driverLastUpdateTokensTimestamp.get() &&
driverLastUpdateTokensTimestamp.get() < registeredTime
if (shouldForceUpdateToken) {
logInfo(s"The delegationToken obtained by executor-$executorId " +
s"has expired and needs to be forcibly updated")
data.executorEndpoint.send(UpdateDelegationTokens(delegationTokens.get()))
} else {
logInfo(s"executor-$executorId's token is already up to date, " +
s"doesn't need to update the token")
}
}

listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
// Note: some tests expect the reply to come after we put the executor in the map
Expand Down Expand Up @@ -302,13 +334,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
adjustTargetNumExecutors = false,
triggeredByExecutor = true))

case RetrieveSparkAppConfig(resourceProfileId) =>
case RetrieveSparkAppConfig(resourceProfileId, executorId) =>
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
Option(delegationTokens.get()),
rp)
executorLastGetTokenTimestampMap.put(executorId, System.currentTimeMillis())
context.reply(reply)

case IsExecutorAlive(executorId) => context.reply(isExecutorActive(executorId))
Expand Down Expand Up @@ -429,6 +462,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
reason
}
}
executorLastGetTokenTimestampMap.remove(executorId)
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, lossReason)
Expand Down Expand Up @@ -557,6 +591,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

override def start(): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
logInfo("Driver Starting create delegationTokenManager")
delegationTokenManager = createTokenManager()
delegationTokenManager.foreach { dtm =>
val ugi = UserGroupInformation.getCurrentUser()
Expand Down Expand Up @@ -945,8 +980,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
SparkHadoopUtil.get.addDelegationTokens(tokens, conf)
delegationTokens.set(tokens)
executorDataMap.values.foreach { ed =>
logInfo(s"Driver starting send updateDelegationTokens msg to ${ed.executorAddress} ")
ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
}
logInfo(s"Driver finished send new token to ${executorDataMap.size} executors " +
s"at ${System.currentTimeMillis()}")
driverLastUpdateTokensTimestamp.set(System.currentTimeMillis())
}

protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
Expand Down

0 comments on commit 8067a1e

Please sign in to comment.