Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12583][Mesos] Mesos shuffle service: Don't delete shuffle files before application has stopped #11207

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.spark.deploy.mesos

import java.net.SocketAddress
import java.net.{HttpURLConnection, SocketTimeoutException, URL}
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.language.postfixOps

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import sys.process._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
Expand All @@ -29,16 +34,25 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.ThreadUtils


/**
* An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
private[mesos] class MesosExternalShuffleBlockHandler(
transportConf: TransportConf, sparkMaster: String, frameworkTimeoutMs: Long)
extends ExternalShuffleBlockHandler(transportConf, null) with Logging {

// Stores a map of driver socket addresses to app ids
private val connectedApps = new mutable.HashMap[SocketAddress, String]
private val cleanerThreadExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("mesos-shuffle-cleaner")

// Stores the active frameworks and when they were last seen active
private val connectedApps = new ConcurrentHashMap[String, Long]()

cleanerThreadExecutor.scheduleAtFixedRate(
new MesosFrameworkCleaner(), 0, frameworkTimeoutMs / 4, TimeUnit.MILLISECONDS)

protected override def handleMessage(
message: BlockTransferMessage,
Expand All @@ -48,15 +62,10 @@ private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportCo
case RegisterDriverParam(appId) =>
val address = client.getSocketAddress
logDebug(s"Received registration request from app $appId (remote address $address).")
if (connectedApps.contains(address)) {
val existingAppId = connectedApps(address)
if (!existingAppId.equals(appId)) {
logError(s"A new app '$appId' has connected to existing address $address, " +
s"removing previously registered app '$existingAppId'.")
applicationRemoved(existingAppId, true)
}
if (connectedApps.contains(appId)) {
logError(s"App '$appId' has re-registered.")
}
connectedApps(address) = appId
connectedApps.put(appId, System.nanoTime())
callback.onSuccess(ByteBuffer.allocate(0))
case _ => super.handleMessage(message, client, callback)
}
Expand All @@ -67,33 +76,162 @@ private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportCo
*/
/**
 * Invoked when a driver's socket to this shuffle service closes.
 *
 * Unlike the previous design, we deliberately do NOT delete the application's
 * shuffle files here: a closed channel may just be a transient network failure
 * while the framework is still running. Cleanup is instead performed by the
 * [[MesosFrameworkCleaner]] once the framework has not been seen running in
 * the Mesos master state for longer than the configured timeout.
 */
override def channelInactive(client: TransportClient): Unit = {
  val address = client.getSocketAddress
  logInfo(s"Socket disconnected (address was $address).")
}

/** An extractor object for matching a [[RegisterDriver]] message. */
private object RegisterDriverParam {
// Yields the registering application's id, enabling
// `case RegisterDriverParam(appId)` pattern matches in handleMessage.
def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
}

/**
 * Periodic task that polls the Mesos master's `state.json` endpoint and
 * removes the shuffle files of frameworks (Spark applications) that have not
 * been seen running for longer than `frameworkTimeoutMs`. Supports both a
 * single Mesos master and a Zookeeper-backed HA deployment, re-resolving the
 * leader between runs when it changes.
 */
private class MesosFrameworkCleaner extends Runnable {

// relevant if Mesos is running in HA mode with zookeeper
// (var: may be flipped to true below if HA is detected at start-up)
private var mesosHaMode = sparkMaster.toLowerCase().startsWith("mesos://zk://")

// The Zookeeper URI if mesos is running in HA mode
// (e.g. zk://zk1:port1,zk2:port2,zk3:port3/mesos)
private var zkUri = if (!mesosHaMode) {
None
} else {
Some(sparkMaster.toLowerCase().stripPrefix("mesos://"))
}

// The currently known mesos leader.
// NOTE: this initializer intentionally runs after mesosHaMode/zkUri and may
// mutate both of them (auto-upgrade to HA mode) or terminate the process.
private var mesosLeader: String = if (!mesosHaMode) {
// configured as non-HA. Verify:
val sparkMasterUri = sparkMaster.stripPrefix("mesos://")
getMasterStateObj(sparkMasterUri) match {
case None =>
// Non-HA master is unreachable at start-up: fail fast rather than run a
// cleaner that can never observe framework state.
logError(s"Unable to retrieve mesos state on start-up from $sparkMaster (non-HA " +
s"configuration). Verify that spark.master points to a running mesos master and " +
s"restart the shuffle service.")
System.exit(-1)
// unreachable; present only to give the expression a String type
sparkMasterUri
case Some(stateObj) =>
getZkFromStateObj(stateObj) match {
case Some(zk) =>
// The master reports a "zk" flag, so the cluster is actually HA even
// though spark.master was configured as non-HA: switch to HA mode.
logWarning(s"Shuffle service was started with a non-HA master ($sparkMaster) but a " +
s"HA configuration was detected. Reconfiguring shuffle service to use " +
s"'mesos://$zk' as 'spark.master'. You might want to fix your configuration.")
mesosHaMode = true
zkUri = Some(zk)
getLeaderFromZk(zkUri.get)
case None =>
// Started as non-HA. Detected non-HA.
sparkMasterUri
}
}
} else {
getLeaderFromZk(zkUri.get)
}

// Shared JSON parser for state.json responses; lazy so it is only built on
// first use.
lazy val objectMapper = new ObjectMapper()


/**
 * Resolves the current Mesos leader from Zookeeper by shelling out to the
 * `mesos-resolve` CLI tool.
 * NOTE(review): assumes `mesos-resolve` is on the shuffle service's PATH —
 * confirm for the deployment environment.
 */
private def getLeaderFromZk(zkUri: String): String = {
// this throws "java.lang.RuntimeException: Nonzero exit value: 255"
// if the leader can't be determined within a timeout (5 seconds)
val leaderFromZk = (s"mesos-resolve ${zkUri}" !!).stripLineEnd
logTrace(s"Retrieved mesos leader $leaderFromZk from Zookeeper.")
leaderFromZk
}

/**
 * Fetches and parses `http://<master>/master/state.json`.
 * Returns None on a non-200 response or a connect timeout.
 * NOTE(review): only SocketTimeoutException is caught — other I/O failures
 * (e.g. connection refused) propagate out of the scheduled task; confirm
 * this is intended.
 */
private def getMasterStateObj(master: String): Option[JsonNode] = {
val stateUrl = new URL(s"http://${master}/master/state.json")
try {
val conn = stateUrl.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("GET")
conn.setConnectTimeout(5000) // 5 secs
if (200 == conn.getResponseCode) {
Some(objectMapper.readTree(conn.getInputStream))
} else {
None
}
} catch {
case _: SocketTimeoutException =>
logError(s"Connection to mesos leader at $stateUrl timed out.")
None
}
}

/**
 * Extracts the leader's host:port from a state object, stripping the
 * "master@" prefix (the field looks like "master@host:port").
 */
private def getLeaderFromStateObj(stateObj: JsonNode): Option[String] = {
if (stateObj.has("leader")) {
Some(stateObj.get("leader").asText().stripPrefix("master@"))
} else {
None
}
}

/** Returns the ids of all frameworks the master currently lists as running. */
private def getRunningFrameworks(stateObj: JsonNode): Set[String] = {
stateObj.get("frameworks").elements().asScala
.map(_.get("id").asText()).toSet
}

/**
 * Reads the "zk" entry from the master's flags, if present — its presence
 * indicates a Zookeeper-backed HA deployment.
 */
private def getZkFromStateObj(stateObj: JsonNode): Option[String] = {
val flags = stateObj.get("flags")
if (flags.has("zk")) {
Some(flags.get("zk").asText())
} else {
None
}
}

/**
 * One cleaner pass: fetch state from the last known leader, follow leader
 * changes, refresh the last-seen timestamp of still-running frameworks, and
 * remove shuffle files of frameworks unseen for longer than the timeout.
 */
override def run(): Unit = {
getMasterStateObj(mesosLeader) match {
case None =>
if (mesosHaMode) {
// The cached leader may have died; re-resolve from Zookeeper and let
// the next scheduled run retry against the new leader.
mesosLeader = getLeaderFromZk(zkUri.get)
logInfo(s"Failed to retrieve mesos state, but found a new leader: $mesosLeader. " +
s"Will retry.")
} else {
logError("Failed to retrieve mesos (non-HA) state.")
}
case Some(state) =>
getLeaderFromStateObj(state) match {
case None => logError("Failed to determine mesos leader from state.json")
case Some(leader) =>
if (leader != mesosLeader) {
// We talked to a non-leader; adopt the advertised leader and defer
// cleanup decisions to the next run to avoid acting on stale state.
logInfo(s"Got a new leader ($leader) from state.json. Will retry with the new " +
s"leader.")
mesosLeader = leader
} else {
// definitely got the state from the leader
val runningFrameworks = getRunningFrameworks(state)
val now = System.nanoTime()
// Refresh the last-seen timestamp of every registered app that the
// master still reports as running.
runningFrameworks.foreach { id =>
if (connectedApps.containsKey(id)) {
connectedApps.replace(id, now)
}
}
// Timestamps are in nanoseconds (System.nanoTime), so convert the
// millisecond timeout with * 1000 * 1000 before comparing.
connectedApps.asScala.foreach { case (appId, lastSeen) =>
if (now - lastSeen > frameworkTimeoutMs * 1000 * 1000) {
logInfo(s"Application $appId has timed out. Removing shuffle files.")
applicationRemoved(appId, true)
connectedApps.remove(appId)
}
}
}
}
}
}
}
}

/**
 * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
 * to register with. This allows the shuffle service to detect when a mesos framework is no longer
 * running and can clean up the associated shuffle files after a timeout.
 */
private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
  extends ExternalShuffleService(conf, securityManager) {

  /**
   * Builds the Mesos-aware block handler, passing along the master URL (so the
   * cleaner can poll framework state) and the framework timeout.
   */
  protected override def newShuffleBlockHandler(
      conf: TransportConf): ExternalShuffleBlockHandler = {
    // TODO(review): consider a dedicated config key instead of reusing
    // spark.storage.blockManagerSlaveTimeoutMs for the framework timeout.
    new MesosExternalShuffleBlockHandler(
      conf,
      this.conf.get("spark.master"),
      this.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s"))
  }
}

Expand Down