Add application ID and attempt ID to the value of the caller context when 'Task' invokes the Hadoop caller context API

weiqingy committed Aug 22, 2016
1 parent 5ab2a41 commit 1512775
Showing 4 changed files with 19 additions and 8 deletions.
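
Background for the change: Hadoop 2.8 added a caller-context API (org.apache.hadoop.ipc.CallerContext, HDFS-9184) that attaches a short string to the current thread, and the HDFS NameNode records that string in its audit log for each RPC, so a filesystem operation can be traced back to the Spark task that issued it. A minimal sketch of the Hadoop side, assuming Hadoop 2.8+ on the classpath (Spark calls it through Utils.setCallerContext rather than directly, as the Task.scala diff below shows):

// Sketch only: assumes Hadoop 2.8+, where org.apache.hadoop.ipc.CallerContext exists.
import org.apache.hadoop.ipc.CallerContext

// Attach a context string to the current thread; subsequent RPCs carry it,
// and the NameNode writes it into its audit log.
CallerContext.setCurrent(new CallerContext.Builder("Spark_AppId_example").build())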
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -1015,7 +1015,8 @@ class DAGScheduler(
           val locs = taskIdToLocations(id)
           val part = stage.rdd.partitions(id)
           new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId))
+            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+            Option(sc.applicationId), sc.applicationAttemptId)
         }

       case stage: ResultStage =>
@@ -1024,7 +1025,8 @@ class DAGScheduler(
           val part = stage.rdd.partitions(p)
           val locs = taskIdToLocations(id)
           new ResultTask(stage.id, stage.latestInfo.attemptId,
-            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId))
+            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
         }
     }
   } catch {
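
A side note on the two call sites above: on SparkContext, applicationId is a plain String while applicationAttemptId is already an Option[String] (an attempt ID only exists on cluster managers such as YARN that can rerun an application), hence the asymmetric wrapping. Illustration only, assuming a live SparkContext named sc:

val appId: Option[String] = Option(sc.applicationId)         // String, so wrap it
val appAttemptId: Option[String] = sc.applicationAttemptId   // already an Option[String]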
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

@@ -52,8 +52,11 @@ private[spark] class ResultTask[T, U](
     val outputId: Int,
     localProperties: Properties,
     metrics: TaskMetrics,
-    jobId: Option[Int] = None)
-  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId)
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Serializable {

   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

@@ -52,8 +52,11 @@ private[spark] class ShuffleMapTask(
     @transient private var locs: Seq[TaskLocation],
     metrics: TaskMetrics,
     localProperties: Properties,
-    jobId: Option[Int] = None)
-  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId)
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Logging {

   /** A constructor used only in test suites. This does not require passing in an RDD. */
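
Since the new parameters default to None in Task and in both subclasses, every pre-existing call site (including the test-only constructor mentioned above) keeps compiling unchanged. A hypothetical pre-patch style call, for illustration only:

// Hypothetical call site: the new trailing parameters are omitted,
// so appId and appAttemptId fall back to their None defaults.
new ShuffleMapTask(stageId, stageAttemptId, taskBinary, partition, locs,
  metrics, properties, Option(jobId))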
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala

@@ -55,7 +55,9 @@ private[spark] abstract class Task[T](
     // The default value is only used in tests.
     val metrics: TaskMetrics = TaskMetrics.registered,
     @transient var localProperties: Properties = new Properties,
-    val jobId: Option[Int] = None) extends Serializable {
+    val jobId: Option[Int] = None,
+    val appId: Option[String] = None,
+    val appAttemptId: Option[String] = None) extends Serializable {

   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -82,7 +84,8 @@
     taskThread = Thread.currentThread()

     val callerContext =
-      s"Spark_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" +
+      s"Spark_AppId_${appId.getOrElse("")}_AppAttemptId_${appAttemptId.getOrElse("None")}" +
+        s"_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" +
         s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber}"
     Utils.setCallerContext(callerContext)
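
For concreteness, with hypothetical values appId = Some("application_1471891200000_0001"), appAttemptId = Some("1"), jobId = Some(2), stageId = 3, stageAttemptId = 0, taskAttemptId = 14 and attemptNumber = 0 (all IDs made up for illustration), the task would now set the caller context:

Spark_AppId_application_1471891200000_0001_AppAttemptId_1_JobId_2_StageID_3_stageAttemptId_0_taskID_14_attemptNumber_0

When there is no application attempt (e.g. client mode), appAttemptId.getOrElse("None") renders the segment as AppAttemptId_None rather than dropping it, so the field layout of the logged string stays fixed.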
