Skip to content

Commit

Permalink
Merge 4168a2b into 7d1136a
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Jun 6, 2018
2 parents 7d1136a + 4168a2b commit aa9993f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [4.3.3] - 2018-06-06
### Fixed
- Fix for replaying of events into custom event listeners

## [4.3.2] - 2018-06-04
### Fixed
- Fix of running out of database Connections during the replay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ private void replayAllEventsOf(final UUID streamId) {
}
}


@TransactionAttribute(REQUIRED)
private void replayBatchOfEvents(final UUID streamId, final long position) {
try (final Stream<JsonEnvelope> eventStream = jsonEnvelopeJdbcRepository.pageEventStream(streamId, position, PAGE_SIZE)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
package uk.gov.justice.framework.tools.replay;

import static javax.transaction.Transactional.TxType.REQUIRES_NEW;
import static uk.gov.justice.services.core.annotation.Component.EVENT_LISTENER;
import static uk.gov.justice.services.core.annotation.ServiceComponentLocation.LOCAL;

import uk.gov.justice.services.core.dispatcher.Dispatcher;
import uk.gov.justice.services.core.dispatcher.DispatcherCache;
import uk.gov.justice.services.core.extension.ServiceComponentFoundEvent;
import uk.gov.justice.services.messaging.JsonEnvelope;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.transaction.Transactional;


@ApplicationScoped
public class TransactionalEnvelopeDispatcher {

@Inject
DispatcherCache dispatcherCache;

private Dispatcher dispatcher;

@PostConstruct
public void init() {
dispatcher = dispatcherCache.dispatcherFor(EVENT_LISTENER, LOCAL);
void register(@Observes final ServiceComponentFoundEvent event) {
if (null == dispatcher) {
dispatcher = dispatcherCache.dispatcherFor(event.getComponentName(), LOCAL);
}
}

@Transactional(REQUIRES_NEW)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.framework.tools.replay;

import static java.util.UUID.randomUUID;
import static java.util.stream.IntStream.rangeClosed;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doThrow;
Expand All @@ -13,6 +14,8 @@
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -77,6 +80,93 @@ public void shouldGetTheEventsOfAStreamDispatchThemAllThenUpdateTheStreamStatus(
inOrder.verify(progressLogger).logCompletion(streamId);
}

@Test
public void shouldDispatchOneFullPageOfEvents() throws Exception {
final UUID streamId = randomUUID();

final List<JsonEnvelope> pageOfEvents_1 = pageOfJsonEnvelopes(PAGE_SIZE);

final JsonEnvelope lastEnvelope = pageOfEvents_1.get(PAGE_SIZE - 1);
final StreamStatus streamStatus = mock(StreamStatus.class);

when(jsonEnvelopeJdbcRepository.getCurrentVersion(streamId)).thenReturn(1000L);
when(jsonEnvelopeJdbcRepository.pageEventStream(streamId, 1L, PAGE_SIZE)).thenReturn(pageOfEvents_1.stream());
when(jsonEnvelopeJdbcRepository.getLatestEvent(streamId)).thenReturn(lastEnvelope);
when(streamStatusFactory.create(lastEnvelope, streamId)).thenReturn(streamStatus);

assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId));

final InOrder inOrder = inOrder(envelopeDispatcher, streamStatusRepository);

for (JsonEnvelope jsonEnvelope : pageOfEvents_1) {
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope);
}

inOrder.verify(streamStatusRepository).insert(streamStatus);
}

@Test
public void shouldDispatchTwoFullPagesOfEvents() throws Exception {
final UUID streamId = randomUUID();

final List<JsonEnvelope> pageOfEvents_1 = pageOfJsonEnvelopes(PAGE_SIZE);
final List<JsonEnvelope> pageOfEvents_2 = pageOfJsonEnvelopes(PAGE_SIZE);

final JsonEnvelope lastEnvelope = pageOfEvents_2.get(PAGE_SIZE - 1);
final StreamStatus streamStatus = mock(StreamStatus.class);

when(jsonEnvelopeJdbcRepository.getCurrentVersion(streamId)).thenReturn(2000L);
when(jsonEnvelopeJdbcRepository.pageEventStream(streamId, 1L, PAGE_SIZE)).thenReturn(pageOfEvents_1.stream());
when(jsonEnvelopeJdbcRepository.pageEventStream(streamId, 1001L, PAGE_SIZE)).thenReturn(pageOfEvents_2.stream());
when(jsonEnvelopeJdbcRepository.getLatestEvent(streamId)).thenReturn(lastEnvelope);
when(streamStatusFactory.create(lastEnvelope, streamId)).thenReturn(streamStatus);

assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId));

final InOrder inOrder = inOrder(envelopeDispatcher, streamStatusRepository);

for (JsonEnvelope jsonEnvelope : pageOfEvents_1) {
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope);
}

for (JsonEnvelope jsonEnvelope : pageOfEvents_2) {
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope);
}

inOrder.verify(streamStatusRepository).insert(streamStatus);
}

@Test
public void shouldDispatchOneFullPageAndSecondPageWithSingleEvent() throws Exception {
final UUID streamId = randomUUID();

final List<JsonEnvelope> pageOfEvents_1 = pageOfJsonEnvelopes(PAGE_SIZE);
final List<JsonEnvelope> pageOfEvents_2 = pageOfJsonEnvelopes(1);

final JsonEnvelope lastEnvelope = pageOfEvents_2.get(0);
final StreamStatus streamStatus = mock(StreamStatus.class);

when(jsonEnvelopeJdbcRepository.getCurrentVersion(streamId)).thenReturn(1001L);
when(jsonEnvelopeJdbcRepository.pageEventStream(streamId, 1L, PAGE_SIZE)).thenReturn(pageOfEvents_1.stream());
when(jsonEnvelopeJdbcRepository.pageEventStream(streamId, 1001L, PAGE_SIZE)).thenReturn(pageOfEvents_2.stream());
when(jsonEnvelopeJdbcRepository.getLatestEvent(streamId)).thenReturn(lastEnvelope);
when(streamStatusFactory.create(lastEnvelope, streamId)).thenReturn(streamStatus);

assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId));

final InOrder inOrder = inOrder(envelopeDispatcher, streamStatusRepository);

for (JsonEnvelope jsonEnvelope : pageOfEvents_1) {
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope);
}

for (JsonEnvelope jsonEnvelope : pageOfEvents_2) {
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope);
}

inOrder.verify(streamStatusRepository).insert(streamStatus);
}

@Test
public void shouldLogFailureIfNoHandlerFoundForDispatch() throws Exception {

Expand All @@ -103,4 +193,14 @@ public void shouldLogFailureIfNoHandlerFoundForDispatch() throws Exception {
inOrder.verify(progressLogger).logFailure(streamId, jsonEnvelope);
inOrder.verify(progressLogger).logCompletion(streamId);
}

private List<JsonEnvelope> pageOfJsonEnvelopes(final int numberOfEvents) {
final List<JsonEnvelope> pageOfEvents = new ArrayList<>();

rangeClosed(1, numberOfEvents).forEach(value -> {
pageOfEvents.add(mock(JsonEnvelope.class));
});

return pageOfEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.core.annotation.Component.EVENT_LISTENER;
import static uk.gov.justice.services.core.annotation.ServiceComponentLocation.LOCAL;
import static uk.gov.justice.services.messaging.DefaultJsonEnvelope.envelope;

import uk.gov.justice.services.core.dispatcher.Dispatcher;
import uk.gov.justice.services.core.dispatcher.DispatcherCache;
import uk.gov.justice.services.core.extension.ServiceComponentFoundEvent;
import uk.gov.justice.services.messaging.JsonEnvelope;

import org.junit.Test;
Expand All @@ -29,14 +29,17 @@ public class TransactionalEnvelopeDispatcherTest {

@Test
public void shouldDispatchEnvelope() {
Dispatcher dispatcher = mock(Dispatcher.class);
final Dispatcher dispatcher = mock(Dispatcher.class);
final JsonEnvelope envelope = mock(JsonEnvelope.class);
final ServiceComponentFoundEvent serviceComponentFoundEventClass = mock(ServiceComponentFoundEvent.class);

when(serviceComponentFoundEventClass.getComponentName()).thenReturn(EVENT_LISTENER);
when(dispatcherCache.dispatcherFor(EVENT_LISTENER, LOCAL)).thenReturn(dispatcher);
transactionalEnvelopeDispatcher.init();

final JsonEnvelope envelope = envelope().build();
transactionalEnvelopeDispatcher.register(serviceComponentFoundEventClass);

transactionalEnvelopeDispatcher.dispatch(envelope);
verify(dispatcher).dispatch(envelope);

verify(dispatcher).dispatch(envelope);
}
}

0 comments on commit aa9993f

Please sign in to comment.