[SPARK-9202] capping maximum number of executor&driver information kept in Worker

https://issues.apache.org/jira/browse/SPARK-9202
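
The change reads two new worker-side settings (introduced in Worker.scala below) that cap how many finished executors and drivers the Worker retains for its web UI; both default to 1000 via WorkerWebUI. A minimal sketch of overriding them, assuming the values reach the worker's SparkConf (for example through spark-defaults.conf on the worker host); the numbers are arbitrary illustrations, not recommendations:

import org.apache.spark.SparkConf

object RetentionConfSketch {
  // Hypothetical overrides of the new caps; the defaults are
  // WorkerWebUI.DEFAULT_RETAINED_EXECUTORS / DEFAULT_RETAINED_DRIVERS (1000 each).
  val conf = new SparkConf()
    .set("spark.worker.ui.retainedExecutors", "200")
    .set("spark.worker.ui.retainedDrivers", "200")
}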

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits:

23977fb [CodingCat] add comments about why we don't synchronize finishedExecutors & finishedDrivers
dc9772d [CodingCat] addressing the comments
e125241 [CodingCat] stylistic fix
80bfe52 [CodingCat] fix JsonProtocolSuite
d7d9485 [CodingCat] styistic fix and respect insert ordering
031755f [CodingCat] add license info & stylistic fix
c3b5361 [CodingCat] test cases and docs
c557b3a [CodingCat] applications are fine
9cac751 [CodingCat] application is fine...
ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers
CodingCat authored and srowen committed Jul 31, 2015
1 parent a8340fa commit c068666
Showing 6 changed files with 329 additions and 94 deletions.
124 changes: 83 additions & 41 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
import scala.util.control.NonFatal
@@ -115,13 +115,18 @@ private[worker] class Worker(
}

var workDir: File = null
val finishedExecutors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]

val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)

// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)

@@ -461,25 +466,7 @@
}

case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
sendToMaster(executorStateChanged)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
executors -= fullId
finishedExecutors(fullId) = executor
coresUsed -= executor.cores
memoryUsed -= executor.memory
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}
handleExecutorStateChanged(executorStateChanged)

case KillExecutor(masterUrl, appId, execId) =>
if (masterUrl != activeMasterUrl) {
@@ -523,24 +510,8 @@
}
}

case driverStageChanged @ DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
sendToMaster(driverStageChanged)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
handleDriverStateChanged(driverStateChanged)
}

case ReregisterWithMaster =>
@@ -614,6 +585,78 @@
webUi.stop()
metricsSystem.stop()
}

private def trimFinishedExecutorsIfNecessary(): Unit = {
// do not need to protect with locks since both WorkerPage and Restful server get data through
// thread-safe RpcEndPoint
if (finishedExecutors.size > retainedExecutors) {
finishedExecutors.take(math.max(finishedExecutors.size / 10, 1)).foreach {
case (executorId, _) => finishedExecutors.remove(executorId)
}
}
}

private def trimFinishedDriversIfNecessary(): Unit = {
// do not need to protect with locks since both WorkerPage and Restful server get data through
// thread-safe RpcEndPoint
if (finishedDrivers.size > retainedDrivers) {
finishedDrivers.take(math.max(finishedDrivers.size / 10, 1)).foreach {
case (driverId, _) => finishedDrivers.remove(driverId)
}
}
}

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
sendToMaster(driverStateChanged)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
trimFinishedDriversIfNecessary()
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}

private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
Unit = {
sendToMaster(executorStateChanged)
val state = executorStateChanged.state
if (ExecutorState.isFinished(state)) {
val appId = executorStateChanged.appId
val fullId = appId + "/" + executorStateChanged.execId
val message = executorStateChanged.message
val exitStatus = executorStateChanged.exitStatus
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
executors -= fullId
finishedExecutors(fullId) = executor
trimFinishedExecutorsIfNecessary()
coresUsed -= executor.cores
memoryUsed -= executor.memory
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}
}
}

private[deploy] object Worker extends Logging {
@@ -669,5 +712,4 @@ private[deploy] object Worker extends Logging {
cmd
}
}

}
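
As an aside for readers of the hunk above: the new trimFinishedExecutorsIfNecessary / trimFinishedDriversIfNecessary helpers evict the oldest tenth of the finished entries (at least one) once the retained cap is exceeded, which is why the maps were switched to LinkedHashMap (insertion-ordered iteration). A self-contained sketch of that eviction policy under a toy cap, with plain strings standing in for ExecutorRunner:

import scala.collection.mutable.LinkedHashMap

object TrimSketch {
  val retained = 5                                    // stand-in for spark.worker.ui.retainedExecutors
  val finished = new LinkedHashMap[String, String]()  // fullId -> placeholder for ExecutorRunner

  def trimIfNecessary(): Unit = {
    if (finished.size > retained) {
      // take() follows insertion order, so the oldest ~10% (at least one entry) go first
      finished.take(math.max(finished.size / 10, 1)).foreach {
        case (id, _) => finished.remove(id)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 6).foreach(i => finished(s"app/$i") = s"runner-$i")
    trimIfNecessary()
    println(finished.keys.mkString(", "))  // app/1, the oldest entry, has been dropped
  }
}
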
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -53,6 +53,8 @@ class WorkerWebUI(
}
}

private[ui] object WorkerWebUI {
private[worker] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
val DEFAULT_RETAINED_DRIVERS = 1000
val DEFAULT_RETAINED_EXECUTORS = 1000
}
89 changes: 89 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io.File
import java.util.Date

import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.{SecurityManager, SparkConf}

private[deploy] object DeployTestUtils {
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}

def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}

def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)

def createDriverDesc(): DriverDescription =
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())

def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(execId: Int): ExecutorRunner = {
new ExecutorRunner(
"appId",
execId,
createAppDesc(),
4,
1234,
null,
"workerId",
"host",
123,
"publicAddress",
new File("sparkHome"),
new File("workDir"),
"akka://worker",
new SparkConf,
Seq("localDir"),
ExecutorState.RUNNING)
}

def createDriverRunner(driverId: String): DriverRunner = {
val conf = new SparkConf()
new DriverRunner(
conf,
driverId,
new File("workDir"),
new File("sparkHome"),
createDriverDesc(),
null,
"akka://worker",
new SecurityManager(conf))
}
}
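
A brief usage sketch of the shared fixtures above, mirroring how JsonProtocolSuite (next file) consumes them after the refactor; the ids passed in are arbitrary:

import org.apache.spark.deploy.DeployTestUtils._

object DeployTestUtilsUsageSketch {
  val runner = createExecutorRunner(123)        // ExecutorRunner fixture keyed by execId
  val driver = createDriverRunner("driver-0")   // DriverRunner fixture keyed by driverId
}
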
59 changes: 10 additions & 49 deletions core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -17,20 +17,21 @@

package org.apache.spark.deploy

import java.io.File
import java.util.Date

import com.fasterxml.jackson.core.JsonParseException
import org.json4s._
import org.json4s.jackson.JsonMethods

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState}
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.{JsonTestUtils, SparkFunSuite}

class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {

import org.apache.spark.deploy.DeployTestUtils._

test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
@@ -50,7 +51,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
}

test("writeExecutorRunner") {
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123))
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
}
@@ -77,57 +78,17 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {

test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
val drivers = List(createDriverRunner())
val finishedDrivers = List(createDriverRunner(), createDriverRunner())
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123),
createExecutorRunner(123))
val drivers = List(createDriverRunner("driverId"))
val finishedDrivers = List(createDriverRunner("driverId"), createDriverRunner("driverId"))
val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr))
}

def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
}

def createAppInfo() : ApplicationInfo = {
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}

def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)

def createDriverDesc(): DriverDescription =
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())

def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}

def createDriverRunner(): DriverRunner = {
val conf = new SparkConf()
new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
}

def assertValidJson(json: JValue) {
try {
JsonMethods.parse(JsonMethods.compact(json))