Skip to content

Commit

Permalink
Merge 0dc40df into 41bc3d6
Browse files Browse the repository at this point in the history
  • Loading branch information
Allan Mckenzie committed Dec 14, 2018
2 parents 41bc3d6 + 0dc40df commit 85c13ba
Show file tree
Hide file tree
Showing 19 changed files with 407 additions and 230 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Current event number is initialised to zero if it does not exist on app startup

## [1.1.0-M1]
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import javax.sql.DataSource;

@ApplicationScoped
public class SubscriptionJdbcRepository {
public class StreamStatusJdbcRepository {

/**
* Column Names
Expand All @@ -45,9 +45,9 @@ public class SubscriptionJdbcRepository {

DataSource dataSource;

public SubscriptionJdbcRepository() {}
public StreamStatusJdbcRepository() {}

public SubscriptionJdbcRepository(final DataSource dataSource, final JdbcRepositoryHelper jdbcRepositoryHelper) {
public StreamStatusJdbcRepository(final DataSource dataSource, final JdbcRepositoryHelper jdbcRepositoryHelper) {
this.dataSource = dataSource;
this.jdbcRepositoryHelper = jdbcRepositoryHelper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferEvent;
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription;
import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.JsonObjectEnvelopeConverter;

Expand Down Expand Up @@ -36,7 +36,7 @@ public class ConsecutiveEventBufferService implements EventBufferService {
private EventBufferJdbcRepository streamBufferRepository;

@Inject
private SubscriptionJdbcRepository subscriptionJdbcRepository;
private StreamStatusJdbcRepository streamStatusJdbcRepository;

@Inject
private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter;
Expand All @@ -61,9 +61,9 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming
final long incomingEventVersion = versionOf(incomingEvent);
final String source = getSource(incomingEvent);

subscriptionJdbcRepository.updateSource(streamId, source);
subscriptionJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
final long currentVersion = subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)
streamStatusJdbcRepository.updateSource(streamId, source);
streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)
.orElseThrow(() -> new IllegalStateException("stream status cannot be empty"))
.getPosition();

Expand All @@ -78,7 +78,7 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming

} else {
logger.trace("Message : {} version is valid sending stream to dispatcher", incomingEvent);
subscriptionJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source));
streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source));
return bufferedEvents(streamId, incomingEvent, incomingEventVersion);
}
}
Expand All @@ -97,7 +97,7 @@ private Stream<JsonEnvelope> bufferedEvents(final UUID streamId, final JsonEnvel
final String source = getSource(incomingEvent);
return concat(Stream.of(incomingEvent), consecutiveEventStreamFromBuffer(streamBufferRepository.findStreamByIdAndSource(streamId, source), incomingEventVersion)
.peek(streamBufferEvent -> streamBufferRepository.remove(streamBufferEvent))
.peek(streamBufferEvent -> subscriptionJdbcRepository.update(new Subscription(
.peek(streamBufferEvent -> streamStatusJdbcRepository.update(new Subscription(
streamBufferEvent.getStreamId(),
streamBufferEvent.getPosition(),
source)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import org.junit.Before;
import org.junit.Test;

public class SubscriptionJdbcRepositoryIT {
public class StreamStatusJdbcRepositoryIT {

private static final String COUNT_BY_STREAM_ID = "SELECT count(*) FROM stream_status WHERE stream_id=?";

private static final long INITIAL_POSITION = 0L;

private SubscriptionJdbcRepository jdbcRepository;
private StreamStatusJdbcRepository jdbcRepository;

private DataSource dataSource;

Expand All @@ -41,7 +41,7 @@ public void initDatabase() throws Exception {
.createDataSource("frameworkviewstore");

jdbcRepositoryHelper = new JdbcRepositoryHelper();
jdbcRepository = new SubscriptionJdbcRepository(dataSource, jdbcRepositoryHelper);
jdbcRepository = new StreamStatusJdbcRepository(dataSource, jdbcRepositoryHelper);

new DatabaseCleaner().cleanViewStoreTables("framework", "stream_status", "stream_buffer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SubscriptionJdbcRepositoryTest {
public class StreamStatusJdbcRepositoryTest {

@InjectMocks
private SubscriptionJdbcRepository repository;
private StreamStatusJdbcRepository repository;

@Mock
DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferEvent;
import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription;
import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository;
import uk.gov.justice.services.event.buffer.core.repository.subscription.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.JsonObjectEnvelopeConverter;
import uk.gov.justice.services.test.utils.common.stream.StreamCloseSpy;
Expand All @@ -43,7 +43,7 @@ public class ConsecutiveEventBufferServiceTest {
private Logger logger;

@Mock
private SubscriptionJdbcRepository subscriptionJdbcRepository;
private StreamStatusJdbcRepository streamStatusJdbcRepository;

@Mock
private EventBufferJdbcRepository streamBufferRepository;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void shouldIgnoreObsoleteEvent() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);

final JsonEnvelope event_3 = envelopeFrom(
Expand All @@ -115,11 +115,11 @@ public void shouldIgnoreObsoleteEvent() {

verifyZeroInteractions(streamBufferRepository);

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -131,7 +131,7 @@ public void shouldReturnEventThatIsInCorrectOrder() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);

when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());
Expand All @@ -144,11 +144,11 @@ public void shouldReturnEventThatIsInCorrectOrder() {
final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);
assertThat(returnedEvents, contains(incomingEvent));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -164,18 +164,18 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);
when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty());

bufferService.currentOrderedEventsWith(incomingEvent);

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).update(new Subscription(streamId, 5L, source));
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).update(new Subscription(streamId, 5L, source));

}

Expand All @@ -192,19 +192,19 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(4L);
when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 4L, source)));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 4L, source)));

when(jsonObjectEnvelopeConverter.asJsonString(incomingEvent)).thenReturn("someStringRepresentation");

final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);

final InOrder inOrder = inOrder(subscriptionJdbcRepository, streamBufferRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository, streamBufferRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamBufferRepository).insert(new EventBufferEvent(streamId, 6L, "someStringRepresentation", source));

assertThat(returnedEvents, empty());
Expand All @@ -219,7 +219,7 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);

when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(
Expand Down Expand Up @@ -247,11 +247,11 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG
final Stream<JsonEnvelope> returnedEvents = bufferService.currentOrderedEventsWith(incomingEvent);
assertThat(returnedEvents, contains(incomingEvent, bufferedEvent4, bufferedEvent5, bufferedEvent6));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -264,7 +264,7 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() {

final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);

final StreamCloseSpy sourceStreamSpy = new StreamCloseSpy();
Expand All @@ -287,11 +287,11 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() {

assertThat(sourceStreamSpy.streamClosed(), is(true));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

}

Expand All @@ -303,7 +303,7 @@ public void shouldRemoveEventsFromBufferOnceStreamed() {
final String eventName = "source.event.name";
final Subscription subscription = mock(Subscription.class);

when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription));
when(subscription.getPosition()).thenReturn(2L);


Expand Down Expand Up @@ -331,11 +331,11 @@ public void shouldRemoveEventsFromBufferOnceStreamed() {

assertThat(returnedEvents, contains(incomingEvent, bufferedEvent4, bufferedEvent5, bufferedEvent6));

final InOrder inOrder = inOrder(subscriptionJdbcRepository);
final InOrder inOrder = inOrder(streamStatusJdbcRepository);

inOrder.verify(subscriptionJdbcRepository).updateSource(streamId, source);
inOrder.verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source);
inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source));
inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source);

verify(streamBufferRepository).remove(event4);
verify(streamBufferRepository).remove(event5);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package uk.gov.justice.services.event.source.subscriptions.repository.jdbc;

import static java.util.Optional.empty;
import static java.util.Optional.of;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

public class SubscriptionsJdbc {

private static final String INSERT_OR_UPDATE_EVENT_NUMBER_SQL =
"INSERT INTO subscriptions (subscription_name, current_event_number) VALUES (?, ?) " +
"ON CONFLICT (subscription_name) DO UPDATE " +
"SET current_event_number = EXCLUDED.current_event_number";
private static final String SELECT_CURRENT_EVENT_NUMBER_SQL =
"SELECT current_event_number " +
"FROM subscriptions " +
"WHERE subscription_name = ?";
private static final String DELETE_CURRENT_EVENT_NUMBER_SQL =
"DELETE FROM subscriptions " +
"WHERE subscription_name = ?";


public Optional<Long> readCurrentEventNumber(final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_CURRENT_EVENT_NUMBER_SQL)) {

preparedStatement.setString(1, subscriptionName);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return of(resultSet.getLong(1));
}
}
}

return empty();
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
public void insertOrUpdateCurrentEventNumber(final long eventNumber, final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_OR_UPDATE_EVENT_NUMBER_SQL)) {
preparedStatement.setString(1, subscriptionName);
preparedStatement.setLong(2, eventNumber);
preparedStatement.executeUpdate();
}
}

public void deleteCurrentEventNumber(final String subscriptionName, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(DELETE_CURRENT_EVENT_NUMBER_SQL)) {

preparedStatement.setString(1, subscriptionName);
preparedStatement.executeUpdate();
}
}
}
Loading

0 comments on commit 85c13ba

Please sign in to comment.