
Commit

Assorted fixes
andrewor14 committed Aug 4, 2014
1 parent 73fbe89 commit 6b550b0
Showing 8 changed files with 62 additions and 42 deletions.
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -23,26 +23,25 @@ import com.google.common.io.Files

 import org.apache.spark.util.Utils

-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {

   var baseDir : File = null
   var fileDir : File = null
   var jarDir : File = null
   var httpServer : HttpServer = null
   var serverUri : String = null

-  def initialize(port: Option[Int]) {
+  def initialize() {
     baseDir = Utils.createTempDir()
     fileDir = new File(baseDir, "files")
     jarDir = new File(baseDir, "jars")
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = if (port.isEmpty) {
-      new HttpServer(baseDir, securityManager)
-    } else {
-      new HttpServer(baseDir, securityManager, port.get)
-    }
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
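With this change the file server's port is chosen at construction time instead of being passed to initialize(). A minimal sketch of the new call pattern, assuming code inside the org.apache.spark package (the SparkConf lookup mirrors the SparkEnv change further down; variable names are illustrative):

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)

    // 0 (the default) asks for any free port; a fixed value is only a starting
    // point, since the underlying HttpServer may retry nearby ports on collision.
    val fileServerPort = conf.getInt("spark.fileserver.port", 0)
    val fileServer = new HttpFileServer(securityManager, fileServerPort)
    fileServer.initialize()  // no longer takes an Option[Int]
    println("HTTP file server available at " + fileServer.serverUri)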
45 changes: 27 additions & 18 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -41,13 +41,33 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File,
-    securityManager: SecurityManager,
-    localPort: Int = 0) extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server") extends Logging {
   private var server: Server = null
-  private var port: Int = localPort
+  private var port: Int = requestedPort

-  private def startOnPort(startPort: Int): (Server, Int) = {
+  def start() {
+    if (server != null) {
+      throw new ServerStateException("Server is already started")
+    } else {
+      logInfo("Starting HTTP Server")
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
+    }
+  }
+
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
     val server = new Server()
     val connector = new SocketConnector
     connector.setMaxIdleTime(60*1000)
@@ -76,22 +96,11 @@
     }

     server.start()
-    val actualPort = server.getConnectors()(0).getLocalPort()
+    val actualPort = server.getConnectors()(0).getLocalPort

     (server, actualPort)
   }

-  def start() {
-    if (server != null) {
-      throw new ServerStateException("Server is already started")
-    } else {
-      logInfo("Starting HTTP Server")
-      val (actualServer, actualPort) = Utils.startServiceOnPort(localPort, 3, startOnPort)
-      server = actualServer
-      port = actualPort
-    }
-  }
-
   /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -143,7 +152,7 @@
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
     }
   }
 }
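Since the file server, the broadcast server, and the REPL class server all go through this class, the new serverName argument is what distinguishes them in bind-retry and failure logs. A rough usage sketch, assuming spark-internal code (the directory and name are illustrative; the constructor parameters are the ones in the diff above):

    import java.io.File
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.util.Utils

    val resourceBase: File = Utils.createTempDir()
    val server = new HttpServer(resourceBase, new SecurityManager(new SparkConf()),
      requestedPort = 0, serverName = "HTTP example server")
    server.start()        // binds via Utils.startServiceOnPort, retrying nearby ports
    println(server.uri)   // e.g. http://<local-ip>:<bound-port>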
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -225,8 +225,9 @@ object SparkEnv extends Logging {

     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
-        server.initialize(conf.getOption("spark.fileserver.port").map(_.toInt))
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
+        server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server
       } else {
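Taken together, this commit routes several previously hard-wired listen ports through SparkConf. A sketch of pinning them explicitly (the keys are the ones used in this commit; the port values are arbitrary examples, and the default of 0 still means "pick any free port"):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.fileserver.port", "33000")      // driver HTTP file server
      .set("spark.broadcast.port", "33001")       // HTTP broadcast server
      .set("spark.blockManager.port", "33002")    // block manager's ConnectionManager
      .set("spark.replClassServer.port", "33003") // REPL class server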
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -152,8 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {

   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    val broadcastListenPort: Int = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastListenPort)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -38,8 +38,11 @@ import scala.language.postfixOps

 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}

-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager") extends Logging {

   class MessageStatus(
       val message: Message,
@@ -105,11 +108,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)

-  private def startService(port: Int) = {
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
     serverChannel.socket.bind(new InetSocketAddress(port))
     (serverChannel, port)
   }
-  Utils.startServiceOnPort(port, 3, startService)
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)

   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -60,11 +60,12 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker)
   extends Logging {

+  private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-  val connectionManager = new ConnectionManager(conf.getInt("spark.blockManager.port", 0), conf,
-    securityManager)
+  val connectionManager =
+    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")

   implicit val futureExecContext = connectionManager.futureExecContext

17 changes: 12 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1345,22 +1345,29 @@ private[spark] object Utils extends Logging {
    */
   def startServiceOnPort[T](
       startPort: Int,
-      maxRetries: Int,
-      startService: Int => (T, Int)): (T, Int) = {
+      startService: Int => (T, Int),
+      serviceName: String = "",
+      maxRetries: Int = 3): (T, Int) = {
     for (offset <- 0 to maxRetries) {
       val tryPort = (startPort + offset) % 65536
       try {
         return startService(tryPort)
       } catch {
         case e: BindException =>
+          val service = if (serviceName.isEmpty) "Service" else s"Service '$serviceName'"
           if (!e.getMessage.contains("Address already in use") || offset >= maxRetries) {
-            throw e
+            val exceptionMessage =
+              s"${e.getMessage}: $service failed after $maxRetries retries!"
+            val exception = new BindException(exceptionMessage)
+            // restore original stack trace
+            exception.setStackTrace(e.getStackTrace)
+            throw exception
           }
-          logInfo("Could not bind on port: " + tryPort + ". Attempting port " + (tryPort + 1))
+          logInfo(s"$service could not bind on port $tryPort. Attempting port ${tryPort + 1}.")
       }
     }
     // Should never happen
-    throw new SparkException(s"Couldn't start service on port $startPort")
+    throw new SparkException(s"Failed to start service on port $startPort")
   }

 }
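For reference, a hedged sketch of calling the reworked helper from spark-internal code: startEcho and the service name below are invented for illustration, while the parameter order (startPort, startService, serviceName, maxRetries with defaults) is the one in the diff above:

    import java.net.{InetSocketAddress, ServerSocket}
    import org.apache.spark.util.Utils

    // Start a plain ServerSocket on the requested port, or throw BindException.
    def startEcho(port: Int): (ServerSocket, Int) = {
      val socket = new ServerSocket()
      socket.bind(new InetSocketAddress(port))
      (socket, socket.getLocalPort)
    }

    // Retries up to the default 3 nearby ports; on repeated BindExceptions the
    // helper now rethrows with a message such as
    // "Address already in use: Service 'Echo server' failed after 3 retries!"
    val (echoServer, boundPort) =
      Utils.startServiceOnPort[ServerSocket](3000, startEcho, "Echo server")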
4 changes: 2 additions & 2 deletions repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -102,8 +102,8 @@ import org.apache.spark.util.Utils

   val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
   /** Jetty server that will serve our classes to worker nodes */
-  val classServerListenPort = conf.getInt("spark.replClassServer.port", 0)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerListenPort)
+  val classServerPort = conf.getInt("spark.replClassServer.port", 0)
+  val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
   private var currentSettings: Settings = initialSettings
   var printResults = true // whether to print result lines
   var totalSilence = false // whether to print anything
