From d30eb7a39bf4d3a7c80b07fbd5e92b7c8755df14 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Mon, 4 Jun 2018 15:26:29 +0100 Subject: [PATCH] Fix non closed stream and add test for multiple streams --- .../tools/replay/AsyncStreamDispatcher.java | 12 +- .../replay/JsonEnvelopeJdbcRepository.java | 9 +- .../JsonEnvelopeJdbcRepositoryTest.java | 23 +++- .../tools/replay/EventOrderingIT.java | 1 + .../InsertAllEventsIntoViewStoreIT.java | 1 + .../tools/replay/MultipleEventSteamsIT.java | 105 ++++++++++++++++++ .../src/test/resources/standalone-ds.xml | 12 +- .../listener/FrameworkToolsTestListener.java | 6 + 8 files changed, 154 insertions(+), 15 deletions(-) create mode 100644 replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/MultipleEventSteamsIT.java diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java index 093f9be..fd35dc1 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java @@ -41,11 +41,15 @@ public UUID dispatch(final UUID streamId) { progressLogger.logStart(streamId); - replayAllEventsOf(streamId); + try { + replayAllEventsOf(streamId); + } finally { + final StreamStatus streamStatus = streamStatusFactory.create( + jsonEnvelopeJdbcRepository.getLatestEvent(streamId), + streamId); - insertStreamStatus(streamStatusFactory.create( - jsonEnvelopeJdbcRepository.getLatestEvent(streamId), - streamId)); + insertStreamStatus(streamStatus); + } progressLogger.logCompletion(streamId); diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepository.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepository.java index 79bbf5d..b697248 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepository.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepository.java @@ -28,10 +28,11 @@ public JsonEnvelope getLatestEvent(final UUID streamId) { final EventStream eventStream = eventSource.getStreamById(streamId); final long currentVersion = eventStream.getCurrentVersion(); - return eventStream - .readFrom(currentVersion) - .findFirst() - .orElseThrow(() -> new MissingEventStreamHeadException(format("Unable to retrieve head Event from stream with id '%s'", streamId))); + try(final Stream jsonEnvelopeStream = eventStream.readFrom(currentVersion)) { + return jsonEnvelopeStream + .findFirst() + .orElseThrow(() -> new MissingEventStreamHeadException(format("Unable to retrieve head Event from stream with id '%s'", streamId))); + } } public long getCurrentVersion(final UUID streamId) { diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepositoryTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepositoryTest.java index 42832de..c737bea 100644 --- a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepositoryTest.java +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/JsonEnvelopeJdbcRepositoryTest.java @@ -88,11 +88,19 @@ public void shouldGetTheLatestEventFromAStream() throws Exception { final EventStream eventStream = mock(EventStream.class); final JsonEnvelope latestJsonEnvelope = mock(JsonEnvelope.class); + final CloseChecker closeChecker = new CloseChecker(); + + final Stream jsonEnvelopeStream = Stream.of(latestJsonEnvelope) + .onClose(closeChecker::setClosed); + when(eventSource.getStreamById(streamId)).thenReturn(eventStream); when(eventStream.getCurrentVersion()).thenReturn(currentVersion); - when(eventStream.readFrom(currentVersion)).thenReturn(Stream.of(latestJsonEnvelope)); + + when(eventStream.readFrom(currentVersion)).thenReturn(jsonEnvelopeStream); assertThat(jsonEnvelopeJdbcRepository.getLatestEvent(streamId), is(latestJsonEnvelope)); + + assertThat(closeChecker.isClosed(), is(true)); } @Test @@ -128,4 +136,17 @@ public void shouldGetTheCurrentVersion() throws Exception { assertThat(jsonEnvelopeJdbcRepository.getCurrentVersion(streamId), is(currentVersion)); } + + private static class CloseChecker { + + private boolean closed = false; + + public void setClosed() { + closed = true; + } + + public boolean isClosed() { + return closed; + } + } } diff --git a/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/EventOrderingIT.java b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/EventOrderingIT.java index 884664d..13721fe 100644 --- a/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/EventOrderingIT.java +++ b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/EventOrderingIT.java @@ -23,6 +23,7 @@ import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class EventOrderingIT { diff --git a/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/InsertAllEventsIntoViewStoreIT.java b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/InsertAllEventsIntoViewStoreIT.java index dc34293..b787932 100644 --- a/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/InsertAllEventsIntoViewStoreIT.java +++ b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/InsertAllEventsIntoViewStoreIT.java @@ -24,6 +24,7 @@ import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class InsertAllEventsIntoViewStoreIT { diff --git a/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/MultipleEventSteamsIT.java b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/MultipleEventSteamsIT.java new file mode 100644 index 0000000..f00d05a --- /dev/null +++ b/replay-tool-test/replay-tool-integration-test/src/test/java/uk/gov/justice/framework/tools/replay/MultipleEventSteamsIT.java @@ -0,0 +1,105 @@ +package uk.gov.justice.framework.tools.replay; + + +import static java.lang.String.format; +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import uk.gov.justice.framework.tools.replay.database.DatasourceCreator; +import uk.gov.justice.framework.tools.replay.database.EventInserter; +import uk.gov.justice.framework.tools.replay.database.LiquibaseRunner; +import uk.gov.justice.framework.tools.replay.events.User; +import uk.gov.justice.framework.tools.replay.events.UserFactory; +import uk.gov.justice.framework.tools.replay.h2.InMemoryDatabaseRunner; +import uk.gov.justice.framework.tools.replay.wildfly.WildflyRunner; +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import javax.sql.DataSource; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MultipleEventSteamsIT { + + private static final Boolean SHOULD_LOG_WILDFLY_PROCESS_TO_CONSOLE = true; + private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false; + + private static final int SECONDS_IN_A_MINUTE = 60; + + private static final int WILDFLY_TIMEOUT_IN_SECONDS = SECONDS_IN_A_MINUTE * 5; + private static final int NUMBER_OF_EVENTS_TO_INSERT = 100; + private static final int NUMBER_OF_STREAMS = 10; + + private final LiquibaseRunner liquibaseRunner = new LiquibaseRunner(); + private final DatasourceCreator datasourceCreator = new DatasourceCreator(); + private final WildflyRunner wildflyRunner = new WildflyRunner(); + + private final DataSource viewStoreDataSource = datasourceCreator.createViewStoreDataSource(); + private final DataSource eventStoreDataSource = datasourceCreator.createEventStoreDataSource(); + private final EventInserter eventInserter = new EventInserter(eventStoreDataSource, viewStoreDataSource); + private final UserFactory userFactory = new UserFactory(); + private final InMemoryDatabaseRunner inMemoryDatabaseRunner = new InMemoryDatabaseRunner(); + + @Before + public void startDatabase() { + inMemoryDatabaseRunner.startH2Database(); + } + + @Before + public void runLiquibase() throws Exception { + liquibaseRunner.createEventStoreSchema(eventStoreDataSource); + liquibaseRunner.createViewStoreSchema(viewStoreDataSource); + } + + @After + public void stopDB() throws Exception { + inMemoryDatabaseRunner.stopH2Database(); + } + + @Test + public void shouldInsertEventsIntoMultipleStreams() throws Exception { + + final String eventName = "framework.update-user"; + + final List allInsertedUsers = new ArrayList<>(); + + for(int i = 0; i < NUMBER_OF_STREAMS; i++) { + final UUID streamId = randomUUID(); + System.out.println(format("Inserting %d events into stream %s", NUMBER_OF_EVENTS_TO_INSERT, streamId)); + final List users = userFactory.createSomeUsers(NUMBER_OF_EVENTS_TO_INSERT); + final List someEvents = userFactory.convertToEvents(users, eventName, streamId); + + eventInserter.insertEventsIntoVewstore( + streamId, + someEvents); + + allInsertedUsers.addAll(users); + } + + final List insertedEvents = eventInserter.getAllFromEventStore().collect(toList()); + + System.out.println(format("%d events inserted into view store in %d streams", insertedEvents.size(), NUMBER_OF_STREAMS)); + + final boolean wildflyRanSuccessfully = wildflyRunner.run( + WILDFLY_TIMEOUT_IN_SECONDS, + SHOULD_LOG_WILDFLY_PROCESS_TO_CONSOLE, + ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY + ); + + assertTrue("Wildfly process exited abnormally", wildflyRanSuccessfully); + + final List usersFromViewStore = eventInserter.getUsersFromViewStore(); + + allInsertedUsers.forEach(usersFromViewStore::remove); + + assertTrue(usersFromViewStore.isEmpty()); + } +} diff --git a/replay-tool-test/replay-tool-integration-test/src/test/resources/standalone-ds.xml b/replay-tool-test/replay-tool-integration-test/src/test/resources/standalone-ds.xml index 55f854b..132a896 100644 --- a/replay-tool-test/replay-tool-integration-test/src/test/resources/standalone-ds.xml +++ b/replay-tool-test/replay-tool-integration-test/src/test/resources/standalone-ds.xml @@ -83,13 +83,13 @@ - + - + - + @@ -124,7 +124,7 @@ jdbc:h2:tcp://localhost:8092/mem:eventstore;MVCC=true 3 - 1000 + 20 true @@ -138,7 +138,7 @@ jdbc:h2:tcp://localhost:8092/mem:viewstore;MVCC=true 3 - 1000 + 20 true @@ -152,7 +152,7 @@ jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE 3 - 1000 + 20 true diff --git a/replay-tool-test/replay-tool-it-example-listener/src/main/java/uk/gov/justice/framework/tools/listener/FrameworkToolsTestListener.java b/replay-tool-test/replay-tool-it-example-listener/src/main/java/uk/gov/justice/framework/tools/listener/FrameworkToolsTestListener.java index abe9e48..05620cc 100644 --- a/replay-tool-test/replay-tool-it-example-listener/src/main/java/uk/gov/justice/framework/tools/listener/FrameworkToolsTestListener.java +++ b/replay-tool-test/replay-tool-it-example-listener/src/main/java/uk/gov/justice/framework/tools/listener/FrameworkToolsTestListener.java @@ -14,6 +14,7 @@ import javax.inject.Inject; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; @ServiceComponent(value = EVENT_LISTENER) public class FrameworkToolsTestListener { @@ -24,13 +25,18 @@ public class FrameworkToolsTestListener { @Inject private ObjectMapper objectMapper; + @Inject + private Logger logger; + @Handles("framework.update-user") public void handle(final JsonEnvelope envelope) { + logger.info("Replaying event: " + envelope); testViewstoreRepository.save(fromJsonEnvelope(envelope)); } private User fromJsonEnvelope(final JsonEnvelope envelope) { + final String payload = envelope.payloadAsJsonObject().toString(); try {