From ad93956969ec66c9322e31e0735680421955c3bb Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 2 Dec 2014 15:07:48 +0800 Subject: [PATCH 1/4] Allow users to set arbitrary configurations via property file --- .../org/apache/spark/MapOutputTracker.scala | 2 +- .../scala/org/apache/spark/SparkConf.scala | 4 +- .../org/apache/spark/deploy/Client.scala | 6 +-- .../CoarseGrainedSchedulerBackend.scala | 5 +- .../org/apache/spark/util/AkkaUtils.scala | 53 ++++++++----------- .../apache/spark/MapOutputTrackerSuite.scala | 8 +-- .../CoarseGrainedSchedulerBackendSuite.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala | 4 +- .../scheduler/TaskResultGetterSuite.scala | 4 +- .../mllib/util/LocalClusterSparkContext.scala | 3 +- 10 files changed, 41 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 7d96962c4acd7..443084f2604e4 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -51,7 +51,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster val serializedSize = mapOutputStatuses.size if (serializedSize > maxAkkaFrameSize) { val msg = s"Map output statuses were $serializedSize bytes which " + - s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)." + s"exceeds spark.akka.remote.netty.tcp.maximum-frame-size ($maxAkkaFrameSize bytes)." /* For SPARK-1244 we'll opt for just logging an error and then throwing an exception. * Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index c14764f773982..28d48725e5470 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -216,6 +216,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * E.g. spark.akka.option.x.y.x = "value" */ getAll.filter { case (k, _) => isAkkaConf(k) } + .map { case (k, v) => (k.substring(k.indexOf("akka")), v) } /** * Returns the Spark application id, valid in the Driver after TaskScheduler registration and @@ -354,7 +355,7 @@ private[spark] object SparkConf { * Return whether the given config is an akka config (e.g. akka.actor.provider). * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout). */ - def isAkkaConf(name: String): Boolean = name.startsWith("akka.") + def isAkkaConf(name: String): Boolean = name.startsWith("spark.akka.") /** * Return whether the given config should be passed to an executor on start-up. @@ -364,7 +365,6 @@ private[spark] object SparkConf { */ def isExecutorStartupConf(name: String): Boolean = { isAkkaConf(name) || - name.startsWith("spark.akka") || name.startsWith("spark.auth") || isSparkPortConf(name) } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f2687ce6b42b4..24f4314a7ef70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -151,10 +151,10 @@ object Client { val driverArgs = new ClientArguments(args) if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { - conf.set("spark.akka.logLifecycleEvents", "true") + conf.set("spark.akka.remote.log-remote-lifecycle-events", "true") } - conf.set("spark.akka.askTimeout", "10") - conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) + conf.set("spark.internal.akka.askTimeout", "10") + conf.set("spark.akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) Logger.getRootLogger.setLevel(driverArgs.logLevel) val (actorSystem, _) = AkkaUtils.createActorSystem( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 88b196ac64368..ebeaee308fd89 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -182,8 +182,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + - "spark.akka.frameSize or using broadcast variables for large values." + "spark.akka.remote.netty.tcp.maximum-frame-size (%d bytes) - reserved (%d bytes). " + + "Consider increasing spark.akka.remote.netty.tcp.maximum-frame-size " + + "or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) taskSet.abort(msg) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 10010bdfa1a51..714736e5a5b0e 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -63,25 +63,13 @@ private[spark] object AkkaUtils extends Logging { conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { - val akkaThreads = conf.getInt("spark.akka.threads", 4) - val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeout = conf.getInt("spark.akka.timeout", 100) - val akkaFrameSize = maxFrameSizeBytes(conf) - val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) - val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" + val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.remote.log-remote-lifecycle-events", false) if (!akkaLogLifecycleEvents) { // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. // See: https://www.assembla.com/spaces/akka/tickets/3787#/ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) } - val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" - - val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000) - val akkaFailureDetector = - conf.getDouble("spark.akka.failure-detector.threshold", 300.0) - val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) - val secretKey = securityManager.getSecretKey() val isAuthOn = securityManager.isAuthenticationEnabled() if (isAuthOn && secretKey == null) { @@ -91,8 +79,7 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug("In createActorSystem, requireCookie is: " + requireCookie) - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( - ConfigFactory.parseString( + val akkaConf = ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -100,23 +87,23 @@ private[spark] object AkkaUtils extends Logging { |akka.jvm-exit-on-fatal-error = off |akka.remote.require-cookie = "$requireCookie" |akka.remote.secure-cookie = "$secureCookie" - |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s - |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s - |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector + |akka.remote.transport-failure-detector.heartbeat-interval = 1000 s + |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 6000 s + |akka.remote.transport-failure-detector.threshold = 300.0 |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on - |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s - |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B - |akka.remote.netty.tcp.execution-pool-size = $akkaThreads - |akka.actor.default-dispatcher.throughput = $akkaBatchSize - |akka.log-config-on-start = $logAkkaConfig - |akka.remote.log-remote-lifecycle-events = $lifecycleEvents - |akka.log-dead-letters = $lifecycleEvents - |akka.log-dead-letters-during-shutdown = $lifecycleEvents - """.stripMargin)) + |akka.remote.netty.tcp.connection-timeout = 100 s + |akka.remote.netty.tcp.maximum-frame-size = 10485760B + |akka.remote.netty.tcp.execution-pool-size = 4 + |akka.actor.default-dispatcher.throughput = 15 + |akka.log-config-on-start = off + |akka.remote.log-remote-lifecycle-events = off + |akka.log-dead-letters = off + |akka.log-dead-letters-during-shutdown = off + """.stripMargin).withFallback(conf.getAkkaConf.toMap[String, String]) val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider @@ -126,17 +113,19 @@ private[spark] object AkkaUtils extends Logging { /** Returns the default Spark timeout to use for Akka ask operations. */ def askTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds") + Duration.create(conf.getLong("spark.internal.akka.askTimeout", 30), "seconds") } /** Returns the default Spark timeout to use for Akka remote actor lookup. */ def lookupTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds") + Duration.create(conf.getLong("spark.internal.akka.lookupTimeout", 30), "seconds") } /** Returns the configured max frame size for Akka messages in bytes. */ def maxFrameSizeBytes(conf: SparkConf): Int = { - conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 + val frameSizeStr = conf.get("spark.akka.remote.netty.tcp.maximum-frame-size", "10485760B") + .replace(" ", "").toLowerCase + frameSizeStr.substring(0, frameSizeStr.indexOf("b")).toInt } /** Space reserved for extra data in an Akka message besides serialized task or task result. */ @@ -144,12 +133,12 @@ private[spark] object AkkaUtils extends Logging { /** Returns the configured number of times to retry connecting */ def numRetries(conf: SparkConf): Int = { - conf.getInt("spark.akka.num.retries", 3) + conf.getInt("spark.internal.akka.num.retries", 3) } /** Returns the configured number of milliseconds to wait on each retry */ def retryWaitMs(conf: SparkConf): Int = { - conf.getInt("spark.akka.retry.wait", 3000) + conf.getInt("spark.internal.akka.retry.wait", 3000) } /** diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index d27880f4bc32f..acef9db469465 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -153,8 +153,8 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch below akka frame size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") - newConf.set("spark.akka.askTimeout", "1") // Fail fast + newConf.set("spark.akka.remote.netty.tcp.maximum-frame-size", "1048576b") + newConf.set("spark.internal.akka.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) val actorSystem = ActorSystem("test") @@ -174,8 +174,8 @@ class MapOutputTrackerSuite extends FunSuite { test("remote fetch exceeds akka frame size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") - newConf.set("spark.akka.askTimeout", "1") // Fail fast + newConf.set("spark.akka.remote.netty.tcp.maximum-frame-size", "1048576b") + newConf.set("spark.internal.akka.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) val actorSystem = ActorSystem("test") diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f77661ccbd1c5..5b7be1be0e180 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -26,7 +26,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext test("serialized task larger than akka frame size") { val conf = new SparkConf - conf.set("spark.akka.frameSize","1") + conf.set("spark.akka.remote.netty.tcp.maximum-frame-size","1048576b") conf.set("spark.default.parallelism","1") sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index abe0dc35b07e2..54b18e935ffe7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -39,7 +39,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers } override def afterAll() { - System.clearProperty("spark.akka.frameSize") + System.clearProperty("spark.akka.remote.netty.tcp.maximum-frame-size") } test("basic creation and shutdown of LiveListenerBus") { @@ -273,7 +273,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers sc.addSparkListener(listener) // Make a task whose result is larger than the akka frame size - System.setProperty("spark.akka.frameSize", "1") + System.setProperty("spark.akka.remote.netty.tcp.maximum-frame-size", "1048576b") val akkaFrameSize = sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 5768a3a733f00..c01a184f75bc0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -61,11 +61,11 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA override def beforeAll { // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small // as we can make it) so the tests don't take too long. - System.setProperty("spark.akka.frameSize", "1") + System.setProperty("spark.akka.remote.netty.tcp.maximum-frame-size", "1048576b") } override def afterAll { - System.clearProperty("spark.akka.frameSize") + System.clearProperty("spark.akka.remote.netty.tcp.maximum-frame-size") } test("handling results smaller than Akka frame size") { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala index 5e9101cdd3804..813aab660edcd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala @@ -28,7 +28,8 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite => val conf = new SparkConf() .setMaster("local-cluster[2, 1, 512]") .setAppName("test-cluster") - .set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data + // set to 1MB to detect direct serialization of data + .set("spark.akka.remote.netty.tcp.maximum-frame-size", "1048576b") sc = new SparkContext(conf) super.beforeAll() } From 775e7b02ffd9d0eb2c9be74b73baddbc104c8d6a Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 2 Dec 2014 15:14:54 +0800 Subject: [PATCH 2/4] fix type mismatch --- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 714736e5a5b0e..336a14fa57d13 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -103,7 +103,7 @@ private[spark] object AkkaUtils extends Logging { |akka.remote.log-remote-lifecycle-events = off |akka.log-dead-letters = off |akka.log-dead-letters-during-shutdown = off - """.stripMargin).withFallback(conf.getAkkaConf.toMap[String, String]) + """.stripMargin).withFallback(ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])) val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider From b5324b7f10a6c2157d6f54c52ab88990b0465220 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 2 Dec 2014 16:57:04 +0800 Subject: [PATCH 3/4] return the value --- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 065cca51020ca..53b3f69c66e21 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -130,6 +130,7 @@ private[spark] object AkkaUtils extends Logging { throw new IllegalArgumentException("spark.akka.remote.netty.tcp.maximum-frame-size " + "should not be greater than " + Int.MaxValue + "B") } + ret } /** Space reserved for extra data in an Akka message besides serialized task or task result. */ From 13f295780d5217464c8589963cb2b2cc02b9dcc5 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 2 Dec 2014 18:50:00 +0800 Subject: [PATCH 4/4] style up --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++-- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ebeaee308fd89..5b136f2589dba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -182,8 +182,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.akka.remote.netty.tcp.maximum-frame-size (%d bytes) - reserved (%d bytes). " + - "Consider increasing spark.akka.remote.netty.tcp.maximum-frame-size " + + "spark.akka.remote.netty.tcp.maximum-frame-size (%d bytes) - reserved (%d bytes)." + + " Consider increasing spark.akka.remote.netty.tcp.maximum-frame-size " + "or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 53b3f69c66e21..e5c90b68dcf1b 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -63,8 +63,8 @@ private[spark] object AkkaUtils extends Logging { conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { - val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.remote.log-remote-lifecycle-events", false) - if (!akkaLogLifecycleEvents) { + val akkaLogLifecycleEvents = conf.get("spark.akka.remote.log-remote-lifecycle-events", "off") + if (akkaLogLifecycleEvents == "on") { // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. // See: https://www.assembla.com/spaces/akka/tickets/3787#/ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))