Skip to content
Permalink
Browse files
Replace sleep with wait/notify
  • Loading branch information
cschneider committed Jan 5, 2019
1 parent bc2769a commit 4455f8e3f93834a5f339ca4d6c723414690ed7b2
Showing 1 changed file with 33 additions and 28 deletions.
@@ -20,7 +20,6 @@
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.aries.events.api.Message;
@@ -43,8 +42,9 @@ public Topic(String topicName) {
this.journal = new Journal<>();
}

public Position send(Message message) {
public synchronized Position send(Message message) {
long offset = this.journal.append(message);
notifyAll();
return new MemoryPosition(offset);
}

@@ -66,53 +66,58 @@ private long getStartOffset(MemoryPosition position, Seek seek) {
}
}

private synchronized Entry<Long, Message> waitNext(long currentOffset) throws InterruptedException {
Entry<Long, Message> entry = journal.getNext(currentOffset);
if (entry != null) {
return entry;
}
log.debug("Waiting for next message");
wait();
return journal.getNext(currentOffset);
}

class TopicSubscription implements Subscription {
private Consumer<Received> callback;
private ExecutorService executor;
private volatile boolean running;
private long currentOffset;

TopicSubscription(long startOffset, Consumer<Received> callback) {
this.currentOffset = startOffset;
this.callback = callback;
this.running = true;
String name = "Poller for " + topicName;
this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
this.executor.execute(this::poll);
}

private void poll() {
while (running) {
Entry<Long, Message> entry = journal.getNext(currentOffset);
if (entry != null) {
long offset = entry.getKey();
try {
MemoryPosition position = new MemoryPosition(this.currentOffset);
Received received = new Received(position, entry.getValue());
callback.accept(received);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
this.currentOffset = offset + 1;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
try {
while (true) {
Entry<Long, Message> entry = waitNext(currentOffset);
if (entry != null) {
handleMessage(entry);
}
}
} catch (InterruptedException e) {
log.debug("Poller thread for consumer on topic " + topicName + " stopped.");
}
}

private void handleMessage(Entry<Long, Message> entry) {
long offset = entry.getKey();
try {
MemoryPosition position = new MemoryPosition(this.currentOffset);
Received received = new Received(position, entry.getValue());
callback.accept(received);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
this.currentOffset = offset + 1;
}

@Override
public void close() {
this.running = false;
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
}
executor.shutdownNow();
}

}

0 comments on commit 4455f8e

Please sign in to comment.