
Message Queue

elandau edited this page Nov 30, 2012 · 14 revisions

Experimental and not yet released!

Overview

This recipe uses Cassandra to implement a durable message queue. The implementation combines the distributed row lock recipe with two layers of sharding (yes, sharding) to achieve a high-throughput durable queue. Sharding solves two problems. The first is a time-based rolling shard, where at any given time most of the traffic goes to one shard. This minimizes the impact of a growing wide row and allows compaction to clean up. The shard is rolling (i.e. mod) to avoid having to go back and check any old shards that may contain data. The second layer of sharding distributes load across the cluster. This mitigates hot spots and also reduces lock contention when there are multiple clients.
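A minimal sketch of how a client might pick the two shard indexes described above. `ShardSelector` is a hypothetical helper, not part of the recipe; it assumes the rolling shard is `(time / bucketDuration) mod bucketCount` and that the concurrency shard is picked at random, which is only one possible policy.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for the two sharding layers; not the recipe's actual code. */
final class ShardSelector {
    private final int timeBuckets;       // number of rolling time shards
    private final long bucketMillis;     // width of one time shard
    private final int concurrencyShards; // shards within each time shard

    ShardSelector(int timeBuckets, long bucketDuration, TimeUnit unit, int concurrencyShards) {
        this.timeBuckets = timeBuckets;
        this.bucketMillis = unit.toMillis(bucketDuration);
        this.concurrencyShards = concurrencyShards;
    }

    /** Layer 1: rolling (mod) time shard, so most traffic at any moment lands in one bucket. */
    int timeShard(long epochMillis) {
        return (int) ((epochMillis / bucketMillis) % timeBuckets);
    }

    /** Layer 2: spread load over several rows; a random pick is one possible policy. */
    int concurrencyShard() {
        return ThreadLocalRandom.current().nextInt(concurrencyShards);
    }
}
```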

Data Model

Each shard is implemented as a separate wide row with the shard name as the row key and columns representing items in the queue.

The row key format is {QueueName}:{RollingTimeShard}:{ConcurrencyShard}.
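The row key can be assembled and taken apart with plain string handling. `ShardKey` below is hypothetical; parsing from the right is an assumption made so that a queue name containing `:` still round-trips.

```java
/** Hypothetical value type for {QueueName}:{RollingTimeShard}:{ConcurrencyShard}. */
final class ShardKey {
    final String queueName;
    final int timeShard;
    final int concurrencyShard;

    ShardKey(String queueName, int timeShard, int concurrencyShard) {
        this.queueName = queueName;
        this.timeShard = timeShard;
        this.concurrencyShard = concurrencyShard;
    }

    String toRowKey() {
        return queueName + ":" + timeShard + ":" + concurrencyShard;
    }

    /** Splits on the last two ':' characters, so the queue name itself may contain ':'. */
    static ShardKey parse(String rowKey) {
        int second = rowKey.lastIndexOf(':');
        int first = rowKey.lastIndexOf(':', second - 1);
        if (first < 0 || second < 0) {
            throw new IllegalArgumentException("Not a shard row key: " + rowKey);
        }
        return new ShardKey(rowKey.substring(0, first),
                Integer.parseInt(rowKey.substring(first + 1, second)),
                Integer.parseInt(rowKey.substring(second + 1)));
    }
}
```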

The column name is a composite that relies on the sort order of the composite comparator type to achieve several key features. Its structure is {MessageType}{Priority}{TimeUUID}{State}.

MessageType

  • Metadata (0) - Metadata is used to store any queue configuration information and to notify clients of configuration changes (this isn't implemented yet).
  • Lock (1) - Row lock columns exist on the same row to take advantage of row-level isolation and to avoid making separate calls for releasing the lock and updating the queue.
  • Message (2) - This is the actual message.

This structure makes it possible to read both the lock columns and the messages in a single call to Cassandra, thereby reducing overhead. All of the lock columns will be at the top of the response, followed by the actual messages.

Priority

Priority makes it possible to inject high priority messages for processing.

TimeUUID

TimeUUID provides both uniqueness and time ordering of messages in the queue. For lock columns, it is primarily used to guarantee unique IDs for clients trying to take the lock. Time ordering provides both FIFO-like semantics and delayed message execution.
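The sketch below shows how a TimeUUID can carry both ordering and delay. It builds version-1 UUIDs with `java.util.UUID` and compares them by their embedded timestamp (Cassandra's TimeUUIDType also compares the timestamp first, whereas `UUID.compareTo` does not). `TimeUuids`, the random clock-sequence and node bits, and the `isDue` check are illustrative assumptions, not the recipe's code.

```java
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical TimeUUID helpers: construction, time ordering and a "due yet?" check. */
final class TimeUuids {
    // Number of 100 ns intervals from the UUID epoch (1582-10-15) to the Unix epoch.
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    /** Builds a version-1 UUID for epochMillis; clock sequence and node bits are random. */
    static UUID fromEpochMillis(long epochMillis) {
        long ts = epochMillis * 10_000L + UUID_EPOCH_OFFSET;   // 60-bit UUID timestamp
        long msb = ((ts & 0xFFFFFFFFL) << 32)                   // time_low
                | (((ts >>> 32) & 0xFFFFL) << 16)               // time_mid
                | 0x1000L                                       // version 1
                | ((ts >>> 48) & 0x0FFFL);                      // time_hi
        long lsb = (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFFFFFFFFFL)
                | 0x8000000000000000L;                          // IETF variant bits
        return new UUID(msb, lsb);
    }

    static long toEpochMillis(UUID uuid) {
        return (uuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000L;
    }

    /** Orders by embedded timestamp first; the remaining bits only break ties. */
    static final Comparator<UUID> BY_TIME = (a, b) -> {
        int byTime = Long.compare(a.timestamp(), b.timestamp());
        return byTime != 0 ? byTime : a.compareTo(b);
    };

    /** A message stamped in the future is not yet due, which is one way to express a delay. */
    static boolean isDue(UUID uuid, long nowMillis) {
        return toEpochMillis(uuid) <= nowMillis;
    }
}
```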

State

State is used by the locking algorithm.
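Putting the four components together, an ascending component-by-component comparison is what lets a single slice return lock columns ahead of messages, and messages in priority-then-time order. This in-memory `QueueColumn` model is hypothetical: the TimeUUID is reduced to a long, the state values are placeholders, and it assumes a smaller priority value is read first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical model of the composite column {MessageType}{Priority}{TimeUUID}{State}.
 * The TimeUUID is reduced to a long timestamp; state values are placeholders.
 */
final class QueueColumn {
    // MessageType values as listed in the text.
    static final int METADATA = 0;
    static final int LOCK = 1;
    static final int MESSAGE = 2;

    final int type;
    final int priority;
    final long time;
    final int state;

    QueueColumn(int type, int priority, long time, int state) {
        this.type = type;
        this.priority = priority;
        this.time = time;
        this.state = state;
    }

    /** Compares component by component, in ascending order, like a composite comparator. */
    static final Comparator<QueueColumn> ORDER = Comparator
            .comparingInt((QueueColumn c) -> c.type)
            .thenComparingInt(c -> c.priority)
            .thenComparingLong(c -> c.time)
            .thenComparingInt(c -> c.state);

    /** Returns the row's columns in the order a single slice query would return them. */
    static List<QueueColumn> slice(List<QueueColumn> row) {
        List<QueueColumn> sorted = new ArrayList<>(row);
        sorted.sort(ORDER);
        return sorted;
    }
}
```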

Examples

Creating a message queue client

The first step to using the queue is to create a MessageQueue instance. Multiple producers and consumers may be created from a single MessageQueue instance.

The following example shows how to create a message queue with 50 rolling time shards of 30 seconds and 20 concurrency shards within each time shard. That's a total of 1000 rows being used by the message queue. Notice that the message queue uses a keyspace client that was previously created.
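The arithmetic behind this configuration can be checked directly. `QueueLayout` is a hypothetical helper and `MyQueue` a placeholder queue name; the rollover period assumes the rolling shard is the time bucket mod the bucket count, so each time-shard row comes back into use every 50 × 30 s = 1500 s (25 minutes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical: the rows implied by withBuckets(50, 30, SECONDS) and withShardCount(20). */
final class QueueLayout {
    /** Every {QueueName}:{RollingTimeShard}:{ConcurrencyShard} row a consumer may poll. */
    static List<String> rowKeys(String queueName, int timeBuckets, int shardCount) {
        List<String> keys = new ArrayList<>(timeBuckets * shardCount);
        for (int t = 0; t < timeBuckets; t++) {
            for (int s = 0; s < shardCount; s++) {
                keys.add(queueName + ":" + t + ":" + s);
            }
        }
        return keys;
    }

    /** How long before a rolling time shard is reused (assumes mod-based rolling). */
    static long rolloverPeriodSeconds(int timeBuckets, long bucketDuration, TimeUnit unit) {
        return timeBuckets * unit.toSeconds(bucketDuration);
    }
}
```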

CountingQueueStats stats = new CountingQueueStats();
MessageQueue queue = new ShardedDistributedMessageQueue.Builder()
    .withConsistencyLevel(ConsistencyLevel.CL_QUORUM)
    .withColumnFamily("Queue")
    .withKeyspace(keyspace)                          // previously created keyspace client
    .withQueueStats(stats)
    .withBuckets(50, 30, TimeUnit.SECONDS)           // 50 rolling time shards of 30 seconds
    .withShardCount(20)                              // 20 concurrency shards per time shard
    .withPollInterval(100L, TimeUnit.MILLISECONDS)
    .build();

Create the queue

The MessageQueue API provides a convenience method that creates a metadata row in the same column family as the shards. Other clients can read this row to get the queue configuration (mainly the number of shards).

queue.createQueue();

Producing events

Events may be inserted one at a time or in bulk. Note that a row doesn't need to be locked when producing events.
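Producers can skip the lock because every message gets its own unique, time-ordered column, so concurrent writers never overwrite one another. The in-memory sketch below illustrates that with thread-safe sorted maps; `InMemoryQueueStore`, `SketchProducer`, the round-robin shard choice, and the counter standing in for a TimeUUID are all assumptions.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the column family: row key -> columns sorted by name. */
final class InMemoryQueueStore {
    private final Map<String, NavigableMap<Long, String>> rows = new ConcurrentHashMap<>();

    /** Writes one column; no lock is taken because every column name is unique. */
    void insert(String rowKey, long columnName, String data) {
        rows.computeIfAbsent(rowKey, k -> new ConcurrentSkipListMap<>()).put(columnName, data);
    }

    int rowSize(String rowKey) {
        NavigableMap<Long, String> row = rows.get(rowKey);
        return row == null ? 0 : row.size();
    }

    int totalColumns() {
        return rows.values().stream().mapToInt(Map::size).sum();
    }
}

/** Hypothetical producer: picks a concurrency shard and appends a message column. */
final class SketchProducer {
    private final InMemoryQueueStore store;
    private final String queueName;
    private final int shardCount;
    private final AtomicLong nextId = new AtomicLong(); // stands in for a unique TimeUUID

    SketchProducer(InMemoryQueueStore store, String queueName, int shardCount) {
        this.store = store;
        this.queueName = queueName;
        this.shardCount = shardCount;
    }

    void send(String data, int timeShard) {
        long id = nextId.incrementAndGet();
        int shard = (int) (id % shardCount); // round-robin across concurrency shards
        store.insert(queueName + ":" + timeShard + ":" + shard, id, data);
    }
}
```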

MessageProducer producer = queue.createProducer();
producer.sendMessage(new Message().setData("The quick brown fox jumped over the lazy cow"));

Consuming events

Events are consumed using the following sequence:

  1. Lock a shard.
  2. Read N messages.
  3. Submit a single mutation that releases the lock, deletes the messages, and inserts a timeout message for each message.
  4. Process the messages.
  5. ACK each message by deleting its timeout column.
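The five steps can be walked through against a single in-memory shard. In this hypothetical `ShardConsumerSketch`, a local `ReentrantLock` marks where the distributed row lock would be held, the timeout entries live in a separate map rather than in the row itself, and everything apart from the lock is single-threaded; it only illustrates the order of operations.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical, single-threaded walk-through of the consume sequence for one shard.
 * The ReentrantLock marks where the distributed row lock would be held.
 */
final class ShardConsumerSketch {
    private final ReentrantLock shardLock = new ReentrantLock();
    final TreeMap<Long, String> messages = new TreeMap<>(); // id -> payload, oldest first
    final Map<Long, String> timeouts = new HashMap<>();      // id -> payload awaiting ACK

    /** Producer side: append a message (no shard lock needed). */
    void add(long id, String payload) {
        messages.put(id, payload);
    }

    /** Steps 1-3: lock the shard, take up to n messages, swap them for timeout entries. */
    List<Map.Entry<Long, String>> readMessages(int n) {
        List<Map.Entry<Long, String>> batch = new ArrayList<>();
        if (!shardLock.tryLock()) {
            return batch;                 // step 1 failed: another consumer owns the shard
        }
        try {
            while (batch.size() < n && !messages.isEmpty()) {
                Map.Entry<Long, String> e = messages.pollFirstEntry(); // step 2 read + step 3 delete
                timeouts.put(e.getKey(), e.getValue());               // step 3: timeout entry
                batch.add(e);
            }
            return batch;
        } finally {
            shardLock.unlock();           // step 3: release the lock
        }
    }

    /** Step 5: ACK by deleting the timeout entry. */
    void ack(long id) {
        timeouts.remove(id);
    }
}
```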

The following example reads a block of up to 100 messages and acks the entire set with one call. Messages can be ack'd individually as well.

MessageConsumer consumer = queue.createConsumer();
Collection<Message> messages = null;
try {
    messages = consumer.readMessages(100);
    for (Message message : messages) {
        // Do something with the message
    }
}
finally {
    // Ack the whole block with a single call
    if (messages != null)
        consumer.ackMessage(messages);
}
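Because a timeout message is written for every message read and the ACK deletes it, a consumer that fails before acknowledging should see the message again once the timeout passes, which amounts to at-least-once delivery. The hypothetical `TimeoutQueueSketch` below models that with columns keyed by (due time, id); the timeout length and the in-memory structures are assumptions, not the recipe's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of timeout-based redelivery; only columns that are due can be read. */
final class TimeoutQueueSketch {
    /** Column name: due time first (like a TimeUUID), then id for uniqueness. */
    private static final class Key implements Comparable<Key> {
        final long due;
        final long id;
        Key(long due, long id) { this.due = due; this.id = id; }
        @Override public int compareTo(Key o) {
            int c = Long.compare(due, o.due);
            return c != 0 ? c : Long.compare(id, o.id);
        }
    }

    private final long timeoutMillis;
    private final TreeMap<Key, String> row = new TreeMap<>();
    private final Map<Long, Key> pendingTimeouts = new HashMap<>(); // id -> timeout column

    TimeoutQueueSketch(long timeoutMillis) {
        if (timeoutMillis <= 0) throw new IllegalArgumentException("timeout must be positive");
        this.timeoutMillis = timeoutMillis;
    }

    void send(long id, String payload, long nowMillis) {
        row.put(new Key(nowMillis, id), payload);
    }

    /** Reads due messages, replacing each with a timeout copy due at now + timeout. */
    List<Long> read(int n, long nowMillis) {
        List<Long> ids = new ArrayList<>();
        while (ids.size() < n && !row.isEmpty() && row.firstKey().due <= nowMillis) {
            Map.Entry<Key, String> e = row.pollFirstEntry();
            Key timeout = new Key(nowMillis + timeoutMillis, e.getKey().id);
            row.put(timeout, e.getValue());
            pendingTimeouts.put(timeout.id, timeout);
            ids.add(e.getKey().id);
        }
        return ids;
    }

    /** ACK: delete the timeout column so the message is never redelivered. */
    void ack(long id) {
        Key k = pendingTimeouts.remove(id);
        if (k != null) row.remove(k);
    }
}
```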