SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
This patch allows the FaultToleranceTest to work in newer versions of Docker.
See https://spark-project.atlassian.net/browse/SPARK-1136 for more details.

Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of the Master to accept a new Worker that shares an address with a Worker we are currently trying to recover. This can only happen when the Worker itself was restarted and came back with the same IP address and port while a Master recovery was in progress.
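In essence, the Master's registration check now tolerates a same-address registration only when the existing entry is an UNKNOWN (mid-recovery) worker; the actual change is in the Master.scala hunk below. A minimal, self-contained sketch of that rule follows. WorkerState, WorkerInfo, addressToWorker, and shouldAccept here are illustrative stand-ins for the real Master internals, not Spark's actual classes:

// Simplified model of the re-registration rule described above. The names mirror
// the Master.scala diff below, but this is an illustrative sketch, not Spark code.
object ReRegistrationSketch {
  object WorkerState extends Enumeration {
    val ALIVE, UNKNOWN = Value
  }

  final case class WorkerInfo(address: String, state: WorkerState.Value)

  val addressToWorker = scala.collection.mutable.Map[String, WorkerInfo]()

  def removeWorker(w: WorkerInfo): Unit = addressToWorker -= w.address

  /** Should a worker registering from `address` be accepted? */
  def shouldAccept(address: String): Boolean = addressToWorker.get(address) match {
    case Some(old) if old.state == WorkerState.UNKNOWN =>
      // The worker was restarted while the Master was recovering it, so the old
      // entry must be dead: drop it and accept the new registration.
      removeWorker(old)
      true
    case Some(_) =>
      // A live worker already owns this address: reject the duplicate.
      false
    case None =>
      true
  }

  def main(args: Array[String]): Unit = {
    addressToWorker("akka://w1") = WorkerInfo("akka://w1", WorkerState.UNKNOWN)
    println(shouldAccept("akka://w1")) // true: old entry was UNKNOWN (restart during recovery)
    addressToWorker("akka://w2") = WorkerInfo("akka://w2", WorkerState.ALIVE)
    println(shouldAccept("akka://w2")) // false: a live worker already holds this address
  }
}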

Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.

Author: Aaron Davidson <aaron@databricks.com>

Closes #5 from aarondav/zookeeper and squashes the following commits:

5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
aarondav committed Mar 7, 2014
1 parent 33baf14 commit dabeb6f
Showing 6 changed files with 73 additions and 12 deletions.
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

@@ -30,31 +30,41 @@ import scala.sys.process._
import org.json4s._
import org.json4s.jackson.JsonMethods

-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.master.RecoveryState
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}

/**
 * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
 * In order to mimic a real distributed cluster more closely, Docker is used.
 * Execute using
- *   ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ *   ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
 *
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ *   *and* SPARK_JAVA_OPTS:
 *   - spark.deploy.recoveryMode=ZOOKEEPER
 *   - spark.deploy.zookeeper.url=172.17.42.1:2181
 * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
 *
+ * In case of failure, make sure to kill off prior docker containers before restarting:
+ *   docker kill $(docker ps -q)
+ *
 * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
 * working installation of Docker. In addition to having Docker, the following are assumed:
 *   - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
 *   - The docker images tagged spark-test-master and spark-test-worker are built from the
 *     docker/ directory. Run 'docker/spark-test/build' to generate these.
 */
private[spark] object FaultToleranceTest extends App with Logging {

+  val conf = new SparkConf()
+  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+
  val masters = ListBuffer[TestMasterInfo]()
  val workers = ListBuffer[TestWorkerInfo]()
  var sc: SparkContext = _

+  val zk = SparkCuratorUtil.newClient(conf)
+
  var numPassed = 0
  var numFailed = 0

@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
      sc = null
    }
    terminateCluster()
+
+    // Clear ZK directories in between tests (for speed purposes)
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
  }

  test("sanity-basic") {

@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
    try {
      fn
      numPassed += 1
+      logInfo("==============================================")
      logInfo("Passed: " + name)
+      logInfo("==============================================")
    } catch {
      case e: Exception =>
        numFailed += 1
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        logError("FAILED: " + name, e)
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+        sys.exit(1)
    }
    afterEach()
  }

  def addMasters(num: Int) {
+    logInfo(s">>>>> ADD MASTERS $num <<<<<")
    (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
  }

  def addWorkers(num: Int) {
+    logInfo(s">>>>> ADD WORKERS $num <<<<<")
    val masterUrls = getMasterUrls(masters)
    (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
  }

  /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
  def createClient() = {
+    logInfo(">>>>> CREATE CLIENT <<<<<")
    if (sc != null) { sc.stop() }
    // Counter-hack: Because of a hack in SparkEnv#create() that changes this
    //               property, we need to reset it.

@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
  }

  def killLeader(): Unit = {
+    logInfo(">>>>> KILL LEADER <<<<<")
    masters.foreach(_.readState())
    val leader = getLeader
    masters -= leader

@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
  def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

  def terminateCluster() {
+    logInfo(">>>>> TERMINATE CLUSTER <<<<<")
    masters.foreach(_.kill())
    workers.foreach(_.kill())
    masters.clear()

@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   * are all alive in a proper configuration (e.g., only one leader).
   */
  def assertValidClusterState() = {
+    logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
    assertUsable()
    var numAlive = 0
    var numStandby = 0

@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val

      val workers = json \ "workers"
      val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
-      liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+      // Extract the worker IP from "webuiaddress" (rather than "host") because the host name
+      // on containers is a weird hash instead of the actual IP address.
+      liveWorkerIPs = liveWorkers.map {
+        w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
+      }

      numLiveApps = (json \ "activeapps").children.size

@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
  def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
    val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

-    val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+    val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
    logDebug("Run command: " + cmd)
    cmd
  }
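For illustration, the liveWorkerIPs change above boils down to parsing the master's JSON status and stripping the web UI address down to an IP, as TestMasterInfo.readState does. A self-contained sketch using json4s; the status fragment below is made up for the example, with field names following what the test reads:

import org.json4s._
import org.json4s.jackson.JsonMethods

// Worked example of the webuiaddress-based IP extraction shown above.
object WorkerIpSketch {
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // Hypothetical fragment of a master's JSON status.
    val json = JsonMethods.parse(
      """{"workers": [
        |  {"state": "ALIVE", "host": "0a1b2c3d4e5f", "webuiaddress": "http://172.17.0.5:8081"},
        |  {"state": "DEAD",  "host": "9f8e7d6c5b4a", "webuiaddress": "http://172.17.0.6:8081"}
        |]}""".stripMargin)

    val liveWorkers = (json \ "workers").children.filter(w => (w \ "state").extract[String] == "ALIVE")
    val liveWorkerIPs = liveWorkers.map { w =>
      (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
    }
    println(liveWorkerIPs) // List(172.17.0.5): the dead worker and the hashed "host" field are ignored
  }
}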
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,

    val workerAddress = worker.actor.path.address
    if (addressToWorker.contains(workerAddress)) {
-      logInfo("Attempted to re-register worker at same address: " + workerAddress)
-      return false
+      val oldWorker = addressToWorker(workerAddress)
+      if (oldWorker.state == WorkerState.UNKNOWN) {
+        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
+        // The old worker must thus be dead, so we will remove it and accept the new worker.
+        removeWorker(oldWorker)
+      } else {
+        logInfo("Attempted to re-register worker at same address: " + workerAddress)
+        return false
+      }
    }

    workers += worker
core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala

@@ -17,11 +17,13 @@

package org.apache.spark.deploy.master

-import org.apache.spark.{SparkConf, Logging}
+import scala.collection.JavaConversions._

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException

+import org.apache.spark.{Logging, SparkConf}
+
object SparkCuratorUtil extends Logging {

@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
      }
    }
  }
+
+  def deleteRecursive(zk: CuratorFramework, path: String) {
+    if (zk.checkExists().forPath(path) != null) {
+      for (child <- zk.getChildren.forPath(path)) {
+        zk.delete().forPath(path + "/" + child)
+      }
+      zk.delete().forPath(path)
+    }
+  }
}
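For context, the test's afterEach (in the FaultToleranceTest.scala hunk above) uses this helper to clear the leader-election and master-status nodes between runs. A minimal standalone sketch of the same call pattern; ZkCleanupSketch is a hypothetical driver, and it assumes the ZooKeeper properties from the scaladoc above (e.g. -Dspark.deploy.zookeeper.url=172.17.42.1:2181) are set and reachable:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.SparkCuratorUtil

object ZkCleanupSketch {
  def main(args: Array[String]): Unit = {
    // SparkConf picks up spark.* system properties, including spark.deploy.zookeeper.url.
    val conf = new SparkConf()
    val zk = SparkCuratorUtil.newClient(conf)
    val zkDir = conf.get("spark.deploy.zookeeper.dir", "/spark")

    // The same cleanup the fault tolerance test performs between runs.
    SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
    SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")

    zk.close()
  }
}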
4 changes: 3 additions & 1 deletion docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
===========

Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.
8 changes: 7 additions & 1 deletion docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
8 changes: 7 additions & 1 deletion docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1
