[SPARK-6602][Core] Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint #5392

Closed

zsxwing wants to merge 24 commits into master from rpc-rewrite-part3

Changes from all commits (24 commits):
b776817  Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint (zsxwing, Mar 17, 2015)
fbf3194  Add Utils.newDaemonSingleThreadExecutor and newDaemonSingleThreadScheduledExecutor (zsxwing, Apr 8, 2015)
6b2a104  Log error and use SparkExitCode.UNCAUGHT_EXCEPTION exit code (zsxwing, Apr 8, 2015)
e8ad0a5  Fix the code style (zsxwing, Apr 8, 2015)
7fdee0e  Fix the return type to ExecutorService and ScheduledExecutorService (zsxwing, Apr 8, 2015)
6637e3c  Merge remote-tracking branch 'origin/master' into rpc-rewrite-part3 (zsxwing, Apr 14, 2015)
fadbb9e  Fix the code style (zsxwing, Apr 14, 2015)
5c27f97  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Apr 20, 2015)
837927e  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Apr 22, 2015)
dbfc916  Improve the docs and comments (zsxwing, Apr 22, 2015)
060ff31  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Apr 23, 2015)
25a84d8  Use ThreadUtils (zsxwing, Apr 23, 2015)
bd541e7  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Apr 29, 2015)
aa34b9b  Fix the code style (zsxwing, Apr 29, 2015)
a7b86e6  Use JFuture for java.util.concurrent.Future (zsxwing, Apr 30, 2015)
e56cb16  Always send failure back to the sender (zsxwing, Apr 30, 2015)
72304f0  Update the error strategy for AkkaRpcEnv (zsxwing, Apr 30, 2015)
fa47110  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, May 3, 2015)
5a82374  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, May 5, 2015)
2d24fb5  Fix the code style (zsxwing, May 5, 2015)
e734c71  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Jun 8, 2015)
9137b82  Fix the code style (zsxwing, Jun 8, 2015)
f12d943  Address comments (zsxwing, Jun 30, 2015)
2de7bed  Merge branch 'master' into rpc-rewrite-part3 (zsxwing, Jun 30, 2015)
core/src/main/scala/org/apache/spark/deploy/Client.scala (156 changes: 92 additions & 64 deletions)
@@ -18,38 +18,48 @@
package org.apache.spark.deploy

import scala.collection.mutable.HashSet
import scala.concurrent._
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.log4j.{Level, Logger}

import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
import org.apache.spark.util.{ThreadUtils, SparkExitCode, Utils}

/**
* Proxy that relays messages to the driver.
*
* We currently don't support retry if submission fails. In HA mode, client will submit request to
* all masters and see which one could handle it.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

private val masterActors = driverArgs.masters.map { m =>
context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
}
private val lostMasters = new HashSet[Address]
private var activeMasterActor: ActorSelection = null

val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

private class ClientEndpoint(
override val rpcEnv: RpcEnv,
driverArgs: ClientArguments,
masterEndpoints: Seq[RpcEndpointRef],
conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging {

// A scheduled executor used to send messages at the specified time.
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
// Used to provide the implicit parameter of `Future` methods.
private val forwardMessageExecutionContext =
ExecutionContext.fromExecutor(forwardMessageThread,
t => t match {
case ie: InterruptedException => // Exit normally
case e: Throwable =>
logError(e.getMessage, e)
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
})

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null

override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -82,29 +92,37 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
driverArgs.cores,
driverArgs.supervise,
command)

// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestSubmitDriver(driverDescription)
}
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

case "kill" =>
val driverId = driverArgs.driverId
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestKillDriver(driverId)
}
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}

/**
* Send the message to master and forward the reply to self asynchronously.
*/
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
for (masterEndpoint <- masterEndpoints) {
masterEndpoint.ask[T](message).onComplete {
case Success(v) => self.send(v)
case Failure(e) =>
logWarning(s"Error sending messages to master $masterEndpoint", e)
}(forwardMessageExecutionContext)
}
}

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String) {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
val statusResponse =
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
@@ -127,50 +145,62 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
}
}

override def receiveWithLogging: PartialFunction[Any, Unit] = {
override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(success, driverId, message) =>
case SubmitDriverResponse(master, success, driverId, message) =>
println(message)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(driverId, success, message) =>
case KillDriverResponse(master, driverId, success, message) =>
println(message)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
}

case DisassociatedEvent(_, remoteAddress, _) =>
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}
}

case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}
}

override def onError(cause: Throwable): Unit = {
println(s"Error processing messages, exiting.")
cause.printStackTrace()
System.exit(-1)
}

override def onStop(): Unit = {
forwardMessageThread.shutdownNow()
}
}

@@ -194,15 +224,13 @@ object Client {
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)

val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
for (m <- driverArgs.masters) {
Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
}
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

actorSystem.awaitTermination()
rpcEnv.awaitTermination()
}
}
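For readers new to the abstraction this patch moves to, here is a minimal sketch of the endpoint lifecycle that ClientEndpoint now follows. It is illustrative only: org.apache.spark.rpc is a private[spark] API, and the Ping/Pong messages and PingEndpoint/PingApp names are made up, not part of this patch.

```scala
// Conceptual sketch of the RpcEndpoint lifecycle used by ClientEndpoint above.
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}

case object Ping
case object Pong

class PingEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // Runs before the endpoint starts processing messages (plays the role of Actor.preStart).
  override def onStart(): Unit = println("endpoint started")

  // One-way messages delivered via RpcEndpointRef.send (the fire-and-forget path).
  override def receive: PartialFunction[Any, Unit] = {
    case Pong => println("got a pong")
  }

  // Request/reply messages delivered via RpcEndpointRef.ask; the reply goes through the context.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply(Pong)
  }

  // Connection-loss callback, replacing the Akka DisassociatedEvent handling removed above.
  override def onDisconnected(remoteAddress: RpcAddress): Unit =
    println(s"lost connection to $remoteAddress")

  override def onStop(): Unit = println("endpoint stopped")
}

object PingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val rpcEnv = RpcEnv.create("ping", "localhost", 0, conf, new SecurityManager(conf))
    val ref: RpcEndpointRef = rpcEnv.setupEndpoint("ping-endpoint", new PingEndpoint(rpcEnv))
    ref.send(Pong)            // fire-and-forget, handled by receive
    rpcEnv.awaitTermination() // block until the RpcEnv shuts down, as Client.main now does
  }
}
```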
core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala (22 changes: 13 additions & 9 deletions)
@@ -24,11 +24,12 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler actor nodes. */
/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {

// Worker to Master
@@ -37,6 +38,7 @@ private[deploy] object DeployMessages {
id: String,
host: String,
port: Int,
worker: RpcEndpointRef,
cores: Int,
memory: Int,
webUiPort: Int,
@@ -63,11 +65,11 @@ private[deploy] object DeployMessages {
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String])

case class Heartbeat(workerId: String) extends DeployMessage
case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

// Master to Worker

case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

case class RegisterWorkerFailed(message: String) extends DeployMessage

@@ -92,13 +94,13 @@ private[deploy] object DeployMessages {

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
case object WorkDirCleanup // Sent to Worker endpoint periodically for cleaning up app folders

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
extends DeployMessage

case class UnregisterApplication(appId: String)
@@ -107,7 +109,7 @@ private[deploy] object DeployMessages {

// Master to AppClient

case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -123,12 +125,14 @@ private[deploy] object DeployMessages {

case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
case class SubmitDriverResponse(
master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
extends DeployMessage

case class RequestKillDriver(driverId: String) extends DeployMessage

case class KillDriverResponse(driverId: String, success: Boolean, message: String)
case class KillDriverResponse(
master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
extends DeployMessage

case class RequestDriverStatus(driverId: String) extends DeployMessage
@@ -142,7 +146,7 @@ private[deploy] object DeployMessages {

// Master to Worker & AppClient

case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

// MasterWebUI To Master

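The pattern behind most of these signature changes: the RPC layer does not expose an implicit sender the way Akka's sender() did, so messages that need a reply path now carry the caller's RpcEndpointRef explicitly (the new worker, driver, and master fields above). A rough sketch of how a receiver uses such a ref; ToyMaster, ToyHeartbeat, and ReRegister are made-up stand-ins, not the real Master code or DeployMessages:

```scala
// Illustrative only: shows how a ref carried inside a message replaces Akka's implicit sender().
import scala.collection.mutable.HashSet
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}

case class ToyHeartbeat(workerId: String, worker: RpcEndpointRef) // mirrors the new Heartbeat shape
case class ReRegister(masterUrl: String)                          // stand-in reply message

class ToyMaster(override val rpcEnv: RpcEnv, masterUrl: String) extends ThreadSafeRpcEndpoint {
  private val knownWorkers = new HashSet[String]

  override def receive: PartialFunction[Any, Unit] = {
    case ToyHeartbeat(workerId, workerRef) =>
      if (!knownWorkers.contains(workerId)) {
        // With Akka this would have been `sender ! ...`; now the worker's own ref
        // travels inside the message, so the master can reply to it directly.
        workerRef.send(ReRegister(masterUrl))
      }
      // otherwise: update the worker's last-heartbeat timestamp here
  }
}
```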
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala (26 changes: 12 additions & 14 deletions)
@@ -19,8 +19,7 @@ package org.apache.spark.deploy

import scala.collection.mutable.ArrayBuffer

import akka.actor.ActorSystem

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
@@ -41,8 +40,8 @@ class LocalSparkCluster(
extends Logging {

private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
private val masterRpcEnvs = ArrayBuffer[RpcEnv]()
private val workerRpcEnvs = ArrayBuffer[RpcEnv]()
// exposed for testing
var masterWebUIPort = -1

@@ -55,18 +54,17 @@
.set("spark.shuffle.service.enabled", "false")

/* Start the Master */
val (masterSystem, masterPort, webUiPort, _) =
Master.startSystemAndActor(localHostname, 0, 0, _conf)
val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
masterWebUIPort = webUiPort
masterActorSystems += masterSystem
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
masterRpcEnvs += rpcEnv
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + rpcEnv.address.port
val masters = Array(masterUrl)

/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum), _conf)
workerActorSystems += workerSystem
workerRpcEnvs += workerEnv
}

masters
@@ -77,11 +75,11 @@
// Stop the workers before the master so they don't get upset that it disconnected
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
workerRpcEnvs.foreach(_.shutdown())
// workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
masterRpcEnvs.foreach(_.shutdown())
// masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
masterRpcEnvs.clear()
workerRpcEnvs.clear()
}
}
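The new start/stop bookkeeping above reduces to one small pattern per daemon: create an RpcEnv, register the endpoint on it, and keep the RpcEnv around so it can be shut down later. A hedged sketch of that shape; ToyCluster and startDaemon are illustrative assumptions, and the real Master.startRpcEnvAndEndpoint / Worker.startRpcEnvAndEndpoint take more parameters and return more than just the RpcEnv:

```scala
// Sketch of the start/stop pattern used by LocalSparkCluster above.
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

object ToyCluster {
  private val rpcEnvs = ArrayBuffer[RpcEnv]()

  /** Start one daemon: create its RpcEnv, register its endpoint, remember the env. */
  def startDaemon(name: String, host: String, conf: SparkConf)
      (makeEndpoint: RpcEnv => RpcEndpoint): RpcEnv = {
    val rpcEnv = RpcEnv.create(name, host, 0 /* ephemeral port */, conf, new SecurityManager(conf))
    rpcEnv.setupEndpoint(name, makeEndpoint(rpcEnv))
    rpcEnvs += rpcEnv
    rpcEnv
  }

  /** Stop everything that was started, mirroring LocalSparkCluster.stop(). */
  def stopAll(): Unit = {
    rpcEnvs.foreach(_.shutdown())
    rpcEnvs.clear()
  }
}
```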