Skip to content

Commit

Permalink
Change the insert of rebuild published events to a batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed Nov 19, 2019
1 parent 4a59ac0 commit 7725bd7
Show file tree
Hide file tree
Showing 24 changed files with 920 additions and 280 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Now batch inserting PublishedEvents on rebuild to speed up the command

## [2.4.0] - 2019-11-13
### Added
New SystemCommand VERIFY_REBUILD to verify the results of of the rebuild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import static java.util.UUID.fromString;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.INSERT_INTO_PUBLISHED_EVENT_SQL;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.SELECT_FROM_PUBLISHED_EVENT_QUERY;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.TRUNCATE_PREPUBLISH_QUEUE;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.TRUNCATE_PUBLISHED_EVENT;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
Expand All @@ -21,23 +25,10 @@

public class PublishedEventQueries {

private static final String TRUNCATE_LINKED_EVENT = "TRUNCATE published_event";
private static final String TRUNCATE_PREPUBLISH_QUEUE = "TRUNCATE pre_publish_queue";

private static final String INSERT_INTO_LINKED_EVENT_SQL = "INSERT into published_event (" +
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " +
"VALUES " +
"(?, ?, ?, ?, ?, ?, ?, ?, ?)";

private static final String SELECT_FROM_LINKED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM published_event " +
"WHERE id = ?";

public void truncate(final DataSource dataSource) throws SQLException {

try (final Connection connection = dataSource.getConnection()) {
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_LINKED_EVENT)) {
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_PUBLISHED_EVENT)) {
preparedStatement.executeUpdate();
}
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_PREPUBLISH_QUEUE)) {
Expand All @@ -49,7 +40,7 @@ public void truncate(final DataSource dataSource) throws SQLException {
public void insertPublishedEvent(final PublishedEvent publishedEvent, final DataSource dataSource) throws SQLException {

try (final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INTO_LINKED_EVENT_SQL)) {
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INTO_PUBLISHED_EVENT_SQL)) {
preparedStatement.setObject(1, publishedEvent.getId());
preparedStatement.setObject(2, publishedEvent.getStreamId());
preparedStatement.setLong(3, publishedEvent.getPositionInStream());
Expand All @@ -68,7 +59,7 @@ public Optional<PublishedEvent> getPublishedEvent(final UUID id, final DataSourc


try (final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_LINKED_EVENT_QUERY)) {
final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_PUBLISHED_EVENT_QUERY)) {

preparedStatement.setObject(1, id);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package uk.gov.justice.services.eventsourcing.publishedevent.jdbc;

public interface PublishedEventStatements {

String TRUNCATE_PUBLISHED_EVENT = "TRUNCATE published_event";
String TRUNCATE_PREPUBLISH_QUEUE = "TRUNCATE pre_publish_queue";

String INSERT_INTO_PUBLISHED_EVENT_SQL = "INSERT into published_event (" +
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " +
"VALUES " +
"(?, ?, ?, ?, ?, ?, ?, ?, ?)";

String SELECT_FROM_PUBLISHED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM published_event " +
"WHERE id = ?";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

import java.util.Set;
import java.util.UUID;

public class ActiveEventFilter {

public boolean isActiveEvent(final Event event, final Set<UUID> activeStreamIds) {
return activeStreamIds.contains(event.getStreamId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -21,51 +20,48 @@

public class BatchPublishedEventProcessor {

private static final int PAGE_SIZE = 1_000;
private static final int PAGE_SIZE = 10_000;

@Inject
private EventJdbcRepository eventJdbcRepository;

@Inject
private PublishedEventInserter publishedEventInserter;
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;

@Inject
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;
private PublishedEventsRebuilder publishedEventsRebuilder;

@Inject
private Logger logger;

@Transactional(REQUIRED)
public BatchProcessDetails processNextBatchOfEvents(
final BatchProcessDetails batchProcessDetails,
final BatchProcessDetails currentBatchProcessDetails,
final Set<UUID> activeStreamIds) {

final AtomicLong currentEventNumber = batchProcessDetails.getCurrentEventNumber();
final AtomicLong previousEventNumber = batchProcessDetails.getPreviousEventNumber();
final AtomicLong currentEventNumber = currentBatchProcessDetails.getCurrentEventNumber();
final AtomicLong previousEventNumber = currentBatchProcessDetails.getPreviousEventNumber();

final List<PublishedEvent> publishedEvents = new ArrayList<>();
try (final Stream<Event> eventStream = eventJdbcRepository.findAllFromEventNumberUptoPageSize(currentEventNumber.get(), PAGE_SIZE)) {
try (final Stream<Event> eventStream = eventJdbcRepository.findAllFromEventNumberUptoPageSize(currentEventNumber.get(), PAGE_SIZE);) {

eventStream
.peek(event -> currentEventNumber.set(event.getEventNumber().get()))
.forEach(event -> publishedEventInserter
.convertAndSave(event, previousEventNumber, activeStreamIds)
.ifPresent(publishedEvents::add));
final List<PublishedEvent> publishedEvents = publishedEventsRebuilder.rebuild(
eventStream,
previousEventNumber, currentEventNumber,
activeStreamIds);

}
final BatchProcessDetails nextBatchProcessDetails = batchProcessingDetailsCalculator.calculateNextBatchProcessDetails(
currentBatchProcessDetails,
currentEventNumber,
previousEventNumber,
publishedEvents);

final BatchProcessDetails currentBatchProcessDetails = batchProcessingDetailsCalculator.calculateNextBatchProcessDetails(
batchProcessDetails,
currentEventNumber,
previousEventNumber,
publishedEvents);
if (nextBatchProcessDetails.getProcessedInBatchCount() > 0) {
logger.info(format("Inserted %d PublishedEvents", nextBatchProcessDetails.getProcessCount()));
} else {
logger.info("Skipping inactive events...");
}

if (currentBatchProcessDetails.getProcessedInBatchCount() > 0) {
logger.info(format("Inserted %d PublishedEvents", currentBatchProcessDetails.getProcessCount()));
} else {
logger.info("Skipping inactive events...");
return nextBatchProcessDetails;
}

return currentBatchProcessDetails;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.INSERT_INTO_PUBLISHED_EVENT_SQL;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventsourcing.util.io.Closer;
import uk.gov.justice.services.jdbc.persistence.DataAccessException;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import javax.sql.DataSource;

public class BatchedPublishedEventInserter implements AutoCloseable {

private final Closer closer;

private PreparedStatement preparedStatement;
private Connection connection;

public BatchedPublishedEventInserter(final Closer closer) {
this.closer = closer;
}

public void prepareForInserts(final DataSource eventStoreDataSource) {
try {
connection = eventStoreDataSource.getConnection();
preparedStatement = connection.prepareStatement(INSERT_INTO_PUBLISHED_EVENT_SQL);
} catch (final SQLException e) {
throw new DataAccessException("Failed to prepare statement for batch insert of PublishedEvents", e);
}
}

public PublishedEvent addToBatch(final PublishedEvent publishedEvent) {

try {
preparedStatement.setObject(1, publishedEvent.getId());
preparedStatement.setObject(2, publishedEvent.getStreamId());
preparedStatement.setLong(3, publishedEvent.getPositionInStream());
preparedStatement.setString(4, publishedEvent.getName());
preparedStatement.setString(5, publishedEvent.getPayload());
preparedStatement.setString(6, publishedEvent.getMetadata());
preparedStatement.setObject(7, toSqlTimestamp(publishedEvent.getCreatedAt()));
preparedStatement.setLong(8, publishedEvent.getEventNumber().orElseThrow(() -> new MissingEventNumberException("Event with id '%s' does not have an event number")));
preparedStatement.setLong(9, publishedEvent.getPreviousEventNumber());

preparedStatement.addBatch();

return publishedEvent;

} catch (final SQLException e) {
throw new DataAccessException("Failed to add PublishedEvent to batch", e);
}
}

public void insertBatch() {

try {
preparedStatement.executeBatch();
} catch (final SQLException e) {
throw new DataAccessException("Failed to insert batch of PublishedEvents", e);
}
}

@Override
public void close() {
closer.closeQuietly(preparedStatement);
closer.closeQuietly(connection);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;
import uk.gov.justice.services.eventsourcing.util.io.Closer;

import javax.inject.Inject;

public class BatchedPublishedEventInserterFactory {

@Inject
private EventStoreDataSourceProvider eventStoreDataSourceProvider;

@Inject
private Closer closer;

public BatchedPublishedEventInserter createInitialised() {

final BatchedPublishedEventInserter batchedPublishedEventInserter = new BatchedPublishedEventInserter(closer);

batchedPublishedEventInserter.prepareForInserts(eventStoreDataSourceProvider.getDefaultDataSource());

return batchedPublishedEventInserter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

public class EventNumberGetter {

public Long eventNumberFrom(final Event event) {

return event
.getEventNumber()
.orElseThrow(() -> new RebuildException(format(
"No eventNumber found for event with id '%s'",
event.getId())));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.util.stream.Collectors.toList;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import javax.inject.Inject;

public class PublishedEventsRebuilder {

@Inject
private EventNumberGetter eventNumberGetter;

@Inject
private BatchedPublishedEventInserterFactory batchedPublishedEventInserterFactory;

@Inject
private ActiveEventFilter activeEventFilter;

@Inject
private RebuildPublishedEventFactory rebuildPublishedEventFactory;

@SuppressWarnings("squid:S3864")
public List<PublishedEvent> rebuild(
final Stream<Event> eventStream,
final AtomicLong previousEventNumber,
final AtomicLong currentEventNumber,
final Set<UUID> activeStreamIds) {

try (final BatchedPublishedEventInserter batchedPublishedEventInserter = batchedPublishedEventInserterFactory.createInitialised()) {
final List<PublishedEvent> publishedEvents = eventStream
.filter(event -> activeEventFilter.isActiveEvent(event, activeStreamIds))
.peek(event -> currentEventNumber.set(eventNumberGetter.eventNumberFrom(event)))
.map(event -> rebuildPublishedEventFactory.createPublishedEventFrom(event, previousEventNumber))
.map(batchedPublishedEventInserter::addToBatch)
.collect(toList());

batchedPublishedEventInserter.insertBatch();

return publishedEvents;
}
}
}
Loading

0 comments on commit 7725bd7

Please sign in to comment.