Ability to ACK messages downstream from Source #13

Open
zackangelo opened this issue May 22, 2015 · 25 comments

Comments

@zackangelo
Contributor

Currently, messages seem to be auto-ACKed after they are pulled off the queue and submitted downstream (https://github.com/ScalaConsultants/reactive-rabbit/blob/master/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala#L39).

Have you given any thought to incorporating a way to ACK messages after they've been fully processed by the stream? If it's something you think would be worthwhile and you have some ideas on how to implement it, I can take a crack at a PR.

@vrabeux

vrabeux commented May 27, 2015

Hi! I would also require this feature. However, I have no idea how to implement it, as the acknowledgment is also linked to the back pressure of the stream.

@vrabeux

vrabeux commented May 27, 2015

Or maybe I am not understanding something.
For example, in my current sample code:

  • the publisher sends, say, 10 messages in less than 1 second,
  • the consumer handles 1 message every 10 seconds,
  • the consumer dies on the second message.
    => If I look at my queue, all messages are consumed, which is not really the case.

Shouldn't the messages stay in the queue due to the back pressure of the flow?
Shouldn't messages just be prefetched and put back on the queue if not consumed when the consumer dies?

Thanks in advance for your help.

@mkiedys
Contributor

mkiedys commented May 27, 2015

ACK before delivery was introduced in this commit: fdba020. It looks like this can be easily changed.

Facts:

  • The current implementation of QueueSubscription.cancel() uses Channel.close().
  • Channel.close() cancels the subscription and prevents the broker from sending new messages. There may still be messages in the buffer.
  • A message can be acknowledged only on the Channel on which it was received.
  • The specification allows delivery after cancel in rule 2.8.

Issue:

The way cancel() is handled in QueuePublisher.scala causes the Channel to be closed. As a consequence, in-flight messages can no longer be acknowledged. Just changing the order in which delivery and ACK are performed would lead to a situation where a message is successfully delivered to the subscriber but never acknowledged to the broker. In the current implementation, approximately 1 message may sneak in between.

Possible solutions:

  1. We can catch and ignore AlreadyClosedException from channel.basicAck. That way the subscriber may receive approximately 1 message that will never be acknowledged to the broker (see the sketch after this list).
  2. QueueSubscription.cancel() could be implemented with Channel.basicCancel(String). In-flight messages could then still be acknowledged to the broker, and the Channel would be closed after QueueSubscription.buffer dries out.
  3. Like the above, but only for the 1 message that is in flight. Messages sitting in the buffer are discarded and the Channel is closed. This is somewhat complicated to implement because we would need to know whether there is a message in delivery.

I'm open to discussion and other solutions.
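
For illustration, a minimal sketch of option 1, assuming the RabbitMQ Java client's Channel and AlreadyClosedException (the helper itself is hypothetical, not existing library code):

import com.rabbitmq.client.{AlreadyClosedException, Channel}

// Hypothetical helper: acknowledge after delivery and swallow the failure if
// cancel() closed the channel in the meantime; the unacknowledged message is
// then redelivered by the broker.
def ackQuietly(channel: Channel, deliveryTag: Long): Unit =
  try channel.basicAck(deliveryTag, false)
  catch {
    case _: AlreadyClosedException => () // channel already closed; broker will requeue
  }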

@mkiedys
Contributor

mkiedys commented May 27, 2015

@vrabeux wrote:

Or maybe I am not understanding something.
For example, in my current sample code:

  • the publisher sends, say, 10 messages in less than 1 second,
  • the consumer handles 1 message every 10 seconds,
  • the consumer dies on the second message.

Under the current implementation:
Messages 1 and 2 are acknowledged to the broker because they were delivered to the subscriber.
The thing under discussion is whether a message should be acknowledged only after the subscriber has returned without throwing an exception.

=> If I look at my queue, all messages are consumed, which is not really the case.

Only 2 messages were acknowledged to the broker. A consumer is allowed to prefetch messages (without acknowledging them) to improve throughput. The current implementation sets the prefetch to 20.
Doing a 1-by-1 fetch-deliver-ack would make the system behave synchronously and dramatically lower performance. Even Akka Streams does a similar thing - it uses 16 as its magic number.
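
For reference, the prefetch is set per channel via basicQos in the RabbitMQ Java client; a minimal sketch (the connection setup here is illustrative):

import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

// Sketch: allow up to 20 unacknowledged messages in flight per consumer,
// matching the prefetch value mentioned above.
val factory = new ConnectionFactory()
val conn: Connection = factory.newConnection()
val channel: Channel = conn.createChannel()
channel.basicQos(20) // the broker pauses delivery once 20 messages are unacked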

Shouldn't the messages stay in the queue due to the back pressure of the flow?

Once the subscriber dies, the subscription is considered canceled, the Channel is closed, and the messages return to the queue.

Shouldn't messages just be prefetched and put back on the queue if not consumed when the consumer dies?

That is the current behaviour.

@mkiedys
Contributor

mkiedys commented May 28, 2015

@vrabeux Indeed, there was a bug in the code that handles exceptions from onNext: messages could stay in the buffer for as long as the program was running, even after onError was signalled to the Subscriber. I fixed that here: 8d014d9

Generally speaking, onNext should never throw:

Rule 2.13:

Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

@vrabeux

vrabeux commented May 28, 2015

@mkiedys Thanks for the reply.

I was talking more about an app crash or instance crash. All prefetched messages should be re-queued. However, since they are ACKed before the work is completed, they are not re-queued. Anyway, that may be a particular use case.

I agree that a prefetch of around 20 works in most cases. However, if we have a very slow service (let's say several seconds, or worse, minutes), too big a prefetch number can cause problems: a new worker would not be able to get any work if most of it has been prefetched by another instance. On the contrary, if the work is very fast, then 20 may even be a bit small.

All of these things (prefetch numbers, distributed workloads, and streams) are really connected, and it really seems that akka-streams can be a powerful tool for implementing them. For now, I have implemented my own "Source" that corresponds to my use case, and I use reactive-rabbit later in the flow. It works so far, but it is way too "naive" to really use.

@ktoso

ktoso commented May 28, 2015

The action of ACKing must be exposed to the end users of the library, since only the user of the stream knows exactly when an element should be considered handled.
Imagine Source().map().map(NOW_HANDLED).map(): as the library author, you won't know that after the second map the user considers the element "handled", and that the rest is maybe just logging etc.

Perhaps it would be possible to offer .mapAck(), however I'm not completely familiar with the Rabbit APIs and whether that would be doable. Still, I think it should be exposed to users when to ACK.
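
As a sketch of what such an operator might look like (mapAck is hypothetical, and the ack callback is an assumed hook into the receiving channel, not part of reactive-rabbit):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import io.scalac.amqp.Delivery

// Hypothetical: run the user's function, then ACK, so a throwing function
// leaves the message unacknowledged.
def mapAck[T](ack: Delivery => Unit)(f: Delivery => T): Flow[Delivery, T, NotUsed] =
  Flow[Delivery].map { delivery =>
    val result = f(delivery) // user processing; may throw
    ack(delivery)            // acknowledge only after f completed normally
    result
  }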

@vrabeux

vrabeux commented May 28, 2015

@ktoso Yes, that is why I decided to implement my own source: to be able to ACK the message down the stream when I want to. The way I do it is, however, very "ugly" and "naive", since I need to wrap the Rabbit Channel and delivery tag in the object I pass down the stream... I don't even handle whether the connection is closed, whether the channel is still alive, etc.

A .mapAck() could be a way of ACKing the message, but I am too much of a noob with akka-streams, so I can't really tell. If we consider actors, we could definitely chain (as in a chain of responsibility) an Ack(tag) message to the original actor, or to one able to ACK it. But for streams I don't think that would work: a stream seems to be one-way.

After all this, I am wondering whether akka-stream is even the right way to go for my needs.

@mkiedys
Contributor

mkiedys commented May 28, 2015

@vrabeux wrote:

I was talking more about an app crash or instance crash. All prefetched messages should be re-queued. However, since they are ACKed before the work is completed, they are not re-queued. Anyway, that may be a particular use case.

All messages are returned to the broker once the subscription is canceled. Messages are always acknowledged one by one after delivery to the stream. A message that sits in the buffer is not acknowledged.

I agree that a prefetch of around 20 works in most cases. However, if we have a very slow service (let's say several seconds, or worse, minutes), too big a prefetch number can cause problems: a new worker would not be able to get any work if most of it has been prefetched by another instance. On the contrary, if the work is very fast, then 20 may even be a bit small.

If there are multiple consumers attached to the queue, then the broker tries its best to share messages between them, also taking into consideration whether a particular consumer is slow. So that should not be an issue. Have you tried coding an example like this for your case and checking whether one or all consumers get messages evenly?

@mkiedys
Contributor

mkiedys commented May 28, 2015

@ktoso wrote:

Perhaps it would be possible to offer .mapAck(), however I'm not completely familiar with the Rabbit APIs and whether that would be doable. Still, I think it should be exposed to users when to ACK.

The usual workflow is:

  1. The broker delivers a bunch of messages to the consumer, up to the prefetch limit.
  2. The consumer does something with each message and sends ACK/reject/requeue:
  • ACK - the message is acknowledged and removed from the queue
  • REJECT - the message is acknowledged and removed from the queue, but a copy of it can be retrieved from an exchange connected to the queue that sinks rejected messages
  • REQUEUE - the message is sent back to the end of the queue

The prefetch determines how many unacknowledged messages can be delivered to the consumer before the broker stops delivering. It's essentially the base on which the reactive component is built.

I agree that it would be nice to have an option to ACK a message deeper in the processing pipeline.
The risk is that this may cause the stream to stall if the user decides not to ACK a number of messages equal to the prefetch. In that case the subscriber would always have to either ACK or reject a particular message. The mapAck would need companions like mapReject and mapRequeue.

My original plan for this scenario was to provide a pair of streams:

  1. Source[Message]
  2. Sink[Confirmation]

It's essentially a Processor under the current design of the Reactive Streams API. I plan to do this only if the user base of the library grows to a point where the additional work is reasonable.

@vrabeux

vrabeux commented May 28, 2015

@mkiedys Sorry, maybe I am doing something wrong then, or I may just not understand how it works.

If we look at QueueSubscription.scala#L40

if (channel.isOpen()) {
  channel.basicAck(delivery.deliveryTag.underlying, false) // ACK first...
  subscriber.onNext(delivery)                              // ...then deliver downstream
}

This gave me the impression that the delivery is sent after being ACKed. My tests yesterday confirmed this, but maybe that was due to a bug on my side.

I'll try to get a small sample working this weekend.

@mkiedys
Contributor

mkiedys commented May 28, 2015

This gave me the impression that the delivery is sent after being ACKed. My tests yesterday confirmed this, but maybe that was due to a bug on my side.

The message is delivered to the consumer after it is acknowledged to the broker, one by one. An improvement could be made to acknowledge after delivery, not before.

@vrabeux

vrabeux commented May 28, 2015

Ok, I am confused here. Didn't you say that:

Messages are always acknowledged one by one after delivery to the stream.

Is it:

1) --

  • ACK
  • Send the message down the stream

2) --

  • Send the message down the stream
  • ACK

If it is 2, then I definitely have an issue on my side. If it is 1, then the following flow:

  • ACK
  • Send the message down the stream
  • => [CRASH]

does not work in my use case, as messages are not re-queued.

@mkiedys
Contributor

mkiedys commented May 28, 2015

Usage example for a Processor:

val connection = Connection()
val processor: Processor[Confirm, Delivery] = connection.processor(queue = "invoices")

Source(processor)
  .map { delivery =>               // decide whether to acknowledge or requeue
    if (Random.nextBoolean())
      Ack(delivery.deliveryTag)
    else
      Requeue(delivery.deliveryTag)
  }
  .to(Sink(processor))             // tell the broker

@mkiedys
Contributor

mkiedys commented May 28, 2015

@vrabeux wrote:

Does not work in my use case as messages are not re-queued.

It might be that Akka Streams does additional buffering. In that case more than one message might get lost.
The message is acknowledged before onNext. If the stream decides to take more than one message before delivering anything to your code, then all of them might get lost.

@vrabeux

vrabeux commented May 28, 2015

@mkiedys Ok. I'll really need to experiment a bit more and try to understand all of this more deeply.

The Processor seems nice. But from what I understood, a runnable flow is 1 Source to 1 Sink. How would this handle cases like:

Source(inputQueue).map().map(ACK).map(...).to(aCompletelyDifferentOtherQueue)

Anyway thank you so much for your help and the talk.

@mkiedys
Contributor

mkiedys commented May 28, 2015

You would have to split your stream into two: one going to the Processor sink and another to your exchange*).

@ktoso Is there a better solution?

*) In AMQP, messages are never published to queues directly. You always publish them via an exchange.
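
For illustration, a sketch of that split using an Akka Streams fan-out stage (deliverySource, confirmationSink, exchangeSink, toConfirmation, and toOutgoing are all placeholders, not library API):

import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph}
import io.scalac.amqp.Delivery

// Sketch: each Delivery is broadcast to two branches; one maps it to a
// confirmation for the Processor's sink, the other maps it to an outgoing
// message for a different exchange.
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Delivery](2))
  deliverySource ~> bcast
  bcast ~> Flow[Delivery].map(toConfirmation) ~> confirmationSink
  bcast ~> Flow[Delivery].map(toOutgoing)     ~> exchangeSink
  ClosedShape
})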

@vrabeux

vrabeux commented May 28, 2015

Yes, makes sense. Thanks.

@sr78ger

sr78ger commented May 29, 2015

+1 I would really like to see this feature

@mkiedys
Contributor

mkiedys commented Jul 9, 2015

I recommend reading about Akka Streams buffering: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-rate.html

If you depend on the driver sending acknowledgments one by one, you may want to reduce the buffer size from the default 16 to 1:

.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))

Sadly, the smallest buffer is 1 message.
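
To show where that attribute attaches, a sketch (queueSource and handle are placeholders for a reactive-rabbit source and user processing):

import akka.stream.Attributes
import akka.stream.scaladsl.Sink

// Sketch: a 1-element input buffer keeps stream-internal buffering from
// pulling (and thus auto-ACKing) more messages than are actually processed.
queueSource
  .map(handle)
  .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
  .runWith(Sink.ignore)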

@agentgt

agentgt commented Dec 16, 2015

For what it's worth, I ended up creating my own Reactive Streams implementation that is in pure Java and does not auto-ACK (and because it's in Java, it is consequently far less code than this project, as I just used the RabbitMQ driver directly).

public interface DeliveryAckSubscriber extends Subscriber<DeliveryResult> {
    Delivery getDelivery();
}

public interface ReactiveAMQPOperations {
    // rpc is for direct-reply
    Publisher<Delivery> rpc(final Message m);
    Publisher<DeliveryAckSubscriber> toPublisher(String queue, int prefetchCount);
}

public class DeliveryResult {
    private final DeliveryResponse response; // for rpc
    private final Throwable error;           // if null, no error
    private final boolean requeue;
    // constructor and getters snipped for brevity
}

Basically, the idea is that you get a Publisher that is essentially a tuple of a Subscriber and a Delivery object (a delivery object being the body, envelope, etc.). That is, every message that comes off the queue gets a special subscriber that awaits a single onNext of a DeliveryResult, which signifies whether the message is to be acknowledged or not.

As nifty and cute as it would be to have request(int n) change the basicQos prefetch, we do not do this but instead buffer internally. That is, the prefetch is set only once. The Publisher also only allows one subscriber; if another one subscribes, we drop the other subscriber (honestly, this hasn't been tested enough, so it's probably a bad idea). There has been some thought on allowing request(int n) to signify an ACK of previous messages, but that would not allow any way of NACKing messages.

The rpc method allows direct-reply RPC (see the RabbitMQ docs) and provides a Publisher that is a promise: a cold Publisher that emits a single object is essentially a promise. Both of the above Publishers are cold, with the exception of replacing the subscriber.

If folks are interested, I was going to open source it. I also have a pure RxJava implementation that is easier to work with as well.

@mkiedys
Contributor

mkiedys commented Jan 4, 2016

Thanks for the inspiration, Adam. I recommend running the tests from reactive-streams-tck against your implementation to check whether the specification isn't violated somewhere.
Feel free to send a pull request here if you see ways of simplifying things.

@agentgt

agentgt commented Jan 4, 2016

It will take me a little time to get it open-source ready, as it's in our proprietary repository. That being said, it is unfortunately Java, so I'm not sure it will help, but it might.

As for the TCK, we do pass it (well, the required tests), and my secret was to shamefully steal parts of RxJava's SerializedObserver. This is because the RabbitMQ Java Consumer's handleDelivery has overlapping calls. I believe reactive-rabbit uses Scala's STM for this. I'm not sure which is faster, but I would not be surprised if Scala STM beat the RxJava synchronized version under very heavy loads. (I did have my own implementation using ConcurrentLinkedQueue and some nasty r/w locks, but the RxJava version was much faster.)

Other implementations will often use a queue (blocking or non-blocking) to keep the RabbitMQ threads from overlapping and/or blocking, at the cost of a context switch. Thus, for both of our implementations, you really have to avoid blocking, or dispatch downstream (in RxJava this would be observeOn) on a separate thread, or else you might block up too many RabbitMQ threads (the ExecutorService that is passed into the connection).
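
A sketch of that hand-off pattern, assuming the RabbitMQ Java client (HandOffConsumer and process are illustrative names, not from either implementation):

import java.util.concurrent.{Executors, LinkedBlockingQueue}
import com.rabbitmq.client.{AMQP, Channel, DefaultConsumer, Envelope}

// Sketch: handleDelivery only enqueues, and a single dispatcher thread drains
// the queue, so the connection's threads never block on downstream work.
class HandOffConsumer(channel: Channel, process: Array[Byte] => Unit)
    extends DefaultConsumer(channel) {
  private val pending = new LinkedBlockingQueue[(Long, Array[Byte])]()

  Executors.newSingleThreadExecutor().execute { () =>
    while (true) {
      val (tag, body) = pending.take()
      process(body)                // user processing, off the connection thread
      channel.basicAck(tag, false) // ACK only after processing completed
    }
  }

  override def handleDelivery(consumerTag: String, envelope: Envelope,
                              properties: AMQP.BasicProperties, body: Array[Byte]): Unit =
    pending.put((envelope.getDeliveryTag, body))
}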

@mancvso

mancvso commented Jan 25, 2016

@agentgt Please let us know when the time comes; a Reactive Streams Java version would be nice.

@niharrathod

niharrathod commented Jun 2, 2017

Is this enhancement available?
In my case, the RabbitMQ ACK should be sent only after the sink (a DB insert) has completed successfully.
Sample code would be really helpful (in Java).
Thanks
