
Commit a1a32d2

Ngone51 authored and cloud-fan committed
[SPARK-32600][CORE] Unify task name in some logs between driver and executor
### What changes were proposed in this pull request?

This PR replaces some arbitrary task names in logs with the widely used task name (e.g. "task 0.0 in stage 1.0 (TID 1)") shared between the driver and executor. It also changes the task name in `TaskDescription` by appending the TID.

### Why are the changes needed?

Some logs still use only the TID (a.k.a. `taskId`) as the task name, e.g.:

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/executor/Executor.scala#L786

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/executor/Executor.scala#L632-L635

And the task thread name also carries only the `taskId`:

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/executor/Executor.scala#L325

As mentioned in #1259, the TID by itself does not capture the stage or retries, which makes it harder to correlate with the application and inconvenient when debugging. Task names of the form "task 0.0 in stage 1.0 (TID 1)" have already been used widely since #1259, so we should follow that naming convention.

### Does this PR introduce _any_ user-facing change?

Yes. Users will see more consistent task names in the logs.

### How was this patch tested?

Manually checked.

Closes #29418 from Ngone51/unify-task-name.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
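For readers skimming the diffs below, here is a minimal, self-contained sketch of how the unified task name format is composed. This is not Spark's code; `TaskNameSketch` and its parameter names are hypothetical, and the real string is built from `TaskInfo`/`TaskSet` fields as shown in the `TaskSetManager` diff.

```scala
object TaskNameSketch {
  // "task 0.0 in stage 1.0 (TID 1)" =
  //   task <index>.<attempt> in stage <stageId>.<stageAttemptId> (TID <taskId>)
  def taskName(index: Int, attempt: Int, stageId: Int, stageAttemptId: Int, taskId: Long): String =
    s"task $index.$attempt in stage $stageId.$stageAttemptId (TID $taskId)"

  def main(args: Array[String]): Unit = {
    // Prints: task 0.0 in stage 1.0 (TID 1)
    println(taskName(index = 0, attempt = 0, stageId = 1, stageAttemptId = 0, taskId = 1))
  }
}
```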
1 parent 03e2de9 commit a1a32d2

File tree

4 files changed (+44, -42 lines)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 15 additions & 16 deletions
@@ -336,8 +336,8 @@ private[spark] class Executor(
      extends Runnable {

      val taskId = taskDescription.taskId
-     val threadName = s"Executor task launch worker for task $taskId"
      val taskName = taskDescription.name
+     val threadName = s"Executor task launch worker for $taskName"
      val mdcProperties = taskDescription.properties.asScala
        .filter(_._1.startsWith("mdc.")).toSeq

@@ -364,7 +364,7 @@ private[spark] class Executor(
      @volatile var task: Task[Any] = _

      def kill(interruptThread: Boolean, reason: String): Unit = {
-       logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
+       logInfo(s"Executor is trying to kill $taskName, reason: $reason")
        reasonIfKilled = Some(reason)
        if (task != null) {
          synchronized {
@@ -425,7 +425,7 @@ private[spark] class Executor(
        } else 0L
        Thread.currentThread.setContextClassLoader(replClassLoader)
        val ser = env.closureSerializer.newInstance()
-       logInfo(s"Running $taskName (TID $taskId)")
+       logInfo(s"Running $taskName")
        execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
        var taskStartTimeNs: Long = 0
        var taskStartCpu: Long = 0
@@ -459,7 +459,7 @@ private[spark] class Executor(
        // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
        // we don't need to make any special calls here.
        if (!isLocal) {
-         logDebug("Task " + taskId + "'s epoch is " + task.epoch)
+         logDebug(s"$taskName's epoch is ${task.epoch}")
          env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
        }

@@ -485,7 +485,7 @@ private[spark] class Executor(
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

        if (freedMemory > 0 && !threwException) {
-         val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
+         val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, $taskName"
          if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
            throw new SparkException(errMsg)
          } else {
@@ -495,7 +495,7 @@ private[spark] class Executor(

        if (releasedLocks.nonEmpty && !threwException) {
          val errMsg =
-           s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+           s"${releasedLocks.size} block locks were not released by $taskName\n" +
            releasedLocks.mkString("[", ", ", "]")
          if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
            throw new SparkException(errMsg)
@@ -508,7 +508,7 @@ private[spark] class Executor(
          // uh-oh. it appears the user code has caught the fetch-failure without throwing any
          // other exceptions. Its *possible* this is what the user meant to do (though highly
          // unlikely). So we will log an error and keep going.
-         logError(s"TID ${taskId} completed successfully though internally it encountered " +
+         logError(s"$taskName completed successfully though internally it encountered " +
            s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
            s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
        }
@@ -592,7 +592,7 @@ private[spark] class Executor(
        // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
-           logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
+           logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
@@ -602,11 +602,10 @@ private[spark] class Executor(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
-           logInfo(
-             s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
+           logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
-           logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
+           logInfo(s"Finished $taskName. $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
@@ -616,7 +615,7 @@ private[spark] class Executor(
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        case t: TaskKilledException =>
-         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+         logInfo(s"Executor killed $taskName, reason: ${t.reason}")

          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
          // Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
@@ -629,7 +628,7 @@ private[spark] class Executor(
        case _: InterruptedException | NonFatal(_) if
            task != null && task.reasonIfKilled.isDefined =>
          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
+         logInfo(s"Executor interrupted and killed $taskName, reason: $killReason")

          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
          val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
@@ -643,7 +642,7 @@ private[spark] class Executor(
          // there was a fetch failure in the task, but some user code wrapped that exception
          // and threw something else. Regardless, we treat it as a fetch failure.
          val fetchFailedCls = classOf[FetchFailedException].getName
-         logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
+         logWarning(s"$taskName encountered a ${fetchFailedCls} and " +
            s"failed, but the ${fetchFailedCls} was hidden by another " +
            s"exception. Spark is handling this like a fetch failure and ignoring the " +
            s"other exception: $t")
@@ -659,13 +658,13 @@ private[spark] class Executor(
        case t: Throwable if env.isStopped =>
          // Log the expected exception after executor.stop without stack traces
          // see: SPARK-19147
-         logError(s"Exception in $taskName (TID $taskId): ${t.getMessage}")
+         logError(s"Exception in $taskName: ${t.getMessage}")

        case t: Throwable =>
          // Attempt to exit cleanly by informing the driver of our failure.
          // If anything goes wrong (or this was a fatal exception), we will delegate to
          // the default uncaught exception handler, which will terminate the Executor.
-         logError(s"Exception in $taskName (TID $taskId)", t)
+         logError(s"Exception in $taskName", t)

          // SPARK-20904: Do not report failure to driver if if happened during shut down. Because
          // libraries may set up shutdown hooks that race with running tasks during shutdown,
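To make the effect of the Executor.scala changes concrete, here is a small hypothetical sketch of how the thread name and log lines read before and after the change. `ExecutorLogSketch` and its hard-coded values are illustrative stand-ins for `taskDescription.taskId` and `taskDescription.name`, not Spark's implementation.

```scala
object ExecutorLogSketch {
  // Hypothetical values standing in for taskDescription.taskId and taskDescription.name.
  val taskId: Long = 1L
  val taskName: String = "task 0.0 in stage 1.0 (TID 1)"

  def main(args: Array[String]): Unit = {
    // Before: the thread name and several log lines carried only the TID.
    println(s"Executor task launch worker for task $taskId")
    // After: they carry the full task name, which already embeds the TID.
    println(s"Executor task launch worker for $taskName")
    println(s"Running $taskName")
  }
}
```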

core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ private[spark] class TaskDescription(
      val resources: immutable.Map[String, ResourceInformation],
      val serializedTask: ByteBuffer) {

-   override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+   override def toString: String = s"TaskDescription($name)"
  }

  private[spark] object TaskDescription {
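A minimal sketch of the new `toString` behavior, using a simplified stand-in class. `TaskDescriptionSketch` is hypothetical; the real `TaskDescription` takes many more constructor fields.

```scala
// Simplified stand-in for TaskDescription (illustrative only).
class TaskDescriptionSketch(val taskId: Long, val index: Int, val name: String) {
  // Old: "TaskDescription(TID=%d, index=%d)".format(taskId, index)
  // New: the task name already embeds the TID, so it is enough on its own.
  override def toString: String = s"TaskDescription($name)"
}

object TaskDescriptionSketchDemo {
  def main(args: Array[String]): Unit = {
    val desc = new TaskDescriptionSketch(taskId = 1L, index = 0, name = "task 0.0 in stage 1.0 (TID 1)")
    println(desc) // TaskDescription(task 0.0 in stage 1.0 (TID 1))
  }
}
```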

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
              "Tasks result size has exceeded maxResultSize"))
            return
          }
-         logDebug("Fetching indirect task result for TID %s".format(tid))
+         logDebug(s"Fetching indirect task result for ${taskSetManager.taskName(tid)}")
          scheduler.handleTaskGettingResult(taskSetManager, tid)
          val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
          if (serializedTaskResult.isEmpty) {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 27 additions & 24 deletions
@@ -479,8 +479,8 @@ private[spark] class TaskSetManager(
      // We used to log the time it takes to serialize the task, but task size is already
      // a good proxy to task serialization time.
      // val timeTaken = clock.getTime() - startTime
-     val taskName = s"task ${info.id} in stage ${taskSet.id}"
-     logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
+     val tName = taskName(taskId)
+     logInfo(s"Starting $tName ($host, executor ${info.executorId}, " +
        s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " +
        s"taskResourceAssignments ${taskResourceAssignments}")

@@ -489,7 +489,7 @@ private[spark] class TaskSetManager(
        taskId,
        attemptNum,
        execId,
-       taskName,
+       tName,
        index,
        task.partitionId,
        addedFiles,
@@ -509,6 +509,12 @@ private[spark] class TaskSetManager(
      }
    }

+   def taskName(tid: Long): String = {
+     val info = taskInfos.get(tid)
+     assert(info.isDefined, s"Can not find TaskInfo for task (TID $tid)")
+     s"task ${info.get.id} in stage ${taskSet.id} (TID $tid)"
+   }
+
    private def maybeFinishTaskSet(): Unit = {
      if (isZombie && runningTasks == 0) {
        sched.taskSetFinished(this)
@@ -743,9 +749,8 @@ private[spark] class TaskSetManager(
        // Kill any other attempts for the same task (since those are unnecessary now that one
        // attempt completed successfully).
        for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
-         logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
-           s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
-           s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+         logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for ${taskName(attemptInfo.taskId)}" +
+           s" on ${attemptInfo.host} as the attempt ${info.attemptNumber} succeeded on ${info.host}")
          killedByOtherAttempt += attemptInfo.taskId
          sched.backend.killTask(
            attemptInfo.taskId,
@@ -755,17 +760,16 @@ private[spark] class TaskSetManager(
        }
        if (!successful(index)) {
          tasksSuccessful += 1
-         logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
-           s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
-           s" ($tasksSuccessful/$numTasks)")
+         logInfo(s"Finished ${taskName(info.taskId)} in ${info.duration} ms " +
+           s"on ${info.host} (executor ${info.executorId}) ($tasksSuccessful/$numTasks)")
          // Mark successful and stop if all the tasks have succeeded.
          successful(index) = true
          if (tasksSuccessful == numTasks) {
            isZombie = true
          }
        } else {
-         logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
-           " because task " + index + " has already completed successfully")
+         logInfo(s"Ignoring task-finished event for ${taskName(info.taskId)} " +
+           s"because it has already completed successfully")
        }
        // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
        // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -806,8 +810,8 @@ private[spark] class TaskSetManager(
      copiesRunning(index) -= 1
      var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
      var metricPeaks: Array[Long] = Array.empty
-     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," +
-       s" executor ${info.executorId}): ${reason.toErrorString}"
+     val failureReason = s"Lost ${taskName(tid)} (${info.host} " +
+       s"executor ${info.executorId}): ${reason.toErrorString}"
      val failureException: Option[Throwable] = reason match {
        case fetchFailed: FetchFailed =>
          logWarning(failureReason)
@@ -828,12 +832,11 @@ private[spark] class TaskSetManager(
          // ExceptionFailure's might have accumulator updates
          accumUpdates = ef.accums
          metricPeaks = ef.metricPeaks.toArray
+         val task = taskName(tid)
          if (ef.className == classOf[NotSerializableException].getName) {
            // If the task result wasn't serializable, there's no point in trying to re-execute it.
-           logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
-             .format(info.id, taskSet.id, tid, ef.description))
-           abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format(
-             info.id, taskSet.id, tid, ef.description))
+           logError(s"$task had a not serializable result: ${ef.description}; not retrying")
+           abort(s"$task had a not serializable result: ${ef.description}")
            return
          }
          if (ef.className == classOf[TaskOutputFileAlreadyExistException].getName) {
@@ -866,8 +869,8 @@ private[spark] class TaskSetManager(
            logWarning(failureReason)
          } else {
            logInfo(
-             s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on ${info.host}, executor" +
-               s" ${info.executorId}: ${ef.className} (${ef.description}) [duplicate $dupCount]")
+             s"Lost $task on ${info.host}, executor ${info.executorId}: " +
+               s"${ef.className} (${ef.description}) [duplicate $dupCount]")
          }
          ef.exception

@@ -879,7 +882,7 @@ private[spark] class TaskSetManager(
          None

        case e: ExecutorLostFailure if !e.exitCausedByApp =>
-         logInfo(s"Task $tid failed because while it was being computed, its executor " +
+         logInfo(s"${taskName(tid)} failed because while it was being computed, its executor " +
            "exited for a reason unrelated to the task. Not counting this failure towards the " +
            "maximum number of failures for the task.")
          None
@@ -910,10 +913,10 @@ private[spark] class TaskSetManager(
      }

      if (successful(index)) {
-       logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
-         s" be re-executed (either because the task failed with a shuffle data fetch failure," +
-         s" so the previous stage needs to be re-run, or because a different copy of the task" +
-         s" has already succeeded).")
+       logInfo(s"${taskName(info.taskId)} failed, but the task will not" +
+         " be re-executed (either because the task failed with a shuffle data fetch failure," +
+         " so the previous stage needs to be re-run, or because a different copy of the task" +
+         " has already succeeded).")
      } else {
        addPendingTask(index)
      }