Skip to content

Commit

Permalink
Add integration test for validation failure
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed Nov 5, 2019
1 parent 58fa49d commit 35b459b
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {

longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

if (eventCount == 0) {
return of(eventCount);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package uk.gov.justice.services.example.cakeshop.it;

import static java.lang.Integer.valueOf;
import static java.lang.Integer.parseInt;
import static java.lang.System.getProperty;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static javax.json.Json.createObjectBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
Expand All @@ -17,6 +18,8 @@
import uk.gov.justice.services.example.cakeshop.it.helpers.CakeshopEventGenerator;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
import uk.gov.justice.services.example.cakeshop.it.helpers.PositionInStreamIterator;
import uk.gov.justice.services.example.cakeshop.it.helpers.PublishedEventCounter;
import uk.gov.justice.services.jmx.api.command.SystemCommand;
import uk.gov.justice.services.jmx.api.command.ValidatePublishedEventsCommand;
import uk.gov.justice.services.jmx.api.domain.CommandState;
import uk.gov.justice.services.jmx.api.domain.SystemCommandStatus;
Expand All @@ -27,13 +30,17 @@
import uk.gov.justice.services.test.utils.core.messaging.Poller;
import uk.gov.justice.services.test.utils.persistence.DatabaseCleaner;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;

public class EventValidationIT {
Expand All @@ -43,50 +50,114 @@ public class EventValidationIT {
private static final String CONTEXT_NAME = "example";

private final DataSource eventStoreDataSource = new DatabaseManager().initEventStoreDb();

@SuppressWarnings("unused")
private final DataSource viewStoreDataSource = new DatabaseManager().initViewStoreDb();
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

private static final String HOST = getHost();
private static final int PORT = valueOf(getProperty("random.management.port"));
private static final int PORT = parseInt(getProperty("random.management.port"));

private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory();
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();

private final Poller longPoller = new Poller(2400, 1000L);
private final Poller poller = new Poller();
private final BatchEventInserter batchEventInserter = new BatchEventInserter(eventStoreDataSource, BATCH_INSERT_SIZE);

private PublishedEventCounter publishedEventCounter = new PublishedEventCounter(eventStoreDataSource);

private final JmxParameters jmxParameters = jmxParameters()
.withHost(HOST)
.withPort(PORT)
.build();

@Before
public void cleanTables() {

final String databaseContextName = "framework";

databaseCleaner.cleanEventStoreTables(databaseContextName);

databaseCleaner.cleanProcessedEventTable(databaseContextName);
databaseCleaner.cleanStreamStatusTable(databaseContextName);
databaseCleaner.cleanStreamBufferTable(databaseContextName);
databaseCleaner.cleanViewStoreTables(databaseContextName, "recipe");

databaseCleaner.cleanSystemTables(databaseContextName);
}

@Test
public void shouldRunValidateEventsCommand() throws Exception {

final int numberOfStreams = 2;
final int numberOfEventsPerStream = 3;

final int totalEvents = numberOfEventsPerStream * numberOfStreams;

addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

final JmxParameters jmxParameters = jmxParameters()
.withHost(HOST)
.withPort(PORT)
.build();
waitForEventsToPublish(totalEvents);

final Optional<SystemCommandStatus> systemCommandStatus = runEventValidationCommand(jmxParameters);
final Optional<SystemCommandStatus> systemCommandStatus = runCommand(jmxParameters, new ValidatePublishedEventsCommand());

if (systemCommandStatus.isPresent()) {
assertThat(systemCommandStatus.get().getMessage(), is("All PublishedEvents successfully passed schema validation"));
} else {
fail();
}

}

@Test
public void shouldFailIfAnyEventsAreInvalid() throws Exception {

final int numberOfStreams = 2;
final int numberOfEventsPerStream = 3;

final int totalEvents = numberOfEventsPerStream * numberOfStreams;

addEventsToEventLog(numberOfStreams, numberOfEventsPerStream);

waitForEventsToPublish(totalEvents);

setPayloadsOfEventsInvalid();

final Optional<SystemCommandStatus> systemCommandStatus = runCommand(jmxParameters, new ValidatePublishedEventsCommand());

if (systemCommandStatus.isPresent()) {
assertThat(systemCommandStatus.get().getMessage(), is("6 PublishedEvent(s) failed schema validation. Please see server logs for errors"));
} else {
fail();
}
}

private Optional<SystemCommandStatus> runEventValidationCommand(final JmxParameters jmxParameters) {
@SuppressWarnings("SameParameterValue")
private void waitForEventsToPublish(final int totalEvents) {
final Optional<Integer> publishedEventCount = poller.pollUntilFound(() -> {
final int eventCount = publishedEventCounter.countPublishedEvents();
if (eventCount == totalEvents) {
return of(eventCount);
}

return empty();
});

if (! publishedEventCount.isPresent()) {
assertThat("Incorrect number of events were published", publishedEventCounter.countPublishedEvents(), is(totalEvents));
}
}

private Optional<SystemCommandStatus> runCommand(final JmxParameters jmxParameters, final SystemCommand systemCommand) {
try (final SystemCommanderClient systemCommanderClient = systemCommanderClientFactory.create(jmxParameters)) {

final SystemCommanderMBean systemCommanderMBean = systemCommanderClient.getRemote(CONTEXT_NAME);
final UUID commandId = systemCommanderMBean
.call(new ValidatePublishedEventsCommand());
.call(systemCommand);

return longPoller.pollUntilFound(() -> commandNoLongerInProgress(systemCommanderMBean, commandId));
return poller.pollUntilFound(() -> commandNoLongerInProgress(systemCommanderMBean, commandId));
}
}

@SuppressWarnings("SameParameterValue")
private void addEventsToEventLog(final int numberOfStreams, final int numberOfEventsPerStream) throws Exception {

final CakeshopEventGenerator cakeshopEventGenerator = new CakeshopEventGenerator();
Expand Down Expand Up @@ -115,14 +186,43 @@ private void addEventsToEventLog(final int numberOfStreams, final int numberOfEv

batchEventInserter.updateEventStreamTable(streamIds);
batchEventInserter.updateEventLogTable(events);
}

private void setPayloadsOfEventsInvalid() throws Exception {

final JmxParameters jmxParameters = jmxParameters()
.withHost(HOST)
.withPort(PORT)
.build();
final List<UUID> eventIds = getEventIds();

runEventValidationCommand(jmxParameters);
try (final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement("UPDATE published_event set payload = ? where id = ?")) {

for (final UUID eventId : eventIds) {
final String dodgyPayload = createObjectBuilder()
.add("dodgyProperty", "Event with id '" + eventId + "' is dodgy")
.build().toString();

preparedStatement.setString(1, dodgyPayload);
preparedStatement.setObject(2, eventId);

preparedStatement.addBatch();
}

preparedStatement.executeBatch();
}
}

private List<UUID> getEventIds() throws Exception {

final List<UUID> eventIds = new ArrayList<>();
try (final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement("SELECT id from published_event");
final ResultSet resultSet = preparedStatement.executeQuery()) {

while (resultSet.next()) {
eventIds.add((UUID) resultSet.getObject(1));
}
}

return eventIds;
}

private Optional<SystemCommandStatus> commandNoLongerInProgress(final SystemCommanderMBean systemCommanderMBean, final UUID commandId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void updateEventLogTable(final List<Event> events) throws Exception {

preparedStatement.addBatch();

if (i % batchSize == 0) {
if (i > 0 && i % batchSize == 0) {
preparedStatement.executeBatch();
System.out.println(format("Inserted %d events into event_log...", i));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package uk.gov.justice.services.example.cakeshop.it.helpers;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

public class PublishedEventCounter {

private final DataSource eventStoreDataSource;

public PublishedEventCounter(final DataSource eventStoreDataSource) {
this.eventStoreDataSource = eventStoreDataSource;
}


public int countPublishedEvents() {

final String sql = "SELECT COUNT (*) FROM published_event";

try(final Connection connection = eventStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final ResultSet resultSet = preparedStatement.executeQuery()) {

resultSet.next();

return resultSet.getInt(1);
} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the event store", e);
}
}
}

0 comments on commit 35b459b

Please sign in to comment.