[MR] Be conservative and don't reuse Hadoop objects
In Hadoop-like environments (such as Spark) the objects passed to consumers might be completely
replaced, leading to strange exceptions. For this reason, always create a new object under both MR
APIs (old and new).

Fix #338
costin committed Dec 11, 2014
1 parent 989eb8c commit a8e4813
Showing 2 changed files with 55 additions and 50 deletions.
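As context for the change, here is a minimal, hypothetical sketch (not part of this commit) of the reuse pitfall the commit message describes: when a record reader recycles a single Writable instance for every record, any consumer that holds on to the references (as Spark-style collectors do) ends up with many aliases of the last record.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;

public class ReusePitfall {
    public static void main(String[] args) {
        Text reused = new Text();                 // single instance, reused per record
        List<Text> collected = new ArrayList<Text>();

        for (String record : new String[] { "a", "b", "c" }) {
            reused.set(record);                   // the reader mutates it in place
            collected.add(reused);                // the consumer keeps the reference
        }

        // prints [c, c, c] instead of [a, b, c]; every entry aliases the same object
        System.out.println(collected);
    }
}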
81 changes: 44 additions & 37 deletions mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -203,12 +203,12 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
this.client = partitionReader.client;
this.queryBuilder = partitionReader.queryBuilder;

this.progressable = progressable;

// in Hadoop-like envs (Spark) the progressable might be null and thus the heart-beat is not needed
if (progressable != null) {
beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
}

if (log.isDebugEnabled()) {
log.debug(String.format("Initializing RecordReader for [%s]", esSplit));
@@ -218,14 +218,11 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
@Override
public boolean nextKeyValue() throws IOException {
// new API call routed to old API
// under the new API always create new objects since consumers can (and sometimes will) modify them
currentKey = createKey();
currentValue = createValue();

return next(currentKey, currentValue);
}

@@ -280,9 +277,9 @@ public void close() throws IOException {
@Override
public boolean next(K key, V value) throws IOException {
if (scrollQuery == null) {
if (beat != null) {
beat.start();
}

scrollQuery = queryBuilder.build(client, scrollReader);
size = scrollQuery.getSize();
@@ -299,8 +296,11 @@ public boolean next(K key, V value) throws IOException {
}

Object[] next = scrollQuery.next();

// NB: the assignments are not strictly needed since the methods overwrite
// the Writable content in place; they are kept for consistency
currentKey = setCurrentKey(key, next[0]);
currentValue = setCurrentValue(value, next[1]);

// keep on counting
read++;
@@ -313,9 +313,23 @@
@Override
public abstract V createValue();

/**
* Sets the current key.
*
* @param hadoopKey hadoop key
* @param object the actual value read
* @return the key instance to be used; needed in scenarios where the key is immutable (as in Pig)
*/
protected abstract K setCurrentKey(K hadoopKey, Object object);

/**
* Sets the current value.
*
* @param hadoopValue hadoop value
* @param object the actual value read
* @return the value instance to be used; needed in scenarios where the passed value is immutable (as in Pig)
*/
protected abstract V setCurrentValue(V hadoopValue, Object object);

@Override
public long getPos() {
@@ -354,29 +368,22 @@ public Map<Writable, Writable> createValue() {
}

@Override
protected Text setCurrentKey(Text hadoopKey, Object object) {
if (hadoopKey != null) {
hadoopKey.set(object.toString());
}
return hadoopKey;
}

@SuppressWarnings("unchecked")
@Override
protected Map<Writable, Writable> setCurrentValue(Map<Writable, Writable> hadoopValue, Object object) {
if (hadoopValue != null) {
hadoopValue.clear();
Map<Writable, Writable> val = (Map<Writable, Writable>) object;
hadoopValue.putAll(val);
}
return hadoopValue;
}
}
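
For reference, a short sketch (hypothetical driver code, not from the repository) contrasting the two contracts this reader bridges: under the old mapred API the caller supplies the key/value objects and the reader fills them in place, while under the new mapreduce API the reader owns them, and after this commit it hands out fresh objects on every nextKeyValue() call.

import java.io.IOException;

import org.apache.hadoop.io.Text;

public class ApiContracts {

    // old 'mapred' API: the caller allocates key/value once and the reader
    // overwrites them on each call; reuse is part of the contract
    static void oldApi(org.apache.hadoop.mapred.RecordReader<Text, Text> reader) throws IOException {
        Text key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            // key/value are overwritten on the next iteration; copy them if kept
        }
    }

    // new 'mapreduce' API: the reader owns the current key/value; handing out
    // fresh objects per call keeps reference-holding consumers (e.g. Spark) correct
    static void newApi(org.apache.hadoop.mapreduce.RecordReader<Text, Text> reader)
            throws IOException, InterruptedException {
        while (reader.nextKeyValue()) {
            Text key = reader.getCurrentKey();
            Text value = reader.getCurrentValue();
        }
    }
}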

24 changes: 11 additions & 13 deletions pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.util.StringUtils;


@SuppressWarnings("rawtypes")
public class EsPigInputFormat extends EsInputFormat<String, Map> {
@@ -43,7 +45,7 @@ public PigShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configuration

@Override
public String createKey() {
return "";
return StringUtils.EMPTY;
}

@Override
@@ -52,25 +54,21 @@ public Map createValue() {
}

@Override
protected String setCurrentKey(String hadoopKey, Object object) {
// cannot overwrite a String's content (recipe for disaster)
// in the case of Pig, it's okay to return a new object as it uses the new API
return object.toString();
}

@SuppressWarnings("unchecked")
@Override
protected Map setCurrentValue(Map hadoopValue, Object object) {
Map map = (Map) object;
if (hadoopValue != null) {
hadoopValue.clear();
hadoopValue.putAll(map);
}
return hadoopValue;
}
}
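
A brief illustration (again hypothetical, not part of the commit) of why the Pig reader returns a new key instead of filling the supplied one: a Text can be updated in place, but java.lang.String is immutable, so the only way to "set" a String key is to replace the reference, and the caller must use the returned object.

import org.apache.hadoop.io.Text;

public class ImmutableKey {
    public static void main(String[] args) {
        Text mutable = new Text("old");
        mutable.set("new");            // same object, new content

        String immutable = "old";
        immutable = "new";             // only this local reference changes;
                                       // anyone holding the old String still sees "old"

        System.out.println(mutable + " " + immutable);
    }
}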

