Permalink
Browse files

The rest of the skeleton for the Queues guide

  • Loading branch information...
1 parent 51d6d18 commit 1da53d3d085babfb8e6e8040d14f36624e7751b7 @michaelklishin michaelklishin committed Sep 21, 2012
Showing with 306 additions and 0 deletions.
  1. +306 −0 articles/queues.md
View
@@ -154,8 +154,314 @@ use the `langohr.queue/bind` function:
{% gist %}
+## Subscribing to receive messages ("push API")
+To set up a queue subscription to enable an application to receive messages as they arrive in a queue, one uses the `langohr.consumers/subscribe` function.
+Then when a message arrives, the message header (metadata) and body (payload) are passed to the handler:
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+Subscriptions for message delivery are usually referred to as <span class="note">consumers</span> in the AMQP 0.9.1 specification, client library documentation and books. Consumers last as long as the channel that they were declared on, or until client cancels them (unsubscribes).
+
+Consumers are identified by <span class="note">consumer tags</span>. If you need to obtain the consumer tag of a subscribed queue then use {% yard_link AMQP::Queue#consumer_tag %}.
+
+### Accessing message metadata
+
+The <span class="note">header</span> object in the example above provides access to message metadata and delivery information:
+
+ * Message content type
+ * Message content encoding
+ * Message routing key
+ * Message delivery mode (persistent or not)
+ * Consumer tag this delivery is for
+ * Delivery tag
+ * Message priority
+ * Whether or not message is redelivered
+ * Producer application id
+
+and so on. An example to demonstrate how to access some of those attributes:
+
+{% gist %}
+
+### Exclusive consumers
+
+Consumers can request exclusive access to the queue (meaning only this consumer can access the queue). This is useful when you want a long-lived shared queue to be temporarily accessible by just one application (or thread, or process). If the application employing the exclusive consumer crashes or loses the TCP connection to the broker, then the channel is closed and the exclusive consumer is cancelled.
+
+To exclusively receive messages from the queue, pass the ":exclusive" option to {% yard_link AMQP::Queue#subscribe %}:
+
+{% gist %}
+
+TBD: describe what happens when exclusivity property is violated and how to handle it.
+
+
+### Using multiple consumers per queue
+
+TBD
+
+
+### Cancelling a Consumer
+
+{% gist %}
+
+To cancel a particular consumer, use {% yard_link AMQP::Consumer#cancel %} method. To cancel a default queue consumer, use {% yard_link AMQP::Queue#unsubscribe %}.
+
+### Message acknowledgements
+
+Consumer applications — applications that receive and process messages ‚ may occasionally fail to process individual messages, or will just crash.
+There is also the possibility of network issues causing problems. This raises a question — "When should the AMQP broker remove messages from queues?"
+
+The AMQP 0.9.1 specification proposes two choices:
+
+ * After broker sends a message to an application (using either basic.deliver or basic.get-ok methods).
+ * After the application sends back an acknowledgement (using basic.ack AMQP method).
+
+The former choice is called the *automatic acknowledgement model*, while the latter is called the *explicit acknowledgement model*.
+With the explicit model, the application chooses when it is time to send an acknowledgement. It can be right after receiving a message,
+or after persisting it to a data store before processing, or after fully processing the message (for example, successfully fetching a Web page,
+processing and storing it into some persistent data store).
+
+!https://github.com/ruby-amqp/amqp/raw/master/docs/diagrams/006_amqp_091_message_acknowledgements.png!
+
+If a consumer dies without sending an acknowledgement, the AMQP broker will redeliver it to another consumer, or, if none are available at the time,
+the broker will wait until at least one consumer is registered for the same queue before attempting redelivery.
+
+The acknowledgement model is chosen when a new consumer is registered for a queue. By default, `langohr.consumers/subscribe` will use the *automatic* model.
+To switch to the *explicit* model, the `:ack` option should be used:
+
+{% gist %}
+
+To demonstrate how redelivery works, let us have a look at the following code example:
+
+{% gist %}
+
+So what is going on here? This example uses three AMQP connections to imitate three applications, one producer and two consumers.
+Each AMQP connection opens a single channel:
+
+{% gist %}
+
+The consumers share a queue and the producer publishes messages to the queue periodically using an <span class="note">`amq.direct`</span> exchange.
+Both "applications" subscribe to receive messages using the explicit acknowledgement model. The AMQP broker by default will send each message to
+the next consumer in sequence (this kind of load balancing is known as *round-robin*). This means that some messages will be delivered
+to consumer #1 and some to consumer #2.
+
+{% gist %}
+
+To demonstrate message redelivery we make consumer #1 randomly select which messages to acknowledge. After 4 seconds we disconnect it (to imitate a crash).
+When that happens, the AMQP broker redelivers unacknowledged messages to consumer #2 which acknowledges them unconditionally. After 10 seconds, this example
+closes all outstanding connections and exits.
+
+An extract of output produced by this example:
+
+{% gist %}
+
+As we can see, consumer #1 did not acknowledge three messages (labelled 1, 2 and 5):
+
+{% gist %}
+
+and then, once consumer #1 had "crashed", those messages were immediately redelivered to the consumer #2:
+
+{% gist %}
+
+To acknowledge a message use `langohr.basic/ack`:
+
+{% gist %}
+
+`langohr.basic/ack` takes two arguments: message *delivery tag* and a flag that indicates whether or not we want to acknowledge multiple messages at once.
+Delivery tag is simply a channel-specific increasing number that the server uses to identify deliveries.
+
+When acknowledging multiple messages at once, the delivery tag is treated as "up to and including". For example, if delivery tag = 5 that would mean "acknowledge messages 1, 2, 3, 4 and 5".
+
+<p class="alert alert-error">Acknowledgements are channel-specific. Applications must not receive messages on one channel and acknowledge them on another.</p>
+
+<p class="alert alert-error">A message MUST not be acknowledged more than once. Doing so will result in a channel-level exception (PRECONDITION_FAILED) with an error message like this: "PRECONDITION_FAILED - unknown delivery tag"</p>
+
+### Rejecting messages
+
+When a consumer application receives a message, processing of that message may or may not succeed. An application can indicate to the broker that message
+processing has failed (or cannot be accomplished at the time) by rejecting a message. When rejecting a message, an application can ask the broker to discard or requeue it.
+
+To reject a message use the `langohr.basic/reject` method:
+
+{% gist %}
+
+in the example above, messages are rejected without requeueing (broker will simply discard them). To requeue a rejected message, use the second argument
+that `langohr.basic/reject` takes:
+
+{% gist %}
+
+### Negative acknowledgements
+
+Messages are rejected with the <span class="note">`basic.reject`</span> AMQP method. There is one limitation that `basic.reject` has:
+there is no way to reject multiple messages, as you can do with acknowledgements. However, if you are using [RabbitMQ](http://rabbitmq.com), then there is a solution.
+RabbitMQ provides an AMQP 0.9.1 extension known as [negative acknowledgements](http://www.rabbitmq.com/extensions.html#negative-acknowledgements) (nacks) and
+Langohr supports this extension. For more information, please refer to the [RabbitMQ Extensions guide](/articles/rabbitmq_extensions.html).
+
+### QoS — Prefetching messages
+
+For cases when multiple consumers share a queue, it is useful to be able to specify how many messages each consumer can be sent at once before sending the next acknowledgement.
+This can be used as a simple load balancing technique to improve throughput if messages tend to be published in batches. For example, if a producing application
+sends messages every minute because of the nature of the work it is doing.
+
+Imagine a website that takes data from social media sources like Twitter or Facebook during the Champions League final (or the Superbowl),
+and then calculates how many tweets mention a particular team during the last minute. The site could be structured as 3 applications:
+
+ * A crawler that uses streaming APIs to fetch tweets/statuses, normalizes them and sends them in JSON for processing by other applications ("app A").
+ * A calculator that detects what team is mentioned in a message, updates statistics and pushes an update to the Web UI once a minute ("app B").
+ * A Web UI that fans visit to see the stats ("app C").
+
+In this imaginary example, the "tweets per second" rate will vary, but to improve the throughput of the system and to decrease the maximum number of messages
+that the AMQP broker has to hold in memory at once, applications can be designed in such a way that application "app B", the "calculator",
+receives 5000 messages and then acknowledges them all at once. The broker will not send message 5001 unless it receives an acknowledgement.
+
+In AMQP parlance this is know as *QoS* or *message prefetching*. Prefetching is configured on a per-channel (typically) or per-connection (rarely used) basis.
+To configure prefetching per channel, use the {AMQP::Channel#prefetch} method. Let us return to the example we used in the "Message acknowledgements" section:
+
+{% gist %}
+
+In that example, one consumer prefetches three messages and another consumer prefetches just one. If we take a look at the output that the example produces, we will see that `consumer1` fetched four messages and acknowledged one. After that, all subsequent messages were delivered to `consumer2`:
+
+<code>[consumer2] Received Message #0, redelivered = false, ack-ed
+[consumer1] Got message #1, SKIPPED
+[consumer1] Got message #2, SKIPPED
+[consumer1] Got message #3, ack-ed
+[consumer2] Received Message #4, redelivered = false, ack-ed
+[consumer1] Got message #5, SKIPPED
+---
+ by now consumer 1 has received three messages it did not acknowledge.
+ With prefetch = 3, AMQP broker will not send it any more messages until consumer 1 sends an ack
+---
+[consumer2] Received Message #6, redelivered = false, ack-ed
+[consumer2] Received Message #7, redelivered = false, ack-ed
+[consumer2] Received Message #8, redelivered = false, ack-ed
+[consumer2] Received Message #9, redelivered = false, ack-ed
+[consumer2] Received Message #10, redelivered = false, ack-ed
+[consumer2] Received Message #11, redelivered = false, ack-ed
+</code>
+
+<span class="alert alert-error">The prefetching setting is ignored for consumers that do not use explicit acknowledgements.</span>
+
+
+## How message acknowledgements relate to transactions and Publisher Confirms
+
+In cases where you cannot afford to lose a single message, AMQP 0.9.1 applications can use one or a combination of the following protocol features:
+
+ * Publisher confirms (a RabbitMQ-specific extension to AMQP 0.9.1)
+ * Publishing messages as immediate
+ * Transactions (noticeable overhead)
+
+This topic is covered in depth in the [Working With Exchanges](/articles/exchanges.html) guide. In this guide, we will only mention how
+message acknowledgements are related to AMQP transactions and the Publisher Confirms extension.
+
+Let us consider a publisher application (P) that communications with a consumer (C) using AMQP 0.9.1. Their communication can be graphically represented like this:
+
+<code>
+----- ----- -----
+| | S1 | | S2 | |
+| P | ====> | B | ====> | C |
+| | | | | |
+----- ----- -----
+</code>
+
+We have two network segments, S1 and S2. Each of them may fail. P is concerned with making sure that messages cross S1, while broker (B) and C are concerned with ensuring
+that messages cross S2 and are only removed from the queue when they are processed successfully.
+
+Message acknowledgements cover reliable delivery over S2 as well as successful processing. For S1, P has to use transactions (a heavyweight solution) or the more lightweight
+Publisher Confirms RabbitMQ extension.
+
+
+## Fetching messages when needed ("pull API")
+
+The AMQP 0.9.1 specification also provides a way for applications to fetch (pull) messages from the queue only when necessary.
+For that, use the `langohr.basic/get` function:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+If the queue is empty, then ... will be nil, otherwise ....
+
+## Unsubscribing From Messages
+
+Sometimes it is necessary to unsubscribe from messages without deleting a queue. To do that, use the {% yard_link AMQP::Queue#unsubscribe %} method:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+In AMQP parlance, unsubscribing from messages is often referred to as "cancelling a consumer". Once a consumer is cancelled, messages will
+no longer be delivered to it, however, due to the asynchronous nature of the protocol, it is possible for "in flight" messages to be received
+after this call completes.
+
+Fetching messages with `langohr.basic/get` is still possible even after a consumer is cancelled.
+
+
+## Unbinding Queues From Exchanges
+
+To unbind a queue from an exchange use the `langohr.queue/unbind` function:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+Note that trying to unbind a queue from an exchange that the queue was never bound to will result in a channel-level exception.
+
+## Querying the Number of Messages in a Queue
+
+It is possible to query the number of messages sitting in the queue by declaring the queue with the ":passive" attribute set.
+The response (`queue.declare-ok` AMQP method) will include the number of messages along with other attributes. However, the amqp gem provides
+a convenience function `langohr.queue/status`:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+## Querying the Number of Consumers On a Queue
+
+It is possible to query the number of consumers on a queue by declaring the queue with the ":passive" attribute set. The response (`queue.declare-ok` AMQP method)
+will include the number of consumers along with other attributes. However, the amqp gem provides a convenience function `langohr.queue/status`:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+## Purging queues
+
+It is possible to purge a queue (remove all of the messages from it) using the `langohr.queues/purge` function:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+Note that this example purges a newly declared queue with a unique server-generated name. When a queue is declared, it is empty,
+so for server-named queues, there is no need to purge them before they are used.
+
+## Deleting queues
+
+To delete a queue, use the `langohr.queue/delete` function:
+
+{% gist %}
+
+The same example in context:
+
+{% gist %}
+
+When a queue is deleted, all of the messages in it are deleted as well.
## Wrapping Up

0 comments on commit 1da53d3

Please sign in to comment.