Skip to content

Commit

Permalink
Unify port fallback logic to a single place
Browse files Browse the repository at this point in the history
  • Loading branch information
ash211 committed Jul 25, 2014
1 parent 24a4c32 commit 0347aef
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 39 deletions.
26 changes: 4 additions & 22 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.apache.spark.network.PortManager
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
Expand Down Expand Up @@ -47,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File,
private var server: Server = null
private var port: Int = localPort

private def startOnPort(startPort: Int): Tuple2[Server,Int] = {
private def startOnPort(startPort: Int): Server = {
val server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
Expand Down Expand Up @@ -78,34 +79,15 @@ private[spark] class HttpServer(resourceBase: File,
server.start()
val actualPort = server.getConnectors()(0).getLocalPort()

return (server, actualPort)
}

private def startWithIncrements(startPort: Int, maxRetries: Int): Tuple2[Server,Int] = {
for( offset <- 0 to maxRetries) {
try {
val (server, actualPort) = startOnPort(startPort + offset)
return (server, actualPort)
} catch {
case e: java.net.BindException => {
if (!e.getMessage.contains("Address already in use") ||
offset == (maxRetries-1)) {
throw e
}
logInfo("Could not bind on port: " + (startPort + offset))
}
case e: Exception => throw e
}
}
return (null, -1)
server
}

def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) = startWithIncrements(localPort, 3)
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
server = actualServer
port = actualPort
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
serverChannel.socket.setReuseAddress(true)
serverChannel.socket.setReceiveBufferSize(256 * 1024)

def bindWithIncrement(port: Int, maxTries: Int = 3) {
for( offset <- 0 until maxTries ) {
try {
serverChannel.socket.bind(new InetSocketAddress(port + offset))
return
} catch {
case e: java.net.BindException => {
if(!e.getMessage.contains("Address already in use") ||
offset == maxTries) {
throw e
}
logInfo("Could not bind on port: " + (port + offset))
}
case e: Exception => throw e
}
}
private def startService(port: Int) = {
serverChannel.socket.bind(new InetSocketAddress(port))
serverChannel
}
bindWithIncrement(port, 3)
PortManager.startWithIncrements(port, 3, startService)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
61 changes: 61 additions & 0 deletions core/src/main/scala/org/apache/spark/network/PortManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.net.InetSocketAddress

import org.apache.spark.{Logging, SparkException}
import org.eclipse.jetty.server.Server

private[spark] object PortManager extends Logging
{

/**
* Start service on given port, or attempt to fall back to the n+1 port for a certain number of
* retries
*
* @param startPort
* @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4
* total attempts, on ports n, n+1, n+2, and n+3
* @param startService Function to start service on a given port. Expected to throw a java.net
* .BindException if the port is already in use
* @tparam T
* @throws SparkException When unable to start service in the given number of attempts
* @return
*/
def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => T):
(T, Int) = {
for( offset <- 0 to maxRetries) {
val tryPort = startPort + offset
try {
val service: T = startService(tryPort)
return (service, tryPort)
} catch {
case e: java.net.BindException => {
if (!e.getMessage.contains("Address already in use") ||
offset == (maxRetries-1)) {
throw e
}
logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort+1))
}
case e: Exception => throw e
}
}
throw new SparkException(s"Couldn't start service on port $startPort")
}
}

0 comments on commit 0347aef

Please sign in to comment.