Skip to content
Permalink
Browse files
ARIES-1878 - Add eviction after a certain number of messages
  • Loading branch information
cschneider committed Jan 5, 2019
1 parent 8189e0d commit 42932be35652c18969c9133b3aa25a9492a27e38
Showing 4 changed files with 58 additions and 11 deletions.
@@ -31,7 +31,17 @@
@Component
@Type("memory")
public class InMemoryMessaging implements Messaging {
private Map<String, Topic> topics = new ConcurrentHashMap<>();
private final Map<String, Topic> topics = new ConcurrentHashMap<>();
private final int keepAtLeast;

public InMemoryMessaging() {
this(10000);
}

public InMemoryMessaging(int keepAtLeast) {
this.keepAtLeast = keepAtLeast;

}

@Override
public void send(String topicName, Message message) {
@@ -57,7 +67,7 @@ public Position positionFromString(String position) {
}

private Topic getOrCreate(String topicName) {
return topics.computeIfAbsent(topicName, Topic::new);
return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2, keepAtLeast));
}

}
@@ -17,22 +17,40 @@
*/
package org.apache.aries.events.memory;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class Journal<T> {
private AtomicLong nextOffset = new AtomicLong();
private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
class Journal<T> {
private final int keepAtLeast;
private final AtomicLong nextOffset = new AtomicLong();
private final ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
private final AtomicLong count = new AtomicLong();

public Journal(int keepAtLeast) {
this.keepAtLeast = keepAtLeast;
}

public long append(T message) {
if (count.incrementAndGet() > keepAtLeast * 2) {
evict();
}
Long offset = nextOffset.getAndIncrement();
messages.put(offset, message);
return offset;
}

private synchronized void evict() {
Iterator<Long> it = messages.keySet().iterator();
for (int c = 0; c < keepAtLeast; c++) {
messages.remove(it.next());
}
count.set(0);
}

public long getFirstOffset() {
try {
return messages.firstKey();
@@ -31,15 +31,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Topic {
private Logger log = LoggerFactory.getLogger(this.getClass());
class Topic {
private final Logger log = LoggerFactory.getLogger(this.getClass());

private String topicName;
private Journal<Message> journal;
private final String topicName;
private final Journal<Message> journal;

public Topic(String topicName) {
public Topic(String topicName, int keepAtLeast) {
this.topicName = topicName;
this.journal = new Journal<>();
this.journal = new Journal<>(keepAtLeast);
}

public synchronized Position send(Message message) {
@@ -1,6 +1,7 @@
package org.apache.aries.events.memory;

import static org.apache.aries.events.api.SubscribeRequest.to;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
@@ -16,6 +17,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

@@ -36,6 +38,8 @@

public class MessagingTest {

private static final long MAX_MANY = 100000l;

@Mock
private Consumer<Received> callback;

@@ -135,6 +139,21 @@ public void testFrom1() {
assertThat(messageContents(), contains("testcontent2"));
}

@Test
public void testMany() {
AtomicLong count = new AtomicLong();
Consumer<Received> manyCallback = rec -> { count.incrementAndGet(); };
messaging.subscribe(to("test", manyCallback));
for (long c=0; c < MAX_MANY; c++) {
send("test", "content " + c);
if (c % 10000 == 0) {
System.out.println("Sending " + c);
}

}
await().until(count::get, equalTo(MAX_MANY));
}

private void assertMessages(int num) {
verify(callback, timeout(1000).times(num)).accept(messageCaptor.capture());
}

0 comments on commit 42932be

Please sign in to comment.