Permalink
Browse files

Rework from comments

  • Loading branch information...
tgravescs committed Mar 3, 2014
1 parent 50dd9f2 commit 2f7714722854b42f27479955202cde2d2d2fb281
@@ -50,7 +50,7 @@ import scala.collection.mutable.ArrayBuffer
* Spark does not currently support encryption after authentication.
*
* At this point spark has multiple communication protocols that need to be secured and
- * different underlying mechisms are used depending on the protocol:
+ * different underlying mechanisms are used depending on the protocol:
*
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
@@ -108,7 +108,7 @@ import scala.collection.mutable.ArrayBuffer
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
- * could be used for logging, encypryption, or compression.
+ * could be used for logging, encryption, or compression.
*
* The exact mechanisms used to generate/distribute the shared secret are deployment specific.
*
@@ -122,15 +122,11 @@ import scala.collection.mutable.ArrayBuffer
* filters to do authentication. That authentication then happens via the ResourceManager Proxy
* and Spark will use that to do authorization against the view acls.
*
- * For other Spark deployments, the shared secret should be specified via the SPARK_SECRET
+ * For other Spark deployments, the shared secret must be specified via the SPARK_SECRET
* environment variable. This isn't ideal but it means only the user who starts the process
- * has access to view that variable. Note that Spark does try to generate a secret for
- * you if the SPARK_SECRET environment variable is not set, but it gets put into the java
- * system property which can be viewed by other users, so setting the SPARK_SECRET environment
- * variable is recommended.
- * All the nodes (Master and Workers) need to have the same shared secret
- * and all the applications running need to have that same shared secret. This again
- * is not ideal as one user could potentially affect another users application.
+ * has access to view that variable.
+ * All the nodes (Master and Workers) and the applications need to have the same shared secret.
+ * This again is not ideal as one user could potentially affect another user's application.
* This should be enhanced in the future to provide better protection.
* If the UI needs to be secured the user needs to install a javax servlet filter to do the
* authentication. Spark will then use that user to compare against the view acls to do
@@ -152,7 +148,8 @@ private[spark] class SecurityManager extends Logging {
private val viewAcls = aclUsers.map(_.trim()).filter(!_.isEmpty).toSet
private val secretKey = generateSecretKey()
- logDebug("is auth enabled = " + authOn + " is uiAcls enabled = " + uiAclsOn)
+ logInfo("SecurityManager, is authentication enabled: " + authOn +
+ " are ui acls enabled: " + uiAclsOn)
// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
@@ -170,7 +167,7 @@ private[spark] class SecurityManager extends Logging {
return passAuth
}
}
- );
+ )
}
/**
@@ -179,16 +176,12 @@ private[spark] class SecurityManager extends Logging {
* The way the key is stored depends on the Spark deployment mode. Yarn
* uses the Hadoop UGI.
*
- * For non-Yarn deployments, If the environment variable is not set already
- * we generate a secret and since we can't set an environment variable dynamically
- * we set the java system property SPARK_SECRET. This will allow it to automatically
- * work in certain situations. Others this still will not work and this definitely is
- * not ideal since other users can see it. We should switch to put it in
- * a config once Spark supports configs.
+ * For non-Yarn deployments, if the environment variable is not set
+ * we throw an exception.
*/
private def generateSecretKey(): String = {
if (!isAuthenticationEnabled) return null
- // first check to see if the secret is already set, else generate a new one
+ // first check to see if the secret is already set, else generate a new one if on yarn
if (SparkHadoopUtil.get.isYarnMode) {
val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
if (secretKey != null) {
@@ -200,17 +193,17 @@ private[spark] class SecurityManager extends Logging {
}
val secret = System.getProperty("SPARK_SECRET", System.getenv("SPARK_SECRET"))
if (secret != null && !secret.isEmpty()) return secret
- // generate one
- val sCookie = akka.util.Crypt.generateSecureCookie
-
- // if we generated the secret then we must be the first so lets set it so t
- // gets used by everyone else
+ val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
+ // generate one
+ akka.util.Crypt.generateSecureCookie
+ } else {
+ throw new Exception("Error: a secret key must be specified via SPARK_SECRET env variable")
+ }
if (SparkHadoopUtil.get.isYarnMode) {
+ // if we generated the secret then we must be the first so let's set it so it
+ // gets used by everyone else
SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, sCookie)
- logDebug("adding secret to credentials yarn mode")
- } else {
- System.setProperty("SPARK_SECRET", sCookie)
- logDebug("adding secret to java property")
+ logInfo("adding secret to credentials in yarn mode")
}
sCookie
}
@@ -223,7 +216,9 @@ private[spark] class SecurityManager extends Logging {
/**
* Checks the given user against the view acl list to see if they have
- * authorization to view the UI.
+ * authorization to view the UI. If the UI acls are disabled
+ * via spark.ui.acls.enable, all users have view access.
+ *
* @param user to see if is authorized
* @return true if the user has permission, otherwise false
*/
@@ -48,7 +48,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val metricsHandlers = worker.metricsSystem.getServletHandlers
val handlers = metricsHandlers ++ Seq[ServletContextHandler](
- createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR, "/static/*"),
+ createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
createServletHandler("/log", (request: HttpServletRequest) => log(request)),
createServletHandler("/logPage", (request: HttpServletRequest) => logPage(request)),
createServletHandler("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
@@ -199,6 +199,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
}
private[spark] object WorkerWebUI {
- val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
+ val STATIC_RESOURCE_BASE = "org/apache/spark/ui"
val DEFAULT_PORT="8081"
}
@@ -32,8 +32,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
val socketRemoteConnectionManagerId: ConnectionManagerId, val connectionId: ConnectionId)
extends Logging {
- var sparkSaslServer : SparkSaslServer = null
- var sparkSaslClient : SparkSaslClient = null
+ var sparkSaslServer: SparkSaslServer = null
+ var sparkSaslClient: SparkSaslClient = null
def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = {
this(channel_, selector_,
@@ -23,7 +23,7 @@ private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId,
private[spark] object ConnectionId {
- def createConnectionIdFromString(connectionIdString: String) : ConnectionId = {
+ def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
val res = connectionIdString.split("_").map(_.trim())
if (res.size != 3) {
throw new Exception("Error converting ConnectionId string: " + connectionIdString +
@@ -56,7 +56,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
// default to 30 second timeout waiting for authentication
private val authTimeout = System.getProperty("spark.core.connection.auth.wait.timeout",
- "30000").toInt
+ "30").toInt
private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -79,6 +79,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
+ // used to track the SendingConnections waiting to do SASL negotiation
private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection]
with SynchronizedMap[ConnectionId, SendingConnection]
private val connectionsByKey =
@@ -729,7 +730,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
// We did not find it useful in our test-env ...
// If we do re-add it, we should consistently use it everywhere I guess ?
message.senderAddress = id.toSocketAddress()
- logDebug("Sending Security [" + message + "] to [" + connManagerId + "]")
+ logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())
// send security message until outgoing connection has been authenticated
@@ -745,7 +746,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId,
newConnectionId)
- logDebug("creating new sending connection: " + newConnectionId)
+ logTrace("creating new sending connection: " + newConnectionId)
registerRequests.enqueue(newConnection)
newConnection
@@ -772,10 +773,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logDebug("getAuthenticated wait connectionid: " + connection.connectionId)
// have timeout in case remote side never responds
connection.getAuthenticated().wait(500)
- if (((clock.getTime() - startTime) >= authTimeout) && (!connection.isSaslComplete())) {
+ if (((clock.getTime() - startTime) >= (authTimeout * 1000)) && (!connection.isSaslComplete())) {
// took too long to authenticate the connection, something probably went wrong
throw new Exception("Took to long for authentication to " + connectionManagerId +
- ", waited " + authTimeout + "ms, failing.")
+ ", waited " + authTimeout + "seconds, failing.")
}
}
}
@@ -25,6 +25,35 @@ import scala.collection.mutable.StringBuilder
import org.apache.spark._
import org.apache.spark.network._
+/**
+ * SecurityMessage is class that contains the connectionId and sasl token
+ * used in SASL negotiation. SecurityMessage has routines for converting
+ * it to and from a BufferMessage so that it can be sent by the ConnectionManager
+ * and easily consumed by users when received.
+ * The API was modeled after BlockMessage.
+ *
+ * The connectionId is the connectionId of the client side. Since
+ * message passing is asynchronous and it's possible for the server side (receiving)
+ * to get multiple different types of messages on the same connection the connectionId
+ * is used to know which connection the security message is intended for.
+ *
+ * For instance, let's say we are node_0. We need to send data to node_1. The node_0 side
+ * is acting as a client and connecting to node_1. SASL negotiation has to occur
+ * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message.
+ * node_1 receives the message from node_0 but before it can process it and send a response,
+ * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0
+ * and sends a security message of its own to authenticate as a client. Now node_0 gets
+ * the message and it needs to decide if this message is in response to it being a client
+ * (from the first send) or if it's just node_1 trying to connect to it to send data. This
+ * is where the connectionId field is used. node_0 can lookup the connectionId to see if
+ * it is in response to it being a client or if it's in response to someone sending other data.
+ *
+ * The format of a SecurityMessage as its sent is:
+ * - Length of the ConnectionId
+ * - ConnectionId
+ * - Length of the token
+ * - Token
+ */
private[spark] class SecurityMessage() extends Logging {
private var connectionId: String = null
@@ -39,6 +68,9 @@ private[spark] class SecurityMessage() extends Logging {
connectionId = newconnectionId
}
+ /**
+ * Read the given buffer and set the members of this class.
+ */
def set(buffer: ByteBuffer) {
val idLength = buffer.getInt()
val idBuilder = new StringBuilder(idLength)
@@ -68,10 +100,19 @@ private[spark] class SecurityMessage() extends Logging {
return token
}
+ /**
+ * Create a BufferMessage that can be sent by the ConnectionManager containing
+ * the security information from this class.
+ * @return BufferMessage
+ */
def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()
+ // 4 bytes for the length of the connectionId
+ // connectionId is of type char so multiply the length by 2 to get number of bytes
+ // 4 bytes for the length of token
+ // token is a byte buffer so just take the length
var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length)
buffer.putInt(connectionId.length())
connectionId.foreach((x: Char) => buffer.putChar(x))
@@ -96,15 +137,27 @@ private[spark] class SecurityMessage() extends Logging {
private[spark] object SecurityMessage {
+ /**
+ * Convert the given BufferMessage to a SecurityMessage by parsing the contents
+ * of the BufferMessage and populating the SecurityMessage fields.
+ * @param bufferMessage is a BufferMessage that was received
+ * @return new SecurityMessage
+ */
def fromBufferMessage(bufferMessage: BufferMessage): SecurityMessage = {
val newSecurityMessage = new SecurityMessage()
newSecurityMessage.set(bufferMessage)
newSecurityMessage
}
- def fromResponse(response : Array[Byte], newConnectionId : String) : SecurityMessage = {
+ /**
+ * Create a SecurityMessage to send from a given saslResponse.
+ * @param response is the response to a challenge from the SaslClient or SaslServer
+ * @param connectionId the client connectionId we are negotiating authentication for
+ * @return a new SecurityMessage
+ */
+ def fromResponse(response : Array[Byte], connectionId : String) : SecurityMessage = {
val newSecurityMessage = new SecurityMessage()
- newSecurityMessage.set(response, newConnectionId)
+ newSecurityMessage.set(response, connectionId)
newSecurityMessage
}
}
View
@@ -515,9 +515,9 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.core.connection.auth.wait.timeout</td>
- <td>30000</td>
+ <td>30</td>
<td>
- Number of milliseconds for the connection to wait for authentication to occur before timing
+ Number of seconds for the connection to wait for authentication to occur before timing
out and giving up.
</td>
</tr>
View
@@ -12,3 +12,5 @@ For Spark on Yarn deployments, configuring `spark.authenticate` to true will aut
For other types of Spark deployments, the environment variable `SPARK_SECRET` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
See [Spark Configuration](configuration.html) for more details on the security configs.
+
+See <a href="api/core/index.html#org.apache.spark.SecurityManager"><code>org.apache.spark.SecurityManager</code></a> for implementation details about security.

0 comments on commit 2f77147

Please sign in to comment.