Skip to content

Commit

Permalink
Merge a3af2b2 into c38f037
Browse files Browse the repository at this point in the history
  • Loading branch information
Allan Mckenzie committed Nov 13, 2018
2 parents c38f037 + a3af2b2 commit 05d3b12
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 295 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ConsecutiveEventBufferService implements EventBufferService {
private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter;

@Inject
private BufferInitialisationStrategy bufferInitialisationStrategy;
private EventBufferInitialiser eventBufferInitialiser;


/**
Expand All @@ -64,7 +64,7 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming
final long incomingEventVersion = versionOf(incomingEvent);
final String source = getSource(incomingEvent);

final long currentVersion = bufferInitialisationStrategy.initialiseBuffer(streamId, source);
final long currentVersion = eventBufferInitialiser.initialiseBuffer(streamId, source);

if (incomingEventObsolete(incomingEventVersion, currentVersion)) {
logger.warn("Message : {} is an obsolete version", incomingEvent);
Expand Down Expand Up @@ -115,7 +115,7 @@ private void addToBuffer(final JsonEnvelope incomingEvent, final UUID streamId,
}

private Stream<EventBufferEvent> consecutiveEventStreamFromBuffer(final Stream<EventBufferEvent> messageBuffer, final long currentVersion) {
return stream(new ConsecutiveEventsSpliterator(messageBuffer, currentVersion), false).onClose(() -> messageBuffer.close());
return stream(new ConsecutiveEventsSpliterator(messageBuffer, currentVersion), false).onClose(messageBuffer::close);
}

private boolean incomingEventNotInOrder(final long incomingEventVersion, final long currentVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@

import java.util.UUID;

public class PostgreSQLBasedBufferInitialisationStrategy implements BufferInitialisationStrategy {
private static final long INITIAL_VERSION = 0L;
import javax.inject.Inject;

private final SubscriptionJdbcRepository subscriptionJdbcRepository;
public class EventBufferInitialiser {

public PostgreSQLBasedBufferInitialisationStrategy(final SubscriptionJdbcRepository subscriptionJdbcRepository) {
this.subscriptionJdbcRepository = subscriptionJdbcRepository;
}
private static final long INITIAL_VERSION = 0L;

@Inject
private SubscriptionJdbcRepository subscriptionJdbcRepository;

@Override
/**
* Initialises buffer (if not already intialised) and returns the current version of the buffer
* status
*
* @param streamId - id of the stream to be initialised
* @return - version of the last event that was in order
*/
public long initialiseBuffer(final UUID streamId, final String source) {
subscriptionJdbcRepository.updateSource(streamId,source);
subscriptionJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ConsecutiveEventBufferServiceTest {
private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter;

@Mock
private BufferInitialisationStrategy bufferInitialisationStrategy;
private EventBufferInitialiser eventBufferInitialiser;

@InjectMocks
private ConsecutiveEventBufferService bufferService;
Expand Down Expand Up @@ -95,7 +95,7 @@ public void shouldIgnoreObsoleteEvent() {
final String source = "source";
final String eventName = "source.event.name";

when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L);
when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L);

final JsonEnvelope event_3 = envelopeFrom(
metadataBuilder().withId(randomUUID()).withName(eventName).withStreamId(streamId).withVersion(3L),
Expand All @@ -121,7 +121,7 @@ public void shouldReturnEventThatIsInCorrectOrder() {
final String source = "source";
final String eventName = "source.event.name";

when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L);
when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L);


when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());
Expand All @@ -145,7 +145,7 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() {
createObjectBuilder()
);

when(bufferInitialisationStrategy.initialiseBuffer(streamId, source)).thenReturn(4L);
when(eventBufferInitialiser.initialiseBuffer(streamId, source)).thenReturn(4L);
when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());

bufferService.currentOrderedEventsWith(incomingEvent);
Expand All @@ -164,7 +164,7 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() {
createObjectBuilder()
);

when(bufferInitialisationStrategy.initialiseBuffer(streamId, source)).thenReturn(4L);
when(eventBufferInitialiser.initialiseBuffer(streamId, source)).thenReturn(4L);
when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.of(new Subscription(streamId, 4L, source)));

when(jsonObjectEnvelopeConverter.asJsonString(incomingEvent)).thenReturn("someStringRepresentation");
Expand All @@ -183,7 +183,7 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG
final String source = "source";
final String eventName = "source.event.name";

when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);
when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);

when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(
Stream.of(new EventBufferEvent(streamId, 4L, "someEventContent4", "source_4"),
Expand Down Expand Up @@ -218,7 +218,7 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() {
final String source = "source";
final String eventName = "source.event.name";

when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);
when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);

final StreamCloseSpy sourceStreamSpy = new StreamCloseSpy();

Expand Down Expand Up @@ -247,7 +247,7 @@ public void shouldRemoveEventsFromBufferOnceStreamed() {
final UUID streamId = randomUUID();
final String source = "source";
final String eventName = "source.event.name";
when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);
when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L);


final EventBufferEvent event4 = new EventBufferEvent(streamId, 4L, "someEventContent4", "source_1");
Expand Down
Loading

0 comments on commit 05d3b12

Please sign in to comment.