Message queue system written in Go and backed by Redis
Clone or download
wellle Merge pull request #53 from xpzouying/master
fix: fix data race(consumingStopped var member) in redisQueue
Latest commit fdaebe8 Oct 19, 2018
Permalink
Failed to load latest commit information.
example fix Go's "<cmd> ./..." by moving examples to own package Mar 1, 2018
.gitignore add Gopkg meta to constrain dependencies Mar 1, 2018
.travis.yml add .travis.yml Mar 1, 2018
Gopkg.lock add Gopkg meta to constrain dependencies Mar 1, 2018
Gopkg.toml add Gopkg meta to constrain dependencies Mar 1, 2018
LICENSE Initial commit Aug 23, 2015
README.md Update README to only suggest using AddConsumerFunc Aug 1, 2018
batch_consumer.go Change package name to rmq Aug 23, 2015
cleaner.go update redis client to lattest go-redis/redis Feb 18, 2018
cleaner_test.go Remove dependency goenv Aug 23, 2015
connection.go Add TestRedisClient skeleton May 29, 2018
consumer.go Use ConsumerFunc as parater type in AddConsumerFunc Aug 1, 2018
deliveries.go Deliveries.Push() newline fix Aug 1, 2018
delivery.go Add and use RedisClient interface Apr 17, 2018
queue.go fix: fix data race(consumingStopped var member) in redisQueue Oct 8, 2018
queue_test.go Use anonymous function in AddConsumerFunc tests Aug 1, 2018
redis_client.go Add and use RedisClient interface Apr 17, 2018
redis_wrapper.go Add and use RedisClient interface Apr 17, 2018
state.go make state in test_delivery public Mar 11, 2016
state_string.go make state in test_delivery public Mar 11, 2016
stats.go Expose QueueStat methods for tracking queue size Mar 5, 2016
stats_test.go Comment out logs Aug 30, 2015
test_batch_consumer.go Comment out logs Aug 30, 2015
test_connection.go use sync.Map in TestConnection to avoid concurrent access issues Jul 12, 2018
test_connection_test.go Add prefix rmq to error messages and logs Aug 30, 2015
test_consumer.go Change package name to rmq Aug 23, 2015
test_delivery.go make state in test_delivery public Mar 11, 2016
test_delivery_test.go Change package name to rmq Aug 23, 2015
test_queue.go Use ConsumerFunc as parater type in AddConsumerFunc Aug 1, 2018
test_redis_client.go Fix LTrim and TTL in TestRedisClient May 29, 2018
test_redis_client_test.go Implement test_redis_client + tests May 29, 2018

README.md

Build Status GoDoc


Note: We recently updated rmq to use latest go-redis client github.com/go-redis/redis. If you don't want to upgrade yet, you can continue using rmq branch v1:

  1. Using a dependency manager, i.e. dep

    # Gopkg.toml
    [[constraint]]
      name = "github.com/adjust/rmq"
      branch = "v1"
  2. Using gopkg.in

    import "gopkg.in/adjust/rmq.v1"

    See https://gopkg.in/adjust/rmq.v1


Overview

rmq is short for Redis message queue. It's a message queue system written in Go and backed by Redis. It's similar to redismq, but implemented independently with a different interface in mind.

Basic Usage

Lets take a look at how to use rmq.

Import

Of course you need to import rmq wherever you want to use it.

import "github.com/adjust/rmq"

Connection

Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port.

connection := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1)

But it's also possible to access a Redis listening on a Unix socket.

connection := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1)

Note: rmq panics on Redis connection errors. Your producers and consumers will crash if Redis goes down. Please let us know if you would see this handled differently.

Queue

Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":

taskQueue := connection.OpenQueue("tasks")

Producer

An empty queue is boring, lets add some deliveries! Internally all deliveries are saved to Redis as strings. This is how you can publish a string payload to a queue:

delivery := "task payload"
taskQueue.Publish(delivery)

In practice, however, it's more common to have instances of some struct that we want to publish to a queue. Assuming task is of some type like Kind, this is how to publish the JSON representation of that task:

// create task
taskBytes, err := json.Marshal(task)
if err != nil {
    // handle error
    return
}

taskQueue.PublishBytes(taskBytes)

For a full example see example/producer

Consumer

Now that our queue starts filling, lets add a consumer. After opening the queue as before, we need it to start consuming before we can add consumers.

taskQueue.StartConsuming(10, time.Second)

This sets the prefetch limit to 10 and the poll duration to one second. This means the queue will fetch up to 10 deliveries at a time before giving them to the consumers. To avoid idling producers in times of full queues, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue gets empty, the poll duration sets how long to wait before checking for new deliveries in Redis.

Once this is set up, we can actually add consumers to the consuming queue.

taskConsumer := &TaskConsumer{}
taskQueue.AddConsumer("task consumer", taskConsumer)

For our example this assumes that you have a struct TaskConsumer that implements the rmq.Consumer interface like this:

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
    var task Task
    if err = json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        // handle error
        delivery.Reject()
        return
    }

    // perform task
    log.Printf("performing task %s", task)
    delivery.Ack()
}

First we unmarshal the JSON package found in the delivery payload. If this fails we reject the delivery, otherwise we perform the task and ack the delivery.

If you don't actually need a consumer struct you can just call AddConsumerFunc instead and pass in a consumer function which directly handles an rmq.Delivery:

taskQueue.AddConsumerFunc(func(delivery rmq.Delivery) {
    // handle delivery and call Ack() or Reject() on it
})

For a full example see example/consumer

Testing Included

To simplify testing of queue producers and consumers we include test mocks.

Test Connection

As before, we first need a queue connection, but this time we use a rmq.TestConnection that doesn't need any connection settings.

testConn := rmq.NewTestConnection()

If you are using a testing framework that uses test suites, you can reuse that test connection by setting it up once for the suite and resetting it with testConn.Reset before each test.

Producer Test

Now lets say we want to test the function publishTask that creates a task and publishes it to a queue from that connection.

// call the function that should publish a task
publishTask(testConn)

// check that the task is published
c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task payload")

The c.Check part is from gocheck, but it will look similar for other testing frameworks. Given a rmq.TestConnection, we can check the deliveries that were published to it's queues (since the last Reset() call) with GetDelivery(queue, index). In this case we want to extract the first (and possibly only) delivery that was published to queue tasks and just check the payload string.

If the payload is JSON again, the unmarshalling and check might look like this:

var task Task
err := json.Unmarshal([]byte(suite.testConn.GetDelivery("tasks", 0)), &task)
c.Assert(err, IsNil)
c.Assert(task, NotNil)
c.Check(task.Property, Equals, "value")

If you expect a producer to create multiple deliveries you can use different indexes to access them all.

c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task1")
c.Check(suite.testConn.GetDelivery("tasks", 1), Equals, "task2")

For convenience there's also a function GetDeliveries that returns all published deliveries to a queue as string array.

c.Check(suite.testConn.GetDeliveries("tasks"), DeepEquals, []string{"task1", "task2"})

If your producer doesn't have guarantees about the order of its deliveries, you could implement a selector function like findByPrefix and then check each delivery regardless of their index.

tasks := suite.testConn.GetDeliveries("tasks")
c.Assert(tasks, HasLen, 2)
xTask := findByPrefix(tasks, "x")
yTask := findByPrefix(tasks, "y")
c.Check(xTask.Id, Equals, "3")
c.Check(yTask.Id, Equals, "4")

These examples assumed that you inject the rmq.Connection into your testable functions. If you inject instances of rmq.Queue instead, you can use rmq.TestQueue instances in tests and access their LastDeliveries (since Reset()) directly.

Consumer Test

Testing consumers is a bit easier because consumers must implement the rmq.Consumer interface. In the tests just create rmq.TestDelivery and pass it to your Consume implementation. This example creates a test delivery from a string and then checks that the delivery was acked.

consumer := &TaskConsumer{}
delivery := rmq.NewTestDeliveryString("task payload")

consumer.Consume(delivery)

c.Check(delivery.State, Equals, rmq.Acked)

The State field will always be one of these values:

  • rmq.Acked: The delivery was acked
  • rmq.Rejected: The delivery was rejected
  • rmq.Pushed: The delivery was pushed (see below)
  • rmq.Unacked: Nothing of the above

If your packages are JSON marshalled objects, then you can create test deliveries out of those like this:

task := Task{Property: "bad value"}
delivery := rmq.NewTestDelivery(task)

Integration tests

If you want to write integration tests which exercise both producers and consumers at the same time, you can use the OpenConnectionWithTestRedisClient constructor. It returns a real rmq.Connection instance which is backed by an in memory Redis client implementation. That way it behaves exactly as in production, just without the durability of a real Redis client. Don't use this in production!

Statistics

Given a connection, you can call connection.CollectStats to receive rmq.Stats about all open queues, connections and consumers. If you run example/handler you can see what's available:

In this example you see three connections consuming things, each wich 10 consumers each. Two of them have 8 packages unacked. Below the marker you see connections which are not consuming. One of the handler connections died because I stopped the handler. Running the cleaner would clean that up (see below).

TODO

There are some features and aspects not properly documented yet. I will quickly list those here, hopefully they will be expanded in the future. 😉

  • Batch Consumers: Use queue.AddBatchConsumer() to register a consumer that receives batches of deliveries to be consumed at once (database bulk insert) See example/batch_consumer
  • Push Queues: When consuming queue A you can set up its push queue to be queue B. The consumer can then call delivery.Push() to push this delivery (originally from queue A) to the associated push queue B. (useful for retries)
  • Cleaner: Run this regularly to return unacked deliveries of stopped or crashed consumers back to ready so they can be consumed by a new consumer. See example/cleaner
  • Returner: Imagine there was some error that made you reject a lot of deliveries by accident. Just call queue.ReturnRejected() to return all rejected deliveries of that queue back to ready. (Similar to ReturnUnacked which is used by the cleaner) Consider using push queues if you do this regularly. See example/returner
  • Purger: If deliveries failed you don't want to retry them anymore for whatever reason, you can call queue.PurgeRejected() to dispose of them for good. There's also queue.PurgeReady if you want to get a queue clean without consuming possibly bad deliveries. See example/purger