diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java index a1c140f..cdc274d 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java @@ -1,85 +1,71 @@ package uk.gov.justice.framework.tools.replay; -import static org.apache.commons.lang3.StringUtils.substringBefore; +import static java.util.stream.IntStream.range; +import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED; +import static javax.ejb.TransactionAttributeType.REQUIRED; import uk.gov.justice.services.core.handler.exception.MissingHandlerException; import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository; import uk.gov.justice.services.messaging.JsonEnvelope; -import uk.gov.justice.services.messaging.Metadata; +import java.util.List; import java.util.UUID; -import java.util.stream.Stream; import javax.ejb.Stateless; +import javax.ejb.TransactionAttribute; import javax.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @Stateless public class AsyncStreamDispatcher { - private static final Logger LOGGER = LoggerFactory.getLogger(AsyncStreamDispatcher.class); - @Inject private TransactionalEnvelopeDispatcher envelopeDispatcher; @Inject private StreamStatusJdbcRepository streamStatusRepository; - public UUID dispatch(final Stream stream) { - final int[] noOfProcessedElements = {0}; - final JsonEnvelope[] envelopes = {null}; - - try (final Stream stream1 = stream) { - stream1.forEach(envelope -> { - if (firstElement(noOfProcessedElements)) { - LOGGER.info("Starting processing of stream: {}", streamIdOf(envelope)); - } - try { - envelopeDispatcher.dispatch(envelope); - } catch (MissingHandlerException ex) { - final Metadata metadata = envelope.metadata(); - LOGGER.warn("Missing handler for stream Id: {}, event name: {}, version: {}", metadata.streamId().get(), - metadata.name(), metadata.version().get()); - } - noOfProcessedElements[0]++; - if (shouldLogProgress(noOfProcessedElements)) { - LOGGER.info("Processed {} elements of stream: {}", noOfProcessedElements[0], streamIdOf(envelope)); - } - envelopes[0] = envelope; - }); - final UUID streamId = streamIdOf(envelopes[0]); - streamStatusRepository.insert(new StreamStatus(streamId, versionOf(envelopes[0]), sourceOf(envelopes[0]))); - LOGGER.info("Finished processing of stream: {}, elements processed: {}", streamId, noOfProcessedElements[0]); - return streamId; - } - } + @Inject + private StreamEnvelopeProvider streamEnvelopeProvider; - private UUID streamIdOf(final JsonEnvelope envelope) { - return envelope - .metadata() - .streamId() - .orElseThrow(() -> new IllegalArgumentException(String.format("Stream id not found in the envelope: %s", envelope.toString()))); - } + @Inject + private StreamStatusFactory streamStatusFactory; - private Long versionOf(final JsonEnvelope envelope) { - return envelope - .metadata() - .version() - .orElseThrow(() -> new IllegalArgumentException(String.format("Version not found in the envelope: %s", envelope.toString()))); - } + @Inject + private ProgressLogger progressLogger; + + @TransactionAttribute(NOT_SUPPORTED) + public UUID dispatch(final UUID streamId) { + + progressLogger.logStart(streamId); + + final List envelopes = streamEnvelopeProvider.getStreamAsList(streamId); - private String sourceOf(final JsonEnvelope envelope) { - return substringBefore(envelope.metadata().name(), "."); + range(0, envelopes.size()) + .forEach(index -> dispatchEnvelope( + envelopes.get(index), + streamId, + index)); + + insertStreamStatus(streamStatusFactory.create(envelopes, streamId)); + + progressLogger.logCompletion(streamId); + + return streamId; } - private boolean shouldLogProgress(final int[] i) { - return i[0] % 100 == 0; + @TransactionAttribute(REQUIRED) + private void dispatchEnvelope(final JsonEnvelope jsonEnvelope, final UUID streamId, final int index) { + try { + envelopeDispatcher.dispatch(jsonEnvelope); + progressLogger.logSuccess(streamId, index); + } catch (final MissingHandlerException ex) { + progressLogger.logFailure(streamId, jsonEnvelope); + } } - private boolean firstElement(final int[] i) { - return i[0] == 0; + @TransactionAttribute(REQUIRED) + private void insertStreamStatus(final StreamStatus streamStatus) { + streamStatusRepository.insert(streamStatus); } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressChecker.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressChecker.java new file mode 100644 index 0000000..4411828 --- /dev/null +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressChecker.java @@ -0,0 +1,10 @@ +package uk.gov.justice.framework.tools.replay; + +public class ProgressChecker { + + private static final int PROGRESS_INTERVAL = 100; + + public boolean shouldLogProgress(final int index) { + return index % PROGRESS_INTERVAL == 0; + } +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressLogger.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressLogger.java new file mode 100644 index 0000000..68c6f66 --- /dev/null +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressLogger.java @@ -0,0 +1,47 @@ +package uk.gov.justice.framework.tools.replay; + +import uk.gov.justice.services.messaging.JsonEnvelope; +import uk.gov.justice.services.messaging.Metadata; + +import java.util.UUID; + +import javax.inject.Inject; + +import org.slf4j.Logger; + +public class ProgressLogger { + + @Inject + private ProgressChecker progressChecker; + + @Inject + private Logger logger; + + private int sucessCount = 0; + + public void logStart(final UUID streamId) { + logger.info("Starting processing of stream: {}", streamId); + } + + public void logSuccess(final UUID streamId, final int index) { + + sucessCount++; + + if (progressChecker.shouldLogProgress(index)) { + logger.info("Processed {} element(s) of stream: {}", sucessCount, streamId); + } + } + + public void logFailure(final UUID streamId, final JsonEnvelope jsonEnvelope) { + final Metadata metadata = jsonEnvelope.metadata(); + logger.warn("Missing handler for stream Id: {}, event name: {}, version: {}", + streamId, + metadata.name(), + metadata.version().map(theVersion -> "" + theVersion).orElse("Not set") + ); + } + + public void logCompletion(final UUID streamId) { + logger.info("Finished processing of stream: {}. Processed {} elements", streamId, sucessCount); + } +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StartReplay.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StartReplay.java index 2050883..1759fe1 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StartReplay.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StartReplay.java @@ -1,5 +1,6 @@ package uk.gov.justice.framework.tools.replay; +import static java.util.stream.Collectors.toList; import static org.wildfly.swarm.bootstrap.Main.MAIN_PROCESS_FILE; import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository; @@ -8,6 +9,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Deque; +import java.util.List; import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; @@ -33,12 +35,14 @@ public class StartReplay implements ManagedTaskListener { private Logger logger; @Resource - private ManagedExecutorService executorService; + private ManagedExecutorService managedExecutorService; @Inject private JdbcEventRepository jdbcEventRepository; + @Inject private AsyncStreamDispatcher asyncStreamDispatcher; + private Deque> outstandingTasks = new LinkedBlockingDeque<>(); private boolean allTasksCreated = false; @@ -47,11 +51,13 @@ public class StartReplay implements ManagedTaskListener { void go() { logger.info("-------------- Invoke Event Streams Replay-------------!"); checkForMainProcessFile(); - jdbcEventRepository.getStreamOfAllActiveEventStreams() - .forEach(eventStream -> { - StreamDispatchTask dispatchTask = new StreamDispatchTask(eventStream, asyncStreamDispatcher, this); - outstandingTasks.add(executorService.submit(dispatchTask)); - }); + + final List activeStreamIds = jdbcEventRepository.getAllActiveStreamIds().collect(toList()); + activeStreamIds.forEach(uuid -> { + StreamDispatchTask dispatchTask = new StreamDispatchTask(uuid, asyncStreamDispatcher, this); + outstandingTasks.add(managedExecutorService.submit(dispatchTask)); + }); + allTasksCreated = true; if (outstandingTasks.isEmpty()) shutdown(); logger.info("-------------- Invocation of Event Streams Replay Completed --------------"); @@ -107,4 +113,4 @@ private void checkForMainProcessFile() { logger.warn(NO_PROCESS_FILE_WARNING); } } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamDispatchTask.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamDispatchTask.java index 6e3bbc9..f7741b2 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamDispatchTask.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamDispatchTask.java @@ -1,33 +1,36 @@ package uk.gov.justice.framework.tools.replay; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import uk.gov.justice.services.messaging.JsonEnvelope; - -import javax.enterprise.concurrent.ManagedTask; -import javax.enterprise.concurrent.ManagedTaskListener; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.stream.Stream; + +import javax.enterprise.concurrent.ManagedTask; +import javax.enterprise.concurrent.ManagedTaskListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StreamDispatchTask implements Callable, ManagedTask { private static final Logger LOGGER = LoggerFactory.getLogger(StreamDispatchTask.class); - private final Stream stream; + private final UUID streamId; private final AsyncStreamDispatcher dispatcher; private final ManagedTaskListener dispatchListener; - public StreamDispatchTask(final Stream stream, final AsyncStreamDispatcher dispatcher, final ManagedTaskListener dispatchListener) { + public StreamDispatchTask( + final UUID streamId, + final AsyncStreamDispatcher dispatcher, + final ManagedTaskListener dispatchListener) { + this.streamId = streamId; this.dispatcher = dispatcher; this.dispatchListener = dispatchListener; - this.stream = stream; } @Override public UUID call() { LOGGER.debug("---------- Dispatching stream -------------"); - return dispatcher.dispatch(this.stream); + + return dispatcher.dispatch(streamId); } @Override @@ -39,4 +42,4 @@ public Map getExecutionProperties() { public ManagedTaskListener getManagedTaskListener() { return dispatchListener; } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProvider.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProvider.java new file mode 100644 index 0000000..e09b87a --- /dev/null +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProvider.java @@ -0,0 +1,24 @@ +package uk.gov.justice.framework.tools.replay; + +import static java.util.stream.Collectors.toList; + +import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository; +import uk.gov.justice.services.messaging.JsonEnvelope; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import javax.inject.Inject; + +public class StreamEnvelopeProvider { + + @Inject + private JdbcEventRepository jdbcEventRepository; + + public List getStreamAsList(final UUID streamId) { + try (final Stream stream = jdbcEventRepository.getByStreamId(streamId)) { + return stream.collect(toList()); + } + } +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamStatusFactory.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamStatusFactory.java new file mode 100644 index 0000000..67b5dab --- /dev/null +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamStatusFactory.java @@ -0,0 +1,32 @@ +package uk.gov.justice.framework.tools.replay; + +import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.substringBefore; + +import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; +import uk.gov.justice.services.messaging.JsonEnvelope; + +import java.util.List; +import java.util.UUID; + +public class StreamStatusFactory { + + public StreamStatus create(final List envelopes, final UUID streamId) { + final JsonEnvelope jsonEnvelope = envelopes.get(0); + final Long version = getVersionFrom(jsonEnvelope); + final String source = getSourceFrom(jsonEnvelope); + + return new StreamStatus(streamId, version, source); + } + + private Long getVersionFrom(final JsonEnvelope envelope) { + return envelope + .metadata() + .version() + .orElseThrow(() -> new IllegalArgumentException(format("Version not found in the envelope: %s", envelope.toString()))); + } + + private String getSourceFrom(final JsonEnvelope envelope) { + return substringBefore(envelope.metadata().name(), "."); + } +} diff --git a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/TransactionalEnvelopeDispatcher.java b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/TransactionalEnvelopeDispatcher.java index 999dc99..4616d16 100644 --- a/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/TransactionalEnvelopeDispatcher.java +++ b/framework-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/TransactionalEnvelopeDispatcher.java @@ -30,6 +30,4 @@ public void init() { public void dispatch(JsonEnvelope envelope) { dispatcher.dispatch(envelope); } - - } diff --git a/framework-tools-replay/src/main/resources/log4j.properties b/framework-tools-replay/src/main/resources/log4j.properties index e9b2392..b4e6b50 100644 --- a/framework-tools-replay/src/main/resources/log4j.properties +++ b/framework-tools-replay/src/main/resources/log4j.properties @@ -1,7 +1,7 @@ # Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=INFO, A1 +log4j.rootLogger=INFO, A1 file # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcherTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcherTest.java index 5370b91..366a5f4 100644 --- a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcherTest.java +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcherTest.java @@ -1,32 +1,29 @@ package uk.gov.justice.framework.tools.replay; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.Optional.of; import static java.util.UUID.randomUUID; -import static org.hamcrest.Matchers.contains; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.internal.verification.VerificationModeFactory.times; -import static uk.gov.justice.services.test.utils.core.matchers.JsonEnvelopeMatcher.jsonEnvelope; -import static uk.gov.justice.services.test.utils.core.matchers.JsonEnvelopeMetadataMatcher.metadata; -import static uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder.envelope; -import static uk.gov.justice.services.test.utils.core.messaging.MetadataBuilderFactory.metadataWithDefaults; -import static uk.gov.justice.services.test.utils.core.messaging.MetadataBuilderFactory.metadataWithRandomUUID; +import static org.mockito.Mockito.when; import uk.gov.justice.services.core.handler.exception.MissingHandlerException; import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository; import uk.gov.justice.services.messaging.JsonEnvelope; -import uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder; +import uk.gov.justice.services.messaging.Metadata; import java.util.List; import java.util.UUID; -import java.util.stream.Stream; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -40,111 +37,70 @@ public class AsyncStreamDispatcherTest { @Mock private StreamStatusJdbcRepository streamStatusRepository; + @Mock + private StreamEnvelopeProvider streamEnvelopeProvider; + + @Mock + private StreamStatusFactory streamStatusFactory; + + @Mock + private ProgressLogger progressLogger; + @InjectMocks private AsyncStreamDispatcher asyncStreamDispatcher; @Test - public void shouldDispatchEnvelopes() { - + public void shouldGetTheEventsOfAStreamDispatchThemAllThenUpdateTheStreamStatus() throws Exception { final UUID streamId = randomUUID(); - final JsonEnvelope envelope1 = envelope().with( - metadataWithRandomUUID("source.event-occurred") - .withStreamId(streamId) - .withVersion(1L)) - .build(); - final JsonEnvelope envelope2 = envelope().with( - metadataWithRandomUUID("source.another-event-occurred") - .withStreamId(streamId) - .withVersion(2L)) - .build(); - doNothing().when(envelopeDispatcher).dispatch(envelope1); - doNothing().when(envelopeDispatcher).dispatch(envelope2); + final JsonEnvelope jsonEnvelope_1 = mock(JsonEnvelope.class); + final JsonEnvelope jsonEnvelope_2 = mock(JsonEnvelope.class); + final JsonEnvelope jsonEnvelope_3 = mock(JsonEnvelope.class); + final List envelopes = asList(jsonEnvelope_1, jsonEnvelope_2, jsonEnvelope_3); - asyncStreamDispatcher.dispatch(Stream.of(envelope1, envelope2)); + final StreamStatus streamStatus = mock(StreamStatus.class); - final ArgumentCaptor dispatchCaptor = ArgumentCaptor.forClass(JsonEnvelope.class); + when(streamEnvelopeProvider.getStreamAsList(streamId)).thenReturn(envelopes); + when(streamStatusFactory.create(envelopes, streamId)).thenReturn(streamStatus); - verify(envelopeDispatcher, times(2)).dispatch(dispatchCaptor.capture()); - final List dispatchedEnvelopes = dispatchCaptor.getAllValues(); + assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId)); - assertThat(dispatchedEnvelopes, contains(envelope1, envelope2)); - } + final InOrder inOrder = inOrder(progressLogger, envelopeDispatcher, streamStatusRepository, progressLogger); - @Test - public void shouldUpdateStreamBufferStatus() { - - final UUID streamId = randomUUID(); - final JsonEnvelope envelope1 = JsonEnvelopeBuilder.envelope().with( - metadataWithRandomUUID("source.event-occurred") - .withStreamId(streamId) - .withVersion(4L)) - .build(); - final JsonEnvelope envelope2 = JsonEnvelopeBuilder.envelope().with( - metadataWithRandomUUID("source.another-event-occurred") - .withStreamId(streamId) - .withVersion(5L)) - .build(); - - asyncStreamDispatcher.dispatch(Stream.of(envelope1, envelope2)); - - verify(streamStatusRepository).insert(new StreamStatus(streamId, 5L, "source")); + inOrder.verify(progressLogger).logStart(streamId); + inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_1); + inOrder.verify(progressLogger).logSuccess(streamId, 0); + inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_2); + inOrder.verify(progressLogger).logSuccess(streamId, 1); + inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_3); + inOrder.verify(progressLogger).logSuccess(streamId, 2); + inOrder.verify(streamStatusRepository).insert(streamStatus); + inOrder.verify(progressLogger).logCompletion(streamId); } - @Test(expected = IllegalArgumentException.class) - public void shouldThrowExceptionIfNoStreamIdInTheEnvelope() { + @Test + public void shouldLogFailureIfNoHandlerFoundForDispatch() throws Exception { - final Stream stream = Stream.of( - envelope() - .with(metadataWithRandomUUID("dummyName") - .withVersion(1L)) - .build()); + final MissingHandlerException missingHandlerException = new MissingHandlerException("Ooops"); - asyncStreamDispatcher.dispatch(stream); - } + final UUID streamId = randomUUID(); - @Test(expected = IllegalArgumentException.class) - public void shouldThrowExceptionIfNoVersionInTheEnvelope() { + final JsonEnvelope jsonEnvelope = mock(JsonEnvelope.class); + final List envelopes = singletonList(jsonEnvelope); - final Stream stream = Stream.of( - envelope() - .with(metadataWithDefaults() - .withStreamId(randomUUID())) - .build()); + final StreamStatus streamStatus = mock(StreamStatus.class); - asyncStreamDispatcher.dispatch(stream); - } + when(streamEnvelopeProvider.getStreamAsList(streamId)).thenReturn(envelopes); + doThrow(missingHandlerException).when(envelopeDispatcher).dispatch(jsonEnvelope); + when(streamStatusFactory.create(envelopes, streamId)).thenReturn(streamStatus); - @Test - public void shouldProcessStreamWhenThereIsNoHandlerDefined() { + assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId)); - final UUID streamId = randomUUID(); + final InOrder inOrder = inOrder(progressLogger, envelopeDispatcher, streamStatusRepository, progressLogger); - doThrow(new MissingHandlerException("Handler for event-without-handler not found")) - .when(envelopeDispatcher).dispatch( - argThat(jsonEnvelope() - .withMetadataOf(metadata().withName("source.event-without-handler")))); - - final JsonEnvelope envelope1 = envelope() - .with(metadataWithRandomUUID("source.event-with-handler") - .withStreamId(streamId) - .withVersion(1L)) - .build(); - final JsonEnvelope envelope2 = envelope() - .with(metadataWithRandomUUID("source.event-without-handler") - .withStreamId(streamId) - .withVersion(2L)).build(); - final JsonEnvelope envelope3 = envelope() - .with(metadataWithRandomUUID("source.event-with-handler") - .withStreamId(streamId) - .withVersion(3L)) - .build(); - - asyncStreamDispatcher.dispatch(Stream.of(envelope1, envelope2, envelope3)); - - final ArgumentCaptor dispatchCaptor = ArgumentCaptor.forClass(JsonEnvelope.class); - - verify(envelopeDispatcher, times(3)).dispatch(dispatchCaptor.capture()); - verify(streamStatusRepository).insert(new StreamStatus(streamId, 3L, "source")); + inOrder.verify(progressLogger).logStart(streamId); + inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope); + inOrder.verify(progressLogger).logFailure(streamId, jsonEnvelope); + inOrder.verify(progressLogger).logCompletion(streamId); } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressCheckerTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressCheckerTest.java new file mode 100644 index 0000000..d1c5dde --- /dev/null +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressCheckerTest.java @@ -0,0 +1,40 @@ +package uk.gov.justice.framework.tools.replay; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +public class ProgressCheckerTest { + + private final ProgressChecker progressChecker = new ProgressChecker(); + + @Test + public void shouldReturnTrueIfTheIndexIsAMultipleOf100() throws Exception { + + final List trueValues = new ArrayList<>(); + + for(int index = 0; index < 1000; index++) { + + if(progressChecker.shouldLogProgress(index)) { + trueValues.add(index); + } + } + + assertThat(trueValues.size(), is(10)); + + assertThat(trueValues.get(0), is(0)); + assertThat(trueValues.get(1), is(100)); + assertThat(trueValues.get(2), is(200)); + assertThat(trueValues.get(3), is(300)); + assertThat(trueValues.get(4), is(400)); + assertThat(trueValues.get(5), is(500)); + assertThat(trueValues.get(6), is(600)); + assertThat(trueValues.get(7), is(700)); + assertThat(trueValues.get(8), is(800)); + assertThat(trueValues.get(9), is(900)); + } +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressLoggerTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressLoggerTest.java new file mode 100644 index 0000000..50d1acf --- /dev/null +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/ProgressLoggerTest.java @@ -0,0 +1,114 @@ +package uk.gov.justice.framework.tools.replay; + +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static java.util.UUID.randomUUID; +import static org.junit.Assert.*; + +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.*; + +import uk.gov.justice.services.messaging.JsonEnvelope; +import uk.gov.justice.services.messaging.Metadata; + +import java.util.Optional; +import java.util.UUID; + +import javax.inject.Inject; + +@RunWith(MockitoJUnitRunner.class) +public class ProgressLoggerTest { + + @Mock + private ProgressChecker progressChecker; + + @Mock + private Logger logger; + + @InjectMocks + private ProgressLogger progressLogger; + + @Test + public void shouldLogTheStartOfTheProcess() throws Exception { + + final UUID streamId = randomUUID(); + + progressLogger.logStart(streamId); + + verify(logger).info("Starting processing of stream: {}", streamId); + } + + @Test + public void shouldLogSuccessOnlyIfTheProgressCheckerAllowsIt() throws Exception { + + final UUID streamId = randomUUID(); + + when(progressChecker.shouldLogProgress(0)).thenReturn(true); + when(progressChecker.shouldLogProgress(10)).thenReturn(true); + + for(int i = 0; i < 11; i++) { + progressLogger.logSuccess(streamId, i); + } + + verify(logger).info("Processed {} element(s) of stream: {}", 1, streamId); + verify(logger).info("Processed {} element(s) of stream: {}", 11, streamId); + } + + @Test + public void shouldLogFailures() throws Exception { + + final UUID streamId = randomUUID(); + final String commandName = "example-command-api.notification-added"; + final long version = 1234L; + + final JsonEnvelope jsonEnvelope = mock(JsonEnvelope.class); + final Metadata metadata = mock(Metadata.class); + when(metadata.name()).thenReturn(commandName); + when(metadata.version()).thenReturn(of(version)); + + when(jsonEnvelope.metadata()).thenReturn(metadata); + + logger.warn("Missing handler for stream Id: {}, event name: {}, version: {}", + streamId, + commandName, + version + "" + ); + } + + @Test + public void shouldUseNotSetIfTheVersionDoesNotExistWhenLoggingFailures() throws Exception { + + final UUID streamId = randomUUID(); + final String commandName = "example-command-api.notification-added"; + + final JsonEnvelope jsonEnvelope = mock(JsonEnvelope.class); + final Metadata metadata = mock(Metadata.class); + when(metadata.name()).thenReturn(commandName); + when(metadata.version()).thenReturn(empty()); + + when(jsonEnvelope.metadata()).thenReturn(metadata); + + logger.warn("Missing handler for stream Id: {}, event name: {}, version: {}", + streamId, + commandName, + "Not set" + ); + } +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StartReplayTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StartReplayTest.java index 9b3ec4e..b6eb23a 100644 --- a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StartReplayTest.java +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StartReplayTest.java @@ -1,5 +1,6 @@ package uk.gov.justice.framework.tools.replay; +import static java.util.UUID.randomUUID; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; @@ -8,7 +9,6 @@ import static org.wildfly.swarm.bootstrap.Main.MAIN_PROCESS_FILE; import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository; -import uk.gov.justice.services.messaging.JsonEnvelope; import java.io.IOException; import java.nio.file.Files; @@ -33,18 +33,22 @@ public class StartReplayTest { @Mock private JdbcEventRepository jdbcEventRepository; + @Mock private ManagedExecutorService executorService; + @Mock private StreamDispatchTask dispatchTask; + @Mock private Throwable throwable; + @Mock private Future dispatchTaskFuture; - @Mock - private Stream mockStream; + @Mock private Deque outstandingTasks; + @Mock private Logger logger; @@ -53,10 +57,10 @@ public class StartReplayTest { @Test public void shouldDispatchStreams() throws IOException { - final Stream> streamOfStreams = Stream.of(mockStream, mockStream); + final Stream activeStreamIds = Stream.of(randomUUID(), randomUUID()); createMainProcessFile(); - when(jdbcEventRepository.getStreamOfAllActiveEventStreams()).thenReturn(streamOfStreams); + when(jdbcEventRepository.getAllActiveStreamIds()).thenReturn(activeStreamIds); when(executorService.submit(any(StreamDispatchTask.class))).thenReturn(dispatchTaskFuture); when(outstandingTasks.isEmpty()).thenReturn(true); @@ -68,9 +72,9 @@ public void shouldDispatchStreams() throws IOException { @Test public void shouldDispatchStreamsAndShutdownByForce() { - final Stream> streamOfStreams = Stream.of(mockStream, mockStream); + final Stream activeStreamIds = Stream.of(randomUUID(), randomUUID()); - when(jdbcEventRepository.getStreamOfAllActiveEventStreams()).thenReturn(streamOfStreams); + when(jdbcEventRepository.getAllActiveStreamIds()).thenReturn(activeStreamIds); when(executorService.submit(any(StreamDispatchTask.class))).thenReturn(dispatchTaskFuture); when(outstandingTasks.isEmpty()).thenReturn(true); @@ -116,4 +120,4 @@ private void createMainProcessFile() throws IOException { final Path file = Files.createFile(Paths.get("src/test/processFile")); System.setProperty(MAIN_PROCESS_FILE, file.toString()); } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamDispatchTaskTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamDispatchTaskTest.java index 9c992e6..2e7ed7f 100644 --- a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamDispatchTaskTest.java +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamDispatchTaskTest.java @@ -1,52 +1,50 @@ package uk.gov.justice.framework.tools.replay; -import org.junit.Before; +import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.verify; + +import java.util.UUID; + +import javax.enterprise.concurrent.ManagedTaskListener; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import uk.gov.justice.services.messaging.JsonEnvelope; - -import javax.enterprise.concurrent.ManagedTaskListener; -import java.util.stream.Stream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class StreamDispatchTaskTest { - @Mock - private Stream stream; @Mock private AsyncStreamDispatcher dispatcher; + @Mock private ManagedTaskListener taskListener; - private StreamDispatchTask streamDispatchTask; - - @Before - public void setup() { - streamDispatchTask = new StreamDispatchTask(stream, dispatcher, taskListener); - } - - @Test public void shouldCallDispatcher() { + final UUID streamId = randomUUID(); + final StreamDispatchTask streamDispatchTask = new StreamDispatchTask(streamId, dispatcher, taskListener); + streamDispatchTask.call(); - verify(dispatcher).dispatch(eq(stream)); + verify(dispatcher).dispatch(streamId); } @Test public void shouldReturnNullExecutionProperties() { + final StreamDispatchTask streamDispatchTask = new StreamDispatchTask(randomUUID(), dispatcher, taskListener); + assertNull(streamDispatchTask.getExecutionProperties()); } @Test public void shouldReturnTaskListener() { - assertEquals(streamDispatchTask.getManagedTaskListener(), taskListener); + final StreamDispatchTask streamDispatchTask = new StreamDispatchTask(randomUUID(), dispatcher, taskListener); + + assertThat(streamDispatchTask.getManagedTaskListener(), is(taskListener)); } -} \ No newline at end of file +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProviderTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProviderTest.java new file mode 100644 index 0000000..5b90e83 --- /dev/null +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProviderTest.java @@ -0,0 +1,63 @@ +package uk.gov.justice.framework.tools.replay; + +import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository; +import uk.gov.justice.services.messaging.JsonEnvelope; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StreamEnvelopeProviderTest { + + @Mock + private JdbcEventRepository jdbcEventRepository; + + @InjectMocks + private StreamEnvelopeProvider streamEnvelopeProvider; + + @Test + public void shouldConvertAListOfEnvelopesFromTheRepositoryToAListAndCloseTheStream() throws Exception { + + final UUID streamId = randomUUID(); + + final JsonEnvelope jsonEnvelope_1 = mock(JsonEnvelope.class); + final JsonEnvelope jsonEnvelope_2 = mock(JsonEnvelope.class); + final JsonEnvelope jsonEnvelope_3 = mock(JsonEnvelope.class); + + final ArrayList closeCheckerList = new ArrayList<>(); + + final Stream envelopeStream = Stream.of( + jsonEnvelope_1, + jsonEnvelope_2, + jsonEnvelope_3 + ).onClose(() -> closeCheckerList.add(true)); + + when(jdbcEventRepository.getByStreamId(streamId)).thenReturn( + envelopeStream + ); + + final List envelopes = streamEnvelopeProvider.getStreamAsList(streamId); + + assertThat(envelopes.size(), is(3)); + assertThat(envelopes.get(0), is(jsonEnvelope_1)); + assertThat(envelopes.get(1), is(jsonEnvelope_2)); + assertThat(envelopes.get(2), is(jsonEnvelope_3)); + + assertThat(closeCheckerList.size(), is(1)); + } +} diff --git a/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamStatusFactoryTest.java b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamStatusFactoryTest.java new file mode 100644 index 0000000..61b4ef5 --- /dev/null +++ b/framework-tools-replay/src/test/java/uk/gov/justice/framework/tools/replay/StreamStatusFactoryTest.java @@ -0,0 +1,97 @@ +package uk.gov.justice.framework.tools.replay; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static java.util.UUID.randomUUID; +import static javax.json.Json.createObjectBuilder; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static uk.gov.justice.services.messaging.JsonEnvelope.envelopeFrom; +import static uk.gov.justice.services.messaging.JsonEnvelope.metadataBuilder; + +import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; +import uk.gov.justice.services.messaging.JsonEnvelope; +import uk.gov.justice.services.messaging.MetadataBuilder; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.runners.MockitoJUnitRunner; + + +@RunWith(MockitoJUnitRunner.class) +public class StreamStatusFactoryTest { + + @InjectMocks + private StreamStatusFactory streamStatusFactory; + + @Test + public void shouldCreateAStreamStatusUsingTheValuesInTheFirstEnvelopeInTheList() throws Exception { + + final String commandName = "example-command-api.notification-added"; + final Optional version = of(29384L); + final UUID streamId = randomUUID(); + final UUID envelopeId_1 = randomUUID(); + final UUID envelopeId_2 = randomUUID(); + + final JsonEnvelope jsonEnvelope_1 = anEnvelope(commandName, version, envelopeId_1, streamId); + final JsonEnvelope jsonEnvelope_2 = anEnvelope(commandName, version, envelopeId_2, streamId); + + final List envelopes = asList(jsonEnvelope_1, jsonEnvelope_2); + + final StreamStatus streamStatus = streamStatusFactory.create(envelopes, streamId); + + assertThat(streamStatus.getSource(), is("example-command-api")); + assertThat(streamStatus.getStreamId(), is(streamId)); + assertThat(streamStatus.getVersion(), is(version.get())); + } + + @Test + public void shouldThrowAnIllegalArgumentExceptionIfTheEnvelopeDoesNotContainAVersion() throws Exception { + + final String commandName = "example-command-api.notification-added"; + final Optional version = empty(); + final UUID streamId = randomUUID(); + final UUID envelopeId = randomUUID(); + + final JsonEnvelope jsonEnvelope = anEnvelope(commandName, version, envelopeId, streamId); + + final List envelopes = singletonList(jsonEnvelope); + + try { + streamStatusFactory.create(envelopes, streamId); + fail(); + } catch (final IllegalArgumentException expected) { + final String message = "Version not found in the envelope: " + + "{\"id\":\"" + envelopeId + "\"," + + "\"name\":\"example-command-api.notification-added\"," + + "\"causation\":[]}"; + + assertThat(expected.getMessage(), is(message)); + } + } + + @SuppressWarnings("SameParameterValue") + private JsonEnvelope anEnvelope(final String commandName, final Optional version, final UUID envelopeId, final UUID streamId) { + + final MetadataBuilder metadataBuilder = metadataBuilder() + .withId(envelopeId) + .withName(commandName) + .withStreamId(streamId); + + version.ifPresent(metadataBuilder::withVersion); + + return envelopeFrom( + metadataBuilder, + createObjectBuilder() + .add("exampleField", "example value")); + } +} diff --git a/framework-tools-replay/standalone-ds.xml b/framework-tools-replay/standalone-ds.xml index 24eb4da..72866d1 100644 --- a/framework-tools-replay/standalone-ds.xml +++ b/framework-tools-replay/standalone-ds.xml @@ -69,19 +69,42 @@ - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -278,4 +301,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 92f1811..7b9d0ed 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ uk.gov.justice maven-framework-parent-pom - 1.10.1 + 1.12.2 framework-tools @@ -16,12 +16,12 @@ framework-tools - 1.22.0 - 2.0.1 - 4.0.0 + 1.26.0 + 2.0.3 + 4.1.0 2017.11.0 - 1.16.0 - 1.1.0 + 1.17.2 + 1.2.2 77