Commit
Merge branch 'master' into spark-sink-test
harishreedharan committed Aug 15, 2014
2 parents f2c56c9 + fd9fcd2 commit 7b9b649
Showing 34 changed files with 702 additions and 117 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
.*avsc
.*txt
.*json
.*data
@@ -33,7 +33,7 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator
// is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
// (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
// introduces an expensive read fence.
if (context.interrupted) {
if (context.isInterrupted) {
throw new TaskKilledException
} else {
delegate.hasNext
63 changes: 56 additions & 7 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,10 +21,18 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.TaskCompletionListener


/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*
* @param stageId stage id
* @param partitionId index of the partition
* @param attemptId the number of attempts to execute this task
* @param runningLocally whether the task is running locally in the driver JVM
* @param taskMetrics performance metrics of the task
*/
@DeveloperApi
class TaskContext(
@@ -39,27 +47,68 @@ class TaskContext(
def splitId = partitionId

// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

// Whether the corresponding task has been killed.
@volatile var interrupted: Boolean = false
@volatile private var interrupted: Boolean = false

// Whether the task has completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false
// Whether the task has completed.
@volatile private var completed: Boolean = false

/** Checks whether the task has completed. */
def isCompleted: Boolean = completed

/** Checks whether the task has been killed. */
def isInterrupted: Boolean = interrupted

// TODO: Also track whether the task has completed successfully or with exception.

/**
* Add a (Java friendly) listener to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
this
}

/**
* Add a listener in the form of a Scala closure to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(f: TaskContext => Unit): this.type = {
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
}
this
}

/**
* Add a callback function to be executed on task completion. An example use
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
* @param f Callback function.
*/
@deprecated("use addTaskCompletionListener", "1.1.0")
def addOnCompleteCallback(f: () => Unit) {
onCompleteCallbacks += f
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f()
}
}

def executeOnCompleteCallbacks() {
/** Marks the task as completed and triggers the listeners. */
private[spark] def markTaskCompleted(): Unit = {
completed = true
// Process complete callbacks in the reverse order of registration
onCompleteCallbacks.reverse.foreach { _() }
onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
}

/** Marks the task for interruption, i.e. cancellation. */
private[spark] def markInterrupted(): Unit = {
interrupted = true
}
}
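To make the new listener API concrete, here is a small hypothetical sketch (not part of this commit) of how per-partition code might register cleanup in both the Scala-closure and Java-friendly forms; the stream handling and method names are illustrative only.

// Hypothetical usage sketch of the listener API introduced above; not Spark source.
import java.io.InputStream
import scala.io.Source
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

object TaskContextListenerSketch {
  // Scala-closure form: the callback runs on success, failure, or cancellation.
  def linesForPartition(context: TaskContext, stream: InputStream): Iterator[String] = {
    context.addTaskCompletionListener { _ => stream.close() }
    Source.fromInputStream(stream).getLines()
  }

  // Java-friendly form: the same registration via the TaskCompletionListener trait.
  def registerClose(context: TaskContext, stream: InputStream): Unit = {
    context.addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(ctx: TaskContext): Unit = stream.close()
    })
  }
}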
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
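The recurring Class.forName to Utils.classForName swap (here and throughout PythonRDD below) presumably exists so that user-supplied class names are resolved through Spark's preferred classloader rather than the caller's defining loader. As an assumption about what such a helper does, not the actual Utils implementation, a minimal sketch:

// Assumed shape of a classloader-aware Class.forName wrapper; the real
// org.apache.spark.util.Utils.classForName may differ in its details.
object ClassForNameSketch {
  def classForName(className: String): Class[_] = {
    // Prefer the thread-context classloader (which can see dynamically added jars),
    // falling back to the loader that loaded this class.
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Class.forName(className, true, loader) // 'true' initializes the class, as plain Class.forName does
  }
}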
36 changes: 18 additions & 18 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -68,7 +68,7 @@ private[spark] class PythonRDD(
// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

context.addOnCompleteCallback { () =>
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()

// Cleanup the worker socket. This will also cause the Python worker to exit.
@@ -137,7 +137,7 @@ private[spark] class PythonRDD(
}
} catch {

case e: Exception if context.interrupted =>
case e: Exception if context.isInterrupted =>
logDebug("Exception thrown after task interruption", e)
throw new TaskKilledException

@@ -176,7 +176,7 @@

/** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
def shutdownOnTaskCompletion() {
assert(context.completed)
assert(context.isCompleted)
this.interrupt()
}

@@ -209,7 +209,7 @@
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
dataOut.flush()
} catch {
case e: Exception if context.completed || context.interrupted =>
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)

case e: Exception =>
@@ -235,10 +235,10 @@ private[spark] class PythonRDD(
override def run() {
// Kill the worker if it is interrupted, checking until task completion.
// TODO: This has a race condition if interruption occurs, as completed may still become true.
while (!context.interrupted && !context.completed) {
while (!context.isInterrupted && !context.isCompleted) {
Thread.sleep(2000)
}
if (!context.completed) {
if (!context.isCompleted) {
try {
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
Expand Down Expand Up @@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
for {
k <- Option(keyClass)
v <- Option(valueClass)
} yield (Class.forName(k), Class.forName(v))
} yield (Utils.classForName(k), Utils.classForName(v))
}

private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
}

@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
}

@@ -46,6 +46,11 @@ private[spark] class ApplicationInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
in.defaultReadObject()
init()
}

private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
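ApplicationInfo initializes its mutable state in init() rather than in the constructor, so the new readObject hook re-runs init() after Java deserialization to rebuild that (otherwise null) state on the receiving side. A generic sketch of this standard pattern, using an invented class rather than ApplicationInfo:

// Generic readObject/re-init sketch; the class and fields here are invented.
import java.io.ObjectInputStream
import scala.collection.mutable

class CacheHolder(val name: String) extends Serializable {
  // Rebuilt rather than serialized: populated by init(), not by the constructor arguments.
  @transient private var cache: mutable.HashMap[String, Int] = _

  init()

  private def init(): Unit = {
    cache = new mutable.HashMap[String, Int]
  }

  // Invoked by Java serialization when an instance is read back in;
  // without it, cache would still be null after deserialization.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    init()
  }

  def put(key: String, value: Int): Unit = cache(key) = value
}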
@@ -141,7 +141,7 @@ private[spark] object CheckpointRDD extends Logging {
val deserializeStream = serializer.deserializeStream(fileInputStream)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => deserializeStream.close())
context.addTaskCompletionListener(context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,7 +197,7 @@ class HadoopRDD[K, V](
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -68,7 +68,7 @@ class JdbcRDD[T: ClassTag](
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
context.addOnCompleteCallback{ () => closeIfNeeded() }
context.addTaskCompletionListener{ context => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
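JdbcRDD, like HadoopRDD above and NewHadoopRDD below, registers its close logic as a completion listener so the connection and result set are released exactly once, whether the iterator is fully drained or the task ends early. A self-contained sketch of that idempotent-close idea, with invented names rather than Spark's NextIterator:

// Illustrative sketch only: an iterator that closes its JDBC resources at most once,
// either when exhausted or when the task-completion listener fires.
import java.sql.{Connection, ResultSet}
import org.apache.spark.TaskContext

class ResultSetIterator(context: TaskContext, conn: Connection, rs: ResultSet)
  extends Iterator[String] {

  private var closed = false

  // Runs on success, failure, or cancellation; a no-op if already closed.
  context.addTaskCompletionListener { _ => closeIfNeeded() }

  private def closeIfNeeded(): Unit = {
    if (!closed) {
      closed = true
      try rs.close() finally conn.close()
    }
  }

  private var nextRow: Option[String] = fetch()

  private def fetch(): Option[String] =
    if (!closed && rs.next()) Some(rs.getString(1)) else { closeIfNeeded(); None }

  override def hasNext: Boolean = nextRow.isDefined

  override def next(): String = {
    val row = nextRow.getOrElse(throw new NoSuchElementException("end of result set"))
    nextRow = fetch()
    row
  }
}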
@@ -129,7 +129,7 @@ class NewHadoopRDD[K, V](
context.taskMetrics.inputMetrics = Some(inputMetrics)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
context.addTaskCompletionListener(context => close())
var havePair = false
var finished = false

30 changes: 25 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1299,6 +1299,19 @@ abstract class RDD[T: ClassTag](

/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString: String = {
// Get a debug description of an rdd without its children
def debugSelf (rdd: RDD[_]): Seq[String] = {
import Utils.bytesToString

val persistence = storageLevel.description
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
" CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize),
bytesToString(info.tachyonSize), bytesToString(info.diskSize)))

s"$rdd [$persistence]" +: storageInfo
}

// Apply a different rule to the last child
def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
val len = rdd.dependencies.length
@@ -1324,7 +1337,11 @@
val partitionStr = "(" + rdd.partitions.size + ")"
val leftOffset = (partitionStr.length - 1) / 2
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)

debugSelf(rdd).zipWithIndex.map{
case (desc: String, 0) => s"$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix $desc"
} ++ debugChildren(rdd, nextPrefix)
}
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
@@ -1334,17 +1351,20 @@
thisPrefix
+ (if (isLastChild) " " else "| ")
+ (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)

debugSelf(rdd).zipWithIndex.map{
case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix$desc"
} ++ debugChildren(rdd, nextPrefix)
}
def debugString(rdd: RDD[_],
prefix: String = "",
isShuffle: Boolean = true,
isLastChild: Boolean = false): Seq[String] = {
if (isShuffle) {
shuffleDebugString(rdd, prefix, isLastChild)
}
else {
Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
} else {
debugSelf(rdd).map(prefix + _) ++ debugChildren(rdd, prefix)
}
}
firstDebugString(this).mkString("\n")
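With the debugSelf helper in place, every RDD in the lineage printed by toDebugString now carries its storage level, and cached RDDs gain an extra indented line of cache statistics. A rough illustration of the resulting shape (the RDD names, ids, and sizes below are invented, not from a real run):

// Hypothetical spark-shell usage; the commented output is approximate.
val data = sc.parallelize(1 to 1000, 2).map(_ * 2).cache()
data.count()
println(data.toDebugString)
// (2) MappedRDD[1] at map at <console>:13 [Memory Deserialized 1x Replicated]
// |       CachedPartitions: 2; MemorySize: 8.0 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
// |  ParallelCollectionRDD[0] at parallelize at <console>:13 [Memory Deserialized 1x Replicated]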
@@ -634,7 +634,7 @@ class DAGScheduler(
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
} finally {
taskContext.executeOnCompleteCallbacks()
taskContext.markTaskCompleted()
}
} catch {
case e: Exception =>
@@ -61,7 +61,7 @@ private[spark] class ResultTask[T, U](
try {
func(context, rdd.iterator(partition, context))
} finally {
context.executeOnCompleteCallbacks()
context.markTaskCompleted()
}
}

@@ -74,7 +74,7 @@ private[spark] class ShuffleMapTask(
}
throw e
} finally {
context.executeOnCompleteCallbacks()
context.markTaskCompleted()
}
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,7 +87,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
def kill(interruptThread: Boolean) {
_killed = true
if (context != null) {
context.interrupted = true
context.markInterrupted()
}
if (interruptThread && taskThread != null) {
taskThread.interrupt()
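Task.kill now sets the flag through markInterrupted(), and running code observes it through the read-only isInterrupted accessor, exactly as the PythonRDD changes above do. A hypothetical long-running computation cooperating with cancellation might look like this (the work itself is invented):

// Hypothetical cooperative-cancellation sketch; not Spark source.
import org.apache.spark.{TaskContext, TaskKilledException}

def expensiveScan(context: TaskContext, rows: Iterator[Array[Byte]]): Long = {
  var matches = 0L
  var processed = 0L
  while (rows.hasNext) {
    // Poll the kill flag periodically rather than on every element.
    if (processed % 10000 == 0 && context.isInterrupted) {
      throw new TaskKilledException
    }
    if (rows.next().nonEmpty) matches += 1
    processed += 1
  }
  matches
}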
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -93,7 +93,7 @@ private[spark] object JettyUtils extends Logging {
def createServletHandler(
path: String,
servlet: HttpServlet,
basePath: String = ""): ServletContextHandler = {
basePath: String): ServletContextHandler = {
val prefixedPath = attachPrefix(basePath, path)
val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)