Skip to content

Commit

Permalink
Support framework version 4.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Mar 12, 2018
1 parent 190ec19 commit 69426c8
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
package uk.gov.justice.framework.tools.replay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.substringBefore;

import uk.gov.justice.services.core.handler.exception.MissingHandlerException;
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus;
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.Metadata;

import javax.ejb.Stateless;
import javax.inject.Inject;
import java.util.UUID;
import java.util.stream.Stream;

import javax.ejb.Stateless;
import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Stateless
public class AsyncStreamDispatcher {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncStreamDispatcher.class);

@Inject
private TransactionalEnvelopeDispatcher envelopeDispatcher;

@Inject
private StreamStatusJdbcRepository streamStatusRepository;

Expand All @@ -32,7 +39,7 @@ public UUID dispatch(final Stream<JsonEnvelope> stream) {
}
try {
envelopeDispatcher.dispatch(envelope);
} catch(MissingHandlerException ex) {
} catch (MissingHandlerException ex) {
final Metadata metadata = envelope.metadata();
LOGGER.warn("Missing handler for stream Id: {}, event name: {}, version: {}", metadata.streamId().get(),
metadata.name(), metadata.version().get());
Expand All @@ -44,22 +51,30 @@ public UUID dispatch(final Stream<JsonEnvelope> stream) {
envelopes[0] = envelope;
});
final UUID streamId = streamIdOf(envelopes[0]);
streamStatusRepository.insert(new StreamStatus(streamId, versionOf(envelopes[0])));
streamStatusRepository.insert(new StreamStatus(streamId, versionOf(envelopes[0]), sourceOf(envelopes[0])));
LOGGER.info("Finished processing of stream: {}, elements processed: {}", streamId, noOfProcessedElements[0]);
return streamId;
}
}

private UUID streamIdOf(final JsonEnvelope envelope) {
return envelope.metadata().streamId()
return envelope
.metadata()
.streamId()
.orElseThrow(() -> new IllegalArgumentException(String.format("Stream id not found in the envelope: %s", envelope.toString())));
}

private Long versionOf(final JsonEnvelope envelope) {
return envelope.metadata().version()
return envelope
.metadata()
.version()
.orElseThrow(() -> new IllegalArgumentException(String.format("Version not found in the envelope: %s", envelope.toString())));
}

private String sourceOf(final JsonEnvelope envelope) {
return substringBefore(envelope.metadata().name(), ".");
}

private boolean shouldLogProgress(final int[] i) {
return i[0] % 100 == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus;
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.test.utils.core.matchers.JsonEnvelopeMatcher;
import uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder;

import java.util.List;
Expand All @@ -37,6 +36,7 @@ public class AsyncStreamDispatcherTest {

@Mock
private TransactionalEnvelopeDispatcher envelopeDispatcher;

@Mock
private StreamStatusJdbcRepository streamStatusRepository;

Expand All @@ -47,8 +47,16 @@ public class AsyncStreamDispatcherTest {
public void shouldDispatchEnvelopes() {

final UUID streamId = randomUUID();
final JsonEnvelope envelope1 = envelope().with(metadataWithDefaults().withStreamId(streamId).withVersion(1L)).build();
final JsonEnvelope envelope2 = envelope().with(metadataWithDefaults().withStreamId(streamId).withVersion(2L)).build();
final JsonEnvelope envelope1 = envelope().with(
metadataWithRandomUUID("source.event-occurred")
.withStreamId(streamId)
.withVersion(1L))
.build();
final JsonEnvelope envelope2 = envelope().with(
metadataWithRandomUUID("source.another-event-occurred")
.withStreamId(streamId)
.withVersion(2L))
.build();

doNothing().when(envelopeDispatcher).dispatch(envelope1);
doNothing().when(envelopeDispatcher).dispatch(envelope2);
Expand All @@ -68,26 +76,42 @@ public void shouldUpdateStreamBufferStatus() {

final UUID streamId = randomUUID();
final JsonEnvelope envelope1 = JsonEnvelopeBuilder.envelope().with(
metadataWithDefaults().withStreamId(streamId).withVersion(4L))
metadataWithRandomUUID("source.event-occurred")
.withStreamId(streamId)
.withVersion(4L))
.build();
final JsonEnvelope envelope2 = JsonEnvelopeBuilder.envelope().with(
metadataWithDefaults().withStreamId(streamId).withVersion(5L))
metadataWithRandomUUID("source.another-event-occurred")
.withStreamId(streamId)
.withVersion(5L))
.build();

asyncStreamDispatcher.dispatch(Stream.of(envelope1, envelope2));

verify(streamStatusRepository).insert(new StreamStatus(streamId, 5L));
verify(streamStatusRepository).insert(new StreamStatus(streamId, 5L, "source"));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionIfNoStreamIdInTheEnvelope() {
final Stream<JsonEnvelope> stream = Stream.of(envelope().with(metadataWithRandomUUID("dummyName").withVersion(1L)).build());

final Stream<JsonEnvelope> stream = Stream.of(
envelope()
.with(metadataWithRandomUUID("dummyName")
.withVersion(1L))
.build());

asyncStreamDispatcher.dispatch(stream);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionIfNoVersionInTheEnvelope() {
final Stream<JsonEnvelope> stream = Stream.of(envelope().with(metadataWithDefaults().withStreamId(randomUUID())).build());

final Stream<JsonEnvelope> stream = Stream.of(
envelope()
.with(metadataWithDefaults()
.withStreamId(randomUUID()))
.build());

asyncStreamDispatcher.dispatch(stream);
}

Expand All @@ -96,30 +120,31 @@ public void shouldProcessStreamWhenThereIsNoHandlerDefined() {

final UUID streamId = randomUUID();

final JsonEnvelopeMatcher envelopeMatcherForEventWithoutHandler = jsonEnvelope()
.withMetadataOf(metadata().withName("event-without-handler"));

doThrow(new MissingHandlerException("Handler for event-without-handler not found"))
.when(envelopeDispatcher).dispatch(argThat(envelopeMatcherForEventWithoutHandler));


final JsonEnvelope envelope1 = envelope().with(metadataWithRandomUUID("event-with-handler")
.withStreamId(streamId)
.withVersion(1L))
.when(envelopeDispatcher).dispatch(
argThat(jsonEnvelope()
.withMetadataOf(metadata().withName("source.event-without-handler"))));

final JsonEnvelope envelope1 = envelope()
.with(metadataWithRandomUUID("source.event-with-handler")
.withStreamId(streamId)
.withVersion(1L))
.build();
final JsonEnvelope envelope2 = envelope()
.with(metadataWithRandomUUID("source.event-without-handler")
.withStreamId(streamId)
.withVersion(2L)).build();
final JsonEnvelope envelope3 = envelope()
.with(metadataWithRandomUUID("source.event-with-handler")
.withStreamId(streamId)
.withVersion(3L))
.build();
final JsonEnvelope envelope2 = envelope().with(metadataWithRandomUUID("event-without-handler")
.withStreamId(streamId)
.withVersion(2L)).build();
final JsonEnvelope envelope3 = envelope().with(metadataWithRandomUUID("event-with-handler").withStreamId(streamId).withVersion(3L)).build();

asyncStreamDispatcher.dispatch(Stream.of(envelope1, envelope2, envelope3));

ArgumentCaptor<JsonEnvelope> dispatchCaptor = ArgumentCaptor.forClass(JsonEnvelope.class);
final ArgumentCaptor<JsonEnvelope> dispatchCaptor = ArgumentCaptor.forClass(JsonEnvelope.class);

verify(envelopeDispatcher, times(3)).dispatch(dispatchCaptor.capture());
verify(streamStatusRepository).insert(new StreamStatus(streamId, 3L));

verify(streamStatusRepository).insert(new StreamStatus(streamId, 3L, "source"));
}


}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<properties>
<cpp.repo.name>framework-tools</cpp.repo.name>
<common-bom.version>1.22.0</common-bom.version>
<framework-api.version>1.1.0</framework-api.version>
<framework.version>3.1.0</framework.version>
<framework-api.version>2.0.1</framework-api.version>
<framework.version>4.0.0</framework.version>
<wildfly.swarm.version>2017.11.0</wildfly.swarm.version>
<test.utils.version>1.16.0</test.utils.version>
<schema-service.version>1.1.0</schema-service.version>
Expand Down

0 comments on commit 69426c8

Please sign in to comment.