Skip to content

Commit

Permalink
Improve performance of CatchupPerformanceIT
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed Oct 29, 2019
1 parent 568dbf9 commit 5d8ffa7
Showing 1 changed file with 98 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package uk.gov.justice.services.example.cakeshop.it;

import static java.lang.Integer.valueOf;
import static java.lang.String.format;
import static java.lang.System.getProperty;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;
import static uk.gov.justice.services.jmx.system.command.client.connection.JmxParametersBuilder.jmxParameters;
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventStreamJdbsRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
Expand All @@ -31,6 +29,8 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -43,14 +43,19 @@

public class CatchupPerformanceIT {

private static final String SQL_INSERT_EVENT =
"INSERT INTO event_log " +
"(id, stream_id, position_in_stream, name, metadata, payload, date_created) " +
"VALUES(?, ?, ?, ?, ?, ?, ?)";

private static final String SQL_INSERT_STREAM = "INSERT INTO event_stream (stream_id, date_created, active) values (?, ?, ?)";

private static final int BATCH_INSERT_SIZE = 10_000;

private static final String CONTEXT_NAME = "example";

private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb();
private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb();
private final EventJdbcRepository eventJdbcRepository = new EventRepositoryFactory().getEventJdbcRepository(eventStoreDataSource);

private final EventStreamJdbsRepositoryFactory eventStreamJdbcRepositoryFactory = new EventStreamJdbsRepositoryFactory();
private final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.getEventStreamJdbcRepository(eventStoreDataSource);

private final ProcessedEventCounter processedEventCounter = new ProcessedEventCounter(viewStoreDataSource);

Expand All @@ -60,7 +65,10 @@ public class CatchupPerformanceIT {
private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory();
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

private final Poller longPoller = new Poller(1200, 1000L);
private final Poller longPoller = new Poller(2400, 1000L);


private final UtcClock clock = new UtcClock();

private Client client;

Expand All @@ -87,27 +95,23 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
final int numberOfEventsPerStream = 100;
final int totalEvents = numberOfStreams * numberOfEventsPerStream;

addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final Optional<Integer> numberOfEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}
System.out.println(format(
"Inserting %d events into event_log (%d events in %d streams)",
totalEvents,
numberOfEventsPerStream,
numberOfStreams
));

return empty();
});

if (numberOfEvents.isPresent()) {
System.out.println("Inserted " + numberOfEvents.get() + " events");
} else {
fail("Failed to insert " + totalEvents + " events");
}
addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

System.out.println("Inserted " + totalEvents + " events into event_log");
System.out.println("Waiting for events to publish...");
cleanViewstoreTables();

longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

if (eventCount == 0) {
return of(eventCount);
}
Expand All @@ -127,10 +131,13 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
assertThat(eventCount, is(0));
}

System.out.println("Running catchup...");
runCatchup();

final Optional<Integer> numberOfReplayedEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

if (eventCount == totalEvents) {
return of(eventCount);
}
Expand All @@ -139,31 +146,92 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
});

if (numberOfReplayedEvents.isPresent()) {
System.out.println("Successfully caught up " + numberOfEvents.get() + " events");
System.out.println("Successfully caught up " + numberOfReplayedEvents.get() + " events");
} else {
fail("Failed to catchup " + totalEvents + " events.");
}
}

private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException {
private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws Exception {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();

final List<Event> events = new ArrayList<>();
final List<UUID> streamIds = new ArrayList<>();

for (int seed = 0; seed < numberOfStreams; seed++) {

final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator();

final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator);
final UUID recipeId = recipeAddedEvent.getStreamId();

eventStreamJdbcRepository.insert(recipeId);
eventJdbcRepository.insert(recipeAddedEvent);
if (!streamIds.contains(recipeId)) {
streamIds.add(recipeId);
}

events.add(recipeAddedEvent);

for (int renameNumber = 1; renameNumber < numberOfEventsPerStream; renameNumber++) {
final Event recipeRenamedEvent = cakeshopEventGenerator.createRecipeRenamedEvent(recipeId, seed, renameNumber, positionInStreamIterator);
eventJdbcRepository.insert(recipeRenamedEvent);
events.add(recipeRenamedEvent);
}
}

updateEventStreamTable(streamIds);
updateEventLogTable(events);

}

private void updateEventLogTable(final List<Event> events) throws Exception {

try (final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT_EVENT)) {
for (int i = 0; i < events.size(); i++) {

final Event event = events.get(i);
preparedStatement.setObject(1, event.getId());
preparedStatement.setObject(2, event.getStreamId());
preparedStatement.setLong(3, event.getPositionInStream());
preparedStatement.setString(4, event.getName());
preparedStatement.setString(5, event.getMetadata());
preparedStatement.setString(6, event.getPayload());
preparedStatement.setTimestamp(7, toSqlTimestamp(clock.now()));

preparedStatement.addBatch();

if (i % BATCH_INSERT_SIZE == 0) {
preparedStatement.executeBatch();
System.out.println(format("Inserted %d events into event_log...", i));
}

}

preparedStatement.executeBatch();
}
}

private void updateEventStreamTable(final List<UUID> streamIds) throws Exception {

try (final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT_STREAM)) {
for (int i = 0; i < streamIds.size(); i++) {
final UUID streamId = streamIds.get(i);
preparedStatement.setObject(1, streamId);
preparedStatement.setTimestamp(2, toSqlTimestamp(clock.now()));
preparedStatement.setBoolean(3, true);

preparedStatement.addBatch();

if (i % BATCH_INSERT_SIZE == 0) {
preparedStatement.executeBatch();
}

}

preparedStatement.executeBatch();

}
}

private void runCatchup() throws Exception {
Expand Down

0 comments on commit 5d8ffa7

Please sign in to comment.