Skip to content

RabbitMQ showcase implementing messaging patterns using different AMQP 1.0 and AMQP 0.9.1 clients

Notifications You must be signed in to change notification settings

NovatecConsulting/tc-showcase-rabbitmq

Repository files navigation

RabbitMQ Showcase

Since RabbitMQ is increasingly being requested as an alternative to Kafka, Rabbit should be examined more closely for its advantages and disadvantages. However, the focus of this repository should primarily be on getting to know RabbitMQ and AMQP (v0.9.1 and v1.0.0).

The overall goal should be to enable an Event Mesh with different protocols. This requires, among other things, the connection of RabbitMQ and the provision of AMQP endpoints.

Objective

RabbitMQ is a versatile message broker that supports various messaging patterns. These are to be implemented using small sample implementations:

  • Competing Consumers

  • Publish-Subscribe Channel

Prerequisites

A local RabbitMQ installation can be used to run the code. Start a RabbitMQ Broker instance using Docker:

docker run --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3

Tests and Setup

The project uses a RabbitMQ testcontainer to execute tests without the need of a local RabbitMQ instance. After publishing your code to Github, build and test steps will be executed by a Github Actions pipeline.

Tests are written in Groovy using the Spock Test Framework.

Pull requests will automatically trigger a new test run.

1. RabbitMQ Basics

1.1. High-Level Model (AMQP 0.9.1): Queues, Exchanges, Channels

RabbitMQ uses the following model:

Publishers send their messages to exchanges which can be compared to post offices or mailboxes. These messages are then forwarded to one or more binded queues. Consumers can subscribe to queues or explicitly pull from them.

Exchanges

Exchanges route messages to one or more defined queues. There are four different types of exchanges:

  • Direct exchange

  • Fanout exchange

  • Topic exchange

  • Headers exchange

By default, if no exchange is explicitly created, the broker will use the same name for the routing-key as for the queue that the publisher wants to send the messages to.

Queues

RabbitMQ uses queues as a sequential data structure with the two primary operations of enqueueing and dequeueing. The important features are:

  • Queues in RabbitMQ use FIFO prioritization.

  • Applications can select a queue name or can request the broker to generate a name. The latter can be done by passing an empty string where a queue name is expected.

  • Queue names starting with "amq." are reserved for internal use only.

A queue can be declared with the properties name, durability, exclusivity, auto-deletion and additional arguments.

Connections and Channels

On the technical side, clients have to connect and authenticate to a RabbitMQ node before they are able to publish or consume messages.

Since there are some applications that need multiple connections, AMQP 0.9.1 offers to open one or more channels on a single TCP connection. Like this, one TCP connection can be shared to save resources. The channels rely completely on the connection which means that if the connection is closed, all channels will be closed as well.

One example for the usage of multiple channels are applications that use multiple threads. For those applications, it is very common to open a new channel for each thread.

Exchanges and queues are completely independent of connections and channels. A client can publish to the same exchange or consume from the same queue using multiple channels with one connection.

1.2. Queue Properties: Auto-deletion, Exclusivity, TTL and Durability

  1. auto-delete
    Queues declared with the auto-delete property will be deleted when all consumers have finished using it. The last consumer can be cancelled explicitly (channel.basicCancel(consumerTag)) or by closing its channel (session in case of SwiftMQ).

  2. exclusive
    Exclusive queues can only be accessed by the current connection. Therefore, they are deleted as soon as the connection(!) closes. It is not possible to declare an exclusive queue from an external connection. This is also the reason why it is not possible to declare an exclusive queue using the RabbitMQ Java client and to subscribe to it with an AMQP 1.0 client connection.

  3. TTL
    The TTL-property can be used to avoid deletion of queues in cases when a consumer shortly disconnects and then re-connects to the queue. The corresponding queue will only be deleted if it stays unused for the defined time limit.

  4. durable
    Metadata of durable queues is stored on disk while metadata of transient queues is stored in memory. Therefore, transient queues and their messages will not be recovered after node restart.

2. Implementation of Messaging Patterns using RabbitMQ

The SenderApplication and ReceiverApplication make it possible to test the code via the command line. To test the different implementations, the used libraries can simply be exchanged since both applications are accessing the AMQPClient interface.

2.1. Competing Consumers

The Competing Consumers Pattern (also known as Point-to-Point) describes the scenario when one producer can publish messages to a messaging queue. Those messages can be processed by any of multiple consumers while each message will only be processed once.

The implementation of producers and consumers can be found at:

/src/main/java/rabbitclients/version091/competingconsumers/
/src/main/java/rabbitclients/version100/competingconsumers/

2.2. Publish-Subscribe

When using the Publish-Subscribe Pattern, one publisher can send messages to a queue from where they will be consumed by EACH consumer that has subscribed to this queue. Usually, the subscribers need to be active at the same time when the messages are delivered to the queue. The pattern is often compared to television programs or radio stations where everybody can tune in and start consuming.

The implementation of producers and consumers can be found at:

/src/main/java/rabbitclients/version091/publishsubscribe/
/src/main/java/rabbitclients/version100/publishsubscribe/

The publish-subscribe implementation for AMQP 1.0 has some constraints and disadvantages which is why it probably should not be used practically. See paragraph 4.8 for more details.

3. AMQP 0.9.1

AMQP 0.9.1 is RabbitMQ’s default protocol. There are various client libraries available that were developed by the RabbitMQ team to support AMQP 0.9.1.

3.1. Competing Consumers: Event-driven Consumer

Using an event-driven consumer for the competing consumer pattern is considered as standard for RabbitMQ. The basicConsume()-method uses the DeliverCallBack interface to notify the consumer as soon as a new message is available. The consumer is not blocked while it is waiting for messages.

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

   System.out.println("Received '" + message + "'");
   try {
       messageHandler.accept(message);
   } finally {
       System.out.println("Done.");
       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
;

channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

Read more about this type of consumer here.

3.2. Competing Consumers: Polling Consumer

The RabbitMQ client also provides the possibility to use a polling consumer which needs to actively ask for new messages. The basicGet()-method directly returns when no message is available and does not offer to specify a polling timeout. Because of this implementation, the consumer has to constantly poll for messages using a loop which is highly inefficient and therefore not recommended.

GetResponse response = channel.basicGet(TASK_QUEUE_NAME, false);

Read more about this type of consumer here.

4. AMQP 1.0

4.1. Plugin Installation

RabbitMQ primarily supports AMQP 0.9.1 and uses a plugin mechanism to enable the use of AMQP 1.0. To run the code, a RabbitMQ broker with the enabled plugin has to be started using the Dockerfile located in this directory:

docker build -f Dockerfile -t rabbitmq-amqp-1_0-enabled .
docker run --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq-amqp-1_0-enabled

Alternatively, you can pull the image from Dockerhub:

docker run --hostname my-rabbit -p 5672:5672 -p 15672:15672 nadjahagen/rabbitmq-amqp-1_0-enabled:latest
Important
Without exposing the ports explicitly, the application won’t be able to connect to the broker. A ConnectionRefusedException will occur.

4.2. Client Library

As already mentioned, RabbitMQ does not natively support AMQP 1.0. Therefore, their Java Client does not support AMQP 1.0 which is also not planned for the future.

The number of available Java clients that support AMQP 1.0 is still limited at the moment:

  • SwiftMQ: The plugin’s developers tested the functionalities mainly using SwiftMQ.

  • Qpid Proton-J: The documentation is not that detailed and does not provide examples.

  • Qpid JMS: Uses Proton-J internally. According to the documentation, the plugin’s developers could not establish a connection using Qpid JMS/Proton-J. Nevertheless, a tested and working implementation can be found at /src/main/java/rabbitclients/version100/qpidjms.

This project focuses on the usage of the SwiftMQ library.

4.3. AMQP 1.0 Protocol

Important AMQP 1.0 message fields MUST not be set by the application because they are overwritten by the Producer:

  • Header fields: durable, priority, ttl

  • Property fields: messageId, to, userId

Additional fields that are provided by AMQP 1.0:

  • Header fields: first-acquirer, delivery-count

  • Property fields: subject, reply-to, correlation-id, content-type, content-encoding, absolute-expiry-time, creation-time

A short comparison of the AMQP 0.9.1 and AMQP 1.0 fields is also provided in the RabbitMQ plugin documentation.

Routing and Addresses

If no specific exchange or topic name is provided, the messages are sent to the default exchange with the queue name as routing key. To specify exchange, topic or queue names, the address schema like described in the plugin documentation can be used.

AMQP 1.0 was developed to be universally applicable and therefore does not know concepts like topics or queues. When creating a producer or consumer, addresses can be used to set the target/source of messages:

com.swiftmq.amqp.v100.client.Producer producerInstance = session.createProducer(/topic/my_routing_key, qos);

In the example above, the producer sends all messages to the amq.topic exchange, from where they are then further distributed to all queues with the binding my_routing_key.

Frame Sizes and Session Windows

For each connection, a maximum frame size has to be set to make sure that the sender/receiver has enough capacity to process the message. Messages that are too large for one frame will be split internally by SwiftMQ into multiple frames forming one so-called "Delivery".

The frames are buffered in an outgoing/incoming session window with a limited size. Before they can be consumed as an AMQPMessage, the frames are put together again internally. The protocol implementation and the RabbitMQ plugin handle the frame sequence and lost frames according to the quality of service. Therefore, this does not need to be handled by the Java Client.

Settlement and Quality of Service

There are three different qualities of service available for clients:

  1. At-Most-Once / Fire-and-Forget
    Sent messages are already in the state "settled". The sender won’t wait until the receiver has successfully received the message and will immediately forget about it. If the message is already settled, the receiver will never send an acknowledgement because even if it would do so, the sender would already have forgotten about the message.

  2. At-Least-Once
    The sender waits for a reply before it marks the message as settled. Released and modified messages are sent again. In case the receiver or sender fails, the link can be re-established: The nodes compare their unsettled messages, update their states accordingly, eventually re-sent the frames and clean their buffers.

  3. Exactly-Once
    According to AMQP 1.0, sender and receiver must be able to renegotiate the delivery status in order to ensure "Exactly-Once"-quality. RabbitMQ does not support this concept.

4.4. Competing Consumers Pattern

In contrast to the RabbitMQ client, SwiftMQ implements a polling consumer, i.e., the consumer has to ask explicitly for messages. To avoid blocking, it is possible to use the receiveNoWait() method that also provides the option to add a message listener.

4.5. Publish-Subscribe Pattern

AMQP 1.0 does not know concepts like queues or topics. The target (or source) of a message has to be specified using the address-field of an AMQP 1.0 message like it is described in the plugin’s documentation.

Since this concept does not exist in AMQP 1.0, the SwiftMQ Java Client does not offer the possibility to manage queues, topics and bindings like it is possible with the RabbitMQ Java Client for AMQP 0.9.1. Therefore, exchanges and queues need to already exist when the application is started or can be created using RabbitMQ’s REST API.

The publish-subscribe pattern requires one queue for each subscriber. The publisher sends all messages to a fanout exchange from where they will be broadcast to all queues with a corresponding binding. Since the queue name is different for each subscriber, the name has to be passed into the program (if the queue already exists), or the queue has to be created within the application via the RabbitMQ REST API. Both possibilities come along with advantages and disadvantages which is why AMQP 1.0 is probably not ideal for the publish-subscribe pattern.

In this repository, the Setup-class implements methods for the creation of exchanges, queues and bindings via REST. The entities that are created with HTTP methods are not client-bound and therefore cannot be created as "exclusive". Automatic deletion can be achieved through other queue properties like auto-delete and TTL. In addition, the REST API does not offer to create queues without names to let the broker choose a (temporary) unique queue name. This leads to the constraint, that unique names have to be managed by the application.

4.6. Error Handling

A ConnectionClosedException will occur if the plugin is not enabled for the RabbitMQ broker. The connection will be refused because of conflicting AMQP protocol versions:

com.swiftmq.amqp.v100.client.ConnectionClosedException: java.io.IOException: End-of-Stream reached

    at com.swiftmq.amqp.v100.client.Connection$2.run(Connection.java:432)

    com.swiftmq.amqp.v100.client.UnsupportedProtocolVersionException: Incompatible AMQP protocols. Local=[ProtocolHeader, name=AMQP, id=0, major=1, minor=0, revision=0], remote=[ProtocolHeader, name=AMQP, id=0, major=0, minor=9, revision=1]
    ...

To resolve this, enable the plugin like described above at Plugin Installation.

5. AMQP Version and Library Interoperability

5.1. RabbitMQ, SwiftMQ and Qpid JMS

Producer Library Consumer Library Interoperability

SwiftMQ 1.0

RabbitMQ 0.9.1

provided with constraints → sending as plain bytes

RabbitMQ 0.9.1

SwiftMQ 1.0

provided with constraints → reading from data field

Qpid JMS 1.0

SwiftMQ 1.0

provided

SwiftMQ 1.0

Qpid JMS 1.0

provided

Qpid JMS 1.0

RabbitMQ 0.9.1

provided with constraints → sending as plain bytes

RabbitMQ 0.9.1

Qpid JMS 1.0

provided

AMQP 1.0 (SwiftMQ) to AMQP 0.9.1 (RabbitMQ)

If messages are sent by an AMQP 1.0 client and consumed using an AMQP 0.9.1 client, the consumer will get the message with extra bytes in the beginning.

Example:

  • Sent message: M1

  • Received message: �w��M1

These extra bytes are added by the client when the message is encoded into an AMQP-value or AMQP-sequence. To be able to read messages that were encoded in this way, the AMQP 0.9.1 client would need a corresponding decoder.

The AMQP 1.0 protocol offers to send the message in the body section as AMQP-value, AMQP-sequence or as plain bytes. According to this definition, it would be possible to send the messages in byte-format so that the AMQP 0.9.1 clients do not need to decode them additionally. Unfortunately, the SwiftMQ client does only offer to send messages as AMQP-values or -sequences and not as plain bytes.

AMQP 0.9.1 (RabbitMQ) to AMQP 1.0 (SwiftMQ)

When messages are sent in AMQP 0.9.1 format, the plugin extracts the payload bytes and transforms them into an AMQP 1.0 message. This means that the payload is transferred in the data section of the AMQP 1.0 message. The receiving client then needs to transform the bytes into the desired data type.

As mentioned in the paragraph above, the SwiftMQ client does not offer a method to send data in the data section of AMQP 1.0 messages. In contradiction to this, it is possible to read data from the data section and therefore, to receive messages that were sent using AMQP 0.9.1. In case of the SwiftMQ library, the identification of a message being in data-format or AMQP-value-format needs to be handled by the implementation. An example can be found at the InteroperabilityConsumer class.

AMQP 1.0 (Qpid JMS) to AMQP 0.9.1 (RabbitMQ)

For Qpid JMS producers applies the same as for SwiftMQ producers when the messages are consumed by a RabbitMQ consumer. The messages will not be decoded correctly:

Example:

  • Sent message: M1

  • Received message: Sw��M1

The example shows that the bytes that are appended in front of the message are similar to those appended by SwiftMQ clients, but are not exactly the same. This suggests that the appended bytes are client-dependent and therefore cannot be handled by the RabbitMQ plugin.
This issue was also discussed in the RabbitMQ mailing list.

It is possible to work around this issue when sending the messages as (unencoded) plain bytes by using the method Producer.sendUnencodedMessage(String message). In terms of interoperability, the Qpid JMS client seems to be the better choice compared to the SwiftMQ client.

AMQP 0.9.1 (RabbitMQ) to AMQP 1.0 (Qpid JMS)

Sending messages using a RabbitMQ client and receiving those messages using a Qpid JMS client works without constraints. In contradiction to the SwiftMQ client, it is not necessary to read the payload explicitly from the data field. The distinction between AMQP 0.9.1 messages (at the data field) and AMQP 1.0 messages (at the AMQP value field) is handled by the Qpid JMS library.

AMQP 1.0 (SwiftMQ) to AMQP 1.0 (Qpid JMS) and vice versa

The testing class QpidSwiftInteroperability tests the interoperability of SwiftMQ consumers and producers and Qpid JMS consumers and producers. The tests proved that messages can be sent and received independently of the used client library. All messages are encoded and decoded correctly.

5.2. Shovel Plugin for AMQP 0.9.1 and AMQP 1.0 Interoperability

Besides the AMQP 1.0 plugin, RabbitMQ also offers the Shovel plugin to transfer and redeliver messages between clusters and nodes. The Shovel plugin also supports AMQP 0.9.1 and AMQP 1.0, but is using the same Erlang clients as the AMQP 1.0 plugin. Therefore, shovels provide the same interoperability as the AMQP 1.0 plugin, as also indicated in the comparison table above.
However, dynamic shovels can only be configured via the management UI or via the HTTP API. The latter was implemented within the Setup-class where JSON configurations can be sent to create new shovels. The exact configuration depends on the endpoints that should be connected. Read more about this topic:

About

RabbitMQ showcase implementing messaging patterns using different AMQP 1.0 and AMQP 0.9.1 clients

Topics

Resources

Stars

Watchers

Forks