Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3883: SSL support for HttpServer and Akka #3571

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
Expand Down Expand Up @@ -72,7 +73,10 @@ private[spark] class HttpServer(
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector

val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
.map(new SslSocketConnector(_)).getOrElse(new SocketConnector)

connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
Expand Down Expand Up @@ -149,13 +153,14 @@ private[spark] class HttpServer(
}

/**
* Get the URI of this HTTP server (http://host:port)
* Get the URI of this HTTP server (http://host:port or https://host:port)
*/
def uri: String = {
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
"http://" + Utils.localIpAddress + ":" + port
val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
s"$scheme://${Utils.localIpAddress}:$port"
}
}
}
175 changes: 175 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.io.File

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

/** SSLOptions class is a common container for SSL configuration options. It offers methods to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor style nit, but do you mind placing the /** on its own line? All of the Java/Scaladoc comments should generally look like

/**
 * comments comments comments [...]
 */

* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for. Since Akka doesn't support client
* authentication with SSL, SSLOptions cannot support it either.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms to use
*/
private[spark] case class SSLOptions(
enabled: Boolean = false,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty) {

/** Creates a Jetty SSL context factory according to the SSL settings represented by this object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same minor style nit here; put this comment on its own line with a single *.

*/
def createJettySslContextFactory(): Option[SslContextFactory] = {
if (enabled) {
val sslContextFactory = new SslContextFactory()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)

Some(sslContextFactory)
} else {
None
}
}

/** Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can be used then to compose the ultimate Akka configuration.
*/
def createAkkaConfig: Option[Config] = {
import scala.collection.JavaConversions._
if (enabled) {
Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-store-password",
ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store",
ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store-password",
ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-password",
ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.random-number-generator",
ConfigValueFactory.fromAnyRef(""))
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
None
}
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Masking the passwords here was a nice touch!

override def toString: String = s"SSLOptions{enabled=$enabled, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

private[spark] object SSLOptions extends Logging {

/** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
*
* The following settings are allowed:
* $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
* For a list of protocols and ciphers supported by particular Java versions, you may go to
* [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
* blog page]].
*
* You can optionally specify the default configuration. If you do, for each setting which is
* missing in SparkConf, the corresponding setting is used from the default configuration.
*
* @param conf Spark configuration object where the settings are collected from
* @param ns the namespace name
* @param defaults the default configuration
* @return [[org.apache.spark.SSLOptions]] object
*/
def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
.orElse(defaults.flatMap(_.keyStore))

val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
.orElse(defaults.flatMap(_.keyStorePassword))

val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))

val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))

val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
.map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
.orElse(defaults.map(_.enabledAlgorithms))
.getOrElse(Set.empty)

new SSLOptions(
enabled,
keyStore,
keyStorePassword,
keyPassword,
trustStore,
trustStorePassword,
protocol,
enabledAlgorithms)
}

}

99 changes: 93 additions & 6 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Expand Up @@ -18,7 +18,11 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.security.KeyStore
import java.security.cert.X509Certificate
import javax.net.ssl._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: java.net before java.security

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry - i don't get what you mean here? am i to move javax.net ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nevermind. didn't notice it was javax.


import com.google.common.io.Files
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
Expand Down Expand Up @@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
* who always have permission to view or modify the Spark application.
*
* Spark does not currently support encryption after authentication.
* Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
*
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
Expand All @@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
* Akka also has an option to turn on SSL, this option is not currently supported
* but we could add a configuration option in the future.
*
* Akka also has an option to turn on SSL, this option is currently supported (see
* the details below).
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
Expand All @@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
* We currently do not support SSL (https), but Jetty can be configured to use it
* so we could add a configuration option for this in the future.
*
* We currently support SSL (https) for this communication protocol (see the details
* below).
*
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
* Any clients must specify the user and password. There is a default
Expand Down Expand Up @@ -142,9 +148,39 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
*
* Connection encryption (SSL) configuration is organized hierarchically. The user can configure
* the default SSL settings which will be used for all the supported communication protocols unless
* they are overwritten by protocol specific settings. This way the user can easily provide the
* common settings for all the protocols without disabling the ability to configure each one
* individually.
*
* All the SSL settings like `spark.ssl.xxx` where `xxx` is a particular configuration property,
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
* configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
* Akka based connections or `fs` for broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
* of SSL settings. SSLOptions is the used to provide protocol-specific configuration like TypeSafe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the used" -> "then used"?

* configuration for Akka or SSLContextFactory for Jetty.
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
* the client side then distributed and used by the executors as the part of the application
* (YARN allows the user to deploy files before the application is started).
* In standalone deployment, the user needs to provide key-stores and configuration
* options for master and workers. In this mode, the user may allow the executors to use the SSL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this is kind of pedantic, but is the difference between YARN and Standalone that standalone users have to handle key distribution themselves whereas YARN handles it automatically? If I understand correctly, the user is still responsible for generating the key-stores in both modes and the only difference is whether there's a secure distribution mechanism for those stores. If this is the case, maybe we can state this a bit more explicitly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is already covered pretty well in configuration.md, so we might be able to disregard that here.

* settings inherited from the worker which spawned that executor. It can be accomplished by
* setting `spark.ssl.useNodeLocalConf` to `true`.
*/

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
private[spark] class SecurityManager(sparkConf: SparkConf)
extends Logging with SecretKeyHolder {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
Expand Down Expand Up @@ -196,6 +232,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
)
}

// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

// SSL configuration for different communication layers - they can override the default
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")

val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
for (trustStore <- fileServerSSLOptions.trustStore) yield {
val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()

try {
val ks = KeyStore.getInstance(KeyStore.getDefaultType)
ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)

val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(ks)
tmf.getTrustManagers
} finally {
input.close()
}
}

lazy val credulousTrustStoreManagers = Array({
logWarning("Using 'accept-all' trust manager for SSL connections.")
new X509TrustManager {
override def getAcceptedIssuers: Array[X509Certificate] = null

override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}

override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
}: TrustManager
})

val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)

val hostVerifier = new HostnameVerifier {
override def verify(s: String, sslSession: SSLSession): Boolean = true
}

(Some(sslContext.getSocketFactory), Some(hostVerifier))
} else {
(None, None)
}

/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Expand Up @@ -369,6 +369,7 @@ private[spark] object SparkConf {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}

Expand Down
Expand Up @@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
uc = new URL(url).openConnection()
uc.setConnectTimeout(httpReadTimeout)
}
Utils.setupSecureURLConnection(uc, securityManager)

val in = {
uc.setReadTimeout(httpReadTimeout)
Expand Down
Expand Up @@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(

val user = System.getProperty("user.name", "<unknown>")

def copy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we shouldn't just make ApplicationDescription into a case class, since that will implicitly give us a copy method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well... I was wondering why it isn't a case class, but it has a mutable field. I hope it will become a case class at some point, and then the copy method will be just removed, but I would not mix here such refactoring.

name: String = name,
maxCores: Option[Int] = maxCores,
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)

override def toString: String = "ApplicationDescription(" + name + ")"
}