
java.lang.UnsupportedOperationException caused by org.elasticsearch.hadoop.mr.EsInputFormat? #338

Closed
lrhazi opened this issue Dec 10, 2014 · 6 comments

lrhazi commented Dec 10, 2014

Trying to figure out why this always crashes. Someone pointed out that it might be a bug in EsInputFormat?

14/12/10 15:46:49 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/spark/python/pyspark/serializers.py", line 146, in _read_with_length
    length = read_int(stream)
  File "/spark/python/pyspark/serializers.py", line 464, in read_int
    raise EOFError
EOFError

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
        at java.util.AbstractMap.put(AbstractMap.java:203)
        at java.util.AbstractMap.putAll(AbstractMap.java:273)
        at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:373)
        at org.elasticsearch.hadoop.mr.EsInputFormat$WritableShardRecordReader.setCurrentValue(EsInputFormat.java:322)
        at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:299)
        at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:227)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)

lrhazi commented Dec 10, 2014

I tried elasticsearch-hadoop-2.1.0.Beta3.jar and also the latest snapshot.


costin commented Dec 10, 2014

Can you provide some background on what you are trying to do? What version of Spark and Python are you using? I assume you are trying to interact with ES through the Map/Reduce layer instead of the Java/Scala API, correct?
Did you report the bug somewhere else? You mentioned someone pointed you here (she was right); is that discussion available online? I'm wondering whether there's extra information in it that might prove useful.

Thanks!


lrhazi commented Dec 10, 2014

OK, I'm a newbie and not quite sure what I am doing, so please bear with me. Someone on the Spark mailing list suggested it could be a bug in EsInputFormat.

Versions:
spark-1.1.1-bin-hadoop2.4.tgz
Python 2.7.6

I am trying to learn how to use PySpark to process data stored in Elasticsearch. This is my code, based on some examples I could find:

I start IPython as follows:

[ root@rap-es:/spark ]$ IPYTHON=1 /spark/bin/pyspark --jars /spark/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT/dist/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar

I run this with no errors:

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource" : "ar_2005/doc",
        "es.nodes":"rap-es2.uis.georgetown.edu",
        "es.query" :  """{"query":{"match_all":{}},"fields":["title"] }"""
        }
    )

This also runs with no errors:

es_rdd.take(1)[0]

which prints my document title.
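
For reference, each element of es_rdd is a (key, value) pair in which the value is a dict of the requested fields, so take(1)[0] comes back shaped roughly like this (illustrative values only, inferred from the lambda used further down):

(u'some-doc-id', {u'title': [u'Some document title']})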

Then I try this, which gives the exception I pasted above:

from operator import add

titles=es_rdd.map(lambda d: d[1]['title'][0])
counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)                                                                                                                                      
output = counts.collect()    


costin commented Dec 10, 2014

I think I've found the problem and pushed a nightly build with the fix. Can you please download it and try it out? It's the 2.1.0.BUILD-SNAPSHOT, which you can read about in the docs.

Thanks!


lrhazi commented Dec 11, 2014

Great! Now I get a much more helpful exception. Thanks a lot.

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/spark/python/pyspark/rdd.py", line 1989, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/spark/python/pyspark/rdd.py", line 1989, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/spark/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/spark/python/pyspark/rdd.py", line 1575, in combineLocally
    merger.mergeValues(iterator)
  File "/spark/python/pyspark/shuffle.py", line 245, in mergeValues
    for k, v in iterator:
  File "<ipython-input-3-82d9d23333f8>", line 1, in <lambda>
KeyError: 'title'
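
The KeyError presumably means some hits come back without a 'title' field. A minimal guard, just a sketch assuming the value side of each pair is a dict of the requested fields, is to filter those hits out before mapping:

from operator import add

# drop hits that have no 'title' field before extracting it (sketch)
titles = es_rdd.filter(lambda d: 'title' in d[1] and d[1]['title']) \
               .map(lambda d: d[1]['title'][0])
counts = titles.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
output = counts.collect()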


costin commented Dec 11, 2014

Thanks for the feedback. I assume the issue is fixed on the es-hadoop side; if that's not the case, please reopen the issue. By the way, I recommend using the native Scala/Java integration instead of the M/R layer, as the former is much more performant.

@costin costin closed this as completed Dec 11, 2014
@costin costin added the :MR label Dec 11, 2014
costin added a commit that referenced this issue Dec 11, 2014
In Hadoop-like environments (such as Spark), the objects passed to consumers might be completely
replaced, leading to strange exceptions. For this reason, always create a new object under both MR
APIs (old and new)

Fix #338

(cherry picked from commit a8e4813)
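
On the PySpark side, a comparable precaution, purely an illustrative sketch and not part of the actual fix, is to copy each value into a fresh dict before it is reused by downstream stages:

# defensive copy of each value dict so later stages never hold a reference
# to an object the input layer might reuse (illustrative only)
es_rdd_copied = es_rdd.mapValues(dict)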