
Merge remote-tracking branch 'origin/master' into kryoJListNPE
marmbrus committed Sep 11, 2014
2 parents 646976b + 558962a commit 4d4d93c
Showing 100 changed files with 1,649 additions and 1,543 deletions; only the first few files are reproduced below.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,6 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.ConnectionManager
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -59,8 +60,8 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
+    val blockTransferService: BlockTransferService,
     val blockManager: BlockManager,
-    val connectionManager: ConnectionManager,
     val securityManager: SecurityManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
@@ -88,6 +89,8 @@ class SparkEnv (
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
     // actorSystem.awaitTermination()
+
+    // Note that blockTransferService is stopped by BlockManager since it is started by it.
   }
 
 private[spark]
@@ -223,14 +226,14 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
+    val blockTransferService = new NioBlockTransferService(conf, securityManager)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
-
-    val connectionManager = blockManager.connectionManager
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
@@ -278,8 +281,8 @@ object SparkEnv extends Logging {
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
+      blockTransferService,
       blockManager,
-      connectionManager,
       securityManager,
       httpFileServer,
       sparkFilesDir,
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()
44 changes: 31 additions & 13 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
     val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
     execOption match {
       case Some(exec) => {
+        val appInfo = idToApp(appId)
         exec.state = state
+        if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
         exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
         if (ExecutorState.isFinished(state)) {
-          val appInfo = idToApp(appId)
           // Remove this executor from the worker and app
-          logInfo("Removing executor " + exec.fullId + " because it is " + state)
+          logInfo(s"Removing executor ${exec.fullId} because it is $state")
           appInfo.removeExecutor(exec)
           exec.worker.removeExecutor(exec)
 
-          val normalExit = exitStatus.exists(_ == 0)
+          val normalExit = exitStatus == Some(0)
           // Only retry certain number of times so we don't go into an infinite loop.
-          if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-            schedule()
-          } else if (!normalExit) {
-            logError("Application %s with ID %s failed %d times, removing it".format(
-              appInfo.desc.name, appInfo.id, appInfo.retryCount))
-            removeApplication(appInfo, ApplicationState.FAILED)
+          if (!normalExit) {
+            if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+              schedule()
+            } else {
+              val execs = appInfo.executors.values
+              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                removeApplication(appInfo, ApplicationState.FAILED)
+              }
+            }
          }
         }
       }
       case None =>
-        logWarning("Got status update for unknown executor " + appId + "/" + execId)
+        logWarning(s"Got status update for unknown executor $appId/$execId")
     }
   }

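Taken together, the ApplicationInfo and Master changes above amount to a retry policy: the retry count is reset whenever an executor reports RUNNING, incremented on each abnormal exit, and the application is removed only once the retry budget is exhausted and no executor is still running. A minimal self-contained sketch of that policy, using simplified stand-in types rather than the real Master and ApplicationInfo classes:

    // Simplified stand-in for ApplicationInfo (hypothetical, for illustration only).
    class AppState(var retryCount: Int = 0, var hasRunningExecutor: Boolean = false)

    object RetryPolicySketch {
      val MaxNumRetry = 10 // assumed stand-in for ApplicationState.MAX_NUM_RETRY

      // Called when an executor reports RUNNING; mirrors resetRetryCount().
      def onExecutorRunning(app: AppState): Unit = app.retryCount = 0

      // Called when an executor finishes; returns true if the application
      // should be removed as FAILED.
      def onExecutorFinished(app: AppState, exitStatus: Option[Int]): Boolean = {
        val normalExit = exitStatus == Some(0)
        if (normalExit) {
          false // a clean exit never counts against the retry budget
        } else {
          app.retryCount += 1 // mirrors incrementRetryCount()
          if (app.retryCount < MaxNumRetry) {
            false // budget remains: reschedule instead of removing
          } else {
            // Past the budget, still keep the app while any executor runs.
            !app.hasRunningExecutor
          }
        }
      }
    }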
@@ -481,13 +487,25 @@ private[spark] class Master(
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
+    // Randomization helps balance drivers
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val aliveWorkerNum = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      curPos = (curPos + 1) % aliveWorkerNum
+      val startPos = curPos
+      var launched = false
+      while (curPos != startPos && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % aliveWorkerNum
       }
     }

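For illustration, a standalone sketch of the round-robin placement described in the comment above; it uses plain (free memory, free cores) tuples instead of WorkerInfo, a visit-counting loop bound, and made-up numbers throughout:

    import scala.util.Random

    // Hypothetical alive workers after shuffling: (free memory in MB, free cores).
    val shuffledAliveWorkers = Random.shuffle(Seq((4096, 4), (8192, 8), (2048, 2)))
    val aliveWorkerNum = shuffledAliveWorkers.size
    var curPos = 0

    // Waiting drivers: (required memory in MB, required cores).
    val waitingDrivers = Seq((1024, 1), (6000, 6))

    for ((reqMem, reqCores) <- waitingDrivers) {
      var launched = false
      var visited = 0
      // Starting where the previous driver left off, try each alive worker once.
      while (visited < aliveWorkerNum && !launched) {
        val (freeMem, freeCores) = shuffledAliveWorkers(curPos)
        visited += 1
        if (freeMem >= reqMem && freeCores >= reqCores) {
          println(s"launching driver ($reqMem MB, $reqCores cores) on worker $curPos")
          launched = true
        }
        curPos = (curPos + 1) % aliveWorkerNum
      }
    }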
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()
@@ -234,7 +234,7 @@ private[spark] class Worker(
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_
@@ -17,21 +17,20 @@
 
 package org.apache.spark.network
 
-import java.nio.ByteBuffer
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.storage.StorageLevel
 
-private[spark] object ReceiverTest {
-  def main(args: Array[String]) {
-    val conf = new SparkConf
-    val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
-    println("Started connection manager with id = " + manager.id)
-
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
-      val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
-      Some(Message.createBufferMessage(buffer, msg.id))
-    })
-    Thread.currentThread.join()
-  }
-}
+trait BlockDataManager {
+
+  /**
+   * Interface to get local block data.
+   *
+   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   */
+  def getBlockData(blockId: String): Option[ManagedBuffer]
+
+  /**
+   * Put the block locally, using the given storage level.
+   */
+  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
+}
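To make the contract concrete, here is a minimal in-memory sketch mirroring the two methods of BlockDataManager; the ManagedBuffer and StorageLevel definitions below are simplified stubs standing in for the real Spark types:

    import scala.collection.concurrent.TrieMap

    // Stubs standing in for org.apache.spark.network.ManagedBuffer and
    // org.apache.spark.storage.StorageLevel (assumptions, not the real classes).
    case class ManagedBuffer(bytes: Array[Byte])
    case class StorageLevel(useMemory: Boolean, useDisk: Boolean)

    // Hypothetical in-memory block store with the same method shapes as the trait.
    class InMemoryBlockDataManager {
      private val blocks = TrieMap.empty[String, ManagedBuffer]

      // Some(buffer) if the block exists locally, None if it doesn't.
      def getBlockData(blockId: String): Option[ManagedBuffer] = blocks.get(blockId)

      // Store the block; this sketch keeps everything in memory regardless of level.
      def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit = {
        blocks.put(blockId, data)
      }
    }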
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.util.EventListener
+
+
+/**
+ * Listener callback interface for [[BlockTransferService.fetchBlocks]].
+ */
+trait BlockFetchingListener extends EventListener {
+
+  /**
+   * Called once per successfully fetched block.
+   */
+  def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
+
+  /**
+   * Called upon failures. For each failure, this is called only once (i.e. not once per block).
+   */
+  def onBlockFetchFailure(exception: Throwable): Unit
+}
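A brief caller-side sketch of the listener contract: the instance below accumulates fetched blocks and records a failure, matching the once-per-block success and once-per-failure semantics documented above (all surrounding names are illustrative):

    import scala.collection.mutable

    // Hypothetical caller state: completed blocks, plus the failure if one occurred.
    val fetched = mutable.Map.empty[String, ManagedBuffer]
    @volatile var failure: Option[Throwable] = None

    val listener = new BlockFetchingListener {
      // Invoked once for every block that arrives.
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        fetched.synchronized { fetched(blockId) = data }
      }

      // Invoked once per failure, covering the whole fetch rather than each block.
      override def onBlockFetchFailure(exception: Throwable): Unit = {
        failure = Some(exception)
      }
    }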
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.storage.StorageLevel
+
+
+abstract class BlockTransferService {
+
+  /**
+   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
+   * local blocks or put local blocks.
+   */
+  def init(blockDataManager: BlockDataManager)
+
+  /**
+   * Tear down the transfer service.
+   */
+  def stop(): Unit
+
+  /**
+   * Port number the service is listening on, available only after [[init]] is invoked.
+   */
+  def port: Int
+
+  /**
+   * Host name the service is listening on, available only after [[init]] is invoked.
+   */
+  def hostName: String
+
+  /**
+   * Fetch a sequence of blocks from a remote node asynchronously,
+   * available only after [[init]] is invoked.
+   *
+   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
+   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
+   *
+   * Note that this API takes a sequence so the implementation can batch requests, and does not
+   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   */
+  def fetchBlocks(
+      hostName: String,
+      port: Int,
+      blockIds: Seq[String],
+      listener: BlockFetchingListener): Unit
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is invoked.
+   */
+  def uploadBlock(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Future[Unit]
+
+  /**
+   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
+   *
+   * It is also only available after [[init]] is invoked.
+   */
+  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+    // A monitor for the thread to wait on.
+    val lock = new Object
+    @volatile var result: Either[ManagedBuffer, Throwable] = null
+    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
+      override def onBlockFetchFailure(exception: Throwable): Unit = {
+        lock.synchronized {
+          result = Right(exception)
+          lock.notify()
+        }
+      }
+      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
+        lock.synchronized {
+          result = Left(data)
+          lock.notify()
+        }
+      }
+    })
+
+    // Sleep until result is no longer null
+    lock.synchronized {
+      while (result == null) {
+        try {
+          lock.wait()
+        } catch {
+          case e: InterruptedException =>
+        }
+      }
+    }
+
+    result match {
+      case Left(data) => data
+      case Right(e) => throw e
+    }
+  }
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is invoked.
+   *
+   * This method is similar to [[uploadBlock]], except this one blocks the thread
+   * until the upload finishes.
+   */
+  def uploadBlockSync(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Unit = {
+    Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
+  }
+}
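To show the blocking helpers in context, a hedged usage sketch: the NioBlockTransferService constructor matches the SparkEnv wiring earlier in this commit, while the host, port, block ids, and the blockDataManager value are made-up placeholders:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.nio.NioBlockTransferService
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
    val transferService = new NioBlockTransferService(conf, new SecurityManager(conf))

    // The service is unusable until init supplies a BlockDataManager;
    // in SparkEnv that role is played by the BlockManager.
    transferService.init(blockDataManager) // blockDataManager: placeholder

    // Blocking fetch of one block from a (hypothetical) remote node.
    val buffer = transferService.fetchBlockSync("remote-host", 9999, "rdd_0_0")

    // Blocking upload of the fetched buffer under a different, made-up block id.
    transferService.uploadBlockSync(
      "remote-host", 9999, "rdd_0_0_copy", buffer, StorageLevel.MEMORY_ONLY)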
