Skip to content

Commit

Permalink
fix according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
scwf committed Sep 21, 2014
1 parent 8f7cc96 commit c90d84e
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ private[deploy] object DeployMessages {
port: Int,
cores: Int,
memory: Int,
workerWebUiUrl: String,
publicAddress: String)
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ private[spark] class Master(
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.bind()
val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://"
val masterWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) {
"https://"
} else{
"http://"
}
masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

Expand Down Expand Up @@ -191,7 +195,7 @@ private[spark] class Master(
System.exit(0)
}

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) =>
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
Expand All @@ -201,7 +205,7 @@ private[spark] class Master(
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerWebUiUrl, publicAddress)
sender, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
val webUiAddress: String,
val publicAddress: String)
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private[spark] class Worker(
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
var workerWebUiUrl: String = _
var workerWebUiUrl: String = ""
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
Expand Down Expand Up @@ -139,7 +139,12 @@ private[spark] class Worker(
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort
val workerWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) {
"https://"
} else{
"http://"
}
workerWebUiUrl = workerWebUiUrlPrefix + publicAddress + ":" + webUi.boundPort
registerWithMaster()

metricsSystem.registerSource(workerSource)
Expand All @@ -165,7 +170,7 @@ private[spark] class Worker(
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress)
actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
if (conf.contains("worker.ui.port")) {
webUiPort = conf.get("worker.ui.port").toInt
}

parse(args.toList)

Expand Down
14 changes: 3 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import org.eclipse.jetty.server.{Connector, Server}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.eclipse.jetty.server.nio.SelectChannelConnector
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.Utils
import org.eclipse.jetty.server.nio.SelectChannelConnector
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector

/**
* Utilities for launching a web server using Jetty's HTTP Server class
Expand Down Expand Up @@ -212,11 +212,10 @@ private[spark] object JettyUtils extends Logging {
}

private def getConnector(port: Int, conf: SparkConf): Connector = {
val https = getHttpPolicy(conf)
val https = conf.get("spark.ui.https.enabled", "false").toBoolean
if (https) {
buildSslSelectChannelConnector(port, conf)
} else {
conf.set("spark.http.policy", "http")
val connector = new SelectChannelConnector
connector.setPort(port)
connector
Expand Down Expand Up @@ -245,13 +244,6 @@ private[spark] object JettyUtils extends Logging {
connector
}

def getHttpPolicy(conf: SparkConf): Boolean = {
if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) {
true
} else {
false
}
}
}

private[spark] case class ServerInfo(
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ private[spark] class SparkUI(
*/
private[spark] def appUIHostPort = publicHostName + ":" + boundPort

private def appUiAddressPrefix = conf.get("spark.http.policy")

private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort"
}

Expand Down
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ private[spark] abstract class WebUI(
}
}

def appUiAddressPrefix = if(conf.get("spark.ui.https.enabled", "false").toBoolean) {
"https"
} else {
"http"
}

/** Initialize all components of the server. */
def initialize()

Expand All @@ -99,11 +105,7 @@ private[spark] abstract class WebUI(
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
if (conf.get("spark.http.policy").equals("https")) {
logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort))
} else {
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
}
logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind %s".format(className), e)
Expand Down
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ class UISuite extends FunSuite {
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
//keytool -export -keystore core/src/test/resources/spark.keystore -alias spark -file /home/wf/code/spark2/core/src/test/resources/spark.cer -storepass 123456

val sparkConf = new SparkConf()
.set("spark.http.policy", "https")
.set("spark.ui.https.enabled", "true")
.set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf)
Expand Down Expand Up @@ -154,7 +156,7 @@ class UISuite extends FunSuite {

test("jetty with https binds to port 0 correctly") {
val sparkConf = new SparkConf()
.set("spark.http.policy", "https")
.set("spark.ui.https.enabled", "true")
.set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore")
val serverInfo = JettyUtils.startJettyServer(
"0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf)
Expand Down

0 comments on commit c90d84e

Please sign in to comment.