From 85311f86d4d929732289181a515e9d1a571019a2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 6 May 2014 20:36:16 -0700 Subject: [PATCH] Move interrupted flag from TaskContext constructor --- .../scala/org/apache/spark/TaskContext.scala | 16 +++++++++------- .../test/java/org/apache/spark/JavaAPISuite.java | 2 +- .../org/apache/spark/CacheManagerSuite.scala | 9 +++------ .../scala/org/apache/spark/PipedRDDSuite.scala | 4 +--- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index dc012cc381346..1d8ba2ad36969 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics */ @DeveloperApi class TaskContext( - val stageId: Int, - val partitionId: Int, - val attemptId: Long, - val runningLocally: Boolean = false, - @volatile var interrupted: Boolean = false, - private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty -) extends Serializable { + val stageId: Int, + val partitionId: Int, + val attemptId: Long, + val runningLocally: Boolean = false, + private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends Serializable { @deprecated("use partitionId", "0.8.1") def splitId = partitionId @@ -42,6 +41,9 @@ class TaskContext( // List of callback functions to execute when the task completes. @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit] + // Whether the corresponding task has been killed + @volatile var interrupted: Boolean = false + /** * Add a callback function to be executed on task completion. An example use * is for HadoopRDD to register a callback to close the input stream. diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index c3e03cea917b3..1912015827927 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -597,7 +597,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContext(0, 0, 0, false, false, new TaskMetrics()); + TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics()); Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index fd5b0906e6765..cf995b0894003 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -59,8 +59,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, - taskMetrics = TaskMetrics.empty) + val context = new TaskContext(0, 0, 0) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) } @@ -72,8 +71,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, - taskMetrics = TaskMetrics.empty) + val context = new TaskContext(0, 0, 0) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(5, 6, 7)) } @@ -86,8 +84,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false, - taskMetrics = TaskMetrics.empty) + val context = new TaskContext(0, 0, 0) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) } diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 0bb6a6b09c5b5..db56a4acdd6f5 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -178,14 +178,12 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { } val hadoopPart1 = generateFakeHadoopPartition() val pipedRdd = new PipedRDD(nums, "printenv " + varName) - val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, - taskMetrics = TaskMetrics.empty) + val tContext = new TaskContext(0, 0, 0) val rddIter = pipedRdd.compute(hadoopPart1, tContext) val arr = rddIter.toArray assert(arr(0) == "/some/path") } else { // printenv isn't available so just pass the test - assert(true) } }