Skip to content

Commit

Permalink
Merge 3e735c4 into 374609c
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahesh Subramanian committed Oct 9, 2019
2 parents 374609c + 3e735c4 commit 0c19467
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Fixed
- Running out of database connections due to streams not getting closed correctly
- Database state not rolling back due to exceptions getting suppressed and not propagating correctly

## [6.1.0] - 2019-10-07
### Fixed
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
<properties>
<cpp.repo.name>stream-transformation-tool</cpp.repo.name>
<framework-api.version>4.1.0</framework-api.version>
<framework.version>6.0.16</framework.version>
<event-store.version>2.0.22</event-store.version>
<framework.version>6.1.1</framework.version>
<event-store.version>2.1.2</event-store.version>
<common-bom.version>2.4.0</common-bom.version>
<utilities.version>1.20.2</utilities.version>
<test-utils.version>1.24.3</test-utils.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package uk.gov.justice.framework.tools.transformation;

import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static javax.json.Json.createReader;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

import java.io.StringReader;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;

import javax.json.JsonObject;

import org.junit.Before;
import org.junit.Test;


public class StreamTransformationRollbackIT {

private static final long STREAM_COUNT_REPORTING_INTERVAL = 10L;
private static final String MEMORY_OPTIONS_PARAMETER = "2048M";
private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false;
private static final Boolean PROCESS_ALL_STREAMS = true;
private static final int WILDFLY_TIMEOUT_IN_SECONDS = 60;

private static final String EVENT_TO_ANONYMISE = "sample.transformation.anonymise";

private SwarmStarterUtil swarmStarterUtil;

private DatabaseUtils databaseUtils;

@Before
public void setUp() throws Exception {
swarmStarterUtil = new SwarmStarterUtil();
databaseUtils = new DatabaseUtils();
databaseUtils.dropAndUpdateLiquibase();
}

@Test
public void shouldNotAnonymiseActiveStreamEventDataAndSuccessfullyRollback() throws Exception {

final UUID activeStreamId = randomUUID();

final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 3L, createdAt); // deliberately inserting the 2nd event for stream 1 with 3rd position reference

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventStoreDataAccess().findAllEvents().stream().filter(e -> e.getStreamId().equals(activeStreamId)).collect(toList());

//this should throw an exception when processing stream as the total number of records and max value of sequence Id do not match. Hence, no transformation should have taken plave
assertThat(events, hasSize(2));

final Event event1 = retrieveEvent(activeStreamId, 1l);
assertNotNull(event1);
JsonObject payload1 = createReader(new StringReader(event1.getPayload())).readObject();
assertTrue(payload1.getString("a string").equalsIgnoreCase("test"));

final Event event2 = retrieveEvent(activeStreamId, 3l);
assertNotNull(event2);
JsonObject payload2 = createReader(new StringReader(event2.getPayload())).readObject();
assertTrue(payload2.getString("a string").equalsIgnoreCase("test"));

}

private Event retrieveEvent(final UUID streamId, final long positionInStream) {
final List<Event> eventLogs = databaseUtils.getEventStoreDataAccess().findAllEvents();
final Optional<Event> event = eventLogs.stream()
.filter(item -> item.getPositionInStream().equals(positionInStream))
.filter(item -> item.getStreamId().equals(streamId))
.findFirst();

return event.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry;
Expand Down Expand Up @@ -61,10 +63,10 @@ public class EventStreamTransformationService {
private RetryStreamOperator retryStreamOperator;


@Transactional(REQUIRES_NEW)
public UUID transformEventStream(final UUID originalStreamId, final int pass) {
try {
@Transactional(value = REQUIRES_NEW, rollbackOn = {EventStreamException.class, JdbcRepositoryException.class})
public UUID transformEventStream(final UUID originalStreamId, final int pass) throws EventStreamException {

try {
final List<JsonEnvelope> jsonEnvelopeList = eventStreamReader.getStreamBy(originalStreamId);

final Set<EventTransformation> eventTransformations = getEventTransformations(pass);
Expand All @@ -86,10 +88,11 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
if (action.isDeactivate()) {
streamRepository.deactivateStream(originalStreamId);
}

} catch (final Exception e) {
logger.error(format("Unknown error while moving events on stream %s", originalStreamId), e);
} catch (final EventStreamException | JdbcRepositoryException e) {
logger.error(format("Unknown error while transforming events on stream %s", originalStreamId), e);
throw e;
}

return originalStreamId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
Expand Down Expand Up @@ -38,23 +37,19 @@ public class StreamMover {

public void transformAndMoveStream(final UUID originalStreamId,
final Set<EventTransformation> transformations,
final UUID newStreamId) {
try {
final List<JsonEnvelope> jsonEnvelopeList = eventStreamReader.getStreamBy(originalStreamId);

eventSourceTransformation.clearStream(originalStreamId);
final UUID newStreamId) throws EventStreamException {
final List<JsonEnvelope> jsonEnvelopeList = eventStreamReader.getStreamBy(originalStreamId);

try (final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations)) {
streamAppender.appendEventsToStream(newStreamId, filteredMoveEventStream);
}
eventSourceTransformation.clearStream(originalStreamId);

try (final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations)) {
streamAppender.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
}
try (final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations)) {
streamAppender.appendEventsToStream(newStreamId, filteredMoveEventStream);
}

} catch (final Exception e) {
logger.error(format("Unknown error while moving events on stream %s", originalStreamId), e);
try (final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations)) {
streamAppender.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
}

}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.source.core.EventSource;
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;
Expand Down Expand Up @@ -39,10 +38,10 @@ public class StreamTransformer {
private StreamTransformerUtil streamTransformerUtil;

@SuppressWarnings({"squid:S2629"})
public void transformStream(final UUID streamId, final Set<EventTransformation> transformations) {
try {
final EventStream stream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = stream.read();
public void transformStream(final UUID streamId, final Set<EventTransformation> transformations) throws EventStreamException {
final EventStream stream = eventSource.getStreamById(streamId);

try (final Stream<JsonEnvelope> events = stream.read()) {
final List<JsonEnvelope> originalEvents = events.collect(Collectors.toList());

eventSourceTransformation.clearStream(streamId);
Expand All @@ -52,14 +51,6 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>
try (Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(originalEvents.stream(), transformations)) {
streamAppender.appendEventsToStream(streamId, transformedEventStream);
}

events.close();

} catch (final Exception e) {
logger.error(format("Unknown error while transforming events on stream %s", streamId), e);
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,11 @@ public void shouldMoveStream() throws EventStreamException {
verify(streamAppender, times(2)).appendEventsToStream(streamIdArgumentCaptor.capture(), streamArgumentCaptor.capture());
}

@Test
public void shouldLogEventStreamException() throws Exception {
@Test(expected = EventStreamException.class)
public void shouldNotSuppressEventStreamException() throws Exception {
final Set<EventTransformation> transformations = newHashSet(eventTransformation);
doThrow(Exception.class).when(eventSourceTransformation).clearStream(any());
doThrow(EventStreamException.class).when(eventSourceTransformation).clearStream(any());
streamMover.transformAndMoveStream(STREAM_ID, transformations, randomUUID());

verify(logger).error(anyString(), any(Exception.class));
}

private JsonEnvelope buildEnvelope(final String eventName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import static java.util.Optional.empty;
import static java.util.UUID.randomUUID;
import static javax.json.Json.createObjectBuilder;
import static org.junit.Assert.fail;
import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
Expand All @@ -24,7 +26,6 @@

import uk.gov.justice.services.core.enveloper.Enveloper;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.source.core.EventSource;
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
Expand Down Expand Up @@ -155,7 +156,7 @@ public void shouldTransformStreamOfSingleEvent() throws EventStreamException {
}

@Test
public void shouldTransformStreamOfSingleEventWithStreamMover() {
public void shouldTransformStreamOfSingleEventWithStreamMover() throws Exception {
final JsonEnvelope event = buildEnvelope(SOURCE_EVENT_NAME);
final Set<EventTransformation> transformations = newHashSet(eventTransformation);

Expand Down Expand Up @@ -186,7 +187,7 @@ public void shouldTransformStreamOfSingleEventWithStreamMover() {


@Test
public void shouldDeactivateStreamOfSingleEvent() {
public void shouldDeactivateStreamOfSingleEvent() throws Exception {
final JsonEnvelope event = buildEnvelope(SOURCE_EVENT_NAME);
when(eventStream.read()).thenReturn(Stream.of(event));
final Set<EventTransformation> eventTransformations = newHashSet(eventTransformation);
Expand All @@ -202,7 +203,7 @@ public void shouldDeactivateStreamOfSingleEvent() {
}

@Test
public void shouldTransformAllEventsOnStream() {
public void shouldTransformAllEventsOnStream() throws Exception {
final JsonEnvelope event = buildEnvelope(SOURCE_EVENT_NAME);
final JsonEnvelope event2 = buildEnvelope(OTHER_EVENT_NAME);
when(eventStream.read()).thenReturn(Stream.of(event, event2));
Expand All @@ -227,7 +228,7 @@ public void shouldTransformAllEventsOnStream() {
}

@Test
public void shouldNotPerformAnyActionOnTheStreamIfNotIndicated() {
public void shouldNotPerformAnyActionOnTheStreamIfNotIndicated() throws Exception {
final JsonEnvelope event = buildEnvelope(SOURCE_EVENT_NAME);

when(eventStream.read()).thenReturn(Stream.of(event));
Expand All @@ -241,7 +242,7 @@ public void shouldNotPerformAnyActionOnTheStreamIfNotIndicated() {
}

@Test
public void shouldNotPerformAnyActionIfMultipleActionsAreDefinedOnAStream() {
public void shouldNotPerformAnyActionIfMultipleActionsAreDefinedOnAStream() throws Exception {
final JsonEnvelope event = buildEnvelope(SOURCE_EVENT_NAME);
final JsonEnvelope event2 = buildEnvelope(OTHER_EVENT_NAME);

Expand Down Expand Up @@ -300,26 +301,30 @@ public void shouldLogWhenFailedToBackupStream() throws EventStreamException {

doThrow(EventStreamException.class).when(retryStreamOperator).cloneWithRetry(any());

eventStreamTransformationService.transformEventStream(STREAM_ID, 1);
try {
eventStreamTransformationService.transformEventStream(STREAM_ID, 1);
fail();
} catch (EventStreamException e) {

final InOrder inOrder = inOrder(transformationChecker, retryStreamOperator, streamTransformer);
final InOrder inOrder = inOrder(transformationChecker, retryStreamOperator, streamTransformer);

inOrder.verify(transformationChecker).requiresTransformation(listArgumentCaptor.capture(),
uuidCaptor.capture(), intArgumentCaptor.capture());
inOrder.verify(transformationChecker).requiresTransformation(listArgumentCaptor.capture(),
uuidCaptor.capture(), intArgumentCaptor.capture());

inOrder.verify(streamTransformer, never()).transformStream(STREAM_ID, newHashSet(eventTransformation));
inOrder.verify(streamTransformer, never()).transformStream(STREAM_ID, newHashSet(eventTransformation));

verifyNoMoreInteractions(streamTransformer);
verifyNoMoreInteractions(streamTransformer);
}
}


@Test
public void shouldLogEventStreamException() throws Exception {
try {
doThrow(Exception.class).when(eventTransformationRegistry).getEventTransformationBy(1);
doThrow(EventStreamException.class).when(eventTransformationRegistry).getEventTransformationBy(1);
eventStreamTransformationService.transformEventStream(STREAM_ID, 1);
} catch (final Exception expected) {
verify(logger).error(format(any(String.class)), expected);
} catch (final EventStreamException expected) {
verify(logger).error(any(String.class), eq(expected));
}
}

Expand Down
Loading

0 comments on commit 0c19467

Please sign in to comment.