Skip to content

Commit

Permalink
Merge 0cd9e44 into 8055d70
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedFarouk-HMCTS committed Mar 20, 2019
2 parents 8055d70 + 0cd9e44 commit c06c715
Show file tree
Hide file tree
Showing 19 changed files with 589 additions and 135 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [5.2.0] - 2019-03-20
- Add support for linked event synch with event log after transformation

## [5.1.0] - 2018-12-12

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ public int getPassValue() {
public boolean isLastElementInPasses() {
return eventTransformationRegistry.getPasses().size() == passValue.get();
}

public int passCount(){
return eventTransformationRegistry.getPasses().size();
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uk.gov.justice.event.tool.task.StreamTransformationTask;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -54,7 +55,10 @@ public class StartTransformation implements ManagedTaskListener {
@Inject
private PassesDeterminer passesDeterminer;

Deque<Future<UUID>> outstandingTasks = new LinkedBlockingDeque<>();
@Inject
private LinkedEventStreamTransformationService linkedEventStreamTransformationService;

final Deque<Future<UUID>> outstandingTasks = new LinkedBlockingDeque<>();

boolean allTasksCreated = false;

Expand Down Expand Up @@ -125,6 +129,7 @@ public void taskAborted(final Future<?> futureTask, final ManagedExecutorService
logger.error(String.format("Aborted Transformation task: '%s'", throwable.getMessage()));
removeOutstandingTask(futureTask);
shutDownIfFinished();
truncateAndPopulateLinkedEvents();
}

private void removeOutstandingTask(final Future<?> futureTask) {
Expand All @@ -135,6 +140,7 @@ private void nextPassIfFinished() {
if (isTaskFinished()) {
final boolean isLastElementInPasses = passesDeterminer.isLastElementInPasses();
if (isLastElementInPasses) {
truncateAndPopulateLinkedEvents();
shutdown();
} else {
createTransformationTasks(passesDeterminer.getNextPassValue());
Expand Down Expand Up @@ -176,6 +182,17 @@ private void shutdown() {
}
}

private void truncateAndPopulateLinkedEvents() {

logger.info("-------------- Truncating the Linked Events Log after complete transformation --------------");

linkedEventStreamTransformationService.truncateLinkedEvents();

logger.info("-------------- Populating the Linked Events Log --------------");

linkedEventStreamTransformationService.populateLinkedEvents();
}

private void checkForMainProcessFile() {
if (System.getProperty(MAIN_PROCESS_FILE) == null) {
logger.warn(NO_PROCESS_FILE_WARNING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ public void shouldIncrementPassValue() throws Exception {
assertThat(passesDeterminer.getPassValue(), is(1));
assertThat(passesDeterminer.getNextPassValue(), is(2));
assertThat(passesDeterminer.getNextPassValue(), is(3));
assertThat(passesDeterminer.passCount(), is(0));
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package uk.gov.justice.event.tool;

import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import uk.gov.justice.event.tool.task.StreamTransformationTask;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry;
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService;

import java.lang.reflect.Field;
import java.util.UUID;
Expand All @@ -29,7 +33,6 @@
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;

Expand All @@ -48,6 +51,9 @@ public class StartTransformationTest {
@Mock
private EventStreamTransformationService eventStreamTransformationService;

@Mock
private LinkedEventStreamTransformationService linkedEventStreamTransformationService;

@InjectMocks
private StartTransformation startTransformation;

Expand Down Expand Up @@ -89,12 +95,24 @@ public void shouldCreateTasksForEachStream() {
final UUID streamId_2 = randomUUID();

when(eventRepository.getAllActiveStreamIds()).thenReturn(Stream.of(streamId_1, streamId_2));
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(mock(Future.class));
final Future future = mock(Future.class);
final Future future2 = mock(Future.class);

when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2);

startTransformation.go();

assertThat(startTransformation.outstandingTasks.size(), is(2));
assertTrue(startTransformation.allTasksCreated);

when(passesDeterminer.isLastElementInPasses()).thenReturn(true);
startTransformation.taskDone(future, null, null, null);
startTransformation.taskDone(future2, null, null, null);

assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
}

@Test
Expand All @@ -117,8 +135,14 @@ public void shouldRemoveFinishedTasks() {
startTransformation.taskAborted(future, null, null, mock(Throwable.class));
assertThat(startTransformation.outstandingTasks.size(), is(1));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();

startTransformation.taskDone(future2, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService,times(2)).truncateLinkedEvents();
verify(linkedEventStreamTransformationService,times(2)).populateLinkedEvents();
}

@Test
Expand Down Expand Up @@ -152,25 +176,35 @@ public void shouldRemoveFinishedTaskForAllPasses() {
assertThat(startTransformation.outstandingTasks.size(), is(2));

startTransformation.taskAborted(future, null, null, mock(Throwable.class));

assertThat(startTransformation.outstandingTasks.size(), is(1));
verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();

startTransformation.taskDone(future2, null, null, null);

startTransformation.taskDone(future3, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));
verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();

}


@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsNull() throws Exception {
streamsProcessedCountStepInfo.set(startTransformation, null);
startTransformation.go();

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsZero() throws Exception {
streamsProcessedCountStepInfo.set(startTransformation, 0);
startTransformation.go();

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
}

@Test
Expand All @@ -186,35 +220,20 @@ public void shouldNotThrowExceptionWhenstreamsProcessedCountStepInfoIsNotZero()
when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2);
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true);

final Future future = mock(Future.class);
final Future future1 = mock(Future.class);
final Future future2 = mock(Future.class);
final Future future3 = mock(Future.class);
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3);
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3);

streamsProcessedCountStepInfo.set(startTransformation, 10);
startTransformation.go();
}

@Test
public void shouldNotLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsZero() throws IllegalArgumentException, IllegalAccessException {
final UUID streamId_1 = randomUUID();
final UUID streamId_2 = randomUUID();
final UUID streamId_3 = randomUUID();

when(eventRepository.getAllActiveStreamIds())
.thenReturn(Stream.of(streamId_1, streamId_2))
.thenReturn(Stream.of(streamId_3));

when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2);
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true);

final Future future = mock(Future.class);
final Future future2 = mock(Future.class);
final Future future3 = mock(Future.class);
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3);
startTransformation.taskDone(future1, null, null, null);
startTransformation.taskDone(future2, null, null, null);
startTransformation.taskDone(future3, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

streamsProcessedCountStepInfo.set(startTransformation, 10);
startTransformation.go();
verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
}

@Test
Expand All @@ -230,13 +249,22 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN
when(passesDeterminer.getPassValue()).thenReturn(1).thenReturn(2);
when(passesDeterminer.isLastElementInPasses()).thenReturn(false).thenReturn(true);

final Future future = mock(Future.class);
final Future future1 = mock(Future.class);
final Future future2 = mock(Future.class);
final Future future3 = mock(Future.class);
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future).thenReturn(future2).thenReturn(future3);
when(executorService.submit(any(StreamTransformationTask.class))).thenReturn(future1).thenReturn(future2).thenReturn(future3);
when(stopWatch.getTime()).thenReturn(12222222l);
streamsProcessedCountStepInfo.set(startTransformation, 2);

startTransformation.go();

verify(logger).info("Pass 1 - Streams count: 4 - time(ms): 12222222");
startTransformation.taskDone(future1, null, null, null);
startTransformation.taskDone(future2, null, null, null);
startTransformation.taskDone(future3, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.mockito.Matchers.isNotNull;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.verify;

import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
Expand Down Expand Up @@ -43,4 +46,8 @@ public void shouldReturnTransformationListener() {
assertThat(streamTransformationTask.getManagedTaskListener(), is(transformationListener));
}

@Test
public void shouldReturnExecutionProperties() {
assertThat(streamTransformationTask.getExecutionProperties(), is(nullValue()));
}
}
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@

<properties>
<cpp.repo.name>stream-transformation-tool</cpp.repo.name>

<framework.version>5.0.4</framework.version>
<event-store.version>1.0.4</event-store.version>
<framework-api.version>3.1.1</framework-api.version>
<framework.version>5.1.2</framework.version>
<event-store.version>1.1.7</event-store.version>
<common-bom.version>1.28.0</common-bom.version>
<utilities.version>1.16.1</utilities.version>
<test-utils.version>1.18.1</test-utils.version>

<wildfly.swarm.version>2017.11.0</wildfly.swarm.version>
<version.swarm.fraction-plugin>77</version.swarm.fraction-plugin>
<junit-dataprovider.version>1.13.1</junit-dataprovider.version>
Expand Down
5 changes: 5 additions & 0 deletions stream-transformation-test/sample-transformations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<artifactId>stream-transformation-tool-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.framework-api</groupId>
<artifactId>framework-api-core</artifactId>
<version>${framework-api.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>test-utils-core</artifactId>
Expand Down
23 changes: 17 additions & 6 deletions stream-transformation-test/stream-transformation-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
<artifactId>event-repository-liquibase</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>linked-event-processor</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>test-utils-core</artifactId>
Expand Down Expand Up @@ -53,12 +58,6 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
<artifactId>test-utils-logging-simple</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
Expand All @@ -67,6 +66,18 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
<artifactId>test-utils-logging-simple</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
Loading

0 comments on commit c06c715

Please sign in to comment.