Skip to content

Commit

Permalink
Move start service logic to Utils
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Aug 4, 2014
1 parent ec676f4 commit 73fbe89
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 67 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package org.apache.spark

import java.io.File

import org.apache.spark.network.PortManager
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
Expand Down Expand Up @@ -87,7 +86,7 @@ private[spark] class HttpServer(resourceBase: File,
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort)
val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort)
server = actualServer
port = actualPort
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, port)
}
PortManager.startWithIncrements(port, 3, startService)
Utils.startServiceOnPort(port, 3, startService)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
60 changes: 0 additions & 60 deletions core/src/main/scala/org/apache/spark/network/PortManager.scala

This file was deleted.

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ui

import java.net.{InetSocketAddress, URL}
import java.net.{BindException, InetSocketAddress, URL}
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

Expand All @@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.util.Utils

/**
Expand Down
34 changes: 33 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.util

import java.io._
import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
import java.net._
import java.nio.ByteBuffer
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
Expand Down Expand Up @@ -1331,4 +1331,36 @@ private[spark] object Utils extends Logging {
.map { case (k, v) => s"-D$k=$v" }
}

/**
* Attempt to start a service on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt.
*
* @param startPort The initial port to start the service on.
* @param maxRetries Maximum number of retries to attempt.
* A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
* @throws SparkException When unable to start service in the given number of attempts
* @return
*/
def startServiceOnPort[T](
startPort: Int,
maxRetries: Int,
startService: Int => (T, Int)): (T, Int) = {
for (offset <- 0 to maxRetries) {
val tryPort = (startPort + offset) % 65536
try {
return startService(tryPort)
} catch {
case e: BindException =>
if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) {
throw e
}
logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1))
}
}
// Should never happen
throw new SparkException(s"Couldn't start service on port $startPort")
}

}

0 comments on commit 73fbe89

Please sign in to comment.