Skip to content
Permalink
Browse files
ARIES-1878 - In memory implementation
  • Loading branch information
cschneider committed Jan 2, 2019
1 parent f5a98ce commit c5625d7943405be2c14bdfc2eb94b9dfa11ccc1c
Showing 8 changed files with 407 additions and 0 deletions.
@@ -0,0 +1,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.aries.events.memory</groupId>
<artifactId>org.apache.aries.events.memory</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events.api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,48 @@
package org.apache.aries.events.memory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.Subscription;
import org.apache.aries.events.api.Type;
import org.osgi.service.component.annotations.Component;

@Component
@Type("memory")
public class InMemoryMessaging implements Messaging {
private Map<String, Topic> topics = new ConcurrentHashMap<>();

@Override
public Position send(String topicName, Message message) {
Topic topic = getOrCreate(topicName);
return topic.send(message);
}

@Override
public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
Topic topic = getOrCreate(topicName);
return topic.subscribe(position, seek, callback);
}

@Override
public Message newMessage(byte[] payload, Map<String, String> props) {
return new MemoryMessage(payload, props);
}

@Override
public Position positionFromString(String position) {
long offset = new Long(position).longValue();
return new MemoryPosition(offset);
}

private Topic getOrCreate(String topicName) {
return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2));
}

}
@@ -0,0 +1,38 @@
package org.apache.aries.events.memory;

import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class Journal<T> {
private AtomicLong nextOffset = new AtomicLong();
private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();

public long append(T message) {
Long offset = nextOffset.getAndIncrement();
messages.put(offset, message);
return offset;
}

public long getFirstOffset() {
try {
return messages.firstKey();
} catch (NoSuchElementException e) {
return 0;
}
}

public long getLastOffset() {
try {
return messages.lastKey();
} catch (NoSuchElementException e) {
return -1;
}
}

public Entry<Long, T> getNext(long offset) {
return this.messages.ceilingEntry(offset);
}
}
@@ -0,0 +1,27 @@
package org.apache.aries.events.memory;

import java.util.Map;

import org.apache.aries.events.api.Message;

class MemoryMessage implements Message {

private byte[] payload;
private Map<String, String> properties;

MemoryMessage(byte[] payload, Map<String, String> props) {
this.payload = payload;
properties = props;
}

@Override
public byte[] getPayload() {
return this.payload;
}

@Override
public Map<String, String> getProperties() {
return this.properties;
}

}
@@ -0,0 +1,21 @@
package org.apache.aries.events.memory;

import org.apache.aries.events.api.Position;

class MemoryPosition implements Position {

private long offset;

MemoryPosition(long offset) {
this.offset = offset;
}

public long getOffset() {
return offset;
}

@Override
public String toString() {
return new Long(offset).toString();
}
}
@@ -0,0 +1,106 @@
package org.apache.aries.events.memory;

import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Topic {
private Logger log = LoggerFactory.getLogger(this.getClass());

private String topicName;
private Journal<Message> journal;
private Set<Subscription> subscriptions = new HashSet<>();

public Topic(String topicName) {
this.topicName = topicName;
this.journal = new Journal<>();
}

public Position send(Message message) {
long offset = this.journal.append(message);
return new MemoryPosition(offset);
}

public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
long startOffset = getStartOffset(position, seek);
log.debug("Consuming from " + startOffset);
return new TopicSubscription(startOffset, callback);
}

private long getStartOffset(Position position, Seek seek) {
if (position != null) {
return position.getOffset();
} else {
if (seek == Seek.earliest) {
return this.journal.getFirstOffset();
} else if (seek == Seek.latest) {
return this.journal.getLastOffset() + 1;
} else {
throw new IllegalArgumentException("Seek must not be null");
}
}
}

class TopicSubscription implements Subscription {
private Consumer<Received> callback;
private ExecutorService executor;
private volatile boolean running;
private long currentOffset;

TopicSubscription(long startOffset, Consumer<Received> callback) {
this.currentOffset = startOffset;
this.callback = callback;
this.running = true;
String name = "Poller for " + topicName;
this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
this.executor.execute(this::poll);
}

private void poll() {
while (running) {
Entry<Long, Message> entry = journal.getNext(currentOffset);
if (entry != null) {
long offset = entry.getKey();
try {
MemoryPosition position = new MemoryPosition(this.currentOffset);
Received received = new Received(position, entry.getValue());
callback.accept(received);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
this.currentOffset = offset + 1;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}

@Override
public void close() {
this.running = false;
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
}
subscriptions.remove(this);
}

}
}

0 comments on commit c5625d7

Please sign in to comment.