Fields in tuples are generated out of order in pig interface #119

Closed
bwmeier opened this issue Dec 18, 2013 · 4 comments

bwmeier commented Dec 18, 2013

The fields in the tuples generated by the Elasticsearch query are ordered according to the JSON field order returned by Elasticsearch, not the order specified in the Pig script. This causes errors because the wrong columns get mapped to the MR jobs in the compiled script.

Also, the JSON order is apparently non-deterministic, at least in 0.90.3. As a result, the field order in the tuples is inconsistent both during processing and from one execution to the next.

The following document:

{
  "A": "value A",
  "B": "value B",
  "C": "value C"
}

when queried with the mapping

LOAD 'index/type' USING org.elasticsearch.hadoop.pig.ESStorage('es.query:?q=A:*') AS (A:chararray, B:chararray, C:chararray);

could return any of the following tuples depending on the ordering of the JSON:

- (value A,value B,value C)
- (value B,value A,value C)
- (value C,value A,value B)
- (value A,value C,value B)
- (value B,value C,value A)
- (value C,value B,value A)
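A minimal, dependency-free Java sketch of the failure mode, assuming only what the report describes: values are copied positionally by iterating the field map, as the original loop over `dataMap.entrySet()` in the diff below does. The class and method names are hypothetical, a `List` stands in for a Pig `Tuple`, and a `LinkedHashMap` simulates the fields arriving in a particular JSON order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; not part of ESStorage.
public class PositionalTupleDemo {

    // Copies values positionally, in whatever order the map iterates.
    // This mirrors the pre-fix loop over dataMap.entrySet().
    static List<Object> positional(Map<String, Object> doc) {
        List<Object> tuple = new ArrayList<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            tuple.add(e.getValue());
        }
        return tuple;
    }

    // Builds a map whose iteration order matches the given key order,
    // standing in for JSON fields arriving in a particular wire order.
    static Map<String, Object> docInOrder(String... keys) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (String k : keys) {
            doc.put(k, "value " + k);
        }
        return doc;
    }

    public static void main(String[] args) {
        // Same document, two different wire orders; the Pig schema is (A, B, C).
        System.out.println(positional(docInOrder("A", "B", "C")));
        System.out.println(positional(docInOrder("C", "A", "B")));
    }
}
```

Against the schema (A, B, C), the first call prints `[value A, value B, value C]` and the second prints `[value C, value A, value B]`: the same document yields a different tuple purely because of the wire order.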

bwmeier commented Dec 18, 2013

The following diff is a Pig-specific solution and probably not the best way to fix the issue, but it does the job for now.

diff --git a/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java b/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
index acdfb6c..13f2da8 100644
--- a/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
+++ b/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
@@ -88,6 +88,7 @@ public class ESStorage extends LoadFunc implements LoadPushDown, StoreFuncInterf
     private RecordReader<String, Map<?, ?>> reader;
     private RecordWriter<Object, Object> writer;
     private PigTuple pigTuple;
+    private String[] projection = null;

     public ESStorage() {
         this(new String[0]);
@@ -218,6 +219,8 @@ public class ESStorage extends LoadFunc implements LoadPushDown, StoreFuncInterf
         Settings settings = SettingsManager.loadFrom(cfg);

         if (settings.getScrollFields() != null) {
+            projection = settings.getScrollFields().split(",");
+            log.info("Settings setLocation - scroll fields preset: " + Arrays.toString(projection));
             return;
         }

@@ -270,6 +273,10 @@ public class ESStorage extends LoadFunc implements LoadPushDown, StoreFuncInterf
             }
             cfg.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, fields);
         }
+        if (fields!=null) {
+            projection = fields.split(",");
+            log.info("Settings setLocation - scroll fields found: " + Arrays.toString(projection));
+        }
     }


@@ -303,10 +310,17 @@ public class ESStorage extends LoadFunc implements LoadPushDown, StoreFuncInterf
             Map dataMap = reader.getCurrentValue();
             Tuple tuple = TupleFactory.getInstance().newTuple(dataMap.size());

-            int i = 0;
-            Set<Entry<?,?>> entrySet = dataMap.entrySet();
-            for (Map.Entry entry : entrySet) {
-                tuple.set(i++, entry.getValue());
+            if (projection != null && projection.length > 0) {
+                for (int i = 0; i < projection.length; i++) {
+                    tuple.set(i, dataMap.get(projection[i]));
+                }
+            }
+            else {
+                int i = 0;
+                Set<Entry<?,?>> entrySet = dataMap.entrySet();
+                for (Map.Entry entry : entrySet) {
+                    tuple.set(i++, entry.getValue());
+                }
             }

             if (trace) {

costin (Member) commented Dec 18, 2013

@bwmeier Thanks for the report, I'm looking into it. From the ES side, the fields are returned in the order requested or, if all fields are requested, in alphabetical order, regardless of what the original document or mapping specifies. I'm not sure whether 0.90.3 behaved differently in this regard, but 0.90.7 shouldn't.

Back to the topic at hand: the mapping order should be used regardless of the wire order. Currently the code expects both orders to be the same (hence the use of iteration instead of lookup), which I'm going to correct.
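A sketch of the lookup-based approach described here, under stated assumptions: an `Object[]` stands in for a Pig `Tuple` to keep it dependency-free, and `SchemaOrderTuple`/`toTuple` are hypothetical names, not the actual ESStorage code. Each schema field is looked up by name, so neither the JSON order nor the alphabetical order returned when all fields are requested affects the result. The array is sized from the schema rather than the document; note that the diff above allocates the tuple with `dataMap.size()`, which would presumably come up short when a document lacks one of the projected fields.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class; Object[] stands in for a Pig Tuple.
public class SchemaOrderTuple {

    // Builds a tuple in schema order by looking each field up by name,
    // so the order in which fields arrived over the wire is irrelevant.
    // The tuple is sized by the schema, not the document: a field missing
    // from the document becomes null instead of shifting later values.
    static Object[] toTuple(String[] schema, Map<String, ?> doc) {
        Object[] tuple = new Object[schema.length];
        for (int i = 0; i < schema.length; i++) {
            tuple[i] = doc.get(schema[i]);
        }
        return tuple;
    }

    public static void main(String[] args) {
        // Assumed comma-separated field list, like the one the diff splits.
        String[] schema = "A,B,C".split(",");

        // Fields arriving in an arbitrary wire order.
        Map<String, Object> wire = new LinkedHashMap<>();
        wire.put("C", "value C");
        wire.put("A", "value A");
        wire.put("B", "value B");

        // Same fields in alphabetical order (all-fields case).
        Map<String, Object> alphabetical = new TreeMap<>(wire);

        // A document missing some projected fields.
        Map<String, Object> partial = new LinkedHashMap<>();
        partial.put("B", "value B");

        System.out.println(Arrays.toString(toTuple(schema, wire)));
        System.out.println(Arrays.toString(toTuple(schema, alphabetical)));
        System.out.println(Arrays.toString(toTuple(schema, partial)));
    }
}
```

All three calls produce tuples in schema order: the first two print `[value A, value B, value C]`, and the partial document prints `[null, value B, null]` rather than shifting `value B` into the first position.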

costin closed this as completed in 917c432 Dec 19, 2013

costin (Member) commented Dec 19, 2013

Hi,

I've pushed a fix for the ordering. A nightly build is pending; it should be available in Maven in about 15-20 minutes (once all the tests pass). I'll let you know once it's there. Alternatively, you can build master.

costin (Member) commented Dec 19, 2013

@bwmeier nightly build finally pushed to Maven.
