Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow message ack downstream #483

Merged
merged 22 commits into from
Sep 27, 2017

Conversation

juanjoDiaz
Copy link
Contributor

Fix #97 by adding the possibility of deactivate autoAck and allowing to ack or nack messages downstream.

Nack allow to optionally requeue the message or not (by default it does).

I've noticed that #292 was doing the same but it seems stale and the work seems bloated to me (it introduce a lot of duplicities and unrelated changes).

This PR introduce a very minimal change.
No breaking changes at API level.
Fully tested.

Only "breaking" change is that IncomingMessage becomes a trait and AckedIncomingMessage and UnackedIncomingMessage are case classes extending it. (I'm any case I don't think that anyone should be using IncomingMessage so this shouldn't even be a real breaking change)
The advantage of having AckedIncomingMessage and UnackedIncomingMessage as case classes inheriting from IncomingMessage is that is minimize changes, maximize backwards compatibility and allow pattern matching to act differently depending on the type of message.

@juanjoDiaz
Copy link
Contributor Author

Does anyone has any clue of why is this failing the build?

JavaConverters.seqAsJavaListConverter is certainly a method. http://www.scala-lang.org/api/current/scala/collection/JavaConverters$.html#seqAsJavaListConverter[A](b:Seq[A]):scala.collection.convert.Decorators.AsJava[java.util.List[A]]

@johanandren
Copy link
Member

I think the build failed because it's been checked in with incorrect formatting. Run a compile of it before checking in and things should be alright.

@juanjoDiaz
Copy link
Contributor Author

Hi @johanandren ,

Thanks for the response. I fixed the styles but the issue persist.

[error] /home/travis/build/akka/alpakka/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java:279: cannot find symbol
[error]   symbol:   method seqAsJavaList(scala.collection.immutable.Seq<akka.stream.alpakka.amqp.IncomingMessage>)
[error]   location: class scala.collection.JavaConverters
[error]     List<IncomingMessage> probeResult = JavaConverters.seqAsJavaList(probe.toStrict(Duration.create(5, TimeUnit.SECONDS)));

As mentioned, that's odd since the method definitely exist.

@juanjoDiaz
Copy link
Contributor Author

Hurray! It turns out I was living too much on the bleeding edge and using the latest Scala.
Now everything looks good.

Is there any chance to get and ETA of when this could be merged and released?

@johanandren
Copy link
Member

I'm not so sure this specific API is a good idea, would be better if #104 got anywhere and we could use the same committable interface across connectors than if each stage creates its own way to doing things. I also find it a bit weird that you can mix unacked and acked messages with the same materialised stage, deciding that upon creation would make more sense to me.

@juanjoDiaz
Copy link
Contributor Author

Hi Johan,

I agree that fixing #104 would be optimal. However, it is clearly stated that "Akka team is not working on this and we don't have it in plans for the next month."
Also, some connectors require acknowledging and some probably don't. Some take options and some don't. So I don't think that really possible to a have a general committable message that works for everything. But I'm open to suggestions.

Could you elaborate a bit on the issues you can see with the current approach?

You actually will always get only Acked or Unacked messages depending on whether autoAck is set to true or false.
They'll never be mixed. The reason to use a generic IncomingMessage stage is to avoid having to create 2 stages with 99% the same code. But I'm happy to do that if that will help to get this merged.

I'm happy to do any changes that are required to get this functionality in place. This issue is sort of jeopardizing the integrity of some of my system and I really wouldn't want to create my own fork for this.

@johanandren
Copy link
Member

Ok, fair point about #104 and nobody stepping up to do that.

Taking a step back I still think a better design is not to define committable per message but to look at reactive-kafka connector which is the most battle tested and worked through API, something along the lines of:

trait Committable {
  def message: IncomingMessage
  // I think it has to be future because commit leads to a blocking network call that needs to be managed
  def commit(): Future[Unit] 
}


AmqpSource.createCommittable(...): Source[Committable, ...] = ???
AmqpSource.create(...): Source[IncomingMessage, ...] = createComittable(...).mapAsync(1)(_.commit)

This also avoids having to implement two stages, while giving the same control to the user to do the batched commit instead of one message per time to get better throughput etc.

Wdyt?

@juanjoDiaz
Copy link
Contributor Author

Sounds good.

I took a look at akka-streams-kafka and took the same approach. Does it look better now?

*/
def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] =
committableSource(settings, bufferSize)
.map(cm => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried:

      .mapAsync(1)(cm => {
        Future {
          println(s"Committing ${cm.message.bytes.utf8String}")
          cm.ack()
          cm.message
        } (ExecutionContexts.sameThreadExecutionContext)
      })

But that makes the not ack messages unless they get consumed test fail. For some reason it ack the second element in subscriber even though it doesn't fetch it, and then the element is missing on subscriber2.
Maybe you have some idea of why is this happening?

In any case, the ack method of RabbitMQ library is blocking so it should be pretty much the same either way.

*/
def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] =
committableSource(settings, bufferSize)
.map((cm: CommittableIncomingMessage) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried:

      .mapAsync(1)(cm => {
        Future {
          println(s"Committing ${cm.message.bytes.utf8String}")
          cm.ack()
          cm.message
        } (ExecutionContexts.sameThreadExecutionContext)
      })

But that makes the not ack messages unless they get consumed test fail. For some reason it ack the second element in subscriber even though it doesn't fetch it, and then the element is missing on subscriber2.
Maybe you have some idea of why is this happening?

In any case, the ack method of RabbitMQ library is blocking so it should be pretty much the same either way.

@juanjoDiaz
Copy link
Contributor Author

It seems that the build worked for 3 our of the 4 cases, but the other one got stuck before it even run.

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more feedback

/**
* Java API:
* Convenience for "at-most once delivery" semantics. Each message is acked to Kafka
* before it is emitted downstream.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, not to asked to Kafka though ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops! I went too much into kafka here :)

message: IncomingMessage,
callback: AsyncCallback[Unit] = new AsyncCallback[Unit] {
override def invoke(t: Unit): Unit = Unit
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message shouldn't need to know about AsyncCallback just a regular function should do. After thinking about the things noted below, I think it should be callback: (ackOrNack+data) => Future[Unit] with the whole implementation of that function in the stage using AsyncCallback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So you are suggesting that I wrapped the Async callback in a plain function and pass that to the Committable message, right?

There is some logic in the Stage so it won't be completed until all the messages have been acked. If something really goes wrong I guess the messages won't be acked. I think it's the same in the akka-streams-kafka, we offer out of the box at-least-once delivery but not exactly-once delivery.

The ack method from rabbitMQ is blocking AFAIK. I tried wrapping it in a Future and using mapAsync but got the issue that I mentioned in a comment above. Maybe you have any suggestion?

}
)(implicit channel: Channel) {
def ack(multiple: Boolean = false): Unit = {
channel.basicAck(message.envelope.getDeliveryTag, multiple)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky:

Channel is an internal resource of the stage, so it shouldn't be leaked outside, what if the stage was stopped or failed? Also, I think basicAck leads to a network call, is it blocking, if so what thread is this executed on?

On the other hand, if the logic is moved into the stage, then this ack needs to return a Future[...] to signal to the stream that the ack was completed or failed before emitting the message downstream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So you are suggesting that I wrap the Async callback in a plain function and pass that to the Committable message, right?

There is some logic in the Stage so it won't be completed until all the messages have been acked. If something really goes wrong I guess the messages won't be acked. I think it's the same in the akka-streams-kafka, we offer out of the box at-least-once delivery but not exactly-once delivery.

The ack method from rabbitMQ is blocking AFAIK. I tried wrapping it in a Future and using mapAsync but got the issue that I mentioned in a comment above. Maybe you have any suggestion?

@@ -13,6 +13,31 @@ object AmqpSource {
* Java API: Creates an [[AmqpSource]] with given settings and buffer size.
*/
def create(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are just calling the new one, then maybe @deprecate it and tell people to use atMostOnceSource instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree on this one. I was going to ask about that but didn't want to modify much the API without taking first with core maintainers. I'll @deprecate the old constructor in favour of atMostOnceSource

@juanjoDiaz
Copy link
Contributor Author

Alright, hope we are getting there.

I got a bunch of inspiration from Kafka and alpakka.ironMQ and I think I got all your concerns addressed. Let me know if there is anything I've missed.


override def ack(multiple: Boolean) = {
Future {
channel.basicAck(message.envelope.getDeliveryTag, multiple)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I was a bit unclear, I think this should go into the commitCallback logic, so that it is executed on the same thread as the stage itself and the future is not completed until both ack/nack and changing the counter is done, that means we need to pass a promise from here to the async callback, call the channel method, update the counter and then (if successful) complete the promise.

There will still be some unknowns with that though, what if the stage already failed/stopped when a downstream stage calls ack/nack? What if it was alive when ack/nack was called but fails before the async callback has been executed. There is a solution for that coming up with akka/akka#23185 though, so maybe complete this and then update to use that later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This future uses the materializer.executionContext from the stage. Which as far as I understand is exactly how Kafka and IronMQ connectors do it.

The call is blocking so the future won't complete until the call is done.

Adding the call to channel.ack or channel.nack to the commitCallback has the issue that we won't be able to use parameters (or we need a separate callback for each combination of parameters).

Can you elaborate a bit in what exactly you would like me to do here? I'm a bit lost :)

Regarding the fail/stop uncertainties. I think that we should complete this work and then update later as you suggest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The counting down inflight messages and the nack is now done in separate places/threads even though they both belong to the same "logical" operation. The future will can complete even though the asyncCallback has not executed yet

Handling the two different calls with a single callback can be done by just encoding the needed data plus the promise in a single or two case classes with a common supertype whose sole purpose is to be passed across the async boundary for the stage to do things with and then complete the promise. (something like how we do it in the InputStreamStage for example: https://github.com/akka/akka/blob/3ba093d27e830bea675e45d0cec56860de3d133e/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala#L56 )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To say it in another way, rather than scheduling two blocks of logic to execute, the Future and the asyncCallback we can eliminate one, and execute everything in the asyncCallback.

@juanjoDiaz
Copy link
Contributor Author

juanjoDiaz commented Sep 25, 2017

Alright. Let's see if I'm getting any closer with my latest commit :)

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks about right to me except for the failing test.

@@ -66,7 +66,7 @@ private[amqp] trait AmqpConnector {
private[amqp] trait AmqpConnectorLogic { this: GraphStageLogic =>

private var connection: Connection = _
protected var channel: Channel = _
protected implicit var channel: Channel = _
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't need to be implicit anymore, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I'll revert this.

subscriber2.expectNext().bytes.utf8String shouldEqual "two"
subscriber2.request(3)
// MapAsync execute the provided function as it pull the elements regardless of whether they are passed
// downstream or not. Thus the second element is acked even though it didn't get pushed down by `subscriber`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was true (and racy) before as well (although maybe it did not manifest itself in this test), an element could be fetched from the MQ and ack:ed but have downstream cancel/fail before it was emitted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented that line to fix the test in akka 2.4 but broke on akka 2.5.

I think that the issue is that mapAsync use to run the function as soon as it pulls it regardless of whether it was emitted or not. So that test could never work with mapAsync.

However, it seems to work as expected in 2.5. The only change I can see in the history is this: akka/akka@a40826e. Any idea??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, we just renamed this source as atMostOnceSource which means precisely this. IF things go south in unexpected ways, you might miss the element.

If you want to be 100% sure that you don't miss any element you should use committableSource which could also be atLeastOnceSource

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take back what I just said 2 comments above. 2.5 Didn't fix mapAsync 🙂

I guess the test it's just 100% indeterministic since whether the element is acked or not depends completely o how fast the probe close the connection... Still open to suggestions :)

@juanjoDiaz
Copy link
Contributor Author

Hi @johanandren

Just checking up so this doesn't get stale.

Is there anything left apart from the racy test?
What should we do about it?
My opinion is that the test is not predictable enough and should be remove (it's inadequate).

@johanandren
Copy link
Member

I think it looks good apart from the failing test. I think you are right, the name of the test case already hints about it being not quite true. Lets remove the test.

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, great work @juanjoDiaz !

@Falmarri
Copy link
Contributor

@juanjoDiaz thanks for cleaning up my version. I didn't like the duplication that i introduced, but didn't have the time to really make it good. so thanks =D

@juanjoDiaz
Copy link
Contributor Author

No problem.
It's been a good learning experience! :)

@dwickern
Copy link
Contributor

dwickern commented Oct 3, 2017

Thanks for implementing this! The code snippets don't seem to work in the published docs: https://developer.lightbend.com/docs/alpakka/current/amqp.html#acknowledging-messages-downstream

@juanjoDiaz
Copy link
Contributor Author

Hi @dwickern ,

The fix is already committed (see #507) but I guess it hasn't been published yet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants