
Spark SQL 1.3 - Exception in Elasticsearch when executing JOIN with DataFrame created from Oracle's table #449

Closed
difin opened this issue May 12, 2015 · 6 comments


difin commented May 12, 2015

Hi,

Environment: Spark 1.3.0
Elasticsearch: 1.4.4
Elasticsearch-Hadoop: 2.1.0.Beta4
Oracle Express 11g XE

I am trying to run a join between two DataFrames, one loaded from Elasticsearch and the other from Oracle, and I am getting the exception below.
This issue only happens when the Elasticsearch data is loaded with Spark SQL's "load" function; it doesn't happen when loading with the esDF function (sketched after the code below). However, with esDF the mapping of values to column names is broken (see #451).

com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.elasticsearch.spark.sql.ScalaEsRow
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1050)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1062)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:66)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Code:

    //=======================================================================
    // Preparing Elasticsearch DataFrame
    //=======================================================================

    val sqlContext = new SQLContext(sc)
    val hours = sqlContext.load("summary/hours", "org.elasticsearch.spark.sql")
    hours.foreach (x => println(x))

    //=======================================================================
    // Preparing Oracle DataFrame
    //=======================================================================

    val users = sqlContext.load("jdbc", Map("url" -> dbConnectionString,
                                        "dbtable" -> "sats.users",
                                        "driver"  -> "oracle.jdbc.driver.OracleDriver"))
    users.foreach (x => println(x))

    //=======================================================================
    // Joining ES and Oracle DataFrames
    //=======================================================================

    val hoursAug = hours.join(users, hours("User") === users("USERNAME"))
    hoursAug.foreach (x => println(x))

The elements of both the Elasticsearch and Oracle DataFrames are printed successfully (the first two foreach calls); the exception happens only when the join is executed.
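
For reference, the esDF variant mentioned above (which avoids this particular exception but runs into #451) looks roughly like this; the val name is just illustrative:

    import org.elasticsearch.spark.sql._  // brings esDF onto SQLContext

    // Load the same index/type through the dedicated helper instead of sqlContext.load
    val hoursViaEsDF = sqlContext.esDF("summary/hours")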

Can you please advise whether this is a bug and, if so, whether there is a workaround?
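
One speculative workaround I can think of (untested, and hoursPlain is just an illustrative name) would be to copy the Elasticsearch rows into plain Spark SQL Rows before joining, so that ScalaEsRow itself never has to cross a shuffle boundary:

    import org.apache.spark.sql.Row

    // Untested sketch: rebuild the ES DataFrame from generic Rows so that
    // ScalaEsRow instances are never shipped through the join's shuffle.
    val hoursPlain = sqlContext.createDataFrame(
      hours.map(r => Row.fromSeq(r.toSeq)),  // copy each ScalaEsRow into a generic Row
      hours.schema)

    val hoursAug = hoursPlain.join(users, hoursPlain("User") === users("USERNAME"))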

Thanks,
Dmitriy Fingerman

difin commented May 14, 2015

Update: the same exception happens when joining two DataFrames that are both loaded from Elasticsearch, so it is not related to Oracle:

val hours = sqlContext.load("summary/hours", "org.elasticsearch.spark.sql")
hours.registerTempTable("HOURS_DF")

val days = sqlContext.load("summary/days", "org.elasticsearch.spark.sql")
days.registerTempTable("DAYS_DF")

val hoursAug = sqlContext.sql("SELECT H.Hour, D.Day " +
                                  "FROM HOURS_DF H, DAYS_DF D " +
                                  "WHERE H.User = D.User")
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.elasticsearch.spark.sql.ScalaEsRow
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1050)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1062)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
    at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
    at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

costin commented May 17, 2015

Hi @FDmitriy ,

This does indeed look like a bug. I've pushed a fix (and new snapshot builds) to address it; can you please give it a try and report back? It may or may not also address #451; I haven't had time to look into that yet.
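
In case it saves a step, pulling the dev build into an sbt project would look something like the sketch below (the snapshot repository URL is an assumption; check the project docs for the canonical one):

// build.sbt sketch for testing the dev build; the resolver URL is assumed,
// as es-hadoop snapshots have historically been published to Sonatype OSS.
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"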

difin commented May 19, 2015

Hi Costin,

Thanks a lot for the fix.
I tested with 2.1.0.Beta4 and the error message was the same as before; then I tested with 2.1.0.BUILD-SNAPSHOT and got the following exception instead:

com.esotericsoftware.kryo.KryoException: Class cannot be created (non-static member class): scala.collection.convert.Wrappers$JListWrapper
Serialization trace:
rowOrder (org.elasticsearch.spark.sql.ScalaEsRow)
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1048)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1062)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:66)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:121)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:121)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

costin added a commit that referenced this issue Jun 8, 2015
Better control over the internal data structure in EsScalaRow to help
Kryo serialize/deserialize the data
relates #449

costin commented Jun 8, 2015

@FDmitriy I have tried to reproduce the error, to no avail (with a local, a locally-remote, and a fully remote Spark cluster). I even invoked Kryo serialization directly on a ScalaEsRow instance and it still worked.
Either way, I've replaced the internal class with one that should be friendlier serialization-wise. I have also pushed the latest dev build to Maven; can you please try the latest snapshot and report back whether the problem persists?

Thanks,

difin commented Jun 22, 2015

Hi Costin,

I tried again with both the 2.1.0.rc1 and development snapshot versions, and the issue no longer occurs.
Thanks for fixing it!

costin commented Jun 22, 2015

Thanks for confirming. Cheers!

@costin costin closed this as completed Jun 22, 2015
@costin costin added v2.1.0 and removed v2.2.0-m1 labels Jun 22, 2015