-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add stream transformation support for linked events #25
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,23 @@ | ||
package uk.gov.justice.event.tool; | ||
|
||
import static java.lang.String.format; | ||
import static java.util.UUID.randomUUID; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.junit.Assert.assertThat; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.verifyNoMoreInteractions; | ||
import static org.mockito.Mockito.when; | ||
|
||
import uk.gov.justice.event.tool.task.StreamTransformationTask; | ||
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository; | ||
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream; | ||
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry; | ||
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService; | ||
import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService; | ||
|
||
import java.lang.reflect.Field; | ||
import java.util.UUID; | ||
|
@@ -29,7 +33,6 @@ | |
import org.junit.runner.RunWith; | ||
import org.mockito.InjectMocks; | ||
import org.mockito.Mock; | ||
import org.mockito.Spy; | ||
import org.mockito.runners.MockitoJUnitRunner; | ||
import org.slf4j.Logger; | ||
|
||
|
@@ -48,6 +51,9 @@ public class StartTransformationTest { | |
@Mock | ||
private EventStreamTransformationService eventStreamTransformationService; | ||
|
||
@Mock | ||
private LinkedEventStreamTransformationService linkedEventStreamTransformationService; | ||
|
||
@InjectMocks | ||
private StartTransformation startTransformation; | ||
|
||
|
@@ -89,12 +95,24 @@ public void shouldCreateTasksForEachStream() { | |
final UUID streamId_2 = randomUUID(); | ||
|
||
when(eventRepository.getAllActiveStreamIds()).thenReturn(Stream.of(streamId_1, streamId_2)); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(mock(Future.class)); | ||
final Future future = mock(Future.class); | ||
final Future future2 = mock(Future.class); | ||
|
||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2); | ||
|
||
startTransformation.go(); | ||
|
||
assertThat(startTransformation.outstandingTasks.size(), is(2)); | ||
assertTrue(startTransformation.allTasksCreated); | ||
|
||
when(passesDeterminer.isLastElementInPasses()).thenReturn(true); | ||
startTransformation.taskDone(future, null, null, null); | ||
startTransformation.taskDone(future2, null, null, null); | ||
|
||
assertThat(startTransformation.outstandingTasks.size(), is(0)); | ||
|
||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commented out code can be removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
|
||
@Test | ||
|
@@ -117,8 +135,14 @@ public void shouldRemoveFinishedTasks() { | |
startTransformation.taskAborted(future, null, null, mock(Throwable.class)); | ||
assertThat(startTransformation.outstandingTasks.size(), is(1)); | ||
|
||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
|
||
startTransformation.taskDone(future2, null, null, null); | ||
assertThat(startTransformation.outstandingTasks.size(), is(0)); | ||
|
||
verify(linkedEventStreamTransformationService,times(2)).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService,times(2)).populateLinkedEvents(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commented out code can be removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
|
||
@Test | ||
|
@@ -152,25 +176,35 @@ public void shouldRemoveFinishedTaskForAllPasses() { | |
assertThat(startTransformation.outstandingTasks.size(), is(2)); | ||
|
||
startTransformation.taskAborted(future, null, null, mock(Throwable.class)); | ||
|
||
assertThat(startTransformation.outstandingTasks.size(), is(1)); | ||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
|
||
startTransformation.taskDone(future2, null, null, null); | ||
|
||
startTransformation.taskDone(future3, null, null, null); | ||
assertThat(startTransformation.outstandingTasks.size(), is(0)); | ||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
|
||
} | ||
|
||
|
||
@Test(expected = IllegalArgumentException.class) | ||
public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsNull() throws Exception { | ||
streamsProcessedCountStepInfo.set(startTransformation, null); | ||
startTransformation.go(); | ||
|
||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
} | ||
|
||
@Test(expected = IllegalArgumentException.class) | ||
public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsZero() throws Exception { | ||
streamsProcessedCountStepInfo.set(startTransformation, 0); | ||
startTransformation.go(); | ||
|
||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
} | ||
|
||
@Test | ||
|
@@ -186,35 +220,20 @@ public void shouldNotThrowExceptionWhenstreamsProcessedCountStepInfoIsNotZero() | |
when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); | ||
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); | ||
|
||
final Future future = mock(Future.class); | ||
final Future future1 = mock(Future.class); | ||
final Future future2 = mock(Future.class); | ||
final Future future3 = mock(Future.class); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3); | ||
|
||
streamsProcessedCountStepInfo.set(startTransformation, 10); | ||
startTransformation.go(); | ||
} | ||
|
||
@Test | ||
public void shouldNotLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsZero() throws IllegalArgumentException, IllegalAccessException { | ||
final UUID streamId_1 = randomUUID(); | ||
final UUID streamId_2 = randomUUID(); | ||
final UUID streamId_3 = randomUUID(); | ||
|
||
when(eventRepository.getAllActiveStreamIds()) | ||
.thenReturn(Stream.of(streamId_1, streamId_2)) | ||
.thenReturn(Stream.of(streamId_3)); | ||
|
||
when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); | ||
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); | ||
|
||
final Future future = mock(Future.class); | ||
final Future future2 = mock(Future.class); | ||
final Future future3 = mock(Future.class); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); | ||
startTransformation.taskDone(future1, null, null, null); | ||
startTransformation.taskDone(future2, null, null, null); | ||
startTransformation.taskDone(future3, null, null, null); | ||
assertThat(startTransformation.outstandingTasks.size(), is(0)); | ||
|
||
streamsProcessedCountStepInfo.set(startTransformation, 10); | ||
startTransformation.go(); | ||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
} | ||
|
||
@Test | ||
|
@@ -230,13 +249,22 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN | |
when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2); | ||
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true); | ||
|
||
final Future future = mock(Future.class); | ||
final Future future1 = mock(Future.class); | ||
final Future future2 = mock(Future.class); | ||
final Future future3 = mock(Future.class); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3); | ||
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3); | ||
when(stopWatch.getTime()).thenReturn(12222222l); | ||
streamsProcessedCountStepInfo.set(startTransformation, 2); | ||
|
||
startTransformation.go(); | ||
|
||
verify(logger).info("Pass 1 - Streams count: 4 - time(ms): 12222222"); | ||
startTransformation.taskDone(future1, null, null, null); | ||
startTransformation.taskDone(future2, null, null, null); | ||
startTransformation.taskDone(future3, null, null, null); | ||
assertThat(startTransformation.outstandingTasks.size(), is(0)); | ||
|
||
verify(linkedEventStreamTransformationService).truncateLinkedEvents(); | ||
verify(linkedEventStreamTransformationService).populateLinkedEvents(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,11 @@ | |
<artifactId>stream-transformation-tool-api</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>uk.gov.justice.framework-api</groupId> | ||
<artifactId>framework-api-core</artifactId> | ||
<version>${framework-api.version}</version> | ||
</dependency> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is not a test dependency then it should be higher in the list There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
<dependency> | ||
<groupId>uk.gov.justice.services</groupId> | ||
<artifactId>test-utils-core</artifactId> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,11 @@ | |
<artifactId>event-repository-liquibase</artifactId> | ||
<version>${event-store.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>uk.gov.justice.event-store</groupId> | ||
<artifactId>linked-event-processor</artifactId> | ||
<version>${event-store.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>uk.gov.justice.services</groupId> | ||
<artifactId>test-utils-core</artifactId> | ||
|
@@ -53,12 +58,6 @@ | |
<groupId>org.hamcrest</groupId> | ||
<artifactId>hamcrest-library</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>uk.gov.justice.utils</groupId> | ||
<artifactId>test-utils-logging-simple</artifactId> | ||
<type>pom</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.commons</groupId> | ||
<artifactId>commons-dbcp2</artifactId> | ||
|
@@ -67,6 +66,18 @@ | |
<groupId>commons-io</groupId> | ||
<artifactId>commons-io</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.liquibase</groupId> | ||
<artifactId>liquibase-core</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>uk.gov.justice.utils</groupId> | ||
<artifactId>test-utils-logging-simple</artifactId> | ||
<type>pom</type> | ||
<scope>test</scope> | ||
</dependency> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Non test dependencies above should be moved above the test dependency There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
|
||
</dependencies> | ||
|
||
<build> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need to call truncateLinkedEvents() twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed