From 73fbe892794a6f7e4a051401f356c89f4aa7f81f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 4 Aug 2014 15:39:01 -0700 Subject: [PATCH] Move start service logic to Utils --- .../scala/org/apache/spark/HttpServer.scala | 5 +- .../spark/network/ConnectionManager.scala | 2 +- .../apache/spark/network/PortManager.scala | 60 ------------------- .../org/apache/spark/ui/JettyUtils.scala | 4 +- .../scala/org/apache/spark/util/Utils.scala | 34 ++++++++++- 5 files changed, 38 insertions(+), 67 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/network/PortManager.scala diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index e0c76feb124a2..3883a9cb71f40 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,10 +19,9 @@ package org.apache.spark import java.io.File -import org.apache.spark.network.PortManager import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator -import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler} +import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.bio.SocketConnector @@ -87,7 +86,7 @@ private[spark] class HttpServer(resourceBase: File, throw new ServerStateException("Server is already started") } else { logInfo("Starting HTTP Server") - val (actualServer, actualPort) = PortManager.startWithIncrements(localPort, 3, startOnPort) + val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort) server = actualServer port = actualPort } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index e624414861b1d..382e6362cf953 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -109,7 +109,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, serverChannel.socket.bind(new InetSocketAddress(port)) (serverChannel, port) } - PortManager.startWithIncrements(port, 3, startService) + Utils.startServiceOnPort(port, 3, startService) serverChannel.register(selector, SelectionKey.OP_ACCEPT) val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) diff --git a/core/src/main/scala/org/apache/spark/network/PortManager.scala b/core/src/main/scala/org/apache/spark/network/PortManager.scala deleted file mode 100644 index f9ad9629fe80a..0000000000000 --- a/core/src/main/scala/org/apache/spark/network/PortManager.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network - -import java.net.InetSocketAddress - -import org.apache.spark.{Logging, SparkException} -import org.eclipse.jetty.server.Server - -private[spark] object PortManager extends Logging -{ - - /** - * Start service on given port, or attempt to fall back to the n+1 port for a certain number of - * retries - * - * @param startPort - * @param maxRetries Maximum number of retries to attempt. A value of e.g. 3 will cause 4 - * total attempts, on ports n, n+1, n+2, and n+3 - * @param startService Function to start service on a given port. Expected to throw a java.net - * .BindException if the port is already in use - * @tparam T - * @throws SparkException When unable to start service in the given number of attempts - * @return - */ - def startWithIncrements[T](startPort: Int, maxRetries: Int, startService: Int => (T, Int)): - (T, Int) = { - for( offset <- 0 to maxRetries) { - val tryPort = startPort + offset - try { - return startService(tryPort) - } catch { - case e: java.net.BindException => { - if (!e.getMessage.contains("Address already in use") || - offset == (maxRetries-1)) { - throw e - } - logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) - } - case e: Exception => throw e - } - } - throw new SparkException(s"Couldn't start service on port $startPort") - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a2535e3c1c41f..a4be870757250 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, URL} +import java.net.{BindException, InetSocketAddress, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} @@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 30073a82857d2..844675eeddf09 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection} +import java.net._ import java.nio.ByteBuffer import java.util.{Locale, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} @@ -1331,4 +1331,36 @@ private[spark] object Utils extends Logging { .map { case (k, v) => s"-D$k=$v" } } + /** + * Attempt to start a service on the given port, or fail after a number of attempts. + * Each subsequent attempt uses 1 + the port used in the previous attempt. + * + * @param startPort The initial port to start the service on. + * @param maxRetries Maximum number of retries to attempt. + * A value of 3 means attempting ports n, n+1, n+2, and n+3, for example. + * @param startService Function to start service on a given port. + * This is expected to throw java.net.BindException on port collision. + * @throws SparkException When unable to start service in the given number of attempts + * @return + */ + def startServiceOnPort[T]( + startPort: Int, + maxRetries: Int, + startService: Int => (T, Int)): (T, Int) = { + for (offset <- 0 to maxRetries) { + val tryPort = (startPort + offset) % 65536 + try { + return startService(tryPort) + } catch { + case e: BindException => + if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) { + throw e + } + logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1)) + } + } + // Should never happen + throw new SparkException(s"Couldn't start service on port $startPort") + } + }