From 9f12707105e8c58475de624b2f00eb6804982e0b Mon Sep 17 00:00:00 2001 From: Mahesh Subramanian Date: Mon, 28 Jan 2019 16:24:22 +0000 Subject: [PATCH] TP-269: Handling optimistic locking exception retry for event_stream table insertion --- .../EventStreamJdbcRepository.java | 8 +- .../jdbc/JdbcBasedEventRepositoryTest.java | 2 +- .../EventStreamJdbcRepositoryTest.java | 141 ++++++++++++++++++ 3 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepositoryTest.java diff --git a/event-sourcing/event-repository/event-repository-jdbc/src/main/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepository.java b/event-sourcing/event-repository/event-repository-jdbc/src/main/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepository.java index 77f8f28fd..bd2f95e48 100644 --- a/event-sourcing/event-repository/event-repository-jdbc/src/main/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepository.java +++ b/event-sourcing/event-repository/event-repository-jdbc/src/main/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepository.java @@ -6,6 +6,7 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidStreamIdException; +import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.OptimisticLockingRetryException; import uk.gov.justice.services.jdbc.persistence.JdbcDataSourceProvider; import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException; import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper; @@ -26,7 +27,7 @@ public class EventStreamJdbcRepository { private static final String SQL_FIND_BY_POSITION = "SELECT * FROM event_stream WHERE position_in_stream>=? ORDER BY position_in_stream ASC"; private static final String SQL_FIND_POSITION_BY_STREAM = "SELECT position_in_stream FROM event_stream s WHERE s.stream_id=?"; private static final String SQL_FIND_EVENT_STREAM = "SELECT * FROM event_stream s WHERE s.stream_id=?"; - private static final String SQL_INSERT_EVENT_STREAM = "INSERT INTO event_stream (stream_id, date_created, active) values (?, ?, ?)"; + private static final String SQL_INSERT_EVENT_STREAM = "INSERT INTO event_stream (stream_id, date_created, active) values (?, ?, ?) ON CONFLICT DO NOTHING"; private static final String SQL_UPDATE_EVENT_STREAM_ACTIVE = "UPDATE event_stream SET active=? WHERE stream_id=?"; private static final String SQL_DELETE_EVENT_STREAM = "DELETE FROM event_stream t WHERE t.stream_id=?"; private static final String SQL_FIND_ALL = "SELECT * FROM event_stream ORDER BY position_in_stream ASC"; @@ -72,7 +73,10 @@ public void insert(final UUID streamId, final boolean active) { ps.setTimestamp(2, toSqlTimestamp(clock.now())); ps.setBoolean(3, active); - ps.executeUpdate(); + final int updatedRows = ps.executeUpdate(); + if (updatedRows == 0) { + throw new OptimisticLockingRetryException(format("Locking Exception while storing stream %s", streamId)); + } } catch (final SQLException e) { throw new JdbcRepositoryException(format("Exception while storing stream %s", streamId), e); } diff --git a/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/JdbcBasedEventRepositoryTest.java b/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/JdbcBasedEventRepositoryTest.java index c62d35f1f..b65f9900e 100644 --- a/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/JdbcBasedEventRepositoryTest.java +++ b/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/JdbcBasedEventRepositoryTest.java @@ -77,7 +77,7 @@ public class JdbcBasedEventRepositoryTest { @InjectMocks private JdbcBasedEventRepository jdbcBasedEventRepository; - private final static ZonedDateTime TIMESTAMP = new UtcClock().now(); + private static final ZonedDateTime TIMESTAMP = new UtcClock().now(); @Test public void shouldGetAllEvents() throws Exception { diff --git a/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepositoryTest.java b/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepositoryTest.java new file mode 100644 index 000000000..26768e73a --- /dev/null +++ b/event-sourcing/event-repository/event-repository-jdbc/src/test/java/uk/gov/justice/services/eventsourcing/repository/jdbc/eventstream/EventStreamJdbcRepositoryTest.java @@ -0,0 +1,141 @@ +package uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream; + +import static java.util.UUID.randomUUID; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField; + +import uk.gov.justice.services.common.converter.ZonedDateTimes; +import uk.gov.justice.services.common.util.UtcClock; +import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.OptimisticLockingRetryException; +import uk.gov.justice.services.jdbc.persistence.JdbcDataSourceProvider; +import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper; +import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapper; + +import java.sql.SQLException; +import java.time.ZonedDateTime; +import java.util.UUID; + +import javax.sql.DataSource; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + +@RunWith(MockitoJUnitRunner.class) +public class EventStreamJdbcRepositoryTest { + + private static final String SQL_FIND_EVENT_STREAM = "SELECT * FROM event_stream s WHERE s.stream_id=?"; + private static final String SQL_INSERT_EVENT_STREAM = "INSERT INTO event_stream (stream_id, date_created, active) values (?, ?, ?) ON CONFLICT DO NOTHING"; + + @Mock + private Logger logger; + + @Mock + private JdbcRepositoryHelper eventStreamJdbcRepositoryHelper; + + @Mock + private JdbcDataSourceProvider jdbcDataSourceProvider; + + @Mock + private UtcClock clock; + + private final String jndiDatasource = "jndiDatasource"; + + @Mock + private DataSource dataSource; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private PreparedStatementWrapper queryPreparedStatementWrapper, insertPreparedStatementWrapper; + + @InjectMocks + private EventStreamJdbcRepository repository; + + @Before + public void setup() { + setField(repository, "jndiDatasource", jndiDatasource); + when(jdbcDataSourceProvider.getDataSource(jndiDatasource)).thenReturn(dataSource); + } + + @Test + public void insertActiveStreamSuccessfully() throws SQLException { + + when(eventStreamJdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM)).thenReturn(queryPreparedStatementWrapper); + when(queryPreparedStatementWrapper.executeQuery().next()).thenReturn(false); + + when(eventStreamJdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_INSERT_EVENT_STREAM)).thenReturn(insertPreparedStatementWrapper); + final ZonedDateTime streamCreationTimestamp = ZonedDateTime.now(); + when(clock.now()).thenReturn(streamCreationTimestamp); + when(insertPreparedStatementWrapper.executeUpdate()).thenReturn(1); + + final UUID streamId = randomUUID(); + repository.insert(streamId); + + verify(jdbcDataSourceProvider).getDataSource(jndiDatasource); + verify(eventStreamJdbcRepositoryHelper).preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM); + verify(queryPreparedStatementWrapper).setObject(1, streamId); + verify(eventStreamJdbcRepositoryHelper).preparedStatementWrapperOf(dataSource, SQL_INSERT_EVENT_STREAM); + verify(insertPreparedStatementWrapper).setObject(1, streamId); + verify(insertPreparedStatementWrapper).setTimestamp(2, ZonedDateTimes.toSqlTimestamp(streamCreationTimestamp)); + verify(insertPreparedStatementWrapper).setBoolean(3, true); + verify(insertPreparedStatementWrapper).executeUpdate(); + } + + @Test + public void insertExistingStreamAndRecordOptimisticLockingException() throws SQLException { + + when(eventStreamJdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM)).thenReturn(queryPreparedStatementWrapper); + // indicates stream doesn't exist + when(queryPreparedStatementWrapper.executeQuery().next()).thenReturn(false); + + when(eventStreamJdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_INSERT_EVENT_STREAM)).thenReturn(insertPreparedStatementWrapper); + final ZonedDateTime streamCreationTimestamp = ZonedDateTime.now(); + when(clock.now()).thenReturn(streamCreationTimestamp); + // not able to insert stream as another transaction inserted a stream with the same identifier after the check above + when(insertPreparedStatementWrapper.executeUpdate()).thenReturn(0); + + final UUID streamId = randomUUID(); + try { + repository.insert(streamId); + fail("Exception should be thrown when 0 records are updated"); + } catch (OptimisticLockingRetryException e) { + verify(jdbcDataSourceProvider).getDataSource(jndiDatasource); + verify(eventStreamJdbcRepositoryHelper).preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM); + verify(queryPreparedStatementWrapper).setObject(1, streamId); + verify(eventStreamJdbcRepositoryHelper).preparedStatementWrapperOf(dataSource, SQL_INSERT_EVENT_STREAM); + verify(insertPreparedStatementWrapper).setObject(1, streamId); + verify(insertPreparedStatementWrapper).setTimestamp(2, ZonedDateTimes.toSqlTimestamp(streamCreationTimestamp)); + verify(insertPreparedStatementWrapper).setBoolean(3, true); + verify(insertPreparedStatementWrapper).executeUpdate(); + } + } + + @Test + public void insertExistingStreamAndReturnWithoutException() throws SQLException { + + when(eventStreamJdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM)).thenReturn(queryPreparedStatementWrapper); + // indicates stream already exists + when(queryPreparedStatementWrapper.executeQuery().next()).thenReturn(true); + + + final UUID streamId = randomUUID(); + repository.insert(streamId); + verify(jdbcDataSourceProvider).getDataSource(jndiDatasource); + verify(eventStreamJdbcRepositoryHelper).preparedStatementWrapperOf(dataSource, SQL_FIND_EVENT_STREAM); + verify(queryPreparedStatementWrapper).setObject(1, streamId); + verify(eventStreamJdbcRepositoryHelper, never()).preparedStatementWrapperOf(dataSource, SQL_INSERT_EVENT_STREAM); + verify(insertPreparedStatementWrapper, never()).setObject(1, streamId); + verify(insertPreparedStatementWrapper, never()).setTimestamp(eq(2), any()); + verify(insertPreparedStatementWrapper, never()).setBoolean(3, true); + verify(insertPreparedStatementWrapper, never()).executeUpdate(); + } +} \ No newline at end of file