diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
index 6f5a66a4f31d9..a4654ebb12c95 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/java/org/apache/spark/TaskContext.java
@@ -18,57 +18,56 @@
 package org.apache.spark;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 import scala.Function0;
 import scala.Function1;
 import scala.Unit;
-import scala.collection.JavaConversions;
 
 import org.apache.spark.annotation.DeveloperApi;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.util.TaskCompletionListener;
-import org.apache.spark.util.TaskCompletionListenerException;
 
 /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated during execution.
- */
-@DeveloperApi
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task use
+ * TaskContext.get().
+ */
 public abstract class TaskContext implements Serializable {
+  /**
+   * Return the currently active TaskContext. This can be called inside of
+   * user functions to access contextual information about running tasks.
+   */
+  public static TaskContext get() {
+    return taskContext.get();
+  }
 
   private static ThreadLocal<TaskContext> taskContext = new ThreadLocal<TaskContext>();
 
-  /**
-   * :: Internal API ::
-   * This is spark internal API, not intended to be called from user programs.
-   */
   static void setTaskContext(TaskContext tc) {
     taskContext.set(tc);
   }
 
-  public static TaskContext get() {
-    return taskContext.get();
-  }
-
-  /** :: Internal API :: */
   static void unset() {
     taskContext.remove();
   }
 
   /**
-   * Checks whether the task has completed.
+   * Whether the task has completed.
    */
   public abstract boolean isCompleted();
 
   /**
-   * Checks whether the task has been killed.
+   * Whether the task has been killed.
    */
   public abstract boolean isInterrupted();
 
+  /** Deprecated: use isRunningLocally() */
+  @Deprecated
+  public abstract boolean runningLocally();
+
+  public abstract boolean isRunningLocally();
+
   /**
    * Add a (Java friendly) listener to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
@@ -103,12 +102,7 @@ static void unset() {
 
   public abstract long attemptId();
 
-  @Deprecated
-  /** Deprecated: use isRunningLocally() */
-  public abstract boolean runningLocally();
-
-  public abstract boolean isRunningLocally();
-
-  /** ::Internal API:: */
+  /** :: DeveloperApi :: */
+  @DeveloperApi
  public abstract TaskMetrics taskMetrics();
 }
diff --git a/core/src/main/scala/org/apache/spark/TaskContextHelper.scala b/core/src/main/scala/org/apache/spark/TaskContextHelper.scala
index a18e0586fca12..4636c4600a01a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextHelper.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextHelper.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark
 
+/**
+ * This class exists to restrict the visibility of TaskContext setters.
+ */
 private [spark] object TaskContextHelper {
 
   def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
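For orientation, this helper is what lets the Scala-side scheduler drive the package-private setters on the Java TaskContext class above. A rough sketch of the executor-side lifecycle it enables follows; this is not part of the diff, it assumes the helper also forwards an unset() method mirroring the package-private TaskContext.unset(), and the constructor arguments shown are placeholder values:

    // Illustrative sketch only: the real call sites live in Task.run() and the
    // local-execution path, which this diff does not show.
    val context = new TaskContextImpl(stageId = 0, partitionId = 0, attemptId = 0L)
    TaskContextHelper.setTaskContext(context)   // TaskContext.get() now returns it
    try {
      // run the task body; user code may call TaskContext.get() from here
    } finally {
      TaskContextHelper.unset()                 // clear the thread-local when done
    }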
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 01508f2ed32d2..afd2b85d33a77 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -27,7 +27,7 @@ private[spark] class TaskContextImpl(val stageId: Int,
     val attemptId: Long,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends TaskContext(stageId, partitionId, attemptId, runningLocally, taskMetrics)
+  extends TaskContext
   with Logging {
 
   // List of callback functions to execute when the task completes.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 78213686d6813..2552d03d18d06 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -70,7 +70,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   var metrics: Option[TaskMetrics] = None
 
   // Task context, to be initialized in run().
-  @transient protected var context: TaskContext = _
+  @transient protected var context: TaskContextImpl = _
 
   // The actual Thread on which the task is running, if any. Initialized in run().
   @volatile @transient private var taskThread: Thread = _
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 81542a8c11839..ecf34c61eb78e 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -39,8 +39,8 @@ object MimaBuild {
       ProblemFilters.exclude[MissingFieldProblem](fullName),
       ProblemFilters.exclude[IncompatibleResultTypeProblem](fullName),
       ProblemFilters.exclude[IncompatibleMethTypeProblem](fullName),
-      ProblemFilters.exclude[IncompatibleFieldTypeProblem](fullName),
-      ProblemFilters.exclude[AbstractClassProblem](fullName)
+      ProblemFilters.exclude[IncompatibleFieldTypeProblem](fullName)
+// ProblemFilters.exclude[AbstractClassProblem](fullName)
     )
 
   // Exclude a single class and its corresponding object
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d499302124461..350aad47735e4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -50,7 +50,11 @@ object MimaExcludes {
             "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
           // MapStatus should be private[spark]
           ProblemFilters.exclude[IncompatibleTemplateDefProblem](
-            "org.apache.spark.scheduler.MapStatus")
+            "org.apache.spark.scheduler.MapStatus"),
+          // TaskContext was promoted to an abstract class
+          ProblemFilters.exclude[AbstractClassProblem](
+            "org.apache.spark.TaskContext")
+
         )
 
       case v if v.startsWith("1.1") =>
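The user-visible payoff of promoting TaskContext to an abstract class with a static accessor is that closures no longer need the context threaded in by hand. A minimal usage sketch, assuming a live SparkContext named sc; the names and values here are illustrative and not part of the diff:

    import org.apache.spark.TaskContext

    // Tag each element with the partition that processed it, via the new
    // thread-local accessor added in TaskContext.java above.
    val tagged = sc.parallelize(1 to 100, 4).map { x =>
      val ctx = TaskContext.get()
      (ctx.partitionId(), x)
    }
    tagged.collect()   // roughly Array((0,1), (0,2), ..., (3,100)) with 4 even slices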