Skip to content
This repository has been archived by the owner on Jan 13, 2022. It is now read-only.

Commit

Permalink
Fix bug in detecting dead job tracker.
Browse files Browse the repository at this point in the history
Summary:
The job tracker reporter thread in the corona task tracker should
periodically attempt a heartbeat to the corona job tracker to detect
a dead job tracker. The job tracker reporter has logic to heartbeat
every 3 minutes even if there are no tasks running for that job, but
there is a bug in that logic which is fixed by this diff.

Also:
1. Set a name for the job tracker reporter thread. This will help in
   debugging.
1. prefix log messages from job tracker reporter with the reporter name.

Test Plan:
unit tests

Revert Plan:

Reviewers: dms, aching, pyang

Reviewed By: dms
  • Loading branch information
rvadali authored and Alex Feinberg committed Jul 7, 2012
1 parent cc6a14d commit f01e2af
Showing 1 changed file with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,13 @@ class JobTrackerReporter extends Thread {
long heartbeatInterval = 3000L;
short heartbeatResponseId = -1;
TaskTrackerStatus status = null;
final String name;
JobTrackerReporter(RunningJob rJob, InetSocketAddress jobTrackerAddr,
String sessionHandle) {
this.rJob = rJob;
this.jobTrackerAddr = jobTrackerAddr;
this.sessionHandle = sessionHandle;
this.name = "JobTrackerReporter(" + rJob.getJobID() + ")";
}
volatile boolean shuttingDown = false;
@Override
Expand Down Expand Up @@ -362,14 +364,14 @@ public void run() {
String jobTrackerBV = jobClient.getBuildVersion();
if(doCheckBuildVersion() &&
!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
String msg = name + " shutting down. Incompatible buildVersion." +
"\nJobTracker's: " + jobTrackerBV +
"\nTaskTracker's: "+ VersionInfo.getBuildVersion();
LOG.error(msg);
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
LOG.info("Problem reporting to jobtracker: " + e);
LOG.info(name + " problem reporting to jobtracker: " + e);
}
shuttingDown = true;
return;
Expand All @@ -396,12 +398,13 @@ public void run() {
sendCounters, status, tipsInSession, jobTrackerAddr);
}
}
if (!tipsInSession.isEmpty()) {
if (!tipsInSession.isEmpty() ||
now - lastHeartbeat > SLOW_HEARTBEAT_INTERVAL) {
// Send heartbeat only when there is at least one running tip in
// this session
// this session, or we have reached the slow heartbeat interval.

LOG.info("JobTracker heartbeat:" + jobTrackerAddr.toString() +
" hearbeatId:" + heartbeatResponseId + " " + status.toString());
LOG.info(name + " heartbeat:" + jobTrackerAddr.toString() +
" hearbeatId:" + heartbeatResponseId + " " + status.toString());

HeartbeatResponse heartbeatResponse = transmitHeartBeat(
jobClient, heartbeatResponseId, status);
Expand All @@ -423,26 +426,26 @@ public void run() {

}
} catch (DiskErrorException de) {
String msg = "Exiting task tracker for disk error:\n" +
String msg = name + " exiting for disk error:\n" +
StringUtils.stringifyException(de);
LOG.error(msg);
try {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
} catch (IOException exp) {
LOG.error("Cannot report TaskTracker failure");
LOG.error(name + " cannot report TaskTracker failure");
}
} catch (IOException e) {
LOG.error("Error report to JobTracker:" + jobTrackerAddr +
" sessionHandle:" + sessionHandle, e);
LOG.error(name + " error in reporting to " + jobTrackerAddr, e);
// JobTracker is dead. Purge the job.
// Or it will timeout this task.
// Treat the task as killed
purgeSession(this.sessionHandle);
} catch (InterruptedException e) {
LOG.info("JobTrackerReporter interrupted");
LOG.info(name + " interrupted");
}
}

private void connect() throws IOException {
try {
jobClient = RPC.waitForProtocolProxy(
Expand All @@ -453,13 +456,12 @@ private void connect() throws IOException {
jtConnectTimeoutMsec).getProxy();
rJob.setJobClient(jobClient);
} catch (IOException e) {
LOG.error("Failed to connect to JobTracker:" +
jobTrackerAddr + " sessionHandle:" + sessionHandle, e);
LOG.error(name + " failed to connect to " + jobTrackerAddr, e);
throw e;
}
}
public void shutdown() {
LOG.info("Shutting down reporter to JobTracker " + this.jobTrackerAddr);
LOG.info(name + " shutting down");
// shutdown RPC connections
RPC.stopProxy(jobClient);
shuttingDown = true;
Expand Down Expand Up @@ -613,6 +615,7 @@ protected RunningJob createRunningJob(JobID jobId, TaskInProgress tip)
RunningJob rJob = new RunningJob(jobId, null, info);
JobTrackerReporter reporter = new JobTrackerReporter(
rJob, info.getJobTrackerAddr(), info.getSessionHandle());
reporter.setName("JobTrackerReporter for " + jobId);
// Start the heartbeat to the jobtracker
reporter.start();
jobTrackerReporters.put(jobId, reporter);
Expand Down

0 comments on commit f01e2af

Please sign in to comment.