IP is not a supported type #144

Closed · fedesilva opened this Issue Feb 20, 2014 · 16 comments
fedesilva commented Feb 20, 2014

Using an EsInputFormat to query an index that contains a field of type IP is not supported. Additionally, it throws an exception even if the field is not "selected".

 UnsupportedOperationException: field IP not supported yet

In my particular use case I do not even need that field and I am not fetching it in the query (see below), so I would expect the field with the unsupported type to be ignored.

For IP types, if the field is needed it would be useful to get a string back, or even a 4-tuple (though I do not know whether Hadoop supports that). For any other unsupported types, a string representation could be provided, or the field could at least be ignored, as stated above, when it is not retrieved.
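
For illustration, a minimal sketch of the 4-tuple idea (hypothetical sample value; not something es-hadoop does today):

// If es-hadoop handed the ip field back as its dotted-quad string,
// the client could decompose it into four octets itself.
val ip = "66.249.66.1"  // hypothetical example value
val octets: (Int, Int, Int, Int) = ip.split('.') match {
  case Array(a, b, c, d) => (a.toInt, b.toInt, c.toInt, d.toInt)
}
// octets == (66, 249, 66, 1)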

Versions

es-hadoop: 1.3.0.M2
spark: the binary release spark-0.9.0-incubating-bin-cdh4

Mapping for the index

{
  "logs": {
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "dateOptionalTime"
      },
      "@version": {
        "type": "string"
      },
      "body_bytes_sent": {
        "type": "long"
      },
      "upstream": {
        "properties": {
          "addr": {
            "type": "string"
          },
          "cache_status": {
            "type": "string"
          },
          "response_time": {
            "type": "double"
          },
          "status": {
            "type": "long"
          }
        }
      },
      "cookie": {
        "properties": {
          "listenerdjid": {
            "type": "string"
          },
          "userid": {
            "type": "string"
          }
        }
      },
      "file": {
        "type": "string"
      },
      "from": {
        "type": "string"
      },
      "geoip": {
        "properties": {
          "area_code": {
            "type": "long"
          },
          "city_name": {
            "type": "string"
          },
          "continent_code": {
            "type": "string"
          },
          "country_code2": {
            "type": "string"
          },
          "country_code3": {
            "type": "string"
          },
          "country_name": {
            "type": "string"
          },
          "dma_code": {
            "type": "long"
          },
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "double"
          },
          "longitude": {
            "type": "double"
          },
          "postal_code": {
            "type": "string"
          },
          "real_region_name": {
            "type": "string"
          },
          "region_name": {
            "type": "string"
          },
          "timezone": {
            "type": "string"
          }
        }
      },
      "host": {
        "type": "string"
      },
      "http_referer": {
        "type": "string"
      },
      "http_user_agent": {
        "type":   "multi_field",
        "fields": {
          "http_user_agent": { 
            "type": "string",
            "index": "not_analyzed"
          },
          "tokenized": { "type": "string" }
        }
      },
      "message": {
        "type": "string"
      },
      "offset": {
        "type": "string"
      },
      "postdata": {
        "properties": {
          "request_body": {
            "type": "string"
          },
          "request_completion": {
            "type": "string"
          },
          "request_length": {
            "type": "long"
          }
        }
      },
      "remote_addr": {
        "type": "ip"
      },
      "remote_user": {
        "type": "string"
      },
      "request": {
        "type": "string"
      },
      "request_method": {
        "type": "string"
      },
      "request_querystring": {
        "type": "string"
      },
      "request_uri": {
        "type": "string"
      },
      "status": {
        "type": "long"
      },
      "tags": {
        "type": "string"
      },
      "ua": {
        "properties": {
          "build": {
            "type": "string",
            "index": "not_analyzed"
          },
          "device": {
            "type": "string",
            "index": "not_analyzed"
          },
          "major": {
            "type": "string",
            "index": "not_analyzed"
          },
          "minor": {
            "type": "string",
            "index": "not_analyzed"
          },
          "name": {
            "type": "string",
            "index": "not_analyzed"
          },
          "os": {
            "type": "string",
            "index": "not_analyzed"
          },
          "os_major": {
            "type": "string",
            "index": "not_analyzed"
          },
          "os_minor": {
            "type": "string",
            "index": "not_analyzed"
          },
          "os_name": {
            "type": "string",
            "index": "not_analyzed"
          },
          "patch": {
            "type": "string",
            "index": "not_analyzed"

          }
        }
      }
    }
  }
}

Query

{
  "query": {
    "match_all": {}
  },
  "fields": [
    "btng.listenerdjid"
  ]
}

Sample result

{
   "took": 109,
   "timed_out": false,
   "_shards": {
      "total": 6,
      "successful": 6,
      "failed": 0
   },
   "hits": {
      "total": 23218094,
      "max_score": 1,
      "hits": [
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "qjvOxnZeSF2ZQkhgEpjbcQ",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12414527"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "Nh8ODbVsRY2Pcpf3vivqRw",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "11734641"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "qFErT3SjQzSn5rRiPDNnaQ",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "-45997920"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "jp2-k75JQceMl1rFh8gdSQ",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12133256"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "wZzN8QPOQNmrHNTQdKSyLQ",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "11718434"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "qZ2zj6-hRNmBhEkWDiv8lQ",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12085263"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "yM_3TnpATbCa7Lz6b35WjA",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "11405650"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "Dl226BDxQmWpxDg1fSj_TA",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12374676"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "f9CSi2Y_TGmpfQnLHUdLgA",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12352681"
            }
         },
         {
            "_index": "logstash-batanga-radio-2014.02.18",
            "_type": "logs",
            "_id": "fjcnG2RgTvmtkWdZ0Oa_Xg",
            "_score": 1,
            "fields": {
               "btng.listenerdjid": "12503624"
            }
         }
      ]
   }
}

How to reproduce

I downloaded the es-hadoop jar and the spark distro and started spark-shell with this command:

ADD_JARS=../elasticsearch-hadoop-1.3.0.M2.jar SPARK_CLASSPATH=../elasticsearch-hadoop-1.3.0.M2.jar ./bin/spark-shell

In the shell I entered this code:

import org.elasticsearch.hadoop.mr._
import org.apache.hadoop.mapred._

val q = """

{      
  "query": {
     "match_all": {}
  },
  "fields": [
    "btng.listenerdjid"
  ]
}
"""


val conf = new JobConf()
conf.set("es.resource", "logstash-batanga-radio-2014.02.18/logs")
conf.set("es.query", q)

// Creates a Spark RDD from the ES index/query.
// (I just started playing with Spark, so this may not be the right way to get the data in)
val data = sc.hadoopRDD(conf, classOf[EsInputFormat[String,String]], classOf[String], classOf[String], 6)

data foreach println
/**
java.lang.UnsupportedOperationException: field IP not supported yet
        at org.elasticsearch.hadoop.serialization.FieldType.isRelevant(FieldType.java:84)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:148)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:160)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:160)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:160)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:160)
        at org.elasticsearch.hadoop.rest.dto.mapping.Field.parseField(Field.java:65)
        at org.elasticsearch.hadoop.rest.RestRepository.getMapping(RestRepository.java:224)
        at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:391)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:898)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:588)
        ... [main and repl part of the trace omitted] ...
*/
rbraley commented Feb 21, 2014

+1

costin commented Feb 21, 2014

Hi @fedesilva

First of all, thanks for the detailed report - if only all issues were like this one :)

Regarding the mapping, we currently read the whole one for several reasons - the main one being that we are driven by Hadoop, not Elasticsearch. The former might request fields that do not exist in ES, hence asking just for particular mappings might not work out.
In the case of Cascading, Hive, and Pig there's always a mapping associated (unsure about Spark), so we use that to do 'projection'. However, we'll try to optimize this and avoid some of the problems that might occur with a complicated mapping.

I'm starting to look into the issue and add support for IP types.

Cheers,

fedesilva commented Feb 21, 2014

Hi, @costin , thanks for looking into it so fast.

Regarding the mapping, I understand the need to read the whole mapping. Could you ignore those fields and blow up late, only if the field is actually requested? That would allow one to proceed when the field is not requested while still doing the right thing: signaling that the field is not usable. Anyway, you are the one who knows best.
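
For illustration, a minimal sketch of what failing late could look like (hypothetical types, not the actual es-hadoop internals):

// Sketch: record the unsupported field while parsing the mapping instead of
// throwing, and raise the error only when the field's value is accessed.
class UnsupportedField(name: String, esType: String) {
  def value: Nothing =
    throw new UnsupportedOperationException(
      s"field '$name' of type '$esType' not supported yet")
}

val deferred = new UnsupportedField("geoip.ip", "ip") // parse time: no error
// deferred.value                                     // access time: throws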

If you make a branch and want me to package and test something just ping me. If "we" can make this work there is a lot of code I will not need to write :)

Thanks!

costin added a commit that referenced this issue Feb 21, 2014

costin commented Feb 21, 2014

Pushed a fix into master - can you check it out and see whether it fixes things for you?
Thanks!

fedesilva commented Feb 21, 2014

Building in my laptop; will take a bit. Will report ASAP.

fedesilva commented Feb 21, 2014

Ok, the fix appears to work. I am now getting another exception that seems to be unrelated.
I think I now need to go and read more on Spark, or verify the settings, classpath, etc.

I'm pasting the exception so you can confirm it's not related and close the issue.

data foreach println
/**
java.lang.NoSuchMethodError: org.apache.commons.codec.binary.Base64.encodeBase64([BZZ)[B
    at org.elasticsearch.hadoop.util.IOUtils.serializeToBase64(IOUtils.java:45)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:420)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:898)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:588)
*/
costin commented Feb 21, 2014

Interesting - can you check your classpath? It looks like you have an older version of Apache Commons than expected.
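
One quick way to find which jar the conflicting class is actually loaded from (a generic JVM trick, nothing es-hadoop specific):

// Prints the location of the jar that provides Base64; a stale
// commons-codec on the classpath will show up here.
println(classOf[org.apache.commons.codec.binary.Base64]
  .getProtectionDomain.getCodeSource.getLocation)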


fedesilva commented Feb 21, 2014

Yes, I guessed so. Have not had time yet today to look into it. Later or tomorrow.
Will report back.

Thanks!

costin commented Feb 21, 2014

By the way, I've pushed an improvement to master which removes the Apache Commons dependency - the current master should work without hiccups in your previous environment.

Cheers,
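
As a side note, a minimal sketch of Base64 encoding without commons-codec on the JDKs of that era (this may differ from what the commit actually does):

// javax.xml.bind ships with Java 6/7, so no external jar is required.
import javax.xml.bind.DatatypeConverter

def toBase64(bytes: Array[Byte]): String =
  DatatypeConverter.printBase64Binary(bytes)

toBase64("hello".getBytes("UTF-8")) // "aGVsbG8="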

fedesilva commented Feb 22, 2014

Great! Spark's build produces a fat jar with all the dependencies copied into it, so that is not a problem!
I'm still getting exceptions, but I would close this issue since the IP type problem seems to be solved.

For reference, here is a gist with the exception; the code is the same as in this ticket.
https://gist.github.com/fedesilva/9155335

If you are interested, maybe I can open a more general Spark-related ticket and report there.
This whole interaction has reinforced my pleasure in working with Elasticsearch!

Thanks for your help.

costin commented Feb 22, 2014

Please do open an issue - maybe one to track overall Spark integration and several minor ones for the actual issues encountered.
It looks like the issue is related to UTF-8 encoding of the mapping (maybe some of the fields contain special characters)? I'm not sure why it occurs, since it's a response from Elasticsearch, which means it should be UTF-8.
Anyway, it looks like something that can and should be fixed.
Can you please enable TRACE logging (through log4j.properties) for the org.elasticsearch.hadoop.mr, org.elasticsearch.hadoop.rest and org.elasticsearch.hadoop.serialization packages? You could just do it for the whole org.elasticsearch.hadoop package.
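
For reference, the corresponding log4j.properties entry would look something like this (log4j 1.x syntax; adjust to your deployment):

# TRACE for the whole es-hadoop package covers .mr, .rest and .serialization
log4j.logger.org.elasticsearch.hadoop=TRACE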

costin commented Feb 22, 2014

By the way, I've fixed the issue in master (see #147). Can you please try it out and let me know how it goes?

Thanks!

fedesilva commented Feb 24, 2014

Hi, @costin, the fix works. Now I am stuck with the problem @rbraley describes in #148.

Later today I will open an umbrella ticket to track Spark integration.

Thank you!

costin commented Feb 24, 2014

Have you tried the latest master? Try the fix it applies, and if that doesn't work, please post the logged properties (see the comments in #148).

fedesilva commented Feb 24, 2014

Yes, I just built it and re-ran the code here, same setup.
Pasting the properties there.

costin added v1.3.0.M3 and removed bug labels Feb 24, 2014

costin commented Feb 24, 2014

Closing current issue.

costin closed this Feb 24, 2014

costin added a commit that referenced this issue Apr 8, 2014
