diff --git a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
index ced2b042e..2d2a01deb 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -203,12 +203,12 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
             this.client = partitionReader.client;
             this.queryBuilder = partitionReader.queryBuilder;
-            this.progressable = progressable;
+            this.progressable = progressable;
 
-            // in Hadoop-like envs (Spark) the progressable might be null and thus the heart-beat is not needed
-            if (progressable != null) {
-                beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
-            }
+            // in Hadoop-like envs (Spark) the progressable might be null and thus the heart-beat is not needed
+            if (progressable != null) {
+                beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
+            }
 
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Initializing RecordReader for [%s]", esSplit));
             }
@@ -218,14 +218,11 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)
 
         @Override
        public boolean nextKeyValue() throws IOException {
             // new API call routed to old API
-            if (currentKey == null) {
-                currentKey = createKey();
-            }
-            if (currentValue == null) {
-                currentValue = createValue();
-            }
+            // under the new API, always create new objects since consumers can (and sometimes will) modify them
+
+            currentKey = createKey();
+            currentValue = createValue();
 
-            // FIXME: does the new API mandate a new instance each time (?)
             return next(currentKey, currentValue);
         }
@@ -280,9 +277,9 @@ public void close() throws IOException {
         @Override
         public boolean next(K key, V value) throws IOException {
             if (scrollQuery == null) {
-                if (beat != null) {
-                    beat.start();
-                }
+                if (beat != null) {
+                    beat.start();
+                }
 
                 scrollQuery = queryBuilder.build(client, scrollReader);
                 size = scrollQuery.getSize();
@@ -299,8 +296,11 @@ public boolean next(K key, V value) throws IOException {
             }
 
             Object[] next = scrollQuery.next();
-            currentKey = setCurrentKey(currentKey, key, next[0]);
-            currentValue = setCurrentValue(currentValue, value, next[1]);
+
+            // NB: the assignments below are not strictly needed since the methods
+            // update the writable content in place; they are kept for consistency
+            currentKey = setCurrentKey(key, next[0]);
+            currentValue = setCurrentValue(value, next[1]);
 
             // keep on counting
             read++;
@@ -313,9 +313,23 @@
         @Override
         public abstract V createValue();
 
-        protected abstract K setCurrentKey(K oldApiKey, K newApiKey, Object object);
-
-        protected abstract V setCurrentValue(V oldApiValue, V newApiKey, Object object);
+        /**
+         * Sets the current key.
+         *
+         * @param hadoopKey the Hadoop key
+         * @param object the actual value to read
+         * @return the key to be used; needed in scenarios where the key is immutable (like Pig)
+         */
+        protected abstract K setCurrentKey(K hadoopKey, Object object);
+
+        /**
+         * Sets the current value.
+         *
+         * @param hadoopValue the Hadoop value
+         * @param object the actual value to read
+         * @return the value to be used; needed in scenarios where the passed value is immutable (like Pig)
+         */
+        protected abstract V setCurrentValue(V hadoopValue, Object object);
 
         @Override
         public long getPos() {
@@ -354,29 +368,22 @@ public Map createValue() {
         }
 
         @Override
-        protected Text setCurrentKey(Text oldApiKey, Text newApiKey, Object object) {
-            String val = object.toString();
-            if (oldApiKey == null) {
-                oldApiKey = new Text();
-                oldApiKey.set(val);
-            }
-
-            // new API might not be used
-            if (newApiKey != null) {
-                newApiKey.set(val);
+        protected Text setCurrentKey(Text hadoopKey, Object object) {
+            if (hadoopKey != null) {
+                hadoopKey.set(object.toString());
             }
-            return oldApiKey;
+            return hadoopKey;
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) {
-            Map val = (Map) object;
-            if (newApiKey != null) {
-                newApiKey.clear();
-                newApiKey.putAll(val);
+        protected Map setCurrentValue(Map hadoopValue, Object object) {
+            if (hadoopValue != null) {
+                hadoopValue.clear();
+                Map val = (Map) object;
+                hadoopValue.putAll(val);
             }
-            return val;
+            return hadoopValue;
         }
     }
 
diff --git a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
index e929136df..8bb3fc33d 100644
--- a/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
+++ b/pig/src/main/java/org/elasticsearch/hadoop/pig/EsPigInputFormat.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.elasticsearch.hadoop.util.StringUtils;
+
 
 @SuppressWarnings("rawtypes")
 public class EsPigInputFormat extends EsInputFormat {
@@ -43,7 +45,7 @@ public PigShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configura
 
         @Override
         public String createKey() {
-            return "";
+            return StringUtils.EMPTY;
         }
 
         @Override
@@ -52,25 +54,21 @@ public Map createValue() {
         }
 
         @Override
-        protected String setCurrentKey(String oldApiKey, String newApiKey, Object object) {
-            //oldApiKey = object.toString();
-            //newApiKey = oldApiKey;
+        protected String setCurrentKey(String hadoopKey, Object object) {
+            // a String's content cannot be overwritten (Strings are immutable)
+            // with Pig it is okay to return a new object, since the new API is used
             return object.toString();
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        protected Map setCurrentValue(Map oldApiValue, Map newApiKey, Object object) {
+        protected Map setCurrentValue(Map hadoopValue, Object object) {
             Map map = (Map) object;
-            if (oldApiValue != null) {
-                oldApiValue.clear();
-                oldApiValue.putAll(map);
-            }
-            else {
-                oldApiValue = map;
+            if (hadoopValue != null) {
+                hadoopValue.clear();
+                hadoopValue.putAll(map);
             }
-            //newApiKey = map;
-            return oldApiValue;
+            return hadoopValue;
         }
     }
 
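For context on the `nextKeyValue()` change above: under the new Hadoop API, consumers may hold on to (and modify) the objects returned by the reader, which is why fresh key/value instances are now created on every call. Below is a minimal sketch of such a consumer; it is not part of the patch, and `CachingConsumer`/`drain` are illustrative names only.

```java
// Illustrative sketch only: a new-API consumer that caches every value it reads.
// If the RecordReader recycled a single value instance instead of creating a new
// one per nextKeyValue() call, every element of the returned list would alias
// the same (last) object. CachingConsumer and drain are hypothetical names.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.RecordReader;

public class CachingConsumer {

    @SuppressWarnings("rawtypes")
    public static List<Map> drain(RecordReader<String, Map> reader)
            throws IOException, InterruptedException {
        List<Map> values = new ArrayList<Map>();
        while (reader.nextKeyValue()) {
            // the value is stored, not copied - hence the need for fresh instances
            values.add(reader.getCurrentValue());
        }
        return values;
    }
}
```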