Skip to content

Commit

Permalink
Clean-up the TaskContext API.
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Oct 14, 2014
1 parent ed551ce commit 44089ec
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 32 deletions.
48 changes: 21 additions & 27 deletions core/src/main/java/org/apache/spark/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,56 @@
package org.apache.spark;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import scala.Function0;
import scala.Function1;
import scala.Unit;
import scala.collection.JavaConversions;

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.TaskCompletionListenerException;

/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*/
@DeveloperApi
* Contextual information about a task which can be read or mutated during
* execution. To access the TaskContext for a running task use
* TaskContext.get().
*/
public abstract class TaskContext implements Serializable {
/**
* Return the currently active TaskContext. This can be called inside of
* user functions to access contextual information about running tasks.
*/
public static TaskContext get() {
return taskContext.get();
}

private static ThreadLocal<TaskContext> taskContext =
new ThreadLocal<TaskContext>();

/**
* :: Internal API ::
* This is spark internal API, not intended to be called from user programs.
*/
static void setTaskContext(TaskContext tc) {
taskContext.set(tc);
}

public static TaskContext get() {
return taskContext.get();
}

/** :: Internal API :: */
static void unset() {
taskContext.remove();
}

/**
* Checks whether the task has completed.
* Whether the task has completed.
*/
public abstract boolean isCompleted();

/**
* Checks whether the task has been killed.
* Whether the task has been killed.
*/
public abstract boolean isInterrupted();

@Deprecated
/** Deprecated: use isRunningLocally() */
public abstract boolean runningLocally();

public abstract boolean isRunningLocally();

/**
* Add a (Java friendly) listener to be executed on task completion.
* This will be called in all situation - success, failure, or cancellation.
Expand Down Expand Up @@ -103,12 +102,7 @@ static void unset() {

public abstract long attemptId();

@Deprecated
/** Deprecated: use isRunningLocally() */
public abstract boolean runningLocally();

public abstract boolean isRunningLocally();

/** ::Internal API:: */
/** ::DeveloperApi:: */
@DeveloperApi
public abstract TaskMetrics taskMetrics();
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark

/**
* This class exists to restrict the visibility of TaskContext setters.
*/
private [spark] object TaskContextHelper {

def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[spark] class TaskContextImpl(val stageId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext(stageId, partitionId, attemptId, runningLocally, taskMetrics)
extends TaskContext
with Logging {

// List of callback functions to execute when the task completes.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
var metrics: Option[TaskMetrics] = None

// Task context, to be initialized in run().
@transient protected var context: TaskContext = _
@transient protected var context: TaskContextImpl = _

// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _
Expand Down
4 changes: 2 additions & 2 deletions project/MimaBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ object MimaBuild {
ProblemFilters.exclude[MissingFieldProblem](fullName),
ProblemFilters.exclude[IncompatibleResultTypeProblem](fullName),
ProblemFilters.exclude[IncompatibleMethTypeProblem](fullName),
ProblemFilters.exclude[IncompatibleFieldTypeProblem](fullName),
ProblemFilters.exclude[AbstractClassProblem](fullName)
ProblemFilters.exclude[IncompatibleFieldTypeProblem](fullName)
// ProblemFilters.exclude[AbstractClassProblem](fullName)
)

// Exclude a single class and its corresponding object
Expand Down
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ object MimaExcludes {
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
// MapStatus should be private[spark]
ProblemFilters.exclude[IncompatibleTemplateDefProblem](
"org.apache.spark.scheduler.MapStatus")
"org.apache.spark.scheduler.MapStatus"),
// TaskContext was promoted to Abstract class
ProblemFilters.exclude[AbstractClassProblem](
"org.apache.spark.TaskContext")

)

case v if v.startsWith("1.1") =>
Expand Down

0 comments on commit 44089ec

Please sign in to comment.