Skip to content

Commit

Permalink
Merge db236d6 into 816620a
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Sep 28, 2019
2 parents 816620a + db236d6 commit 75e4078
Show file tree
Hide file tree
Showing 30 changed files with 930 additions and 210 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.1.1] - 2019-09-28
### Added
- New system event 'CatchupProcessingOfEventFailedEvent' fired if processing of any PublishedEvent during catchup fails
### Changed
- All system events moved into their own module 'event-store-management-events'
- Unsuccessful catchups now logged correctly in catchup completion.

## [2.1.0] - 2019-09-26
### Changed
Expand Down
5 changes: 5 additions & 0 deletions event-sourcing/subscription-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>jmx-command-handling</artifactId>
<version>${framework.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>event-store-management-events</artifactId>
<version>${project.version}</version>
</dependency>

<!--Test Dependencies-->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType;

import java.util.Queue;
import java.util.UUID;
Expand Down Expand Up @@ -50,7 +51,10 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer
* count the number of events consumed
*/
@Override
public int add(final PublishedEvent publishedEvent, final String subscriptionName) {
public int add(
final PublishedEvent publishedEvent,
final String subscriptionName,
final CatchupType catchupType) {

final UUID streamId = publishedEvent.getStreamId();

Expand All @@ -60,7 +64,7 @@ public int add(final PublishedEvent publishedEvent, final String subscriptionNam
events.offer(publishedEvent);

if (notInProgress(events)) {
createAndSubmitTaskFor(events, subscriptionName);
createAndSubmitTaskFor(events, subscriptionName, catchupType);
}
}

Expand Down Expand Up @@ -98,11 +102,18 @@ private boolean notInProgress(final Queue<PublishedEvent> eventStream) {
return !eventStreamsInProgressList.contains(eventStream);
}

private void createAndSubmitTaskFor(final Queue<PublishedEvent> eventStream, final String subscriptionName) {
private void createAndSubmitTaskFor(
final Queue<PublishedEvent> eventStream,
final String subscriptionName,
final CatchupType catchupType) {

eventStreamsInProgressList.add(eventStream);

final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this);
consumeEventQueueBean.consume(eventStream, eventQueueConsumer, subscriptionName);
consumeEventQueueBean.consume(
eventStream,
eventQueueConsumer,
subscriptionName,
catchupType);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.slf4j.LoggerFactory.getLogger;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor;

Expand All @@ -12,10 +11,13 @@ public class EventQueueConsumerFactory {
@Inject
private TransactionalEventProcessor transactionalEventProcessor;

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

public EventQueueConsumer create(final EventStreamConsumptionResolver eventStreamConsumptionResolver) {
return new EventQueueConsumer(
transactionalEventProcessor,
eventStreamConsumptionResolver,
getLogger(EventQueueConsumer.class));
eventProcessingFailedHandler);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType;

/**
* Interface for managing the consuming of JsonEnvelope events from Stream of events.
Expand All @@ -18,7 +19,7 @@ public interface EventStreamConsumerManager {
* @return The number of events added to the stream. Note this is always one and is used
* to count the number of events consumed
*/
int add(final PublishedEvent publishedEvent, final String subscriptionName);
int add(final PublishedEvent publishedEvent, final String subscriptionName, final CatchupType catchupType);

void waitForCompletion();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType;

import java.util.Queue;

Expand All @@ -11,11 +12,18 @@
public class ConsumeEventQueueBean {

@Asynchronous
public void consume(final Queue<PublishedEvent> events, final EventQueueConsumer eventQueueConsumer, final String subscriptionName) {
public void consume(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupType catchupType) {

boolean consumed = false;
while(! consumed) {
consumed = eventQueueConsumer.consumeEventQueue(events, subscriptionName);
consumed = eventQueueConsumer.consumeEventQueue(
events,
subscriptionName,
catchupType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType;

import javax.enterprise.event.Event;
import javax.inject.Inject;

import org.slf4j.Logger;

public class EventProcessingFailedHandler {

@Inject
private Event<CatchupProcessingOfEventFailedEvent> catchupProcessingOfEventFailedEventFirer;

@Inject
private Logger logger;

public void handle(final RuntimeException exception, final PublishedEvent publishedEvent, final String subscriptionName, final CatchupType catchupType) {

final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata());
logger.error(
logMessage,
exception);

final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent(
publishedEvent.getId(),
publishedEvent.getMetadata(),
exception,
catchupType,
subscriptionName
);

catchupProcessingOfEventFailedEventFirer.fire(catchupProcessingOfEventFailedEvent);
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,39 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task;

import static java.lang.String.format;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.FinishedProcessingMessage;
import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType;

import java.util.Queue;

import org.slf4j.Logger;

public class EventQueueConsumer {

private final TransactionalEventProcessor transactionalEventProcessor;
private final EventStreamConsumptionResolver eventStreamConsumptionResolver;
private final Logger logger;
private final EventProcessingFailedHandler eventProcessingFailedHandler;

public EventQueueConsumer(
final TransactionalEventProcessor transactionalEventProcessor,
final EventStreamConsumptionResolver eventStreamConsumptionResolver,
final Logger logger) {
final EventProcessingFailedHandler eventProcessingFailedHandler) {
this.transactionalEventProcessor = transactionalEventProcessor;
this.eventStreamConsumptionResolver = eventStreamConsumptionResolver;
this.logger = logger;
this.eventProcessingFailedHandler = eventProcessingFailedHandler;
}

public boolean consumeEventQueue(final Queue<PublishedEvent> events, final String subscriptionName) {
public boolean consumeEventQueue(
final Queue<PublishedEvent> events,
final String subscriptionName,
final CatchupType catchupType) {
while (!events.isEmpty()) {
final PublishedEvent publishedEvent = events.poll();

try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
} catch (final RuntimeException e) {
final String message = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata());
logger.error(
message,
e);
eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.DummyTransactionalEventProcessor;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.TestCatchupBean;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
Expand Down Expand Up @@ -51,7 +52,8 @@ public class EventStreamCatchupIT {
EventQueueConsumerFactory.class,
LoggerProducer.class,
DummySystemCommandStore.class,
ConcurrentEventStreamConsumerManager.class
ConcurrentEventStreamConsumerManager.class,
EventProcessingFailedHandler.class
})
public WebApp war() {
return new WebApp()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
Expand Down Expand Up @@ -56,9 +57,9 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent.getStreamId()).thenReturn(streamId);

concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName, EVENT_CATCHUP);

verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName));
verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP));

final Queue<PublishedEvent> events = eventQueueCaptor.getValue();
assertThat(events.size(), is(1));
Expand All @@ -79,10 +80,10 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
when(publishedEvent_1.getStreamId()).thenReturn(streamId);
when(publishedEvent_2.getStreamId()).thenReturn(streamId);

concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP);

verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName));
verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP));

final Queue<PublishedEvent> eventsStream = eventQueueCaptor.getValue();
assertThat(eventsStream.size(), is(2));
Expand All @@ -104,10 +105,10 @@ public void shouldCreateQueueForEachStreamId() {
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);

concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP);

verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName));
verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP));

final List<Queue<PublishedEvent>> allValues = eventQueueCaptor.getAllValues();

Expand All @@ -134,18 +135,18 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);

concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP);

verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName));
verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP));

final Queue<PublishedEvent> eventsStream_1 = eventQueueCaptor.getValue();
assertThat(eventsStream_1.size(), is(1));
assertThat(eventsStream_1.poll(), is(publishedEvent_1));

concurrentEventStreamConsumerManager.isEventConsumptionComplete(new FinishedProcessingMessage(eventsStream_1));
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP);

verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName));
verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP));

final Queue<PublishedEvent> eventsStream_2 = eventQueueCaptor.getValue();
assertThat(eventsStream_2.size(), is(1));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class EventQueueConsumerFactoryTest {

@Mock
private TransactionalEventProcessor transactionalEventProcessor;

@Mock
private EventProcessingFailedHandler eventProcessingFailedHandler;

@InjectMocks
private EventQueueConsumerFactory eventQueueConsumerFactory;

@Test
public void shouldCreateEventQueueConsumerFactory() throws Exception {

final EventStreamConsumptionResolver eventStreamConsumptionResolver = mock(EventStreamConsumptionResolver.class);

final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(eventStreamConsumptionResolver);

assertThat(getValueOfField(eventQueueConsumer, "transactionalEventProcessor", TransactionalEventProcessor.class), is(transactionalEventProcessor));
assertThat(getValueOfField(eventQueueConsumer, "eventStreamConsumptionResolver", EventStreamConsumptionResolver.class), is(eventStreamConsumptionResolver));
assertThat(getValueOfField(eventQueueConsumer, "eventProcessingFailedHandler", EventProcessingFailedHandler.class), is(eventProcessingFailedHandler));
}
}
Loading

0 comments on commit 75e4078

Please sign in to comment.