Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinYSpasov committed Oct 23, 2018
1 parent 8844d31 commit c7e9027
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package uk.gov.justice.tools.eventsourcing.transformation;

import static java.util.Optional.empty;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.tools.eventsourcing.transformation.api.EventTransformation;
Expand All @@ -18,37 +16,24 @@
public class EventTransformationStreamIdFilter {

public Optional<UUID> getEventTransformationStreamId(final Set<EventTransformation> transformations,
final List<JsonEnvelope> transformedEventStream) {
final List<JsonEnvelope> jsonEnvelopeList) {

final Set<EventTransformation> eventTransformationList = transformedEventStream
return jsonEnvelopeList
.stream()
.map(event -> transformerFor(event, transformations))
.map(event -> streamIdFor(event, transformations))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toSet());

return transformedEventStream.stream()
.map(e -> filterStreamId(e, eventTransformationList))
.filter(Optional::isPresent)
.findFirst()
.findFirst()
.orElse(empty());

}

private Optional<UUID> filterStreamId(final JsonEnvelope event, final Set<EventTransformation> transformations) {
private Optional<UUID> streamIdFor(final JsonEnvelope event,
final Set<EventTransformation> transformations) {
final Optional<EventTransformation> eventTransformation = transformations
.stream()
.filter(transformation -> transformation.actionFor(event).isTransform())
.filter(transformation -> transformation.setStreamId(event).isPresent())
.findFirst();

return eventTransformation.isPresent() ? eventTransformation.get().setStreamId(event) : empty();
}

private Optional<EventTransformation> transformerFor(final JsonEnvelope event,
final Set<EventTransformation> transformations) {
return transformations
.stream()
.filter(transformation -> transformation.actionFor(event).isTransform())
.findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ public UUID transformEventStream(final UUID originalStreamId, final int pass) {
if (action.isDeactivate()) {
streamRepository.deactivateStream(originalStreamId);
}
return originalStreamId;

}catch (final Exception e){
logger.error(format("Unknown error while moving events on stream %s", originalStreamId), e);
}
return originalStreamId;
}

@SuppressWarnings({"squid:S2629"})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.tools.eventsourcing.transformation.service;

import static com.google.common.collect.Sets.newHashSet;
import static java.lang.String.format;
import static java.util.Optional.empty;
import static java.util.UUID.randomUUID;
import static javax.json.Json.createObjectBuilder;
Expand Down Expand Up @@ -302,6 +303,18 @@ public void shouldLogWhenFailedToBackupStream() throws EventStreamException {
verifyNoMoreInteractions(streamTransformer);
}


@Test
public void shouldLogEventStreamException() throws Exception {
try {
doThrow(Exception.class).when(eventTransformationRegistry).getEventTransformationBy(1);
eventStreamTransformationService.transformEventStream(STREAM_ID, 1);
} catch (final Exception expected) {
verify(logger).error(format(any(String.class)), expected);
}
}


private JsonEnvelope buildEnvelope(final String eventName) {
return envelopeFrom(
metadataBuilder().withId(randomUUID()).withStreamId(STREAM_ID).withName(eventName),
Expand Down

0 comments on commit c7e9027

Please sign in to comment.