Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Classloader conflict when using elasticsink #71

Closed
boeboe opened this issue Oct 18, 2016 · 7 comments
Closed

Classloader conflict when using elasticsink #71

boeboe opened this issue Oct 18, 2016 · 7 comments

Comments

@boeboe
Copy link
Contributor

boeboe commented Oct 18, 2016

Hi,

I am trying out the stream-reactor elastic search sink. Currently I get an error when starting the plugin:

[2016-10-18 15:51:10,490] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769)
[2016-10-18 15:51:10,490] INFO

    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
       ________           __  _      _____ _       __
      / ____/ /___ ______/ /_(_)____/ ___/(_)___  / /__
     / __/ / / __ `/ ___/ __/ / ___/\__ \/ / __ \/ //_/
    / /___/ / /_/ (__  ) /_/ / /__ ___/ / / / / / ,<
   /_____/_/\__,_/____/\__/_/\___//____/_/_/ /_/_/|_|


by Andrew Stevenson
       (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask:36)
[2016-10-18 15:51:10,490] INFO ElasticSinkConfig values:
        connect.elastic.url = ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com:9300
        connect.elastic.url.prefix = elasticsearch
        connect.elastic.cluster.name = elasticsearch-kafka
        connect.elastic.export.route.query = INSERT INTO text SELECT * FROM text
 (com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfig:178)
[2016-10-18 15:51:10,531] INFO [Inertia] modules [], plugins [], sites [] (org.elasticsearch.plugins:180)
[2016-10-18 15:51:10,539] ERROR Task elasticsearch-sink-connector-text-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
        at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
        at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
        at com.sksamuel.elastic4s.ElasticClient$.transport(ElasticClient.scala:111)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticWriter$.apply(ElasticWriter.scala:43)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask.start(ElasticSinkTask.scala:56)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I was wondering if you also have seen this problem or have any clue where the source of the classloader conflict lies...?

Best regards,
Bart

@andrewstevenson
Copy link
Collaborator

Yes. There's a version issue. How are you setting the classpath? Its easier to communicate on our slack channel.

@boeboe
Copy link
Contributor Author

boeboe commented Oct 19, 2016

Hi @andrewstevenson

I have provided a fix in the following pull request:
#73

@gvdm90
Copy link

gvdm90 commented May 17, 2017

Hi,

I'm getting the same error with the latest vesion of stream-reactor (0.2.5-3.2.0). Did you already know about this regression?

@gvdm90
Copy link

gvdm90 commented May 17, 2017

I confirm that this bug is still opened on the master branch. I compiled the elasticsearch jar and when trying to run the connector I get this exception

[2017-05-17 15:22:11,375] INFO Setting task configurations for 1 workers. (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkConnector:45)
[2017-05-17 15:22:11,396] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824)
[2017-05-17 15:22:11,397] INFO

    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
       ________           __  _      _____ _       __
      / ____/ /___ ______/ /_(_)____/ ___/(_)___  / /__
     / __/ / / __ `/ ___/ __/ / ___/\__ \/ / __ \/ //_/
    / /___/ / /_/ (__  ) /_/ / /__ ___/ / / / / / ,<
   /_____/_/\__,_/____/\__/_/\___//____/_/_/ /_/_/|_|


by Andrew Stevenson
       (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask:38)
[2017-05-17 15:22:11,398] INFO ElasticSinkConfig values:
        connect.elastic.cluster.name = elasticsearch
        connect.elastic.error.throw = true
        connect.elastic.sink.kcql = INSERT INTO text SELECT * FROM text
        connect.elastic.url = elastic:9200
        connect.elastic.url.prefix = elasticsearch
        connect.elastic.write.timeout = 300000
 (com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfig:180)
[2017-05-17 15:22:11,483] INFO [Cloak] modules [], plugins [], sites [] (org.elasticsearch.plugins:180)
[2017-05-17 15:22:11,489] ERROR Task elastic-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
        at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:192)
        at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
        at com.sksamuel.elastic4s.ElasticClient$.transport(ElasticClient.scala:112)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticWriter$.apply(ElasticWriter.scala:40)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask.start(ElasticSinkTask.scala:58)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2017-05-17 15:22:11,493] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)

I tried changing the kafka-connect-elastic/build.gradle file like in the linked commit but nothing changed:

	compile("com.sksamuel.elastic4s:elastic4s-core_$scalaMajorVersion:$elastic4sVersion") {
		exclude group: "org.scala-lang", module: "scala-library"
		exclude group: "org.scalactic", module: "scalactic_2.11"
-	}
+		exclude group: 'com.google.guava', module: 'guava'
+	}
+	compile group: 'com.google.guava', name: 'guava', version: '19.0'
	compile "com.sun.jna:jna:$jnaVersion"

@stheppi stheppi reopened this May 17, 2017
@stheppi
Copy link
Collaborator

stheppi commented May 17, 2017

When you start your connect distributed to you export the classpath?

Can you please move the folder with elasticsearch provided from confluent in Confluent-HOME/share/java/kafka-connect-elasticsearch if you don't set the classpath. Then try again

Please join our slack channel. Will be easier

@gvdm90
Copy link

gvdm90 commented May 17, 2017

Ok, with the help of the community I fixed this issue. It seems that there is a version mismatch in the com.google.guava.guava dependency between the ElasticSearch connector and the Cassandra connector. In fact if I delete the Cassandra connector and then run start-connect everything works.

@andrewstevenson
Copy link
Collaborator

closing as kafka 0.11 provides classloader isolation

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants