Skip to content

Commit

Permalink
[MR] Be conservative and don't reuse Hadoop objects
Browse files Browse the repository at this point in the history
In Hadoop-like environments (such as Spark), the objects passed to consumers might be completely
replaced, leading to strange exceptions. For this reason, always create a new object under both MR
APIs (old and new).

Fix #338

(cherry picked from commit a8e4813)
  • Loading branch information
costin committed Dec 11, 2014
1 parent 0f46be8 commit ca8efbb
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 43 deletions.
65 changes: 35 additions & 30 deletions mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
Expand Up @@ -249,14 +249,9 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
@Override
public boolean nextKeyValue() throws IOException {
// new API call routed to old API
if (currentKey == null) {
currentKey = createKey();
}
if (currentValue == null) {
currentValue = createValue();
}

// FIXME: does the new API mandate a new instance each time (?)
// under the new API always create new objects since consumers can (and sometimes will) modify them
currentKey = createKey();
currentValue = createValue();
return next(currentKey, currentValue);
}

Expand Down Expand Up @@ -330,8 +325,11 @@ public boolean next(K key, V value) throws IOException {
}

Object[] next = scrollQuery.next();
currentKey = setCurrentKey(currentKey, key, next[0]);
currentValue = setCurrentValue(currentValue, value, next[1]);

// NB: the left-hand assignments are not strictly needed since the methods overwrite
// the writable content; however, they are kept below for consistency
currentKey = setCurrentKey(key, next[0]);
currentValue = setCurrentValue(value, next[1]);

// keep on counting
read++;
Expand All @@ -344,9 +342,23 @@ public boolean next(K key, V value) throws IOException {
@Override
public abstract V createValue();

protected abstract K setCurrentKey(K oldApiKey, K newApiKey, Object object);

protected abstract V setCurrentValue(V oldApiValue, V newApiKey, Object object);
/**
* Sets the current key.
*
* @param hadoopKey hadoop key
* @param object the actual value to read
* @return the key to be used; needed in scenarios where the key is immutable (as in Pig)
*/
protected abstract K setCurrentKey(K hadoopKey, Object object);

/**
* Sets the current value.
*
* @param hadoopValue hadoop value
* @param object the actual value to read
* @return the value to be used; needed in scenarios where the passed value is immutable (as in Pig)
*/
protected abstract V setCurrentValue(V hadoopValue, Object object);

@Override
public long getPos() {
Expand Down Expand Up @@ -385,29 +397,22 @@ public Map<Writable, Writable> createValue() {
}

@Override
protected Text setCurrentKey(Text oldApiKey, Text newApiKey, Object object) {
String val = object.toString();
if (oldApiKey == null) {
oldApiKey = new Text();
oldApiKey.set(val);
}

// new API might not be used
if (newApiKey != null) {
newApiKey.set(val);
protected Text setCurrentKey(Text hadoopKey, Object object) {
if (hadoopKey != null) {
hadoopKey.set(object.toString());
}
return oldApiKey;
return hadoopKey;
}

@SuppressWarnings("unchecked")
@Override
protected Map<Writable, Writable> setCurrentValue(Map<Writable, Writable> oldApiValue, Map<Writable, Writable> newApiKey, Object object) {
Map<Writable, Writable> val = (Map<Writable, Writable>) object;
if (newApiKey != null) {
newApiKey.clear();
newApiKey.putAll(val);
protected Map<Writable, Writable> setCurrentValue(Map<Writable, Writable> hadoopValue, Object object) {
if (hadoopValue != null) {
hadoopValue.clear();
Map<Writable, Writable> val = (Map<Writable, Writable>) object;
hadoopValue.putAll(val);
}
return val;
return hadoopValue;
}
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.util.StringUtils;

@SuppressWarnings("rawtypes")
public class EsPigInputFormat extends EsInputFormat<String, Map> {
Expand All @@ -43,7 +44,7 @@ public PigShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configura

@Override
public String createKey() {
return "";
return StringUtils.EMPTY;
}

@Override
Expand All @@ -52,25 +53,21 @@ public Map createValue() {
}

@Override
protected String setCurrentKey(String oldApiKey, String newApiKey, Object object) {
//oldApiKey = object.toString();
//newApiKey = oldApiKey;
protected String setCurrentKey(String hadoopKey, Object object) {
// cannot overwrite a String's content (recipe for disaster)
// in case of Pig, it's okay to return a new object as it's using the new API
return object.toString();
}

@SuppressWarnings("unchecked")
@Override
protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) {
protected Map setCurrentValue(Map hadoopValue, Object object) {
Map map = (Map) object;
if (oldApiValue != null) {
oldApiValue.clear();
oldApiValue.putAll(map);
if (hadoopValue != null) {
hadoopValue.clear();
hadoopValue.putAll(map);
}
else {
oldApiValue = map;
}
//newApiKey = map;
return oldApiValue;
return hadoopValue;
}
}

Expand Down

0 comments on commit ca8efbb

Please sign in to comment.