Skip to content

Commit

Permalink
[SPARK-23943] Improve observability of MesosRestServer/MesosClusterDi…
Browse files Browse the repository at this point in the history
…spatcher
  • Loading branch information
Paul Mackles committed Apr 10, 2018
1 parent d3bd043 commit dc06283
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 2 deletions.
Expand Up @@ -63,6 +63,8 @@ private[spark] abstract class RestSubmissionServer(
s"$baseContext/create/*" -> submitRequestServlet,
s"$baseContext/kill/*" -> killRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
"/health" -> new ServerStatusServlet(this),
"/status" -> new ServerStatusServlet(this),
"/*" -> new ErrorServlet // default handler
)

Expand Down Expand Up @@ -111,6 +113,18 @@ private[spark] abstract class RestSubmissionServer(
/**
 * Stops the server by calling `stop()` on each element held in `_server`
 * (presumably an Option wrapping the running server -- confirm against the declaration).
 */
def stop(): Unit = {
  _server.foreach { running =>
    running.stop()
  }
}

/**
 * Reports whether this server considers itself healthy.
 *
 * This default implementation always answers `true`; subclasses should override it.
 */
def isServerHealthy(): Boolean = {
  true
}

/**
 * Builds the status report returned to callers.
 *
 * This default implementation reports success, the server's `sparkVersion` and the
 * message "iamok"; subclasses should override it.
 */
def serverStatus(): ServerStatusResponse = {
  val status = new ServerStatusResponse
  status.success = true
  status.serverSparkVersion = sparkVersion
  status.message = "iamok"
  status
}
}

private[rest] object RestSubmissionServer {
Expand Down Expand Up @@ -331,3 +345,15 @@ private class ErrorServlet extends RestServlet {
sendResponse(error, response)
}
}

/**
 * A servlet that answers GET requests with the status report of the given server.
 *
 * The report from `server.serverStatus()` is always sent. In addition, when the server is
 * not healthy and the request URI is exactly "/health", the HTTP status is set to 503.
 */
private class ServerStatusServlet(server: RestSubmissionServer) extends RestServlet {
  override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
    val requestedUri = req.getRequestURI
    val unhealthy = !server.isServerHealthy()
    // Monitors that only look at status codes get a 503 from the health endpoint.
    if (unhealthy && requestedUri == "/health") {
      resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE)
    }
    sendResponse(server.serverStatus(), resp)
  }
}
Expand Up @@ -83,3 +83,15 @@ private[rest] class ErrorResponse extends SubmitRestProtocolResponse {
assertFieldIsSet(message, "message")
}
}

/**
 * A response message carrying a server status report.
 *
 * Every field declared here starts out as null. Validation only adds a check that
 * `message` is set on top of whatever the parent's `doValidate` checks, so the
 * driver counters and the scheduler flag may be left unset.
 */
private[rest] class ServerStatusResponse extends SubmitRestProtocolResponse {
// Driver counters; null until a server fills them in.
var queuedDrivers: java.lang.Integer = null
var launchedDrivers: java.lang.Integer = null
var pendingRetryDrivers: java.lang.Integer = null
// NOTE(review): initialised to null, so `Boolean` here is presumably java.lang.Boolean
// brought in by an import outside this view -- confirm.
var schedulerDriverStopped: Boolean = null

// Runs the parent's validation first, then asserts that `message` has been set.
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(message, "message")
}
}
Expand Up @@ -208,6 +208,26 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(statusResponse.submissionId === doesNotExist)
}

// A healthy server answers GET /health with 200 and a successful "iamok" status report.
test("server health ok") {
  val healthUrl = startSmartServer().replace("spark://", "http://") + "/health"
  val (rawResponse, statusCode) = sendHttpRequestWithResponse(healthUrl, "GET")
  val health = getServerStatusResponse(rawResponse)
  assert(statusCode === HttpServletResponse.SC_OK)
  assert(health.message === "iamok")
  assert(health.success === true)
}

// The faulty server answers GET /health with 503 and an unsuccessful "notok" status report.
test("server health not ok") {
  val healthUrl = startFaultyServer().replace("spark://", "http://") + "/health"
  val (rawResponse, statusCode) = sendHttpRequestWithResponse(healthUrl, "GET")
  val health = getServerStatusResponse(rawResponse)
  assert(statusCode === HttpServletResponse.SC_SERVICE_UNAVAILABLE)
  assert(health.message === "notok")
  assert(health.success === false)
}

/* ---------------------------------------- *
| Aberrant client / server behavior |
* ---------------------------------------- */
Expand Down Expand Up @@ -468,6 +488,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
}
}

/** Return the response as a server status response, or fail the test otherwise. */
private def getServerStatusResponse(
    response: SubmitRestProtocolResponse): ServerStatusResponse = {
  response match {
    case status: ServerStatusResponse => status
    case error: ErrorResponse => fail(s"Server returned error: ${error.message}")
    case other => fail(s"Expected status response. Actual: ${other.toJson}")
  }
}

/** Return the response as a status response, or fail with error otherwise. */
private def getStatusResponse(response: SubmitRestProtocolResponse): SubmissionStatusResponse = {
response match {
Expand Down Expand Up @@ -594,6 +623,15 @@ private class FaultyStandaloneRestServer(
protected override val killRequestServlet = new InvalidKillServlet
protected override val statusRequestServlet = new ExplodingStatusServlet

/** This faulty server always reports itself as unhealthy. */
override def isServerHealthy(): Boolean = {
  false
}
/** Builds a failing status report: no success, `SPARK_VERSION`, and the message "notok". */
override def serverStatus(): ServerStatusResponse = {
  val status = new ServerStatusResponse
  status.success = false
  status.serverSparkVersion = SPARK_VERSION
  status.message = "notok"
  status
}

/** A faulty servlet that produces malformed responses. */
class MalformedSubmitServlet
extends StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf) {
Expand Down
Expand Up @@ -50,6 +50,24 @@ private[spark] class MesosRestServer(
new MesosKillRequestServlet(scheduler, masterConf)
protected override val statusRequestServlet =
new MesosStatusRequestServlet(scheduler, masterConf)

/** The server is healthy as long as the scheduler does not report its driver as stopped. */
override def isServerHealthy(): Boolean = {
  val driverStopped = scheduler.isSchedulerDriverStopped()
  !driverStopped
}

/**
 * Builds the status report for this server from the scheduler's current state.
 *
 * The report carries the scheduler-driver-stopped flag, the queued / launched /
 * pending-retry driver counts and the server's `sparkVersion`. When the scheduler driver
 * has stopped, `success` is false and the message is "notok: scheduler driver stopped";
 * otherwise `success` is true and the message is "iamok".
 *
 * @return a freshly created [[ServerStatusResponse]]
 */
override def serverStatus(): ServerStatusResponse = {
  // Read the flag exactly once so that `schedulerDriverStopped`, `success` and `message`
  // always agree, even if the driver stops while this response is being assembled.
  val driverStopped = scheduler.isSchedulerDriverStopped()
  val status = new ServerStatusResponse
  status.schedulerDriverStopped = driverStopped
  status.queuedDrivers = scheduler.getQueuedDriversSize
  status.launchedDrivers = scheduler.getLaunchedDriversSize
  status.pendingRetryDrivers = scheduler.getPendingRetryDriversSize
  status.success = !driverStopped
  status.message = if (driverStopped) "notok: scheduler driver stopped" else "iamok"
  status.serverSparkVersion = sparkVersion
  status
}
}

private[mesos] class MesosSubmitRequestServlet(
Expand Down
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.nio.charset.StandardCharsets
import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand All @@ -47,7 +48,7 @@ import org.apache.spark.util.Utils
trait MesosSchedulerUtils extends Logging {
// Lock used to wait for scheduler to be registered
private final val registerLatch = new CountDownLatch(1)

// Flag read by isSchedulerDriverStopped(). Starts out false and is set to true in the
// `finally` clause that logs "schedulerDriver stopped".
private final val schedulerDriverStopped = new AtomicBoolean(false)
private final val ANY_ROLE = "*"

/**
Expand Down Expand Up @@ -160,7 +161,10 @@ trait MesosSchedulerUtils extends Logging {
logError("driver.run() failed", e)
error = Some(e)
markErr()
}
} finally {
logWarning("schedulerDriver stopped")
schedulerDriverStopped.set(true)
}
}
}.start()

Expand All @@ -172,6 +176,8 @@ trait MesosSchedulerUtils extends Logging {
}
}

/** Returns the current value of the `schedulerDriverStopped` flag. */
def isSchedulerDriverStopped(): Boolean = {
  schedulerDriverStopped.get()
}

def getResource(res: JList[Resource], name: String): Double = {
// A resource can have multiple values in the offer since it can either be from
// a specific role or wildcard.
Expand Down

0 comments on commit dc06283

Please sign in to comment.