Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the insert of rebuild published events to a batch insert #197

Merged
merged 1 commit into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Now batch inserting PublishedEvents on rebuild to speed up the command

## [2.4.0] - 2019-11-13
### Added
New SystemCommand VERIFY_REBUILD to verify the results of of the rebuild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import static java.util.UUID.fromString;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.INSERT_INTO_PUBLISHED_EVENT_SQL;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.SELECT_FROM_PUBLISHED_EVENT_QUERY;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.TRUNCATE_PREPUBLISH_QUEUE;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.TRUNCATE_PUBLISHED_EVENT;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
Expand All @@ -21,23 +25,10 @@

public class PublishedEventQueries {

private static final String TRUNCATE_LINKED_EVENT = "TRUNCATE published_event";
private static final String TRUNCATE_PREPUBLISH_QUEUE = "TRUNCATE pre_publish_queue";

private static final String INSERT_INTO_LINKED_EVENT_SQL = "INSERT into published_event (" +
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " +
"VALUES " +
"(?, ?, ?, ?, ?, ?, ?, ?, ?)";

private static final String SELECT_FROM_LINKED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM published_event " +
"WHERE id = ?";

public void truncate(final DataSource dataSource) throws SQLException {

try (final Connection connection = dataSource.getConnection()) {
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_LINKED_EVENT)) {
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_PUBLISHED_EVENT)) {
preparedStatement.executeUpdate();
}
try (final PreparedStatement preparedStatement = connection.prepareStatement(TRUNCATE_PREPUBLISH_QUEUE)) {
Expand All @@ -49,7 +40,7 @@ public void truncate(final DataSource dataSource) throws SQLException {
public void insertPublishedEvent(final PublishedEvent publishedEvent, final DataSource dataSource) throws SQLException {

try (final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INTO_LINKED_EVENT_SQL)) {
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INTO_PUBLISHED_EVENT_SQL)) {
preparedStatement.setObject(1, publishedEvent.getId());
preparedStatement.setObject(2, publishedEvent.getStreamId());
preparedStatement.setLong(3, publishedEvent.getPositionInStream());
Expand All @@ -68,7 +59,7 @@ public Optional<PublishedEvent> getPublishedEvent(final UUID id, final DataSourc


try (final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_LINKED_EVENT_QUERY)) {
final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_PUBLISHED_EVENT_QUERY)) {

preparedStatement.setObject(1, id);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package uk.gov.justice.services.eventsourcing.publishedevent.jdbc;

public interface PublishedEventStatements {

String TRUNCATE_PUBLISHED_EVENT = "TRUNCATE published_event";
String TRUNCATE_PREPUBLISH_QUEUE = "TRUNCATE pre_publish_queue";

String INSERT_INTO_PUBLISHED_EVENT_SQL = "INSERT into published_event (" +
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " +
"VALUES " +
"(?, ?, ?, ?, ?, ?, ?, ?, ?)";

String SELECT_FROM_PUBLISHED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM published_event " +
"WHERE id = ?";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

import java.util.Set;
import java.util.UUID;

public class ActiveEventFilter {

public boolean isActiveEvent(final Event event, final Set<UUID> activeStreamIds) {
return activeStreamIds.contains(event.getStreamId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -21,51 +20,48 @@

public class BatchPublishedEventProcessor {

private static final int PAGE_SIZE = 1_000;
private static final int PAGE_SIZE = 10_000;

@Inject
private EventJdbcRepository eventJdbcRepository;

@Inject
private PublishedEventInserter publishedEventInserter;
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;

@Inject
private BatchProcessingDetailsCalculator batchProcessingDetailsCalculator;
private PublishedEventsRebuilder publishedEventsRebuilder;

@Inject
private Logger logger;

@Transactional(REQUIRED)
public BatchProcessDetails processNextBatchOfEvents(
final BatchProcessDetails batchProcessDetails,
final BatchProcessDetails currentBatchProcessDetails,
final Set<UUID> activeStreamIds) {

final AtomicLong currentEventNumber = batchProcessDetails.getCurrentEventNumber();
final AtomicLong previousEventNumber = batchProcessDetails.getPreviousEventNumber();
final AtomicLong currentEventNumber = currentBatchProcessDetails.getCurrentEventNumber();
final AtomicLong previousEventNumber = currentBatchProcessDetails.getPreviousEventNumber();

final List<PublishedEvent> publishedEvents = new ArrayList<>();
try (final Stream<Event> eventStream = eventJdbcRepository.findAllFromEventNumberUptoPageSize(currentEventNumber.get(), PAGE_SIZE)) {
try (final Stream<Event> eventStream = eventJdbcRepository.findAllFromEventNumberUptoPageSize(currentEventNumber.get(), PAGE_SIZE);) {

eventStream
.peek(event -> currentEventNumber.set(event.getEventNumber().get()))
.forEach(event -> publishedEventInserter
.convertAndSave(event, previousEventNumber, activeStreamIds)
.ifPresent(publishedEvents::add));
final List<PublishedEvent> publishedEvents = publishedEventsRebuilder.rebuild(
eventStream,
previousEventNumber, currentEventNumber,
activeStreamIds);

}
final BatchProcessDetails nextBatchProcessDetails = batchProcessingDetailsCalculator.calculateNextBatchProcessDetails(
currentBatchProcessDetails,
currentEventNumber,
previousEventNumber,
publishedEvents);

final BatchProcessDetails currentBatchProcessDetails = batchProcessingDetailsCalculator.calculateNextBatchProcessDetails(
batchProcessDetails,
currentEventNumber,
previousEventNumber,
publishedEvents);
if (nextBatchProcessDetails.getProcessedInBatchCount() > 0) {
logger.info(format("Inserted %d PublishedEvents", nextBatchProcessDetails.getProcessCount()));
} else {
logger.info("Skipping inactive events...");
}

if (currentBatchProcessDetails.getProcessedInBatchCount() > 0) {
logger.info(format("Inserted %d PublishedEvents", currentBatchProcessDetails.getProcessCount()));
} else {
logger.info("Skipping inactive events...");
return nextBatchProcessDetails;
}

return currentBatchProcessDetails;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.INSERT_INTO_PUBLISHED_EVENT_SQL;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventsourcing.util.io.Closer;
import uk.gov.justice.services.jdbc.persistence.DataAccessException;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import javax.sql.DataSource;

public class BatchedPublishedEventInserter implements AutoCloseable {

private final Closer closer;

private PreparedStatement preparedStatement;
private Connection connection;

public BatchedPublishedEventInserter(final Closer closer) {
this.closer = closer;
}

public void prepareForInserts(final DataSource eventStoreDataSource) {
try {
connection = eventStoreDataSource.getConnection();
preparedStatement = connection.prepareStatement(INSERT_INTO_PUBLISHED_EVENT_SQL);
} catch (final SQLException e) {
throw new DataAccessException("Failed to prepare statement for batch insert of PublishedEvents", e);
}
}

public PublishedEvent addToBatch(final PublishedEvent publishedEvent) {

try {
preparedStatement.setObject(1, publishedEvent.getId());
preparedStatement.setObject(2, publishedEvent.getStreamId());
preparedStatement.setLong(3, publishedEvent.getPositionInStream());
preparedStatement.setString(4, publishedEvent.getName());
preparedStatement.setString(5, publishedEvent.getPayload());
preparedStatement.setString(6, publishedEvent.getMetadata());
preparedStatement.setObject(7, toSqlTimestamp(publishedEvent.getCreatedAt()));
preparedStatement.setLong(8, publishedEvent.getEventNumber().orElseThrow(() -> new MissingEventNumberException("Event with id '%s' does not have an event number")));
preparedStatement.setLong(9, publishedEvent.getPreviousEventNumber());

preparedStatement.addBatch();

return publishedEvent;

} catch (final SQLException e) {
throw new DataAccessException("Failed to add PublishedEvent to batch", e);
}
}

public void insertBatch() {

try {
preparedStatement.executeBatch();
} catch (final SQLException e) {
throw new DataAccessException("Failed to insert batch of PublishedEvents", e);
}
}

@Override
public void close() {
closer.closeQuietly(preparedStatement);
closer.closeQuietly(connection);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;
import uk.gov.justice.services.eventsourcing.util.io.Closer;

import javax.inject.Inject;

public class BatchedPublishedEventInserterFactory {

@Inject
private EventStoreDataSourceProvider eventStoreDataSourceProvider;

@Inject
private Closer closer;

public BatchedPublishedEventInserter createInitialised() {

final BatchedPublishedEventInserter batchedPublishedEventInserter = new BatchedPublishedEventInserter(closer);

batchedPublishedEventInserter.prepareForInserts(eventStoreDataSourceProvider.getDefaultDataSource());

return batchedPublishedEventInserter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

public class EventNumberGetter {

public Long eventNumberFrom(final Event event) {

return event
.getEventNumber()
.orElseThrow(() -> new RebuildException(format(
"No eventNumber found for event with id '%s'",
event.getId())));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild;

import static java.util.stream.Collectors.toList;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import javax.inject.Inject;

public class PublishedEventsRebuilder {

@Inject
private EventNumberGetter eventNumberGetter;

@Inject
private BatchedPublishedEventInserterFactory batchedPublishedEventInserterFactory;

@Inject
private ActiveEventFilter activeEventFilter;

@Inject
private RebuildPublishedEventFactory rebuildPublishedEventFactory;

@SuppressWarnings("squid:S3864")
public List<PublishedEvent> rebuild(
final Stream<Event> eventStream,
final AtomicLong previousEventNumber,
final AtomicLong currentEventNumber,
final Set<UUID> activeStreamIds) {

try (final BatchedPublishedEventInserter batchedPublishedEventInserter = batchedPublishedEventInserterFactory.createInitialised()) {
final List<PublishedEvent> publishedEvents = eventStream
.peek(event -> currentEventNumber.set(eventNumberGetter.eventNumberFrom(event)))
.filter(event -> activeEventFilter.isActiveEvent(event, activeStreamIds))
.map(event -> rebuildPublishedEventFactory.createPublishedEventFrom(event, previousEventNumber))
.map(batchedPublishedEventInserter::addToBatch)
.collect(toList());

batchedPublishedEventInserter.insertBatch();

return publishedEvents;
}
}
}
Loading