Merge remote-tracking branch 'asf/master' into spark-sink-test
harishreedharan committed Aug 18, 2014
2 parents abc20cb + 6a13dca commit 7fedc5a
Showing 78 changed files with 1,977 additions and 1,262 deletions.
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)

   @volatile private var stopped = false

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }

 private object ContextCleaner {
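
Note that the flag changed above is an ordinary SparkConf entry, so an application that prefers the old asynchronous cleanup can opt back out. A minimal sketch, assuming a standalone driver program (the app name and setup are illustrative, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

// Opt back out of blocking cleanup; after SPARK-3015 the default is "true".
val conf = new SparkConf()
  .setAppName("cleaner-config-example")  // hypothetical app name
  .set("spark.cleaner.referenceTracking.blocking", "false")
val sc = new SparkContext(conf)
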
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker)
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

val connectionManager = blockManager.connectionManager

@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
"."
}

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
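
The block moved above also defines the user-facing behavior of spark.shuffle.manager: it accepts either a short alias or a fully qualified class name, and anything that is not a known alias is passed to instantiateClass unchanged. A minimal sketch of both spellings (illustrative configuration only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Short alias, resolved through shortShuffleMgrNames:
conf.set("spark.shuffle.manager", "sort")
// Fully qualified class name, passed through unchanged:
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
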
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

+  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+    val file = new DataInputStream(new FileInputStream(filename))
+    val length = file.readInt()
+    val obj = new Array[Byte](length)
+    file.readFully(obj)
+    sc.broadcast(obj)
+  }
+
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
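
The new readBroadcastFromFile assumes a simple framing: a 4-byte length header followed by exactly that many payload bytes. A hypothetical writer for that framing (writeBroadcastFile is not part of this commit) could look like:

import java.io.{DataOutputStream, FileOutputStream}

// Writes a payload in the framing readBroadcastFromFile expects: a 4-byte
// big-endian length (DataOutputStream.writeInt), then the payload bytes.
def writeBroadcastFile(filename: String, payload: Array[Byte]): Unit = {
  val out = new DataOutputStream(new FileOutputStream(filename))
  try {
    out.writeInt(payload.length)
    out.write(payload)
  } finally {
    out.close()
  }
}
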
41 changes: 14 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@
   }

   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
-    }
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
+    }
+    connected = true
   }

   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@
     }

     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }

     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            masterLock.synchronized {
-              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-            }
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
           }
         }
       }

     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
@@ -330,9 +319,7 @@
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
       val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem
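
The masterLock deletions above rely on a standard Akka guarantee: an actor processes one message at a time, so state that is mutated only from within receive (as master, connected, and the usage counters now are) needs no explicit locking. A minimal sketch of that property, using a toy actor rather than Spark's own classes:

import akka.actor.Actor

class Counter extends Actor {
  private var count = 0  // mutated only inside receive, so no lock is needed
  def receive = {
    case "inc" => count += 1
    case "get" => sender ! count
  }
}
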
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -99,6 +99,9 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

+  // Set the classloader for the serializer
+  env.serializer.setDefaultClassLoader(urlClassLoader)
+
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
@@ -22,6 +22,7 @@
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.net._
+import java.util.{Timer, TimerTask}
 import java.util.concurrent.atomic.AtomicInteger

 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
     var ackMessage: Option[Message] = None

     def markDone(ackMessage: Option[Message]) {
-      this.synchronized {
-        this.ackMessage = ackMessage
-        completionHandler(this)
-      }
+      this.ackMessage = ackMessage
+      completionHandler(this)
     }
   }

   private val selector = SelectorProvider.provider.openSelector()
+  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)

   // default to 30 second timeout waiting for authentication
   private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)

   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -652,19 +653,27 @@
       }
     }
     if (bufferMessage.hasAckId()) {
-      val sentMessageStatus = messageStatuses.synchronized {
+      messageStatuses.synchronized {
         messageStatuses.get(bufferMessage.ackId) match {
           case Some(status) => {
             messageStatuses -= bufferMessage.ackId
-            status
+            status.markDone(Some(message))
           }
           case None => {
-            throw new Exception("Could not find reference for received ack message " +
-              message.id)
+            /**
+             * We can reach this code in two cases:
+             *
+             * (1) An invalid ack was sent by buggy code.
+             *
+             * (2) An ack arrived late, after its SendMessageStatus had already
+             *     timed out. To avoid late-arriving acks caused by long pauses
+             *     such as GC, set spark.core.connection.ack.wait.timeout to a
+             *     value larger than the default.
+             */
+            logWarning(s"Could not find reference for received ack Message ${message.id}")
           }
         }
       }
-      sentMessageStatus.markDone(Some(message))
     } else {
       var ackMessage : Option[Message] = None
       try {
@@ -836,9 +845,23 @@
   def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
       : Future[Message] = {
     val promise = Promise[Message]()
+
+    val timeoutTask = new TimerTask {
+      override def run(): Unit = {
+        messageStatuses.synchronized {
+          messageStatuses.remove(message.id).foreach ( s => {
+            promise.failure(
+              new IOException(s"sendMessageReliably failed because ack " +
+                s"was not received within $ackTimeout sec"))
+          })
+        }
+      }
+    }
+
     val status = new MessageStatus(message, connectionManagerId, s => {
+      timeoutTask.cancel()
       s.ackMessage match {
-        case None => // Indicates a failure where we either never sent or never got ACK'd
+        case None => // Indicates a failure where we either never sent or never got ACK'd
           promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
         case Some(ackMessage) =>
           if (ackMessage.hasError) {
@@ -852,6 +875,8 @@
     messageStatuses.synchronized {
       messageStatuses += ((message.id, status))
     }
+
+    ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
     sendMessage(connectionManagerId, message)
     promise.future
   }
@@ -861,6 +886,7 @@
   }

   def stop() {
+    ackTimeoutMonitor.cancel()
     selectorThread.interrupt()
     selectorThread.join()
     selector.close()
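
The ack-timeout machinery above combines a daemon Timer with a Promise: a scheduled TimerTask fails the promise unless the ack handler cancels it first. The same pattern in isolation (a sketch; tryFailure/trySuccess are used here so the two completion paths cannot race, whereas the code above instead removes the status from messageStatuses under a lock):

import java.io.IOException
import java.util.{Timer, TimerTask}
import scala.concurrent.Promise

val timer = new Timer("demo-ack-timer", true)  // daemon thread, like AckTimeoutMonitor
val promise = Promise[String]()
val timeoutTask = new TimerTask {
  override def run(): Unit =
    promise.tryFailure(new IOException("ack was not received in time"))
}
timer.schedule(timeoutTask, 5000)  // milliseconds, analogous to ackTimeout * 1000

// When the ack arrives first, cancel the pending failure and complete normally:
timeoutTask.cancel()
promise.trySuccess("ack")
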
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
-    // Compute the minimum and the maxium
+    // Scala's built-in range has issues. See #SI-8782
+    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+      val span = max - min
+      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+    }
+    // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(Double.NegativeInfinity,
         Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment = (max-min)/bucketCount.toDouble
-    val range = if (increment != 0) {
-      Range.Double.inclusive(min, max, increment)
+    val range = if (min != max) {
+      // Range.Double.inclusive(min, max, increment)
+      // The above code doesn't always work. See Scala bug #SI-8782.
+      // https://issues.scala-lang.org/browse/SI-8782
+      customRange(min, max, bucketCount)
     } else {
       List(min, min)
     }
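
For context on the fix above: SI-8782 is about Range.Double sometimes producing the wrong number of elements, because the step is accumulated in floating point. customRange sidesteps this by iterating over an integer range and computing each boundary from it, so the result always has exactly steps + 1 elements. A small sketch of the contrast (the exact inputs that trigger the bug vary by Scala version):

// Integer-indexed bucket boundaries: always steps + 1 elements.
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
  val span = max - min
  Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
}

val viaRange = Range.Double.inclusive(0.0, 1.0, 1.0 / 3)  // element count may be off by one
val viaCustom = customRange(0.0, 1.0, 3)  // always Vector(0.0, 0.333..., 0.666..., 1.0)
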
@@ -127,6 +127,8 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
     logEvent(event, flushLogger = true)
+  // No-op because logging every update would be overkill
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }

   /**
    * Stop logging events.
@@ -63,32 +63,35 @@ extends DeserializationStream {
   def close() { objIn.close() }
 }

-private[spark] class JavaSerializerInstance(counterReset: Int) extends SerializerInstance {
-  def serialize[T: ClassTag](t: T): ByteBuffer = {
+
+private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader)
+  extends SerializerInstance {
+
+  override def serialize[T: ClassTag](t: T): ByteBuffer = {
     val bos = new ByteArrayOutputStream()
     val out = serializeStream(bos)
     out.writeObject(t)
     out.close()
     ByteBuffer.wrap(bos.toByteArray)
   }

-  def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
+  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
     val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis)
-    in.readObject().asInstanceOf[T]
+    in.readObject()
   }

-  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
     val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis, loader)
-    in.readObject().asInstanceOf[T]
+    in.readObject()
   }

-  def serializeStream(s: OutputStream): SerializationStream = {
+  override def serializeStream(s: OutputStream): SerializationStream = {
     new JavaSerializationStream(s, counterReset)
   }

-  def deserializeStream(s: InputStream): DeserializationStream = {
+  override def deserializeStream(s: InputStream): DeserializationStream = {
     new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
   }

@@ -109,7 +112,10 @@ private[spark] class JavaSerializerInstance(counterReset: Int) extends Serialize
 class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
   private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)

-  def newInstance(): SerializerInstance = new JavaSerializerInstance(counterReset)
+  override def newInstance(): SerializerInstance = {
+    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
+    new JavaSerializerInstance(counterReset, classLoader)
+  }

   override def writeExternal(out: ObjectOutput) {
     out.writeInt(counterReset)
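
This ties in with the Executor change earlier in this commit: env.serializer.setDefaultClassLoader(urlClassLoader) populates the defaultClassLoader option that newInstance consults above. A minimal sketch of the plumbing, in standalone form for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val serializer = new JavaSerializer(new SparkConf())
// Without this call, newInstance() falls back to the thread's context classloader.
serializer.setDefaultClassLoader(Thread.currentThread.getContextClassLoader)
val instance = serializer.newInstance()  // a JavaSerializerInstance bound to that loader
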