amqp-consume: support consuming N messages at a time #209

Closed

terceiro wants to merge 1 commit into alanxz/rabbitmq-c from terceiro/rabbitmq-c:consume-messages

Conversation

3 participants
@terceiro

If you have a single consumer C₁ and 10 messages are published, all 10
will be streamed to that one consumer. Assume each message takes a few
minutes to be handled.

If a second consumer C₂ comes up before C₁ is able to process its first
message, it will stay idle until new messages are published, while C₁
will still have to process the other 9 messages after finishing with the
first one.

If both consumers were started with --messages 1, C₁ would only fetch
a single message and start handling it; C₂ would start and immediately
receive the second message.
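
At the protocol level this maps to basic.qos with a per-consumer prefetch count. As a rough illustration (not the patch itself), the call amounts to something like the following against the rabbitmq-c API; the channel number and the die_rpc error helper from the tools code are assumptions here:

#include <amqp.h>

/* Rough sketch, not the patch: cap unacknowledged deliveries per
 * consumer. prefetch_size = 0 means no byte limit; global = 0 applies
 * the limit to each consumer on the channel (RabbitMQ semantics). */
static void set_prefetch(amqp_connection_state_t conn, uint16_t prefetch)
{
  if (!amqp_basic_qos(conn, 1, 0, prefetch, 0)) {
    die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
  }
}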

@lrascao

lrascao commented Aug 31, 2014

isn't this consumer prefetch (http://www.rabbitmq.com/consumer-prefetch.html)?

@terceiro

terceiro commented Aug 31, 2014

It is. The point is that the amqp-consume tool does not provide an interface to do it, which I am adding with this PR.

@lrascao

lrascao commented Aug 31, 2014

got it, should've looked at the code before asking

tools/consume.c
@@ -152,6 +153,20 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
+ /* if there is a maximum number of messages to be received at a time, set the
+ * qos to match */
+ if (messages > 0 && messages <= 65535) {

@alanxz

alanxz (Owner) commented Sep 2, 2014

Use a constant for 65535.
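
For reference, the updated patch (quoted further down) adopts a named constant; a minimal definition consistent with this thread would be the following. The placement is an assumption; the value is the protocol maximum, since basic.qos carries prefetch-count as a 16-bit field:

/* basic.qos prefetch-count is a 16-bit protocol field, so 65535 is the
 * largest value it can carry. */
#define AMQP_CONSUME_MAX_PREFETCH_COUNT 65535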

tools/consume.c
+ if (messages > 0 && messages <= 65535) {
+ /* the maximum number of messages to be received at a time must be less
+ * than the global maximum number of messages. */
+ if (count > 0 && count <= 65535 && messages >= count) {

@alanxz

alanxz (Owner) commented Sep 2, 2014

Same here.

tools/doc/amqp-consume.xml
+ messages at a time. If any value was passed to
+ <option>--count</option>, the value passed to
+ <option>--messages</option> should be smaller than that;
+ otherwise it will be ignored.

@alanxz

alanxz (Owner) commented Sep 2, 2014

You'll probably want to add a comment here about how this interacts with the -A flag (e.g., if -A is specified, this flag will have no effect).

@alanxz

This comment has been minimized.

Show comment
Hide comment
@alanxz

alanxz Sep 2, 2014

Owner

So if I understand how all the pieces are moving here: even though the amqp-consume tool immediately acks messages after writing them to the output, this will only allow -m messages to be buffered between reading them. The back pressure will come from whatever is reading the output of amqp-consume.

The flag feels a bit misleading in its name. How about --prefetch-count or something else?

Owner

alanxz commented Sep 2, 2014

So if I understand how all the pieces are moving here: even though the amqp-consume tool immediately acks messages after writing them to the output, this will only allow -m messages to be buffered between reading them. The back pressure will come from whatever is reading the output of amqp-consume.

The flag feels a bit misleading in its name. How about --prefetch-count or something else?
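
For context, here is a minimal sketch of the consume-and-ack loop being described, written against the rabbitmq-c envelope API; this is a simplification, not the tool's actual code:

#include <stdio.h>
#include <amqp.h>

/* Sketch only: conn already has an open channel with a consumer
 * registered. Because the ack is sent right after the write, basic.qos
 * bounds how many messages sit buffered between reads; a slow reader
 * of stdout is what ultimately throttles delivery. */
static void consume_loop(amqp_connection_state_t conn)
{
  for (;;) {
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);
    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
      break;
    }
    fwrite(envelope.message.body.bytes, 1, envelope.message.body.len, stdout);
    fflush(stdout);
    amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0);
    amqp_destroy_envelope(&envelope);
  }
}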

@terceiro

terceiro commented Sep 3, 2014

thanks for the review; I have updated the patch accordingly.

tools/consume.c
+ * than the global maximum number of messages. */
+ if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count) {
+ ;
+ } else {

@alanxz

alanxz (Owner) commented Sep 3, 2014

Let's not have an empty statement for the first part of this if statement. Please rewrite the condition so you don't need the else clause. E.g.,

if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) {
  if (!amqp_basic_qos(....
}
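
Spelled out, the suggested shape would look roughly like this. The amqp_basic_qos arguments are assumptions based on the rabbitmq-c API (the review quote elides them), with channel 1 and the die_rpc helper borrowed from the surrounding tools code:

/* Sketch: skip basic.qos only when --count already bounds delivery at
 * least as tightly as the requested prefetch. The amqp_basic_qos
 * arguments are assumed, not quoted from the patch. */
if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT
      && prefetch_count >= count)) {
  if (!amqp_basic_qos(conn, 1, 0, (uint16_t)prefetch_count, 0)) {
    die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
  }
}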

amqp-consume: support consuming N messages at a time
If you have a single consumer C₁ and 10 messages are published, all 10
will be streamed to that one consumer. Assume each message takes a few
minutes to be handled.

If a second consumer C₂ comes up before C₁ is able to process its first
message, it will stay idle until new messages are published, while C₁
will still have to process the other 9 messages after finishing with the
first one.

If both consumers were started with `--messages 1`, C₁ would only fetch
a single message and start handling it; C₂ would start and immediately
receive the second message.

terceiro added a commit to terceiro/debci that referenced this pull request Sep 3, 2014

worker: process one item at a time
Without this, if you have a single worker and enqueue N packages, they
will all be routed to that worker, and if a new worker comes along it
will sit waiting for more packages to be added to the queue, even if N-1
packages are already waiting to be processed by the first worker.

This currently depends on an unpublished patch for librabbitmq (source
for amqp-tools):

  alanxz/rabbitmq-c#209
@alanxz

alanxz (Owner) commented Sep 3, 2014

Rebased and merged in as 7188e5d

Thanks for the contribution!

alanxz closed this Sep 3, 2014

@terceiro

terceiro commented Sep 3, 2014

cool!

@alanxz would you be willing to make a release? I am using the amqp-* tools in the Debian Continuous Integration project, and I would like to have this code in Debian testing before we freeze in November to make sure it gets into the next stable.

@alanxz

alanxz (Owner) commented Sep 3, 2014

I would consider it. By when would you need me to create a rabbitmq-c release to make your freeze deadline in November?

@terceiro

terceiro commented Sep 3, 2014

Hi, thanks for your cooperation. :-)

I will still need to poke the maintainer to do the update, so the sooner we have a release the better.

If you intend to release only what's currently in master, it should be trivial; but if there are any API or ABI changes, it will be a lot more difficult to update two months before the freeze, since reverse dependencies need to be checked, etc.

@alanxz

alanxz (Owner) commented Sep 3, 2014

The two things I would hope to roll into whatever my next release would be:

  • #196 (Implement Auth Failure) - which isn't an API/ABI changer
  • #179 (Implement EXTERNAL SASL method) - which would add a #define, but otherwise wouldn't change the API/ABI.

That said - not a big deal to bump these to the next 'release', not like I'm going to run out of version numbers 😉

@terceiro

terceiro commented Sep 4, 2014

that would be nice, thanks. :)

@alanxz

alanxz (Owner) commented Sep 14, 2014

I've released v0.5.2.

@terceiro

terceiro commented Sep 15, 2014

thanks! :-)
