.cache() on an RDD causes RDDBlockId class not found #344

Closed
jeffsteinmetz opened this issue Dec 23, 2014 · 10 comments

@jeffsteinmetz commented Dec 23, 2014

[I did not run into this issue with Beta3, only SNAPSHOT]

It appears to happen only when I call .cache() or persist() on an esRDD and then save back to Elasticsearch.

Calling a Spark action such as collect, first, or take does not trigger the issue; only saveToEs does.

The _odd thing_ is that if I add a Spark action such as first or take, i.e. println(saveToESRDD.first()), right before saving back to ES, the issue does not arise. Likewise, if I remove .cache(), the issue does not arise.

Here is a simplified version of the code I use to reproduce it.

val someRDD = sc.esRDD(someIndex, someQuery)
val anotherRDD = sc.esRDD(someOtherIndex, someOtherQuery)
val joinedRDD = someRDD.leftOuterJoin(anotherRDD)
joinedRDD.cache()

// save back to Elasticsearch
val saveToESRDD = joinedRDD.map { j: (collection.Map[String, Any], Option[collection.Map[String, Any]]) => makeMap(j) }
// println(saveToESRDD.first())  // uncommenting this makes the issue disappear
saveToESRDD.saveToEs("sometestindex/somedocument")

The trace is:

14/12/22 21:47:51 INFO Version: Elasticsearch Hadoop v2.1.0.BUILD-SNAPSHOT [8d9ba42a1c]
[error] (Driver Heartbeater) java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId
java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:81)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$9$$anonfun$apply$7.apply(Executor.scala:361)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$9$$anonfun$apply$7.apply(Executor.scala:355)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$9.apply(Executor.scala:355)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$9.apply(Executor.scala:353)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:353)

@jeffsteinmetz (Author) commented Jan 27, 2015

Was anybody ever able to reproduce this?

@jkleckner commented Feb 3, 2015

This might be off-base since it is for a different project (I landed here via a search), but I found a similar problem that was solved by bumping the Spark version to 1.2.0 AND the akka version to 2.3.9.

@elmalto commented Jul 7, 2015

I am still running into this on 1.2.1 and 1.3.1. Any fixes?

@ssimeonov commented Sep 21, 2015

I am seeing this with Spark jobs that have nothing to do with ES. It is very likely a Spark issue.

@michaelmalak commented Sep 27, 2015

It appears to be a long-standing Spark bug that just got a Jira ticket last week. https://issues.apache.org/jira/browse/SPARK-10722

@costin (Member) commented Oct 28, 2015

@michaelmalak thanks for confirming. Closing the issue.

@costin closed this Oct 28, 2015

@sebastiennoir commented Dec 13, 2015

Same issue for me, every time, with the following basic code:

  object Word2VecBasic {

    def main(args: Array[String]): Unit = {
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.mllib.feature.Word2Vec

      val sc = new SparkContext(new SparkConf().setAppName("basicTest")
        .setMaster("local[8]"))

      val input = sc.textFile("text8").map(line => line.split(" ").toSeq)

      val word2vec = new Word2Vec()

      val model = word2vec.fit(input)
    }
  }

Extract of the stack trace:

15/12/13 13:53:57 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
  at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:103)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:432)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:423)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:423)
  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:421)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:421)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:464)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:464)
  at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:464)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
  at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:464)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

@sebastiennoir commented Dec 13, 2015

Forgot to mention the dependencies:

name := "Sparkling"

version := "0.1"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // same problem with this...
  // "org.apache.spark"  % "spark-core_2.10"              % "1.5.2" % "provided",
  // "org.apache.spark"  % "spark-mllib_2.10"             % "1.5.2",

  //... or this
  "org.apache.spark"  % "spark-core_2.10"              % "1.4.1" % "provided",
  "org.apache.spark"  % "spark-mllib_2.10"             % "1.4.1",

  "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()
  )

resolvers += "Scala-Tools Maven2 Snapshots Repository" at "http://scala-tools.org/repo-snapshots"
resolvers += "repo-for-dsiutils" at "http://ir.dcs.gla.ac.uk/~bpiwowar/maven/"
resolvers += "oracle" at "http://download.oracle.com/maven"

@costin (Member) commented Dec 14, 2015

@sebastiennoir it looks like a Spark problem (your snippet, your dependencies, and in particular the stack trace do not contain any ES-Spark/Hadoop code).
From what I can tell, when an error occurs, Spark closes its classloaders and thus prevents any further class loading, which ends up obscuring the initial error with a ClassNotFoundException (raised during error reporting/logging). Maybe you're running into this, maybe not.
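The classloader behavior described here can be illustrated without Spark at all. The sketch below (Payload and ClosedLoaderDemo are illustrative names, not Spark code) deserializes an object through a stream whose class resolution always fails, simulating a classloader that has already been closed; it produces the same ClassNotFoundException-during-readObject shape seen in the traces above.

```scala
import java.io._

// Hypothetical stand-in for the objects the driver heartbeater deserializes.
class Payload(val value: Int) extends Serializable

object ClosedLoaderDemo {
  def main(args: Array[String]): Unit = {
    // Serialize normally.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new Payload(42))
    oos.close()

    // Deserialize through a stream whose class resolution always fails,
    // simulating a classloader that can no longer load anything.
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        throw new ClassNotFoundException(desc.getName)
    }
    try ois.readObject()
    catch {
      case e: ClassNotFoundException =>
        // Mirrors the "java.lang.ClassNotFoundException: org.apache.spark.
        // storage.RDDBlockId" seen in the heartbeater thread.
        println("ClassNotFoundException: " + e.getMessage)
    }
    ois.close()
  }
}
```

The point is that the class named in the exception is whatever the stream tried to resolve first; the real root cause (whatever closed the loader) is elsewhere.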

@rvpradeep commented Feb 23, 2016

I had the same issue, but assembling the project (sbt assembly) made it go away.
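For reference, enabling sbt-assembly typically means adding the plugin to the build and then submitting the resulting fat jar (the version below is illustrative; pick one matching your sbt release):

```scala
// project/plugins.sbt -- version is illustrative
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
```

Then run `sbt assembly` and pass the produced jar (under target/scala-2.10/) to spark-submit. Bundling the application's classes into one jar plausibly sidesteps the loader-visibility problem described above.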
