[SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.* #5595

Closed · wants to merge 2 commits
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s"))
translation = s => s"${s.toLong * 10}s")),
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
AlternateConfig("spark.akka.retry.wait", "1.4")),
"spark.rpc.askTimeout" -> Seq(
AlternateConfig("spark.akka.askTimeout", "1.4")),
"spark.rpc.lookupTimeout" -> Seq(
AlternateConfig("spark.akka.lookupTimeout", "1.4"))
)

/**
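The mapping above is what keeps jobs that still set the old Akka keys working: a read of a spark.rpc.* key falls back through SparkConf's alternate-config table and logs a deprecation warning. A minimal REPL-style sketch of that behaviour, assuming only the entries registered above (this snippet is illustrative, not code from the patch):

import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
conf.set("spark.akka.askTimeout", "10")
// The new key is never set directly; it resolves through the AlternateConfig entry.
assert(conf.get("spark.rpc.askTimeout") == "10")
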
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}

/**
* Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)
val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
conf.set("spark.akka.askTimeout", "10")
conf.set("spark.rpc.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)

@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}

/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
val timeout = AkkaUtils.askTimeout(conf)
val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
} catch {
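The standalone deploy call sites changed above all follow the same blocking ask pattern; the only difference after this patch is that the timeout comes from RpcUtils. A self-contained sketch of that pattern (the helper name and wiring are ours, not the PR's):

import scala.concurrent.Await
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

def askAndWait(actor: ActorRef, message: Any, conf: SparkConf): Any = {
  val timeout = RpcUtils.askTimeout(conf)            // spark.rpc.askTimeout, default 30s
  val future = actor.ask(message)(Timeout(timeout))  // Akka ask with an explicit Timeout
  Await.result(future, timeout)                      // block until the reply or the timeout
}
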
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}

private[master] class Master(
host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.AkkaUtils
import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {

val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
val timeout = RpcUtils.askTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

initialize()
@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._

@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = AkkaUtils.askTimeout(conf)
val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = AkkaUtils.askTimeout(conf)
val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = AkkaUtils.askTimeout(conf)
val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.worker.ui.WorkerWebUI._
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.AkkaUtils
import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
private[ui] val timeout = RpcUtils.askTimeout(worker.conf)

initialize()

10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.reflect.ClassTag

import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{RpcUtils, Utils}

/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
@@ -38,7 +38,7 @@ import org.apache.spark.util.{RpcUtils, Utils}
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
extends Serializable with Logging {

private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
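These three values feed RpcEndpointRef's blocking ask path, which is not shown in this diff. Purely as a rough sketch, under our own assumptions about its shape, this is how maxRetries, retryWaitMs, and the default ask timeout would typically combine:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import org.apache.spark.SparkException

// Illustrative only: retry the ask up to maxRetries times, sleeping retryWaitMs
// between attempts, and fail with a SparkException once the retries are exhausted.
def askWithRetrySketch[T](send: () => Future[T],
                          maxRetries: Int,
                          retryWaitMs: Long,
                          timeout: FiniteDuration): T = {
  var attempts = 0
  var lastException: Exception = null
  while (attempts < maxRetries) {
    attempts += 1
    try {
      return Await.result(send(), timeout)
    } catch {
      case e: Exception =>
        lastException = e
        Thread.sleep(retryWaitMs)
    }
  }
  throw new SparkException(s"Error sending message after $attempts attempts", lastException)
}
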
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{RpcUtils, Utils}

import scala.util.control.NonFatal

@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))

private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)

/**
* Request executors from the ApplicationMaster by specifying the total number desired.
@@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
import org.apache.spark.util.RpcUtils

private[spark]
class BlockManagerMaster(
@@ -32,7 +32,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {

val timeout = AkkaUtils.askTimeout(conf)
val timeout = RpcUtils.askTimeout(conf)

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
26 changes: 3 additions & 23 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util

import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.duration.FiniteDuration

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -125,16 +125,6 @@ private[spark] object AkkaUtils extends Logging {
(actorSystem, boundPort)
}

/** Returns the default Spark timeout to use for Akka ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
}

/** Returns the default Spark timeout to use for Akka remote actor lookup. */
def lookupTimeout(conf: SparkConf): FiniteDuration = {
Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
}

private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024

/** Returns the configured max frame size for Akka messages in bytes. */
@@ -150,16 +140,6 @@ private[spark] object AkkaUtils extends Logging {
/** Space reserved for extra data in an Akka message besides serialized task or task result. */
val reservedSizeBytes = 200 * 1024

/** Returns the configured number of times to retry connecting */
def numRetries(conf: SparkConf): Int = {
conf.getInt("spark.akka.num.retries", 3)
}

/** Returns the configured number of milliseconds to wait on each retry */
def retryWaitMs(conf: SparkConf): Int = {
conf.getInt("spark.akka.retry.wait", 3000)
}

/**
* Send a message to the given actor and get its result within a default timeout, or
* throw a SparkException if this fails.
@@ -216,7 +196,7 @@ private[spark] object AkkaUtils extends Logging {
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
val timeout = AkkaUtils.lookupTimeout(conf)
val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
@@ -230,7 +210,7 @@ private[spark] object AkkaUtils extends Logging {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
val timeout = AkkaUtils.lookupTimeout(conf)
val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,9 @@

package org.apache.spark.util

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

@@ -32,4 +35,24 @@ object RpcUtils {
Utils.checkHost(driverHost, "Expected hostname")
rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
}

/** Returns the configured number of times to retry connecting */
def numRetries(conf: SparkConf): Int = {
conf.getInt("spark.rpc.numRetries", 3)
}

/** Returns the configured number of milliseconds to wait on each retry */
def retryWaitMs(conf: SparkConf): Long = {
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
}

/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
}

/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): FiniteDuration = {
conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
}
}
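For reference, the new helpers and their defaults, as a REPL-style sketch against an otherwise empty configuration (the values in comments are the documented defaults above, not output captured from this patch):

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

val conf = new SparkConf(loadDefaults = false)
RpcUtils.numRetries(conf)     // 3
RpcUtils.retryWaitMs(conf)    // 3000 milliseconds; the key also accepts time strings such as "3s"
RpcUtils.askTimeout(conf)     // 30 seconds, as a FiniteDuration
RpcUtils.lookupTimeout(conf)  // 30 seconds, as a FiniteDuration

// The deprecated spark.akka.* names still resolve through SparkConf's alternate-config table:
conf.set("spark.akka.num.retries", "5")
RpcUtils.numRetries(conf)     // 5, resolved via the spark.rpc.numRetries fallback
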
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch below akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast
newConf.set("spark.rpc.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ test("remote fetch exceeds akka frame size") {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast
newConf.set("spark.rpc.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("test")
24 changes: 23 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark

import java.util.concurrent.{TimeUnit, Executors}

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Try, Random}

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}

test("akka deprecated configs") {
val conf = new SparkConf()

assert(!conf.contains("spark.rpc.numRetries"))
assert(!conf.contains("spark.rpc.retry.wait"))
assert(!conf.contains("spark.rpc.askTimeout"))
assert(!conf.contains("spark.rpc.lookupTimeout"))

conf.set("spark.akka.num.retries", "1")
assert(RpcUtils.numRetries(conf) === 1)

conf.set("spark.akka.retry.wait", "2")
assert(RpcUtils.retryWaitMs(conf) === 2L)

conf.set("spark.akka.askTimeout", "3")
assert(RpcUtils.askTimeout(conf) === (3 seconds))

conf.set("spark.akka.lookupTimeout", "4")
assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
}
}

class Class1 {}
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
})

val conf = new SparkConf()
conf.set("spark.akka.retry.wait", "0")
conf.set("spark.akka.num.retries", "1")
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.num.retries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")