Skip to content

Commit

Permalink
[MR] HeartBeat should handle MR with infinite timeout
Browse files Browse the repository at this point in the history
relates #426
  • Loading branch information
costin committed Apr 19, 2015
1 parent 1e209e0 commit e0aabd9
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java
Expand Up @@ -36,17 +36,20 @@ class HeartBeat {

private ScheduledExecutorService scheduler;
private final Progressable progressable;
private final TimeValue rate;
private final TimeValue delay;
private final Log log;
private final String id;

HeartBeat(final Progressable progressable, Configuration cfg, TimeValue delay, final Log log) {
HeartBeat(final Progressable progressable, Configuration cfg, TimeValue lead, final Log log) {
Assert.notNull(progressable, "a valid progressable is required to report status to Hadoop");
TimeValue tv = HadoopCfgUtils.getTaskTimeout(cfg);
Assert.isTrue(tv.getSeconds() > delay.getSeconds(), "Hadoop timeout is shorter than the heartbeat");

Assert.isTrue(tv.getSeconds() <= 0 || tv.getSeconds() > lead.getSeconds(), "Hadoop timeout is shorter than the heartbeat");

this.progressable = progressable;
this.rate = new TimeValue(tv.getMillis() - delay.getMillis(), TimeUnit.MILLISECONDS);
long cfgMillis = (tv.getMillis() > 0 ? tv.getMillis() : 0);
// the task is simple hence the delay = timeout - lead, that is when to start the notification right before the timeout
this.delay = new TimeValue(Math.abs(cfgMillis - lead.getMillis()), TimeUnit.MILLISECONDS);
this.log = log;

String taskId;
Expand All @@ -73,7 +76,7 @@ void start() {
log.trace(String.format("Starting heartbeat for %s", id));
}

scheduler.schedule(new Runnable() {
scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if (log != null && log.isTraceEnabled()) {
Expand All @@ -82,7 +85,7 @@ public void run() {
progressable.progress();
}
// start the reporter before timing out
}, rate.getMillis(), TimeUnit.MILLISECONDS);
}, delay.getMillis(), delay.getMillis(), TimeUnit.MILLISECONDS);
}

void stop() {
Expand Down

0 comments on commit e0aabd9

Please sign in to comment.