Skip to content

Commit

Permalink
MAPREDUCE-6892. Issues with the count of failed/killed tasks in the j…
Browse files Browse the repository at this point in the history
…hist file. (Peter Bacsko via Haibo Chen)
  • Loading branch information
haibchen committed Aug 30, 2017
1 parent a20e710 commit d04f85f
Show file tree
Hide file tree
Showing 22 changed files with 573 additions and 136 deletions.
Expand Up @@ -431,10 +431,18 @@ protected void serviceStop() throws Exception {
+ " to have not been closed. Will close"); + " to have not been closed. Will close");
//Create a JobFinishEvent so that it is written to the job history //Create a JobFinishEvent so that it is written to the job history
final Job job = context.getJob(toClose); final Job job = context.getJob(toClose);
int successfulMaps = job.getCompletedMaps() - job.getFailedMaps()
- job.getKilledMaps();
int successfulReduces = job.getCompletedReduces()
- job.getFailedReduces() - job.getKilledReduces();

JobUnsuccessfulCompletionEvent jucEvent = JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), job.getCompletedMaps(), System.currentTimeMillis(),
job.getCompletedReduces(), successfulMaps,
successfulReduces,
job.getFailedMaps(), job.getFailedReduces(),
job.getKilledMaps(), job.getKilledReduces(),
createJobStateForJobUnsuccessfulCompletionEvent( createJobStateForJobUnsuccessfulCompletionEvent(
mi.getForcedJobStateOnShutDown()), mi.getForcedJobStateOnShutDown()),
job.getDiagnostics()); job.getDiagnostics());
Expand Down Expand Up @@ -655,9 +663,9 @@ public void handleEvent(JobHistoryEvent event) {
JobFinishedEvent jFinishedEvent = JobFinishedEvent jFinishedEvent =
(JobFinishedEvent) event.getHistoryEvent(); (JobFinishedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime()); mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps()); mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces( mi.getJobIndexInfo().setNumReduces(
jFinishedEvent.getFinishedReduces()); jFinishedEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString()); mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
closeEventWriter(event.getJobID()); closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID()); processDoneFiles(event.getJobID());
Expand All @@ -672,8 +680,8 @@ public void handleEvent(JobHistoryEvent event) {
JobUnsuccessfulCompletionEvent jucEvent = JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event.getHistoryEvent(); (JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces()); mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus()); mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID()); closeEventWriter(event.getJobID());
if(context.isLastAMRetry()) if(context.isLastAMRetry())
Expand All @@ -690,8 +698,8 @@ public void handleEvent(JobHistoryEvent event) {
(JobUnsuccessfulCompletionEvent) event (JobUnsuccessfulCompletionEvent) event
.getHistoryEvent(); .getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces()); mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus()); mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID()); closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID()); processDoneFiles(event.getJobID());
Expand Down Expand Up @@ -739,10 +747,12 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
case JOB_FINISHED: case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event; JobFinishedEvent jfe = (JobFinishedEvent) event;
summary.setJobFinishTime(jfe.getFinishTime()); summary.setJobFinishTime(jfe.getFinishTime());
summary.setNumFinishedMaps(jfe.getFinishedMaps()); summary.setNumSucceededMaps(jfe.getSucceededMaps());
summary.setNumFailedMaps(jfe.getFailedMaps()); summary.setNumFailedMaps(jfe.getFailedMaps());
summary.setNumFinishedReduces(jfe.getFinishedReduces()); summary.setNumSucceededReduces(jfe.getSucceededReduces());
summary.setNumFailedReduces(jfe.getFailedReduces()); summary.setNumFailedReduces(jfe.getFailedReduces());
summary.setNumKilledMaps(jfe.getKilledMaps());
summary.setNumKilledReduces(jfe.getKilledReduces());
if (summary.getJobStatus() == null) if (summary.getJobStatus() == null)
summary summary
.setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED .setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
Expand All @@ -753,11 +763,21 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
break; break;
case JOB_FAILED: case JOB_FAILED:
case JOB_KILLED: case JOB_KILLED:
Job job = context.getJob(jobId);
JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event; JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
int successfulMaps = job.getCompletedMaps() - job.getFailedMaps()
- job.getKilledMaps();
int successfulReduces = job.getCompletedReduces()
- job.getFailedReduces() - job.getKilledReduces();

summary.setJobStatus(juce.getStatus()); summary.setJobStatus(juce.getStatus());
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps()); summary.setNumSucceededMaps(successfulMaps);
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces()); summary.setNumSucceededReduces(successfulReduces);
summary.setNumFailedMaps(job.getFailedMaps());
summary.setNumFailedReduces(job.getFailedReduces());
summary.setJobFinishTime(juce.getFinishTime()); summary.setJobFinishTime(juce.getFinishTime());
summary.setNumKilledMaps(juce.getKilledMaps());
summary.setNumKilledReduces(juce.getKilledReduces());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters()); setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break; break;
default: default:
Expand Down Expand Up @@ -840,25 +860,43 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
JobUnsuccessfulCompletionEvent juce = JobUnsuccessfulCompletionEvent juce =
(JobUnsuccessfulCompletionEvent) event; (JobUnsuccessfulCompletionEvent) event;
tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime()); tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps()); tEvent.addEventInfo("NUM_MAPS",
tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces()); juce.getSucceededMaps() +
juce.getFailedMaps() +
juce.getKilledMaps());
tEvent.addEventInfo("NUM_REDUCES",
juce.getSucceededReduces() +
juce.getFailedReduces() +
juce.getKilledReduces());
tEvent.addEventInfo("JOB_STATUS", juce.getStatus()); tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics()); tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps()); tEvent.addEventInfo("SUCCESSFUL_MAPS", juce.getSucceededMaps());
tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces()); tEvent.addEventInfo("SUCCESSFUL_REDUCES", juce.getSucceededReduces());
tEvent.addEventInfo("FAILED_MAPS", juce.getFailedMaps());
tEvent.addEventInfo("FAILED_REDUCES", juce.getFailedReduces());
tEvent.addEventInfo("KILLED_MAPS", juce.getKilledMaps());
tEvent.addEventInfo("KILLED_REDUCES", juce.getKilledReduces());
tEntity.addEvent(tEvent); tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString()); tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE); tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break; break;
case JOB_FINISHED: case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event; JobFinishedEvent jfe = (JobFinishedEvent) event;
tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime()); tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps()); tEvent.addEventInfo("NUM_MAPS",
tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces()); jfe.getSucceededMaps() +
jfe.getFailedMaps() +
jfe.getKilledMaps());
tEvent.addEventInfo("NUM_REDUCES",
jfe.getSucceededReduces() +
jfe.getFailedReduces() +
jfe.getKilledReduces());
tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps()); tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces()); tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps()); tEvent.addEventInfo("SUCCESSFUL_MAPS", jfe.getSucceededMaps());
tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces()); tEvent.addEventInfo("SUCCESSFUL_REDUCES", jfe.getSucceededReduces());
tEvent.addEventInfo("KILLED_MAPS", jfe.getKilledMaps());
tEvent.addEventInfo("KILLED_REDUCES", jfe.getKilledReduces());
tEvent.addEventInfo("MAP_COUNTERS_GROUPS", tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(jfe.getMapCounters())); JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
Expand Down
Expand Up @@ -30,10 +30,12 @@ public class JobSummary {
private long firstReduceTaskLaunchTime; // ReduceAttemptStarted | private long firstReduceTaskLaunchTime; // ReduceAttemptStarted |
// TaskAttemptStartEvent // TaskAttemptStartEvent
private long jobFinishTime; private long jobFinishTime;
private int numFinishedMaps; private int numSucceededMaps;
private int numFailedMaps; private int numFailedMaps;
private int numFinishedReduces; private int numSucceededReduces;
private int numFailedReduces; private int numFailedReduces;
private int numKilledMaps;
private int numKilledReduces;
private int resourcesPerMap; // resources used per map/min resource private int resourcesPerMap; // resources used per map/min resource
private int resourcesPerReduce; // resources used per reduce/min resource private int resourcesPerReduce; // resources used per reduce/min resource
// resource models // resource models
Expand Down Expand Up @@ -98,12 +100,12 @@ public void setJobFinishTime(long jobFinishTime) {
this.jobFinishTime = jobFinishTime; this.jobFinishTime = jobFinishTime;
} }


public int getNumFinishedMaps() { public int getNumSucceededMaps() {
return numFinishedMaps; return numSucceededMaps;
} }


public void setNumFinishedMaps(int numFinishedMaps) { public void setNumSucceededMaps(int numSucceededMaps) {
this.numFinishedMaps = numFinishedMaps; this.numSucceededMaps = numSucceededMaps;
} }


public int getNumFailedMaps() { public int getNumFailedMaps() {
Expand All @@ -114,6 +116,22 @@ public void setNumFailedMaps(int numFailedMaps) {
this.numFailedMaps = numFailedMaps; this.numFailedMaps = numFailedMaps;
} }


public int getKilledMaps() {
return numKilledMaps;
}

public void setNumKilledMaps(int numKilledMaps) {
this.numKilledMaps = numKilledMaps;
}

public int getKilledReduces() {
return numKilledReduces;
}

public void setNumKilledReduces(int numKilledReduces) {
this.numKilledReduces = numKilledReduces;
}

public int getResourcesPerMap() { public int getResourcesPerMap() {
return resourcesPerMap; return resourcesPerMap;
} }
Expand All @@ -122,12 +140,12 @@ public void setResourcesPerMap(int resourcesPerMap) {
this.resourcesPerMap = resourcesPerMap; this.resourcesPerMap = resourcesPerMap;
} }


public int getNumFinishedReduces() { public int getNumSucceededReduces() {
return numFinishedReduces; return numSucceededReduces;
} }


public void setNumFinishedReduces(int numFinishedReduces) { public void setNumSucceededReduces(int numSucceededReduces) {
this.numFinishedReduces = numFinishedReduces; this.numSucceededReduces = numSucceededReduces;
} }


public int getNumFailedReduces() { public int getNumFailedReduces() {
Expand Down Expand Up @@ -204,8 +222,15 @@ public String getJobSummaryString() {
.add("finishTime", jobFinishTime) .add("finishTime", jobFinishTime)
.add("resourcesPerMap", resourcesPerMap) .add("resourcesPerMap", resourcesPerMap)
.add("resourcesPerReduce", resourcesPerReduce) .add("resourcesPerReduce", resourcesPerReduce)
.add("numMaps", numFinishedMaps + numFailedMaps) .add("numMaps", numSucceededMaps + numFailedMaps + numKilledMaps)
.add("numReduces", numFinishedReduces + numFailedReduces) .add("numReduces", numSucceededReduces + numFailedReduces
+ numKilledReduces)
.add("succededMaps", numSucceededMaps)
.add("succeededReduces", numSucceededReduces)
.add("failedMaps", numFailedMaps)
.add("failedReduces", numFailedReduces)
.add("killedMaps", numKilledMaps)
.add("killedReduces", numKilledReduces)
.add("user", user) .add("user", user)
.add("queue", queue) .add("queue", queue)
.add("status", jobStatus) .add("status", jobStatus)
Expand Down
Expand Up @@ -65,6 +65,10 @@ public interface Job {
int getTotalReduces(); int getTotalReduces();
int getCompletedMaps(); int getCompletedMaps();
int getCompletedReduces(); int getCompletedReduces();
int getFailedMaps();
int getFailedReduces();
int getKilledMaps();
int getKilledReduces();
float getProgress(); float getProgress();
boolean isUber(); boolean isUber();
String getUserName(); String getUserName();
Expand Down
Expand Up @@ -1684,6 +1684,10 @@ private void unsuccessfulFinish(JobStateInternal finalState) {
finishTime, finishTime,
succeededMapTaskCount, succeededMapTaskCount,
succeededReduceTaskCount, succeededReduceTaskCount,
failedMapTaskCount,
failedReduceTaskCount,
killedMapTaskCount,
killedReduceTaskCount,
finalState.toString(), finalState.toString(),
diagnostics); diagnostics);
eventHandler.handle(new JobHistoryEvent(jobId, eventHandler.handle(new JobHistoryEvent(jobId,
Expand Down Expand Up @@ -1748,6 +1752,7 @@ private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
job.oldJobId, job.finishTime, job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount, job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount, job.failedMapTaskCount, job.failedReduceTaskCount,
job.killedMapTaskCount, job.killedReduceTaskCount,
job.finalMapCounters, job.finalMapCounters,
job.finalReduceCounters, job.finalReduceCounters,
job.fullCounters); job.fullCounters);
Expand Down Expand Up @@ -1797,7 +1802,7 @@ public void transition(JobImpl job, JobEvent event) {
job.setFinishTime(); job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime, 0, 0, 0, 0, 0, 0,
JobStateInternal.KILLED.toString(), job.diagnostics); JobStateInternal.KILLED.toString(), job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobStateInternal.KILLED); job.finished(JobStateInternal.KILLED);
Expand Down Expand Up @@ -1954,8 +1959,8 @@ private static class TaskCompletedTransition implements
@Override @Override
public JobStateInternal transition(JobImpl job, JobEvent event) { public JobStateInternal transition(JobImpl job, JobEvent event) {
job.completedTaskCount++; job.completedTaskCount++;
LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event; JobTaskEvent taskEvent = (JobTaskEvent) event;
LOG.info("Num completed Tasks: " + job.completedTaskCount);
Task task = job.tasks.get(taskEvent.getTaskID()); Task task = job.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) { if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(job, task); taskSucceeded(job, task);
Expand Down Expand Up @@ -1991,11 +1996,15 @@ protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
job.allowedMapFailuresPercent*job.numMapTasks || job.allowedMapFailuresPercent*job.numMapTasks ||
job.failedReduceTaskCount*100 > job.failedReduceTaskCount*100 >
job.allowedReduceFailuresPercent*job.numReduceTasks) { job.allowedReduceFailuresPercent*job.numReduceTasks) {

job.setFinishTime(); job.setFinishTime();


String diagnosticMsg = "Job failed as tasks failed. " + String diagnosticMsg = "Job failed as tasks failed. " +
"failedMaps:" + job.failedMapTaskCount + "failedMaps:" + job.failedMapTaskCount +
" failedReduces:" + job.failedReduceTaskCount; " failedReduces:" + job.failedReduceTaskCount +
" killedMaps:" + job.killedMapTaskCount +
" killedReduces: " + job.killedReduceTaskCount;

LOG.info(diagnosticMsg); LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg); job.addDiagnostic(diagnosticMsg);


Expand Down Expand Up @@ -2226,7 +2235,13 @@ public void transition(JobImpl job, JobEvent event) {
job.setFinishTime(); job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime,
job.succeededMapTaskCount,
job.succeededReduceTaskCount,
job.failedMapTaskCount,
job.failedReduceTaskCount,
job.killedMapTaskCount,
job.killedReduceTaskCount,
jobHistoryString, job.diagnostics); jobHistoryString, job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(terminationState); job.finished(terminationState);
Expand Down Expand Up @@ -2266,4 +2281,24 @@ public int getMaxFetchFailuresNotifications() {
public void setJobPriority(Priority priority) { public void setJobPriority(Priority priority) {
this.jobPriority = priority; this.jobPriority = priority;
} }

@Override
public int getFailedMaps() {
return failedMapTaskCount;
}

@Override
public int getFailedReduces() {
return failedReduceTaskCount;
}

@Override
public int getKilledMaps() {
return killedMapTaskCount;
}

@Override
public int getKilledReduces() {
return killedReduceTaskCount;
}
} }

0 comments on commit d04f85f

Please sign in to comment.