Skip to content
Permalink
Browse files
Merge pull request #5 from tmaret/comparable-position
Abstract the offset from the API
  • Loading branch information
cschneider committed Jan 3, 2019
2 parents 090771c + 7e2f7a0 commit e8b41131e097f473e0d34ed1429965e00c0a7728
Showing 4 changed files with 9 additions and 18 deletions.
@@ -21,17 +21,7 @@
* Position in a the topic.
* E.g. For a kafka implementation this would be a list of (partition, offset) as we do not support partitions
* this could simply be like an offset.
* TODO How do we provide ordering without being too specific?
*/
public interface Position {
long getOffset();
public interface Position extends Comparable<Position> {


/**
* Serialise the position into a {@code String} string.
*
* @see {@link Messaging#positionFromString(String)} for the reverse operation.
* @return the position as a string
*/
String positionToString();
}
@@ -27,7 +27,7 @@ class MemoryPosition implements Position {
this.offset = offset;
}

public long getOffset() {
long getOffset() {
return offset;
}

@@ -37,7 +37,7 @@ public String positionToString() {
}

@Override
public String toString() {
return positionToString();
public int compareTo(Position p) {
return Long.compare(((MemoryPosition)p).offset, offset);
}
}
@@ -48,12 +48,12 @@ public Position send(Message message) {
}

public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
long startOffset = getStartOffset(position, seek);
long startOffset = getStartOffset((MemoryPosition) position, seek);
log.debug("Consuming from " + startOffset);
return new TopicSubscription(startOffset, callback);
}

private long getStartOffset(Position position, Seek seek) {
private long getStartOffset(MemoryPosition position, Seek seek) {
if (position != null) {
return position.getOffset();
} else {
@@ -3,6 +3,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -55,7 +56,7 @@ public void after() {
@Test
public void testPositionFromString() {
Position pos = messaging.positionFromString("1");
assertThat(pos.getOffset(), equalTo(1l));
assertEquals(0, pos.compareTo(new MemoryPosition(1)));
}

@Test
@@ -66,7 +67,7 @@ public void testSend() {
verify(callback, timeout(1000)).accept(messageCaptor.capture());
Received received = messageCaptor.getValue();
assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
assertThat(received.getPosition().getOffset(), equalTo(0l));
assertEquals(0, received.getPosition().compareTo(new MemoryPosition(0)));
assertThat(received.getMessage().getProperties().size(), equalTo(1));
assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
}

0 comments on commit e8b4113

Please sign in to comment.