[SPARK-6602][Core] Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint

This PR updates the rest of the Actors in core to RpcEndpoint.

Because there is no `ActorSelection` in RpcEnv, I changed the logic of `registerWithMaster` in Worker and AppClient to avoid blocking the message loop. These changes need to be reviewed carefully.
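
A minimal, self-contained Scala sketch of the non-blocking registration pattern described above: registration attempts are fired asynchronously from a dedicated thread, and retries run on a timer instead of sleeping or awaiting inside the message loop. `MasterRef` and `RegistrationClient` are hypothetical stand-ins for illustration, not the actual Spark types touched by this commit.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical stand-in for an RPC endpoint reference; only the ask() shape matters here.
trait MasterRef {
  def ask(message: Any): Future[Any]
}

class RegistrationClient(masters: Seq[MasterRef]) {

  private val registered = new AtomicBoolean(false)

  // A single scheduler thread drives all retries, so the endpoint's own
  // message loop is never blocked by sleeps or Await calls.
  private val retryThread = Executors.newSingleThreadScheduledExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(retryThread)

  // Ask every master asynchronously; in HA mode only the active master is expected to answer.
  private def tryRegisterAllMasters(): Unit = masters.foreach { master =>
    master.ask("RegisterClient").onComplete {
      case Success(_) => registered.set(true)
      case Failure(e) => println(s"Registration attempt failed: ${e.getMessage}")
    }
  }

  // Fire the first round of attempts, then retry on a timer until one succeeds.
  def registerWithMaster(): Unit = {
    tryRegisterAllMasters()
    retryThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = if (!registered.get()) tryRegisterAllMasters()
    }, 10, 10, TimeUnit.SECONDS)
  }
}
```

A fuller version would also cancel the scheduled task once registration succeeds; that bookkeeping is omitted here to keep the sketch short.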

Author: zsxwing <zsxwing@gmail.com>

Closes #5392 from zsxwing/rpc-rewrite-part3 and squashes the following commits:

2de7bed [zsxwing] Merge branch 'master' into rpc-rewrite-part3
f12d943 [zsxwing] Address comments
9137b82 [zsxwing] Fix the code style
e734c71 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
2d24fb5 [zsxwing] Fix the code style
5a82374 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
fa47110 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
72304f0 [zsxwing] Update the error strategy for AkkaRpcEnv
e56cb16 [zsxwing] Always send failure back to the sender
a7b86e6 [zsxwing] Use JFuture for java.util.concurrent.Future
aa34b9b [zsxwing] Fix the code style
bd541e7 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
25a84d8 [zsxwing] Use ThreadUtils
060ff31 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
dbfc916 [zsxwing] Improve the docs and comments
837927e [zsxwing] Merge branch 'master' into rpc-rewrite-part3
5c27f97 [zsxwing] Merge branch 'master' into rpc-rewrite-part3
fadbb9e [zsxwing] Fix the code style
6637e3c [zsxwing] Merge remote-tracking branch 'origin/master' into rpc-rewrite-part3
7fdee0e [zsxwing] Fix the return type to ExecutorService and ScheduledExecutorService
e8ad0a5 [zsxwing] Fix the code style
6b2a104 [zsxwing] Log error and use SparkExitCode.UNCAUGHT_EXCEPTION exit code
fbf3194 [zsxwing] Add Utils.newDaemonSingleThreadExecutor and newDaemonSingleThreadScheduledExecutor
b776817 [zsxwing] Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint
zsxwing authored and rxin committed Jul 1, 2015
1 parent ccdb052 commit 3bee0f1
Showing 27 changed files with 806 additions and 633 deletions.
156 changes: 92 additions & 64 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -18,38 +18,48 @@
package org.apache.spark.deploy

import scala.collection.mutable.HashSet
import scala.concurrent._
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.log4j.{Level, Logger}

import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
import org.apache.spark.util.{ThreadUtils, SparkExitCode, Utils}

/**
* Proxy that relays messages to the driver.
*
* We currently don't support retry if submission fails. In HA mode, client will submit request to
* all masters and see which one could handle it.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

private val masterActors = driverArgs.masters.map { m =>
context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
}
private val lostMasters = new HashSet[Address]
private var activeMasterActor: ActorSelection = null

val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

private class ClientEndpoint(
override val rpcEnv: RpcEnv,
driverArgs: ClientArguments,
masterEndpoints: Seq[RpcEndpointRef],
conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging {

// A scheduled executor used to send messages at the specified time.
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
// Used to provide the implicit parameter of `Future` methods.
private val forwardMessageExecutionContext =
ExecutionContext.fromExecutor(forwardMessageThread,
t => t match {
case ie: InterruptedException => // Exit normally
case e: Throwable =>
logError(e.getMessage, e)
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
})

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null

override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -82,29 +92,37 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
driverArgs.cores,
driverArgs.supervise,
command)

// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestSubmitDriver(driverDescription)
}
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

case "kill" =>
val driverId = driverArgs.driverId
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
masterActor ! RequestKillDriver(driverId)
}
ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}

/**
* Send the message to master and forward the reply to self asynchronously.
*/
private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
for (masterEndpoint <- masterEndpoints) {
masterEndpoint.ask[T](message).onComplete {
case Success(v) => self.send(v)
case Failure(e) =>
logWarning(s"Error sending messages to master $masterEndpoint", e)
}(forwardMessageExecutionContext)
}
}

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String) {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
val statusResponse =
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
@@ -127,50 +145,62 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
}
}

override def receiveWithLogging: PartialFunction[Any, Unit] = {
override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(success, driverId, message) =>
case SubmitDriverResponse(master, success, driverId, message) =>
println(message)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(driverId, success, message) =>
case KillDriverResponse(master, driverId, success, message) =>
println(message)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}
}

case DisassociatedEvent(_, remoteAddress, _) =>
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}
}

case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterActors.size) {
println("No master is available, exiting.")
System.exit(-1)
}
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
System.exit(-1)
}
}
}

override def onError(cause: Throwable): Unit = {
println(s"Error processing messages, exiting.")
cause.printStackTrace()
System.exit(-1)
}

override def onStop(): Unit = {
forwardMessageThread.shutdownNow()
}
}

Expand All @@ -194,15 +224,13 @@ object Client {
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)

val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
for (m <- driverArgs.masters) {
Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
}
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

actorSystem.awaitTermination()
rpcEnv.awaitTermination()
}
}
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -24,11 +24,12 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler actor nodes. */
/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {

// Worker to Master
@@ -37,6 +38,7 @@ private[deploy] object DeployMessages {
id: String,
host: String,
port: Int,
worker: RpcEndpointRef,
cores: Int,
memory: Int,
webUiPort: Int,
@@ -63,11 +65,11 @@ private[deploy] object DeployMessages {
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String])

case class Heartbeat(workerId: String) extends DeployMessage
case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

// Master to Worker

case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

case class RegisterWorkerFailed(message: String) extends DeployMessage

@@ -92,13 +94,13 @@ private[deploy] object DeployMessages {

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
case object WorkDirCleanup // Sent to Worker endpoint periodically for cleaning up app folders

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
extends DeployMessage

case class UnregisterApplication(appId: String)
@@ -107,7 +109,7 @@ private[deploy] object DeployMessages {

// Master to AppClient

case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -123,12 +125,14 @@ private[deploy] object DeployMessages {

case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
case class SubmitDriverResponse(
master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
extends DeployMessage

case class RequestKillDriver(driverId: String) extends DeployMessage

case class KillDriverResponse(driverId: String, success: Boolean, message: String)
case class KillDriverResponse(
master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
extends DeployMessage

case class RequestDriverStatus(driverId: String) extends DeployMessage
@@ -142,7 +146,7 @@ private[deploy] object DeployMessages {

// Master to Worker & AppClient

case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

// MasterWebUI To Master

26 changes: 12 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -19,8 +19,7 @@ package org.apache.spark.deploy

import scala.collection.mutable.ArrayBuffer

import akka.actor.ActorSystem

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
@@ -41,8 +40,8 @@
extends Logging {

private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
private val masterRpcEnvs = ArrayBuffer[RpcEnv]()
private val workerRpcEnvs = ArrayBuffer[RpcEnv]()
// exposed for testing
var masterWebUIPort = -1

@@ -55,18 +54,17 @@
.set("spark.shuffle.service.enabled", "false")

/* Start the Master */
val (masterSystem, masterPort, webUiPort, _) =
Master.startSystemAndActor(localHostname, 0, 0, _conf)
val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
masterWebUIPort = webUiPort
masterActorSystems += masterSystem
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
masterRpcEnvs += rpcEnv
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + rpcEnv.address.port
val masters = Array(masterUrl)

/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum), _conf)
workerActorSystems += workerSystem
workerRpcEnvs += workerEnv
}

masters
@@ -77,11 +75,11 @@
// Stop the workers before the master so they don't get upset that it disconnected
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
workerRpcEnvs.foreach(_.shutdown())
// workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
masterRpcEnvs.foreach(_.shutdown())
// masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
masterRpcEnvs.clear()
workerRpcEnvs.clear()
}
}
