
Commit

Move interrupted flag from TaskContext constructor
andrewor14 committed May 7, 2014
1 parent 39b8b14 commit 85311f8
Showing 4 changed files with 14 additions and 17 deletions.
core/src/main/scala/org/apache/spark/TaskContext.scala: 16 changes (9 additions, 7 deletions)
@@ -28,20 +28,22 @@ import org.apache.spark.executor.TaskMetrics
  */
 @DeveloperApi
 class TaskContext(
-  val stageId: Int,
-  val partitionId: Int,
-  val attemptId: Long,
-  val runningLocally: Boolean = false,
-  @volatile var interrupted: Boolean = false,
-  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
-) extends Serializable {
+    val stageId: Int,
+    val partitionId: Int,
+    val attemptId: Long,
+    val runningLocally: Boolean = false,
+    private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends Serializable {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId
 
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
+  // Whether the corresponding task has been killed
+  @volatile var interrupted: Boolean = false
+
   /**
    * Add a callback function to be executed on task completion. An example use
    * is for HadoopRDD to register a callback to close the input stream.
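The practical effect of the TaskContext.scala change is that interrupted is no longer something callers pass in; it always starts out false and is flipped on the live context afterwards. A minimal, illustrative Scala sketch against the constructor as it stands after this diff (the IDs below are made up):

  import org.apache.spark.TaskContext

  // Hypothetical IDs; runningLocally and taskMetrics keep their declared defaults.
  val context = new TaskContext(stageId = 0, partitionId = 0, attemptId = 0L)

  // The flag is now plain mutable state on the context: set by whoever kills the
  // task, polled by long-running task code that wants to bail out early.
  context.interrupted = true
  if (context.interrupted) {
    // stop work and clean up
  }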
core/src/test/java/org/apache/spark/JavaAPISuite.java: 2 changes (1 addition, 1 deletion)
@@ -597,7 +597,7 @@ public void persist() {
   @Test
   public void iterator() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics());
+    TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics());
     Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
   }
 
core/src/test/scala/org/apache/spark/CacheManagerSuite.scala: 9 changes (3 additions, 6 deletions)
@@ -59,8 +59,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
@@ -72,8 +71,7 @@
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(5, 6, 7))
     }
@@ -86,8 +84,7 @@
     }
 
     whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false,
-        taskMetrics = TaskMetrics.empty)
+      val context = new TaskContext(0, 0, 0)
       val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
       assert(value.toList === List(1, 2, 3, 4))
     }
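The shortened test call sites work because the remaining constructor parameters have defaults. Assuming the signature shown in TaskContext.scala above, the two forms below construct equivalent contexts (a sketch, not part of the commit):

  val short = new TaskContext(0, 0, 0)
  val explicit = new TaskContext(0, 0, 0, runningLocally = false)

  // runningLocally and taskMetrics fall back to their declared defaults, and
  // interrupted starts out false in both cases since it is now a field, not a parameter.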
core/src/test/scala/org/apache/spark/PipedRDDSuite.scala: 4 changes (1 addition, 3 deletions)
@@ -178,14 +178,12 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       }
       val hadoopPart1 = generateFakeHadoopPartition()
       val pipedRdd = new PipedRDD(nums, "printenv " + varName)
-      val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
-        taskMetrics = TaskMetrics.empty)
+      val tContext = new TaskContext(0, 0, 0)
       val rddIter = pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
       assert(arr(0) == "/some/path")
     } else {
       // printenv isn't available so just pass the test
-      assert(true)
     }
   }
 
