Skip to content

Commit

Permalink
Process rebuild in pages of events
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo authored and amckenzie committed Sep 11, 2019
1 parent dc6509a commit b2eff83
Show file tree
Hide file tree
Showing 19 changed files with 569 additions and 136 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
### Changed
- Reduced the maximum runtime for each iteration of the publishing beans to 450 milliseconds
- Long running transaction during rebuild broken into separate transactions
- Process rebuild in pages of events

## [2.0.14] - 2019-09-08
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import uk.gov.justice.services.jdbc.persistence.JndiAppNameProvider;
import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapper;
import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapperFactory;
import uk.gov.justice.services.jmx.api.mbean.AsynchronousCommandRunnerBean;
import uk.gov.justice.services.messaging.DefaultJsonObjectEnvelopeConverter;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.JsonObjectEnvelopeConverter;
Expand Down Expand Up @@ -171,7 +170,6 @@ public void initializeDatabase() throws Exception {
DummySystemCommandStore.class,

JndiAppNameProvider.class,
AsynchronousCommandRunnerBean.class,
StackTraceProvider.class
})
public WebApp war() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package uk.gov.justice.services.eventsourcing.publishedevent.jdbc;

import static java.lang.String.format;
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;
import static javax.transaction.Transactional.TxType.REQUIRED;

import uk.gov.justice.services.eventsourcing.publishedevent.PublishedEventException;
import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;
Expand All @@ -22,7 +22,7 @@ public class PublishedEventTableCleaner {
@Inject
private DatabaseTableTruncator databaseTableTruncator;

@Transactional(REQUIRES_NEW)
@Transactional(REQUIRED)
public void deleteAll() {

final DataSource defaultDataSource = eventStoreDataSourceProvider.getDefaultDataSource();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.util.stream.Collectors.toSet;
import static javax.transaction.Transactional.TxType.REQUIRED;

import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
Expand All @@ -10,12 +11,14 @@
import java.util.stream.Stream;

import javax.inject.Inject;
import javax.transaction.Transactional;

public class ActiveEventStreamIdProvider {

@Inject
private EventStreamJdbcRepository eventStreamJdbcRepository;

@Transactional(REQUIRED)
public Set<UUID> getActiveStreamIds() {

try(final Stream<EventStream> activeEventStreamStream = eventStreamJdbcRepository.findActive()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class BatchProcessDetails {

private final AtomicLong previousEventNumber;
private final AtomicLong currentEventNumber;
private final int processCount;
private final boolean complete;

public BatchProcessDetails(final AtomicLong previousEventNumber, final AtomicLong currentEventNumber, final int processCount, final boolean complete) {
this.previousEventNumber = previousEventNumber;
this.currentEventNumber = currentEventNumber;
this.processCount = processCount;
this.complete = complete;
}

public AtomicLong getPreviousEventNumber() {
return previousEventNumber;
}

public AtomicLong getCurrentEventNumber() {
return currentEventNumber;
}

public int getProcessCount() {
return processCount;
}

public boolean isComplete() {
return complete;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof BatchProcessDetails)) return false;
final BatchProcessDetails that = (BatchProcessDetails) o;
return processCount == that.processCount &&
complete == that.complete &&
Objects.equals(previousEventNumber, that.previousEventNumber) &&
Objects.equals(currentEventNumber, that.currentEventNumber);
}

@Override
public int hashCode() {
return Objects.hash(previousEventNumber, currentEventNumber, processCount, complete);
}

@Override
public String toString() {
return "BatchProcessDetails{" +
"previousEventNumber=" + previousEventNumber +
", currentEventNumber=" + currentEventNumber +
", processCount=" + processCount +
", complete=" + complete +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class BatchProcessingDetailsCalculator {

public BatchProcessDetails createFirstBatchProcessDetails() {

return new BatchProcessDetails(
new AtomicLong(0),
new AtomicLong(0),
0,
false
);
}

public BatchProcessDetails calculateNextBatchProcessDetails(
final BatchProcessDetails currentBatchProcessDetails,
final AtomicLong previousEventNumber,
final List<PublishedEvent> publishedEvents) {

if (publishedEvents.isEmpty()) {
return new BatchProcessDetails(
previousEventNumber,
currentBatchProcessDetails.getCurrentEventNumber(),
currentBatchProcessDetails.getProcessCount(),
true
);
}

final PublishedEvent lastPublishedEvent = publishedEvents.get(publishedEvents.size() - 1);

final Long newCurrentEventNumber = lastPublishedEvent.getEventNumber().orElseThrow(() -> new MissingEventNumberException(""));
return new BatchProcessDetails(
previousEventNumber,
new AtomicLong(newCurrentEventNumber),
currentBatchProcessDetails.getProcessCount() + publishedEvents.size(),
false
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;
import static javax.transaction.Transactional.TxType.REQUIRED;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import javax.inject.Inject;
import javax.transaction.Transactional;

import org.slf4j.Logger;

public class BatchPublishedEventProcessor {

private static final int PAGE_SIZE = 1_000;

@Inject
private EventJdbcRepository eventJdbcRepository;

@Inject
private PublishedEventInserter publishedEventInserter;

@Inject
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;

@Inject
private Logger logger;

@Transactional(REQUIRED)
public BatchProcessDetails processNextBatchOfEvents(
final BatchProcessDetails batchProcessDetails,
final Set<UUID> activeStreamIds) {

final AtomicLong currentEventNumber = batchProcessDetails.getCurrentEventNumber();
final AtomicLong previousEventNumber = batchProcessDetails.getPreviousEventNumber();

final List<PublishedEvent> publishedEvents = new ArrayList<>();
try (final Stream<Event> eventStream = eventJdbcRepository.findAllFromEventNumberUptoPageSize(currentEventNumber.get(), PAGE_SIZE)) {

eventStream.forEach(event -> publishedEventInserter
.convertAndSave(event, previousEventNumber, activeStreamIds)
.ifPresent(publishedEvents::add));

}

final BatchProcessDetails currentBatchProcessDetails = batchProcessingDetailsCalculator.calculateNextBatchProcessDetails(
batchProcessDetails,
previousEventNumber,
publishedEvents);

final int processCount = currentBatchProcessDetails.getProcessCount();
logger.info(format("Inserted %d PublishedEvents", processCount));

return currentBatchProcessDetails;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;
import static javax.transaction.Transactional.TxType.REQUIRED;

import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;

Expand All @@ -28,7 +28,7 @@ public class EventNumberRenumberer {
@Inject
private Logger logger;

@Transactional(REQUIRES_NEW)
@Transactional(REQUIRED)
public void renumberEventLogEventNumber() {
resetSequence();
renumberEvents();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;
import static java.util.Optional.empty;
import static java.util.Optional.of;

import uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;

import org.slf4j.Logger;

public class PublishedEventInserter {

private static final int NUMBER_OF_EVENTS_TO_LOG_AFTER = 1_000;

@Inject
private PublishedEventConverter publishedEventConverter;

@Inject
private PublishedEventRepository publishedEventRepository;

@Inject
private Logger logger;

public int convertAndSave(final Event event, final AtomicLong previousEventNumber, final Set<UUID> activeStreamIds) {
public Optional<PublishedEvent> convertAndSave(final Event event, final AtomicLong previousEventNumber, final Set<UUID> activeStreamIds) {

if (activeStreamIds.contains(event.getStreamId())) {
final Long eventNumber = event.getEventNumber()
.orElseThrow(() -> new RebuildException(format("No eventNumber found for event with id '%s'", event.getId())));

if (eventNumber > 0 && eventNumber % NUMBER_OF_EVENTS_TO_LOG_AFTER == 0) {
logger.info(format("Inserted %d PublishedEvents...", eventNumber));
}

final PublishedEvent publishedEvent = publishedEventConverter.toPublishedEvent(
event,
previousEventNumber.get());
Expand All @@ -45,9 +37,9 @@ public int convertAndSave(final Event event, final AtomicLong previousEventNumbe

previousEventNumber.set(eventNumber);

return 1;
return of(publishedEvent);
}

return 0;
return empty();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import static javax.transaction.Transactional.TxType.NOT_SUPPORTED;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import javax.inject.Inject;
import javax.transaction.Transactional;
Expand All @@ -19,31 +14,29 @@
public class PublishedEventUpdater {

@Inject
private EventJdbcRepository eventJdbcRepository;
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;

@Inject
private ActiveEventStreamIdProvider activeEventStreamIdProvider;

@Inject
private PublishedEventInserter publishedEventInserter;
private BatchPublishedEventProcessor batchPublishedEventProcessor;

@Inject
private Logger logger;

@Transactional(REQUIRES_NEW)
@Transactional(NOT_SUPPORTED)
public void createPublishedEvents() {

logger.info("Creating PublishedEvents..");

final AtomicLong previousEventNumber = new AtomicLong(0);
final Set<UUID> activeStreamIds = activeEventStreamIdProvider.getActiveStreamIds();

try (final Stream<Event> eventStream = eventJdbcRepository.findAllOrderedByEventNumber()) {
final int eventCount = eventStream
.mapToInt(event -> publishedEventInserter.convertAndSave(event, previousEventNumber, activeStreamIds))
.sum();

logger.info(format("Inserted %d PublishedEvents", eventCount));
BatchProcessDetails batchProcessDetails = batchProcessingDetailsCalculator.createFirstBatchProcessDetails();
while (! batchProcessDetails.isComplete()) {
batchProcessDetails = batchPublishedEventProcessor.processNextBatchOfEvents(batchProcessDetails, activeStreamIds);
}

logger.info(format("Inserted %d PublishedEvents in total", batchProcessDetails.getProcessCount()));
}
}
Loading

0 comments on commit b2eff83

Please sign in to comment.