java.io.NotSerializableException: org.apache.spark.SparkContext #298

Closed
lairdnote opened this Issue Oct 13, 2014 · 3 comments

lairdnote commented Oct 13, 2014

I use Spark Streaming and ES.
My code:

  val sparkConf = new SparkConf(true)
    .setAppName("KafkaNetWork")
    .set("es.index.auto.create", "true")
    .set("es.nodes", "127.0.0.1")
    .set("spark.serializer", classOf[KryoSerializer].getName)

  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(2))

  ssc.checkpoint("checkpoint")

  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2).cache()
  val esregex = s"""(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) \\[(.*)\\] "(.*?)" (\\d{3}) (\\S+) "(.*?)" "(.*?)" (.*?) (\\d+?)""".r

  val esRdd = lines.flatMap(_.split("\\n"))
    .map {
      case esregex(ip, client, user, dataTime, request, status, bytes, referer, agent, hit, origintime) =>
        Map("ip" -> ip, "client" -> client, "user" -> user, "dateTime" -> dataTime,
          "request" -> request, "status" -> status, "bytes" -> bytes, "referer" -> referer,
          "agent" -> agent, "hit" -> hit, "origintime" -> origintime)
      case line => Map("message" -> line)
    }
    .map { x =>
      val rdd = sc.makeRDD(Seq(x))
      EsSpark.saveToEs(rdd, "spark/docs")
    }
    .print()

ssc.start()
ssc.awaitTermination()

ERROR:

14/10/13 19:03:46 ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext
java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The results are already stored to ES, though.

costin commented Oct 13, 2014

@lairdnote If you look at the stack trace, you'll notice there's no es-hadoop in it. The error is caused by the fact that SparkContext is not serializable, and somewhere a reference to it is being kept by an entity that is sent over the wire. It's best to take this further with the Spark team through their mailing list.

Speaking of which, this is a bug tracker, not a mailing list - when you encounter issues (whether they are configuration issues or the like), please raise them on the mailing list first, and open an issue only once you are sure, or have confirmation, that it is a bug. I'm mentioning this since, so far, you've raised three issues, all of them invalid.
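For illustration, a minimal sketch of the usual way to avoid this in code like the snippet above: let Spark hand each micro-batch to the driver as an RDD via DStream.foreachRDD, instead of calling sc.makeRDD inside a map closure. This assumes the same Kafka receiver API and es-hadoop's EsSpark.saveToEs from the original post; the connection values (zkQuorum, group, topics, numThreads) are placeholders, not values from the post:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.elasticsearch.spark.rdd.EsSpark

    object KafkaToEs {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf(true)
          .setAppName("KafkaNetWork")
          .set("es.index.auto.create", "true")
          .set("es.nodes", "127.0.0.1")
          .set("spark.serializer", classOf[KryoSerializer].getName)

        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Placeholder connection settings standing in for the values in the post
        val zkQuorum   = "localhost:2181"
        val group      = "log-consumers"
        val topics     = "access-logs"
        val numThreads = 1

        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

        // Same idea as the original parsing step, reduced here to the fallback case
        val docs = lines.map(line => Map("message" -> line))

        // foreachRDD runs on the driver once per batch; the RDD is created by
        // Spark itself, so no user closure has to hold a SparkContext reference
        docs.foreachRDD { rdd =>
          EsSpark.saveToEs(rdd, "spark/docs")
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

In this shape only the per-record functions and the save call are serialized and shipped to executors, neither of which carries a SparkContext reference.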

costin closed this Oct 13, 2014

lairdnote commented Oct 14, 2014

I am so sorry. Thanks for your help.

kohliu commented Jul 3, 2015

@costin - Thanks for your help - this was a weird one to catch!
