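Summary of the change shown in the diff below: Spark's local-mode scheduler backend (LocalBackend, with its LocalEndpoint) is updated to post a SparkListenerExecutorAdded event to the listener bus when it starts. In local mode the driver hosts the single executor, so this change lets listeners learn that executor's id (SparkContext.DRIVER_IDENTIFIER), hostname ("localhost"), and core count, just as they would for executors registered by a cluster backend. To support this, localExecutorId and localExecutorHostname become accessible outside LocalEndpoint, while localEndpoint in LocalBackend is tightened to private.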
@@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 private case class ReviveOffers()
 
@@ -50,8 +51,8 @@ private[spark] class LocalEndpoint(
 
   private var freeCores = totalCores
 
-  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
-  private val localExecutorHostname = "localhost"
+  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+  val localExecutorHostname = "localhost"
 
   private val executor = new Executor(
     localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
@@ -99,8 +100,9 @@ private[spark] class LocalBackend(
   extends SchedulerBackend with ExecutorBackend with Logging {
 
   private val appId = "local-" + System.currentTimeMillis
-  var localEndpoint: RpcEndpointRef = null
+  private var localEndpoint: RpcEndpointRef = null
   private val userClassPath = getUserClasspath(conf)
+  private val listenerBus = scheduler.sc.listenerBus
 
   /**
    * Returns a list of URLs representing the user classpath.
@@ -113,9 +115,13 @@
   }
 
   override def start() {
-    localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
-      "LocalBackendEndpoint",
-      new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+    val rpcEnv = SparkEnv.get.rpcEnv
+    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+    localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+    listenerBus.post(SparkListenerExecutorAdded(
+      System.currentTimeMillis,
+      executorEndpoint.localExecutorId,
+      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
   }
 
   override def stop() {
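For context, here is a minimal sketch of how an application could observe the event this patch posts. It is not part of the change itself; the class and object names (ExecutorAddedLogger, ExecutorAddedDemo) are illustrative, and it assumes the SparkListener API of Spark 1.x. The listener is wired in through the spark.extraListeners setting so that it is registered before the scheduler backend starts and therefore receives the start-time event.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Illustrative listener: logs every executor registration, including the
// driver-hosted executor that LocalBackend now announces in local mode.
class ExecutorAddedLogger extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    println(s"executor added: id=${event.executorId}, " +
      s"host=${event.executorInfo.executorHost}, " +
      s"cores=${event.executorInfo.totalCores}")
  }
}

object ExecutorAddedDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("executor-added-demo")
      // Register via spark.extraListeners so the listener exists before
      // LocalBackend.start() posts the event; a listener added after the
      // SparkContext is constructed would miss it.
      .set("spark.extraListeners", classOf[ExecutorAddedLogger].getName)
    val sc = new SparkContext(conf)
    // With this patch applied, the listener should print one entry whose id
    // is SparkContext.DRIVER_IDENTIFIER, with host "localhost" and 2 cores.
    sc.stop()
  }
}

Before this change, SparkListenerExecutorAdded was only posted by cluster scheduler backends, so listeners running in local mode never saw any executor registration.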