Skip to content

Commit

Permalink
Treat 0 ports specially + return correct ConnectionManager port
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Aug 5, 2014
1 parent ba32280 commit 1d7e408
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private[spark] class ConnectionManager(

private def startService(port: Int): (ServerSocketChannel, Int) = {
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, port)
(serverChannel, serverChannel.socket.getLocalPort)
}
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
Expand Down
27 changes: 10 additions & 17 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,40 +174,33 @@ private[spark] object JettyUtils extends Logging {
hostName: String,
port: Int,
handlers: Seq[ServletContextHandler],
conf: SparkConf): ServerInfo = {
conf: SparkConf,
serverName: String = ""): ServerInfo = {

val collection = new ContextHandlerCollection
collection.setHandlers(handlers.toArray)
addFilters(handlers, conf)

@tailrec
// Bind to the given port, or throw a java.net.BindException if the port is occupied
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
server.setHandler(collection)

Try {
try {
server.start()
} match {
case s: Success[_] =>
(server, server.getConnectors.head.getLocalPort)
case f: Failure[_] =>
val nextPort = (currentPort + 1) % 65536
(server, server.getConnectors.head.getLocalPort)
} catch {
case e: Exception =>
server.stop()
pool.stop()
val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
if (f.toString.contains("Address already in use")) {
logWarning(s"$msg - $f")
} else {
logError(msg, f.exception)
}
connect(nextPort)
throw e
}
}

val (server, boundPort) = connect(port)
val (server, boundPort) =
Utils.startServiceOnPort[Server](port, connect, serverName, maxRetries = 10)
ServerInfo(server, boundPort, collection)
}

Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1347,22 +1347,26 @@ private[spark] object Utils extends Logging {
startService: Int => (T, Int),
serviceName: String = "",
maxRetries: Int = 3): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
for (offset <- 0 to maxRetries) {
val tryPort = (startPort + offset) % 65536
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
try {
return startService(tryPort)
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: BindException =>
val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'"
if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: $service failed after $maxRetries retries!"
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logInfo(s"$service could not bind on port $tryPort. Attempting port ${tryPort + 1}.")
logWarning(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
}
}
// Should never happen
Expand Down

0 comments on commit 1d7e408

Please sign in to comment.