// NOTE(review): removed GitHub web-page boilerplate ("You signed in with another tab...")
// that was accidentally pasted into this source file during extraction.
/**
 * Lives in the driver to receive heartbeats from executors and expire executors
 * that have stopped sending them.
 *
 * NOTE(review): this excerpt is a fragment — the class body continues beyond this
 * span (e.g. `receive`/`onStart` are not visible here), so the class is intentionally
 * left unclosed, matching the original excerpt.
 *
 * The original paste had its newlines stripped, which fused tokens together and let
 * `//` comments swallow the declarations that followed them; this is the restored form.
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  // Convenience constructor using the real system clock.
  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.listenerBus.addToManagementQueue(this)

  override val rpcEnv: RpcEnv = sc.env.rpcEnv

  private[spark] var scheduler: TaskScheduler = null

  // executor ID -> timestamp of when the last heartbeat from this executor was received
  private val executorLastSeen = new mutable.HashMap[String, Long]

  // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
  // "milliseconds". Executor timeout is derived from these two settings.
  private val executorTimeoutMs =
    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
      s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s")

  // "spark.network.timeoutInterval" uses "seconds", while
  // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
  private val timeoutIntervalMs =
    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")

  // Interval (in ms) between checks for timed-out executors. Configurable via
  // spark.network.timeoutInterval; defaults to timeoutIntervalMs.
  private val checkTimeoutIntervalMs =
    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

  // Periodic task that expires executors whose heartbeats are stale.
  private var timeoutCheckingTask: ScheduledFuture[_] = null

  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
  // block the thread for a long time.
  private val eventLoopThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")

  // Dedicated thread for asking the scheduler backend to kill executors.
  private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
/**
 * Spark executor, backed by a thread pool to run tasks.
 *
 * NOTE(review): this excerpt is a fragment — the class body continues beyond this
 * span (task launch / heartbeat methods are not visible here), so the class is
 * intentionally left unclosed, matching the original excerpt.
 *
 * The original paste had its newlines stripped, which fused tokens together and let
 * `//` comments swallow the declarations that followed them; this is the restored form.
 *
 * @param executorId               unique ID of this executor
 * @param executorHostname         hostname (no port) this executor reports itself as
 * @param env                      the SparkEnv for this executor
 * @param userClassPath            user-specified class path entries; configured via
 *                                 spark.executor.extraClassPath (comma-separated)
 * @param isLocal                  true when running in local mode
 * @param uncaughtExceptionHandler handler installed for uncaught exceptions
 *                                 (non-local mode only)
 */
private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false,
    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
  extends Logging {

  logInfo(s"Starting executor ID $executorId on host $executorHostname")

  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
  // Each map holds the master's timestamp for the version of that file or JAR we got.
  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()

  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

  private val conf = env.conf

  // No ip or host:port - just hostname
  Utils.checkHost(executorHostname)
  // must not have port specified.
  assert (0 == Utils.parseHostPort(executorHostname)._2)

  // Make sure the local hostname we report matches the cluster scheduler's name for this host
  Utils.setCustomHostname(executorHostname)

  if (!isLocal) {
    // Setup an uncaught exception handler for non-local mode.
    // Make any thread terminations due to uncaught exceptions kill the entire
    // executor process to avoid surprising stalls.
    Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
  }

  // Start worker thread pool
  private val threadPool = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .setThreadFactory(new ThreadFactory {
        override def newThread(r: Runnable): Thread =
          // Use UninterruptibleThread to run tasks so that we can allow running codes without being
          // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
          // will hang forever if some methods are interrupted.
          new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
      })
      .build()
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

  // Metrics source that reports the state of the worker thread pool.
  private val executorSource = new ExecutorSource(threadPool, executorId)

  // Pool used for threads that supervise task killing / cancellation
  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")

  // For tasks which are in the process of being killed, this map holds the most recently created
  // TaskReaper. All accesses to this map should be synchronized on the map itself (this isn't
  // a ConcurrentHashMap because we use the synchronization for purposes other than simply guarding
  // the integrity of the map's internal state). The purpose of this map is to prevent the creation
  // of a separate TaskReaper for every killTask() of a given task. Instead, this map allows us to
  // track whether an existing TaskReaper fulfills the role of a TaskReaper that we would otherwise
  // create. The map key is a task id.
  private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()

  if (!isLocal) {
    env.blockManager.initialize(conf.getAppId)
    env.metricsSystem.registerSource(executorSource)
    env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
  }

  // Whether to load classes in user jars before those in Spark jars
  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

  // Whether to monitor killed / interrupted tasks
  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)

  // Create our ClassLoader
  // do this after SparkEnv creation so can access the SecurityManager
  private val urlClassLoader = createClassLoader()
  // Class loader used by spark-shell / spark-sql REPL-generated classes, when applicable.
  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

  // Set the classloader for serializer
  env.serializer.setDefaultClassLoader(replClassLoader)
  // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
  // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
  env.serializerManager.setDefaultClassLoader(replClassLoader)

  // Executor plugins, loaded and initialized under the REPL/user class loader.
  private val executorPlugins: Seq[ExecutorPlugin] = {
    val pluginNames = conf.get(EXECUTOR_PLUGINS)
    if (pluginNames.nonEmpty) {
      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")

      // Plugins need to load using a class loader that includes the executor's user classpath
      val pluginList: Seq[ExecutorPlugin] = Utils.withContextClassLoader(replClassLoader) {
        val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
        plugins.foreach { plugin =>
          plugin.init()
          logDebug(s"Successfully loaded plugin " + plugin.getClass().getCanonicalName())
        }
        plugins
      }

      logDebug("Finished initializing plugins")
      pluginList
    } else {
      Nil
    }
  }

  // Max size of direct result. If task result is bigger than this, we use the block manager
  // to send the result back. Minimum of spark.task.maxDirectResultSize (default 1 << 20,
  // i.e. 1048576 bytes) and the max RPC message size.
  private val maxDirectResultSize = Math.min(
    conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
    RpcUtils.maxMessageSizeBytes(conf))

  // Upper bound on the total size of results (default 1GB). A task result larger than
  // maxResultSize is dropped; one between maxDirectResultSize and maxResultSize is written
  // to the local block store; one at most maxDirectResultSize is returned to the driver directly.
  private val maxResultSize = conf.get(MAX_RESULT_SIZE)

  // Maintains the list of running tasks.
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  /**
   * Interval to send heartbeats, in milliseconds
   */
  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

  // Executor for the heartbeat task.
  private val heartbeater: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

  // must be initialized before running startDriverHeartbeat()
  // RPC endpoint reference to the driver-side HeartbeatReceiver.
  private val heartbeatReceiverRef =
    RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

  /**
   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
   * times, it should kill itself. The default value is 60. It means we will retry to send
   * heartbeats about 10 minutes because the heartbeat interval is 10s.
   */
  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)

  /**
   * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
   * successful heartbeat will reset it to 0.
   */
  private var heartbeatFailures = 0