From 0cd9e44cf758327494e6cc471aaae5b6d3199319 Mon Sep 17 00:00:00 2001 From: Mohamed Farouk Date: Thu, 14 Mar 2019 12:52:06 +0000 Subject: [PATCH] add stream transformation support for linked events --- CHANGELOG.md | 3 + .../justice/event/tool/PassesDeterminer.java | 6 + .../event/tool/StartTransformation.java | 19 +- .../event/tool/PassesDeterminerTest.java | 1 + .../event/tool/StartTransformationTest.java | 86 +++++--- .../task/StreamTransformationTaskTest.java | 7 + pom.xml | 7 +- .../sample-transformations/pom.xml | 5 + .../stream-transformation-it/pom.xml | 23 +- .../tools/transformation/DatabaseUtils.java | 33 ++- .../tools/transformation/EventLogBuilder.java | 35 ++- .../transformation/SwarmStarterUtil.java | 71 +++---- .../TestLinkedEventJdbcRepository.java | 77 +++++++ .../StreamTransformationIT.java | 200 +++++++++++++++--- .../StreamTransformationMoveIT.java | 61 ++++-- stream-transformation-tool-service/pom.xml | 7 + .../EventStreamTransformationService.java | 7 +- ...inkedEventStreamTransformationService.java | 42 ++++ ...dEventStreamTransformationServiceTest.java | 34 +++ 19 files changed, 589 insertions(+), 135 deletions(-) create mode 100644 stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/TestLinkedEventJdbcRepository.java create mode 100644 stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationService.java create mode 100644 stream-transformation-tool-service/src/test/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationServiceTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ababe7..b71b8c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +## [5.2.0] - 2019-03-20 +- Add support for linked event synch with event log after transformation + ## [5.1.0] - 2018-12-12 ### Fixed diff --git a/event-tool/src/main/java/uk/gov/justice/event/tool/PassesDeterminer.java b/event-tool/src/main/java/uk/gov/justice/event/tool/PassesDeterminer.java index c5b08f8..8a80036 100644 --- a/event-tool/src/main/java/uk/gov/justice/event/tool/PassesDeterminer.java +++ b/event-tool/src/main/java/uk/gov/justice/event/tool/PassesDeterminer.java @@ -27,4 +27,10 @@ public int getPassValue() { public boolean isLastElementInPasses() { return eventTransformationRegistry.getPasses().size() == passValue.get(); } + + public int passCount(){ + return eventTransformationRegistry.getPasses().size(); + } } + + diff --git a/event-tool/src/main/java/uk/gov/justice/event/tool/StartTransformation.java b/event-tool/src/main/java/uk/gov/justice/event/tool/StartTransformation.java index 8428880..bbf0203 100644 --- a/event-tool/src/main/java/uk/gov/justice/event/tool/StartTransformation.java +++ b/event-tool/src/main/java/uk/gov/justice/event/tool/StartTransformation.java @@ -7,6 +7,7 @@ import uk.gov.justice.event.tool.task.StreamTransformationTask; import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository; import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService; +import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService; import java.io.File; import java.io.IOException; @@ -54,7 +55,10 @@ public class StartTransformation implements ManagedTaskListener { @Inject private PassesDeterminer passesDeterminer; - Deque> outstandingTasks = new LinkedBlockingDeque<>(); + @Inject + private LinkedEventStreamTransformationService linkedEventStreamTransformationService; + + final Deque> outstandingTasks = new LinkedBlockingDeque<>(); boolean allTasksCreated = false; @@ -125,6 +129,7 @@ public void taskAborted(final Future futureTask, final ManagedExecutorService logger.error(String.format("Aborted Transformation task: '%s'", throwable.getMessage())); removeOutstandingTask(futureTask); shutDownIfFinished(); + truncateAndPopulateLinkedEvents(); } private void removeOutstandingTask(final Future futureTask) { @@ -135,6 +140,7 @@ private void nextPassIfFinished() { if (isTaskFinished()) { final boolean isLastElementInPasses = passesDeterminer.isLastElementInPasses(); if (isLastElementInPasses) { + truncateAndPopulateLinkedEvents(); shutdown(); } else { createTransformationTasks(passesDeterminer.getNextPassValue()); @@ -176,6 +182,17 @@ private void shutdown() { } } + private void truncateAndPopulateLinkedEvents() { + + logger.info("-------------- Truncating the Linked Events Log after complete transformation --------------"); + + linkedEventStreamTransformationService.truncateLinkedEvents(); + + logger.info("-------------- Populating the Linked Events Log --------------"); + + linkedEventStreamTransformationService.populateLinkedEvents(); + } + private void checkForMainProcessFile() { if (System.getProperty(MAIN_PROCESS_FILE) == null) { logger.warn(NO_PROCESS_FILE_WARNING); diff --git a/event-tool/src/test/java/uk/gov/justice/event/tool/PassesDeterminerTest.java b/event-tool/src/test/java/uk/gov/justice/event/tool/PassesDeterminerTest.java index 5088eea..165a3f4 100644 --- a/event-tool/src/test/java/uk/gov/justice/event/tool/PassesDeterminerTest.java +++ b/event-tool/src/test/java/uk/gov/justice/event/tool/PassesDeterminerTest.java @@ -37,5 +37,6 @@ public void shouldIncrementPassValue() throws Exception { assertThat(passesDeterminer.getPassValue(), is(1)); assertThat(passesDeterminer.getNextPassValue(), is(2)); assertThat(passesDeterminer.getNextPassValue(), is(3)); + assertThat(passesDeterminer.passCount(), is(0)); } } \ No newline at end of file diff --git a/event-tool/src/test/java/uk/gov/justice/event/tool/StartTransformationTest.java b/event-tool/src/test/java/uk/gov/justice/event/tool/StartTransformationTest.java index 88d5a59..a8d1ec6 100644 --- a/event-tool/src/test/java/uk/gov/justice/event/tool/StartTransformationTest.java +++ b/event-tool/src/test/java/uk/gov/justice/event/tool/StartTransformationTest.java @@ -1,12 +1,15 @@ package uk.gov.justice.event.tool; +import static java.lang.String.format; import static java.util.UUID.randomUUID; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import uk.gov.justice.event.tool.task.StreamTransformationTask; @@ -14,6 +17,7 @@ import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream; import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry; import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService; +import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService; import java.lang.reflect.Field; import java.util.UUID; @@ -29,7 +33,6 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import org.slf4j.Logger; @@ -48,6 +51,9 @@ public class StartTransformationTest { @Mock private EventStreamTransformationService eventStreamTransformationService; + @Mock + private LinkedEventStreamTransformationService linkedEventStreamTransformationService; + @InjectMocks private StartTransformation startTransformation; @@ -89,12 +95,24 @@ public void shouldCreateTasksForEachStream() { final UUID streamId_2 = randomUUID(); when(eventRepository.getAllActiveStreamIds()).thenReturn(Stream.of(streamId_1, streamId_2)); - when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(mock(Future.class)); + final Future future = mock(Future.class); + final Future future2 = mock(Future.class); + + when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2); startTransformation.go(); assertThat(startTransformation.outstandingTasks.size(), is(2)); assertTrue(startTransformation.allTasksCreated); + + when(passesDeterminer.isLastElementInPasses()).thenReturn(true); + startTransformation.taskDone(future, null, null, null); + startTransformation.taskDone(future2, null, null, null); + + assertThat(startTransformation.outstandingTasks.size(), is(0)); + + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); } @Test @@ -117,8 +135,14 @@ public void shouldRemoveFinishedTasks() { startTransformation.taskAborted(future, null, null, mock(Throwable.class)); assertThat(startTransformation.outstandingTasks.size(), is(1)); + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); + startTransformation.taskDone(future2, null, null, null); assertThat(startTransformation.outstandingTasks.size(), is(0)); + + verify(linkedEventStreamTransformationService,times(2)).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService,times(2)).populateLinkedEvents(); } @Test @@ -152,12 +176,16 @@ public void shouldRemoveFinishedTaskForAllPasses() { assertThat(startTransformation.outstandingTasks.size(), is(2)); startTransformation.taskAborted(future, null, null, mock(Throwable.class)); + assertThat(startTransformation.outstandingTasks.size(), is(1)); + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); startTransformation.taskDone(future2, null, null, null); - startTransformation.taskDone(future3, null, null, null); - assertThat(startTransformation.outstandingTasks.size(), is(0)); + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); + } @@ -165,12 +193,18 @@ public void shouldRemoveFinishedTaskForAllPasses() { public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsNull() throws Exception { streamsProcessedCountStepInfo.set(startTransformation, null); startTransformation.go(); + + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); } @Test(expected = IllegalArgumentException.class) public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsZero() throws Exception { streamsProcessedCountStepInfo.set(startTransformation, 0); startTransformation.go(); + + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); } @Test @@ -186,35 +220,20 @@ public void shouldNotThrowExceptionWhenstreamsProcessedCountStepInfoIsNotZero() when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); - final Future future = mock(Future.class); + final Future future1 = mock(Future.class); final Future future2 = mock(Future.class); final Future future3 = mock(Future.class); - when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); + when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3); streamsProcessedCountStepInfo.set(startTransformation, 10); startTransformation.go(); - } - - @Test - public void shouldNotLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsZero() throws IllegalArgumentException, IllegalAccessException { - final UUID streamId_1 = randomUUID(); - final UUID streamId_2 = randomUUID(); - final UUID streamId_3 = randomUUID(); - - when(eventRepository.getAllActiveStreamIds()) - .thenReturn(Stream.of(streamId_1, streamId_2)) - .thenReturn(Stream.of(streamId_3)); - - when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); - when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); - - final Future future = mock(Future.class); - final Future future2 = mock(Future.class); - final Future future3 = mock(Future.class); - when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); + startTransformation.taskDone(future1, null, null, null); + startTransformation.taskDone(future2, null, null, null); + startTransformation.taskDone(future3, null, null, null); + assertThat(startTransformation.outstandingTasks.size(), is(0)); - streamsProcessedCountStepInfo.set(startTransformation, 10); - startTransformation.go(); + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); } @Test @@ -230,13 +249,22 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); - final Future future = mock(Future.class); + final Future future1 = mock(Future.class); final Future future2 = mock(Future.class); final Future future3 = mock(Future.class); - when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); + when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3); when(stopWatch.getTime()).thenReturn(12222222l); streamsProcessedCountStepInfo.set(startTransformation, 2); + startTransformation.go(); + verify(logger).info("Pass 1 - Streams count: 4 - time(ms): 12222222"); + startTransformation.taskDone(future1, null, null, null); + startTransformation.taskDone(future2, null, null, null); + startTransformation.taskDone(future3, null, null, null); + assertThat(startTransformation.outstandingTasks.size(), is(0)); + + verify(linkedEventStreamTransformationService).truncateLinkedEvents(); + verify(linkedEventStreamTransformationService).populateLinkedEvents(); } } diff --git a/event-tool/src/test/java/uk/gov/justice/event/tool/task/StreamTransformationTaskTest.java b/event-tool/src/test/java/uk/gov/justice/event/tool/task/StreamTransformationTaskTest.java index 0c04283..d659194 100644 --- a/event-tool/src/test/java/uk/gov/justice/event/tool/task/StreamTransformationTaskTest.java +++ b/event-tool/src/test/java/uk/gov/justice/event/tool/task/StreamTransformationTaskTest.java @@ -2,6 +2,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.mockito.Matchers.isNotNull; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.verify; import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException; @@ -43,4 +46,8 @@ public void shouldReturnTransformationListener() { assertThat(streamTransformationTask.getManagedTaskListener(), is(transformationListener)); } + @Test + public void shouldReturnExecutionProperties() { + assertThat(streamTransformationTask.getExecutionProperties(), is(nullValue())); + } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 766e846..a785529 100644 --- a/pom.xml +++ b/pom.xml @@ -22,13 +22,12 @@ stream-transformation-tool - - 5.0.4 - 1.0.4 + 3.1.1 + 5.1.2 + 1.1.7 1.28.0 1.16.1 1.18.1 - 2017.11.0 77 1.13.1 diff --git a/stream-transformation-test/sample-transformations/pom.xml b/stream-transformation-test/sample-transformations/pom.xml index 4835f93..7626e26 100644 --- a/stream-transformation-test/sample-transformations/pom.xml +++ b/stream-transformation-test/sample-transformations/pom.xml @@ -18,6 +18,11 @@ stream-transformation-tool-api ${project.version} + + uk.gov.justice.framework-api + framework-api-core + ${framework-api.version} + uk.gov.justice.services test-utils-core diff --git a/stream-transformation-test/stream-transformation-it/pom.xml b/stream-transformation-test/stream-transformation-it/pom.xml index 417ba49..b52d2e1 100644 --- a/stream-transformation-test/stream-transformation-it/pom.xml +++ b/stream-transformation-test/stream-transformation-it/pom.xml @@ -23,6 +23,11 @@ event-repository-liquibase ${event-store.version} + + uk.gov.justice.event-store + linked-event-processor + ${event-store.version} + uk.gov.justice.services test-utils-core @@ -53,12 +58,6 @@ org.hamcrest hamcrest-library - - uk.gov.justice.utils - test-utils-logging-simple - pom - test - org.apache.commons commons-dbcp2 @@ -67,6 +66,18 @@ commons-io commons-io + + org.liquibase + liquibase-core + + + uk.gov.justice.utils + test-utils-logging-simple + pom + test + + + diff --git a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/DatabaseUtils.java b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/DatabaseUtils.java index dc89805..ee2fd31 100644 --- a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/DatabaseUtils.java +++ b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/DatabaseUtils.java @@ -5,8 +5,6 @@ import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.ZonedDateTime; import java.util.UUID; @@ -17,35 +15,34 @@ public class DatabaseUtils { - private TestEventLogJdbcRepository eventLogJdbcRepository; - private TestEventStreamJdbcRepository eventStreamJdbcRepository; - + private final TestEventLogJdbcRepository eventLogJdbcRepository; + private final TestEventStreamJdbcRepository eventStreamJdbcRepository; private final LiquibaseUtil liquibaseUtil = new LiquibaseUtil(); + private final DataSource dataSource; public DatabaseUtils() throws SQLException, LiquibaseException { - final DataSource dataSource = liquibaseUtil.initEventStoreDb(); + dataSource = liquibaseUtil.initEventStoreDb(); eventLogJdbcRepository = new TestEventLogJdbcRepository(dataSource); eventStreamJdbcRepository = new TestEventStreamJdbcRepository(dataSource); } + public DataSource getDataSource() { + return dataSource; + } + public void dropAndUpdateLiquibase() throws SQLException, LiquibaseException { liquibaseUtil.dropAndUpdate(); } - public void resetDatabase() throws SQLException { - try (final Connection connection = eventLogJdbcRepository.getDataSource().getConnection(); - final PreparedStatement preparedStatement = connection.prepareStatement("delete from event_log")) { - preparedStatement.executeUpdate(); - } - try (final Connection connection = eventStreamJdbcRepository.getDatasource().getConnection(); - final PreparedStatement preparedStatement = connection.prepareStatement("delete from event_stream")) { + public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt) throws InvalidPositionException { + final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt); - preparedStatement.executeUpdate(); - } + eventLogJdbcRepository.insert(event); + eventStreamJdbcRepository.insert(streamId); } - public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt) throws InvalidPositionException { - final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt); + public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final long eventNumber) throws InvalidPositionException { + final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt, eventNumber); eventLogJdbcRepository.insert(event); eventStreamJdbcRepository.insert(streamId); @@ -58,6 +55,4 @@ public TestEventLogJdbcRepository getEventLogJdbcRepository() { public TestEventStreamJdbcRepository getEventStreamJdbcRepository() { return eventStreamJdbcRepository; } - - } diff --git a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/EventLogBuilder.java b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/EventLogBuilder.java index aabfec3..916f63c 100644 --- a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/EventLogBuilder.java +++ b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/EventLogBuilder.java @@ -1,5 +1,6 @@ package uk.gov.justice.framework.tools.transformation; +import static java.util.Optional.of; import static uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder.envelope; import static uk.gov.justice.services.test.utils.core.messaging.MetadataBuilderFactory.metadataWithRandomUUID; @@ -15,7 +16,6 @@ public class EventLogBuilder { private EventLogBuilder() { } - public static Event eventLogFrom( final String eventName, final Long sequenceId, @@ -46,4 +46,37 @@ public static Event eventLogFrom( payload, createdAt); } + + public static Event eventLogFrom( + final String eventName, + final Long sequenceId, + final UUID streamId, + final ZonedDateTime createdAt, + final long eventNumber) { + final JsonEnvelope jsonEnvelope = envelope() + .with(metadataWithRandomUUID(eventName) + .createdAt(createdAt) + .withVersion(sequenceId) + .withStreamId(streamId) + .withSource("sample") + ) + .withPayloadOf("test", "a string") + .build(); + + final Metadata metadata = jsonEnvelope.metadata(); + final UUID id = metadata.id(); + + final String name = metadata.name(); + final String payload = jsonEnvelope.payloadAsJsonObject().toString(); + + return new Event( + id, + streamId, + sequenceId, + name, + metadata.asJsonObject().toString(), + payload, + createdAt, + of(eventNumber)); + } } diff --git a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/SwarmStarterUtil.java b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/SwarmStarterUtil.java index 7f269ad..bdecb43 100644 --- a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/SwarmStarterUtil.java +++ b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/SwarmStarterUtil.java @@ -18,39 +18,12 @@ public class SwarmStarterUtil { private static final Logger LOGGER = getLogger(SwarmStarterUtil.class); - private String EVENT_TOOL_JAR_LOCATION; - private String STREAM_JAR_LOCATION; - private String STAND_ALOND_DS_LOCATION; - private String MAIN_PROCESS_FILE_PATH; - - public SwarmStarterUtil() throws IOException { - getResources(); - } - - public void runCommand(final boolean enableRemoteDebugging, final long timeoutInSeconds) throws IOException { - runCommand(enableRemoteDebugging, timeoutInSeconds, 5l, "2048Mb"); - } public void runCommand(final boolean enableRemoteDebugging, final long timeoutInSeconds, final long streamCountReportingInterval, final String memoryOptions) throws IOException { final String memoryParmeter = format("-DXmx=%s", memoryOptions); final String streamCountReportingIntervalParameter = format("-DstreamCountReportingInterval=%s", streamCountReportingInterval); - final String command = format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s -c %s %s %s", - debug(enableRemoteDebugging), MAIN_PROCESS_FILE_PATH, STREAM_JAR_LOCATION, EVENT_TOOL_JAR_LOCATION, STAND_ALOND_DS_LOCATION, - streamCountReportingIntervalParameter, memoryParmeter); - - startWildfly(timeoutInSeconds, command); - } - - private String debug(boolean enableRemoteDebugging) { - String debug = ""; - if (enableRemoteDebugging) { - debug = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"; - } - return debug; - } - - private void startWildfly(long timeoutInSeconds, String command) throws IOException { + final String command = createCommandToExecuteTransformationTool(enableRemoteDebugging, streamCountReportingIntervalParameter, memoryParmeter); final Process exec = execute(command); final BufferedReader reader = new BufferedReader(new InputStreamReader(exec.getInputStream())); @@ -64,8 +37,41 @@ private void startWildfly(long timeoutInSeconds, String command) throws IOExcept waitUntilDone(exec, timeoutInSeconds); } - private String getResource(final String pattern) { + private String createCommandToExecuteTransformationTool(final boolean enableRemoteDebugging, + final String streamCountReportingIntervalParameter, + final String memoryParmeter) throws IOException { + final String eventToolJarLocation = getResource("event-tool*.jar"); + final String streamJarLocation = getResource("stream-transformations*.jar"); + final String standaloneDSLocation = getResource("standalone-ds.xml"); + final String mainProcessFilePath = Paths.get(File.createTempFile("mainProcessFile", "tmp").toURI()).toAbsolutePath().toString(); + String debug = ""; + + if (enableRemoteDebugging) { + debug = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"; + } + + return commandFrom(debug, mainProcessFilePath, streamJarLocation, eventToolJarLocation, standaloneDSLocation, streamCountReportingIntervalParameter, memoryParmeter); + } + + private String commandFrom(final String debug, + final String mainProcessFilePath, + final String streamJarLocation, + final String eventToolJarLocation, + final String standaloneDSLocation, + final String streamCountReportingIntervalParameter, + final String memoryParmeter) throws IOException { + return format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s -c %s %s %s", + debug, + mainProcessFilePath, + streamJarLocation, + eventToolJarLocation, + standaloneDSLocation, + streamCountReportingIntervalParameter, + memoryParmeter); + } + + private String getResource(final String pattern) { final File dir = new File(this.getClass().getClassLoader().getResource("").getPath()); final FileFilter fileFilter = new WildcardFileFilter(pattern); return dir.listFiles(fileFilter)[0].getAbsolutePath(); @@ -114,11 +120,4 @@ private Process execute(final String command) { throw new SwarmStarterException(format("Failed to execute external process '%s'", command), e); } } - - private void getResources() throws IOException { - EVENT_TOOL_JAR_LOCATION = getResource("event-tool*.jar"); - STREAM_JAR_LOCATION = getResource("stream-transformations*.jar"); - STAND_ALOND_DS_LOCATION = getResource("standalone-ds.xml"); - MAIN_PROCESS_FILE_PATH = Paths.get(File.createTempFile("mainProcessFile", "tmp").toURI()).toAbsolutePath().toString(); - } } diff --git a/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/TestLinkedEventJdbcRepository.java b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/TestLinkedEventJdbcRepository.java new file mode 100644 index 0000000..d760d6b --- /dev/null +++ b/stream-transformation-test/stream-transformation-it/src/main/java/uk/gov/justice/framework/tools/transformation/TestLinkedEventJdbcRepository.java @@ -0,0 +1,77 @@ +package uk.gov.justice.framework.tools.transformation; + +import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp; + +import uk.gov.justice.services.eventsourcing.linkedevent.LinkedEventJdbcRepository; +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent; +import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException; +import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper; +import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapper; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Stream; + +import javax.sql.DataSource; + +public class TestLinkedEventJdbcRepository extends LinkedEventJdbcRepository { + private static final String SQL_FIND_BY_STREAM_ID = "SELECT * FROM linked_event WHERE stream_id=? ORDER BY position_in_stream ASC"; + + private final DataSource dbsource; + private static final String COL_ID = "id"; + private static final String COL_STREAM_ID = "stream_id"; + private static final String COL_POSITION = "position_in_stream"; + private static final String COL_EVENT_NUMBER = "event_number"; + private static final String COL_PREVIOUS_EVENT_NUMBER = "previous_event_number"; + private static final String COL_DATE_CREATED = "date_created"; + + public TestLinkedEventJdbcRepository(final DataSource dataSource) { + this.dbsource = dataSource; + } + + public DataSource getDatasource() { + return dbsource; + } + + + public Stream findByStreamIdOrderByPositionAsc(final UUID streamId) throws SQLException { + final JdbcRepositoryHelper jdbcRepositoryHelper = new JdbcRepositoryHelper(); + + final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDatasource(), SQL_FIND_BY_STREAM_ID); + preparedStatementWrapper.setObject(1, streamId); + + return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, entityFromFunction1()); + } + + + + public long linkedEventsCount(final UUID streamId) throws SQLException { + final JdbcRepositoryHelper jdbcRepositoryHelper = new JdbcRepositoryHelper(); + + final PreparedStatementWrapper preparedStatementWrapper = jdbcRepositoryHelper.preparedStatementWrapperOf(getDatasource(), SQL_FIND_BY_STREAM_ID); + preparedStatementWrapper.setObject(1, streamId); + + return jdbcRepositoryHelper.streamOf(preparedStatementWrapper, entityFromFunction1()).count(); + } + + protected Function entityFromFunction1() { + return resultSet -> { + try { + return new LinkedEvent((UUID) resultSet.getObject(COL_ID), + (UUID) resultSet.getObject(COL_STREAM_ID), + resultSet.getLong(COL_POSITION), + resultSet.getString("name"), + resultSet.getString("payload"), + resultSet.getString("metadata"), + fromSqlTimestamp(resultSet.getTimestamp(COL_DATE_CREATED)), + resultSet.getLong(COL_EVENT_NUMBER), + resultSet.getLong(COL_PREVIOUS_EVENT_NUMBER)); + } catch (final SQLException e) { + throw new JdbcRepositoryException(e); + } + }; + } +} diff --git a/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationIT.java b/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationIT.java index bd4390b..e26556f 100644 --- a/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationIT.java +++ b/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationIT.java @@ -7,6 +7,7 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent; import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream; import java.time.ZonedDateTime; @@ -20,42 +21,64 @@ import org.junit.Test; public class StreamTransformationIT { + private static final long STREAM_COUNT_REPORTING_INTERVAL = 10L; + private static final String MEMORY_OPTIONS_PARAMETER = "2048M"; private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false; - private static final long STREAMS_PROCESSED_COUNT_STEP_INFO = 100; private static final int WILDFLY_TIMEOUT_IN_SECONDS = 60; - private static final String MEMORY_OPTIONS = "2048Mb"; - private UUID STREAM_ID = UUID.randomUUID(); private SwarmStarterUtil swarmStarterUtil; private DatabaseUtils databaseUtils; + private TestLinkedEventJdbcRepository linkedEventJdbcRepository; + @Before public void setUp() throws Exception { swarmStarterUtil = new SwarmStarterUtil(); databaseUtils = new DatabaseUtils(); databaseUtils.dropAndUpdateLiquibase(); - } - - @After - public void cleanup() throws Exception { - databaseUtils.resetDatabase(); + linkedEventJdbcRepository = new TestLinkedEventJdbcRepository(databaseUtils.getDataSource()); } @Test public void shouldTransformEventInEventStore() throws Exception { final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1); - databaseUtils.insertEventLogData("sample.events.name", STREAM_ID, 1L, createdAt); - databaseUtils.insertEventLogData("sample.v2.events.name", STREAM_ID, 2L, createdAt); + databaseUtils.insertEventLogData("sample.events.name", STREAM_ID, 1L, createdAt, 1); + databaseUtils.insertEventLogData("sample.v2.events.name", STREAM_ID, 2L, createdAt, 2); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); - assertThat(eventStoreTransformedEventPresent("sample.events.transformedName"), is(true)); + final String transformedEventName = "sample.events.transformedName"; + assertThat(eventStoreTransformedEventPresent(transformedEventName), is(true)); assertThat(originalEventStreamIsActive(), is(true)); assertThat(clonedStreamAvailableAndActive(), is(false)); + assertThat(totalEventCount(), is(5L)); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(linkedEventsCount, is(2L)); + + final Stream linkedEventStream = linkedEventJdbcRepository.findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + if (linkedEvent.getSequenceId() == 1L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is(transformedEventName)); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(6L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + if (linkedEvent.getSequenceId() == 2L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is(transformedEventName)); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(7L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(6L)); + } + + }); } @Test @@ -65,11 +88,29 @@ public void shouldUseTheOriginalCreatedAtDateInTransformation() throws Exception final String eventName = "sample.events.check-date-not-transformed"; databaseUtils.insertEventLogData(eventName, STREAM_ID, 1L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); - final List transformedEvents = getTransformedEvents(eventName); + final List transformedEvents = getTransformedEvents(); transformedEvents.forEach(event -> assertThat(event.getCreatedAt(), is(createdAt))); + assertThat(totalEventCount(), is(3L)); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(linkedEventsCount, is(1L)); + + final Stream linkedEventStream = linkedEventJdbcRepository + .findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + + if (linkedEvent.getSequenceId() == 1L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.events.check-date-not-transformed")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(4L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + }); } @Test @@ -79,9 +120,14 @@ public void shouldDeactivateStreamInEventStore() throws Exception { databaseUtils.insertEventLogData("sample.deactivate.events.name", STREAM_ID, 1L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(originalEventStreamIsActive(), is(false)); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(totalEventCount(), is(1L)); + + assertThat(linkedEventsCount, is(0L)); } @Test @@ -91,12 +137,17 @@ public void shouldPerformCustomActionOnStreamInEventStore() throws Exception { databaseUtils.insertEventLogData("sample.event.name.archived.old.release", STREAM_ID, 1L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(eventStoreTransformedEventPresent("sample.event.name"), is(true)); assertThat(eventStoreEventIsPresent("sample.event.name.archived.old.release"), is(false)); assertThat(streamAvailableAndActive(STREAM_ID), is(false)); assertThat(clonedStreamAvailableAndActive(), is(false)); + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(totalEventCount(), is(1L)); + + assertThat(linkedEventsCount, is(0L)); + } @Test @@ -106,11 +157,30 @@ public void shouldTransformEventByPass() throws Exception { databaseUtils.insertEventLogData("sample.events.name.sequence", STREAM_ID, 1L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(eventStoreTransformedEventPresent("sample.events.name.sequence2"), is(true)); assertThat(originalEventStreamIsActive(), is(true)); assertThat(clonedStreamAvailableAndActive(), is(false)); + + assertThat(totalEventCount(), is(5L)); + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + + assertThat(linkedEventsCount, is(1L)); + + final Stream linkedEventStream = linkedEventJdbcRepository + .findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + + if (linkedEvent.getSequenceId() == 1L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.events.name.sequence2")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(7L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + }); } @Test @@ -118,15 +188,41 @@ public void shouldTransformEventAndAddToSameStreamUsingSetStreamIdMethod() throw final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1); - databaseUtils.insertEventLogData("sample.transformation.with.stream.id", fromString("80764cb1-a031-4328-b59e-6c18b0974a84"), 1L, createdAt); - databaseUtils.insertEventLogData("sample.transformation.should.not.transform", fromString("80764cb1-a031-4328-b59e-6c18b0974a84"), 2L, createdAt); + final String localStreamId = "80764cb1-a031-4328-b59e-6c18b0974a84"; + databaseUtils.insertEventLogData("sample.transformation.with.stream.id", fromString(localStreamId), 1L, createdAt); + databaseUtils.insertEventLogData("sample.transformation.should.not.transform", fromString(localStreamId), 2L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); - assertThat(eventStoreTransformedEventPresent("sample.transformation.with.stream.id.transformed", fromString("80764cb1-a031-4328-b59e-6c18b0974a84")), is(true)); - assertThat(eventStoreTransformedEventPresent("sample.transformation.should.not.transform", fromString("80764cb1-a031-4328-b59e-6c18b0974a84")), is(true)); + assertThat(eventStoreTransformedEventPresent("sample.transformation.with.stream.id.transformed", fromString(localStreamId)), is(true)); + assertThat(eventStoreTransformedEventPresent("sample.transformation.should.not.transform", fromString(localStreamId)), is(true)); assertThat(totalStreamCount(), is(2L)); + assertThat(totalEventCount(), is(5L)); + assertThat(clonedStreamAvailableAndActive(), is(false)); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(fromString(localStreamId)); + assertThat(linkedEventsCount, is(2L)); + + final Stream linkedEventStream = linkedEventJdbcRepository + .findByStreamIdOrderByPositionAsc(fromString(localStreamId)); + + linkedEventStream.forEach(linkedEvent -> { + if (linkedEvent.getSequenceId() == 1L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.transformation.with.stream.id.transformed")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(6L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + if (linkedEvent.getSequenceId() == 2L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.transformation.should.not.transform")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(7L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(6L)); + } + }); } @Test @@ -136,14 +232,60 @@ public void shouldTransformAndMoveEventInEventStoreAndPreserveEventSequenceInThe databaseUtils.insertEventLogData("sample.events.name.pass1.sequence", STREAM_ID, 1L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(totalStreamCount(), is(2L)); - assertThat(eventStoreTransformedEventPresentAndSequenceCorrect("sample.events.name.pass1.sequence2", fromString("80764cb1-a031-4328-b59e-6c18b0974a84") ,1L), is(true)); - assertThat(eventStoreTransformedEventPresentAndSequenceCorrect("sample.events.name.pass1.sequence3", fromString("80764cb1-a031-4328-b59e-6c18b0974a84"), 2L), is(true)); + final String stream2 = "80764cb1-a031-4328-b59e-6c18b0974a84"; + assertThat(eventStoreTransformedEventPresentAndSequenceCorrect("sample.events.name.pass1.sequence2", fromString(stream2), 1L), is(true)); + assertThat(eventStoreTransformedEventPresentAndSequenceCorrect("sample.events.name.pass1.sequence3", fromString(stream2), 2L), is(true)); assertThat(eventStoreTransformedEventPresentAndSequenceCorrect("sample.events.name.pass1.sequence1", STREAM_ID ,1L), is(true)); + + assertThat(totalEventCount(), is(3L)); + + final long linkedEventsStream1Count = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(linkedEventsStream1Count, is(1L)); + + final long linkedEventsStream2Count = linkedEventJdbcRepository.linkedEventsCount(fromString(stream2)); + assertThat(linkedEventsStream2Count, is(2L)); + + final Stream linkedEventStream = linkedEventJdbcRepository + .findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + if (linkedEvent.getSequenceId() == 1L) { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence1")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(8L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + + }); + + final Stream linkedEventStream2 = linkedEventJdbcRepository + .findByStreamIdOrderByPositionAsc(fromString(stream2)); + linkedEventStream2.forEach(linkedEvent -> { + if (linkedEvent.getSequenceId() == 1L) + + { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence2")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(10L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(8L)); + } + if (linkedEvent.getSequenceId() == 2L) + + { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence3")); + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(11L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(10L)); + } + }); } private boolean clonedStreamAvailableAndActive() { @@ -206,7 +348,13 @@ private long totalStreamCount() { return databaseUtils.getEventStreamJdbcRepository().findAll().count(); } - private List getTransformedEvents(final String transformedEventName) { + + private long totalEventCount() { + return databaseUtils.getEventLogJdbcRepository().findAll().count(); + } + + + private List getTransformedEvents() { final Stream eventLogs = databaseUtils.getEventLogJdbcRepository().findAll(); return eventLogs.filter(item -> item.getStreamId().equals(STREAM_ID)).collect(toList()); } diff --git a/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationMoveIT.java b/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationMoveIT.java index 457e824..53f7a7e 100644 --- a/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationMoveIT.java +++ b/stream-transformation-test/stream-transformation-it/src/test/java/uk/gov/justice/framework/tools/transformation/StreamTransformationMoveIT.java @@ -7,6 +7,7 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent; import java.time.ZonedDateTime; import java.util.Optional; @@ -20,29 +21,25 @@ public class StreamTransformationMoveIT { private static final UUID STREAM_ID = randomUUID(); + private static final long STREAM_COUNT_REPORTING_INTERVAL = 10L; + private static final String MEMORY_OPTIONS_PARAMETER = "2048M"; private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false; - private static final long STREAMS_PROCESSED_COUNT_STEP_INFO = 100; - private static final int WILDFLY_TIMEOUT_IN_SECONDS = 60; - private static final String MEMORY_OPTIONS = "2048Mb"; - private SwarmStarterUtil swarmStarterUtil; private DatabaseUtils databaseUtils; + private TestLinkedEventJdbcRepository linkedEventJdbcRepository; + @Before public void setUp() throws Exception { swarmStarterUtil = new SwarmStarterUtil(); databaseUtils = new DatabaseUtils(); databaseUtils.dropAndUpdateLiquibase(); - } - - @After - public void cleanup() throws Exception { - databaseUtils.resetDatabase(); + linkedEventJdbcRepository = new TestLinkedEventJdbcRepository(databaseUtils.getDataSource()); } @Test @@ -53,11 +50,32 @@ public void shouldMoveEventInEventStore() throws Exception { databaseUtils.insertEventLogData("sample.transformation.move.1", STREAM_ID, 1L, createdAt); databaseUtils.insertEventLogData("sample.events.name.should.not.be.transformed", STREAM_ID, 2L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(totalStreamCount(), is(5L)); assertThat(totalClonedStreamsCreated(), is(3L)); - assertTrue(eventNameExist("sample.transformation.move.4")); + + final String transformedEventName = "sample.transformation.move.4"; + assertTrue(eventNameExist(transformedEventName)); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(linkedEventsCount, is(1L)); + + final Stream linkedEventStream = linkedEventJdbcRepository.findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + if (linkedEvent.getName() == "sample.events.name.should.not.be.transformed") { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(11L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + if (linkedEvent.getName() == "sample.transformation.move.4") { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(15L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(11L)); + } + + }); } @Test @@ -68,11 +86,30 @@ public void shouldMoveEventInEventStoreWithoutBackup() throws Exception { databaseUtils.insertEventLogData("sample.transformation.move.without.backup", STREAM_ID, 1L, createdAt); databaseUtils.insertEventLogData("sample.events.name.passer1", STREAM_ID, 2L, createdAt); - swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS); + swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER); assertThat(totalStreamCount(), is(2L)); assertThat(totalClonedStreamsCreated(), is(0L)); assertTrue(eventNameExist("sample.transformation.move.without.backup.transformed")); + + final long linkedEventsCount = linkedEventJdbcRepository.linkedEventsCount(STREAM_ID); + assertThat(linkedEventsCount, is(1L)); + + final Stream linkedEventStream = linkedEventJdbcRepository.findByStreamIdOrderByPositionAsc(STREAM_ID); + + linkedEventStream.forEach(linkedEvent -> { + if (linkedEvent.getName() == "sample.events.name.passer1") { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(3L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(0L)); + } + if (linkedEvent.getName() == "sample.transformation.move.without.backup.transformed") { + assertThat(linkedEvent.getCreatedAt(), is(createdAt)); + assertThat(linkedEvent.getEventNumber().get(), is(4L)); + assertThat(linkedEvent.getPreviousEventNumber(), is(3L)); + } + + }); } private boolean eventNameExist(final String eventName) { diff --git a/stream-transformation-tool-service/pom.xml b/stream-transformation-tool-service/pom.xml index 8dfd8c9..a8c2d9b 100644 --- a/stream-transformation-tool-service/pom.xml +++ b/stream-transformation-tool-service/pom.xml @@ -24,6 +24,12 @@ uk.gov.justice.event-store event-source + ${event-store.version} + + + uk.gov.justice.framework-api + framework-api-event-source + ${framework-api.version} @@ -47,6 +53,7 @@ junit-dataprovider test + diff --git a/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/EventStreamTransformationService.java b/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/EventStreamTransformationService.java index 9f9524e..4e65e60 100644 --- a/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/EventStreamTransformationService.java +++ b/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/EventStreamTransformationService.java @@ -75,7 +75,7 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) { if (action.isTransform()) { final Optional newStreamId = eventTransformationStreamIdFilter.getEventTransformationStreamId(eventTransformations, jsonEnvelopeList); - if (newStreamId.isPresent()) { + if (isNewStreamId(newStreamId, originalStreamId)) { streamMover.transformAndMoveStream(originalStreamId, eventTransformations, newStreamId.get()); } else { streamTransformer.transformStream(originalStreamId, eventTransformations); @@ -102,6 +102,11 @@ private void cloneStream(final UUID originalStreamId) { } } + private boolean isNewStreamId(final Optional newStreamId, + final UUID originalStreamId) { + return newStreamId.isPresent() && !newStreamId.get().equals(originalStreamId); + } + private Set getEventTransformations(final int pass) { return eventTransformationRegistry.getEventTransformationBy(pass); } diff --git a/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationService.java b/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationService.java new file mode 100644 index 0000000..125de2a --- /dev/null +++ b/stream-transformation-tool-service/src/main/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationService.java @@ -0,0 +1,42 @@ +package uk.gov.justice.tools.eventsourcing.transformation.service; + +import static javax.transaction.Transactional.TxType.REQUIRES_NEW; + +import uk.gov.justice.services.eventsourcing.source.core.LinkedEventSourceTransformation; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.transaction.Transactional; + +import org.slf4j.Logger; + +/** + * Service to transform events on an event-stream. + */ +@ApplicationScoped +public class LinkedEventStreamTransformationService { + + @Inject + private Logger logger; + + @Inject + private LinkedEventSourceTransformation linkedEventSourceTransformation; + + @Transactional(REQUIRES_NEW) + public void truncateLinkedEvents() { + try { + linkedEventSourceTransformation.truncate(); + } catch (final Exception e) { + logger.error("Failed to truncate linked events log", e); + } + } + + @Transactional(REQUIRES_NEW) + public void populateLinkedEvents() { + try { + linkedEventSourceTransformation.populate(); + } catch (final Exception e) { + logger.error("Failed to populate linked events log", e); + } + } +} diff --git a/stream-transformation-tool-service/src/test/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationServiceTest.java b/stream-transformation-tool-service/src/test/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationServiceTest.java new file mode 100644 index 0000000..323f27c --- /dev/null +++ b/stream-transformation-tool-service/src/test/java/uk/gov/justice/tools/eventsourcing/transformation/service/LinkedEventStreamTransformationServiceTest.java @@ -0,0 +1,34 @@ +package uk.gov.justice.tools.eventsourcing.transformation.service; + +import static org.mockito.Mockito.verify; + +import uk.gov.justice.services.eventsourcing.source.core.LinkedEventSourceTransformation; +import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class LinkedEventStreamTransformationServiceTest { + + @Mock + private LinkedEventSourceTransformation linkedEventSourceTransformation; + + @InjectMocks + private LinkedEventStreamTransformationService linkedEventStreamTransformationService; + + @Test + public void shouldPopulateLinkedEvents() throws EventStreamException { + linkedEventStreamTransformationService.populateLinkedEvents(); + verify(linkedEventSourceTransformation).populate(); + } + + @Test + public void shouldTruncateLinkedEvents() throws EventStreamException { + linkedEventStreamTransformationService.truncateLinkedEvents(); + verify(linkedEventSourceTransformation).truncate(); + } +} \ No newline at end of file