Skip to content

Commit

Permalink
Merge pull request #20 from CJSCommonPlatform/update-to-latest-framework
Browse files Browse the repository at this point in the history
Added a page size when reading stream of events
  • Loading branch information
Allan Mckenzie committed Nov 16, 2018
2 parents c38f037 + a64c81e commit ac855aa
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 28 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [1.0.4] - 2018-11-16
### Changed
- Updated framework-api to 3.0.1
- Updated framework to 5.0.4
- Updated framework-domain to 1.0.3

### Added
- Added a page size when reading stream of events

## [1.0.3] - 2018-11-13
### Changed
- Removed hard coded localhost from test datasource url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ public interface EventRepository {
*/
Stream<JsonEnvelope> getEventsByStreamIdFromPosition(final UUID streamId, final Long position);

/**
* Get a stream of envelopes from a given version, ordered by sequence id. The stream is paged
* for efficiency
*
* @param streamId the id of the stream to retrieve
* @param position the sequence id to read the stream from
* @param pageSize the size of the result set page.
* @return the stream of envelopes. Never returns null.
*/
Stream<JsonEnvelope> getEventsByStreamIdFromPosition(final UUID streamId, final Long position, final Integer pageSize);

/**
* Stores the given envelope into the event stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ public Stream<JsonEnvelope> getEventsByStreamIdFromPosition(final UUID streamId,
.map(eventConverter::envelopeOf);
}

@Override
public Stream<JsonEnvelope> getEventsByStreamIdFromPosition(final UUID streamId, final Long position, final Integer pageSize) {
if (streamId == null) {
throw new InvalidStreamIdException("streamId is null.");
} else if (position == null) {
throw new JdbcRepositoryException("position is null.");
}

logger.trace("Retrieving event stream for {} at sequence {}", streamId, position);
return eventJdbcRepository.findByStreamIdFromPositionOrderByPositionAsc(streamId, position, pageSize)
.map(eventConverter::envelopeOf);
}

@Override
@Transactional(dontRollbackOn = OptimisticLockingRetryException.class)
public void storeEvent(final JsonEnvelope envelope) throws StoreEventRequestFailedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ public class EventJdbcRepository {
static final String COL_PAYLOAD = "payload";
static final String COL_TIMESTAMP = "date_created";



/**
* Statements
*/
static final String SQL_FIND_ALL = "SELECT * FROM event_log ORDER BY sequence_id ASC";
static final String SQL_FIND_BY_STREAM_ID = "SELECT * FROM event_log WHERE stream_id=? ORDER BY sequence_id ASC";
static final String SQL_FIND_BY_STREAM_ID_AND_POSITION = "SELECT * FROM event_log WHERE stream_id=? AND sequence_id>=? ORDER BY sequence_id ASC";
static final String SQL_FIND_BY_STREAM_ID_AND_POSITION_BY_PAGE = "SELECT * FROM event_log WHERE stream_id=? AND sequence_id>=? ORDER BY sequence_id ASC LIMIT ?";
static final String SQL_FIND_LATEST_POSITION = "SELECT MAX(sequence_id) FROM event_log WHERE stream_id=?";
static final String SQL_DISTINCT_STREAM_ID = "SELECT DISTINCT stream_id FROM event_log";
static final String SQL_DELETE_STREAM = "DELETE FROM event_log t WHERE t.stream_id=?";
Expand Down Expand Up @@ -87,8 +86,8 @@ public EventJdbcRepository(final EventInsertionStrategy eventInsertionStrategy,
* @throws InvalidPositionException if the version already exists or is null.
*/
public void insert(final Event event) throws InvalidPositionException {
try (final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), eventInsertionStrategy.insertStatement())) {
eventInsertionStrategy.insert(ps, event);
try (final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), eventInsertionStrategy.insertStatement())) {
eventInsertionStrategy.insert(preparedStatementWrapper, event);
} catch (final SQLException e) {
logger.error("Error persisting event to the database", e);
throw new JdbcRepositoryException(format("Exception while storing sequence %s of stream %s",
Expand All @@ -104,10 +103,10 @@ public void insert(final Event event) throws InvalidPositionException {
*/
public Stream<Event> findByStreamIdOrderByPositionAsc(final UUID streamId) {
try {
final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_BY_STREAM_ID);
ps.setObject(1, streamId);
final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_BY_STREAM_ID);
preparedStatementWrapper.setObject(1, streamId);

return jdbcRepositoryHelper.streamOf(ps, entityFromFunction());
return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, entityFromFunction());
} catch (final SQLException e) {
logger.warn(FAILED_TO_READ_STREAM, streamId, e);
throw new JdbcRepositoryException(format(READING_STREAM_EXCEPTION, streamId), e);
Expand All @@ -125,11 +124,27 @@ public Stream<Event> findByStreamIdOrderByPositionAsc(final UUID streamId) {
public Stream<Event> findByStreamIdFromPositionOrderByPositionAsc(final UUID streamId,
final Long position) {
try {
final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_BY_STREAM_ID_AND_POSITION);
ps.setObject(1, streamId);
ps.setLong(2, position);
final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_BY_STREAM_ID_AND_POSITION);
preparedStatementWrapper.setObject(1, streamId);
preparedStatementWrapper.setLong(2, position);

return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, entityFromFunction());
} catch (final SQLException e) {
logger.warn(FAILED_TO_READ_STREAM, streamId, e);
throw new JdbcRepositoryException(format(READING_STREAM_EXCEPTION, streamId), e);
}
}

public Stream<Event> findByStreamIdFromPositionOrderByPositionAsc(final UUID streamId,
final Long versionFrom,
final Integer pageSize) {
try {
final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_BY_STREAM_ID_AND_POSITION_BY_PAGE);
preparedStatementWrapper.setObject(1, streamId);
preparedStatementWrapper.setLong(2, versionFrom);
preparedStatementWrapper.setInt(3, pageSize);

return jdbcRepositoryHelper.streamOf(ps, entityFromFunction());
return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, entityFromFunction());
} catch (final SQLException e) {
logger.warn(FAILED_TO_READ_STREAM, streamId, e);
throw new JdbcRepositoryException(format(READING_STREAM_EXCEPTION, streamId), e);
Expand Down Expand Up @@ -158,10 +173,11 @@ public Stream<Event> findAll() {
* @return current position streamId for the stream. Returns 0 if stream doesn't exist.
*/
public long getStreamSize(final UUID streamId) {
try (final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_LATEST_POSITION)) {
ps.setObject(1, streamId);
try (final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_FIND_LATEST_POSITION)) {
preparedStatementWrapper.setObject(1, streamId);

final ResultSet resultSet = preparedStatementWrapper.executeQuery();

ResultSet resultSet = ps.executeQuery();
if (resultSet.next()) {
return resultSet.getLong(1);
}
Expand All @@ -182,8 +198,8 @@ public long getStreamSize(final UUID streamId) {
*/
public Stream<UUID> getStreamIds() {
try {
final PreparedStatementWrapper psWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_DISTINCT_STREAM_ID);
return streamFrom(psWrapper);
final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_DISTINCT_STREAM_ID);
return streamFrom(preparedStatementWrapper);
} catch (final SQLException e) {
throw new JdbcRepositoryException(READING_STREAM_ALL_EXCEPTION, e);
}
Expand All @@ -198,17 +214,17 @@ private DataSource getDataSource() {
return dataSource;
}

private Stream<UUID> streamFrom(final PreparedStatementWrapper psWrapper) throws SQLException {
return jdbcRepositoryHelper.streamOf(psWrapper, e -> {
private Stream<UUID> streamFrom(final PreparedStatementWrapper preparedStatementWrapper) throws SQLException {
return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, e -> {
try {
return (UUID) e.getObject(COL_STREAM_ID);
} catch (final SQLException e1) {
throw jdbcRepositoryHelper.handled(e1, psWrapper);
throw jdbcRepositoryHelper.handled(e1, preparedStatementWrapper);
}
});
}

protected Function<ResultSet, Event> entityFromFunction() {
private Function<ResultSet, Event> entityFromFunction() {
return resultSet -> {
try {
return new Event((UUID) resultSet.getObject(PRIMARY_KEY_ID),
Expand All @@ -227,10 +243,10 @@ protected Function<ResultSet, Event> entityFromFunction() {
public void clear(final UUID streamId) {
final long eventCount = getStreamSize(streamId);

try (final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_DELETE_STREAM)) {
ps.setObject(1, streamId);
try (final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDataSource(), SQL_DELETE_STREAM)) {
preparedStatementWrapper.setObject(1, streamId);

final int deletedRows = ps.executeUpdate();
final int deletedRows = preparedStatementWrapper.executeUpdate();

if (deletedRows != eventCount) {
// Rollback, something went wrong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ public void shouldGetByStreamIdAndSequenceId() throws Exception {
verify(logger).trace("Retrieving event stream for {} at sequence {}", STREAM_ID, POSITION);
}

@Test
public void shouldGetByStreamIdAndPositionByPage() throws Exception {

final Integer pageSize = 1000;

when(eventJdbcRepository.findByStreamIdFromPositionOrderByPositionAsc(STREAM_ID, POSITION, pageSize)).thenReturn(Stream.of(event));
when(eventConverter.envelopeOf(event)).thenReturn(envelope);

final Stream<JsonEnvelope> streamOfEnvelopes = jdbcBasedEventRepository.getEventsByStreamIdFromPosition(STREAM_ID, POSITION, pageSize);

assertThat(streamOfEnvelopes, not(nullValue()));
assertThat(streamOfEnvelopes.findFirst().get(), equalTo(envelope));
verify(logger).trace("Retrieving event stream for {} at sequence {}", STREAM_ID, POSITION);
}

@Test(expected = InvalidStreamIdException.class)
public void shouldThrowExceptionOnNullStreamIdWhenGettingStreamByStreamIdAndSequence() throws Exception {
Expand All @@ -131,6 +145,15 @@ public void shouldThrowExceptionOnNullSequenceIdWhenGettingStreamByStreamIdAndSe
jdbcBasedEventRepository.getEventsByStreamIdFromPosition(STREAM_ID, null);
}

@Test(expected = InvalidStreamIdException.class)
public void shouldThrowExceptionOnNullStreamIdWhenGettingStreamByStreamIdAndPositionByPage() throws Exception {
jdbcBasedEventRepository.getEventsByStreamIdFromPosition(null, POSITION, 10);
}

@Test(expected = JdbcRepositoryException.class)
public void shouldThrowExceptionOnNullSequenceIdWhenGettingStreamByStreamIdAndPositonByPage() throws Exception {
jdbcBasedEventRepository.getEventsByStreamIdFromPosition(STREAM_ID, null, 10);
}

@Test
public void shouldGetStreamOfStreams() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ public void shouldReturnEventsByStreamIdFromSequenceIdOrderBySequenceId() throws
assertThat(eventList.get(1).getSequenceId(), is(7L));
}

@Test
public void shouldReturnEventsByStreamIdFromPostionOrderByPositionByPage() throws InvalidPositionException {

final int pageSize = 2;

jdbcRepository.insert(eventOf(7, STREAM_ID));
jdbcRepository.insert(eventOf(4, STREAM_ID));
jdbcRepository.insert(eventOf(3, STREAM_ID));

final Stream<Event> events = jdbcRepository.findByStreamIdFromPositionOrderByPositionAsc(STREAM_ID, 3L, pageSize);
final List<Event> eventList = events.collect(toList());
assertThat(eventList, hasSize(2));
assertThat(eventList.get(0).getSequenceId(), is(3L));
assertThat(eventList.get(1).getSequenceId(), is(4L));
}

@Test
public void shouldReturnAllEventsOrderedBySequenceId() throws InvalidPositionException {
jdbcRepository.insert(eventOf(1, randomUUID()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package uk.gov.justice.services.eventsourcing.repository.jdbc.event;

import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository.SQL_FIND_BY_STREAM_ID;
import static uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository.SQL_FIND_BY_STREAM_ID_AND_POSITION;
import static uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository.SQL_FIND_BY_STREAM_ID_AND_POSITION_BY_PAGE;

import uk.gov.justice.services.eventsourcing.repository.jdbc.EventInsertionStrategy;
import uk.gov.justice.services.jdbc.persistence.JdbcDataSourceProvider;
import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException;
import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper;

import java.sql.SQLException;
import java.util.UUID;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;

@RunWith(MockitoJUnitRunner.class)
public class EventJdbcRepositoryTest {

@Mock
private EventInsertionStrategy eventInsertionStrategy;

@Mock
private JdbcRepositoryHelper jdbcRepositoryHelper;

@Mock
private JdbcDataSourceProvider jdbcDataSourceProvider;

@Mock
private Logger logger;

private String jndiDatasource;
private EventJdbcRepository eventJdbcRepository;

@Before
public void setup() throws Exception {
eventJdbcRepository = new EventJdbcRepository(eventInsertionStrategy, jdbcRepositoryHelper, jdbcDataSourceProvider, jndiDatasource, logger);
}

@Test
public void shouldLogAndThrowExceptionIfSqlExceptionIsThrownInInsert() throws Exception {
final UUID streamId = randomUUID();
final DataSource dataSource = mock(DataSource.class);
final SQLException sqlException = new SQLException();
final String statement = "STATEMENT";
final Event event = mock(Event.class);

when(jdbcDataSourceProvider.getDataSource(jndiDatasource)).thenReturn(dataSource);
when(eventInsertionStrategy.insertStatement()).thenReturn(statement);
when(jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, statement)).thenThrow(sqlException);
when(event.getSequenceId()).thenReturn(5L);
when(event.getStreamId()).thenReturn(streamId);

try {
eventJdbcRepository.insert(event);
fail();
} catch (final JdbcRepositoryException e) {
assertThat(e.getMessage(), is("Exception while storing sequence 5 of stream " + streamId));
verify(logger).error("Error persisting event to the database", sqlException);
}
}

@Test
public void shouldLogAndThrowExceptionIfSqlExceptionIsThrownInFindByStreamIdOrderByPositionAsc() throws Exception {
final UUID streamId = randomUUID();
final DataSource dataSource = mock(DataSource.class);
final SQLException sqlException = new SQLException();

when(jdbcDataSourceProvider.getDataSource(jndiDatasource)).thenReturn(dataSource);
when(jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_BY_STREAM_ID)).thenThrow(sqlException);

try {
eventJdbcRepository.findByStreamIdOrderByPositionAsc(streamId);
fail();
} catch (final JdbcRepositoryException e) {
assertThat(e.getMessage(), is("Exception while reading stream " + streamId));
verify(logger).warn("Failed to read stream {}", streamId, sqlException);
}
}

@Test
public void shouldLogAndThrowExceptionIfSqlExceptionIsThrownInFindByStreamIdFromPositionOrderByPositionAsc() throws Exception {
final UUID streamId = randomUUID();
final long position = 2L;
final DataSource dataSource = mock(DataSource.class);
final SQLException sqlException = new SQLException();

when(jdbcDataSourceProvider.getDataSource(jndiDatasource)).thenReturn(dataSource);
when(jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_BY_STREAM_ID_AND_POSITION)).thenThrow(sqlException);

try {
eventJdbcRepository.findByStreamIdFromPositionOrderByPositionAsc(streamId, position);
fail();
} catch (final JdbcRepositoryException e) {
assertThat(e.getMessage(), is("Exception while reading stream " + streamId));
verify(logger).warn("Failed to read stream {}", streamId, sqlException);
}
}

@Test
public void shouldLogAndThrowExceptionIfSqlExceptionIsThrownInFindByStreamIdFromPositionOrderByPositionAscWithPage() throws Exception {
final UUID streamId = randomUUID();
final long position = 2L;
final int pageSize = 10;
final DataSource dataSource = mock(DataSource.class);
final SQLException sqlException = new SQLException();

when(jdbcDataSourceProvider.getDataSource(jndiDatasource)).thenReturn(dataSource);
when(jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_BY_STREAM_ID_AND_POSITION_BY_PAGE)).thenThrow(sqlException);

try {
eventJdbcRepository.findByStreamIdFromPositionOrderByPositionAsc(streamId, position, pageSize);
fail();
} catch (final JdbcRepositoryException e) {
assertThat(e.getMessage(), is("Exception while reading stream " + streamId));
verify(logger).warn("Failed to read stream {}", streamId, sqlException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public Stream<JsonEnvelope> readFrom(final long position) {
return eventStreamManager.readFrom(id, position).map(this::recordCurrentPosition);
}

@Override
public Stream<JsonEnvelope> readFrom(final long position, final int pageSize) {
markAsReadFrom(position - 1);
return eventStreamManager.readFrom(id, position, pageSize).map(this::recordCurrentPosition);
}

@Override
public long append(final Stream<JsonEnvelope> events) throws EventStreamException {
return append(events, CONSECUTIVE);
Expand Down
Loading

0 comments on commit ac855aa

Please sign in to comment.