Skip to content

Message Queue

elandau edited this page Dec 12, 2012 · 14 revisions

Experimental and not yet released !!!


This recipe uses cassandra to implement a a durable message queue. The implementation uses a combination of the distributed row lock recipe and two layers of sharding (yes sharding) to achieve a high throughput durable queue. Sharding solves two problems. The first is a time based rolling shard where at any given time most of the traffic is going to one shard. The reason for this is to minimize the impact of a growing wide row and to allow compaction to clean up. This is a rolling (i.e. mod) shard to avoid having to go back and check any old shards that may contain data. The second layer of sharding is meant to distribute load across the cluster. This mitigates hot spots and also reduces lock contention where there are multiple client.


Creating a message queue client

The first step to using the queue is to create a MessageQueue instance. Multiple producers and consumers may be created from a single MessageQueue instance.

The following example shows how to create a message queue with 50 rolling time shards of 30 seconds and 20 concurrency shards within each time shard. That's a total of 1000 rows being used by the message queue. Notice that the message queue uses a keyspace client that was previously created.

CountingQueueStats stats  = new CountingQueueStats();
MessageQueue queue = new ShardedDistributedMessageQueue.Builder()
    .withBuckets(50,  30,  TimeUnit.SECONDS)
    .withPollInterval(100L,  TimeUnit.MILLISECONDS)

Creating the column families

As a convenience the API provides a method to create the underlying column families used by the recipe.


Create the queue

'Creating' the queue adds metadata to the queue column family which can be used by other clients to determine the number of shards and other queue parameters. Call this only once when setting up the queue.


Producing messages

Messages may be inserted one at a time or in bulk. Note that a row doesn't need to be locked when producing messages.

MessageProducer producer = queue.createProducer();
String messageId = producer.sendMessage(new Message().addParameter("data", "Hello world!"));

Consuming messages

Using the message dispatcher

MessageQueueDispatcher provides a basic implementation of threads reading from the queue and processing messages from a thread pool

MessageQueueDispatcher dispatcher = new MessageQueueDispatcher.Builder()
    .withCallback(new Function<Message, Boolean>() {
        public synchronized Boolean apply(Message message) {
            // Return true to 'ack' the message
            // Return false to not 'ack' which will result in the message timing out 
            // Throw any exception to put the message into a poison queue
            return true;


// When ready to shut down call

Instead of using the default Function provided to the message dispatcher it is also possible to specify a specific handler class. When a message is processed the dispatcher will create an instance of this class and pass the message as a parameter.

public class HelloWorldFunction implements Function<Message, Boolean>{
    public Boolean apply(@Nullable Message input) {
        System.out.println("Hello world!");
        return true;

Message m = new Message()
String messageId = producer.sendMessage(m);

Consuming messages manually

Messages may be consumed in arbitrary batch sizes. The current implementation will attempt to read up to batchSize messages from the most recent shards while frequently checking older shards for lingering messages. Once consumed the messages must back acknowledged so that the timeout event may be cancelled.

Messages are consumed using the following sequence

  1. Lock a shard
  2. Read N messages
  3. Submit a single mutation which releases the lock, delete the messages, inserts a timeout message for each message.
  4. Process the message
  5. ACK the message by deleting the timeout column

The following example reads a block of up to 100 messages and acks the entire set with one call. Messages can be ack'd individually as well.

Collection<Message> messages = null;
try {
    messages = consumer.readMessages(100);
    try {
        for (Message message : messages) {
            // Do something with the message
finally {
    if (messages != null) 

Deleting messages

The call to produce a message returns a message id that may be used to remove the message from the queue without having to consume it from the queue first.

String messageId = producer.sendMessage(new Message().setData("The quick brown fox jumped over the lazy cow"));

Reading a message without consuming

The call to produce a message returns a message id that may be used to read the message from the queue without having to consume it from the queue first.

String messageId = producer.sendMessage(new Message().setData("The quick brown fox jumped over the lazy cow"));
Message message = queue.readMessage(messageId);

Storing messages with lookup key

Sometimes its nice to be able to assign a key to messages in the queue. This provides the ability to check if a recurring task already exists in the queue and to be able to read/cancel a message without having to know the specific message id (which depends on the execution time and shard). Keep in mind that using a lookup key adds additional overhead since that information must be maintained in a separate column family that is updated or accessed for each operation.

The following code will enqueue an event that has key "HelloWorld"

MessageProducer producer = queue.createProducer();
String messageId = producer.sendMessage(new Message()
   .addParameter("data", "Hello world!"));

It is now possible to access the event by key instead of the messageId

Message message = scheduler.readMessageByKey("HelloWorld");

You can also delete the message by key. This will delete the lookup as well as the queue event.



Run once trigger

To run once either don't specify a trigger or specify a RunOnceTrigger

Message m = new Message()
    .setTrigger(new RunOnceTrigger.Builder()
        .withDelay(10, TimeUnit.MINUTES)

Delayed / Repeating trigger

Message m = new Message()
    .setTrigger(new RepeatingTrigger.Builder()
        .withDelay(10, TimeUnit.MINUTES)
        .withInterval(10,  TimeUnit.MINUTES)

CRON Trigger

You can define this CronTrigger which uses the Quartz cron parser to determine the next execution time.

import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;

import org.quartz.CronExpression;


public class CronTrigger extends AbstractTrigger {
    public static class Builder {
        private CronTrigger trigger = new CronTrigger();
        public Builder withExpression(String expression) {
            trigger.expression = expression;
            return this;
        public CronTrigger build() {
            return trigger;
    private String expression;
    public Trigger nextTrigger() {
        CronTrigger nextTrigger = new CronTrigger();
        nextTrigger.expression     = expression;
        nextTrigger.setExecutionCount(getExecutionCount() + 1);
        return nextTrigger;

    private long getNextCronFromNow() {
        try {
            Calendar cal = Calendar.getInstance();
            Date currTime = cal.getTime();
            CronExpression cronExpression = new CronExpression(expression);
            return cronExpression.getNextValidTimeAfter(currTime).getTime();
        } catch (ParseException e) {
            throw new RuntimeException(e);
    public String getExpression() {
        return expression;

    public void setExpression(String expression) {
        this.expression = expression;

Here's an example of how to define a cron trigger that runs every 5 seconds

Message m = new Message()
    .setTrigger(new CronTrigger.Builder()
        .withExpression("*/5 * * * * ?")

Data Model

Each shard is implemented as a separate wide row with the shard name as the row key and columns representing items in the queue.

The row key format is {QueueName}:{RollingTimeShard}:{ConcurrencyShard}.

The column format is a composite which takes advantage of the composite comparator type sorted to achieve several key features. The structure is {MessageType}{Priority}{TimeUUID}{State}.


  • Metadata (0) - Metadata is used to store any queue configuration information and to notify clients of configuration changes (this ins't implemented yet)
  • Lock (1) - Row lock columns exist on the same row to take advantage of row level isolation and to reduce having to make separate calls for releasing the lock and updating the queue.
  • Message (2) - This is the actual message.

This structure makes it possible to read both the lock columns and message in a single call to cassandra thereby reducing overhead. All of the lock columns will be at the top of the response followed by the actual messages.


Priority makes it possible to inject high priority messages for processing ahead of lower priority events. When using priorities it is important to understand that due to limitations of the queue data model high priority tasks cannot be scheduled for delayed execution. Only priority 0 messages may be scheduled for delayed execution. Also, the timeout event for a high priority event will be scheduled at priority 0 and will therefore loose it's high priority should the event processing time out.


TimeUUID provides both uniqueness and sorting by time of messages in the queue. For lock columns it is primarily used to guarantee unique ids for clients trying to take the lock. Time ordering provides both FIFO like semantics as well as delayed message execution.


State is used by the locking algorithm.

Clone this wiki locally
You can’t perform that action at this time.