From a8e4813001d37b1613e3411547c0970512753006 Mon Sep 17 00:00:00 2001
From: Costin Leau
Date: Thu, 11 Dec 2014 17:45:49 +0200
Subject: [PATCH] [MR] Be conservative and don't reuse Hadoop objects

In Hadoop-like environments (such as Spark) the objects passed to
consumers might be completely replaced, leading to strange exceptions.
For this reason, always create a new object under both MR APIs (old and
new).

Fix #338
---
 .../hadoop/mr/EsInputFormat.java     | 81 ++++++++++---------
 .../hadoop/pig/EsPigInputFormat.java | 24 +++---
 2 files changed, 55 insertions(+), 50 deletions(-)

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
index ced2b042e..2d2a01deb 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -203,12 +203,12 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
         this.client = partitionReader.client;
         this.queryBuilder = partitionReader.queryBuilder;
 
-        this.progressable = progressable;
+        this.progressable = progressable;
 
-        // in Hadoop-like envs (Spark) the progressable might be null and thus the heart-beat is not needed
-        if (progressable != null) {
-            beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
-        }
+        // in Hadoop-like envs (Spark) the progressable might be null and thus the heart-beat is not needed
+        if (progressable != null) {
+            beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
+        }
 
         if (log.isDebugEnabled()) {
             log.debug(String.format("Initializing RecordReader for [%s]", esSplit));
@@ -218,14 +218,11 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
     @Override
     public boolean nextKeyValue() throws IOException {
         // new API call routed to old API
-        if (currentKey == null) {
-            currentKey = createKey();
-        }
-        if (currentValue == null) {
-            currentValue = createValue();
-        }
+        // under the new API, always create new objects since consumers can (and sometimes will) modify them
+
+        currentKey = createKey();
+        currentValue = createValue();
 
-        // FIXME: does the new API mandate a new instance each time (?)
         return next(currentKey, currentValue);
     }
 
@@ -280,9 +277,9 @@ public void close() throws IOException {
     @Override
     public boolean next(K key, V value) throws IOException {
         if (scrollQuery == null) {
-            if (beat != null) {
-                beat.start();
-            }
+            if (beat != null) {
+                beat.start();
+            }
 
             scrollQuery = queryBuilder.build(client, scrollReader);
             size = scrollQuery.getSize();
@@ -299,8 +296,11 @@ public boolean next(K key, V value) throws IOException {
         }
 
         Object[] next = scrollQuery.next();
-        currentKey = setCurrentKey(currentKey, key, next[0]);
-        currentValue = setCurrentValue(currentValue, value, next[1]);
+
+        // NB: the assignments are not strictly needed since the methods overwrite
+        // the Writable content; they are kept for consistency
+        currentKey = setCurrentKey(key, next[0]);
+        currentValue = setCurrentValue(value, next[1]);
 
         // keep on counting
         read++;
@@ -313,9 +313,23 @@ public boolean next(K key, V value) throws IOException {
     @Override
     public abstract V createValue();
 
-    protected abstract K setCurrentKey(K oldApiKey, K newApiKey, Object object);
-
-    protected abstract V setCurrentValue(V oldApiValue, V newApiKey, Object object);
+    /**
+     * Sets the current key.
+     *
+     * @param hadoopKey hadoop key
+     * @param object the actual value to use as the key
+     * @return the key to be used; needed in scenarios where the key is immutable (as in Pig)
+     */
+    protected abstract K setCurrentKey(K hadoopKey, Object object);
+
+    /**
+     * Sets the current value.
+     *
+     * @param hadoopValue hadoop value
+     * @param object the actual value to set
+     * @return the value to be used; needed in scenarios where the passed value is immutable (as in Pig)
+     */
+    protected abstract V setCurrentValue(V hadoopValue, Object object);
 
     @Override
     public long getPos() {
@@ -354,29 +368,22 @@ public Map createValue() {
     }
 
     @Override
-    protected Text setCurrentKey(Text oldApiKey, Text newApiKey, Object object) {
-        String val = object.toString();
-        if (oldApiKey == null) {
-            oldApiKey = new Text();
-            oldApiKey.set(val);
-        }
-
-        // new API might not be used
-        if (newApiKey != null) {
-            newApiKey.set(val);
+    protected Text setCurrentKey(Text hadoopKey, Object object) {
+        if (hadoopKey != null) {
+            hadoopKey.set(object.toString());
         }
-        return oldApiKey;
+        return hadoopKey;
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) {
-        Map val = (Map) object;
-        if (newApiKey != null) {
-            newApiKey.clear();
-            newApiKey.putAll(val);
+    protected Map setCurrentValue(Map hadoopValue, Object object) {
+        if (hadoopValue != null) {
+            hadoopValue.clear();
+            Map val = (Map) object;
+            hadoopValue.putAll(val);
         }
-        return val;
+        return hadoopValue;
     }
 }
diff --git a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
index e929136df..8bb3fc33d 100644
--- a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
+++ b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.elasticsearch.hadoop.util.StringUtils;
+
 
 @SuppressWarnings("rawtypes")
 public class EsPigInputFormat extends EsInputFormat {
@@ -43,7 +45,7 @@ public PigShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configura
 
     @Override
     public String createKey() {
-        return "";
+        return StringUtils.EMPTY;
     }
 
     @Override
@@ -52,25 +54,21 @@ public Map createValue() {
     }
 
     @Override
-    protected String setCurrentKey(String oldApiKey, String newApiKey, Object object) {
-        //oldApiKey = object.toString();
-        //newApiKey = oldApiKey;
+    protected String setCurrentKey(String hadoopKey, Object object) {
+        // a String's content cannot be overwritten in place (Strings are immutable)
+        // in Pig's case it is okay to return a new object since Pig uses the new API
        return object.toString();
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) {
+    protected Map setCurrentValue(Map hadoopValue, Object object) {
         Map map = (Map) object;
-        if (oldApiValue != null) {
-            oldApiValue.clear();
-            oldApiValue.putAll(map);
-        }
-        else {
-            oldApiValue = map;
+        if (hadoopValue != null) {
+            hadoopValue.clear();
+            hadoopValue.putAll(map);
         }
-        //newApiKey = map;
-        return oldApiValue;
+        return hadoopValue;
     }
 }
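
For reviewers unfamiliar with the aliasing hazard this patch guards against, below is a minimal standalone sketch. It is not part of the patch and all names in it (ObjectReuseDemo, ReusingReader, "doc-N" ids) are illustrative; it only demonstrates why reusing one mutable object across next() calls breaks any consumer that buffers records instead of processing them immediately, as Spark-style consumers may do.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: mimics a record reader that clears and refills
// the SAME Map instance on every call, the pattern this patch removes.
public class ObjectReuseDemo {

    static class ReusingReader {
        private final Map<String, Object> shared = new HashMap<>();
        private final String[] docs = { "doc-1", "doc-2", "doc-3" };
        private int cursor = 0;

        Map<String, Object> next() {
            if (cursor >= docs.length) {
                return null;
            }
            shared.clear();                     // mutate in place...
            shared.put("id", docs[cursor++]);   // ...so every caller sees the same instance
            return shared;
        }
    }

    public static void main(String[] args) {
        ReusingReader reader = new ReusingReader();

        // A Spark-like consumer that retains references instead of copying.
        List<Map<String, Object>> buffered = new ArrayList<>();
        for (Map<String, Object> record; (record = reader.next()) != null; ) {
            buffered.add(record);
        }

        // All three entries alias the same map, so this prints
        // [{id=doc-3}, {id=doc-3}, {id=doc-3}] instead of doc-1..doc-3.
        System.out.println(buffered);
    }
}

Creating a fresh key/value per nextKeyValue(), as the patch does, trades a small per-record allocation for correctness under any consumer that holds on to the objects it is handed.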