A gem for consuming messages from RabbitMQ

GOV.UK Message Queue Consumer

Standardises the way GOV.UK consumes messages from RabbitMQ. RabbitMQ is a message broker that lets applications broadcast messages for other applications to pick up.

On GOV.UK, publishing-api publishes the content-items it receives, so that applications such as email-alert-service can be notified of changes in content.

For detailed documentation, check out the gem documentation on rubydoc.info.

This gem is used by Rummager.

Nomenclature

[Figure: a graph showing the message flow]

  • Producer: an application that sends messages to RabbitMQ. On GOV.UK this could be publishing-api.
  • Message: an object sent over RabbitMQ. It consists of a payload and headers. In the case of the publishing-api the payload is a content item.
  • Consumer: the app that receives the messages and does something with them. On GOV.UK, this is email-alert-service.
  • Exchange: in RabbitMQ's model, producers send messages to an exchange. Consumers can create a Queue that listens to the exchange, instead of subscribing to the exchange directly. This is done so that the queue can buffer any messages and we can make sure all messages get delivered to the consumer.
  • Queue: a queue listens to an exchange. In most cases the queue will receive all messages, but it can also be bound with a pattern so that it receives only matching messages.
  • Processor: the specific class that processes a message.
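
The relationship between these terms can be sketched with a small in-memory model. This is not RabbitMQ or Bunny code: ToyMessage, ToyQueue, ToyExchange and CollectingProcessor are hypothetical names invented for illustration, and the pattern matching is a simplification of RabbitMQ's topic matching.

```ruby
# A toy, in-memory model of the terms above. Not RabbitMQ: every class here is
# hypothetical and exists only to illustrate the flow of a message.

# Message: a payload plus headers, published with a routing key.
ToyMessage = Struct.new(:payload, :headers, :routing_key)

# Queue: buffers the messages whose routing key matches its pattern.
class ToyQueue
  attr_reader :messages

  def initialize(pattern = "#")
    @pattern = pattern
    @messages = []
  end

  # Simplified matching: "*" stands for one word, "#" for any run of characters.
  def matches?(routing_key)
    regex = @pattern.split(".").map { |part|
      case part
      when "#" then ".*"
      when "*" then "[^.]+"
      else Regexp.escape(part)
      end
    }.join("\\.")
    /\A#{regex}\z/.match?(routing_key)
  end
end

# Exchange: producers publish here; each message is copied to every bound
# queue whose pattern matches.
class ToyExchange
  def initialize
    @queues = []
  end

  def bind(queue)
    @queues << queue
  end

  def publish(message)
    @queues.each { |q| q.messages << message if q.matches?(message.routing_key) }
  end
end

# Processor: the class that does something with each message.
class CollectingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message.payload
  end
end

# Producer side: publish two messages to the exchange.
exchange = ToyExchange.new
all_queue = ToyQueue.new("#")
major_queue = ToyQueue.new("*.major")
exchange.bind(all_queue)
exchange.bind(major_queue)

exchange.publish(ToyMessage.new({ "title" => "A guide" }, {}, "guide.major"))
exchange.publish(ToyMessage.new({ "title" => "A typo fix" }, {}, "guide.minor"))

# Consumer side: drain one queue and hand each message to the processor.
processor = CollectingProcessor.new
processor.process(major_queue.messages.shift) until major_queue.messages.empty?
```

Because each queue holds its own copy of a message, a slow or stopped consumer does not affect the others: its messages wait in its queue until it drains them.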

Technical documentation

This is a Ruby gem that handles the boilerplate code for communicating with RabbitMQ. Users of the gem only need to supply the configuration and a class that processes messages.

The gem is automatically released by Jenkins. To release a new version, raise a pull request with the version number incremented.

Dependencies

  • The Bunny gem: to interact with RabbitMQ.

Usage

Add the gem to your Gemfile.

Add a rake task like the following example:

# lib/tasks/message_queue.rake
namespace :message_queue do
  desc "Run worker to consume messages from rabbitmq"
  task consumer: :environment do
    GovukMessageQueueConsumer::Consumer.new(
      queue_name: "some-queue",
      processor: MyProcessor.new,
    ).run
  end
end

More options are documented in the gem documentation on rubydoc.info.

The consumer expects a number of environment variables to be present. On GOV.UK, these should be set up in Puppet.

RABBITMQ_HOSTS=rabbitmq1.example.com,rabbitmq2.example.com
RABBITMQ_VHOST=/
RABBITMQ_USER=a_user
RABBITMQ_PASSWORD=a_super_secret

Define a class that will process the messages:

# e.g. app/queue_consumers/my_processor.rb
class MyProcessor
  def process(message)
    # do something cool
  end
end

The worker should also be added to the Procfile to run in production:

# Procfile
worker: bundle exec rake message_queue:consumer

Because you need the environment variables when running the consumer, you should use govuk_setenv to run your app in development:

$ govuk_setenv app-name bundle exec rake message_queue:consumer

Processing a message

After you receive a message, you must tell RabbitMQ when you have finished processing it. This is called acking. You can also discard the message, or retry it.

class MyProcessor
  def process(message)
    result = do_something_with(message)

    if result.ok?
      # Ack the message when it has been processed correctly.
      message.ack
    elsif result.failed_temporarily?
      # Retry the message to make RabbitMQ send the message again later.
      message.retry
    elsif result.failed_permanently?
      # Discard the message when it can't be processed.
      message.discard
    end
  end
end
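
To make the three outcomes concrete, here is a minimal, self-contained sketch that runs without RabbitMQ. StubMessage, TemporaryFailure and TitleIndexer are hypothetical names for this example only; StubMessage stands in for the gem's message object and models only the ack, retry and discard calls described above, and its payload and status attributes are assumptions.

```ruby
# Stand-in for the gem's message object so the example runs without RabbitMQ.
# The payload attribute and the status tracking are assumptions of this sketch.
class StubMessage
  attr_reader :payload, :status

  def initialize(payload)
    @payload = payload
    @status = :pending
  end

  def ack
    @status = :acked
  end

  def retry
    @status = :retried
  end

  def discard
    @status = :discarded
  end
end

# Hypothetical error for a downstream service that is briefly unavailable.
class TemporaryFailure < StandardError; end

# Hypothetical processor: passes each title to the callable it was given.
class TitleIndexer
  def initialize(index)
    @index = index
  end

  def process(message)
    title = message.payload["title"]
    if title.nil?
      # No retry can fix a payload without a title, so drop it.
      message.discard
      return
    end

    @index.call(title)
    message.ack
  rescue TemporaryFailure
    # The same message may succeed later, so ask for it to be sent again.
    message.retry
  end
end

indexed = []

ok = StubMessage.new({ "title" => "A guide" })
TitleIndexer.new(->(title) { indexed << title }).process(ok)

broken = StubMessage.new({})
TitleIndexer.new(->(title) { indexed << title }).process(broken)

flaky = StubMessage.new({ "title" => "A guide" })
TitleIndexer.new(->(_title) { raise TemporaryFailure }).process(flaky)
```

Every path through process ends in exactly one of ack, retry or discard, so no message is left unacknowledged.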

Statsd integration

You can pass a statsd_client to the GovukMessageQueueConsumer::Consumer initializer. The consumer will emit counters to statsd with these keys:

  • your_queue_name.started - message picked up from your_queue_name
  • your_queue_name.retried - message has been retried
  • your_queue_name.acked - message has been processed and acked
  • your_queue_name.discarded - message has been discarded
  • your_queue_name.uncaught_exception - an uncaught exception occurred during processing

Remember to use a namespace for the Statsd client:

statsd_client = Statsd.new("localhost")
statsd_client.namespace = "govuk.app.my_app_name"

GovukMessageQueueConsumer::Consumer.new(
  statsd_client: statsd_client
  # ... other setup code omitted
).run
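
The sketch below shows how the namespace and the counter keys combine, using an in-memory stand-in for the statsd client. RecordingStatsd is a hypothetical class for this example, and it assumes counters are reported through an increment(key) call as in the statsd-ruby client; the two increment calls at the end simulate what would be emitted for one message that is picked up and then acked.

```ruby
# Hypothetical stand-in for a statsd client that records counters in memory
# instead of sending them over the network.
class RecordingStatsd
  attr_accessor :namespace
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Assumed to mirror the increment(key) call of the statsd-ruby client.
  def increment(key, _sample_rate = 1)
    full_key = [namespace, key].compact.join(".")
    @counts[full_key] += 1
  end
end

statsd_client = RecordingStatsd.new
statsd_client.namespace = "govuk.app.my_app_name"

# Simulate the counters for one message that was picked up and then acked.
statsd_client.increment("some-queue.started")
statsd_client.increment("some-queue.acked")
```

Without a namespace, the keys would begin with the queue name alone, which makes it harder to tell applications apart when they consume from similarly named queues.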

Testing your processor

This gem provides a test helper for your processor.

# e.g. spec/queue_consumers/my_processor_spec.rb
require 'test_helper'
require 'govuk_message_queue_consumer/test_helpers'

describe MyProcessor do
  it_behaves_like "a message queue processor"
end

This will verify that your processor class implements the correct methods. You should add your own tests to verify its behaviour.

You can use GovukMessageQueueConsumer::MockMessage to test the processor behaviour. When using the mock, you can verify whether it was acknowledged, retried or discarded. For example, with MyProcessor above:

it "acks incoming messages" do
  message = GovukMessageQueueConsumer::MockMessage.new

  MyProcessor.new.process(message)

  expect(message).to be_acked

  # or if you use minitest:
  assert message.acked?
end

For more test cases see the spec for the mock itself.

Running the test suite

bundle exec rake spec

Further reading

  • Bunny is the RabbitMQ client we use.
  • The Bunny Guides explain all AMQP concepts really well.
  • The Developer Docs document the usage of "heartbeat" messages, which this gem also supports.

Licence

MIT License

Versioning policy

We follow Semantic Versioning. Check the CHANGELOG for changes.