Consumer fails to re-bind to queue after connection to rabbitmq was closed/re-established. #72

Closed
jschaul opened this issue Mar 14, 2016 · 11 comments


@jschaul

jschaul commented Mar 14, 2016

UPDATE: see #72 (comment) for explanation/workaround of the problem. Original thread left for reference below.


Either op-rabbit or one of its underlying libraries seems to cause an unrecoverable crash in op-rabbit-based consumers after the RabbitMQ instance is unavailable for a short period (e.g. because it crashed itself). Once RabbitMQ is back up, the consumers do not reconnect as expected. Instead, the following errors occur (stack traces below):

This happened with version 1.2.1 of op-rabbit.

Is there a way to handle this and try to reconnect?

logger name: akka.actor.OneForOneStrategy
full stacktrace:

java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1092) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1052) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.setupSubscription(AsyncAckingRabbitConsumer.scala:97) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer$$anonfun$connected$1.applyOrElse(AsyncAckingRabbitConsumer.scala:47) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at com.spingo.op_rabbit.impl.AsyncAckingRabbitConsumer.aroundReceive(AsyncAckingRabbitConsumer.scala:11) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:495) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na]
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1090) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:548) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:497) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
    at com.thenewmotion.akka.rabbitmq.RabbitMqActor$class.closeIfOpen(RabbitMqActor.scala:27) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
    at com.thenewmotion.akka.rabbitmq.ChannelActor.closeIfOpen(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
    at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:115) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
    at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:112) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
    at akka.actor.FSM$class.processEvent(FSM.scala:654) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at com.thenewmotion.akka.rabbitmq.ChannelActor.processEvent(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:648) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:642) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
    at com.thenewmotion.akka.rabbitmq.ChannelActor.aroundReceive(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
... 9 common frames omitted

This is followed by:
logger name: com.spingo.op_rabbit.SubscriptionActor
message: An error while trying to bind a consumer to <queue-name>
full stacktrace:

java.io.IOException: null
  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:921) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:936) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.spingo.op_rabbit.Binding$$anon$2$$anonfun$declare$2.apply(Binding.scala:71) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at com.spingo.op_rabbit.Binding$$anon$2$$anonfun$declare$2.apply(Binding.scala:71) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at scala.collection.immutable.List.foreach(List.scala:381) ~[org.scala-lang.scala-library-2.11.7.jar:na]
  at com.spingo.op_rabbit.Binding$$anon$2.declare(Binding.scala:71) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at com.spingo.op_rabbit.SubscriptionActor.doSubscribe(SubscriptionActor.scala:232) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:159) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:146) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
  at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:597) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:597) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at scala.collection.immutable.List.foreach(List.scala:381) ~[org.scala-lang.scala-library-2.11.7.jar:na]
  at akka.actor.FSM$class.handleTransition(FSM.scala:597) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.FSM$class.makeTransition(FSM.scala:679) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.spingo.op_rabbit.SubscriptionActor.makeTransition(SubscriptionActor.scala:13) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at akka.actor.FSM$class.applyState(FSM.scala:664) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.spingo.op_rabbit.SubscriptionActor.applyState(SubscriptionActor.scala:13) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at akka.actor.FSM$class.processEvent(FSM.scala:659) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.spingo.op_rabbit.SubscriptionActor.akka$actor$LoggingFSM$$super$processEvent(SubscriptionActor.scala:13) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at akka.actor.LoggingFSM$class.processEvent(FSM.scala:790) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.spingo.op_rabbit.SubscriptionActor.processEvent(SubscriptionActor.scala:13) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:648) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:642) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.spingo.op_rabbit.SubscriptionActor.aroundReceive(SubscriptionActor.scala:13) ~[com.spingo.op-rabbit-core_2.11-1.2.1.jar:1.2.1]
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.ActorCell.invoke(ActorCell.scala:495) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.dispatch.Mailbox.run(Mailbox.scala:224) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na]
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na]
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na]
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na]
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
  at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
... 36 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:548) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:497) ~[com.rabbitmq.amqp-client-3.5.2.jar:na]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
  at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
  at com.thenewmotion.akka.rabbitmq.RabbitMqActor$class.closeIfOpen(RabbitMqActor.scala:27) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
  at com.thenewmotion.akka.rabbitmq.ChannelActor.closeIfOpen(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
  at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:115) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
  at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:112) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
  at akka.actor.FSM$class.processEvent(FSM.scala:654) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.thenewmotion.akka.rabbitmq.ChannelActor.processEvent(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
  at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:648) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:642) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na]
  at com.thenewmotion.akka.rabbitmq.ChannelActor.aroundReceive(ChannelActor.scala:34) ~[com.thenewmotion.akka.akka-rabbitmq_2.11-2.2.jar:2.2]
... 9 common frames omitted
@timcharper
Member

This looks like a bug in the Java driver. Try using amqp-client 3.6.1 with RabbitMQ and see if the problem persists, or report a bug to them?
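Trying a newer driver does not require a new op-rabbit release; the transitive Java client version can be overridden in the build. A minimal build.sbt sketch, assuming the project is built with sbt (the 3.5.2 version being replaced is taken from the jar names in the stack traces above):

```scala
// build.sbt (sketch): force the RabbitMQ Java client to 3.6.1 instead of the
// 3.5.2 version that appears on the classpath in the traces above.
dependencyOverrides += "com.rabbitmq" % "amqp-client" % "3.6.1"
```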

@jschaul
Author

jschaul commented Mar 21, 2016

Thanks, I will try upgrading and see if the issue persists. I can re-open this issue if it's not solved.

@jschaul jschaul closed this as completed Mar 21, 2016
@jschaul jschaul reopened this Jul 20, 2016
@jschaul
Author

jschaul commented Sep 8, 2016

TL;DR: If you have this or a similar problem with disconnects, use:

consume(Queue.passive(topic(queue("wow-maybe-queue"), List("some-topic.#"))))

Full explanation:

In case anyone faces similar issues (they still occur with op-rabbit 1.3.0, intermittently, as a race condition), here is some explanation:

The exact problem is that after a disconnect from RabbitMQ (forced from the RabbitMQ side, e.g. by closing the connection in the RabbitMQ web interface), op-rabbit successfully reconnects to RabbitMQ but fails to bind the queue.

Start a consumer (everything works as expected):

[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO  c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO  c.d.d.c.p.DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(Run,DisconnectedPayload(Paused,10,None)) from Actor[akka://default/user/$a#-246056593]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelCreated(Actor[akka://default/user/$a/connection/$a#-1186346017]),DisconnectedPayload(Binding,10,None)) from Actor[akka://default/user/$a/connection#1872231705]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,3)
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,3),Actor[akka://default/user/$a/connection/$a#-1186346017]),DisconnectedPayload(Binding,10,None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Disconnected -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindSuccess(AMQChannel(amqp://vagrant@<IP>:5672/,3)),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,3),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Running

Let's force the connection closed via the RabbitMQ console (this time the race goes our way, and the reconnect is successful):

[info] WARN  c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection lost connection to amqp://vagrant@{<IP>:5672}:5672//
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a disconnected
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel disconnected
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher disconnected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,1)
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,1),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,3),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] INFO  c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,4)
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Running -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,4),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,1),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindFailure(AMQChannel(amqp://vagrant@<IP>:5672/,1),java.io.IOException),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindSuccess(AMQChannel(amqp://vagrant@<IP>:5672/,4)),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Running

Repeat, forcing the connection closed via the RabbitMQ console again. This time we see the error, and the result is a consumer that is stuck and never recovers:

[info] WARN  c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection lost connection to amqp://vagrant@{<IP>:5672}:5672//
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a disconnected
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher disconnected
[info] WARN  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel disconnected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Channel created; AMQChannel(amqp://vagrant@<IP>:5672/,1)
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/$a connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(ChannelConnected(AMQChannel(amqp://vagrant@<IP>:5672/,1),Actor[akka://default/user/$a/connection/$a#-1186346017]),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,4),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] DEBUG c.s.op_rabbit.SubscriptionActor - Setting up subscription to my-queue-name in akka://default/user/$a/subscription-my-queue-name-1
[info] INFO  c.t.akka.rabbitmq.ConnectionActor - akka://default/user/$a/connection connected to amqp://vagrant@{<IP>:5672}:5672//
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/publisher connected
[info] INFO  c.t.akka.rabbitmq.ChannelActor - akka://default/user/$a/connection/confirmed-publisher-channel connected
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Running -> Binding
[info] DEBUG c.s.op_rabbit.SubscriptionActor - processing Event(BindFailure(AMQChannel(amqp://vagrant@<IP>:5672/,1),java.io.IOException),ConnectedPayload(Actor[akka://default/user/$a/connection/$a#-1186346017],AMQChannel(amqp://vagrant@<IP>:5672/,1),10,Some(Actor[akka://default/user/$a/subscription-my-queue-name-1/consumer#-1397250152]),None)) from Actor[akka://default/user/$a/subscription-my-queue-name-1#1860926862]
[info] ERROR c.s.op_rabbit.SubscriptionActor - An error while trying to bind a consumer to my-queue-name
[info] java.io.IOException: null
[info]  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
[info]  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
[info]  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
[info]  at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
[info]  at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
[info]  at com.spingo.op_rabbit.QueueConcrete.declare(Queue.scala:27)
[info]  at com.spingo.op_rabbit.Binding$$anon$2.declare(Binding.scala:70)
[info]  at com.spingo.op_rabbit.SubscriptionActor.doSubscribe(SubscriptionActor.scala:232)
[info]  at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:159)
[info]  at com.spingo.op_rabbit.SubscriptionActor$$anonfun$8.applyOrElse(SubscriptionActor.scala:146)
[info]  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
[info]  at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:604)
[info]  at akka.actor.FSM$$anonfun$handleTransition$1.apply(FSM.scala:604)
[info]  at scala.collection.immutable.List.foreach(List.scala:381)
[info]  at akka.actor.FSM$class.handleTransition(FSM.scala:604)
[info]  at akka.actor.FSM$class.makeTransition(FSM.scala:686)
[info]  at com.spingo.op_rabbit.SubscriptionActor.makeTransition(SubscriptionActor.scala:13)
[info]  at akka.actor.FSM$class.applyState(FSM.scala:671)
[info]  at com.spingo.op_rabbit.SubscriptionActor.applyState(SubscriptionActor.scala:13)
[info]  at akka.actor.FSM$class.processEvent(FSM.scala:666)
[info]  at com.spingo.op_rabbit.SubscriptionActor.akka$actor$LoggingFSM$$super$processEvent(SubscriptionActor.scala:13)
[info]  at akka.actor.LoggingFSM$class.processEvent(FSM.scala:797)
[info]  at com.spingo.op_rabbit.SubscriptionActor.processEvent(SubscriptionActor.scala:13)
[info]  at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:655)
[info]  at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:649)
[info]  at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
[info]  at com.spingo.op_rabbit.SubscriptionActor.aroundReceive(SubscriptionActor.scala:13)
[info]  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info]  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info]  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info]  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info]  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info]  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info]  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info]  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
[info]  at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
[info]  at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
[info]  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
[info]  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
[info]  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
[info]  ... 33 common frames omitted
[info] Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
[info]  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:554)
[info]  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:509)
[info]  at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503)
[info]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]  at java.lang.reflect.Method.invoke(Method.java:497)
[info]  at com.thenewmotion.akka.rabbitmq.RabbitMqActor$class.closeIfOpen(RabbitMqActor.scala:27)
[info]  at com.thenewmotion.akka.rabbitmq.ChannelActor.closeIfOpen(ChannelActor.scala:34)
[info]  at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:115)
[info]  at com.thenewmotion.akka.rabbitmq.ChannelActor$$anonfun$2.applyOrElse(ChannelActor.scala:112)
[info]  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
[info]  at akka.actor.FSM$class.processEvent(FSM.scala:661)
[info]  at com.thenewmotion.akka.rabbitmq.ChannelActor.processEvent(ChannelActor.scala:34)
[info]  at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:655)
[info]  at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:649)
[info]  at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
[info]  at com.thenewmotion.akka.rabbitmq.ChannelActor.aroundReceive(ChannelActor.scala:34)
[info]  ... 9 common frames omitted
[info] DEBUG c.s.op_rabbit.SubscriptionActor - transition Binding -> Stopped
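The two runs above can be condensed into a small model. The following Scala sketch is hypothetical: its rule (a BindFailure for a channel other than the most recently connected one is ignored, while a failure for the current channel stops the subscription) is inferred from the DEBUG lines in the logs, not taken from op-rabbit's source. Replaying the logged event orders reproduces both outcomes, which is why the bug looks like a race:

```scala
// Hypothetical model of the subscription's channel bookkeeping, inferred from the
// DEBUG log lines above. It is NOT op-rabbit's actual SubscriptionActor code.
object PreFixModel {
  sealed trait Event
  final case class ChannelConnected(channelId: Int) extends Event
  final case class BindSuccess(channelId: Int) extends Event
  final case class BindFailure(channelId: Int) extends Event

  sealed trait Status
  case object Binding extends Status
  case object Running extends Status
  case object Stopped extends Status

  // The subscription only remembers the most recently connected channel.
  final case class State(current: Int, status: Status)

  def step(state: State, event: Event): State = (state.status, event) match {
    case (Stopped, _)                                => state // never recovers
    case (_, ChannelConnected(id))                   => State(id, Binding)
    case (_, BindSuccess(id)) if id == state.current => state.copy(status = Running)
    case (_, BindFailure(id)) if id != state.current => state // stale channel: ignored
    case (_, BindFailure(_))                         => state.copy(status = Stopped) // fatal
    case _                                           => state
  }

  def replay(initial: State, events: Seq[Event]): State = events.foldLeft(initial)(step)
}

object PreFixDemo {
  import PreFixModel._

  def main(args: Array[String]): Unit = {
    // Recovered run: channel 4 replaces channel 1 before channel 1's failure arrives.
    val recovered = Seq(ChannelConnected(1), ChannelConnected(4), BindFailure(1), BindSuccess(4))
    // Stuck run: channel 1's failure arrives while channel 1 is still the current one.
    val stuck = Seq(ChannelConnected(1), BindFailure(1))
    println(replay(State(3, Running), recovered)) // State(4,Running)
    println(replay(State(4, Running), stuck))     // State(1,Stopped)
  }
}
```

The only difference between the runs is whether a second channel (4) arrives before the BindFailure for channel 1; when it does not, the failure is attributed to the current channel and the subscription stops.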

I was able to reproduce this error about 30% of the time. I tried different versions of the Java amqp-client and akka-rabbitmq libraries. This seems to be a bug in op-rabbit itself.

If you'd like, I can write a failing test (it takes a bit of work, since the test needs a running RabbitMQ instance to connect to and must force-close a connection); let me know.

The workaround, which does not exhibit that problem, is to use Queue.passive with a queue definition:

consume(Queue.passive(topic(queue("wow-maybe-queue"), List("some-topic.#"))))

Perhaps this could be the default behaviour?

@jschaul jschaul changed the title from "Consumer does not attempt to reconnect after RabbitMQ was unreachable for short duration." to "Consumer fails to re-bind to queue after connection to rabbitmq was closed/re-established." Sep 8, 2016
@fedragon

fedragon commented Sep 8, 2016

Hello,
I think the issue is actually caused by a combination of a less-than-ideal design in akka-rabbitmq (exposing mutable internal state via the channel) and the fact that op-rabbit holds a reference to the channel itself here. Once we are outside the CreateChannel closure, there is no guarantee about the channel's state, and this can lead to errors. In this particular case, akka-rabbitmq re-creates the channel twice, so the one referenced in op-rabbit might already be closed. One approach to solving this would be to execute all channel-related logic inside the above-mentioned closure.
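The stale-reference problem can be illustrated with stand-in classes. In this sketch, FakeChannel, FakeChannelOwner and StaleReferenceDemo are hypothetical and only loosely mirror the idea of a channel owner that re-creates channels and runs a setup callback on each new one; they are not akka-rabbitmq or amqp-client types:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an AMQP channel that can be closed underneath its users.
final class FakeChannel(val id: Int) {
  private var open = true
  def isOpen: Boolean = open
  def close(): Unit = open = false
  def queueBind(queue: String): Either[String, String] =
    if (open) Right(s"bound $queue on channel $id")
    else Left(s"channel $id already closed")
}

// Hypothetical owner that re-creates its channel on reconnect and runs the
// setup callback against every fresh channel.
final class FakeChannelOwner(setup: FakeChannel => Any) {
  private var nextId = 1
  private var current: FakeChannel = newChannel()

  private def newChannel(): FakeChannel = {
    val ch = new FakeChannel(nextId)
    nextId += 1
    setup(ch) // work done here always sees the channel that was just created
    ch
  }

  def channel: FakeChannel = current // exposes internal, mutable state

  def reconnect(): Unit = {
    current.close()
    current = newChannel()
  }
}

object StaleReferenceDemo {
  def run(): (Either[String, String], List[Either[String, String]]) = {
    val fromCallback = ArrayBuffer.empty[Either[String, String]]
    val owner = new FakeChannelOwner(ch => fromCallback += ch.queueBind("my-queue-name"))
    val captured = owner.channel // reference held outside the setup callback
    owner.reconnect()
    owner.reconnect() // the channel is re-created twice, as described above
    (captured.queueBind("my-queue-name"), fromCallback.toList)
  }

  def main(args: Array[String]): Unit = {
    val (stale, fresh) = run()
    println(stale) // Left(channel 1 already closed)
    fresh.foreach(println)
  }
}
```

The captured reference fails once the owner has swapped channels, while every bind issued from inside the callback runs against the channel that was just created.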

@timcharper
Member

Hi @fedragon, thank you so much for your detailed report. I've read and considered it. Part of the problem here is that bindings sometimes fail legitimately, and crashing the actor is then the appropriate thing to do. However, the case you describe here is clearly an exception.

It may be worth noting that the problem isn't exactly the shared mutable state, although that can be a problem. Doing all channel operations in the channelActor is sub-optimal for performance reasons. I am careful not to apply operations to the channel concurrently, and it is always possible for a channel to close right under your feet. Eventually, I would like all channel operations for a consumer to happen in the consumer actor.

I pushed up a potential fix for this issue. I look at the channel state after a BindFailure is received. If the channel is marked as closed, which it should be after such an exception, then I ignore the exception and wait for a new channel to be created and sent (depending on the behavior of ChannelActor).

Can you please test this and see if it helps your case? If it is possible to write a test for it, that would be really great. However, if the test is going to be brittle due to the complexity of testing for this, it may be okay to forgo it.

(PS: careful != correct! On review I noticed and fixed a potential issue with setQos.)
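The check described in the fix can be sketched as a small, hypothetical state machine: each BindFailure carries a snapshot of whether the failing channel was still open (standing in for calling `isOpen` on the real AMQP channel). PostFixModel and its event names are illustrative assumptions, not op-rabbit's actual types:

```scala
// Hypothetical sketch of the fix: on BindFailure, look at the failing channel's state.
// Names are illustrative and do not come from op-rabbit's source.
object PostFixModel {
  sealed trait Event
  final case class ChannelConnected(channelId: Int) extends Event
  final case class BindSuccess(channelId: Int) extends Event
  // channelOpen stands in for calling isOpen on the real channel.
  final case class BindFailure(channelId: Int, channelOpen: Boolean) extends Event

  sealed trait Status
  case object Binding extends Status
  case object Running extends Status
  case object Stopped extends Status

  final case class State(current: Int, status: Status)

  def step(state: State, event: Event): State = (state.status, event) match {
    case (Stopped, _)                                => state
    case (_, ChannelConnected(id))                   => State(id, Binding)
    case (_, BindSuccess(id)) if id == state.current => state.copy(status = Running)
    // Closed channel: the failure is a side effect of the disconnect, so ignore it
    // and wait for a fresh channel to be handed over.
    case (_, BindFailure(_, false))                        => state
    case (_, BindFailure(id, true)) if id != state.current => state
    // Open, current channel: treat it as a legitimate binding error and stop.
    case (_, BindFailure(_, true)) => state.copy(status = Stopped)
    case _                         => state
  }

  def replay(initial: State, events: Seq[Event]): State = events.foldLeft(initial)(step)
}

object PostFixDemo {
  import PostFixModel._

  def main(args: Array[String]): Unit = {
    // The stuck run from the logs, with the failing channel reported as closed.
    val afterDisconnect =
      replay(State(4, Running), Seq(ChannelConnected(1), BindFailure(1, channelOpen = false)))
    println(afterDisconnect) // State(1,Binding)
    // Hypothetical continuation: a fresh channel 5 arrives and binding succeeds.
    println(replay(afterDisconnect, Seq(ChannelConnected(5), BindSuccess(5)))) // State(5,Running)
    // A failure on an open, current channel still stops the subscription.
    println(replay(State(4, Running), Seq(ChannelConnected(5), BindFailure(5, channelOpen = true)))) // State(5,Stopped)
  }
}
```

A closed channel leaves the subscription in Binding, waiting for the next ChannelConnected, while an error on an open, current channel is still treated as a legitimate binding failure, matching the trade-off described in the comment above.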

@fedragon

Thank you, @timcharper, we will test it and get back to you as soon as possible!

@timcharper
Member

Great, thank you! I just pushed v1.5.0-RC1 which contains the proposed fix.
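To try the release candidate, the op-rabbit dependency version can be bumped. A minimal build.sbt sketch, assuming sbt and assuming the RC is resolvable from the same repository as the regular releases (the `com.spingo` group and `op-rabbit-core` artifact names are taken from the jar names in the stack traces above):

```scala
// build.sbt (sketch): pull in the release candidate that contains the proposed fix.
// %% appends the Scala binary version, e.g. op-rabbit-core_2.11.
libraryDependencies += "com.spingo" %% "op-rabbit-core" % "1.5.0-RC1"
```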

@jschaul
Author

jschaul commented Sep 20, 2016

This looks promising, thank you! I'll test and let you know.

@DStranger
Collaborator

Hello @timcharper, I've tested this manually and it works, successfully rebinding to the queue after each disconnect.

@timcharper
Member

\o/

@jschaul
Author

jschaul commented Sep 21, 2016

My manual tests also do not exhibit the faulty behaviour anymore on 1.5.0-RC1.
