Skip to content

Commit

Permalink
Update to use the latest system commands
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed Jun 25, 2019
1 parent dd603a9 commit f8f5d89
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,17 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;
import uk.gov.justice.services.eventstore.management.catchup.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.shuttercatchup.commands.ShutterCatchupCommand;
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
import uk.gov.justice.services.example.cakeshop.it.helpers.PublishedEventCounter;
import uk.gov.justice.services.example.cakeshop.it.helpers.RecipeTableInspector;
import uk.gov.justice.services.example.cakeshop.it.helpers.ProcessedEventCounter;
import uk.gov.justice.services.example.cakeshop.it.helpers.RestEasyClientFactory;
import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClient;
import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClientFactory;
import uk.gov.justice.services.test.utils.core.messaging.Poller;
import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -46,8 +44,7 @@ public class CatchupPerformanceIT {
private final EventStreamJdbsRepositoryFactory eventStreamJdbcRepositoryFactory = new EventStreamJdbsRepositoryFactory();
private final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.getEventStreamJdbcRepository(eventStoreDataSource);

private final RecipeTableInspector recipeTableInspector = new RecipeTableInspector(viewStoreDataSource);
private final PublishedEventCounter publishedEventCounter = new PublishedEventCounter(eventStoreDataSource);
private final ProcessedEventCounter processedEventCounter = new ProcessedEventCounter(viewStoreDataSource);

private static final String HOST = getHost();
private static final int PORT = valueOf(getProperty("random.management.port"));
Expand All @@ -68,6 +65,7 @@ public void before() {

databaseCleaner.cleanEventStoreTables(contextName);
cleanViewstoreTables();
databaseCleaner.cleanSystemTables(contextName);
}

@After
Expand All @@ -82,10 +80,10 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
final int numberOfEventsPerStream = 100;
final int totalEvents = numberOfStreams * numberOfEventsPerStream;

final List<UUID> streamIds = addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);
addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final Optional<Integer> numberOfEvents = longPoller.pollUntilFound(() -> {
final int eventCount = publishedEventCounter.countPublishedEvents();
final int eventCount = processedEventCounter.countProcessedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}
Expand All @@ -99,67 +97,46 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
fail("Failed to insert " + totalEvents + " events");
}

for (final UUID streamId : streamIds) {

final Optional<Long> eventCount = longPoller.pollUntilFound(() -> {
final long eventsPerStream = recipeTableInspector.countEventsPerStream(streamId);
if (eventsPerStream == numberOfEventsPerStream) {
return of(eventsPerStream);
}

return empty();
});
cleanViewstoreTables();

if (!eventCount.isPresent()) {
fail("Expected " + numberOfEventsPerStream + " events but found " + recipeTableInspector.countEventsPerStream(streamId) + " in stream " + streamId);
longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
if (eventCount == 0) {
return of(eventCount);
}
}

cleanViewstoreTables();
return empty();
});

runCatchup();

final Optional<Integer> numberOfReplayedRecipesOptional = checkExpectedNumberOfRecipes(numberOfStreams);

if (!numberOfReplayedRecipesOptional.isPresent()) {
fail();
}


for (final UUID streamId : streamIds) {

final Optional<Long> eventCount = longPoller.pollUntilFound(() -> {
final long eventsPerStream = recipeTableInspector.countEventsPerStream(streamId);
if (eventsPerStream == numberOfEventsPerStream) {
return of(eventsPerStream);
}
final Optional<Integer> numberOfReplayedEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}

return empty();
});
return empty();
});

if (!eventCount.isPresent()) {
fail();
}
if (numberOfReplayedEvents.isPresent()) {
System.out.println("Successfully caught up " + numberOfEvents.get() + " events");
} else {
fail("Failed to catchup " + totalEvents + " events.");
}

publishedEventCounter.truncatePublishQueue();
}

private List<UUID> addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException {
private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();

final List<UUID> streamIds = new ArrayList<>();

for (int seed = 0; seed < numberOfStreams; seed++) {

final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator();

final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator);
final UUID recipeId = recipeAddedEvent.getStreamId();

streamIds.add(recipeId);

eventStreamJdbcRepository.insert(recipeId);
eventJdbcRepository.insert(recipeAddedEvent);

Expand All @@ -168,30 +145,16 @@ private List<UUID> addEventsToEventLog(final int numberOfStreams, final int numb
eventJdbcRepository.insert(recipeRenamedEvent);
}
}

return streamIds;
}

private void runCatchup() throws Exception {

try (final SystemCommanderClient systemCommanderClient = systemCommanderClientFactory.create(HOST, PORT)) {

systemCommanderClient.getRemote().call(new CatchupCommand());
systemCommanderClient.getRemote().call(new ShutterCatchupCommand());
}
}

private Optional<Integer> checkExpectedNumberOfRecipes(final int numberOfStreams) {
return longPoller.pollUntilFound(() -> {
final int numberOfRecipes = recipeTableInspector.countNumberOfRecipes();

if (numberOfRecipes == numberOfStreams) {
return of(numberOfRecipes);
}

return empty();
});
}

private void cleanViewstoreTables() {

final String contextName = "framework";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package uk.gov.justice.services.example.cakeshop.it.helpers;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

public class ProcessedEventCounter {

private final DataSource viewStoreDataSource;

public ProcessedEventCounter(final DataSource viewStoreDataSource) {
this.viewStoreDataSource = viewStoreDataSource;
}


public int countProcessedEvents() {

final String sql = "SELECT COUNT (*) FROM processed_event";

try(final Connection connection = viewStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final ResultSet resultSet = preparedStatement.executeQuery()) {

resultSet.next();

return resultSet.getInt(1);
} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the view store", e);
}
}
}

This file was deleted.

This file was deleted.

10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@

<properties>
<cpp.repo.name>cake-shop</cpp.repo.name>

<framework-api.version>4.0.0-M25</framework-api.version>
<framework.version>6.0.0-M35</framework.version>
<event-store.version>2.0.0-M39</event-store.version>
<framework-generators.version>2.0.0-M27</framework-generators.version>
<framework.version>6.0.0-M37</framework.version>
<event-store.version>2.0.0-M41</event-store.version>
<framework-generators.version>2.0.0-M28</framework-generators.version>
<file.service.version>1.17.8</file.service.version>

<common-bom.version>2.0.2</common-bom.version>
<utilities.version>1.19.0</utilities.version>
<test-utils.version>1.23.0</test-utils.version>
Expand Down

0 comments on commit f8f5d89

Please sign in to comment.