diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index f575a0d65e80d..edc3889c9ae51 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -23,7 +23,10 @@ import com.google.common.io.Files import org.apache.spark.util.Utils -private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging { +private[spark] class HttpFileServer( + securityManager: SecurityManager, + requestedPort: Int = 0) + extends Logging { var baseDir : File = null var fileDir : File = null @@ -31,18 +34,14 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo var httpServer : HttpServer = null var serverUri : String = null - def initialize(port: Option[Int]) { + def initialize() { baseDir = Utils.createTempDir() fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = if (port.isEmpty) { - new HttpServer(baseDir, securityManager) - } else { - new HttpServer(baseDir, securityManager, port.get) - } + httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server") httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 3883a9cb71f40..9ced0b87a5187 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -41,13 +41,33 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. */ -private[spark] class HttpServer(resourceBase: File, - securityManager: SecurityManager, - localPort: Int = 0) extends Logging { +private[spark] class HttpServer( + resourceBase: File, + securityManager: SecurityManager, + requestedPort: Int = 0, + serverName: String = "HTTP server") extends Logging { private var server: Server = null - private var port: Int = localPort + private var port: Int = requestedPort - private def startOnPort(startPort: Int): (Server, Int) = { + def start() { + if (server != null) { + throw new ServerStateException("Server is already started") + } else { + logInfo("Starting HTTP Server") + val (actualServer, actualPort) = + Utils.startServiceOnPort[Server](requestedPort, doStart, serverName) + server = actualServer + port = actualPort + } + } + + /** + * Actually start the HTTP server on the given port. + * + * Note that this is only best effort in the sense that we may end up binding to a nearby port + * in the event of port collision. Return the bound server and the actual port used. + */ + private def doStart(startPort: Int): (Server, Int) = { val server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) @@ -76,22 +96,11 @@ private[spark] class HttpServer(resourceBase: File, } server.start() - val actualPort = server.getConnectors()(0).getLocalPort() + val actualPort = server.getConnectors()(0).getLocalPort (server, actualPort) } - def start() { - if (server != null) { - throw new ServerStateException("Server is already started") - } else { - logInfo("Starting HTTP Server") - val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort) - server = actualServer - port = actualPort - } - } - /** * Setup Jetty to the HashLoginService using a single user with our * shared secret. Configure it to use DIGEST-MD5 authentication so that the password @@ -143,7 +152,7 @@ private[spark] class HttpServer(resourceBase: File, if (server == null) { throw new ServerStateException("Server is not started") } else { - return "http://" + Utils.localIpAddress + ":" + port + "http://" + Utils.localIpAddress + ":" + port } } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ab0e1d6c3ba93..051814f074c80 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -225,8 +225,9 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { - val server = new HttpFileServer(securityManager) - server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt)) + val fileServerPort = conf.getInt("spark.fileserver.port", 0) + val server = new HttpFileServer(securityManager, fileServerPort) + server.initialize() conf.set("spark.fileserver.uri", server.serverUri) server } else { diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index eaf3b69a7c5a6..942dc7d7eac87 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -152,8 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) - val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0) - server = new HttpServer(broadcastDir, securityManager, broadcastListenPort) + val broadcastPort = conf.getInt("spark.broadcast.port", 0) + server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 382e6362cf953..a913580d8f6c6 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -38,8 +38,11 @@ import scala.language.postfixOps import org.apache.spark._ import org.apache.spark.util.{SystemClock, Utils} -private[spark] class ConnectionManager(port: Int, conf: SparkConf, - securityManager: SecurityManager) extends Logging { +private[spark] class ConnectionManager( + port: Int, + conf: SparkConf, + securityManager: SecurityManager, + name: String = "Connection manager") extends Logging { class MessageStatus( val message: Message, @@ -105,11 +108,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.setReuseAddress(true) serverChannel.socket.setReceiveBufferSize(256 * 1024) - private def startService(port: Int) = { + private def startService(port: Int): (ServerSocketChannel, Int) = { serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, port) } - Utils.startServiceOnPort(port, 3, startService) + Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f8330eb49c0c1..3876cf43e2a7d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -60,11 +60,12 @@ private[spark] class BlockManager( mapOutputTracker: MapOutputTracker) extends Logging { + private val port = conf.getInt("spark.blockManager.port", 0) val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) - val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf, - securityManager) + val connectionManager = + new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") implicit val futureExecContext = connectionManager.futureExecContext diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 844675eeddf09..042bc5b98fa0d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1345,22 +1345,29 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, - maxRetries: Int, - startService: Int => (T, Int)): (T, Int) = { + startService: Int => (T, Int), + serviceName: String = "", + maxRetries: Int = 3): (T, Int) = { for (offset <- 0 to maxRetries) { val tryPort = (startPort + offset) % 65536 try { return startService(tryPort) } catch { case e: BindException => + val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'" if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { - throw e + val exceptionMessage = + s"${e.getMessage}: $service failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception } - logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) + logInfo(s"$service could not bind on port $tryPort. Attempting port ${tryPort + 1}.") } } // Should never happen - throw new SparkException(s"Couldn't start service on port $startPort") + throw new SparkException(s"Failed to start service on port $startPort") } } diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 89b64df320498..84b57cd2dc1af 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -102,8 +102,8 @@ import org.apache.spark.util.Utils val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ - val classServerListenPort = conf.getInt("spark.replClassServer.port", 0) - val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort) + val classServerPort = conf.getInt("spark.replClassServer.port", 0) + val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything