Skip to content

Commit

Permalink
[SPARK-34764][CORE][K8S][UI] Propagate reason for exec loss to Web UI
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Adds the exec loss reason to the Spark web UI & in doing so also fix the Kube integration to pass exec loss reason into core.

UI change:

![image](https://user-images.githubusercontent.com/59893/117045762-b975ba80-acc4-11eb-9679-8edab3cfadc2.png)

### Why are the changes needed?

Debugging Spark jobs is *hard*, making it clearer why executors have exited could help.

### Does this PR introduce _any_ user-facing change?

Yes a new column on the executor page.

### How was this patch tested?

K8s unit test updated to validate exec loss reasons are passed through regardless of exec alive state, manual testing to validate the UI.

Closes #32436 from holdenk/SPARK-34764-propegate-reason-for-exec-loss.

Lead-authored-by: Holden Karau <hkarau@apple.com>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Signed-off-by: Holden Karau <hkarau@apple.com>
  • Loading branch information
holdenk and holdenk committed May 13, 2021
1 parent 6a949d1 commit 160b3be
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ <h4 class="title-table">Executors</h4>
Shuffle Write</span></th>
<th>Logs</th>
<th>Thread Dump</th>
<th>Exec Loss Reason</th>
</tr>
</thead>
<tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ function getThreadDumpEnabled() {
return threadDumpEnabled;
}

function formatLossReason(removeReason, type, row) {
if (removeReason) {
return removeReason
} else {
return ""
}
}

function formatStatus(status, type, row) {
if (row.isExcluded) {
return "Excluded";
Expand Down Expand Up @@ -132,7 +140,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}

var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14];
var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14, 15];
var execDataTable;
var sumDataTable;

Expand Down Expand Up @@ -543,6 +551,10 @@ $(document).ready(function () {
data: 'id', render: function (data, type) {
return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
}
},
{
data: 'removeReason',
render: formatLossReason
}
],
"order": [[0, "asc"]],
Expand Down Expand Up @@ -709,6 +721,7 @@ $(document).ready(function () {
"<div id='direct_mapped_pool_memory' class='direct_mapped_pool_memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'> Peak Pool Memory Direct / Mapped</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='13'> Resources</div>" +
"<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='14'> Resource Profile Id</div>" +
"<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='15'> Exec Loss Reason</div>" +
"</div>");

reselectCheckboxesBasedOnTaskTableState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,17 @@ private[spark] class ExecutorPodsLifecycleManager(
val pod = podState.pod
val reason = Option(pod.getStatus.getReason)
val message = Option(pod.getStatus.getMessage)
val explained = describeExitCode(exitCode)
val exitMsg = s"The executor with id $execId exited with exit code $explained."
val reasonStr = reason.map(r => s"The API gave the following brief reason: ${r}")
val msgStr = message.map(m => s"The API gave the following message: ${m}")


s"""
|The executor with id $execId exited with exit code $exitCode.
|The API gave the following brief reason: ${reason.getOrElse("N/A")}
|The API gave the following message: ${message.getOrElse("N/A")}
|${exitMsg}
|${reasonStr.getOrElse("")}
|${msgStr.getOrElse("")}
|
|The API gave the following container statuses:
|
|${containersDescription(pod)}
Expand All @@ -246,4 +253,25 @@ private[spark] class ExecutorPodsLifecycleManager(

private object ExecutorPodsLifecycleManager {
val UNKNOWN_EXIT_CODE = -1

// A utility function to try and help people figure out whats gone wrong faster.
def describeExitCode(code: Int): String = {
val humanStr = code match {
case 0 => "(success)"
case 1 => "(generic, look at logs to clarify)"
case 42 => "(Douglas Adams fan)"
// Spark specific
case 10 | 50 => "(Uncaught exception)"
case 52 => "(JVM OOM)"
case 53 => "(DiskStore failed to create temp dir)"
// K8s & JVM specific exit codes
case 126 => "(not executable - possibly perm or arch)"
case 137 => "(SIGKILL, possible container OOM)"
case 139 => "(SIGSEGV: that's unexpected)"
case 255 => "(exit-1, your guess is as good as mine)"
case _ => "(unexpected)"
}
s"${code}${humanStr}"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ private[spark] class KubernetesClusterSchedulerBackend(

// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
if (isExecutorActive(executorId)) {
removeExecutor(executorId, reason)
}
removeExecutor(executorId, reason)
}

private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
val failedPod = failedExecutorWithoutDeletion(1)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val msg = exitReasonMessage(1, failedPod, 1)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName)).delete()
Expand All @@ -81,7 +81,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
snapshotsStore.notifySubscribers()
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val msg = exitReasonMessage(1, failedPod, 1)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
Expand Down Expand Up @@ -114,7 +114,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod)
val msg = exitReasonMessage(1, failedPod, 1)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
Expand All @@ -126,13 +126,20 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
}

private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
val reason = Option(failedPod.getStatus.getReason)
val message = Option(failedPod.getStatus.getMessage)
val explained = ExecutorPodsLifecycleManager.describeExitCode(exitCode)
val exitMsg = s"The executor with id $execId exited with exit code $explained."
val reasonStr = reason.map(r => s"The API gave the following brief reason: ${r}")
val msgStr = message.map(m => s"The API gave the following message: ${m}")


s"""
|The executor with id $failedExecutorId exited with exit code 1.
|The API gave the following brief reason: ${reason.getOrElse("N/A")}
|The API gave the following message: ${message.getOrElse("N/A")}
|${exitMsg}
|${reasonStr.getOrElse("")}
|${msgStr.getOrElse("")}
|
|The API gave the following container statuses:
|
|${containersDescription(failedPod)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn

backend.start()
backend.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef, never()).send(RemoveExecutor("1", ExecutorKilled))
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))

backend.doRemoveExecutor("2", ExecutorKilled)
verify(driverEndpointRef, never()).send(RemoveExecutor("1", ExecutorKilled))
verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
}

test("Kill executors") {
Expand Down

0 comments on commit 160b3be

Please sign in to comment.