Skip to content
Permalink
Browse files
ARIES-1881 Mongo based implementation
- Added few unit tests. In order to enable use aries.events.test.mongoUri system property which should point to a running mongodb instance
  • Loading branch information
Alexei Krainiouk committed Jan 7, 2019
1 parent d63ee56 commit 222279761f01937af95069d7cf367732a17f3a86
Showing 6 changed files with 168 additions and 8 deletions.
@@ -17,10 +17,13 @@
*/
package org.apache.aries.events.api;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

/**
* TODO If we allow wild card consumption then a message also needs a topic
@@ -31,6 +34,8 @@ public final class Message {
private final Map<String, String> properties;

public Message(byte[] payload, Map<String, String> properties) {
requireNonNull(payload);
requireNonNull(properties);
this.payload = payload.clone();
this.properties = unmodifiableMap(new HashMap<>(properties));
}
@@ -43,4 +48,25 @@ public Map<String, String> getProperties() {
return properties;
}

@Override
public String toString() {
return "Message" + properties;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message = (Message) o;
return Arrays.equals(payload, message.payload) &&
properties.equals(message.properties);
}

@Override
public int hashCode() {
int result = Objects.hash(properties);
result = 31 * result + Arrays.hashCode(payload);
return result;
}

}
@@ -100,7 +100,7 @@ public void close() {
private volatile boolean interrupted = false;

private MessageReceiverImpl(MongoCollection<Document> col, Optional<MongoClient> mongoClient) {
LOGGER.info("Creating new receiver: " + col.getNamespace().getCollectionName());
LOGGER.debug("Creating new receiver: " + col.getNamespace().getCollectionName());
this.mongoClient = mongoClient;
this.col = col;
}
@@ -70,7 +70,7 @@ public void close() {}
private final long maxAge;

private MessageSenderImpl(MongoCollection<Document> collection, long maxAge) {
LOGGER.info("Creating new publisher: " + collection.getNamespace().getCollectionName());
LOGGER.debug("Creating new publisher: " + collection.getNamespace().getCollectionName());
ensureIndexes(collection);
this.collection = collection;
this.maxAge = maxAge;
@@ -22,12 +22,8 @@
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.Subscription;
import org.bson.Document;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;

import static java.lang.Thread.currentThread;
@@ -114,7 +110,7 @@ private void poll(MessageReceiver receiver) {
while (!interrupted()) {
try {
Message message = receiver.receive(index);
LOGGER.info("Received: " + message);
LOGGER.debug("Received: " + message);
Received received = new Received(position(index), message);
consumer.accept(received);
index += 1L;
@@ -124,7 +120,7 @@ private void poll(MessageReceiver receiver) {
LOGGER.error("Error handling message", e);
}
}
LOGGER.info("Quitting " + this);
LOGGER.debug("Quitting " + this);
receiver.close();
}

@@ -0,0 +1,71 @@
package org.apache.aries.events.mongo;

import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.junit.rules.ExternalResource;

import java.util.Optional;
import java.util.logging.Logger;

import static org.junit.Assume.assumeTrue;

/**
* Provides connection to an external mongodb instance
* New database gets created for each test and dropped
* afterwards.
* Database URL must be provided by mongoUri system
* property
*/
public class MongoProvider extends ExternalResource {

MongoCollection<Document> getCollection(String name) {
return database.getCollection(name);
}

//*********************************************
// Internals
//*********************************************

private static final String MONGO_URI_PROP = "aries.events.test.mongoUri";
private static final String DEFAULT_DB_NAME = "tmp_aries_events_test";
private MongoDatabase database;
private MongoClient client;

@Override
protected void before() {
String mongoUri = mongoUri();
client = MongoClients.create(mongoUri);
String dbName = Optional.ofNullable(new MongoClientURI(mongoUri).getDatabase())
.orElse(DEFAULT_DB_NAME);
database = client.getDatabase(dbName);
}

@Override
protected void after() {
if (database != null) {
database.drop();
}
if (client != null) {
client.close();
}
}

private static String mongoUri() {
String result = System.getProperty(MONGO_URI_PROP);
if (result == null) {
String message = "No mongo URI provided.\n" +
" In order to enable mongo tests, define " + MONGO_URI_PROP + " system property\n" +
" to point to a running instance of mongodb.\n" +
" Example:\n" +
" mvn test -D" + MONGO_URI_PROP + "=mongodb://localhost:27017/";
System.out.println("WARNING: " + message);
assumeTrue(message, false);
}
return result;
}

}
@@ -0,0 +1,67 @@
package org.apache.aries.events.mongo;

import com.mongodb.client.MongoCollection;
import org.apache.aries.events.api.Message;
import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;

import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

import static java.util.Collections.emptyMap;
import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
import static org.junit.Assert.assertEquals;

public class SenderReceiverTest {

@Test public void testReplicate() throws InterruptedException {
MongoCollection<Document> collection = mongoProvider.getCollection("events");
MessageSender sender = messageSender(collection, 1000 * 60 * 60 * 24 * 7);
MessageReceiver receiver = messageReceiver(collection);
Message expected = new Message(new byte[]{ 1, 2, 3 }, mapOf(
keyVal("key1", "val1"),
keyVal("key2", "val2"))
);
sender.send(expected);
sender.send(expected);
Message actual = receiver.receive(0);
assertEquals(expected, actual);
}

@Test(expected = NoSuchElementException.class)
public void testEvicted() throws InterruptedException {
MongoCollection<Document> collection = mongoProvider.getCollection("events");
MessageSender sender = messageSender(collection, 0);
MessageReceiver receiver = messageReceiver(collection);
Message expected = new Message(new byte[] { 1, 2, 3}, emptyMap());
sender.send(expected);
sender.send(expected);
receiver.receive(0);
}

//*********************************************
// Internals
//*********************************************

private MongoCollection<Document> collection;

@Rule
public MongoProvider mongoProvider = new MongoProvider();

private static Map.Entry<String, String> keyVal(String key, String value) {
return new SimpleEntry<>(key, value);
}

private static Map<String, String> mapOf(Map.Entry<String, String>... mappings) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, String> entry : mappings) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}

}

0 comments on commit 2222797

Please sign in to comment.