- message queue system
- emphasis on not losing any messages
- runs stand-alone (download) or embedded
- Amazon SQS-compatible interface
- in-memory with optional journalling and db-backed message storage
- optionally replicated (guaranteed messaging)
ElasticMQ is a message queue system, offering Java, Scala and an SQS-compatible REST interface.
ElasticMQ follows the semantics of SQS. Messages are received by polling the queue. When a message is received, it is blocked for a specified amount of time (the visibility timeout). If the message isn't deleted during that time, it becomes available for delivery again. Moreover, queues and individual messages can be configured so that messages are always delivered with a delay.
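The receive/hide/redeliver cycle described above can be illustrated with a toy model. This is a minimal sketch, not ElasticMQ's implementation: the `ToyQueue` class, its method names and the explicit `now` parameter (used instead of a real clock) are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a single queue; not ElasticMQ code.
final class ToyQueue(visibilityTimeoutMs: Long) {
  // message id -> (body, time from which the message may be delivered)
  private val messages = mutable.LinkedHashMap.empty[String, (String, Long)]

  // A delay keeps the message hidden until now + delayMs.
  def send(id: String, body: String, now: Long, delayMs: Long = 0L): Unit =
    messages.update(id, (body, now + delayMs))

  // Receiving does not remove the message; it only hides it
  // for the duration of the visibility timeout.
  def receive(now: Long): Option[(String, String)] = {
    val next = messages.find { case (_, (_, visibleAt)) => visibleAt <= now }
    next.map { case (id, (body, _)) =>
      messages.update(id, (body, now + visibilityTimeoutMs))
      (id, body)
    }
  }

  // Only an explicit delete removes the message for good.
  def delete(id: String): Unit = {
    messages.remove(id)
    ()
  }
}
```

With a 30-second timeout, a message received at time 0 is not returned again at time 1000, but it is returned again at time 30000 unless `delete` was called in between.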
The focus in ElasticMQ is to make sure that the messages are delivered, and that no message is lost. It may happen, however, that a message is delivered twice (if, for example, a client dies after receiving a message and processing it, but before deleting). That's why clients of ElasticMQ (and Amazon SQS) should be idempotent.
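One common way to make a consumer idempotent is to remember which message ids have already been handled and to skip redeliveries. The sketch below assumes each message carries a stable id; `IdempotentHandler` and its members are hypothetical names, not part of the ElasticMQ API, and a real system would keep the set of processed ids in durable storage rather than in memory.

```scala
import scala.collection.mutable

// Hypothetical consumer-side helper; none of these names come from ElasticMQ.
final class IdempotentHandler(process: String => Unit) {
  // Ids of messages whose side effects have already been applied.
  private val processed = mutable.Set.empty[String]

  // Returns true if the body was processed now,
  // false if the message was a redelivery and was skipped.
  def handle(messageId: String, body: String): Boolean =
    if (processed.contains(messageId)) false
    else {
      process(body)
      processed += messageId
      true
    }
}
```

If the client dies after processing but before deleting the message, the redelivered copy reaches `handle` with the same id and is ignored, so the side effect happens only once.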
There are several message storage implementations. Messages can be stored entirely in-memory, providing a volatile but fast message queue. Operations on in-memory storage can be journaled on disk, providing message durability across server restarts/crashes. Alternatively, messages can be persisted in a database (MySQL, Postgres, H2, ...).
ElasticMQ supports data replication across a cluster, thus providing a replicated/guaranteed message queue. Each node in the cluster can use any storage implementation.
As ElasticMQ implements a subset of the SQS REST interface, it is a great SQS alternative both for testing purposes (ElasticMQ is easily embeddable) and for creating systems which work both within and outside of the Amazon infrastructure.
The future will most probably bring even more exciting features :).
You can download the stand-alone distribution here:
Java 6 or above is required for running the server.
Installation is as easy as unpacking the .tar.gz file. The contents of the package are:
bin: scripts to start the server
conf: ElasticMQ and logging (logback) configuration
NOTICE.txt: this file, license documentation
Additionally two directories will be created when the server is started:
data: stores the command journal (messages file log), if enabled
log: default location for log files
You can configure ElasticMQ through the conf/Default.scala file. There you can choose which storage to use, whether journalling and replication are enabled, whether the server exposes an SQS interface, which interface and port to bind to, etc. More documentation can be found in the file itself.
Starting an embedded ElasticMQ server with an SQS interface
    // First we need to create a Node
    val node = NodeBuilder.withStorage(new InMemoryStorage)

    // Then we can expose the native client using the SQS REST interface
    val server = SQSRestServerFactory.start(node.nativeClient, 9324, new NodeAddress())

    // ... use ...

    // Finally we need to stop the server and the node
    server.stop()
    node.shutdown()
Alternatively, you can use e.g. MySQL to store the data:
val node = NodeBuilder.withStorage(new SquerylStorage(DBConfiguration.mysql("elasticmq", "root", "")))
Adding journaling to an in-memory storage
This is as simple as wrapping the original storage (it only makes sense to wrap an in-memory storage, as a DB storage has its own persistence):
    import java.io.File

    // Wrap the in-memory storage so that commands are journaled under /store/here
    val wrappedStorage = new FileLogConfigurator(
      inMemoryStorage,
      FileLogConfiguration(new File("/store/here"), 100000)).start()
Note that even though messages are now durable (restarting the server won't cause message loss), the overall capacity of the queues (how many messages the queue can store at a time) is limited by the amount of RAM allocated to the process.
The journal is written asynchronously by a separate thread, so some commands may be lost if the server crashes. Even if the journal were written synchronously, data could remain unflushed in buffers, and even after a flush it could still be lost unless OS-level caches are disabled. If you require stronger durability, use replication.
Starting a replicated storage
Any storage can be replicated by wrapping it using ReplicatedStorageConfigurator. Nodes can join and leave the cluster at any time; existing data will be transferred to new cluster members.
Storage commands can be replicated in several modes:
- fire-and-forget
- waiting for at least one cluster member to apply the changes
- waiting for a majority of cluster members (WaitForMajorityReplicationMode)
- waiting for all cluster members
Client operations return only when the specified number of members applied the changes.
    val storage = new InMemoryStorage
    val replicatedStorage = ReplicatedStorageConfigurator.start(storage, NodeAddress(), WaitForMajorityReplicationMode)
    val node = NodeBuilder.withStorage(replicatedStorage)

    // ... use ...

    node.shutdown()
    storage.shutdown()
NodeAddresses are entirely logical (the actual value can be any string) and can be used, for example, by ElasticMQ clients to determine which node is the master.
Operations can only be executed on the master node. If you attempt to execute an operation on a node which is not the master, a NodeIsNotMasterException will be thrown, containing the master node address, if available.
In case of cluster partitions, replication is designed to only operate on the partition which contains a majority of nodes (n/2+1). Otherwise data could get easily corrupted, if two disconnected cluster partitions
(split-brain) changed the same things; such a situation could lead to a very high number of duplicate deliveries and an
That is also why, when creating the replicated storage, you must provide the expected number of nodes. Note that an odd number of nodes makes most sense: in a 3-node cluster, 2 nodes must be active in order for the cluster to work, while a 4-node cluster needs 3 active nodes and so tolerates no more failures than a 3-node one.
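The majority rule can be checked with a few lines of arithmetic. This is only an illustrative sketch of the n/2+1 formula; the `Quorum` object and its method names are hypothetical and are not part of ElasticMQ.

```scala
// Hypothetical helper illustrating the n/2+1 majority rule; not ElasticMQ code.
object Quorum {
  // Smallest number of nodes that forms a majority (integer division).
  def majority(expectedNodes: Int): Int = expectedNodes / 2 + 1

  // A partition may keep operating only if it sees a majority of the expected nodes.
  def canOperate(reachableNodes: Int, expectedNodes: Int): Boolean =
    reachableNodes >= majority(expectedNodes)

  // Number of node failures after which a majority is still available.
  def tolerableFailures(expectedNodes: Int): Int =
    expectedNodes - majority(expectedNodes)
}
```

For example, `Quorum.majority(3)` is 2 and `Quorum.majority(4)` is 3, so both cluster sizes tolerate a single failure. If a 4-node cluster splits into two halves, `Quorum.canOperate(2, 4)` is false for both sides, which is exactly what prevents a split-brain.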
Using the Amazon Java SDK to access an ElasticMQ Server
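To use the Amazon Java SDK as an interface to an ElasticMQ server, you just need to change the endpoint:
    import com.amazonaws.auth.BasicAWSCredentials
    import com.amazonaws.services.sqs.AmazonSQSClient

    // The credentials are placeholders; point the client at the local server
    val client = new AmazonSQSClient(new BasicAWSCredentials("x", "x"))
    client.setEndpoint("http://localhost:9324")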
The endpoint value should be the same address as the NodeAddress provided as an argument to SQSRestServerFactory or in the configuration file.
The rest-sqs-testing-amazon-java-sdk module contains some more usage examples.
- In-memory storage, single node, embedded: ideal for testing
- In-memory storage, single node, journalling: fast persistent messaging with queues bounded by the amount of memory
- DB storage, local DB, single node: persistent messaging with unbounded queues
- DB storage, shared DB, multiple nodes: persistent messaging. Multiple nodes can use the same database. The database can be replicated/backed up for data safety.
- In-memory storage, multiple nodes, journalled, replicated: Data safety through replication and journalling. Fast guaranteed messaging.
- DB storage, local DB, multiple nodes, replication: each node stores the data in a separate DB. Recommended if a shared DB is not available. Provides good data safety and unbounded queues.
ElasticMQ dependencies in SBT
    // Includes the in-memory storage
    val elasticmqCore = "org.elasticmq" %% "elasticmq-core" % "0.5"

    // If you want to use the database storage
    val elasticmqStorageDb = "org.elasticmq" %% "elasticmq-storage-database" % "0.5"

    // If you want to expose an SQS interface
    val elasticmqSqs = "org.elasticmq" %% "elasticmq-rest-sqs" % "0.5"

    // If you want to use replication
    val elasticmqRepl = "org.elasticmq" %% "elasticmq-replication" % "0.5"

    val smlResolverReleases = "SotwareMill Public Releases" at "http://tools.softwaremill.pl/nexus/content/repositories/releases"
    val smlResolverSnapshots = "SotwareMill Public Snapshots" at "http://tools.softwaremill.pl/nexus/content/repositories/snapshots"
ElasticMQ dependencies in Maven
    <dependency>
        <groupId>org.elasticmq</groupId>
        <artifactId>elasticmq-core_2.9.1</artifactId>
        <version>0.5</version>
    </dependency>
    <dependency>
        <groupId>org.elasticmq</groupId>
        <artifactId>elasticmq-storage-database_2.9.1</artifactId>
        <version>0.5</version>
    </dependency>
    <dependency>
        <groupId>org.elasticmq</groupId>
        <artifactId>elasticmq-rest-sqs_2.9.1</artifactId>
        <version>0.5</version>
    </dependency>
    <dependency>
        <groupId>org.elasticmq</groupId>
        <artifactId>elasticmq-replication_2.9.1</artifactId>
        <version>0.5</version>
    </dependency>
And our repositories:
    <repository>
        <id>SotwareMillPublicReleases</id>
        <name>SotwareMill Public Releases</name>
        <url>http://tools.softwaremill.pl/nexus/content/repositories/releases/</url>
    </repository>
    <repository>
        <id>SotwareMillPublicSnapshots</id>
        <name>SotwareMill Public Snapshots</name>
        <url>http://tools.softwaremill.pl/nexus/content/repositories/snapshots/</url>
    </repository>
The MySQL schema can be found in the storage-database/src/main/resource/schema-mysql.sql file. It should be easily adaptable to other databases.
Tests done on a 2009 MBP, 2.4GHz Core2Duo, 8GB RAM, no replication. Throughput is in messages per second (messages are small).
Directly accessing the client:
    Running test for [in-memory], iterations: 10, msgs in iteration: 100000, thread count: 1.
    Overall in-memory throughput: 41373.603641

    Running test for [in-memory], iterations: 10, msgs in iteration: 100000, thread count: 2.
    Overall in-memory throughput: 32646.665143

    Running test for [in-memory], iterations: 3, msgs in iteration: 1000000, thread count: 1.
    Overall in-memory throughput: 35157.211330

    Running test for [file log + in-memory], iterations: 10, msgs in iteration: 100000, thread count: 1.
    Overall file log + in-memory throughput: 15464.316091

    Running test for [h2], iterations: 10, msgs in iteration: 1000, thread count: 8.
    Overall h2 throughput: 334.085025

    Running test for [mysql], iterations: 10, msgs in iteration: 1000, thread count: 2.
    Overall mysql throughput: 143.620383
Through the SQS REST interface:
    Running test for [rest-sqs + in-memory], iterations: 10, msgs in iteration: 1000, thread count: 20.
    Overall rest-sqs + in-memory throughput: 781.460628

    Running test for [rest-sqs + file log + in-memory], iterations: 10, msgs in iteration: 1000, thread count: 20.
    Overall rest-sqs + in-memory throughput: 675.851488
Note that both the client and the server were on the same machine.
- Core: Scala
- Database access: Squeryl
- Rest server: Netty, a high-performance, asynchronous, event-driven Java NIO framework.
- Replication: JGroups
- Testing the SQS interface: Amazon Java SDK; see the rest-sqs-testing-amazon-java-sdk module for the test suite.
- Server configuration: Ostrich
Version 0.5 (26 May 2012)
- stand-alone distribution (download)
- file log for message storage (journal)
- factoring out the storage-database module, to decrease the dependencies of the core modules
Version 0.4 (27 Mar 2012)
Version 0.3 (6 Feb 2012)
- in-memory storage
- new native API
- bug fixes
Version 0.2 (12 Jan 2012)
- new SQS functions support
- testing with Amazon Java SDK
- bug fixes
Version 0.1 (12 Oct 2011)
- initial release
- DB storage
- SQS interface support