Streaming job fails due to connection shutdown #116

Open
atfire opened this issue Oct 2, 2017 · 5 comments

atfire commented Oct 2, 2017

We have a Spark RabbitMQ streaming job that occasionally fails with the following exception:

java.lang.Exception: An error happen while getting next delivery: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.org$apache$spark$streaming$rabbitmq$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:96)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:73)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:201)
    at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:218)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$4.apply(RabbitMQInputDStream.scala:91)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$4.apply(RabbitMQInputDStream.scala:91)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.org$apache$spark$streaming$rabbitmq$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:91)
    ... 1 more

I see a similar issue, #104, but we've checked and the time on our servers is in sync.

Can we enable auto recovery for the Spark connections, or is it already enabled? What else can be done to prevent this?
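
One application-level answer to "what else can be done" is to retry the failing step with a growing delay instead of letting a single closed connection fail the job. The sketch below is plain Java with hypothetical names (`RetryingReceive`, `Attempt`, `withRetry`); it is not part of spark-rabbitmq or the RabbitMQ client, and the "receive" in `main` is simulated. Separately, the RabbitMQ Java client's `ConnectionFactory` has `setAutomaticRecoveryEnabled` and `setRequestedHeartbeat`; whether this receiver exposes them is not established in this thread.

```java
/** Hypothetical helper for illustration; not part of spark-rabbitmq or the RabbitMQ client. */
final class RetryingReceive {

    /** One attempt at an operation that may fail, e.g. "connect and fetch the next delivery". */
    interface Attempt<T> {
        T run() throws Exception;
    }

    /**
     * Runs the attempt up to maxAttempts times. After each failure it sleeps,
     * starting at initialDelayMs and doubling the delay every time.
     * Rethrows the last failure once all attempts are used up.
     */
    static <T> T withRetry(Attempt<T> attempt, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.run();
            } catch (InterruptedException e) {
                // Do not retry on interruption: restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (i < maxAttempts) {
                    Thread.sleep(delayMs);
                    delayMs *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated receive: fails twice with a "connection closed" error, then succeeds.
        String delivery = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new java.io.IOException("connection closed");
            }
            return "message";
        }, 5, 10L);
        System.out.println(delivery + " after " + calls[0] + " attempts"); // prints "message after 3 attempts"
    }
}
```

A retry like this only hides transient shutdowns. If the broker or another client keeps closing the connection, the cause still has to be found on that side.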

Contributor

nelsou commented Oct 2, 2017

RabbitMQ sucks with Spark Streaming.

  • Solution 1 => Use Kafka
  • Solution 2 => you will have to live with these kinds of errors (and many more ...)

Author

atfire commented Oct 3, 2017

I think the connections keep closing due to inactivity. Is there a way to pass a heartbeat interval to the connection factory?

Author

atfire commented Oct 4, 2017

@nelsou, can you tell us which issues you ran into with RabbitMQ + Spark Streaming?

Contributor

nelsou commented Oct 4, 2017

too many ...

  • timeouts
  • connection closed
  • messages lost
  • poor performance (when overloaded)
  • ....

haixuan8192 commented

org.apache.spark.SparkException: Error creating channel and connection: connection is already closed due to connection error; cause: java.io.EOFException
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:211)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:243)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:166)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:143)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The program runs normally. This error appears occasionally, but it does not affect the run.
