From 1d7e40813e6ae98ee5cffb3e9e61807f3a01e941 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 17:40:27 -0700 Subject: [PATCH] Treat 0 ports specially + return correct ConnectionManager port --- .../spark/network/ConnectionManager.scala | 2 +- .../org/apache/spark/ui/JettyUtils.scala | 27 +++++++------------ .../scala/org/apache/spark/util/Utils.scala | 14 ++++++---- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index a913580d8f6c6..f837474b0c6ef 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -110,7 +110,7 @@ private[spark] class ConnectionManager( private def startService(port: Int): (ServerSocketChannel, Int) = { serverChannel.socket.bind(new InetSocketAddress(port)) - (serverChannel, port) + (serverChannel, serverChannel.socket.getLocalPort) } Utils.startServiceOnPort[ServerSocketChannel](port, startService, name) serverChannel.register(selector, SelectionKey.OP_ACCEPT) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a2535e3c1c41f..220b1bf234aaf 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -174,40 +174,33 @@ private[spark] object JettyUtils extends Logging { hostName: String, port: Int, handlers: Seq[ServletContextHandler], - conf: SparkConf): ServerInfo = { + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection collection.setHandlers(handlers.toArray) addFilters(handlers, conf) - @tailrec + // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) server.setHandler(collection) - - Try { + try { server.start() - } match { - case s: Success[_] => - (server, server.getConnectors.head.getLocalPort) - case f: Failure[_] => - val nextPort = (currentPort + 1) % 65536 + (server, server.getConnectors.head.getLocalPort) + } catch { + case e: Exception => server.stop() pool.stop() - val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort." - if (f.toString.contains("Address already in use")) { - logWarning(s"$msg - $f") - } else { - logError(msg, f.exception) - } - connect(nextPort) + throw e } } - val (server, boundPort) = connect(port) + val (server, boundPort) = + Utils.startServiceOnPort[Server](port, connect, serverName, maxRetries = 10) ServerInfo(server, boundPort, collection) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f955f5bc59422..f091e5ace1022 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1347,22 +1347,26 @@ private[spark] object Utils extends Logging { startService: Int => (T, Int), serviceName: String = "", maxRetries: Int = 3): (T, Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" for (offset <- 0 to maxRetries) { - val tryPort = (startPort + offset) % 65536 + // Do not increment port if startPort is 0, which is treated as a special port + val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 try { - return startService(tryPort) + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) } catch { case e: BindException => - val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'" if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { val exceptionMessage = - s"${e.getMessage}: $service failed after $maxRetries retries!" + s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" val exception = new BindException(exceptionMessage) // restore original stack trace exception.setStackTrace(e.getStackTrace) throw exception } - logInfo(s"$service could not bind on port $tryPort. Attempting port ${tryPort + 1}.") + logWarning(s"Service$serviceString could not bind on port $tryPort. " + + s"Attempting port ${tryPort + 1}.") } } // Should never happen