Skip to content
Permalink
Browse files
Merge branch 'master' into comparable-position
  • Loading branch information
cschneider committed Jan 3, 2019
2 parents f5cad44 + 090771c commit 7e2f7a0bd08c2cbb2a1d21e7377e0c07d2a5e950
Showing 4 changed files with 7 additions and 9 deletions.
@@ -28,7 +28,7 @@ public interface Messaging {
* Send a message to a topic. When this method returns the message
* is safely persisted.
*/
Position send(String topic, Message message);
void send(String topic, Message message);

/**
* Subscribe to a topic. The callback is called for each message received.
@@ -36,9 +36,9 @@ public class InMemoryMessaging implements Messaging {
private Map<String, Topic> topics = new ConcurrentHashMap<>();

@Override
public Position send(String topicName, Message message) {
public void send(String topicName, Message message) {
Topic topic = getOrCreate(topicName);
return topic.send(message);
topic.send(message);
}

@Override
@@ -32,7 +32,7 @@ long getOffset() {
}

@Override
public String toString() {
public String positionToString() {
return Long.toString(offset);
}

@@ -63,9 +63,7 @@ public void testPositionFromString() {
public void testSend() {
subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
String content = "testcontent";
Position pos = send("test", content);
assertThat(pos.toString(), equalTo("0"));

send("test", content);
verify(callback, timeout(1000)).accept(messageCaptor.capture());
Received received = messageCaptor.getValue();
assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
@@ -133,11 +131,11 @@ private String getContent(Received rec) {
return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
}

private Position send(String topic, String content) {
private void send(String topic, String content) {
Map<String, String> props = new HashMap<String, String>();
props.put("my", "testvalue");
Message message = messaging.newMessage(toBytes(content), props);
return messaging.send(topic, message);
messaging.send(topic, message);
}

private byte[] toBytes(String content) {

0 comments on commit 7e2f7a0

Please sign in to comment.