Skip to content

Commit

Permalink
Merge 480de92 into 2ff6595
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Apr 29, 2019
2 parents 2ff6595 + 480de92 commit 9e60aee
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField;

import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;
import uk.gov.justice.services.test.utils.persistence.SettableEventStoreDataSourceProvider;

import javax.inject.Inject;
import javax.sql.DataSource;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package uk.gov.justice.services.example.cakeshop.it;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static javax.management.JMX.newMBeanProxy;
import static org.junit.Assert.fail;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventStreamJdbsRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEventRepositoryTruncator;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.MBeanHelper;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
import uk.gov.justice.services.example.cakeshop.it.helpers.PublishedEventCounter;
import uk.gov.justice.services.example.cakeshop.it.helpers.RecipeTableInspector;
import uk.gov.justice.services.example.cakeshop.it.helpers.RestEasyClientFactory;
import uk.gov.justice.services.example.cakeshop.it.helpers.StandaloneStreamStatusJdbcRepositoryFactory;
import uk.gov.justice.services.jmx.Catchup;
import uk.gov.justice.services.jmx.CatchupMBean;
import uk.gov.justice.services.jmx.Shuttering;
import uk.gov.justice.services.jmx.ShutteringMBean;
import uk.gov.justice.services.test.utils.core.messaging.Poller;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.sql.DataSource;
import javax.ws.rs.client.Client;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CatchupPerformanceIT {

private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb();
private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb();
private final EventJdbcRepository eventJdbcRepository = new EventRepositoryFactory().getEventJdbcRepository(eventStoreDataSource);
private final LinkedEventRepositoryTruncator linkedEventRepositoryTruncator = new LinkedEventRepositoryTruncator(eventStoreDataSource);

private final EventStreamJdbsRepositoryFactory eventStreamJdbcRepositoryFactory = new EventStreamJdbsRepositoryFactory();
private final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.getEventStreamJdbcRepository(eventStoreDataSource);

private final StandaloneStreamStatusJdbcRepositoryFactory standaloneStreamStatusJdbcRepositoryFactory = new StandaloneStreamStatusJdbcRepositoryFactory();

private final RecipeTableInspector recipeTableInspector = new RecipeTableInspector(viewStoreDataSource);
private final PublishedEventCounter publishedEventCounter = new PublishedEventCounter(eventStoreDataSource);

private final Poller longPoller = new Poller(1200, 1000L);

private Client client;
private MBeanHelper mBeanHelper;

@Before
public void before() {
client = new RestEasyClientFactory().createResteasyClient();
mBeanHelper = new MBeanHelper();
}

@After
public void cleanup() {
client.close();
}

@After
public void unShutter() throws Exception {

try (final JMXConnector jmxConnector = mBeanHelper.getJMXConnector()) {
final MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

final ObjectName objectName = new ObjectName("shuttering", "type", Shuttering.class.getSimpleName());

mBeanHelper.getMbeanProxy(connection, objectName, ShutteringMBean.class).doUnshutteringRequested();
}
}

@Test
public void shouldReplayAndFindRecipesInViewStore() throws Exception {

final int numberOfStreams = 10;
final int numberOfEventsPerStream = 100;
final int totalEvents = numberOfStreams * numberOfEventsPerStream;

shutter();

truncateEventLog();
recipeTableInspector.truncateViewstoreTables();

final List<UUID> streamIds = addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final Optional<Integer> numberOfEvents = longPoller.pollUntilFound(() -> {
final int eventCount = publishedEventCounter.countPublishedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}

return empty();
});

if (numberOfEvents.isPresent()) {
System.out.println("Inserted " + numberOfEvents.get() + " events");
} else {
fail("Failed to insert " + totalEvents + " events");
}


recipeTableInspector.truncateViewstoreTables();

runCatchup();

final Optional<Integer> numberOfReplayedRecipesOptional = checkExpectedNumberOfRecipes(numberOfStreams);

if (!numberOfReplayedRecipesOptional.isPresent()) {
fail();
}


for(final UUID streamId: streamIds) {

final Optional<Long> eventCount = longPoller.pollUntilFound(() -> {
final long eventsPerStream = recipeTableInspector.countEventsPerStream(streamId);
if (eventsPerStream == numberOfEventsPerStream) {
return of(eventsPerStream);
}

return empty();
});

if (! eventCount.isPresent()) {
fail();
}
}

publishedEventCounter.truncatePublishQueue();
}

private List<UUID> addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws InvalidPositionException {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();

final List<UUID> streamIds = new ArrayList<>();

for (int seed = 0; seed < numberOfStreams; seed++) {

final PositionInStreamIterator positionInStreamIterator = new PositionInStreamIterator();

final Event recipeAddedEvent = cakeshopEventGenerator.createRecipeAddedEvent(seed, positionInStreamIterator);
final UUID recipeId = recipeAddedEvent.getStreamId();

streamIds.add(recipeId);

eventStreamJdbcRepository.insert(recipeId);
eventJdbcRepository.insert(recipeAddedEvent);

for (int renameNumber = 1; renameNumber < numberOfEventsPerStream; renameNumber++) {
final Event recipeRenamedEvent = cakeshopEventGenerator.createRecipeRenamedEvent(recipeId, seed, renameNumber, positionInStreamIterator);
eventJdbcRepository.insert(recipeRenamedEvent);
}
}

return streamIds;
}

private void truncateEventLog() throws SQLException {
final Stream<Event> eventStream = eventJdbcRepository.findAll();
eventStream.forEach(event -> eventJdbcRepository.clear(event.getStreamId()));

linkedEventRepositoryTruncator.truncate();
}

private void shutter() throws Exception {

try (final JMXConnector jmxConnector = mBeanHelper.getJMXConnector()) {
final MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

final ObjectName objectName = new ObjectName("shuttering", "type", Shuttering.class.getSimpleName());

mBeanHelper.getMbeanProxy(connection, objectName, ShutteringMBean.class).doShutteringRequested();
}
}

private void runCatchup() throws Exception {

try (final JMXConnector jmxConnector = mBeanHelper.getJMXConnector()) {
final MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

final ObjectName objectName = new ObjectName("catchup", "type", Catchup.class.getSimpleName());

mBeanHelper.getMbeanDomains(connection);

mBeanHelper.getMbeanOperations(objectName, connection);

final CatchupMBean catchupMBean = newMBeanProxy(connection, objectName, CatchupMBean.class, true);

catchupMBean.doCatchupRequested();
}
}

private Optional<Integer> checkExpectedNumberOfRecipes(final int numberOfStreams) {
return longPoller.pollUntilFound(() -> {
final int numberOfRecipes = recipeTableInspector.countNumberOfRecipes();

if (numberOfRecipes == numberOfStreams) {
return of(numberOfRecipes);
}

return empty();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package uk.gov.justice.services.example.cakeshop.it.helpers;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

public class PublishedEventCounter {

private final DataSource eventStoreDataSource;

public PublishedEventCounter(final DataSource eventStoreDataSource) {
this.eventStoreDataSource = eventStoreDataSource;
}


public int countPublishedEvents() {

final String sql = "SELECT COUNT (*) FROM published_event";

try(final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final ResultSet resultSet = preparedStatement.executeQuery()) {

resultSet.next();

return resultSet.getInt(1);
} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the event store", e);
}
}

public void truncatePublishQueue() {

final String sql = "DELETE FROM publish_queue";

try(final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql)) {

preparedStatement.executeUpdate();

} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the event store", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,23 @@ public List<Recipe> getAllRecipes() {
throw new RuntimeException("Failed to run query '" + sql + "' against the view store", e);
}
}

public long countEventsPerStream(final UUID streamId) {

final String sql = "SELECT version FROM stream_status WHERE stream_id = ?";

try(final Connection connection = viewStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql)) {

preparedStatement.setObject(1, streamId);

try(final ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
}

} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the event store", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,17 @@
<logger category="com.arjuna">
<level name="WARN"/>
</logger>
<logger category="org.jboss.as.ejb3">
<level name="ERROR"/>
</logger>
<logger category="org.jboss.as.config">
<level name="WARN"/>
</logger>
<logger category="sun.rmi">
<level name="WARN"/>
</logger>
<logger category="uk.gov.justice">
<level name="WARN"/>
<level name="INFO"/>
</logger>
<root-logger>
<level name="WARN"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import uk.gov.justice.services.core.annotation.Handles;
import uk.gov.justice.services.core.annotation.ServiceComponent;
import uk.gov.justice.services.core.requester.Requester;
import uk.gov.justice.services.example.cakeshop.query.api.response.CakesView;
import uk.gov.justice.services.example.cakeshop.query.api.request.SearchCake;
import uk.gov.justice.services.example.cakeshop.query.api.response.CakesView;
import uk.gov.justice.services.messaging.Envelope;

import javax.inject.Inject;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<common-bom.version>1.28.0</common-bom.version>
<framework.version>6.0.0-M20</framework.version>
<embedded-artemis.version>1.2.0</embedded-artemis.version>
<event-store.version>2.0.0-M17</event-store.version>
<event-store.version>2.0.0-M18</event-store.version>
<framework-generators.version>2.0.0-M13</framework-generators.version>
<file.service.version>1.17.4</file.service.version>
<framework-api.version>4.0.0-M13</framework-api.version>
Expand Down

0 comments on commit 9e60aee

Please sign in to comment.