Skip to content
Permalink
Browse files
ARIES-1880 - Use builder in subscribe
  • Loading branch information
cschneider committed Jan 4, 2019
1 parent 56efda4 commit 12c21af3f7bea344f594546d74227dd1862d0eca
Showing 6 changed files with 97 additions and 23 deletions.
@@ -18,7 +18,6 @@
package org.apache.aries.events.api;

import java.util.Map;
import java.util.function.Consumer;

/**
* Journaled messaging API
@@ -39,7 +38,7 @@ public interface Messaging {
* @param callback will be called for each message received
* @return Returned subscription must be closed by the caller to unsubscribe
*/
Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
Subscription subscribe(SubscribeRequest request);

/**
* Create a message with payload and metadata
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The SF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.aries.events.api;

import static java.util.Objects.requireNonNull;

import java.util.function.Consumer;

import javax.annotation.ParametersAreNonnullByDefault;

@ParametersAreNonnullByDefault
public class SubscribeRequest {
private final String topic;
private final Consumer<Received> callback;
private Position position;
private Seek seek = Seek.latest;

public SubscribeRequest(String topic, Consumer<Received> callback) {
this.topic = topic;
this.callback = callback;
}

public static SubscribeRequest to(String topic, Consumer<Received> callback) {
return new SubscribeRequest(topic, callback);
}

public SubscribeRequest startAt(Position position) {
this.position = position;
return this;
}

public SubscribeRequest seek(Seek seek) {
this.seek = requireNonNull(seek, "Seek must not be null");
return this;
}

public String getTopic() {
return topic;
}

public Position getPosition() {
return position;
}

public Seek getSeek() {
return seek;
}

public Consumer<Received> getCallback() {
return callback;
}
}
@@ -19,13 +19,11 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.apache.aries.events.api.Type;
import org.osgi.service.component.annotations.Component;
@@ -42,9 +40,9 @@ public void send(String topicName, Message message) {
}

@Override
public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
Topic topic = getOrCreate(topicName);
return topic.subscribe(position, seek, callback);
public Subscription subscribe(SubscribeRequest request) {
Topic topic = getOrCreate(request.getTopic());
return topic.subscribe(request);
}

@Override
@@ -27,6 +27,7 @@
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +48,10 @@ public Position send(Message message) {
return new MemoryPosition(offset);
}

public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
long startOffset = getStartOffset((MemoryPosition) position, seek);
public Subscription subscribe(SubscribeRequest request) {
long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
log.debug("Consuming from " + startOffset);
return new TopicSubscription(startOffset, callback);
return new TopicSubscription(startOffset, request.getCallback());
}

private long getStartOffset(MemoryPosition position, Seek seek) {
@@ -59,10 +60,8 @@ private long getStartOffset(MemoryPosition position, Seek seek) {
} else {
if (seek == Seek.earliest) {
return this.journal.getFirstOffset();
} else if (seek == Seek.latest) {
return this.journal.getLastOffset() + 1;
} else {
throw new IllegalArgumentException("Seek must not be null");
return this.journal.getLastOffset() + 1;
}
}
}
@@ -1,5 +1,6 @@
package org.apache.aries.events.memory;

import static org.apache.aries.events.api.SubscribeRequest.to;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
@@ -23,6 +24,7 @@
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.junit.After;
import org.junit.Before;
@@ -64,7 +66,7 @@ public void testPositionFromString() {

@Test
public void testSend() {
subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
subscribe(to("test", callback).seek(Seek.earliest));
String content = "testcontent";
send("test", content);
verify(callback, timeout(1000)).accept(messageCaptor.capture());
@@ -75,22 +77,22 @@ public void testSend() {
assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
}

@Test(expected=IllegalArgumentException.class)
@Test(expected=NullPointerException.class)
public void testInvalidSubscribe() {
messaging.subscribe("test", null, null, callback);
subscribe(to("test", callback).seek(null));
}

@Test
public void testExceptionInHandler() {
doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
subscribe(to("test", callback));
send("test", "testcontent");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
}

@Test
public void testEarliestBefore() {
subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -100,15 +102,15 @@ public void testEarliestBefore() {
@Test
public void testEarliestAfter() {
send("test", "testcontent");
subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}

@Test
public void testLatestBefore() {
subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
subscribe(to("test", callback));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -118,7 +120,7 @@ public void testLatestBefore() {
@Test
public void testLatest() {
send("test", "testcontent");
subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
subscribe(to("test", callback));
send("test", "testcontent2");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
@@ -128,10 +130,14 @@ public void testLatest() {
public void testFrom1() {
send("test", "testcontent");
send("test", "testcontent2");
subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
}

private void subscribe(SubscribeRequest request) {
this.subscriptions.add(messaging.subscribe(request));
}

private List<String> messageContents() {
return messageCaptor.getAllValues().stream()
@@ -81,6 +81,11 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>

0 comments on commit 12c21af

Please sign in to comment.