[SPARK-2157] Enable tight firewall rules for Spark
The goal of this PR is to allow users of Spark to write tight firewall rules for their clusters. This is currently not possible because Spark uses random ports in many places, notably for communication between executors and drivers. The changes in this PR build on top of ash211's changes in #1107.

The list covered here may or may not be the complete set of ports needed for Spark to operate perfectly. However, as of the latest commit there are no known sources of random ports (except in tests). I have not documented a few of the more obscure configs.

My spark-env.sh looks like this:
```
export SPARK_MASTER_PORT=6060
export SPARK_WORKER_PORT=7070
export SPARK_MASTER_WEBUI_PORT=9090
export SPARK_WORKER_WEBUI_PORT=9091
```
and my spark-defaults.conf looks like this:
```
spark.master spark://andrews-mbp:6060
spark.driver.port 5001
spark.fileserver.port 5011
spark.broadcast.port 5021
spark.replClassServer.port 5031
spark.blockManager.port 5041
spark.executor.port 5051
```
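
For orientation, each service in the diffs below resolves its port through SparkConf, with 0 (pick a random free port) as the default. A minimal sketch of that lookup, assuming the spark-defaults.conf values above have already been loaded into the conf:
```
import org.apache.spark.SparkConf

// Sketch: how the services below read their configured ports.
// A value of 0 (the default) means "let the OS pick a free port".
val conf = new SparkConf()
val fileServerPort   = conf.getInt("spark.fileserver.port", 0)   // 5011 with the settings above
val broadcastPort    = conf.getInt("spark.broadcast.port", 0)    // 5021
val blockManagerPort = conf.getInt("spark.blockManager.port", 0) // 5041
```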

Author: Andrew Or <andrewor14@gmail.com>
Author: Andrew Ash <andrew@andrewash.com>

Closes #1777 from andrewor14/configure-ports and squashes the following commits:

621267b [Andrew Or] Merge branch 'master' of github.com:apache/spark into configure-ports
8a6b820 [Andrew Or] Use a random UI port during tests
7da0493 [Andrew Or] Fix tests
523c30e [Andrew Or] Add test for isBindCollision
b97b02a [Andrew Or] Minor fixes
c22ad00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into configure-ports
93d359f [Andrew Or] Executors connect to wrong port when collision occurs
d502e5f [Andrew Or] Handle port collisions when creating Akka systems
a2dd05c [Andrew Or] Patrick's comment nit
86461e2 [Andrew Or] Remove spark.executor.env.port and spark.standalone.client.port
1d2d5c6 [Andrew Or] Fix ports for standalone cluster mode
cb3be88 [Andrew Or] Various doc fixes (broken link, format etc.)
e837cde [Andrew Or] Remove outdated TODOs
bfbab28 [Andrew Or] Merge branch 'master' of github.com:apache/spark into configure-ports
de1b207 [Andrew Or] Update docs to reflect new ports
b565079 [Andrew Or] Add spark.ports.maxRetries
2551eb2 [Andrew Or] Remove spark.worker.watcher.port
151327a [Andrew Or] Merge branch 'master' of github.com:apache/spark into configure-ports
9868358 [Andrew Or] Add a few miscellaneous ports
6016e77 [Andrew Or] Add spark.executor.port
8d836e6 [Andrew Or] Also document SPARK_{MASTER/WORKER}_WEBUI_PORT
4d9e6f3 [Andrew Or] Fix super subtle bug
3f8e51b [Andrew Or] Correct erroneous docs...
e111d08 [Andrew Or] Add names for UI services
470f38c [Andrew Or] Special case non-"Address already in use" exceptions
1d7e408 [Andrew Or] Treat 0 ports specially + return correct ConnectionManager port
ba32280 [Andrew Or] Minor fixes
6b550b0 [Andrew Or] Assorted fixes
73fbe89 [Andrew Or] Move start service logic to Utils
ec676f4 [Andrew Or] Merge branch 'SPARK-2157' of github.com:ash211/spark into configure-ports
038a579 [Andrew Ash] Trust the server start function to report the port the service started on
7c5bdc4 [Andrew Ash] Fix style issue
0347aef [Andrew Ash] Unify port fallback logic to a single place
24a4c32 [Andrew Ash] Remove type on val to match surrounding style
9e4ad96 [Andrew Ash] Reformat for style checker
5d84e0e [Andrew Ash] Document new port configuration options
066dc7a [Andrew Ash] Fix up HttpServer port increments
cad16da [Andrew Ash] Add fallover increment logic for HttpServer
c5a0568 [Andrew Ash] Fix ConnectionManager to retry with increment
b80d2fd [Andrew Ash] Make Spark's block manager port configurable
17c79bb [Andrew Ash] Add a configuration option for spark-shell's class server
f34115d [Andrew Ash] SPARK-1176 Add port configuration for HttpBroadcast
49ee29b [Andrew Ash] SPARK-1174 Add port configuration for HttpFileServer
1c0981a [Andrew Ash] Make port in HttpServer configurable
andrewor14 authored and pwendell committed Aug 6, 2014
1 parent ee7f308 commit 09f7e45
Showing 22 changed files with 416 additions and 172 deletions.
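
Nearly every hunk below funnels its bind through Utils.startServiceOnPort, which lives in Utils.scala and is not part of this excerpt. A minimal sketch of the retry loop such a helper implements, assuming the semantics described in the doStart comment below (a requested port of 0 stays 0 so the OS picks a free port; a fixed port is retried on neighboring ports a bounded number of times, cf. the spark.ports.maxRetries commit):
```
import java.net.BindException

// Illustrative only; the real helper is in
// core/src/main/scala/org/apache/spark/util/Utils.scala.
def startServiceOnPort[T](
    startPort: Int,
    startService: Int => (T, Int), // binds, returns (service, port actually bound)
    serviceName: String = "",
    maxRetries: Int = 3): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    // Port 0 asks the OS for an ephemeral port, so retries keep 0;
    // otherwise probe startPort, startPort + 1, ... modulo 65536.
    val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
    try {
      return startService(tryPort)
    } catch {
      case e: BindException =>
        if (offset >= maxRetries) {
          throw new BindException(
            s"Failed to start $serviceName after $maxRetries retries: ${e.getMessage}")
        } // otherwise loop and try the next port
    }
  }
  throw new IllegalStateException(s"unreachable: $serviceName")
}
```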
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
```
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
```
88 changes: 54 additions & 34 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
```
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
-  extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
 
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
    }
  }
 
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
  /**
   * Setup Jetty to the HashLoginService using a single user with our
   * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
    }
  }
}
```
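
A hypothetical caller of the reworked constructor, mirroring the HttpFileServer change above (HttpServer is private[spark], so this only compiles from inside an org.apache.spark package):
```
import java.io.File
import org.apache.spark.{HttpServer, SecurityManager, SparkConf}

val conf = new SparkConf()
val server = new HttpServer(new File("/tmp/demo"), new SecurityManager(conf),
  requestedPort = 5011, serverName = "demo HTTP server")
server.start()      // binds 5011 if free, otherwise a neighboring port
println(server.uri) // reflects the port actually bound
server.stop()
```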
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
```
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
   }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }
```
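
The new predicate is easy to spot-check in isolation. A re-implementation exercised with port names from this PR's description (illustrative, not Spark's actual test suite):
```
// Mirrors SparkConf.isSparkPortConf from the hunk above.
def isSparkPortConf(name: String): Boolean =
  name.startsWith("spark.") && name.endsWith(".port")

assert(isSparkPortConf("spark.driver.port"))
assert(isSparkPortConf("spark.blockManager.port"))
assert(!isSparkPortConf("spark.executor.memory"))  // not a port config
assert(!isSparkPortConf("akka.remote.netty.port")) // not a spark.* key
```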
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
```
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port", boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri", server.serverUri)
        server
```
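
The rewritten comment is the key point: whenever the requested port is 0, or a collision forces a retry, the bound port is only known after the bind, so the driver must publish it. A stand-alone illustration of why:
```
import java.net.ServerSocket

// Requesting port 0 delegates the choice to the OS; the real port can
// only be read back after binding, which is what boundPort captures above.
val socket = new ServerSocket(0)
println(s"requested 0, bound to ${socket.getLocalPort}")
socket.close()
```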
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
```
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
    server.start()
    serverUri = server.uri
    logInfo("Broadcast server started at " + serverUri)
```
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
```
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
```
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
```
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
```
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
```
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.AkkaUtils
@@ -34,7 +35,7 @@ class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
     port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
```
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
```
@@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
+      val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+        "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       // Create a new ActorSystem using driver's Spark properties to run the backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
+        "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
```
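
Both actor systems above are given the same spark.executor.port. This can work because the bootstrap driverPropsFetcher system is shut down before sparkExecutor starts, releasing the port; that ordering is the commit's assumption, not something visible in this hunk. A rough stand-alone analogue:
```
import java.net.ServerSocket

// A listener closed before the next bind frees its port for reuse.
val bootstrap = new ServerSocket(5051) // hypothetical spark.executor.port
bootstrap.close()
val real = new ServerSocket(5051)      // succeeds: the port was released
real.close()
```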
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
```
@@ -38,8 +38,12 @@ import scala.language.postfixOps
 
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager")
+  extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -105,7 +109,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
 
-  serverChannel.socket.bind(new InetSocketAddress(port))
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    (serverChannel, serverChannel.socket.getLocalPort)
+  }
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
```
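
The startService closure above captures the enclosing class's serverChannel; the same bind-and-report shape as a free-standing sketch:
```
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

// Bind the requested port and report the port actually bound. The two only
// differ when the requested port is 0, which is why a pair is returned.
def bindAndReport(port: Int): (ServerSocketChannel, Int) = {
  val channel = ServerSocketChannel.open()
  channel.socket.setReuseAddress(true)
  channel.socket.bind(new InetSocketAddress(port))
  (channel, channel.socket.getLocalPort)
}
```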
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
```
@@ -60,10 +60,12 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-  val connectionManager = new ConnectionManager(0, conf, securityManager)
+  val connectionManager =
+    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
 
   implicit val futureExecContext = connectionManager.futureExecContext
 
```
26 changes: 9 additions & 17 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
```
@@ -174,40 +174,32 @@ private[spark] object JettyUtils extends Logging {
       hostName: String,
       port: Int,
       handlers: Seq[ServletContextHandler],
-      conf: SparkConf): ServerInfo = {
+      conf: SparkConf,
+      serverName: String = ""): ServerInfo = {
 
     val collection = new ContextHandlerCollection
     collection.setHandlers(handlers.toArray)
     addFilters(handlers, conf)
 
-    @tailrec
+    // Bind to the given port, or throw a java.net.BindException if the port is occupied
     def connect(currentPort: Int): (Server, Int) = {
       val server = new Server(new InetSocketAddress(hostName, currentPort))
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
       server.setHandler(collection)
-
-      Try {
+      try {
         server.start()
-      } match {
-        case s: Success[_] =>
-          (server, server.getConnectors.head.getLocalPort)
-        case f: Failure[_] =>
-          val nextPort = (currentPort + 1) % 65536
+        (server, server.getConnectors.head.getLocalPort)
+      } catch {
+        case e: Exception =>
           server.stop()
           pool.stop()
-          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
-          if (f.toString.contains("Address already in use")) {
-            logWarning(s"$msg - $f")
-          } else {
-            logError(msg, f.exception)
-          }
-          connect(nextPort)
+          throw e
       }
     }
 
-    val (server, boundPort) = connect(port)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
     ServerInfo(server, boundPort, collection)
   }
 
```
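
The design change in this hunk: connect no longer recurses onto the next port itself (the old @tailrec loop); it cleans up and rethrows, leaving retry policy to the shared helper. connect's new contract in miniature, with a plain ServerSocket standing in for Jetty:
```
import java.net.InetSocketAddress

// Fail fast: clean up, then rethrow, so the caller (not connect itself)
// decides whether to probe the next port; mirrors server.stop()/pool.stop().
def connectOnce(hostName: String, currentPort: Int): Int = {
  val socket = new java.net.ServerSocket()
  try {
    socket.bind(new InetSocketAddress(hostName, currentPort))
    socket.getLocalPort
  } catch {
    case e: Exception =>
      socket.close()
      throw e
  }
}
```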
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
```
@@ -36,7 +36,7 @@ private[spark] class SparkUI(
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
-  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {
 
   def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
```
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
```
@@ -39,7 +39,8 @@ private[spark] abstract class WebUI(
     securityManager: SecurityManager,
     port: Int,
     conf: SparkConf,
-    basePath: String = "")
+    basePath: String = "",
+    name: String = "")
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
@@ -97,7 +98,7 @@ private[spark] abstract class WebUI(
   def bind() {
     assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
     try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
       logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
     } catch {
       case e: Exception =>
```