Merged branch mesos/master to branch dev.

tdas committed Nov 26, 2012
2 parents fd11d23 + cd16eab commit 0fe2fc4
Showing 34 changed files with 467 additions and 169 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/spark/SparkEnv.scala
@@ -68,7 +68,6 @@ object SparkEnv extends Logging {
       isMaster: Boolean,
       isLocal: Boolean
     ) : SparkEnv = {
-
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
 
     // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
8 changes: 7 additions & 1 deletion core/src/main/scala/spark/Utils.scala
@@ -199,7 +199,13 @@ private object Utils extends Logging {
   /**
    * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
    */
-  def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress
+  def localIpAddress(): String = {
+    val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
+    if (defaultIpOverride != null)
+      defaultIpOverride
+    else
+      InetAddress.getLocalHost.getHostAddress
+  }
 
   private var customHostname: Option[String] = None
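Note: the new localIpAddress lets a SPARK_LOCAL_IP environment variable override whatever the JVM resolves for the local host, which matters on multi-homed machines. A standalone sketch of the same override pattern (LocalIpDemo is a hypothetical name; SPARK_LOCAL_IP is the real variable):

import java.net.InetAddress

object LocalIpDemo {
  // Mirrors the new code path: prefer the environment override, else resolve.
  def localIpAddress(): String = {
    val ipOverride = System.getenv("SPARK_LOCAL_IP")
    if (ipOverride != null) ipOverride
    else InetAddress.getLocalHost.getHostAddress
  }

  def main(args: Array[String]) {
    // With e.g. SPARK_LOCAL_IP=192.168.1.10 in the environment, prints the
    // override; otherwise prints the resolved address of the local host.
    println(localIpAddress())
  }
}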
6 changes: 3 additions & 3 deletions core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
 
 // Master to MasterWebUI
 
 private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
-    completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+    completedJobs: Array[JobInfo])
 
 // WorkerWebUI to Worker
 private[spark] case object RequestWorkerState
@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
 private[spark]
 case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
     finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
-    coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
+    coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
30 changes: 30 additions & 0 deletions core/src/main/scala/spark/deploy/WebUI.scala
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+  def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    return "%.1f h".format(hours)
+  }
+}
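For reference, a small demo that walks one value through each branch of formatDuration (illustrative only, not part of the commit; it assumes compilation inside the spark.deploy package, since WebUI is private[spark]):

package spark.deploy

object WebUIFormatDemo {
  def main(args: Array[String]) {
    println(WebUI.formatDuration(45 * 1000L))       // 45 s    (under a minute: whole seconds)
    println(WebUI.formatDuration(5 * 60 * 1000L))   // 5.0 min (under 10 minutes: one decimal)
    println(WebUI.formatDuration(45 * 60 * 1000L))  // 45 min  (under an hour: whole minutes)
    println(WebUI.formatDuration(90 * 60 * 1000L))  // 1.5 h   (an hour and up: decimal hours)
  }
}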
14 changes: 13 additions & 1 deletion core/src/main/scala/spark/deploy/client/Client.scala
@@ -35,6 +35,7 @@ private[spark] class Client(
 
   class ClientActor extends Actor with Logging {
     var master: ActorRef = null
+    var masterAddress: Address = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
 
     override def preStart() {
@@ -43,6 +44,7 @@ private[spark] class Client(
       val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
       try {
         master = context.actorFor(akkaUrl)
+        masterAddress = master.path.address
         master ! RegisterJob(jobDescription)
         context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
         context.watch(master)  // Doesn't work with remote actors, but useful for testing
@@ -72,7 +74,17 @@ private[spark] class Client(
         listener.executorRemoved(fullId, message.getOrElse(""))
       }
 
-      case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+      case Terminated(actor_) if actor_ == master =>
         logError("Connection to master failed; stopping client")
         markDisconnected()
         context.stop(self)
+
+      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+        logError("Connection to master failed; stopping client")
+        markDisconnected()
+        context.stop(self)
+
+      case RemoteClientShutdown(transport, address) if address == masterAddress =>
+        logError("Connection to master failed; stopping client")
+        markDisconnected()
+        context.stop(self)
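Splitting the single combined case into guarded cases means lifecycle events from actors other than the master no longer shut the client down. A toy sketch of the pattern-guard idiom, with hypothetical event types standing in for the Akka ones:

object GuardDemo {
  sealed trait Event
  case class Terminated(who: String) extends Event
  case class Disconnected(address: String) extends Event

  // Only events that concern the master trigger a shutdown; everything else is ignored.
  def handle(event: Event, masterAddress: String): String = event match {
    case Terminated("master")                              => "stop: lost the master"
    case Disconnected(address) if address == masterAddress => "stop: master gone"
    case _                                                 => "ignore: unrelated event"
  }

  def main(args: Array[String]) {
    println(handle(Disconnected("master-host:7077"), "master-host:7077")) // stop: master gone
    println(handle(Terminated("worker-3"), "master-host:7077"))           // ignore: unrelated event
  }
}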
23 changes: 21 additions & 2 deletions core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,11 +5,17 @@ import java.util.Date
 import akka.actor.ActorRef
 import scala.collection.mutable
 
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: JobDescription,
+    val submitDate: Date,
+    val actor: ActorRef)
+{
   var state = JobState.WAITING
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
+  var endTime = -1L
 
   private var nextExecutorId = 0
 
@@ -41,4 +47,17 @@
     _retryCount += 1
     _retryCount
   }
+
+  def markFinished(endState: JobState.Value) {
+    state = endState
+    endTime = System.currentTimeMillis()
+  }
+
+  def duration: Long = {
+    if (endTime != -1) {
+      endTime - startTime
+    } else {
+      System.currentTimeMillis() - startTime
+    }
+  }
 }
96 changes: 69 additions & 27 deletions core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -123,28 +128,62 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
 
     case RequestMasterState => {
-      sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+      sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
     }
   }
 
+  /**
+   * Can a job use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the job on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+  }
+
+  /**
+   * Schedule the currently available resources among waiting jobs. This method will be called
+   * every time a new job joins or resource availability changes.
+   */
   def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits on each node.
-    for (worker <- workers if worker.coresFree > 0) {
-      for (job <- waitingJobs.clone()) {
-        val jobMemory = job.desc.memoryPerSlave
-        if (worker.memoryFree >= jobMemory) {
-          val coresToUse = math.min(worker.coresFree, job.coresLeft)
-          val exec = job.addExecutor(worker, coresToUse)
-          launchExecutor(worker, exec)
-        }
-        if (job.coresLeft == 0) {
-          waitingJobs -= job
-          job.state = JobState.RUNNING
-        }
-      }
-    }
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+    // in the queue, then the second job, etc.
+    if (spreadOutJobs) {
+      // Try to spread out each job among all the nodes, until it has all its cores
+      for (job <- waitingJobs if job.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
+        }
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec)
+            job.state = JobState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each job into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0) {
+        for (job <- waitingJobs if job.coresLeft > 0) {
+          if (canUse(job, worker)) {
+            val coresToUse = math.min(worker.coresFree, job.coresLeft)
+            if (coresToUse > 0) {
+              val exec = job.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec)
+              job.state = JobState.RUNNING
+            }
+          }
+        }
+      }
+    }
   }
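The while loop in the spread-out branch above hands out one core at a time, round-robin, across the usable workers. A self-contained sketch of just that allocation step (RoundRobinDemo and assign are hypothetical names, not Spark code):

object RoundRobinDemo {
  // Deal out one core at a time, skipping exhausted workers, until the
  // request is met or total capacity runs out.
  def assign(coresWanted: Int, coresFree: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresWanted, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  def main(args: Array[String]) {
    // A job wanting 5 cores over workers with 4, 2 and 1 free cores:
    println(assign(5, Array(4, 2, 1)).mkString(", ")) // 2, 2, 1
  }
}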
@@ -179,8 +218,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
-    val date = new Date
-    val job = new JobInfo(newJobId(date), desc, date, actor)
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    val job = new JobInfo(now, newJobId(date), desc, date, actor)
     jobs += job
     idToJob(job.id) = job
     actorToJob(sender) = job
@@ -189,19 +229,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def removeJob(job: JobInfo) {
-    logInfo("Removing job " + job.id)
-    jobs -= job
-    idToJob -= job.id
-    actorToJob -= job.actor
-    addressToWorker -= job.actor.path.address
-    completedJobs += job   // Remember it in our history
-    waitingJobs -= job
-    for (exec <- job.executors.values) {
-      exec.worker.removeExecutor(exec)
-      exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
-    }
-    job.state = JobState.FINISHED
-    schedule()
+    if (jobs.contains(job)) {
+      logInfo("Removing job " + job.id)
+      jobs -= job
+      idToJob -= job.id
+      actorToJob -= job.actor
+      addressToWorker -= job.actor.path.address
+      completedJobs += job   // Remember it in our history
+      waitingJobs -= job
+      for (exec <- job.executors.values) {
+        exec.worker.removeExecutor(exec)
+        exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+      }
+      job.markFinished(JobState.FINISHED)  // TODO: Mark it as FAILED if it failed
+      schedule()
+    }
   }
 
   /** Generate a new job ID given a job's submission date */
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -7,7 +7,7 @@ import spark.Utils
  * Command-line parser for the master.
  */
 private[spark] class MasterArguments(args: Array[String]) {
-  var ip = Utils.localIpAddress()
+  var ip = Utils.localHostName()
   var port = 7077
   var webUiPort = 8080
 
@@ -59,4 +59,4 @@
       "  --webui-port PORT      Port for web UI (default: 8080)")
     System.exit(exitCode)
   }
-}
\ No newline at end of file
+}
2 changes: 1 addition & 1 deletion core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -36,7 +36,7 @@
 
             // A bit ugly an inefficient, but we won't have a number of jobs
             // so large that it will make a significant difference.
-            (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+            (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
               case Some(job) => spark.deploy.master.html.job_details.render(job)
               case _ => null
             }
4 changes: 4 additions & 0 deletions core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
+
+  def hasExecutor(job: JobInfo): Boolean = {
+    executors.values.exists(_.job == job)
+  }
 
   def webUiAddress : String = {
     "http://" + this.host + ":" + this.webUiPort
2 changes: 1 addition & 1 deletion core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -123,7 +123,7 @@ private[spark] class Worker(
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
 
     case ExecutorStateChanged(jobId, execId, state, message) =>
       master ! ExecutorStateChanged(jobId, execId, state, message)
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -9,7 +9,7 @@ import java.lang.management.ManagementFactory
  * Command-line parser for the master.
  */
 private[spark] class WorkerArguments(args: Array[String]) {
-  var ip = Utils.localIpAddress()
+  var ip = Utils.localHostName()
   var port = 0
   var webUiPort = 8081
   var cores = inferDefaultCores()
@@ -110,4 +110,4 @@
     // Leave out 1 GB for the operating system, but don't return a negative memory size
     math.max(totalMb - 1024, 512)
   }
-}
\ No newline at end of file
+}
15 changes: 15 additions & 0 deletions core/src/main/scala/spark/executor/Executor.scala
@@ -43,6 +43,21 @@ private[spark] class Executor extends Logging {
     urlClassLoader = createClassLoader()
     Thread.currentThread.setContextClassLoader(urlClassLoader)
 
+    // Make any thread terminations due to uncaught exceptions kill the entire
+    // executor process to avoid surprising stalls.
+    Thread.setDefaultUncaughtExceptionHandler(
+      new Thread.UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, exception: Throwable) {
+          try {
+            logError("Uncaught exception in thread " + thread, exception)
+            System.exit(1)
+          } catch {
+            case t: Throwable => System.exit(2)
+          }
+        }
+      }
+    )
+
     // Initialize Spark environment (using system properties read above)
     env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
     SparkEnv.set(env)
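Thread.setDefaultUncaughtExceptionHandler is JVM-wide, so with this change an uncaught exception on any executor thread exits the whole process instead of leaving it silently stalled. A minimal standalone demo of that behavior (UncaughtDemo is hypothetical, not part of the commit):

object UncaughtDemo {
  def main(args: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(
      new Thread.UncaughtExceptionHandler {
        override def uncaughtException(thread: Thread, exception: Throwable) {
          System.err.println("Uncaught exception in " + thread.getName + ": " + exception)
          System.exit(1)
        }
      })
    new Thread(new Runnable {
      override def run() { throw new RuntimeException("boom") }
    }).start()
    // The whole JVM exits with status 1; without the handler, only the spawned
    // thread would die and the process would keep running.
  }
}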
3 changes: 2 additions & 1 deletion core/src/main/scala/spark/network/ConnectionManager.scala
@@ -304,7 +304,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
       connectionRequests += newConnection
       newConnection
     }
-    val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection())
+    val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
+    val connection = connectionsById.getOrElse(lookupKey, startNewConnection())
     message.senderAddress = id.toSocketAddress()
     logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
     /*connection.send(message)*/
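A plausible reading of this fix: the requested ConnectionManagerId may not compare equal to the key stored in connectionsById even when both name the same endpoint, so the lookup key is normalized through the socket address first. A generic illustration (not Spark's classes; assumes localhost resolves to 127.0.0.1):

object LookupDemo {
  case class Id(host: String, port: Int)

  def main(args: Array[String]) {
    val connections = Map(Id("127.0.0.1", 9999) -> "conn-1")
    val requested = Id("localhost", 9999)  // same endpoint, different host string
    println(connections.get(requested))    // None: the raw keys do not match
    val canonical = Id(java.net.InetAddress.getByName(requested.host).getHostAddress,
                       requested.port)
    println(connections.get(canonical))    // Some(conn-1) once both sides are normalized
  }
}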
core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -272,7 +272,7 @@ private[spark] class MesosSchedulerBackend(
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
     }
-    scheduler.slaveLost(slaveId.toString)
+    scheduler.slaveLost(slaveId.getValue)
  }
 
   override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
6 changes: 6 additions & 0 deletions core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -243,6 +243,12 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " " + blockId + " "
 
+    if (!blockManagerInfo.contains(blockManagerId)) {
+      // Can happen if this is from a locally cached partition on the master
+      sender ! true
+      return
+    }
+
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
       logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
