- Model Architecture
- Command Architecture
- Transport Architecture
- Client Architecture
- Functional Specifications
- Technical Specifications
This document describes Haigha, a client for AMQP servers. AMQP is a messaging protocol which can be used to route large volumes of data across a wide network of application servers. The document covers the design, implementation and usage of Haigha to support fast, event-driven Python applications using AMQP. It should provide sufficient specifications for an engineer to integrate haigha into their applications.
This document is divided into chapters according to the layout of the AMQP 0.9.1 specification [PDF].
- Overview Read this for a general introduction
- Architecture The architecture of haigha code base
- Functional Specifications How applications work with haigha
- Technical Specifications How haigha transport layer is implemented
TODO Write conventions on common terms and usage
This section describes the semantics haigha uses to integrate with AMQP servers.
The AMQP protocol divides the tasks of message routing and delivery between two distinct objects:
- Exchange, to which messages are written
- Queue, to which messages are routed and stored for consumption by clients
To connect an exchange and a queue, a binding is defined. When a message is published to an exchange, a route is supplied which is compared against the binding to determine delivery.
To manage the stateful connection to a broker for both the publishing and consuming of messages, the following entities are defined:
- Connection The authenticated socket connection between a client and broker on a specific vhost
- Channel TODO how to even describe this
The message queue is the final destination of any published message, and it is the location from which a client will consume messages. When a message is published to an exchange, every queue bound to that exchange with a binding that matches the routing key receives a copy of the message.
Haigha implements queue declaration and deletion in the QueueClass.
The exchange accepts messages from applications. There are several different exchange types, the standard ones defined in the specification and possibly some additional ones supplied by your broker. The common types of exchanges are:
- direct The routing key and binding key must exactly match
- topic The routing key must match the pattern defined by the binding key
- fanout All queues will receive a copy of the message.
Haigha implements exchange declaration and deletion in the ExchangeClass.
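The matching rules for the three common exchange types can be sketched in a few lines. This is a minimal illustration of the broker-side semantics, not haigha code: the function names topic_matches and route are hypothetical, and the topic rules assume the usual AMQP convention that `*` matches exactly one dot-separated word and `#` matches zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may absorb zero or more words of the routing key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))


def route(exchange_type, binding_key, routing_key):
    """Decide whether a bound queue receives a copy of a message."""
    if exchange_type == 'fanout':
        return True  # every bound queue receives a copy
    if exchange_type == 'direct':
        return binding_key == routing_key
    if exchange_type == 'topic':
        return topic_matches(binding_key, routing_key)
    raise ValueError('unsupported exchange type: %r' % exchange_type)
```

For example, route('topic', 'stock.usd.*', 'stock.usd.nyse') is true, while the same binding does not match the shorter key 'stock.usd'.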
After an exchange and a queue have been declared, one or more bindings can be defined between them. It is possible for a single queue to be bound to multiple exchanges, or a shared queue can be used to distribute messages among a pool of consumers.
Shared queues are the standard point-to-point queue, useful for distributing messages among consumers. The example below assumes that a Connection has been assigned to connection and that the application defines a method application_consumer to receive messages.

    ch = connection.channel()
    ch.exchange.declare('an_exchange', 'direct')
    ch.queue.declare('a_queue')
    ch.queue.bind('a_queue', 'an_exchange', routing_key='route')
    ch.basic.consume('a_queue', application_consumer)
Handling replies, or receiving consumer-targeted messages, is a common use case for creating exclusive queues for a process. In this example, we'll let the broker assign the queue name and use a callback to bind the queue after the server has replied.

    ch = connection.channel()
    ch.exchange.declare('reply', 'direct')
    # The broker-assigned queue name doubles as the binding key.
    ch.queue.declare(exclusive=True,
        cb=lambda queue, messages, consumers:
            ch.queue.bind(queue, 'reply', routing_key=queue))

By convention, we'll now use a reply-to header in our messages when this consumer requests data from another consumer, so that the reply can be routed using the appropriate binding key.
Topic routing forms the basis of pub-sub models. When combined with shared queue semantics, it allows AMQP to be used as a powerful routing engine across a large pool of varied applications.

    ch = connection.channel()
    ch.exchange.declare('pub', 'topic')
    ch.queue.declare('stock.usd')
    ch.queue.bind('stock.usd', 'pub', routing_key='stock.usd.*')
This section describes how haigha talks to the broker.
The AMQP protocol divides its commands among classes of functionality. The ProtocolClass defines the base class for each of these, with each class of functionality defined in a subclass such as QueueClass, ExchangeClass, etc., one for each of the AMQP protocol classes [basic, channel, exchange, queue, transaction]. These are exposed in the Channel as properties, as shown in the examples above.
The protocol also separates commands between synchronous and asynchronous. In all cases [2], if an operation is (optionally) synchronous it will support a cb= keyword argument. Many methods support both synchronous and asynchronous behavior; where asynchronous operation is available, haigha defaults to it through the nowait=True keyword argument, and automatically switches to synchronous mode if an application callback is supplied.
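The interaction between nowait and cb described above can be summarized as a small decision function. This is a sketch of the rule only; resolve_nowait is a hypothetical helper and not part of haigha's API.

```python
def resolve_nowait(nowait=True, cb=None):
    """Return the effective nowait flag for an optionally synchronous command."""
    # A callback can only fire once the broker has replied, so supplying
    # one forces the command into synchronous mode regardless of nowait.
    if cb is not None:
        return False
    return nowait
```

With no arguments the command stays asynchronous; passing any callback makes it synchronous.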
Commands are further identified as originating from the client, server or either. As haigha is a client library, it only supports those commands which can be initiated by the client. With the exception of publishing, these commands are available solely in the respective ProtocolClass to which the command belongs. For convenience, the Channel exposes two publishing methods, publish and publish_synchronous, as well as close. All methods of a ProtocolClass which handle server-originated messages are named beginning with the string
The mapping of classes and commands has already been described via the ProtocolClass and its implementations. Each method is responsible for constructing the frame(s) necessary to implement the command, and the user should never have to worry about constructing frames by hand.
The Connection class manages the state of the AMQP connection. The life-cycle is:
- User creates a new Connection object, setting the configuration through keyword params (TODO document).
- A ConnectionStrategy is created and a blocking TCP connection is initiated to the broker.
- After a socket connection is created, it is set to non-blocking mode.
- The Connection sends a protocol header defining specification 0.9.1.
- The ConnectionChannel, id 0, receives the start command and replies with
- If authorized, the server responds with the secure command, to which ConnectionChannel responds with open. If not authorized, the socket is immediately closed.
- The server responds with open-ok and any pending frames are flushed.
- At any time, the client or server may send or reply with tune-ok respectively to negotiate frame size or channel limits.
- The connection is available for the application.
- The server sends a close command, or client sends it by calling
- Peer acknowledges with close-ok and the socket is disconnected.
The Connection class manages the state of the socket connection and the negotiation with the broker. It is also responsible for maintaining a buffer of both input and output frames. The output buffer is used during the initialization of the connection, so that it can be used immediately by the application.
    connection = Connection()
    channel = connection.channel()

In this example, the channel will be negotiated immediately following the receipt of the open-ok command in the ConnectionChannel.
- User creates a Channel by calling connection.channel. The channel is enumerated, and references to existing channels can be fetched by id.
- The Channel initializes all supported protocol classes and internal buffers.
- The channel immediately sends the
- The server responds with
- The channel is available for the application.
- The server sends a close command, or the client sends it by calling
- Peer acknowledges with close-ok and the channel is closed. All future use will raise a
The AMQP protocol isolates all synchronous and asynchronous transactions per channel. The Channel class implements this behavior by maintaining a buffer of pending outbound frames. If the buffer is empty, a frame is immediately forwarded to the Connection, else it's appended to the end. When a synchronous method is called by the user, after all frames have been sent or queued, a callback is appended to the buffer.
When a command is received from the broker, the dispatch will find the appropriate haigha method and, if that method is at the front of the buffer, will pop it off. All remaining frames are then flushed until the buffer is empty, or the first item is another pending synchronous callback. This solution implements a very lightweight system for reliably managing multiple outstanding synchronous calls in an asynchronous dispatch loop. The user is free to interact with AMQP without worrying about whether a method is synchronous or not.
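The buffering scheme described above can be modeled with a single deque that holds both queued frames and pending synchronous callbacks. The following is a simplified sketch, not haigha's Channel implementation: the class PendingBuffer and its method names are hypothetical, frames are represented by plain strings, and anything callable in the buffer is treated as a pending callback.

```python
from collections import deque


class PendingBuffer(object):
    """Hypothetical model of a channel's outbound buffer."""

    def __init__(self, send):
        self._send = send        # callable that forwards a frame to the connection
        self._pending = deque()  # queued frames and pending synchronous callbacks

    def send_frame(self, frame):
        # Forward immediately unless something is already waiting.
        if self._pending:
            self._pending.append(frame)
        else:
            self._send(frame)

    def add_synchronous_cb(self, cb):
        # Called after the frames of a synchronous command were sent or queued.
        self._pending.append(cb)

    def clear_synchronous_cb(self, cb):
        # Pop the callback only if it is the one at the front of the buffer.
        if self._pending and self._pending[0] == cb:
            self._pending.popleft()
        # Flush frames until the buffer is empty or another callback is first.
        while self._pending and not callable(self._pending[0]):
            self._send(self._pending.popleft())
```

A frame queued behind a pending callback is held back until the broker's reply for that callback has been dispatched, which preserves the per-channel ordering of synchronous commands.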
When receiving frames, the Connection first queues frames to each channel via channel.buffer_frame(). It then iterates over all channels for which a frame was queued and calls channel.process_frames(). In most cases, an AMQP command is isolated to one frame, but in the case of messages, the content may be split across multiple frames. In the situation where not all content frames have been received yet, the BasicClass will raise a ProtocolClass.FrameUnderflow exception and re-buffer any message frames on the channel. When the next frame arrives for the channel, the process will repeat, until all frames have arrived and the message is complete.
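The re-buffering behavior for multi-frame messages can be illustrated with a toy model. This sketch makes several assumptions that are not taken from haigha: frames are (kind, value) tuples, a header frame carries only the total body size, and read_message and process_frames are hypothetical stand-ins for the real dispatch code.

```python
from collections import deque


class FrameUnderflow(Exception):
    """Raised when a message's content frames have not all arrived."""


def read_message(frames):
    """Pop one complete message (method, header, body frames) off the deque.

    On underflow the deque is restored to its original contents.
    """
    taken = []
    try:
        taken.append(frames.popleft())  # method frame
        header = frames.popleft()       # content header with total body size
        taken.append(header)
        body = b''
        while len(body) < header[1]:
            chunk = frames.popleft()    # content body frame
            taken.append(chunk)
            body += chunk[1]
        return taken[0][1], body
    except IndexError:
        # Re-buffer everything consumed so far, preserving order.
        frames.extendleft(reversed(taken))
        raise FrameUnderflow()


def process_frames(frames):
    """Return every complete message; leave partial ones buffered."""
    messages = []
    while frames:
        try:
            messages.append(read_message(frames))
        except FrameUnderflow:
            break
    return messages
```

Calling process_frames again after more body frames have been appended picks up the previously incomplete message from the start.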
The ExchangeClass is used to declare and delete exchanges.
All methods of ExchangeClass are optionally synchronous and can callback to user code.
TODO say something more
The QueueClass is used to declare, delete, bind and purge queues.
All methods of QueueClass are optionally or permanently synchronous and can callback to user code.
TODO say something more
The BasicClass is used to publish messages, manage consumers, handle message delivery, acknowledge receipts, and synchronously fetch messages.
TODO say something more
The TransactionClass is used to set up and use server-side transaction isolation. The life-cycle is:
- User calls select command to the server.
- Server replies with select-ok and the channel is permanently in transaction mode.
- The application publishes or acknowledges messages.
- The application commits or rolls back the publish or acknowledge commands through
All methods of the TransactionClass are synchronous and can callback to application code.
This section describes how haigha implements the wire-level protocol.
AMQP is a frame-oriented protocol and haigha is designed around this in every respect.
The Connection class implements an EventSocket callback which will call connection._read_frames(). It will take the current buffer on the socket, place it in a Reader object, and pass that to the read_frames() method of the Frame class. The reader acts both as a stream object, with methods such as tell(), and as an implementation of the basic data types in AMQP.
For each frame read, the connection will queue the frame on to the channel specified in the frame, for later processing. If the input buffer contains a partial frame, a Reader.BufferUnderflow exception will be raised and Frame.read_frames() will exit, leaving the reader positioned at the end of the last full frame (or beginning of the buffer). The connection will re-buffer any pending data on the socket and wait for the next callback to attempt to read frames from the byte stream.
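The read loop described above can be sketched without haigha's Reader and Frame classes. The sketch assumes the general AMQP 0.9.1 frame layout (a one-byte type, a two-byte channel id, a four-byte payload size, the payload, and a closing frame-end octet of 0xCE); the names BufferUnderflow, read_frame and read_frames below are local to this example and only mirror the roles described in the text.

```python
import struct

FRAME_END = b'\xce'  # frame-end octet defined by AMQP 0.9.1
HEADER_SIZE = 7      # type (1) + channel (2) + payload size (4)


class BufferUnderflow(Exception):
    """Raised when the buffer ends part-way through a frame."""


def read_frame(buf, offset):
    """Decode one frame starting at offset; return (frame, next_offset)."""
    if len(buf) - offset < HEADER_SIZE:
        raise BufferUnderflow()
    frame_type, channel_id, size = struct.unpack_from('>BHI', buf, offset)
    end = offset + HEADER_SIZE + size
    if len(buf) < end + 1:
        raise BufferUnderflow()
    if buf[end:end + 1] != FRAME_END:
        raise ValueError('missing frame-end octet')
    payload = buf[offset + HEADER_SIZE:end]
    return (frame_type, channel_id, payload), end + 1


def read_frames(buf):
    """Return (frames, leftover), where leftover is a trailing partial frame."""
    frames, offset = [], 0
    while offset < len(buf):
        try:
            frame, offset = read_frame(buf, offset)
        except BufferUnderflow:
            break  # keep the partial frame for the next socket callback
        frames.append(frame)
    return frames, buf[offset:]
```

The leftover bytes play the role of the re-buffered data: the caller prepends them to whatever arrives on the next read and tries again.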
To send frames, each command implemented by a ProtocolClass will construct a Writer object which is used to format the arguments for that command. It then constructs a subclass of Frame, usually a MethodFrame, and writes that to the channel to which the protocol class is bound.
AMQP defines several data types which form the basis of all frames. One of these data types, tables (i.e. dicts), supports the basic types in addition to a few others. There is disagreement on official versus supported types in tables, as well as subtle differences in the encoding of some types. Haigha is written to conform to the errata implemented in RabbitMQ.
The implementation of the data types is in both the Reader and Writer classes. When converting from Python to AMQP data types when serializing tables, the Writer assumes that all floats are double-precision, converts unicode to utf8 strings, and packs each integer into the narrowest type that can hold it.
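Packing an integer by its required byte-width can be sketched as a lookup over candidate types, narrowest first. The type tags below follow the signed integer field types in RabbitMQ's table encoding as commonly documented (b, s, I and l for 8, 16, 32 and 64 bits); that mapping is an assumption of this example, and haigha's Writer may select tags differently. The name pack_table_int is hypothetical.

```python
import struct

# (type tag, struct format, lower bound, upper bound), narrowest first.
# Tags are assumed from RabbitMQ's field-type list, not read from haigha.
_INT_TYPES = [
    (b'b', '>b', -2 ** 7, 2 ** 7 - 1),
    (b's', '>h', -2 ** 15, 2 ** 15 - 1),
    (b'I', '>i', -2 ** 31, 2 ** 31 - 1),
    (b'l', '>q', -2 ** 63, 2 ** 63 - 1),
]


def pack_table_int(value):
    """Encode an integer table value as a type tag plus big-endian bytes."""
    for tag, fmt, low, high in _INT_TYPES:
        if low <= value <= high:
            return tag + struct.pack(fmt, value)
    raise ValueError('integer out of range for an AMQP table: %r' % value)
```

Small values therefore cost two bytes on the wire, while values beyond the 32-bit range fall through to the nine-byte 64-bit encoding.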
AMQP defines two classes of exceptions for error handling. Operational errors, such as invalid queue names, will close a channel. Structural errors, such as invalid or out-of-order frames, will result in a connection closure.
Because haigha is asynchronous, handlers must be defined to receive notification when a connection or channel is closed. The closed state will be saved on the respective connection or channel, and is accessible via the close_info property. This will always return a dictionary with the following fields defined:
- reply_code The 3 digit error code
- reply_text The text of the error message
- class_id The class id of the offending command
- method_id The method id of the offending command
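A close handler will typically want to log these four fields. The helper below is a small sketch that only assumes a dictionary with the keys listed above; describe_close is a hypothetical name, and the sample values in the usage note are illustrative rather than captured from a real broker.

```python
def describe_close(close_info):
    """Format a close_info-style dictionary as a single log line."""
    return (
        '%(reply_code)d %(reply_text)s '
        '(class_id=%(class_id)d, method_id=%(method_id)d)' % close_info
    )
```

For instance, a dictionary with reply_code 404, reply_text 'NOT_FOUND', class_id 50 and method_id 10 is rendered as "404 NOT_FOUND (class_id=50, method_id=10)".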
When closing due to an error on the client side, these same parameters can be supplied to
Haigha's client architecture closely matches AMQP's recommended abstraction layers.
The framing layer is shared across a number of different classes.
- Connection Manages input byte buffer, calls into frame reader, and writes frames to the socket
- Frame Implements frame reading, calls into frame implementations for further decoding, subclasses implement
- Channel Implements input frame buffer, dispatch to protocol classes, and interfaces for sending frames
The connection management is handled primarily by the Connection class. The AMQP specification suggests that this layer may also be responsible for sending content, but that is handled in the frame buffering implementation of Channel and the specific implementation of BasicClass.
The primary API of haigha is the set of methods exposed through the subclasses of ProtocolClass, which are made available in the aforementioned per-channel properties that map to the classes of AMQP protocol messages, [basic, channel, exchange, queue, transaction]. Additional APIs of which the user should be aware:
- ChannelPool Transaction-based publishing for guaranteed delivery and high-throughput
TODO Document other features that the client implements.
Messages are created with the Message class and sent via one of several publishing methods.
- channel.basic.publish The "standard" publish which is the publish command exposed by the BasicClass.
- channel.publish A convenience method that aliases
- channel.publish_synchronous A wrapper around transaction.commit. A callback argument will be called when the server acknowledges
- channelpool.publish Publish using a pool of transaction-isolated channels. Will create a new channel if none are free. A callback argument will be called when the server acknowledges transaction commit.
The preferred mechanism for reading messages from an AMQP queue is to register a consumer via a basic.consume call. This will register a Python function to be called each time the client receives a message from a queue.
[1] Your broker may support other types of exchanges, such as a deliver-once exchange.
[2] All synchronous methods will support callbacks by 0.4.0.
[3] Synchronous methods have more overhead, so some awareness and caution is recommended.
[4] Channel close callbacks will be supported by 0.4.0.