Skip to content

Commit

Permalink
Merge 6baeafe into 568dbf9
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Nov 4, 2019
2 parents 568dbf9 + 6baeafe commit b2b4f0e
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.example.cakeshop.it;

import static java.lang.Integer.valueOf;
import static java.lang.String.format;
import static java.lang.System.getProperty;
import static java.util.Optional.empty;
import static java.util.Optional.of;
Expand All @@ -11,11 +12,7 @@
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventStreamJdbsRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;
import uk.gov.justice.services.example.cakeshop.it.helpers.BatchEventInserter;
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
Expand All @@ -31,6 +28,8 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -43,14 +42,12 @@

public class CatchupPerformanceIT {

private static final int BATCH_INSERT_SIZE = 10_000;

private static final String CONTEXT_NAME = "example";

private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb();
private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb();
private final EventJdbcRepository eventJdbcRepository = new EventRepositoryFactory().getEventJdbcRepository(eventStoreDataSource);

private final EventStreamJdbsRepositoryFactory eventStreamJdbcRepositoryFactory = new EventStreamJdbsRepositoryFactory();
private final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.getEventStreamJdbcRepository(eventStoreDataSource);

private final ProcessedEventCounter processedEventCounter = new ProcessedEventCounter(viewStoreDataSource);

Expand All @@ -60,7 +57,8 @@ public class CatchupPerformanceIT {
private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory();
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

private final Poller longPoller = new Poller(1200, 1000L);
private final Poller longPoller = new Poller(2400, 1000L);
private final BatchEventInserter batchEventInserter = new BatchEventInserter(eventStoreDataSource, BATCH_INSERT_SIZE);

private Client client;

Expand All @@ -87,27 +85,23 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
final int numberOfEventsPerStream = 100;
final int totalEvents = numberOfStreams * numberOfEventsPerStream;

addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final Optional<Integer> numberOfEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}

return empty();
});
System.out.println(format(
"Inserting %d events into event_log (%d events in %d streams)",
totalEvents,
numberOfEventsPerStream,
numberOfStreams
));

if (numberOfEvents.isPresent()) {
System.out.println("Inserted " + numberOfEvents.get() + " events");
} else {
fail("Failed to insert " + totalEvents + " events");
}
addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

System.out.println("Inserted " + totalEvents + " events into event_log");
System.out.println("Waiting for events to publish...");
cleanViewstoreTables();

longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

if (eventCount == 0) {
return of(eventCount);
}
Expand All @@ -127,10 +121,13 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
assertThat(eventCount, is(0));
}

System.out.println("Running catchup...");
runCatchup();

final Optional<Integer> numberOfReplayedEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

if (eventCount == totalEvents) {
return of(eventCount);
}
Expand All @@ -139,31 +136,41 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
});

if (numberOfReplayedEvents.isPresent()) {
System.out.println("Successfully caught up " + numberOfEvents.get() + " events");
System.out.println("Successfully caught up " + numberOfReplayedEvents.get() + " events");
} else {
fail("Failed to catchup " + totalEvents + " events.");
}
}

private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException {
private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws Exception {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();

final List<Event> events = new ArrayList<>();
final List<UUID> streamIds = new ArrayList<>();

for (int seed = 0; seed < numberOfStreams; seed++) {

final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator();

final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator);
final UUID recipeId = recipeAddedEvent.getStreamId();

eventStreamJdbcRepository.insert(recipeId);
eventJdbcRepository.insert(recipeAddedEvent);
if (!streamIds.contains(recipeId)) {
streamIds.add(recipeId);
}

events.add(recipeAddedEvent);

for (int renameNumber = 1; renameNumber < numberOfEventsPerStream; renameNumber++) {
final Event recipeRenamedEvent = cakeshopEventGenerator.createRecipeRenamedEvent(recipeId, seed, renameNumber, positionInStreamIterator);
eventJdbcRepository.insert(recipeRenamedEvent);
events.add(recipeRenamedEvent);
}
}

batchEventInserter.updateEventStreamTable(streamIds);
batchEventInserter.updateEventLogTable(events);

}

private void runCatchup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package uk.gov.justice.services.example.cakeshop.it;

import static java.lang.Integer.valueOf;
import static java.lang.System.getProperty;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_COMPLETE;
import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_FAILED;
import static uk.gov.justice.services.jmx.system.command.client.connection.JmxParametersBuilder.jmxParameters;
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.example.cakeshop.it.helpers.BatchEventInserter;
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
import uk.gov.justice.services.jmx.api.command.ValidatePublishedEventsCommand;
import uk.gov.justice.services.jmx.api.domain.CommandState;
import uk.gov.justice.services.jmx.api.domain.SystemCommandStatus;
import uk.gov.justice.services.jmx.api.mbean.SystemCommanderMBean;
import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClient;
import uk.gov.justice.services.jmx.system.command.client.TestSystemCommanderClientFactory;
import uk.gov.justice.services.jmx.system.command.client.connection.JmxParameters;
import uk.gov.justice.services.test.utils.core.messaging.Poller;
import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import javax.sql.DataSource;

import org.junit.Test;

public class EventValidationIT {

private static final int BATCH_INSERT_SIZE = 10_000;

private static final String CONTEXT_NAME = "example";

private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb();
private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb();

private static final String HOST = getHost();
private static final int PORT = valueOf(getProperty("random.management.port"));

private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory();
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

private final Poller longPoller = new Poller(2400, 1000L);
private final BatchEventInserter batchEventInserter = new BatchEventInserter(eventStoreDataSource, BATCH_INSERT_SIZE);

@Test
public void shouldRunValidateEventsCommand() throws Exception {

final int numberOfStreams = 2;
final int numberOfEventsPerStream = 3;

addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final JmxParameters jmxParameters = jmxParameters()
.withHost(HOST)
.withPort(PORT)
.build();

final Optional<SystemCommandStatus> systemCommandStatus = runEventValidationCommand(jmxParameters);

if (systemCommandStatus.isPresent()) {
assertThat(systemCommandStatus.get().getMessage(), is("All PublishedEvents successfully passed schema validation"));
} else {
fail();
}
}

private Optional<SystemCommandStatus> runEventValidationCommand(final JmxParameters jmxParameters) {
try (final SystemCommanderClient systemCommanderClient = systemCommanderClientFactory.create(jmxParameters)) {

final SystemCommanderMBean systemCommanderMBean = systemCommanderClient.getRemote(CONTEXT_NAME);
final UUID commandId = systemCommanderMBean
.call(new ValidatePublishedEventsCommand());

return longPoller.pollUntilFound(() -> commandNoLongerInProgress(systemCommanderMBean, commandId));
}
}

private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws Exception {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();

final List<Event> events = new ArrayList<>();
final List<UUID> streamIds = new ArrayList<>();

for (int seed = 0; seed < numberOfStreams; seed++) {

final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator();

final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator);
final UUID recipeId = recipeAddedEvent.getStreamId();

if (!streamIds.contains(recipeId)) {
streamIds.add(recipeId);
}

events.add(recipeAddedEvent);

for (int renameNumber = 1; renameNumber < numberOfEventsPerStream; renameNumber++) {
final Event recipeRenamedEvent = cakeshopEventGenerator.createRecipeRenamedEvent(recipeId, seed, renameNumber, positionInStreamIterator);
events.add(recipeRenamedEvent);
}
}

batchEventInserter.updateEventStreamTable(streamIds);
batchEventInserter.updateEventLogTable(events);

final JmxParameters jmxParameters = jmxParameters()
.withHost(HOST)
.withPort(PORT)
.build();

runEventValidationCommand(jmxParameters);

}

private Optional<SystemCommandStatus> commandNoLongerInProgress(final SystemCommanderMBean systemCommanderMBean, final UUID commandId) {

final SystemCommandStatus systemCommandStatus = systemCommanderMBean.getCommandStatus(commandId);

final CommandState commandState = systemCommandStatus.getCommandState();
if (commandState == COMMAND_COMPLETE || commandState == COMMAND_FAILED) {
return of(systemCommandStatus);

}

return empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uk.gov.justice.services.jmx.api.command.ShutterCommand;
import uk.gov.justice.services.jmx.api.command.SystemCommand;
import uk.gov.justice.services.jmx.api.command.UnshutterCommand;
import uk.gov.justice.services.jmx.api.command.ValidatePublishedEventsCommand;
import uk.gov.justice.services.jmx.api.command.VerifyCatchupCommand;
import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClient;
import uk.gov.justice.services.jmx.system.command.client.TestSystemCommanderClientFactory;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void shouldListAllSystemCommands() throws Exception {

final List<SystemCommand> systemCommands = systemCommanderClient.getRemote(CONTEXT_NAME).listCommands();

assertThat(systemCommands.size(), is(11));
assertThat(systemCommands.size(), is(12));
assertThat(systemCommands, hasItem(new AddTriggerCommand()));
assertThat(systemCommands, hasItem(new DisablePublishingCommand()));
assertThat(systemCommands, hasItem(new EnablePublishingCommand()));
Expand All @@ -59,6 +60,7 @@ public void shouldListAllSystemCommands() throws Exception {
assertThat(systemCommands, hasItem(new RemoveTriggerCommand()));
assertThat(systemCommands, hasItem(new ShutterCommand()));
assertThat(systemCommands, hasItem(new UnshutterCommand()));
assertThat(systemCommands, hasItem(new ValidatePublishedEventsCommand()));
assertThat(systemCommands, hasItem(new VerifyCatchupCommand()));
}
}
Expand Down
Loading

0 comments on commit b2b4f0e

Please sign in to comment.