Skip to content

Commit 21033ca

Browse files
committed
Improve taskId detection in EsOutputFormat
Relates #280 (cherry picked from commit 145ad76)
1 parent 11e558b commit 21033ca

File tree

3 files changed

+22
-18
lines changed

3 files changed

+22
-18
lines changed

mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ protected void init() throws IOException {
179179
}
180180

181181
private int detectCurrentInstance(Configuration conf) {
182-
TaskID taskID = TaskID.forName(HadoopCfgUtils.getTaskId(conf));
182+
TaskID taskID = HadoopCfgUtils.getTaskID(conf);
183183

184184
if (taskID == null) {
185185
log.warn(String.format("Cannot determine task id - redirecting writes in a random fashion"));

mr/src/main/java/org/elasticsearch/hadoop/mr/HadoopCfgUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323

2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.mapred.JobConf;
26+
import org.apache.hadoop.mapred.TaskAttemptID;
27+
import org.apache.hadoop.mapred.TaskID;
28+
import org.elasticsearch.hadoop.util.StringUtils;
2629
import org.elasticsearch.hadoop.util.unit.TimeValue;
2730

2831
/**
@@ -137,4 +140,20 @@ public static JobConf asJobConf(Configuration cfg) {
137140
public static String getMapValueClass(Configuration cfg) {
138141
return get(cfg, "mapred.mapoutput.value.class", "mapreduce.map.output.value.class");
139142
}
143+
144+
public static TaskID getTaskID(Configuration cfg) {
145+
// first try with the attempt since some Hadoop versions mix the two
146+
String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
147+
if (StringUtils.hasText(taskAttemptId)) {
148+
return TaskAttemptID.forName(taskAttemptId).getTaskID();
149+
}
150+
else {
151+
String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
152+
// double-check task id bug in Hadoop 2.5.x
153+
if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
154+
return TaskID.forName(taskIdProp);
155+
}
156+
}
157+
return null;
158+
}
140159
}

mr/src/main/java/org/elasticsearch/hadoop/mr/HeartBeat.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424

2525
import org.apache.commons.logging.Log;
2626
import org.apache.hadoop.conf.Configuration;
27-
import org.apache.hadoop.mapred.TaskAttemptID;
2827
import org.apache.hadoop.mapred.TaskID;
2928
import org.apache.hadoop.util.Progressable;
3029
import org.elasticsearch.hadoop.util.Assert;
31-
import org.elasticsearch.hadoop.util.StringUtils;
3230
import org.elasticsearch.hadoop.util.unit.TimeValue;
3331

3432
/**
@@ -51,22 +49,9 @@ class HeartBeat {
5149
this.rate = new TimeValue(tv.getMillis() - delay.getMillis(), TimeUnit.MILLISECONDS);
5250
this.log = log;
5351

54-
TaskID taskID = null;
55-
// first try with the attempt since some Hadoop versions mix the two
56-
57-
String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
58-
if (StringUtils.hasText(taskAttemptId)) {
59-
taskID = TaskAttemptID.forName(taskAttemptId).getTaskID();
60-
}
61-
else {
62-
String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
63-
// double-check task id bug in Hadoop 2.5.x
64-
if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
65-
taskID = TaskID.forName(taskIdProp);
66-
}
67-
}
68-
6952
String taskId;
53+
TaskID taskID = HadoopCfgUtils.getTaskID(cfg);
54+
7055
if (taskID == null) {
7156
log.warn("Cannot determine task id...");
7257
taskId = "<unknown>";

0 commit comments

Comments
 (0)