Skip to content

Commit

Permalink
SCTP-341: Addressing retry mechanism when performing operations on a …
Browse files Browse the repository at this point in the history
…stream (append or clone)
  • Loading branch information
Mahesh Subramanian committed Jul 16, 2019
1 parent 1fe0610 commit 1892fcb
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 74 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Retry mechanism when performing stream operations such as append / move or clone

## [5.3.0] - 2019-06-27
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void shouldTransformAndMoveEventInEventStoreAndPreserveEventSequenceInThe
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence1"));
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getEventNumber().get(), is(8L));
assertThat(linkedEvent.getEventNumber().get(), is(9L));
assertThat(linkedEvent.getPreviousEventNumber(), is(0L));
}

Expand All @@ -274,7 +274,7 @@ public void shouldTransformAndMoveEventInEventStoreAndPreserveEventSequenceInThe
assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence2"));
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getEventNumber().get(), is(10L));
assertThat(linkedEvent.getPreviousEventNumber(), is(8L));
assertThat(linkedEvent.getPreviousEventNumber(), is(9L));
}
if (linkedEvent.getSequenceId() == 2L)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationStreamIdFilter;
import uk.gov.justice.tools.eventsourcing.transformation.StreamMover;
import uk.gov.justice.tools.eventsourcing.transformation.TransformationChecker;
import uk.gov.justice.tools.eventsourcing.transformation.api.Action;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;
Expand Down Expand Up @@ -59,6 +57,9 @@ public class EventStreamTransformationService {
@Inject
private EventStreamReader eventStreamReader;

@Inject
private RetryStreamOperator retryStreamOperator;


@Transactional(REQUIRES_NEW)
public UUID transformEventStream(final UUID originalStreamId, final int pass) {
Expand All @@ -70,7 +71,7 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
final Action action = transformationChecker.requiresTransformation(jsonEnvelopeList, originalStreamId, pass);

if (action.isKeepBackup()) {
cloneStream(originalStreamId);
retryStreamOperator.cloneWithRetry(originalStreamId);
}

if (action.isTransform()) {
Expand All @@ -92,16 +93,6 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
return originalStreamId;
}

@SuppressWarnings({"squid:S2629"})
private void cloneStream(final UUID originalStreamId) {
try {
final UUID clonedStreamId = eventSourceTransformation.cloneStream(originalStreamId);
logger.debug(format("Created backup stream '%s' from stream '%s'", clonedStreamId, originalStreamId));
} catch (final EventStreamException e) {
logger.error(format("Failed to backup stream %s", originalStreamId), e);
}
}

private boolean isNewStreamId(final Optional<UUID> newStreamId,
final UUID originalStreamId) {
return newStreamId.isPresent() && !newStreamId.get().equals(originalStreamId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.common.configuration.Value;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.OptimisticLockingRetryException;
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.slf4j.Logger;

@ApplicationScoped
public class RetryStreamOperator {

@Inject
private Logger logger;

@Inject
@Value(key = "maxRetryStreamOperation", defaultValue = "10")
private long maxRetry;

@Inject
private EventSourceTransformation eventSourceTransformation;

public void appendWithRetry(final UUID streamId, final EventStream eventStream, final Stream<JsonEnvelope> events) throws EventStreamException {
executeStreamOperationWithRetry(streamId, () -> {
eventStream.append(events);
logger.info("Appended events to stream with ID - '{}'", streamId);
});

}

public void cloneWithRetry(final UUID streamId) throws EventStreamException {
executeStreamOperationWithRetry(streamId, () -> {
final UUID clonedStreamId = eventSourceTransformation.cloneStream(streamId);
logger.info("Created backup stream '{}' from stream '{}'", clonedStreamId, streamId);
});
}

private void executeStreamOperationWithRetry(final UUID streamId, final StreamOperationRetryable streamOperationRetryable) throws EventStreamException {
boolean operationCompletedSuccesfully = false;
long retryCount = 0L;
while (!operationCompletedSuccesfully) {
try {
streamOperationRetryable.execute();
operationCompletedSuccesfully = true;
} catch (OptimisticLockingRetryException e) {
retryCount++;
if (retryCount >= maxRetry) {
logger.error("Failed to complete operation on stream '{}' due to concurrency issues. Exhausted all retries.", streamId);
throw e;
}
sleep();
logger.warn(format("Encountered exception whilst completing operation on stream during attempt %s for stream with ID: %s", retryCount, streamId), e);
}
}

}

private void sleep() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
logger.error("Thread sleep interrupted", e);
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package uk.gov.justice.tools.eventsourcing.transformation;
package uk.gov.justice.tools.eventsourcing.transformation.service;

import uk.gov.justice.services.eventsourcing.source.core.EventSource;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EnvelopeFixer;

import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -20,18 +21,16 @@ public class StreamAppender {
@Inject
private EnvelopeFixer envelopeFixer;

@Inject
private RetryStreamOperator retryStreamOperator;

public void appendEventsToStream(
final UUID streamId,
final Stream<JsonEnvelope> jsonEnvelopeStream) throws EventStreamException {

try {
final EventStream eventStream = eventSource
.getStreamById(streamId);
eventStream
.append(jsonEnvelopeStream.map(envelopeFixer::clearPositionAndGiveNewId));
} catch (final Exception e) {
jsonEnvelopeStream.close();
throw new EventStreamException("Failed to append events to stream", e);
}
final EventStream eventStream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = jsonEnvelopeStream.map(envelopeFixer::clearPositionAndGiveNewId);
retryStreamOperator.appendWithRetry(streamId, eventStream, events);
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package uk.gov.justice.tools.eventsourcing.transformation;
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;

import java.util.List;
Expand All @@ -26,7 +28,7 @@ public class StreamMover {
private EventSourceTransformation eventSourceTransformation;

@Inject
private StreamAppender streamRepository;
private StreamAppender streamAppender;

@Inject
private StreamTransformerUtil streamTransformerUtil;
Expand All @@ -42,11 +44,13 @@ public void transformAndMoveStream(final UUID originalStreamId,

eventSourceTransformation.clearStream(originalStreamId);

final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations);
final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations);
try (final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations)) {
streamAppender.appendEventsToStream(newStreamId, filteredMoveEventStream);
}

streamRepository.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
streamRepository.appendEventsToStream(newStreamId, filteredMoveEventStream);
try (final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations)) {
streamAppender.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
}

} catch (final Exception e) {
logger.error(format("Unknown error while moving events on stream %s", originalStreamId), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;

@FunctionalInterface
public interface StreamOperationRetryable {

void execute() throws EventStreamException;
}


Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.StreamAppender;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -42,14 +43,15 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>
try {
final EventStream stream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = stream.read();
final List<JsonEnvelope> originalEvents = events.collect(Collectors.toList());

eventSourceTransformation.clearStream(streamId);

logger.debug("Transforming events on stream {}", streamId);

final Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(events, transformations);

streamAppender.appendEventsToStream(streamId, transformedEventStream);
try (Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(originalEvents.stream(), transformations)) {
streamAppender.appendEventsToStream(streamId, transformedEventStream);
}

events.close();

Expand All @@ -59,4 +61,5 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.of;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -16,6 +17,8 @@
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.service.RetryStreamOperator;
import uk.gov.justice.tools.eventsourcing.transformation.service.StreamAppender;

import java.util.List;
import java.util.UUID;
Expand All @@ -38,6 +41,9 @@ public class StreamAppenderTest {
@Mock
private EnvelopeFixer envelopeFixer;

@Mock
private RetryStreamOperator retryStreamOperator;

@InjectMocks
private StreamAppender streamAppender;

Expand All @@ -61,9 +67,9 @@ public void shouldAppendEvents() throws Exception {
when(envelopeFixer.clearPositionAndGiveNewId(event_1)).thenReturn(clearedEvent_1);
when(envelopeFixer.clearPositionAndGiveNewId(event_2)).thenReturn(clearedEvent_2);

streamAppender.appendEventsToStream(streamId, Stream.of(event_1, event_2));
streamAppender.appendEventsToStream(streamId, of(event_1, event_2));

verify(eventStream).append(envelopeStreamCaptor.capture());
verify(retryStreamOperator).appendWithRetry(eq(streamId), eq(eventStream), envelopeStreamCaptor.capture());

final List<JsonEnvelope> appendedEvents = envelopeStreamCaptor.getValue().collect(toList());

Expand All @@ -83,39 +89,17 @@ public void shouldAlwaysCloseTheStreamOnException() throws Exception {
final JsonEnvelope event_2 = mock(JsonEnvelope.class);

final EventStream eventStream = mock(EventStream.class);
final Stream<JsonEnvelope> jsonEnvelopeStream = Stream.of(event_1, event_2);
final Stream<JsonEnvelope> jsonEnvelopeStream = of(event_1, event_2);

when(eventSource.getStreamById(streamId)).thenReturn(eventStream);
doThrow(eventStreamException).when(eventStream).append(any(Stream.class));


final StreamCloseVerifier streamCloseVerifier = new StreamCloseVerifier();
jsonEnvelopeStream.onClose(streamCloseVerifier);

assertThat(streamCloseVerifier.isClosed(), is(false));

doThrow(eventStreamException).when(retryStreamOperator).appendWithRetry(eq(streamId), eq(eventStream), envelopeStreamCaptor.capture());

try {
streamAppender.appendEventsToStream(streamId, jsonEnvelopeStream);
fail();
} catch (final EventStreamException expected) {
assertThat(streamCloseVerifier.isClosed(), is(true));
assertThat(expected.getCause(), is(eventStreamException));
assertThat(expected.getMessage(), is("Failed to append events to stream"));
}
}

private class StreamCloseVerifier implements Runnable {

private boolean closed = false;

@Override
public void run() {
closed = true;
}

public boolean isClosed() {
return closed;
assertThat(expected, is(eventStreamException));
assertThat(expected.getMessage(), is("oops"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;
import uk.gov.justice.tools.eventsourcing.transformation.service.StreamAppender;
import uk.gov.justice.tools.eventsourcing.transformation.service.StreamMover;

import java.util.List;
import java.util.Set;
Expand Down
Loading

0 comments on commit 1892fcb

Please sign in to comment.