Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix catchup range processing #187

Merged
merged 1 commit into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [2.2.3] - 2019-10-25
### Fixed
- Catchup range processing

## [2.2.2] - 2019-10-24
### Changed
- Pre publish and publish timer beans now run in a separate thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public class ProcessedEventTrackingRepository {
"AND component = ? " +
"ORDER BY event_number DESC LIMIT 1";

private static final String SELECT_SQL =
private static final String SELECT_ALL_DESCENDING_ORDER_SQL =
"SELECT event_number, previous_event_number " +
"FROM processed_event " +
"WHERE source = ? " +
"AND component = ? " +
"ORDER BY event_number ASC";
"ORDER BY event_number DESC";

@Inject
private JdbcResultSetStreamer jdbcResultSetStreamer;
Expand Down Expand Up @@ -64,11 +64,11 @@ public void save(final ProcessedEventTrackItem processedEventTrackItem) {
}
}

public Stream<ProcessedEventTrackItem> getAllProcessedEvents(final String source, final String componentName) {
public Stream<ProcessedEventTrackItem> getAllProcessedEventsDescendingOrder(final String source, final String componentName) {

try {
final PreparedStatementWrapper preparedStatement = preparedStatementWrapperFactory.preparedStatementWrapperOf(
viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_SQL);
viewStoreJdbcDataSourceProvider.getDataSource(), SELECT_ALL_DESCENDING_ORDER_SQL);

preparedStatement.setString(1, source);
preparedStatement.setString(2, componentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static java.lang.Long.MAX_VALUE;
import static java.lang.String.format;
import static java.lang.System.lineSeparator;
import static java.util.stream.Collectors.joining;

import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange;
import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator;
Expand All @@ -14,6 +16,8 @@

import javax.inject.Inject;

import org.slf4j.Logger;

public class ProcessedEventTrackingService {

private static final long FIRST_POSSIBLE_EVENT_NUMBER = 0L;
Expand All @@ -24,6 +28,9 @@ public class ProcessedEventTrackingService {
@Inject
private EventSourceNameCalculator eventSourceNameCalculator;

@Inject
private Logger logger;

public void trackProcessedEvent(final JsonEnvelope event, final String componentName) {

final Metadata metadata = event.metadata();
Expand Down Expand Up @@ -59,7 +66,7 @@ public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceNam
notSeenEventsRange(1L, eventNumberAccumulator);
}

try (final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(eventSourceName, componentName)) {
try (final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(eventSourceName, componentName)) {
allProcessedEvents
.forEach(processedEventTrackItem -> findMissingRange(processedEventTrackItem, eventNumberAccumulator));
}
Expand All @@ -68,6 +75,8 @@ public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceNam
eventNumberAccumulator.addRangeFrom(FIRST_POSSIBLE_EVENT_NUMBER);
}

logger.info(createMessageMissingEventRanges(eventNumberAccumulator));

return eventNumberAccumulator.getMissingEventRanges().stream();
}

Expand Down Expand Up @@ -100,4 +109,17 @@ private void findMissingRange(final ProcessedEventTrackItem processedEventTrackI

eventNumberAccumulator.set(currentPreviousEventNumber, currentEventNumber);
}

private String createMessageMissingEventRanges(final EventNumberAccumulator eventNumberAccumulator) {

return "Missing Event Ranges: [" +
lineSeparator() +
eventNumberAccumulator
.getMissingEventRanges()
.stream()
.map(MissingEventRange::toString)
.collect(joining("," + lineSeparator())) +
lineSeparator() +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ProcessedEventTrackingRepositoryIT {

@Before
public void ensureOurDatasourceProviderReturnsOurTestDataSource() {

when(viewStoreJdbcDataSourceProvider.getDataSource()).thenReturn(viewStoreDataSource);
}

Expand All @@ -73,20 +73,20 @@ public void shouldSaveAndGetAllProcessedEvents() throws Exception {
processedEventTrackingRepository.save(processedEventTrackItem_3);
processedEventTrackingRepository.save(processedEventTrackItem_4);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder() throws Exception {
public void shouldReturnProcessedEventsInDescendingOrderIfInsertedOutOfOrder() throws Exception {

final String source = "example-context";
final String componentName = "EVENT_LISTENER";
Expand All @@ -101,20 +101,20 @@ public void shouldReturnProcessedEventsInCorrectOrderEventIfInsertedOutOfOrder()
processedEventTrackingRepository.save(processedEventTrackItem_1);
processedEventTrackingRepository.save(processedEventTrackItem_3);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws Exception {
public void shouldReturnOnlyProcessedEventsWIthTheCorrectSourceInDescendingOrder() throws Exception {

final String source = "example-context";
final String otherSource = "another-context";
Expand All @@ -141,16 +141,16 @@ public void shouldReturnOnlyReturnProcessedEventsWIthTheCorrectSource() throws E
processedEventTrackingRepository.save(processedEventTrackItem_3);
processedEventTrackingRepository.save(processedEventTrackItem_9);

final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEvents(source, componentName);
final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(source, componentName);

final List<ProcessedEventTrackItem> processedEventTrackItems = allProcessedEvents.collect(toList());

assertThat(processedEventTrackItems.size(), is(4));

assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_1));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(0), is(processedEventTrackItem_4));
assertThat(processedEventTrackItems.get(1), is(processedEventTrackItem_3));
assertThat(processedEventTrackItems.get(2), is(processedEventTrackItem_2));
assertThat(processedEventTrackItems.get(3), is(processedEventTrackItem_1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package uk.gov.justice.services.subscription;

import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.slf4j.LoggerFactory.getLogger;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField;

import uk.gov.justice.services.eventsourcing.source.api.streams.MissingEventRange;
import uk.gov.justice.services.eventsourcing.util.messaging.EventSourceNameCalculator;
import uk.gov.justice.services.jdbc.persistence.JdbcResultSetStreamer;
import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapperFactory;
import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider;
import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner;
import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory;

import java.util.List;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;


@RunWith(MockitoJUnitRunner.class)
public class ProcessedEventTrackingServiceIT {

private final DataSource viewStoreDataSource = new FrameworkTestDataSourceFactory().createViewStoreDataSource();
private final JdbcResultSetStreamer jdbcResultSetStreamer = new JdbcResultSetStreamer();
private final PreparedStatementWrapperFactory preparedStatementWrapperFactory = new PreparedStatementWrapperFactory();
private final ViewStoreJdbcDataSourceProvider viewStoreJdbcDataSourceProvider = new TestViewStoreJdbcDataSourceProvider(viewStoreDataSource);
private final ProcessedEventTrackingRepository processedEventTrackingRepository = new ProcessedEventTrackingRepository();
private final EventSourceNameCalculator eventSourceNameCalculator = new EventSourceNameCalculator();
private final Logger logger = getLogger(ProcessedEventTrackingService.class);

private final ProcessedEventTrackingService processedEventTrackingService = new ProcessedEventTrackingService();

private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

@Before
public void createClassUnderTest() {
setField(processedEventTrackingRepository, "jdbcResultSetStreamer", jdbcResultSetStreamer);
setField(processedEventTrackingRepository, "preparedStatementWrapperFactory", preparedStatementWrapperFactory);
setField(processedEventTrackingRepository, "viewStoreJdbcDataSourceProvider", viewStoreJdbcDataSourceProvider);

setField(processedEventTrackingService, "processedEventTrackingRepository", processedEventTrackingRepository);
setField(processedEventTrackingService, "eventSourceNameCalculator", eventSourceNameCalculator);
setField(processedEventTrackingService, "logger", logger);

databaseCleaner.cleanProcessedEventTable("framework");
}

@Test
public void shouldFindRangesOfMissingEventsInAscendingOrderOfEventNumbers() throws Exception {

final String source = "example-context";
final String componentName = "EVENT_LISTENER";

// insert events missing event 4 and events 7, 8 and 9
insertEventsWithSomeMissing(source, componentName);

final List<MissingEventRange> missingEventRanges = processedEventTrackingService
.getAllMissingEvents(source, componentName)
.collect(toList());

assertThat(missingEventRanges.size(), is(3));

assertThat(missingEventRanges.get(0), is(new MissingEventRange(4L, 5L)));
assertThat(missingEventRanges.get(1), is(new MissingEventRange(7L, 10L)));
assertThat(missingEventRanges.get(2), is(new MissingEventRange(11L, 9223372036854775807L)));
}

private void insertEventsWithSomeMissing(final String source, final String componentName) {

final ProcessedEventTrackItem processedEventTrackItem_1 = new ProcessedEventTrackItem(0, 1, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_2 = new ProcessedEventTrackItem(1, 2, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_3 = new ProcessedEventTrackItem(2, 3, source, componentName);

final ProcessedEventTrackItem processedEventTrackItem_5 = new ProcessedEventTrackItem(4, 5, source, componentName);
final ProcessedEventTrackItem processedEventTrackItem_6 = new ProcessedEventTrackItem(5, 6, source, componentName);

final ProcessedEventTrackItem processedEventTrackItem_10 = new ProcessedEventTrackItem(9, 10, source, componentName);

processedEventTrackingRepository.save(processedEventTrackItem_1);
processedEventTrackingRepository.save(processedEventTrackItem_2);
processedEventTrackingRepository.save(processedEventTrackItem_3);

processedEventTrackingRepository.save(processedEventTrackItem_5);
processedEventTrackingRepository.save(processedEventTrackItem_6);

processedEventTrackingRepository.save(processedEventTrackItem_10);
}
}
Loading