Skip to content

Commit

Permalink
fallback from task attempt to task id
Browse files Browse the repository at this point in the history
in case the task id information is not present, log existing props for better debugging...
relates #148
  • Loading branch information
costin committed Feb 24, 2014
1 parent f01d387 commit cf49ea2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
13 changes: 9 additions & 4 deletions src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
Expand Up @@ -30,7 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
Expand Down Expand Up @@ -212,11 +212,16 @@ protected void init() throws IOException {
}

private int detectCurrentInstance(Configuration conf) {
TaskAttemptID attempt = TaskAttemptID.forName(HadoopCfgUtils.getTaskAttemptId(conf));
Assert.notNull(attempt,
TaskID taskID = TaskID.forName(HadoopCfgUtils.getTaskId(conf));

if (taskID == null) {
log.error(String.format("Cannot determine task id - current properties are %s", HadoopCfgUtils.asProperties(conf)));
}

Assert.notNull(taskID,
"Unable to determine task id - please report your distro/setting through the issue tracker");

return attempt.getTaskID().getId();
return taskID.getId();
}

@Override
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java
Expand Up @@ -18,6 +18,9 @@
*/
package org.elasticsearch.hadoop.mr;

import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.elasticsearch.hadoop.util.unit.TimeValue;

Expand Down Expand Up @@ -67,6 +70,10 @@ public static String getTaskAttemptId(Configuration cfg) {
return get(cfg, "mapreduce.task.attempt.id", "mapred.task.id");
}

public static String getTaskId(Configuration cfg) {
return get(cfg, "mapreduce.task.id", "mapred.tip.id");
}

public static String getReduceTasks(Configuration cfg) {
return get(cfg, "mapreduce.job.reduces", "mapred.reduce.tasks");
}
Expand All @@ -87,6 +94,18 @@ public static TimeValue getTaskTimeout(Configuration cfg) {
return TimeValue.parseTimeValue(get(cfg, "mapreduce.task.timeout", "mapred.task.timeout"));
}

public static Properties asProperties(Configuration cfg) {
Properties props = new Properties();

if (cfg != null) {
for (Map.Entry<String, String> entry : cfg) {
props.setProperty(entry.getKey(), entry.getValue());
}
}

return props;
}

private static String get(Configuration cfg, String hadoop2, String hadoop1) {
String prop = cfg.get(hadoop2);
return (prop != null ? prop : (hadoop1 != null ? cfg.get(hadoop1) : null));
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java
Expand Up @@ -24,7 +24,7 @@

import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.unit.TimeValue;
Expand All @@ -48,7 +48,16 @@ class HeartBeat {
this.progressable = progressable;
this.rate = new TimeValue(tv.getMillis() - delay.getMillis(), TimeUnit.MILLISECONDS);
this.log = log;
this.id = TaskAttemptID.forName(HadoopCfgUtils.getTaskAttemptId(cfg)).getTaskID().toString();
TaskID taskID = TaskID.forName(HadoopCfgUtils.getTaskId(cfg));

if (taskID == null) {
log.error(String.format("Cannot determine task id - current properties are %s", HadoopCfgUtils.asProperties(cfg)));
}

Assert.notNull(taskID,
"Unable to determine task id - please report your distro/setting through the issue tracker");

this.id = taskID.toString();
}


Expand Down

0 comments on commit cf49ea2

Please sign in to comment.