[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
- Remove Akka dependency from core. Note: the streaming-akka project still uses Akka.
- Remove HttpFileServer
- Remove Akka configs from SparkConf and SSLOptions
- Rename `spark.akka.frameSize` to `spark.rpc.message.maxSize`. I think it's still worth keeping this config because the choice between `DirectTaskResult` and `IndirectTaskResult` depends on it (see the sketch after this list).
- Update comments and docs
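A minimal, purely illustrative sketch of setting the renamed config on a `SparkConf` (the value is in MB, as with the old key; the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

// Hypothetical application code, not part of this commit.
val conf = new SparkConf()
  .setAppName("frame-size-example")        // placeholder name
  .set("spark.rpc.message.maxSize", "128") // MB; replaces the deprecated spark.akka.frameSize
```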

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10854 from zsxwing/remove-akka.
zsxwing authored and rxin committed Jan 23, 2016
1 parent d8fefab commit bc1babd
Showing 43 changed files with 123 additions and 831 deletions.
17 changes: 4 additions & 13 deletions core/pom.xml
@@ -185,19 +185,6 @@
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -224,6 +211,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
* workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
* issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
* parameter by default disables blocking on shuffle cleanups. Note that this does not affect
* the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
* until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
* until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
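As a hedged illustration of the cleaner settings discussed in the comments above (not part of this diff; defaults are as described there):

```scala
import org.apache.spark.SparkConf

// Illustrative only: block on RDD/broadcast cleanup but not on shuffle cleanup,
// mirroring the temporary workaround described in the ContextCleaner comments.
val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking.blocking", "true")
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false")
```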
91 changes: 0 additions & 91 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala

This file was deleted.

7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,17 +40,18 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
if (serializedSize > maxAkkaFrameSize) {
if (serializedSize > maxRpcMessageSize) {

val msg = s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."

/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
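For context, a rough sketch of what `RpcUtils.maxMessageSizeBytes` is assumed to compute in the hunk above (the config is read in MB and converted to bytes; the 128 MB default is an assumption, not taken from this diff):

```scala
import org.apache.spark.SparkConf

// Assumed behavior, reconstructed for illustration only.
def maxMessageSizeBytes(conf: SparkConf): Int = {
  val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) // assumed default
  maxSizeInMB * 1024 * 1024
}
```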
43 changes: 1 addition & 42 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,18 +21,14 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

import scala.collection.JavaConverters._

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

/**
* SSLOptions class is a common container for SSL configuration options. It offers methods to
* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for. Since Akka doesn't support client
* authentication with SSL, SSLOptions cannot support it either.
* by the protocol, which it can generate the configuration for.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
}
}

/**
* Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can be used then to compose the ultimate Akka configuration.
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
if (keyStoreType.isDefined) {
logWarning("Akka configuration does not support key store type.");
}
if (trustStoreType.isDefined) {
logWarning("Akka configuration does not support trust store type.");
}

Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-store-password",
ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store",
ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store-password",
ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-password",
ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.random-number-generator",
ConfigValueFactory.fromAnyRef(""))
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
None
}
}

/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
19 changes: 4 additions & 15 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
*
* Akka also has an option to turn on SSL, this option is currently supported (see
* the details below).
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
Basic, Digest, Form, SPNEGO, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
* configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
* Akka based connections or `fs` for broadcast and file server.
configuration for particular protocol denoted by `yyy`. Currently `yyy` can only be `fs` for
* broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
* of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
* TypeSafe configuration for Akka or SSLContextFactory for Jetty.
* of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
* Jetty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
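A hedged sketch of the global vs. per-protocol SSL override described above, expressed as `SparkConf` settings (paths and passwords are placeholders):

```scala
import org.apache.spark.SparkConf

// Illustrative only: global SSL settings plus one value overridden for the `fs` protocol.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore")     // placeholder path
  .set("spark.ssl.keyStorePassword", "globalSecret")  // placeholder value
  .set("spark.ssl.fs.keyStorePassword", "fsSecret")   // overrides the global value for `fs` only
```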
32 changes: 11 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
.map{case (k, v) => (k.substring(prefix.length), v)}
}

/** Get all akka conf variables set on this SparkConf */
def getAkkaConf: Seq[(String, String)] =
/* This is currently undocumented. If we want to make this public we should consider
* nesting options under the spark namespace to avoid conflicts with user akka options.
* Otherwise users configuring their own akka code via system properties could mess up
* spark's akka options.
*
* E.g. spark.akka.option.x.y.x = "value"
*/
getAll.filter { case (k, _) => isAkkaConf(k) }

/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6"))
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6"))
)
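Assuming the alternate-config mapping added above behaves like the existing entries, a hypothetical usage note (not part of this diff): setting the old key still takes effect and is reported as deprecated, while reads under the new key fall back to the old value.

```scala
import org.apache.spark.SparkConf

// Illustrative only; the fallback behavior is assumed from the surrounding SparkConf code.
val conf = new SparkConf().set("spark.akka.frameSize", "64") // expected to log a deprecation warning
val maxSize = conf.get("spark.rpc.message.maxSize")          // expected to resolve to "64"
```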

/**
@@ -615,21 +606,13 @@ private[spark] object SparkConf extends Logging {
}.toMap
}

/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
*/
def isAkkaConf(name: String): Boolean = name.startsWith("akka.")

/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain akka and authentication configs are required from the executor when it connects to
* Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
return
}

allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
return
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
logWarning(
s"The configuration key $key is not supported any more " +
s"because Spark doesn't use Akka since 2.0")
}
}
