Skip to content

Commit

Permalink
Merge 96ab1ff into cdae070
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Oct 25, 2019
2 parents cdae070 + 96ab1ff commit 9abfcd8
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [2.2.3] - 2019-10-25
### Fixed
- Catchup range processing

## [2.2.2] - 2019-10-24
### Changed
- Pre publish and publish timer beans now run in a separate thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public class ProcessedEventTrackingRepository {
"AND component = ? " +
"ORDER BY event_number DESC LIMIT 1";

private static final String SELECT_SQL =
private static final String SELECT_ALL_DESCENDING_ORDER_SQL =
"SELECT event_number, previous_event_number " +
"FROM processed_event " +
"WHERE source = ? " +
"AND component = ? " +
"ORDER BY event_number ASC";
"ORDER BY event_number DESC";

@Inject
private JdbcResultSetStreamer jdbcResultSetStreamer;
Expand Down Expand Up @@ -64,11 +64,11 @@ public void save(final ProcessedEventTrackItem processedEventTrackItem) {
}
}

public Stream<ProcessedEventTrackItem> getAllProcessedEvents(final String source, final String componentName) {
public Stream<ProcessedEventTrackItem> getAllProcessedEventsDescendingOrder(final String source, final String componentName) {

try {
final PreparedStatementWrapper preparedStatement = preparedStatementWrapperFactory.preparedStatementWrapperOf(
viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_SQL);
viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_ALL_DESCENDING_ORDER_SQL);

preparedStatement.setString(1, source);
preparedStatement.setString(2, componentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static java.lang.Long.MAX_VALUE;
import static java.lang.String.format;
import static java.lang.System.lineSeparator;
import static java.util.stream.Collectors.joining;

import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange;
import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator;
Expand All @@ -14,6 +16,8 @@

import javax.inject.Inject;

import org.slf4j.Logger;

public class ProcessedEventTrackingService {

private static final long FIRST_POSSIBLE_EVENT_NUMBER = 0L;
Expand All @@ -24,6 +28,9 @@ public class ProcessedEventTrackingService {
@Inject
private EventSourceNameCalculator eventSourceNameCalculator;

@Inject
private Logger logger;

public void trackProcessedEvent(final JsonEnvelope event, final String componentName) {

final Metadata metadata = event.metadata();
Expand Down Expand Up @@ -59,7 +66,7 @@ public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceNam
notSeenEventsRange(1L, eventNumberAccumulator);
}

try (final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(eventSourceName, componentName)) {
try (final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(eventSourceName, componentName)) {
allProcessedEvents
.forEach(processedEventTrackItem -> findMissingRange(processedEventTrackItem, eventNumberAccumulator));
}
Expand All @@ -68,6 +75,8 @@ public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceNam
eventNumberAccumulator.addRangeFrom(FIRST_POSSIBLE_EVENT_NUMBER);
}

logger.info(createMessageMissingEventRanges(eventNumberAccumulator));

return eventNumberAccumulator.getMissingEventRanges().stream();
}

Expand Down Expand Up @@ -100,4 +109,17 @@ private void findMissingRange(final ProcessedEventTrackItem processedEventTrackI

eventNumberAccumulator.set(currentPreviousEventNumber, currentEventNumber);
}

private String createMessageMissingEventRanges(final EventNumberAccumulator eventNumberAccumulator) {

return "Missing Event Ranges: [" +
lineSeparator() +
eventNumberAccumulator
.getMissingEventRanges()
.stream()
.map(MissingEventRange::toString)
.collect(joining("," + lineSeparator())) +
lineSeparator() +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ProcessedEventTrackingRepositoryIT {

@Before
public void ensureOurDatasourceProviderReturnsOurTestDataSource() {

when(viewStoreJdbcDataSourceProvider.getDataSource()).thenReturn(viewStoreDataSource);
}

Expand All @@ -73,20 +73,20 @@ public void shouldSaveAndGetAllProcessedEvents() throws Exception {
processedEventTrackingRepository.save(processedEventTrackItem_3);
processedEventTrackingRepository.save(processedEventTrackItem_4);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder() throws Exception {
public void shouldReturnProcessedEventsInDescendingOrderIfInsertedOutOfOrder() throws Exception {

final String source = "example-context";
final String componentName = "EVENT_LISTENER";
Expand All @@ -101,20 +101,20 @@ public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder()
processedEventTrackingRepository.save(processedEventTrackItem_1);
processedEventTrackingRepository.save(processedEventTrackItem_3);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws Exception {
public void shouldReturnOnlyProcessedEventsWIthTheCorrectSourceInDescendingOrder() throws Exception {

final String source = "example-context";
final String otherSource = "another-context";
Expand All @@ -141,16 +141,16 @@ public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws E
processedEventTrackingRepository.save(processedEventTrackItem_3);
processedEventTrackingRepository.save(processedEventTrackItem_9);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package uk.gov.justice.services.subscription;

import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.slf4j.LoggerFactory.getLogger;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField;

import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange;
import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator;
import uk.gov.justice.services.jdbc.persistence.JdbcResultSetStreamer;
import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapperFactory;
import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider;
import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner;
import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory;

import java.util.List;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;


@RunWith(MockitoJUnitRunner.class)
public class ProcessedEventTrackingServiceIT {

private final DataSource viewStoreDataSource = new FrameworkTestDataSourceFactory().createViewStoreDataSource();
private final JdbcResultSetStreamer jdbcResultSetStreamer = new JdbcResultSetStreamer();
private final PreparedStatementWrapperFactory preparedStatementWrapperFactory = new PreparedStatementWrapperFactory();
private final ViewStoreJdbcDataSourceProvider viewStoreJdbcDataSourceProvider = new TestViewStoreJdbcDataSourceProvider(viewStoreDataSource);
private final ProcessedEventTrackingRepository processedEventTrackingRepository = new ProcessedEventTrackingRepository();
private final EventSourceNameCalculator eventSourceNameCalculator = new EventSourceNameCalculator();
private final Logger logger = getLogger(ProcessedEventTrackingService.class);

private final ProcessedEventTrackingService processedEventTrackingService = new ProcessedEventTrackingService();

private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

@Before
public void createClassUnderTest() {
setField(processedEventTrackingRepository, "jdbcResultSetStreamer", jdbcResultSetStreamer);
setField(processedEventTrackingRepository, "preparedStatementWrapperFactory", preparedStatementWrapperFactory);
setField(processedEventTrackingRepository, "viewStoreJdbcDataSourceProvider", viewStoreJdbcDataSourceProvider);

setField(processedEventTrackingService, "processedEventTrackingRepository", processedEventTrackingRepository);
setField(processedEventTrackingService, "eventSourceNameCalculator", eventSourceNameCalculator);
setField(processedEventTrackingService, "logger", logger);

databaseCleaner.cleanProcessedEventTable("framework");
}

@Test
public void shouldFindRangesOfMissingEventsInAscendingOrderOfEventNumbers() throws Exception {

final String source = "example-context";
final String componentName = "EVENT_LISTENER";

// insert events missing event 4 and events 7, 8 and 9
insertEventsWithSomeMissing(source, componentName);

final List<MissingEventRange> missingEventRanges = processedEventTrackingService
.getAllMissingEvents(source, componentName)
.collect(toList());

assertThat(missingEventRanges.size(), is(3));

assertThat(missingEventRanges.get(0), is(new MissingEventRange(4L, 5L)));
assertThat(missingEventRanges.get(1), is(new MissingEventRange(7L, 10L)));
assertThat(missingEventRanges.get(2), is(new MissingEventRange(11L, 9223372036854775807L)));
}

private void insertEventsWithSomeMissing(final String source, final String componentName) {

final ProcessedEventTrackItem processedEventTrackItem_1 = new ProcessedEventTrackItem(0, 1, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_2 = new ProcessedEventTrackItem(1, 2, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_3 = new ProcessedEventTrackItem(2, 3, source, componentName);

final ProcessedEventTrackItem processedEventTrackItem_5 = new ProcessedEventTrackItem(4, 5, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_6 = new ProcessedEventTrackItem(5, 6, source, componentName);

final ProcessedEventTrackItem processedEventTrackItem_10 = new ProcessedEventTrackItem(9, 10, source, componentName);

processedEventTrackingRepository.save(processedEventTrackItem_1);
processedEventTrackingRepository.save(processedEventTrackItem_2);
processedEventTrackingRepository.save(processedEventTrackItem_3);

processedEventTrackingRepository.save(processedEventTrackItem_5);
processedEventTrackingRepository.save(processedEventTrackItem_6);

processedEventTrackingRepository.save(processedEventTrackItem_10);
}
}
Loading

0 comments on commit 9abfcd8

Please sign in to comment.