diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java index 8d96ee646..a023b1e85 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java @@ -249,14 +249,9 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable) @Override public boolean nextKeyValue() throws IOException { // new API call routed to old API - if (currentKey == null) { - currentKey = createKey(); - } - if (currentValue == null) { - currentValue = createValue(); - } - - // FIXME: does the new API mandate a new instance each time (?) + // under the new API always create new objects since consumers can (and sometimes will) modify them + currentKey = createKey(); + currentValue = createValue(); return next(currentKey, currentValue); } @@ -330,8 +325,11 @@ public boolean next(K key, V value) throws IOException { } Object[] next = scrollQuery.next(); - currentKey = setCurrentKey(currentKey, key, next[0]); - currentValue = setCurrentValue(currentValue, value, next[1]); + + // NB: the left assignment is not needed since method override + // the writable content however for consistency, they are below + currentKey = setCurrentKey(key, next[0]); + currentValue = setCurrentValue(value, next[1]); // keep on counting read++; @@ -344,9 +342,23 @@ public boolean next(K key, V value) throws IOException { @Override public abstract V createValue(); - protected abstract K setCurrentKey(K oldApiKey, K newApiKey, Object object); - - protected abstract V setCurrentValue(V oldApiValue, V newApiKey, Object object); + /** + * Sets the current key. + * + * @param hadoopKey hadoop key + * @param object the actual value to read + * @return returns the key to be used; needed in scenario where the key is immutable (like Pig) + */ + protected abstract K setCurrentKey(K hadoopKey, Object object); + + /** + * Sets the current value. + * + * @param hadoopValue hadoop value + * @param object the actual value to read + * @return returns the value to be used; needed in scenario where the passed value is immutable (like Pig) + */ + protected abstract V setCurrentValue(V hadoopValue, Object object); @Override public long getPos() { @@ -385,29 +397,22 @@ public Map createValue() { } @Override - protected Text setCurrentKey(Text oldApiKey, Text newApiKey, Object object) { - String val = object.toString(); - if (oldApiKey == null) { - oldApiKey = new Text(); - oldApiKey.set(val); - } - - // new API might not be used - if (newApiKey != null) { - newApiKey.set(val); + protected Text setCurrentKey(Text hadoopKey, Object object) { + if (hadoopKey != null) { + hadoopKey.set(object.toString()); } - return oldApiKey; + return hadoopKey; } @SuppressWarnings("unchecked") @Override - protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) { - Map val = (Map) object; - if (newApiKey != null) { - newApiKey.clear(); - newApiKey.putAll(val); + protected Map setCurrentValue(Map hadoopValue, Object object) { + if (hadoopValue != null) { + hadoopValue.clear(); + Map val = (Map) object; + hadoopValue.putAll(val); } - return val; + return hadoopValue; } } diff --git a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java index e929136df..0e0766948 100644 --- a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java +++ b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.elasticsearch.hadoop.mr.EsInputFormat; +import org.elasticsearch.hadoop.util.StringUtils; @SuppressWarnings("rawtypes") public class EsPigInputFormat extends EsInputFormat { @@ -43,7 +44,7 @@ public PigShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configura @Override public String createKey() { - return ""; + return StringUtils.EMPTY; } @Override @@ -52,25 +53,21 @@ public Map createValue() { } @Override - protected String setCurrentKey(String oldApiKey, String newApiKey, Object object) { - //oldApiKey = object.toString(); - //newApiKey = oldApiKey; + protected String setCurrentKey(String hadoopKey, Object object) { + // cannot override a String content (recipe for disaster) + // in case of Pig, it's okay to return a new object as it's using the new API return object.toString(); } @SuppressWarnings("unchecked") @Override - protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) { + protected Map setCurrentValue(Map hadoopValue, Object object) { Map map = (Map) object; - if (oldApiValue != null) { - oldApiValue.clear(); - oldApiValue.putAll(map); + if (hadoopValue != null) { + hadoopValue.clear(); + hadoopValue.putAll(map); } - else { - oldApiValue = map; - } - //newApiKey = map; - return oldApiValue; + return hadoopValue; } }