Pure TCL RabbitMQ Library implementing AMQP 0.9.1
This library is completely asynchronous and makes no blocking calls. It relies on TclOO and requires Tcl 8.6, but has no other dependencies (other than a RabbitMQ server).
Developed for use within FlightAware (https://flightaware.com).
The package directory contains a Makefile for installing globally. By
default the Makefile installs to /usr/local/lib
, so this will need editing
if an alternative directory is required.
There are two primary classes required for using the library.
- Connection
The Connection class is used for initiating initial communication with the RabbitMQ server. It also relies on a subsidiary Login class, which is used for specifying username, password, vhost and authentication mechanism. Out of the box, this library only supports the PLAIN SASL mechanism. It can be easily extended to support an additional mechanism if required.
package require rmq
# Arguments: -user -pass -vhost
# All optional and shown with their defaults
set login [Login new -user "guest" -pass "guest" -vhost "/"]
# Pass the login object created above to the Connection
# constructor
# -host and -port are shown with their default values
set conn [Connection new -host localhost -port 5672 -login $login]
# Set a callback for when the connection is ready to use
# which will be passed the connection object
$conn onConnected rmq_conn_ready
proc rmq_conn_ready {conn} {
puts "Connection ready!"
$conn connectionClose
}
# Set a callback for when the connection is closed
$conn onClosed rmq_conn_closed
proc rmq_conn_closed {conn} {
puts "Connection closed!"
}
# Initiate the connection handshake and enter the event loop
$conn connect
vwait die
- Channel
The Channel class is where most of the action happens. The vast majority of AMQP methods refer to a specific channel. After the Connection object has gone through the opening handshake and calls its onOpen handshake a Channel object can be created by passing
# Assume the following proc has been set as the Connection object's
# onOpen callback
proc rmq_conn_ready {conn} {
# Create a channel object
# If no channel number is specified, the
# next available will be chosen
set chan [Channel new $conn]
# Do something with the channel, like
# declare an exchange
set flags [list $::rmq::EXCHANGE_DURABLE]
$chan exchangeDeclare "test" "direct" $flags
}
Using this library for anything useful requires setting callbacks for the AMQP methods needed in the client application. Most callbacks will be set on Channel objects, but the Connection object supports a few as well.
All callbacks are passed the object they were set on as the first parameter. Depending on the AMQP method or object event, additional parameters are provided as appropriate.
Connection objects allow for the setting of the following callbacks:
-
onConnected: called when the AMQP connection handshake finishes and is passed the Connection object
-
onBlocked: called when the RabbitMQ server has blocked connections due to resource limitations. Callback is passed the Connection object, a boolean for whether the connection is blocked or not and a textual reason
-
onClosed: called when the connection is closed and is passed the Connection object and a dict containing any data such as a textual description of the error, the reply code, any reply text, the class ID and the method ID
-
onError: called when an error code has been sent to the Connection and is passed the error code and any accompanying data in the frame
-
onFailedReconnect: called when all reconnection attempts have been exhausted
package require rmq
# Arguments: username password vhost
set login [Login new -user "guest" -pass "guest" -vhost "/"]
# Pass the login object created above to the Connection
# constructor
set conn [Connection new -host localhost -port 5672 -login $login]
$conn onConnected rmq_connected
$conn onClosed rmq_closed
$conn onError rmq_connection_error
proc rmq_connected {rmqConn} {
# do useful things
}
proc rmq_closed {rmqConn closeD} {
# do other useful things
}
proc rmq_error {rmqConn frameType frameData} {
# do even more useful things
}
Channel objects have a few specific callbacks that can be set along with a more general callback mechanism for the majority of AMQP method calls.
The specific callbacks provided for Channel objects mirror those available for Connection objects. They are:
-
onOpen: called when the channel is open and ready to use, i.e., when the Channel.Open-Ok method is received from the RabbitMQ server and is passed the same arguments as the onConnected callback for Connection objects
-
onClose: called when the channel has been fully closed, i.e., when the Channel.Close-Ok method is received from the RabbitMQ server and is passed the same arguments as the onClosed callback for Connection objects
-
onError: called when the channel receives an error, i.e., a frame is received for the given channel but contains an AMQP error code and is passed the same arguments as the onError callback for Connection objects
Other than the above callbacks, a Channel object can be supplied a callback for every method that can be sent in response to an AMQP method by using the on method of Channel objects.
These callbacks are passed the Channel object they were set on unless otherwise specified in the full method documentation found below.
When specifying the name of the AMQP method the callback will be invoked on, start with a lowercase letter and use camel case. All AMQP methods documented in the RabbitMQ 0-9-1 extended specification are available.
# Asumming a channel object by name rmqChan exists
$rmqChan on exchangeDeclareOk exchange_declared
$rmqChan on queueDeclareOk queue_declared
$rmqChan on queueBindOk queue_bound
$rmqChan exchangeDeclare "the_best_exchange" "fanout"
vwait exchangeDelcared
$rmqChan queueDeclare "the_best_queue"
vwait queueDeclared
$rmqChan queueBind "the_best_queue" "the_best_exchange" "the_best_routing_key"
proc exchange_delcared {rmqChan} {
set ::exchangeDeclared 1
}
proc queue_declared {rmqChan} {
set ::queueDeclared 1
}
proc queue_bound {rmqChan} {
set ::queueBound 1
}
When consuming messages from a queue using either Basic.Consume or Basic.Get, the process of setting a callback and the data passed into the callback differs from every other case.
For consuming, the Channel object methods basicConsume and basicGet take the name of the callback invoked for each message delivered and then their arguments. The callbacks get passed in the Channel object, a dictionary of method data, a dictionary of frame data, and the data from the queue.
# Assuming a channel object by name rmqChan exists
$rmqChan basicConsume consume_callback "the_best_queue"
proc consume_callback {rmqChan methodD frameD data} {
# Can inspect the consumer tag and dispatch on it
switch [dict get $methodD consumerTag] {
# useful things
}
# Can get the delivery tag to ack the message
$rmqChan basicAck [dict get $methodD deliveryTag]
# Frame data includes things like the data body size
# and is likely less immediately useful but it is
# passed in because it might be necessary for a given
# application
}
For a given channel, multiple queues can be consumed from and each queue can be given its own callback proc by passing in (or allowing the server to generate) a distinct consumerTag for each invocation of basicConsume. Otherwise, dispatching based on the method or frame metadata allows a single callback proc to customize the handling of messages from different queues. When the client application is not constrained in its use of channels, instantiating multiple Channel objects is a straight-forward way for one consumer to concurrently pull data from more than one queue.
The dictionary of method data passed as the second argument to consumer callbacks contains the following items:
-
consumerTag
The string consumer tag, either specified at the time basicConsume is called, or auto-generated by the server.
-
deliveryTag
Integer numbering for the message being consumed. This is used for the basicAck or basicNack methods.
-
redelivered
Boolean integer.
-
exchange
Name of the exchange the message came from.
-
routingKey
Routing key used for delivery of the message.
The dictionary of frame data passed as the third argument to consumer callbacks contains the following items:
-
classID
AMQP defined integer for the class used for delivering the message.
-
bodySize
Size in bytes for the data consumed from the queue.
-
properties
Dictionary of AMQP Basic method properties, e.g., correlation-id, timestamp or content-type.
For AMQP methods like queueDeclare or exchangeDeclare which take flags, these are passed in as a list of
constants. All supported flags are mentioned in the documentation below detailing each Channel method.
Within the source, supported flag constants are found in constants.tcl.
For AMQP class methods which take properties and/or headers, e.g., basicConsume, basicPublish, or exchangeDeclare, the properties and headers are passed in as a Tcl dict. The library takes care of encoding them properly.
All methods defined for Connection, Login, and Channel classes are detailed below. Only includes methods that are part of the public interface for each object. Any additional methods found in the source are meant to be called internally.
Class for connecting to a RabbitMQ server.
The constructor takes the following arguments (all optional):
-
-host
Defaults to localhost
-
-port
Defaults to 5672
-
-tls
Either 0 or 1, but defaults to 0. Controls whether to connect to the RabbitMQ server using TLS. To set TLS options, e.g., if using a client cert, call the tlsOptions method before invoking connect.
-
-login
Login object. Defaults to calling the Login constructor with no arguments.
-
-frameMax
Maximum frame size in bytes. Defaults to the value offered by the RabbitMQ server in Connection.Tune.
-
-maxChannels
Maximum number of channels available for this connection. Defaults to no imposed limit, which is essentially 65,535.
-
-locale
Defaults to en_US.
-
-heartbeatSecs
Interval in seconds for sending out heartbeat frames. A value of 0 means no heartbeats will be sent.
-
-blockedConnections
Either 0 or 1, but defaults to 1. Controls whether to use this RabbitMQ extension.
-
-cancelNotifications
Either 0 or 1, but deafults to 1. Controls whether to use this RabbitMQ extension.
-
-maxTimeout
Integer seconds to wait before timing out the connection attempt to the server. Defaults to 3.
-
-autoReconnect
Either 0 or 1, but defaults to 1. Controls whether the library attempts to reconnect to the RabbitMQ server when the initial call to Connection.connect fails or an established socket connection is closed by the server or by network conditions.
-
-maxBackoff
Integer number of seconds past which exponential backoff, which is the reconnection strategy employed, will not go. Defaults to 64 seconds.
-
-maxReconnects
Integer number of reconnects to attempt before giving up. Defaults to 5. A value of 0 means infinite reconnects. To disable retries, pass -autoReconnect as 0.
-
-debug
Either 0 or 1, but defaults to 0. Controls whether or not debug statements are printed to stderr detailing the operations of the library.
Takes no arguments. Using the -maxBackoff and -maxReconnects constructor arguments, attempts to reconnect to the server. If this cannot be done, and an onFailedReconnect callback has been set, it is invoked.
Takes an optional boolean argument controlling whether the onClose callback is invoked (defaults to true). Closes the connection and, if specified, calls any callback set with onClose.
Takes no arguments. Actually initiates a socket connection with the RabbitMQ server. If the connection fails the onClose callback is invoked. In the future, this will be amended to support auto-reconnect and the invocation of the onError callback instead.
Takes no arguments. Returns 0 or 1 depending on whether the socket connection to the server has been established. This does not indicate whether or not the AMQP connection handshake has been completed (that is indicated by the invocation of the onConnected callback). This method is available for detecting an inability to establish a network connection.
Takes no arguments. Returns the socket object for communicating with the server. This allows for more fine-grained inspection and tuning if so desired.
Takes the name of a callback proc which will be used for blocked connection notifications. Blocked connection notifications are always requested by this library, but the setting of a callback is optional. The callback takes the Connection object, a boolean for whether the connection is blocked (this callback is also used when the connection is no longer blocked), and a textual reason why.
Takes the name of a callback proc which will be called when the connection is closed. This includes a failed connection to the RabbitMQ server when first calling connect and a disconnection after establishing communication with the RabbitMQ server. The callback takes the Connection object and a dictionary of data specified in the section above about callbacks.
Alias for onClose method.
Takes the name of a callback proc which will be used when the AMQP handshake is finished. When this callback is invoked, the Connection object is ready to create channels and perform useful work.
Takes the name of a callback proc used when an error is reported by the RabbitMQ server on the connection level. The callback proc takes the Connection object, a frame type and any extra data included in the frame.
Takes the name of a callback proc used when the maximum number of connection attempts have been made without sucess. The callback proc takes the Connection object.
Takes an optional boolean channelsToo, which defaults to 0. Unsets all callbacks for the Connection object. If channelsToo is 1, also unsets callbacks on all of its channels.
Used to setup the parameters for an SSL / TLS connection to the RabbitMQ server.
Supports all arguments supported by the Tcl tls package's ::tls::import::
command
as specified in the Tcl TLS documentation.
If a TLS connection is desired, this method needs to be called before connect.
The constructor takes the following arguments (all optional):
-
-user
Username to login with. Defaults to guest
-
-pass
Password to login with. Defaults to guest
-
-mechanism
Authentication mechanism to use. Defaults to PLAIN
-
-vhost
Virtual host to login to. Defaults to /
Takes no arguments. This method needs to overridden if an alternative mechanism is desired.
Most of the methods made available by this library come from the Channel class. It implements the majority of the AMQP methods.
Takes the following arguments:
-
connectionObj
The Connection object to open a channel for. This is the only required argument.
-
channelNum
The channel number to open. Optional. If not specified, the next available number starting from 1 will be used. Passing in an empty string or 0 is equivalent to not providing this argument, i.e., the class will pick the next available channel number for the Connection object provided.
-
shouldOpen
A boolean argument that defaults to 1. If set to 1 the channel will open after it is created. If not, the channelOpen method must be called manually before anything can be done with the Channel object.
Takes no arguments and returns 1 if the channel is active, i.e., it has been opened successfully, and 0 otherwise.
Takes no arguments and closes the channel. If an onClose callback has been specified it is called with the Channel object and a dictionary containing the following keys and values (all keys will be included even if their value is empty):
-
data
Any data sent back from the server when closing. Usually empty.
-
replyCode
The numeric reply code sent from the server.
-
replyText
Textual description of the reply code. Useful for troubleshooting purposes.
-
classID
Numeric class ID for the class responsible for the closing if appropriate.
-
methodID
Numeric method ID for the method responsible for the closing if appropriate.
Takes an optional boolean argument, callCloseCB, which defaults to 1. Closes the associated Connection object and if callCloseCB is true, any callback set with onClose is invoked, otherwise it is ignored.
Takes no arguments and returns 1 if the Channel is in the process of closing and 0 otherwise.
Takes no arguments, and returns the channel number.
Takes no arguments, and returns the Connection object passed into the constructor.
Alias for active?.
Takes an AMQP method name in camel case, starting with a lower case letter and the name of a callback proc for the method. To unset a callback, set its callback proc to the empty string or use removeCallback.
Takes the name of a callback proc to be called when the channel is closed. The callback takes the Channel object and a dictionary of data, which is specified in the README section about callbacks and in the closeChannel method description above.
Alias for onClose.
Takes the name of a callback proc invoked when an error occurs on this particular Channel object. The error callback is passed the Channel object, a numeric error code as returned from the server, and any additional data passed back. Errors occur on a channel when the server returns an unexpected response but not when a disconnection occurs or the channel is closed forcefully by the server.
Takes the name of a callback proc to be called when the channel successfully opens. Once it is open, AMQP methods can be called. The callback takes the Channel object.
Alias for onOpen.
Takes no arguments. Returns 1 if Connection is in the process of attempting a reconnect and 0 otherwise.
Takes the name of an AMQP method as defined on a Channel object.
Takes no arguments. Sets all callbacks to the empty string, effectively removing them.
Takes the name of an AMQP method as defined on a Channel object (or for the on Channel method). The preferred method to use is on, but this is alternative method for setting a callback. To unset a callback, set its callback proc to the empty string or use removeCallback.
The following methods are defined on Channel objects and implement the methods and classes detailed in the AMQP specification.
Takes the following arguments:
-
data
String of data potentially transferred during closing.
-
replyCode
Numeric reply code for closing the channel as specified in the AMQP specification.
-
replyText
Textual description of the reply code.
-
classID
AMQP class ID number.
-
methodID
AMQP method ID number.
To place a callback for the closing of a channel, use the onClose or onClosed method. The callback takes the Channel object and a dictionary of data describing the reason for the closing.
Takes no arguments.
To place a callback for the opening of a channel use the onOpen method. The callback takes only the Channel object.
Takes the following arguments:
-
dst
Destination exchange name.
-
src
Source exchange name.
-
rKey
Routing key for the exchange binding.
-
noWait
Boolean integer, which defaults to 0.
-
eArgs
Exchange binding arguments (optional). Passed in as a dict. Defaults to an empty dict.
To set a callback for exchange to exchange bindings use the on method with exchangeBindOk as the first argument. Callback only takes the Channel object.
Takes the following arguments:
-
eName
Exchange name.
-
eType
Exchange type: direct, fanout, header, topic
-
eFlags
Optional flags. Flags supported (all in the ::rmq namespace):
-
EXCHANGE_PASSIVE
-
EXCHANGE_DURABLE
-
EXCHANGE_AUTO_DELETE
-
EXCHANGE_INTERNAL
-
EXCHANGE_NO_WAIT
-
-
eArgs
Optional dict of exchange declare arguments.
To set a callback on an exchange declaration, use the on method with exchangeDeclareOk as the first argument. Callback only takes the Channel object.
Takes the following arguments:
-
eName
Exchange name to delete.
-
inUse
Optional boolean argument defaults to 0. If set to 1, will not delete an exchange with bindings on it.
-
noWait
Optional boolean argument defaults to 0.
To set a callback on the exchange deletion, use the on method with exchangeDeleteOk as the first argument. Callback only takes the Channel object.
Takes the same arguments as exchangeBind, with the same callback data.
Takes the following arguments:
-
qName
Queue name.
-
eName
Exchange name.
-
rKey
Routing key (optional). Defaults to the empty string.
-
noWait
Boolean integer (optional). Defaults to 0.
-
qArgs
Queue binding arguments (optional). Needs to be passed in as a dict. Defaults to an empty dict.
To set a callback on a queue binding, use the on method with queueBindOk as the first argument. Callback only takes the Channel object.
Takes the following arguments:
-
qName
Queue name.
-
qFlags
Optional list of queue declare flags. Supports the following flag constants (in the ::rmq namespace):
-
QUEUE_PASSIVE
-
QUEUE_DURABLE
-
QUEUE_EXCLUSIVE
-
QUEUE_AUTO_DELETE
-
QUEUE_DECLARE_NO_WAIT
-
To set a callback on a queue declare, use the on method with queueDeclareOk as the first argument. Callback takes the Channel object, the queue name (especially important for exclusive queues), message count, number of consumers on the queue.
Takes the following arguments:
-
qName
Queue name.
-
flags
Optional list of flags. Supported flags (in the ::rmq namespace):
-
QUEUE_IF_UNUSED
-
QUEUE_IF_EMPTY
-
QUEUE_DELETE_NO_WAIT
-
To set a callback on a queue delete, use the on method with queueDeleteOk as the first argument. Callback takes the Channel object and a message count from the delete queue.
Takes the following arguments:
-
qName
Queue name.
-
noWait
Optional boolean argument. Defaults to 0.
To set a callback on a queue purge, use the on method with queuePurgeOk as the first argument. Callback takes the Channel object and a message count from the purged queue.
Takes the following arguments:
-
qName
Queue name.
-
eName
Exchange name.
-
rKey
Routing key.
-
qArgs
Optional queue arguments. Passed in as a dict.
To set a callback on a queue unbinding, use the on method with queueUnbindOk as the first argument. Callback takes only the Channel object.
Takes the following arguments:
-
deliveryTag
Delivery tag being acknowledged.
-
multiple
Optional boolean, defaults to 0. If set to 1, all messages up to and including the deliveryTag argument's value.
Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.
Takes the following arguments:
-
cTag
Consumer tag.
-
noWait
Optional boolean argument. Defaults to 0.
To set a callback on a basic cancel, use the on method with basicCancelOk as the first argument. Callback takes the Channel object and the consumer tag that was canceled.
Takes the following arguments:
-
callback
Name of a callback to use for consuming messages. The callback takes the Channel object, a dict of method data, a dict of frame data and the data from the queue.
-
qName
Queue name to consume from.
-
cTag
Optional consumer tag.
-
cFlags
Optional list of flags. Supported flags (all in the ::rmq namespace):
-
CONSUME_NO_LOCAL
-
CONSUME_NO_ACK
-
CONSUME_EXCLUSIVE
-
CONSUME_NO_WAIT
-
-
cArgs
Optional arguments to control consuming. Passed in as a dict. Supports all arguments specified for the basic class.
Callback is set directly from this method.
Takes the following arguments:
-
callback
Name of a callback proc using the same arguments as that for basicConsume.
-
qName
Queue name to get a message from
-
noWait
Optional boolean. Defaults to 0.
Like with basicConsume the callback for this method is set directly from the method call.
Takes the following arguments:
-
deliveryTag
Delivery tag for message being nack'ed.
-
nackFlags
Optional list of flags. Supports the following (in the ::rmq namespace):
-
NACK_MULTIPLE
-
NACK_REQUEUE
-
Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.
Takes the following arguments:
-
prefetchCount
Integer prefetch count, i.e., the number of unacknowledged messages that can be delivered to a consumer at one time.
-
globalQos
Optional boolean which defaults to 0. If set to 1, the prefecth count is set globally for all consumers on the channel.
To set a callback on a basic QOS call, use the on method with basicQosOk as the first argument. Callback takes only the Channel object.
Takes the following arguments:
-
data
The data to publish to the queue.
-
eName
Exchange name.
-
rKey
Routing key.
-
pFlags
Optional list of flags. Supports the following flags (in the ::rmq namespace):
-
PUBLISH_MANDATORY
-
PUBLISH_IMMEDIATE
-
No callback can be set on this directly. For publisher confirms use the on method with basicAck as the first argument. That callback takes the Channel object, the delivery tag and a boolean for whether the ack is for multiple messages.
Same as basicRecoverAsync.
Takes the following arguments:
-
noWait
Optional boolean argument, defaults to 0.
To set a callback on a confirm select call, use the on method with confirmSelectOk as the first argument. Callback takes the Channel object.
Takes the following arguments:
-
reQueue
Boolean argument. If 0, the message will be redelivered to the original recipient. If 1, an alternate recipient can get the redelivery.
To set a callback on a basic recover, use the on method with basicRecoverOk as the first argument. Callback takes the Channel object.
Takes the following arguments:
-
deliveryTag
Delivery tag of message being rejected by the client.
-
reQueue
Optional boolean argument, defaults to 0. If set to 1, the rejected message will be requeued.
This method is not to be called directly, but to use a callback to handle returned messages, use the on method with basicReturn as the first argument. The callback takes the same arguments as the basicConsume callback.
Takes no arguments.
To set a callback on a transaction select call, use the on method with txSelectOk as the first argument. Callback takes the Channel object.
Takes no arguments.
To set a callback on a transaction commit call, use the on method with txCommitOk as the first argument. Callback takes the Channel object.
Takes no arguments.
To set a callback on a transaction commit call, use the on method with txRollbackOk as the first argument. Callback takes the Channel object.