From 5d8ffa7ade7c2ae145246787678a7377c177f9b2 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Tue, 29 Oct 2019 11:10:07 +0000 Subject: [PATCH] Improve performance of CatchupPerformanceIT --- .../cakeshop/it/CatchupPerformanceIT.java | 128 ++++++++++++++---- 1 file changed, 98 insertions(+), 30 deletions(-) diff --git a/example-context/example-service/example-it/src/test/java/uk/gov/justice/services/example/cakeshop/it/CatchupPerformanceIT.java b/example-context/example-service/example-it/src/test/java/uk/gov/justice/services/example/cakeshop/it/CatchupPerformanceIT.java index fddc5855..884acd2f 100644 --- a/example-context/example-service/example-it/src/test/java/uk/gov/justice/services/example/cakeshop/it/CatchupPerformanceIT.java +++ b/example-context/example-service/example-it/src/test/java/uk/gov/justice/services/example/cakeshop/it/CatchupPerformanceIT.java @@ -1,21 +1,19 @@ package uk.gov.justice.services.example.cakeshop.it; import static java.lang.Integer.valueOf; +import static java.lang.String.format; import static java.lang.System.getProperty; import static java.util.Optional.empty; import static java.util.Optional.of; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp; import static uk.gov.justice.services.jmx.system.command.client.connection.JmxParametersBuilder.jmxParameters; import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost; +import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; -import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository; -import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventRepositoryFactory; -import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventStreamJdbsRepositoryFactory; -import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository; -import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException; import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator; import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager; import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator; @@ -31,6 +29,8 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.UUID; @@ -43,14 +43,19 @@ public class CatchupPerformanceIT { + private static final String SQL_INSERT_EVENT = + "INSERT INTO event_log " + + "(id, stream_id, position_in_stream, name, metadata, payload, date_created) " + + "VALUES(?, ?, ?, ?, ?, ?, ?)"; + + private static final String SQL_INSERT_STREAM = "INSERT INTO event_stream (stream_id, date_created, active) values (?, ?, ?)"; + + private static final int BATCH_INSERT_SIZE = 10_000; + private static final String CONTEXT_NAME = "example"; private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb(); private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb(); - private final EventJdbcRepository eventJdbcRepository = new EventRepositoryFactory().getEventJdbcRepository(eventStoreDataSource); - - private final EventStreamJdbsRepositoryFactory eventStreamJdbcRepositoryFactory = new EventStreamJdbsRepositoryFactory(); - private final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.getEventStreamJdbcRepository(eventStoreDataSource); private final ProcessedEventCounter processedEventCounter = new ProcessedEventCounter(viewStoreDataSource); @@ -60,7 +65,10 @@ public class CatchupPerformanceIT { private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory(); private final DatabaseCleaner databaseCleaner = new DatabaseCleaner(); - private final Poller longPoller = new Poller(1200, 1000L); + private final Poller longPoller = new Poller(2400, 1000L); + + + private final UtcClock clock = new UtcClock(); private Client client; @@ -87,27 +95,23 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception { final int numberOfEventsPerStream = 100; final int totalEvents = numberOfStreams * numberOfEventsPerStream; - addEventsToEventLog(numberOfStreams, numberOfEventsPerStream); - - final Optional numberOfEvents = longPoller.pollUntilFound(() -> { - final int eventCount = processedEventCounter.countProcessedEvents(); - if (eventCount == totalEvents) { - return of(eventCount); - } + System.out.println(format( + "Inserting %d events into event_log (%d events in %d streams)", + totalEvents, + numberOfEventsPerStream, + numberOfStreams + )); - return empty(); - }); - - if (numberOfEvents.isPresent()) { - System.out.println("Inserted " + numberOfEvents.get() + " events"); - } else { - fail("Failed to insert " + totalEvents + " events"); - } + addEventsToEventLog(numberOfStreams, numberOfEventsPerStream); + System.out.println("Inserted " + totalEvents + " events into event_log"); + System.out.println("Waiting for events to publish..."); cleanViewstoreTables(); longPoller.pollUntilFound(() -> { final int eventCount = processedEventCounter.countProcessedEvents(); + System.out.println(format("%s events in processed_event table", eventCount)); + if (eventCount == 0) { return of(eventCount); } @@ -127,10 +131,13 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception { assertThat(eventCount, is(0)); } + System.out.println("Running catchup..."); runCatchup(); final Optional numberOfReplayedEvents = longPoller.pollUntilFound(() -> { final int eventCount = processedEventCounter.countProcessedEvents(); + System.out.println(format("%s events in processed_event table", eventCount)); + if (eventCount == totalEvents) { return of(eventCount); } @@ -139,16 +146,19 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception { }); if (numberOfReplayedEvents.isPresent()) { - System.out.println("Successfully caught up " + numberOfEvents.get() + " events"); + System.out.println("Successfully caught up " + numberOfReplayedEvents.get() + " events"); } else { fail("Failed to catchup " + totalEvents + " events."); } } - private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException { + private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws Exception { final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator(); + final List events = new ArrayList<>(); + final List streamIds = new ArrayList<>(); + for (int seed = 0; seed < numberOfStreams; seed++) { final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator(); @@ -156,14 +166,72 @@ private void addEventsToEventLog(final int numberOfStreams, final int numberOfEv final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator); final UUID recipeId = recipeAddedEvent.getStreamId(); - eventStreamJdbcRepository.insert(recipeId); - eventJdbcRepository.insert(recipeAddedEvent); + if (!streamIds.contains(recipeId)) { + streamIds.add(recipeId); + } + + events.add(recipeAddedEvent); for (int renameNumber = 1; renameNumber < numberOfEventsPerStream; renameNumber++) { final Event recipeRenamedEvent = cakeshopEventGenerator.createRecipeRenamedEvent(recipeId, seed, renameNumber, positionInStreamIterator); - eventJdbcRepository.insert(recipeRenamedEvent); + events.add(recipeRenamedEvent); } } + + updateEventStreamTable(streamIds); + updateEventLogTable(events); + + } + + private void updateEventLogTable(final List events) throws Exception { + + try (final Connection connection = eventStoreDataSource.getConnection(); + final PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT_EVENT)) { + for (int i = 0; i < events.size(); i++) { + + final Event event = events.get(i); + preparedStatement.setObject(1, event.getId()); + preparedStatement.setObject(2, event.getStreamId()); + preparedStatement.setLong(3, event.getPositionInStream()); + preparedStatement.setString(4, event.getName()); + preparedStatement.setString(5, event.getMetadata()); + preparedStatement.setString(6, event.getPayload()); + preparedStatement.setTimestamp(7, toSqlTimestamp(clock.now())); + + preparedStatement.addBatch(); + + if (i % BATCH_INSERT_SIZE == 0) { + preparedStatement.executeBatch(); + System.out.println(format("Inserted %d events into event_log...", i)); + } + + } + + preparedStatement.executeBatch(); + } + } + + private void updateEventStreamTable(final List streamIds) throws Exception { + + try (final Connection connection = eventStoreDataSource.getConnection(); + final PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT_STREAM)) { + for (int i = 0; i < streamIds.size(); i++) { + final UUID streamId = streamIds.get(i); + preparedStatement.setObject(1, streamId); + preparedStatement.setTimestamp(2, toSqlTimestamp(clock.now())); + preparedStatement.setBoolean(3, true); + + preparedStatement.addBatch(); + + if (i % BATCH_INSERT_SIZE == 0) { + preparedStatement.executeBatch(); + } + + } + + preparedStatement.executeBatch(); + + } } private void runCatchup() throws Exception {