diff --git a/CHANGELOG.md b/CHANGELOG.md index 13d6a6311..e1433e07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +## [2.2.3] - 2019-10-25 +### Fixed +- Catchup range processing + ## [2.2.2] - 2019-10-24 ### Changed - Pre publish and publish timer beans now run in a separate thread. diff --git a/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepository.java b/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepository.java index 9ea40a696..4aa71b6a3 100644 --- a/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepository.java +++ b/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepository.java @@ -30,12 +30,12 @@ public class ProcessedEventTrackingRepository { "AND component = ? " + "ORDER BY event_number DESC LIMIT 1"; - private static final String SELECT_SQL = + private static final String SELECT_ALL_DESCENDING_ORDER_SQL = "SELECT event_number, previous_event_number " + "FROM processed_event " + "WHERE source = ? " + "AND component = ? " + - "ORDER BY event_number ASC"; + "ORDER BY event_number DESC"; @Inject private JdbcResultSetStreamer jdbcResultSetStreamer; @@ -64,11 +64,11 @@ public void save(final ProcessedEventTrackItem processedEventTrackItem) { } } - public Stream getAllProcessedEvents(final String source, final String componentName) { + public Stream getAllProcessedEventsDescendingOrder(final String source, final String componentName) { try { final PreparedStatementWrapper preparedStatement = preparedStatementWrapperFactory.preparedStatementWrapperOf( - viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_SQL); + viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_ALL_DESCENDING_ORDER_SQL); preparedStatement.setString(1, source); preparedStatement.setString(2, componentName); diff --git a/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingService.java b/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingService.java index 214ac53f7..d4591d351 100644 --- a/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingService.java +++ b/event-tracking/event-tracking-service/src/main/java/uk/gov/justice/services/subscription/ProcessedEventTrackingService.java @@ -2,6 +2,8 @@ import static java.lang.Long.MAX_VALUE; import static java.lang.String.format; +import static java.lang.System.lineSeparator; +import static java.util.stream.Collectors.joining; import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange; import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator; @@ -14,6 +16,8 @@ import javax.inject.Inject; +import org.slf4j.Logger; + public class ProcessedEventTrackingService { private static final long FIRST_POSSIBLE_EVENT_NUMBER = 0L; @@ -24,6 +28,9 @@ public class ProcessedEventTrackingService { @Inject private EventSourceNameCalculator eventSourceNameCalculator; + @Inject + private Logger logger; + public void trackProcessedEvent(final JsonEnvelope event, final String componentName) { final Metadata metadata = event.metadata(); @@ -59,7 +66,7 @@ public Stream getAllMissingEvents(final String eventSourceNam notSeenEventsRange(1L, eventNumberAccumulator); } - try (final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(eventSourceName, componentName)) { + try (final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(eventSourceName, componentName)) { allProcessedEvents .forEach(processedEventTrackItem -> findMissingRange(processedEventTrackItem, eventNumberAccumulator)); } @@ -68,6 +75,8 @@ public Stream getAllMissingEvents(final String eventSourceNam eventNumberAccumulator.addRangeFrom(FIRST_POSSIBLE_EVENT_NUMBER); } + logger.info(createMessageMissingEventRanges(eventNumberAccumulator)); + return eventNumberAccumulator.getMissingEventRanges().stream(); } @@ -100,4 +109,17 @@ private void findMissingRange(final ProcessedEventTrackItem processedEventTrackI eventNumberAccumulator.set(currentPreviousEventNumber, currentEventNumber); } + + private String createMessageMissingEventRanges(final EventNumberAccumulator eventNumberAccumulator) { + + return "Missing Event Ranges: [" + + lineSeparator() + + eventNumberAccumulator + .getMissingEventRanges() + .stream() + .map(MissingEventRange::toString) + .collect(joining("," + lineSeparator())) + + lineSeparator() + + "]"; + } } diff --git a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepositoryIT.java b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepositoryIT.java index 39ecb8d1f..daef534d6 100644 --- a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepositoryIT.java +++ b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingRepositoryIT.java @@ -48,7 +48,7 @@ public class ProcessedEventTrackingRepositoryIT { @Before public void ensureOurDatasourceProviderReturnsOurTestDataSource() { - + when(viewStoreJdbcDataSourceProvider.getDataSource()).thenReturn(viewStoreDataSource); } @@ -73,20 +73,20 @@ public void shouldSaveAndGetAllProcessedEvents() throws Exception { processedEventTrackingRepository.save(processedEventTrackItem_3); processedEventTrackingRepository.save(processedEventTrackItem_4); - final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName); + final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName); final List processedEventTrackItems = allProcessedEvents.collect(toList()); assertThat(processedEventTrackItems.size(), is(4)); - assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1)); - assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2)); - assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3)); - assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3)); + assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2)); + assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1)); } @Test - public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder() throws Exception { + public void shouldReturnProcessedEventsInDescendingOrderIfInsertedOutOfOrder() throws Exception { final String source = "example-context"; final String componentName = "EVENT_LISTENER"; @@ -101,20 +101,20 @@ public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder() processedEventTrackingRepository.save(processedEventTrackItem_1); processedEventTrackingRepository.save(processedEventTrackItem_3); - final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName); + final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName); final List processedEventTrackItems = allProcessedEvents.collect(toList()); assertThat(processedEventTrackItems.size(), is(4)); - assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1)); - assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2)); - assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3)); - assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3)); + assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2)); + assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1)); } @Test - public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws Exception { + public void shouldReturnOnlyProcessedEventsWIthTheCorrectSourceInDescendingOrder() throws Exception { final String source = "example-context"; final String otherSource = "another-context"; @@ -141,16 +141,16 @@ public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws E processedEventTrackingRepository.save(processedEventTrackItem_3); processedEventTrackingRepository.save(processedEventTrackItem_9); - final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName); + final Stream allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName); final List processedEventTrackItems = allProcessedEvents.collect(toList()); assertThat(processedEventTrackItems.size(), is(4)); - assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1)); - assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2)); - assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3)); - assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4)); + assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3)); + assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2)); + assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1)); } @Test diff --git a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceIT.java b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceIT.java new file mode 100644 index 000000000..d0884d3cc --- /dev/null +++ b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceIT.java @@ -0,0 +1,96 @@ +package uk.gov.justice.services.subscription; + +import static java.util.stream.Collectors.toList; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.slf4j.LoggerFactory.getLogger; +import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField; + +import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange; +import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator; +import uk.gov.justice.services.jdbc.persistence.JdbcResultSetStreamer; +import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapperFactory; +import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider; +import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner; +import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory; + +import java.util.List; + +import javax.sql.DataSource; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + + +@RunWith(MockitoJUnitRunner.class) +public class ProcessedEventTrackingServiceIT { + + private final DataSource viewStoreDataSource = new FrameworkTestDataSourceFactory().createViewStoreDataSource(); + private final JdbcResultSetStreamer jdbcResultSetStreamer = new JdbcResultSetStreamer(); + private final PreparedStatementWrapperFactory preparedStatementWrapperFactory = new PreparedStatementWrapperFactory(); + private final ViewStoreJdbcDataSourceProvider viewStoreJdbcDataSourceProvider = new TestViewStoreJdbcDataSourceProvider(viewStoreDataSource); + private final ProcessedEventTrackingRepository processedEventTrackingRepository = new ProcessedEventTrackingRepository(); + private final EventSourceNameCalculator eventSourceNameCalculator = new EventSourceNameCalculator(); + private final Logger logger = getLogger(ProcessedEventTrackingService.class); + + private final ProcessedEventTrackingService processedEventTrackingService = new ProcessedEventTrackingService(); + + private final DatabaseCleaner databaseCleaner = new DatabaseCleaner(); + + @Before + public void createClassUnderTest() { + setField(processedEventTrackingRepository, "jdbcResultSetStreamer", jdbcResultSetStreamer); + setField(processedEventTrackingRepository, "preparedStatementWrapperFactory", preparedStatementWrapperFactory); + setField(processedEventTrackingRepository, "viewStoreJdbcDataSourceProvider", viewStoreJdbcDataSourceProvider); + + setField(processedEventTrackingService, "processedEventTrackingRepository", processedEventTrackingRepository); + setField(processedEventTrackingService, "eventSourceNameCalculator", eventSourceNameCalculator); + setField(processedEventTrackingService, "logger", logger); + + databaseCleaner.cleanProcessedEventTable("framework"); + } + + @Test + public void shouldFindRangesOfMissingEventsInAscendingOrderOfEventNumbers() throws Exception { + + final String source = "example-context"; + final String componentName = "EVENT_LISTENER"; + + // insert events missing event 4 and events 7, 8 and 9 + insertEventsWithSomeMissing(source, componentName); + + final List missingEventRanges = processedEventTrackingService + .getAllMissingEvents(source, componentName) + .collect(toList()); + + assertThat(missingEventRanges.size(), is(3)); + + assertThat(missingEventRanges.get(0), is(new MissingEventRange(4L, 5L))); + assertThat(missingEventRanges.get(1), is(new MissingEventRange(7L, 10L))); + assertThat(missingEventRanges.get(2), is(new MissingEventRange(11L, 9223372036854775807L))); + } + + private void insertEventsWithSomeMissing(final String source, final String componentName) { + + final ProcessedEventTrackItem processedEventTrackItem_1 = new ProcessedEventTrackItem(0, 1, source, componentName); + final ProcessedEventTrackItem processedEventTrackItem_2 = new ProcessedEventTrackItem(1, 2, source, componentName); + final ProcessedEventTrackItem processedEventTrackItem_3 = new ProcessedEventTrackItem(2, 3, source, componentName); + + final ProcessedEventTrackItem processedEventTrackItem_5 = new ProcessedEventTrackItem(4, 5, source, componentName); + final ProcessedEventTrackItem processedEventTrackItem_6 = new ProcessedEventTrackItem(5, 6, source, componentName); + + final ProcessedEventTrackItem processedEventTrackItem_10 = new ProcessedEventTrackItem(9, 10, source, componentName); + + processedEventTrackingRepository.save(processedEventTrackItem_1); + processedEventTrackingRepository.save(processedEventTrackItem_2); + processedEventTrackingRepository.save(processedEventTrackItem_3); + + processedEventTrackingRepository.save(processedEventTrackItem_5); + processedEventTrackingRepository.save(processedEventTrackItem_6); + + processedEventTrackingRepository.save(processedEventTrackItem_10); + } +} diff --git a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceTest.java b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceTest.java index 8bd83e08b..84e71e0b1 100644 --- a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceTest.java +++ b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/ProcessedEventTrackingServiceTest.java @@ -34,6 +34,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; @RunWith(MockitoJUnitRunner.class) public class ProcessedEventTrackingServiceTest { @@ -44,6 +45,9 @@ public class ProcessedEventTrackingServiceTest { @Mock private EventSourceNameCalculator eventSourceNameCalculator; + @Mock + private Logger logger; + @InjectMocks private ProcessedEventTrackingService processedEventTrackingService; @@ -167,7 +171,7 @@ public void shouldGetTheListOfAllMissingEvents() throws Exception { final Stream processedEventTrackItemStream = processedEventTrackItems.stream().onClose(streamCloseSpy); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(latestProcessedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -180,6 +184,12 @@ public void shouldGetTheListOfAllMissingEvents() throws Exception { assertThat(missingEventRanges.get(1).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 4, missingEventTo (exclusive) = 7},\n" + + "MissingEventRange{missingEventFrom (inclusive) = 8, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -220,7 +230,7 @@ public void shouldHandleMissingEventsFromZero() throws Exception { .build(); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(processedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -235,6 +245,13 @@ public void shouldHandleMissingEventsFromZero() throws Exception { assertThat(missingEventRanges.get(2).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 1, missingEventTo (exclusive) = 20},\n" + + "MissingEventRange{missingEventFrom (inclusive) = 21, missingEventTo (exclusive) = 24},\n" + + "MissingEventRange{missingEventFrom (inclusive) = 26, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -244,7 +261,7 @@ public void shouldReturnRangeOfOneToMaxLongIfNoEventsFound() throws Exception { final String componentName = "EVENT_LISTENER"; when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(Optional.empty()); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(empty()); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(empty()); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -253,6 +270,11 @@ public void shouldReturnRangeOfOneToMaxLongIfNoEventsFound() throws Exception { assertThat(missingEventRanges.get(0).getMissingEventFrom(), is(1L)); assertThat(missingEventRanges.get(0).getMissingEventTo(), is(MAX_VALUE)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 1, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -299,7 +321,7 @@ public void shouldReturnNoMissingEventsIfNoEventsAreActuallyMissing() throws Exc final Stream processedEventTrackItemStream = processedEventTrackItems.stream().onClose(streamCloseSpy); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(latestProcessedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -309,6 +331,11 @@ public void shouldReturnNoMissingEventsIfNoEventsAreActuallyMissing() throws Exc assertThat(missingEventRanges.get(0).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 5, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -337,7 +364,7 @@ public void shouldReturnNoMissingEventsIfOnlyOneEventExists() throws Exception { final Stream processedEventTrackItemStream = processedEventTrackItems.stream().onClose(streamCloseSpy); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(latestProcessedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -348,6 +375,11 @@ public void shouldReturnNoMissingEventsIfOnlyOneEventExists() throws Exception { assertThat(missingEventRanges.get(0).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 2, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -382,7 +414,7 @@ public void shouldHandleARangeOfMissingEventsOfJustOneMissingEvent() throws Exce final Stream processedEventTrackItemStream = processedEventTrackItems.stream().onClose(streamCloseSpy); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(latestProcessedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -395,6 +427,12 @@ public void shouldHandleARangeOfMissingEventsOfJustOneMissingEvent() throws Exce assertThat(missingEventRanges.get(1).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 2, missingEventTo (exclusive) = 3},\n" + + "MissingEventRange{missingEventFrom (inclusive) = 4, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test @@ -423,7 +461,7 @@ public void shouldReturnMissingEventIfPreviousEventNumberIsNotZero() throws Exce final Stream processedEventTrackItemStream = processedEventTrackItems.stream().onClose(streamCloseSpy); when(processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)).thenReturn(of(latestProcessedEventTrackItem)); - when(processedEventTrackingRepository.getAllProcessedEvents(source, componentName)).thenReturn(processedEventTrackItemStream); + when(processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName)).thenReturn(processedEventTrackItemStream); final List missingEventRanges = processedEventTrackingService.getAllMissingEvents(source, componentName) .collect(toList()); @@ -436,6 +474,12 @@ public void shouldReturnMissingEventIfPreviousEventNumberIsNotZero() throws Exce assertThat(missingEventRanges.get(1).getMissingEventTo(), is(MAX_VALUE)); assertThat(streamCloseSpy.streamClosed(), is(true)); + + verify(logger).info("Missing Event Ranges: [\n" + + "MissingEventRange{missingEventFrom (inclusive) = 1, missingEventTo (exclusive) = 2},\n" + + "MissingEventRange{missingEventFrom (inclusive) = 3, missingEventTo (exclusive) = " + MAX_VALUE + "}\n" + + "]" + ); } @Test diff --git a/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/TestViewStoreJdbcDataSourceProvider.java b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/TestViewStoreJdbcDataSourceProvider.java new file mode 100644 index 000000000..18c852342 --- /dev/null +++ b/event-tracking/event-tracking-service/src/test/java/uk/gov/justice/services/subscription/TestViewStoreJdbcDataSourceProvider.java @@ -0,0 +1,20 @@ +package uk.gov.justice.services.subscription; + +import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider; +import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory; + +import javax.sql.DataSource; + +public class TestViewStoreJdbcDataSourceProvider extends ViewStoreJdbcDataSourceProvider { + + private final DataSource viewStoreDataSource; + + public TestViewStoreJdbcDataSourceProvider(final DataSource viewStoreDataSource) { + this.viewStoreDataSource = viewStoreDataSource; + } + + @Override + public synchronized DataSource getDataSource() { + return viewStoreDataSource; + } +}