Skip to content

Commit

Permalink
Current event number is initialised to zero if it does not exist on a…
Browse files Browse the repository at this point in the history
…pp startup
  • Loading branch information
amckenzie committed Dec 14, 2018
1 parent 41bc3d6 commit 0dc40df
Show file tree
Hide file tree
Showing 19 changed files with 407 additions and 230 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Current event number is initialised to zero if it does not exist on app startup

## [1.1.0-M1]
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import javax.sql.DataSource;

@ApplicationScoped
public class SubscriptionJdbcRepository {
public class StreamStatusJdbcRepository {

/**
* Column Names
Expand All @@ -45,9 +45,9 @@ public class SubscriptionJdbcRepository {

DataSource dataSource;

public SubscriptionJdbcRepository() {}
public StreamStatusJdbcRepository() {}

public SubscriptionJdbcRepository(final DataSource dataSource, final JdbcRepositoryHelper jdbcRepositoryHelper) {
public StreamStatusJdbcRepository(final DataSource dataSource, final JdbcRepositoryHelper jdbcRepositoryHelper) {
this.dataSource = dataSource;
this.jdbcRepositoryHelper = jdbcRepositoryHelper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferEvent;
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription;
import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.JsonObjectEnvelopeConverter;

Expand Down Expand Up @@ -36,7 +36,7 @@ public class ConsecutiveEventBufferService implements EventBufferService {
private EventBufferJdbcRepository streamBufferRepository;

@Inject
private SubscriptionJdbcRepository subscriptionJdbcRepository;
private StreamStatusJdbcRepository streamStatusJdbcRepository;

@Inject
private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter;
Expand All @@ -61,9 +61,9 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming
final long incomingEventVersion = versionOf(incomingEvent);
final String source = getSource(incomingEvent);

subscriptionJdbcRepository.updateSource(streamId, source);
subscriptionJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
final long currentVersion = subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)
streamStatusJdbcRepository.updateSource(streamId, source);
streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)
.orElseThrow(() -> new IllegalStateException("stream status cannot be empty"))
.getPosition();

Expand All @@ -78,7 +78,7 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming

} else {
logger.trace("Message : {} version is valid sending stream to dispatcher", incomingEvent);
subscriptionJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source));
streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source));
return bufferedEvents(streamId, incomingEvent, incomingEventVersion);
}
}
Expand All @@ -97,7 +97,7 @@ private Stream<JsonEnvelope> bufferedEvents(final UUID streamId, final JsonEnvel
final String source = getSource(incomingEvent);
return concat(Stream.of(incomingEvent), consecutiveEventStreamFromBuffer(streamBufferRepository.findStreamByIdAndSource(streamId, source), incomingEventVersion)
.peek(streamBufferEvent -> streamBufferRepository.remove(streamBufferEvent))
.peek(streamBufferEvent -> subscriptionJdbcRepository.update(new Subscription(
.peek(streamBufferEvent -> streamStatusJdbcRepository.update(new Subscription(
streamBufferEvent.getStreamId(),
streamBufferEvent.getPosition(),
source)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import org.junit.Before;
import org.junit.Test;

public class SubscriptionJdbcRepositoryIT {
public class StreamStatusJdbcRepositoryIT {

private static final String COUNT_BY_STREAM_ID = "SELECT count(*) FROM stream_status WHERE stream_id=?";

private static final long INITIAL_POSITION = 0L;

private SubscriptionJdbcRepository jdbcRepository;
private StreamStatusJdbcRepository jdbcRepository;

private DataSource dataSource;

Expand All @@ -41,7 +41,7 @@ public void initDatabase() throws Exception {
.createDataSource("frameworkviewstore");

jdbcRepositoryHelper = new JdbcRepositoryHelper();
jdbcRepository = new SubscriptionJdbcRepository(dataSource, jdbcRepositoryHelper);
jdbcRepository = new StreamStatusJdbcRepository(dataSource, jdbcRepositoryHelper);

new DatabaseCleaner().cleanViewStoreTables("framework", "stream_status", "stream_buffer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SubscriptionJdbcRepositoryTest {
public class StreamStatusJdbcRepositoryTest {

@InjectMocks
private SubscriptionJdbcRepository repository;
private StreamStatusJdbcRepository repository;

@Mock
DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferEvent;
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription;
import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.JsonObjectEnvelopeConverter;
import uk.gov.justice.services.test.utils.common.stream.StreamCloseSpy;
Expand All @@ -43,7 +43,7 @@ public class ConsecutiveEventBufferServiceTest {
private Logger logger;

@Mock
private SubscriptionJdbcRepository subscriptionJdbcRepository;
private StreamStatusJdbcRepository streamStatusJdbcRepository;

@Mock
private EventBufferJdbcRepository streamBufferRepository;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void shouldIgnoreObsoleteEvent() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);

final JsonEnvelope event_3 = envelopeFrom(
Expand All @@ -115,11 +115,11 @@ public void shouldIgnoreObsoleteEvent() {

verifyZeroInteractions(streamBufferRepository);

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -131,7 +131,7 @@ public void shouldReturnEventThatIsInCorrectOrder() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);

when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());
Expand All @@ -144,11 +144,11 @@ public void shouldReturnEventThatIsInCorrectOrder() {
final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);
assertThat(returnedEvents, contains(incomingEvent));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -164,18 +164,18 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);
when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());

bufferService.currentOrderedEventsWith(incomingEvent);

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).update(new Subscription(streamId, 5L, source));
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).update(new Subscription(streamId, 5L, source));

}

Expand All @@ -192,19 +192,19 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);
when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 4L, source)));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 4L, source)));

when(jsonObjectEnvelopeConverter.asJsonString(incomingEvent)).thenReturn("someStringRepresentation");

final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);

final InOrder inOrder = inOrder(subscriptionJdbcRepository, streamBufferRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository, streamBufferRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamBufferRepository).insert(new EventBufferEvent(streamId, 6L, "someStringRepresentation", source));

assertThat(returnedEvents, empty());
Expand All @@ -219,7 +219,7 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);

when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(
Expand Down Expand Up @@ -247,11 +247,11 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG
final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);
assertThat(returnedEvents, contains(incomingEvent, bufferedEvent4, bufferedEvent5, bufferedEvent6));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -264,7 +264,7 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);

final StreamCloseSpy sourceStreamSpy = new StreamCloseSpy();
Expand All @@ -287,11 +287,11 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() {

assertThat(sourceStreamSpy.streamClosed(), is(true));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -303,7 +303,7 @@ public void shouldRemoveEventsFromBufferOnceStreamed() {
final String eventName = "source.event.name";
final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);


Expand Down Expand Up @@ -331,11 +331,11 @@ public void shouldRemoveEventsFromBufferOnceStreamed() {

assertThat(returnedEvents, contains(incomingEvent, bufferedEvent4, bufferedEvent5, bufferedEvent6));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

verify(streamBufferRepository).remove(event4);
verify(streamBufferRepository).remove(event5);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package uk.gov.justice.services.event.source.subscriptions.repository.jdbc;

import static java.util.Optional.empty;
import static java.util.Optional.of;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

public class SubscriptionsJdbc {

private static final String INSERT_OR_UPDATE_EVENT_NUMBER_SQL =
"INSERT INTO subscriptions (subscription_name, current_event_number) VALUES (?, ?) " +
"ON CONFLICT (subscription_name) DO UPDATE " +
"SET current_event_number = EXCLUDED.current_event_number";
private static final String SELECT_CURRENT_EVENT_NUMBER_SQL =
"SELECT current_event_number " +
"FROM subscriptions " +
"WHERE subscription_name = ?";
private static final String DELETE_CURRENT_EVENT_NUMBER_SQL =
"DELETE FROM subscriptions " +
"WHERE subscription_name = ?";


public Optional<Long> readCurrentEventNumber(final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_CURRENT_EVENT_NUMBER_SQL)) {

preparedStatement.setString(1, subscriptionName);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return of(resultSet.getLong(1));
}
}
}

return empty();
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
public void insertOrUpdateCurrentEventNumber(final long eventNumber, final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_OR_UPDATE_EVENT_NUMBER_SQL)) {
preparedStatement.setString(1, subscriptionName);
preparedStatement.setLong(2, eventNumber);
preparedStatement.executeUpdate();
}
}

public void deleteCurrentEventNumber(final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(DELETE_CURRENT_EVENT_NUMBER_SQL)) {

preparedStatement.setString(1, subscriptionName);
preparedStatement.executeUpdate();
}
}
}
Loading

0 comments on commit 0dc40df

Please sign in to comment.