Skip to content

Commit

Permalink
SCTP-341: initial fix to retry stream transformation on exception bei…
Browse files Browse the repository at this point in the history
…ng encountered. May need to introduce a graceful delay mechanism of sorts before retrying. Tests are pending
  • Loading branch information
Mahesh Subramanian committed Jul 3, 2019
1 parent 1fe0610 commit a2db140
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import uk.gov.justice.services.eventsourcing.source.core.EventSource;
import uk.gov.justice.services.eventsourcing.source.core.EventSourceTransformation;
import uk.gov.justice.services.eventsourcing.source.core.EventStream;
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.StreamAppender;
import uk.gov.justice.tools.eventsourcing.transformation.StreamTransformerUtil;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -42,14 +45,27 @@ public void transformStream(final UUID streamId, final Set<EventTransformation>
try {
final EventStream stream = eventSource.getStreamById(streamId);
final Stream<JsonEnvelope> events = stream.read();
final List<JsonEnvelope> originalEvents = events.collect(Collectors.toList());

eventSourceTransformation.clearStream(streamId);
boolean noOptimisticLockException = true;
logger.info("**************** before while loop for stream with id {}", streamId);
while (noOptimisticLockException) {

logger.debug("Transforming events on stream {}", streamId);
logger.debug("Transforming events on stream {}", streamId);

final Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(events, transformations);
final Stream<JsonEnvelope> transformedEventStream = streamTransformerUtil.transform(originalEvents.stream(), transformations);

streamAppender.appendEventsToStream(streamId, transformedEventStream);
try {
logger.info("**************** inside while loop for stream with id {}", streamId);
streamAppender.appendEventsToStream(streamId, transformedEventStream);
noOptimisticLockException = false;
} catch (EventStreamException e) {
logger.error(format("Encountered exception. Retrying for stream with ID: %s", streamId), e);
}

}
logger.info("**************** after while loop for stream with id {}", streamId);

events.close();

Expand Down

0 comments on commit a2db140

Please sign in to comment.