
NoSuchMethodError when using elasticsearch-spark_2.10 2.2.0 + spark streaming #702

Closed
redlumxn opened this issue Feb 18, 2016 · 1 comment


@redlumxn

Hi,

I have a very simple program using Spark Streaming and elasticsearch-spark (2.2.0). When I run the program I get the exception below (full output attached). Funnily enough, when I use elasticsearch-spark (2.1.3) I don't get this error.

java.lang.NoSuchMethodError: org.apache.spark.util.ThreadUtils$.newDaemonCachedThreadPool(Ljava/lang/String;I)Ljava/util/concurrent/ThreadPoolExecutor;

I've also noticed that when I run it locally it spins up Spark 1.6 even though I specify 1.5.1 in my build.sbt file. Again, when I downgrade to elasticsearch-spark (2.1.3) it correctly spins up 1.5.1.

16/02/19 01:09:26 INFO SparkContext: Running Spark version 1.6.0

Any idea what I'm doing wrong? The only reason I want to use elasticsearch-spark (2.2.0) is to be able to use the es.nodes.wan.only setting, as I want to connect to my Elasticsearch instance on AWS and I'm unable to do that with 2.1.3.

build.sbt for reference

name := "ScalaTestProject"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.5.1" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.1" % "provided",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-RC1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
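
For reference, one way to double-check which Spark version the build actually resolves is a dependency report plugin; the plugin coordinates and version below are assumptions for an sbt 0.13 build, so adjust as needed.

// project/plugins.sbt (assumed plugin coordinates/version)
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")

Running sbt dependencyTree then shows which dependency drags in Spark 1.6 transitively.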

Program/code

package au.com.sixtree.scala.spark

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import org.elasticsearch.spark._

object SparkElasticSearchApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleESApp")
      .setMaster("local[2]")
      .set("es.nodes", "xxx.yyy.es.amazonaws.com")
      .set("es.port", "443")
      .set("es.index.auto.create", "true")
      .set("es.nodes.wan.only", "true")
      .set("es.net.ssl", "true")

    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    sc.makeRDD(Seq(numbers, airports)).saveToEs("testindex2/docs")

    val ssc = new StreamingContext(sc, Seconds(5))

    val topicMap = Map("bets" -> 5)
    val kafkaInputDStream = KafkaUtils.createStream(ssc, "192.168.99.100:2181", "spark-scala-cg", topicMap)
    val messagesDStream = kafkaInputDStream.map(tuple => tuple._2)
    messagesDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Full output:
full_output.txt

@costin
Member

costin commented Feb 21, 2016

ES-Spark/Hadoop pulls in Spark 1.6 while you seem to be using Spark 1.5.1. To get around this, either exclude the Spark dependency pulled in by ES-Spark or force it to the same version as the rest of your app.
In SBT this is documented here as an example.
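
As a rough sketch of the exclusion approach in build.sbt (illustrative only; whether elasticsearch-spark 2.2.0 actually pulls in these Spark artifacts is an assumption worth verifying against a dependency report):

// build.sbt sketch: keep the app on Spark 1.5.1 and prevent
// elasticsearch-spark from pulling in its own (newer) Spark artifacts
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.5.1" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.5.1" % "provided",
  ("org.elasticsearch" % "elasticsearch-spark_2.10" % "2.2.0")
    .excludeAll(ExclusionRule(organization = "org.apache.spark"))
)

Forcing the version instead of excluding it is the other option, e.g. dependencyOverrides ++= Set("org.apache.spark" % "spark-core_2.10" % "1.5.1").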

P.S. You seem to have some duplicate jars on your classpath, such as Guava 0.14 and 0.16. Cleaning them up will go a long way going forward.
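
If those duplicates come in transitively, one way to handle them in SBT is to pin a single version so only one copy lands in the assembly; the Guava coordinates and version below are assumptions, so match them to whatever your build actually needs.

// build.sbt sketch: pin one Guava version across all transitive pulls
dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"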

costin closed this as completed Feb 21, 2016