Skip to content

Commit

Permalink
SCTP-341: Addressing retry mechanism when performing operations on a …
Browse files Browse the repository at this point in the history
…stream (append or clone) (#33)
  • Loading branch information
Mahesh Subramanian committed Jul 17, 2019
1 parent 1fe0610 commit 1fff5e4
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 76 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Retry mechanism when performing stream operations such as append / move or clone

## [5.3.0] - 2019-06-27
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<framework.version>5.1.2</framework.version>
<event-store.version>1.1.11</event-store.version>
<common-bom.version>1.28.0</common-bom.version>
<utilities.version>1.16.2</utilities.version>
<utilities.version>1.20.0</utilities.version>
<test-utils.version>1.18.1</test-utils.version>
<wildfly.swarm.version>2017.11.0</wildfly.swarm.version>
<version.swarm.fraction-plugin>77</version.swarm.fraction-plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void shouldTransformAndMoveEventInEventStoreAndPreserveEventSequenceInThe
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence1"));
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getEventNumber().get(), is(8L));
assertThat(linkedEvent.getEventNumber().get(), is(9L));
assertThat(linkedEvent.getPreviousEventNumber(), is(0L));
}

Expand All @@ -274,7 +274,7 @@ public void shouldTransformAndMoveEventInEventStoreAndPreserveEventSequenceInThe
assertThat(linkedEvent.getName(), is("sample.events.name.pass1.sequence2"));
assertThat(linkedEvent.getCreatedAt(), is(createdAt));
assertThat(linkedEvent.getEventNumber().get(), is(10L));
assertThat(linkedEvent.getPreviousEventNumber(), is(8L));
assertThat(linkedEvent.getPreviousEventNumber(), is(9L));
}
if (linkedEvent.getSequenceId() == 2L)

Expand Down
5 changes: 4 additions & 1 deletion stream-transformation-tool-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<artifactId>framework-api-event-source</artifactId>
<version>${framework-api.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
<artifactId>utilities-core</artifactId>
</dependency>

<!-- Test Dependencies -->
<dependency>
Expand All @@ -53,7 +57,6 @@
<artifactId>junit-dataprovider</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import static javax.transaction.Transactional.TxType.REQUIRES_NEW;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationRegistry;
import uk.gov.justice.tools.eventsourcing.transformation.EventTransformationStreamIdFilter;
import uk.gov.justice.tools.eventsourcing.transformation.StreamMover;
import uk.gov.justice.tools.eventsourcing.transformation.TransformationChecker;
import uk.gov.justice.tools.eventsourcing.transformation.api.Action;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;
Expand Down Expand Up @@ -59,6 +57,9 @@ public class EventStreamTransformationService {
@Inject
private EventStreamReader eventStreamReader;

@Inject
private RetryStreamOperator retryStreamOperator;


@Transactional(REQUIRES_NEW)
public UUID transformEventStream(final UUID originalStreamId, final int pass) {
Expand All @@ -70,7 +71,7 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
final Action action = transformationChecker.requiresTransformation(jsonEnvelopeList, originalStreamId, pass);

if (action.isKeepBackup()) {
cloneStream(originalStreamId);
retryStreamOperator.cloneWithRetry(originalStreamId);
}

if (action.isTransform()) {
Expand All @@ -92,16 +93,6 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
return originalStreamId;
}

@SuppressWarnings({"squid:S2629"})
private void cloneStream(final UUID originalStreamId) {
try {
final UUID clonedStreamId = eventSourceTransformation.cloneStream(originalStreamId);
logger.debug(format("Created backup stream '%s' from stream '%s'", clonedStreamId, originalStreamId));
} catch (final EventStreamException e) {
logger.error(format("Failed to backup stream %s", originalStreamId), e);
}
}

private boolean isNewStreamId(final Optional<UUID> newStreamId,
final UUID originalStreamId) {
return newStreamId.isPresent() && !newStreamId.get().equals(originalStreamId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.UUID;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.slf4j.Logger;

@ApplicationScoped
public class RetryStreamOperator {

@Inject
private Logger logger;

@Inject
private StreamOperationRetryableExecutor streamOperationRetryableExecutor;

@Inject
private EventSourceTransformation eventSourceTransformation;

public void appendWithRetry(final UUID streamId, final EventStream eventStream, final Stream<JsonEnvelope> events) throws EventStreamException {
streamOperationRetryableExecutor.execute(streamId, () -> {
eventStream.append(events);
logger.info("Appended events to stream with ID - '{}'", streamId);
});

}

public void cloneWithRetry(final UUID streamId) throws EventStreamException {
streamOperationRetryableExecutor.execute(streamId, () -> {
final UUID clonedStreamId = eventSourceTransformation.cloneStream(streamId);
logger.info("Created backup stream '{}' from stream '{}'", clonedStreamId, streamId);
});
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package uk.gov.justice.tools.eventsourcing.transformation;
package uk.gov.justice.tools.eventsourcing.transformation.service;

import uk.gov.justice.services.eventsourcing.source.core.EventSource;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EnvelopeFixer;

import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -20,18 +21,16 @@ public class StreamAppender {
@Inject
private EnvelopeFixer envelopeFixer;

@Inject
private RetryStreamOperator retryStreamOperator;

public void appendEventsToStream(
final UUID streamId,
final Stream<JsonEnvelope> jsonEnvelopeStream) throws EventStreamException {

try {
final EventStream eventStream = eventSource
.getStreamById(streamId);
eventStream
.append(jsonEnvelopeStream.map(envelopeFixer::clearPositionAndGiveNewId));
} catch (final Exception e) {
jsonEnvelopeStream.close();
throw new EventStreamException("Failed to append events to stream", e);
}
final EventStream eventStream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = jsonEnvelopeStream.map(envelopeFixer::clearPositionAndGiveNewId);
retryStreamOperator.appendWithRetry(streamId, eventStream, events);
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package uk.gov.justice.tools.eventsourcing.transformation;
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.EventStreamReader;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;

import java.util.List;
Expand All @@ -26,7 +28,7 @@ public class StreamMover {
private EventSourceTransformation eventSourceTransformation;

@Inject
private StreamAppender streamRepository;
private StreamAppender streamAppender;

@Inject
private StreamTransformerUtil streamTransformerUtil;
Expand All @@ -42,11 +44,13 @@ public void transformAndMoveStream(final UUID originalStreamId,

eventSourceTransformation.clearStream(originalStreamId);

final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations);
final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations);
try (final Stream<JsonEnvelope> filteredMoveEventStream = streamTransformerUtil.transformAndMove(jsonEnvelopeList.stream(), transformations)) {
streamAppender.appendEventsToStream(newStreamId, filteredMoveEventStream);
}

streamRepository.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
streamRepository.appendEventsToStream(newStreamId, filteredMoveEventStream);
try (final Stream<JsonEnvelope> unfilteredMoveEventStream = streamTransformerUtil.filterOriginalEvents(jsonEnvelopeList, transformations)) {
streamAppender.appendEventsToStream(originalStreamId, unfilteredMoveEventStream);
}

} catch (final Exception e) {
logger.error(format("Unknown error while moving events on stream %s", originalStreamId), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;

@FunctionalInterface
public interface StreamOperationRetryable {

void execute() throws EventStreamException;
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static java.lang.String.format;

import uk.gov.justice.services.common.configuration.Value;
import uk.gov.justice.services.common.util.Sleeper;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.OptimisticLockingRetryException;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;

import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.slf4j.Logger;

@ApplicationScoped
public class StreamOperationRetryableExecutor {

private static final int SLEEP_TIME_IN_MILLISECONDS = 2000;

@Inject
@Value(key = "maxRetryStreamOperation", defaultValue = "10")
private long maxRetry;

@Inject
private Logger logger;

@Inject
private Sleeper sleeper;

public void execute(final UUID streamId, final StreamOperationRetryable streamOperationRetryable) throws EventStreamException {
boolean operationCompletedSuccesfully = false;
long retryCount = 0L;
while (!operationCompletedSuccesfully) {
try {
streamOperationRetryable.execute();
operationCompletedSuccesfully = true;
} catch (OptimisticLockingRetryException e) {
retryCount++;
if (retryCount >= maxRetry) {
logger.error("Failed to complete operation on stream '{}' due to concurrency issues. Exhausted all retries.", streamId);
throw e;
}
sleeper.sleepFor(SLEEP_TIME_IN_MILLISECONDS);
logger.warn(format("Encountered exception whilst completing operation on stream during attempt %s for stream with ID: %s", retryCount, streamId), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.StreamAppender;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -42,14 +43,15 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>
try {
final EventStream stream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = stream.read();
final List<JsonEnvelope> originalEvents = events.collect(Collectors.toList());

eventSourceTransformation.clearStream(streamId);

logger.debug("Transforming events on stream {}", streamId);

final Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(events, transformations);

streamAppender.appendEventsToStream(streamId, transformedEventStream);
try (Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(originalEvents.stream(), transformations)) {
streamAppender.appendEventsToStream(streamId, transformedEventStream);
}

events.close();

Expand All @@ -59,4 +61,5 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>

}


}
Loading

0 comments on commit 1fff5e4

Please sign in to comment.