From 145ad76e413186c5a2d57c61e2300008e7c847e3 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 26 Sep 2014 10:41:39 +0300 Subject: [PATCH] Improve taskId detection in EsOutputFormat Relates #280 --- .../hadoop/mr/EsOutputFormat.java | 2 +- .../hadoop/mr/HadoopCfgUtils.java | 19 +++++++++++++++++++ .../elasticsearch/hadoop/mr/HeartBeat.java | 19 ++----------------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java index 17ab7b40c..98077792c 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java @@ -261,7 +261,7 @@ private void initMultiIndices(Settings settings, int currentInstance) throws IOE } private int detectCurrentInstance(Configuration conf) { - TaskID taskID = TaskID.forName(HadoopCfgUtils.getTaskId(conf)); + TaskID taskID = HadoopCfgUtils.getTaskID(conf); if (taskID == null) { log.warn(String.format("Cannot determine task id - redirecting writes in a random fashion")); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java index f0ce1d8a8..b0b733f0a 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java @@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.elasticsearch.hadoop.util.StringUtils; import org.elasticsearch.hadoop.util.unit.TimeValue; /** @@ -137,4 +140,20 @@ public static JobConf asJobConf(Configuration cfg) { public static String getMapValueClass(Configuration cfg) { return get(cfg, "mapred.mapoutput.value.class", "mapreduce.map.output.value.class"); } + + public static TaskID getTaskID(Configuration cfg) { + // first try with the attempt since some Hadoop versions mix the two + String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg); + if (StringUtils.hasText(taskAttemptId)) { + return TaskAttemptID.forName(taskAttemptId).getTaskID(); + } + else { + String taskIdProp = HadoopCfgUtils.getTaskId(cfg); + // double-check task id bug in Hadoop 2.5.x + if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) { + return TaskID.forName(taskIdProp); + } + } + return null; + } } \ No newline at end of file diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java index 2c7a42b1b..53b2e8361 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java @@ -24,11 +24,9 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.util.Progressable; import org.elasticsearch.hadoop.util.Assert; -import org.elasticsearch.hadoop.util.StringUtils; import org.elasticsearch.hadoop.util.unit.TimeValue; /** @@ -51,22 +49,9 @@ class HeartBeat { this.rate = new TimeValue(tv.getMillis() - delay.getMillis(), TimeUnit.MILLISECONDS); this.log = log; - TaskID taskID = null; - // first try with the attempt since some Hadoop versions mix the two - - String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg); - if (StringUtils.hasText(taskAttemptId)) { - taskID = TaskAttemptID.forName(taskAttemptId).getTaskID(); - } - else { - String taskIdProp = HadoopCfgUtils.getTaskId(cfg); - // double-check task id bug in Hadoop 2.5.x - if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) { - taskID = TaskID.forName(taskIdProp); - } - } - String taskId; + TaskID taskID = HadoopCfgUtils.getTaskID(cfg); + if (taskID == null) { log.warn("Cannot determine task id..."); taskId = "";