Skip to content

Commit

Permalink
Merge 8315644 into e2938f3
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Sep 9, 2019
2 parents e2938f3 + 8315644 commit 8e7d354
Show file tree
Hide file tree
Showing 34 changed files with 624 additions and 652 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [6.0.0] - 2019-09-09
### Changed
- Upgrade to framework 6
- Truncate pre_publish_queue table after transformation is complete
- Rebuild published_event table after transformation is complete
- Update framework-api to 4.0.1
- Update framework to 6.0.12
- Update event-store to 2.0.14
- Update common-bom to 2.4.0
- Update utilities to 1.20.2
- Update test-utils to 1.24.3

## [5.3.1] - 2019-07-17
### Added
- Retry mechanism when performing stream operations such as append / move or clone
Expand Down
14 changes: 13 additions & 1 deletion event-tool/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>uk.gov.justice</groupId>
<artifactId>stream-transformation-tool</artifactId>
<version>5.3.2-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>
</parent>

<packaging>war</packaging>
Expand Down Expand Up @@ -93,6 +93,18 @@
<artifactId>logging</artifactId>
</dependency>

<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
<version>${slf4j-jboss-logmanager.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<version>${jboss-logmanager.version}</version>
<scope>test</scope>
</dependency>

<!--External-->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import uk.gov.justice.event.tool.task.StreamTransformationTask;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PrePublishedQueueTruncatorService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PublishedEventsRebuilderService;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -60,7 +61,10 @@ public class StartTransformation implements ManagedTaskListener {
private PassesDeterminer passesDeterminer;

@Inject
private LinkedEventStreamTransformationService linkedEventStreamTransformationService;
private PublishedEventsRebuilderService publishedEventsRebuilderService;

@Inject
private PrePublishedQueueTruncatorService prePublishedQueueTruncatorService;

final Deque<Future<UUID>> outstandingTasks = new LinkedBlockingDeque<>();

Expand Down Expand Up @@ -103,6 +107,7 @@ private void createTransformationTasks(final int pass) {
activeStreams.close();

if (outstandingTasks.isEmpty()) {
truncateQueueAndRebuildPublishedEvents();
shutdown();
}

Expand Down Expand Up @@ -134,8 +139,8 @@ public void taskDone(final Future<?> futureTask, final ManagedExecutorService ma
public void taskAborted(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task, final Throwable throwable) {
logger.error("Aborted Transformation task", throwable);
removeOutstandingTask(futureTask);
truncateQueueAndRebuildPublishedEvents();
shutDownIfFinished();
truncateAndPopulateLinkedEvents();
}

private void removeOutstandingTask(final Future<?> futureTask) {
Expand All @@ -146,7 +151,7 @@ private void nextPassIfFinished() {
if (isTaskFinished()) {
final boolean isLastElementInPasses = passesDeterminer.isLastElementInPasses();
if (isLastElementInPasses) {
truncateAndPopulateLinkedEvents();
truncateQueueAndRebuildPublishedEvents();
shutdown();
} else {
createTransformationTasks(passesDeterminer.getNextPassValue());
Expand Down Expand Up @@ -188,15 +193,18 @@ private void shutdown() {
}
}

private void truncateAndPopulateLinkedEvents() {
private void truncateQueueAndRebuildPublishedEvents() {

logger.info("-------------- Truncating pre_publish_queue --------------");

logger.info("-------------- Truncating the Linked Events Log after complete transformation --------------");
prePublishedQueueTruncatorService.truncate();

linkedEventStreamTransformationService.truncateLinkedEvents();
logger.info("-------------- Truncate of pre_publish_queue complete --------------");
logger.info("-------------- Rebuilding the Published Events after complete transformation --------------");

logger.info("-------------- Populating the Linked Events Log --------------");
publishedEventsRebuilderService.rebuild();

linkedEventStreamTransformationService.populateLinkedEvents();
logger.info("-------------- Rebuild of the Published Events complete --------------");
}

private void checkForMainProcessFile() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package uk.gov.justice.event.tool;

import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
Expand All @@ -9,15 +8,15 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import uk.gov.justice.event.tool.task.StreamTransformationTask;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStream;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry;
import uk.gov.justice.tools.eventsourcing.transformation.service.EventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.LinkedEventStreamTransformationService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PrePublishedQueueTruncatorService;
import uk.gov.justice.tools.eventsourcing.transformation.service.PublishedEventsRebuilderService;

import java.lang.reflect.Field;
import java.util.UUID;
Expand Down Expand Up @@ -52,7 +51,7 @@ public class StartTransformationTest {
private EventStreamTransformationService eventStreamTransformationService;

@Mock
private LinkedEventStreamTransformationService linkedEventStreamTransformationService;
private PublishedEventsRebuilderService publishedEventsRebuilderService;

@InjectMocks
private StartTransformation startTransformation;
Expand All @@ -69,6 +68,9 @@ public class StartTransformationTest {
@Mock
private EventTransformationRegistry eventTransformationRegistry;

@Mock
private PrePublishedQueueTruncatorService prePublishedQueueTruncatorService;

@Mock
private Logger logger;

Expand Down Expand Up @@ -111,8 +113,8 @@ public void shouldCreateTasksForEachStream() {

assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();
}

@Test
Expand All @@ -135,14 +137,13 @@ public void shouldRemoveFinishedTasks() {
startTransformation.taskAborted(future, null, null, mock(Throwable.class));
assertThat(startTransformation.outstandingTasks.size(), is(1));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(publishedEventsRebuilderService).rebuild();

startTransformation.taskDone(future2, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService,times(2)).truncateLinkedEvents();
verify(linkedEventStreamTransformationService,times(2)).populateLinkedEvents();
verify(prePublishedQueueTruncatorService, times(2)).truncate();
verify(publishedEventsRebuilderService, times(2)).rebuild();
}

@Test
Expand Down Expand Up @@ -178,13 +179,12 @@ public void shouldRemoveFinishedTaskForAllPasses() {
startTransformation.taskAborted(future, null, null, mock(Throwable.class));

assertThat(startTransformation.outstandingTasks.size(), is(1));
verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(publishedEventsRebuilderService).rebuild();

startTransformation.taskDone(future2, null, null, null);

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();

}

Expand All @@ -194,17 +194,17 @@ public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsNull() throws
streamsProcessedCountStepInfo.set(startTransformation, null);
startTransformation.go();

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionWhenstreamsProcessedCountStepInfoIsZero() throws Exception {
streamsProcessedCountStepInfo.set(startTransformation, 0);
startTransformation.go();

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();
}

@Test
Expand Down Expand Up @@ -232,8 +232,8 @@ public void shouldNotThrowExceptionWhenstreamsProcessedCountStepInfoIsNotZero()
startTransformation.taskDone(future3, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();
}

@Test
Expand Down Expand Up @@ -267,7 +267,7 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN
startTransformation.taskDone(future3, null, null, null);
assertThat(startTransformation.outstandingTasks.size(), is(0));

verify(linkedEventStreamTransformationService).truncateLinkedEvents();
verify(linkedEventStreamTransformationService).populateLinkedEvents();
verify(prePublishedQueueTruncatorService).truncate();
verify(publishedEventsRebuilderService).rebuild();
}
}
18 changes: 10 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<groupId>uk.gov.justice</groupId>
<artifactId>maven-framework-parent-pom</artifactId>
<version>1.7.0</version>
<version>1.13.0</version>
</parent>

<artifactId>stream-transformation-tool</artifactId>
<packaging>pom</packaging>
<version>5.3.2-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>

<scm>
<connection>${cpp.scm.connection}</connection>
Expand All @@ -22,15 +22,17 @@

<properties>
<cpp.repo.name>stream-transformation-tool</cpp.repo.name>
<framework-api.version>3.1.1</framework-api.version>
<framework.version>5.1.2</framework.version>
<event-store.version>1.1.11</event-store.version>
<common-bom.version>1.28.0</common-bom.version>
<utilities.version>1.20.0</utilities.version>
<test-utils.version>1.18.1</test-utils.version>
<framework-api.version>4.0.1</framework-api.version>
<framework.version>6.0.12</framework.version>
<event-store.version>2.0.14</event-store.version>
<common-bom.version>2.4.0</common-bom.version>
<utilities.version>1.20.2</utilities.version>
<test-utils.version>1.24.3</test-utils.version>
<wildfly.swarm.version>2017.11.0</wildfly.swarm.version>
<version.swarm.fraction-plugin>77</version.swarm.fraction-plugin>
<junit-dataprovider.version>1.13.1</junit-dataprovider.version>
<slf4j-jboss-logmanager.version>1.0.4.GA</slf4j-jboss-logmanager.version>
<jboss-logmanager.version>2.1.2.Final</jboss-logmanager.version>

</properties>

Expand Down
2 changes: 1 addition & 1 deletion stream-transformation-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>stream-transformation-tool</artifactId>
<groupId>uk.gov.justice</groupId>
<version>5.3.2-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>
</parent>

<artifactId>stream-transformation-test</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion stream-transformation-test/sample-transformations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>stream-transformation-test</artifactId>
<groupId>uk.gov.justice</groupId>
<version>5.3.2-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>
</parent>

<artifactId>stream-transformations</artifactId>
Expand Down
15 changes: 12 additions & 3 deletions stream-transformation-test/stream-transformation-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>stream-transformation-test</artifactId>
<groupId>uk.gov.justice</groupId>
<version>5.3.2-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>
</parent>

<artifactId>stream-transformation-it</artifactId>
Expand All @@ -25,7 +25,7 @@
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>linked-event-processor</artifactId>
<artifactId>published-event-processor</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -76,7 +76,16 @@
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>test-utils-event</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
<artifactId>test-utils-core</artifactId>
<version>${test-utils.version}</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;
import uk.gov.justice.services.test.utils.events.EventStoreDataAccess;

import java.sql.SQLException;
import java.time.ZonedDateTime;
Expand All @@ -18,14 +19,14 @@

public class DatabaseUtils {

private final TestEventLogJdbcRepository eventLogJdbcRepository;
private final EventStoreDataAccess eventStoreDataAccess;
private final TestEventStreamJdbcRepository eventStreamJdbcRepository;
private final LiquibaseUtil liquibaseUtil = new LiquibaseUtil();
private final DataSource dataSource;

public DatabaseUtils() throws SQLException, LiquibaseException {
dataSource = liquibaseUtil.initEventStoreDb();
eventLogJdbcRepository = new TestEventLogJdbcRepository(dataSource);
eventStoreDataAccess = new EventStoreDataAccess(dataSource);
eventStreamJdbcRepository = new TestEventStreamJdbcRepository(dataSource);
}

Expand Down Expand Up @@ -56,12 +57,12 @@ public void insertEventLogData(final String eventName, final UUID streamId, fina
public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final Optional<Long> eventNumber, boolean streamStatus) throws InvalidPositionException {
final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt, eventNumber);

eventLogJdbcRepository.insert(event);
eventStoreDataAccess.insertIntoEventLog(event);
eventStreamJdbcRepository.insert(streamId, streamStatus);
}

public TestEventLogJdbcRepository getEventLogJdbcRepository() {
return eventLogJdbcRepository;
public EventStoreDataAccess getEventStoreDataAccess() {
return eventStoreDataAccess;
}

public TestEventStreamJdbcRepository getEventStreamJdbcRepository() {
Expand Down
Loading

0 comments on commit 8e7d354

Please sign in to comment.