Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Play! Framework RabbitMQ Module - Consume and Produce messages to RabbitMQ. Live Streaming with WebSockets. Live Stats.

branch: master
README.textile

Play! Framework RabbitMQ Module

Felipe Oliveira
http://mashup.fm
http://playframework.info
http://geeks.aretotally.in
http://twitter.com/_felipera

1) Installation

Under dependencies.yml:

require:
– play → rabbitmq

2) RabbitMQ

Download RabbitMQ from http://www.rabbitmq.com/server.html.
Unzip the distribution, go to folder sbin and run “./rabbitmq-server”.
It should be running on port 5762 by default. The default user “guest” and password is “guest”.

3) Configuration

rabbitmq.seeds=localhost:5672,host2:port2,host3:port:3
rabbitmq.vhost=/
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.exchangeType=direct
rabbitmq.durable=true
rabbitmq.autoAck=false
rabbitmq.basicQos=true
rabbitmq.retries=5
rabbitmq.msgmapper=json

rabbitmq.seeds → An array of known broker addresses (hostname/port pairs) to try in order.
rabbitmq.vhost → If you don’t know what this is I recommend reading the Admin Guide for RabbitMQ (http://www.rabbitmq.com/admin-guide.html)
rabbitmq.user → I am sure you can guess this one. guest is the default user on a fresh RabbitMQ install.
rabbitmq.password → I am sure you can guess this one. guest is the default value on a fresh RabbitMQ install.
rabbitmq.exchangeType → These are the different exchangeTypes provided by RabbitMQ like direct, fanout, etc. If you are not familiar with them, please checkout RabbitMQ’s doc (http://www.rabbitmq.com/getstarted.html).
rabbitmq.durable → Durable message. You most likely want to keep that as true if your app requires reliability in your message deliveries.
rabbitmq.autoAck → If true you are basically turning off any form of retry so you probabily don’t want that.
rabbitmq.basicQos → Please read RabbitMQ’s documentation for more information on basicQos (https://dev.rabbitmq.com/wiki/BasicQosDesign?version=15b1bab0ac73).
rabbitmq.retries → Max number of retries per message. This number can be overwritten on a queue level by overriding the retries() method on your consumer class.
rabbitmq.msgmapper → Possible values are “pojo” or “json”. These are the different implementations used to store and retrieve messages, “json” uses Jackson’s ObjectMapper and “pojo” uses Java’s serialization.

You may also combine several of these properties into a connection url:

rabbitmq.url=amqp://username:password@host[:port]/[vhost]

E.g.:

rabbitmq.url=amqp://guest:gues@localhost/

4) Define Message that will be used by the Queue (just a simple POJO)

public class SampleMessage implements Serializable {

private String field1;

private String field2;

public SampleMessage() {

}

public SampleMessage(String field1, String field2) {
super();
this.field1 = field1;
this.field2 = field2;
}

public String getField1() {
return field1;
}

public void setField1(String field1) {
this.field1 = field1;
}

public String getField2() {
return field2;
}

public void setField2(String field2) {
this.field2 = field2;
}

@Override
public String toString() {
return “SampleMessage [field1=” + field1 + “, field2=” + field2 + “]”;
}

}

5) Publish a Message

public static void publish(SampleMessage q) {
RabbitMQPublisher.publish(“myQueue”, q);
render(q);
}

6) Creating a Message Consumer

@OnApplicationStart(async = true)
public class RabbitMQSampleConsumer extends RabbitMQConsumer {

protected void consume(SampleMessage message) {
System.out.println(“******************************”);
System.out.println("* Message Consumed: " + message);
System.out.println(“******************************”);
}

protected String queue() {
return “myQueue”;
}

protected String routingKey() {
return this.queue();
}

protected int retries() {
// This is the default value defined by “rabbitmq.retries” on
// application.conf (please override if you need a new value)
return RabbitMQPlugin.retries();
}

protected Class getMessageType() {
return SampleMessage.class;
}
}

  • There are some cases where you don’t want to retry re-processing the message, for example in cases you know you will keep getting the same error no matter how many times you retry that message.
    First of all, we avoid infiniate loops by retrying only a certain amount of times, defined by the method retries() on the consumer class.
    For the cases where you don’t want to retry at all simply throw a RabbitMQNotRetriableException. The RabbitMQConsumer base class will catch it and acknowledge to RabbitMQ, instead of trying to reprocess the message.
  • Each consumer has a method routingKey() which you can optionally overwrite to setup a Topic Exchange with RabbitMQ. Same thing on the producer.

7) Firehose – Another way to publish messages in batch

@OnApplicationStart(async = true)
public class RabbitMQSampleFirehose extends RabbitMQFirehose {

public int count = 0;

protected List getData(int n) throws Exception {
if ( count >= 10 ) {
return null;
}
List results = new ArrayList();
for (int i = 0; i < n; i++) {
results.add(new SampleMessage(“field1”, “field2”));
count++;
}
return results;
}

protected int batchSize() {
return 2;
}

protected String queueName() {
return “myQueue”;
}

}

8) Live Stats and Live Streaming

Add the module routes in your routes.conf
* /rabbitmq/ module:rabbitmq

Live Stats and Live Streaming of messages produced and consumed are available at http://localhost:9000/rabbitmq/.
Live Stats uses Ajax to display the number of successful messages produced and consumed to a queue, errors as well.
Live Streaming uses WebSockets (requires newer browsers) to stream messages consumed and produced.

Source Code

The source code is available on Github at https://github.com/feliperazeek/play-rabbitmq.

Changelog

May 11th 2011 – Version 0.0.5: Fixed bugs on stats and added average time to stats service, stats page for a queue and also to the WebSocket live streaming.

May 18th 2011 – Version 0.0.6: Adding routingKey to Publisher, Consumer, Firehose, RabbitMQPlugin, etc to add support for RabbitMQ Topic Exchange.

May 26th 2011 – Version 0.0.8: Adding more stats

May 28th 2011 – Version 0.0.9: Adding ability for consumers to pause.

June 16th 2011 – Version 0.2: Upgrading to RabbitMQ 2.5.0 and fixed bugs

September 20th 2011 – Version 0.3: Just removing broken unittest that wasn’t supposed to be there in the first place.

Something went wrong with that request. Please try again.