From 10fa43a60b77e3d55fdbf90a6d13d31fdbebd785 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Fri, 3 Jan 2020 13:40:21 +0000 Subject: [PATCH] Fix potential problem of a transaction failing during catchup causing catchup to never complete --- CHANGELOG.md | 2 + .../ConcurrentEventStreamConsumerManager.java | 32 ++++++----- .../manager/EventQueueConsumerFactory.java | 23 -------- .../EventStreamConsumptionResolver.java | 8 ++- .../manager/EventsInProcessCounter.java | 4 ++ .../consumer/task/ConsumeEventQueueBean.java | 35 +++++++++--- .../consumer/task/ConsumeEventQueueTask.java | 4 -- .../task/ConsumeEventQueueTaskFactory.java | 2 - .../task/ConsumeEventQueueTaskManager.java | 2 - .../task/EventProcessingFailedHandler.java | 32 +++++++++-- .../consumer/task/EventQueueConsumer.java | 26 ++++----- .../catchup/EventStreamCatchupIT.java | 6 +- ...currentEventStreamConsumerManagerTest.java | 30 ++++++---- .../EventQueueConsumerFactoryTest.java | 41 ------------- .../manager/EventsInProcessCounterTest.java | 57 +++++++++++++++++++ .../task/ConsumeEventQueueBeanTest.java | 54 +++++++++++++++++- .../ConsumeEventQueueTaskFactoryTest.java | 3 - .../ConsumeEventQueueTaskManagerTest.java | 2 - .../task/ConsumeEventQueueTaskTest.java | 4 +- .../EventProcessingFailedHandlerTest.java | 53 ++++++++++++++--- .../consumer/task/EventQueueConsumerTest.java | 4 +- .../catchup/observers/CatchupObserver.java | 3 +- .../catchup/state/CatchupError.java | 26 +++------ .../state/CatchupErrorStateManager.java | 3 + .../observers/CatchupObserverTest.java | 9 +-- .../CatchupProcessingOfEventFailedEvent.java | 25 +++----- 26 files changed, 297 insertions(+), 193 deletions(-) delete mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java delete mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ed26cf5a..0f6cbd327 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [2.4.4] - 2020-01-06 ### Added - Added mechanism to also drop/add trigger to event_log table on SUSPEND/UNSUSPEND commands +### Fixed +- Fixed potential problem of a transaction failing during catchup causing catchup to never complete ## [2.4.3] - 2019-12-06 ### Changed diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java index e513017ab..fa88b9178 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java @@ -3,7 +3,6 @@ import static java.lang.Thread.currentThread; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager; -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; @@ -13,15 +12,17 @@ import java.util.concurrent.ConcurrentLinkedQueue; import javax.inject.Inject; +import javax.inject.Singleton; /** * A concurrent implementation of EventStreamConsumerManager and EventStreamConsumerListener. - * + *

* This uses the ManagedExecutorService for concurrency and Queues events according to the Stream * Id. - * + *

* When the add method is called */ +@Singleton public class ConcurrentEventStreamConsumerManager implements EventStreamConsumerManager, EventStreamConsumptionResolver { private static final Object EXCLUSIVE_LOCK = new Object(); @@ -37,17 +38,14 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer @Inject private ConsumeEventQueueTaskManager consumeEventQueueTaskManager; - @Inject - private EventQueueConsumerFactory eventQueueConsumerFactory; - /** * A ConcurrentLinkedQueue is created for each Stream Id and added to a ConcurrentHashMap. An * event is added to the Queue for a Stream Id. - * + *

* If the Queue is not currently being processed a new ConsumeEventQueueTask is created and * submitted to the ManagedExecutorService. The Queue is then added to the * eventStreamsInProgress Queue. - * + *

* If the Queue is currently being processed no further action is taken, as the event will be * processed by the current ConsumeEventQueueTask. * @@ -124,9 +122,19 @@ public void decrementEventsInProcessCount() { final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); synchronized (EXCLUSIVE_LOCK) { - eventsInProcessCounter.decrementEventsInProcessCount(); - EXCLUSIVE_LOCK.notify(); - } + eventsInProcessCounter.decrementEventsInProcessCount(); + EXCLUSIVE_LOCK.notify(); + } + } + + @Override + public void decrementEventsInProcessCountBy(final int count) { + final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); + + synchronized (EXCLUSIVE_LOCK) { + eventsInProcessCounter.decrementEventsInProcessCountBy(count); + EXCLUSIVE_LOCK.notify(); + } } private boolean notInProgress(final Queue eventStream) { @@ -141,10 +149,8 @@ private void createAndSubmitTaskFor( eventStreamsInProgressList.add(eventStream); - final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this); consumeEventQueueTaskManager.consume( eventStream, - eventQueueConsumer, subscriptionName, catchupCommand, commandId); diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java deleted file mode 100644 index c9ed3c2c5..000000000 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; - -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; -import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor; - -import javax.inject.Inject; - -public class EventQueueConsumerFactory { - - @Inject - private TransactionalEventProcessor transactionalEventProcessor; - - @Inject - private EventProcessingFailedHandler eventProcessingFailedHandler; - - public EventQueueConsumer create(final EventStreamConsumptionResolver eventStreamConsumptionResolver) { - return new EventQueueConsumer( - transactionalEventProcessor, - eventStreamConsumptionResolver, - eventProcessingFailedHandler); - } -} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java index c04c16f75..aae260687 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java @@ -2,7 +2,7 @@ /** * Interface for listening to Event Stream Consumers. - * + *

* An instance of the an EventStreamConsumerListener is passed to the consumer. The consumer will * call the finishedConsuming method once complete. */ @@ -12,10 +12,12 @@ public interface EventStreamConsumptionResolver { * Called by a consumer when finish is expected. * * @param finishedProcessingMessage - the message containing the Queue that has been consumed. - * - * @return true if all events are consumed, false if there are still events remaining in the queue. + * @return true if all events are consumed, false if there are still events remaining in the + * queue. */ boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage); void decrementEventsInProcessCount(); + + void decrementEventsInProcessCountBy(final int count); } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java index 38aa7da9c..f8ab86903 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java @@ -19,6 +19,10 @@ public synchronized void decrementEventsInProcessCount() { eventInProcessCount.decrementAndGet(); } + public synchronized void decrementEventsInProcessCountBy(final int count) { + eventInProcessCount.addAndGet(count * -1); + } + public synchronized boolean maxNumberOfEventsInProcess() { return eventInProcessCount.get() >= maxTotalEventsInProcess; } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java index d3eea3806..2cac4795a 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java @@ -3,8 +3,10 @@ import static javax.ejb.TransactionManagementType.CONTAINER; import static javax.transaction.Transactional.TxType.NEVER; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import java.util.Queue; import java.util.UUID; @@ -13,6 +15,8 @@ import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.ejb.TransactionManagement; +import javax.enterprise.event.Event; +import javax.inject.Inject; import javax.transaction.Transactional; @Stateless @@ -20,21 +24,38 @@ @TransactionAttribute(value = TransactionAttributeType.NEVER) public class ConsumeEventQueueBean { + @Inject + private EventProcessingFailedHandler eventProcessingFailedHandler; + + @Inject + private Event catchupProcessingOfEventFailedEventFirer; + + @Inject + private EventStreamConsumptionResolver eventStreamConsumptionResolver; + + @Inject + private EventQueueConsumer eventQueueConsumer; + @Transactional(NEVER) public void consume( final Queue events, - final EventQueueConsumer eventQueueConsumer, final String subscriptionName, final CatchupCommand catchupCommand, final UUID commandId) { boolean consumed = false; - while(! consumed) { - consumed = eventQueueConsumer.consumeEventQueue( - commandId, - events, - subscriptionName, - catchupCommand); + while (!consumed) { + try { + consumed = eventQueueConsumer.consumeEventQueue( + commandId, + events, + subscriptionName, + catchupCommand); + } catch (final Exception e) { + eventStreamConsumptionResolver.decrementEventsInProcessCountBy(events.size()); + events.clear(); + eventProcessingFailedHandler.handleStreamFailure(e, subscriptionName, catchupCommand, commandId); + } } } } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java index b29344657..b9ce5df09 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java @@ -10,7 +10,6 @@ public class ConsumeEventQueueTask implements Runnable { private final ConsumeEventQueueBean consumeEventQueueBean; private final Queue events; - private final EventQueueConsumer eventQueueConsumer; private final String subscriptionName; private final CatchupCommand catchupCommand; private final UUID commandId; @@ -18,13 +17,11 @@ public class ConsumeEventQueueTask implements Runnable { public ConsumeEventQueueTask( final ConsumeEventQueueBean consumeEventQueueBean, final Queue events, - final EventQueueConsumer eventQueueConsumer, final String subscriptionName, final CatchupCommand catchupCommand, final UUID commandId) { this.consumeEventQueueBean = consumeEventQueueBean; this.events = events; - this.eventQueueConsumer = eventQueueConsumer; this.subscriptionName = subscriptionName; this.catchupCommand = catchupCommand; this.commandId = commandId; @@ -35,7 +32,6 @@ public void run() { consumeEventQueueBean.consume( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java index 75347899c..ab24b54fd 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java @@ -15,7 +15,6 @@ public class ConsumeEventQueueTaskFactory { public ConsumeEventQueueTask createConsumeEventQueueTask( final Queue events, - final EventQueueConsumer eventQueueConsumer, final String subscriptionName, final CatchupCommand catchupCommand, final UUID commandId) { @@ -23,7 +22,6 @@ public ConsumeEventQueueTask createConsumeEventQueueTask( return new ConsumeEventQueueTask( consumeEventQueueBean, events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java index de701a1f6..289a49521 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java @@ -20,7 +20,6 @@ public class ConsumeEventQueueTaskManager { public void consume( final Queue events, - final EventQueueConsumer eventQueueConsumer, final String subscriptionName, final CatchupCommand catchupCommand, final UUID commandId) { @@ -28,7 +27,6 @@ public void consume( final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java index 7926b7a73..2a3dd3577 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java @@ -21,22 +21,44 @@ public class EventProcessingFailedHandler { @Inject private Logger logger; - public void handle( - final RuntimeException exception, + public void handleEventFailure( + final Exception exception, final PublishedEvent publishedEvent, final String subscriptionName, final CatchupCommand catchupCommand, final UUID commandId) { - final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata()); + final String logMessage = format( + "Failed to process publishedEvent: name: '%s', id: '%s', streamId: '%s'", + publishedEvent.getName(), + publishedEvent.getId(), + publishedEvent.getStreamId() + ); + + handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception); + } + + public void handleStreamFailure( + final Exception exception, + final String subscriptionName, + final CatchupCommand catchupCommand, + final UUID commandId) { + + final String logMessage = "Failed to consume stream of events. Aborting..."; + + handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception); + } + + private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) { logger.error( logMessage, exception); + final String errorMessage = format("%s: %s: %s", logMessage, exception.getClass().getSimpleName(), exception.getMessage()); + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent( commandId, - publishedEvent.getId(), - publishedEvent.getMetadata(), + errorMessage, exception, catchupCommand, subscriptionName diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java index 25ecb8965..2cc2da315 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java @@ -9,20 +9,18 @@ import java.util.Queue; import java.util.UUID; +import javax.inject.Inject; + public class EventQueueConsumer { - private final TransactionalEventProcessor transactionalEventProcessor; - private final EventStreamConsumptionResolver eventStreamConsumptionResolver; - private final EventProcessingFailedHandler eventProcessingFailedHandler; - - public EventQueueConsumer( - final TransactionalEventProcessor transactionalEventProcessor, - final EventStreamConsumptionResolver eventStreamConsumptionResolver, - final EventProcessingFailedHandler eventProcessingFailedHandler) { - this.transactionalEventProcessor = transactionalEventProcessor; - this.eventStreamConsumptionResolver = eventStreamConsumptionResolver; - this.eventProcessingFailedHandler = eventProcessingFailedHandler; - } + @Inject + private TransactionalEventProcessor transactionalEventProcessor; + + @Inject + private EventStreamConsumptionResolver eventStreamConsumptionResolver; + + @Inject + private EventProcessingFailedHandler eventProcessingFailedHandler; public boolean consumeEventQueue( final UUID commandId, @@ -35,8 +33,8 @@ public boolean consumeEventQueue( final PublishedEvent publishedEvent = events.poll(); try { transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName); - } catch (final RuntimeException e) { - eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId); + } catch (final Exception e) { + eventProcessingFailedHandler.handleEventFailure(e, publishedEvent, subscriptionName, catchupCommand, commandId); } finally { eventStreamConsumptionResolver.decrementEventsInProcessCount(); } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java index 7086195cd..745d0561b 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java @@ -8,13 +8,13 @@ import uk.gov.justice.services.cdi.LoggerProducer; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.DummyEventQueueProcessingConfig; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager; -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventsInProcessCounterProvider; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskFactory; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.DummyTransactionalEventProcessor; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.TestCatchupBean; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; @@ -53,7 +53,6 @@ public class EventStreamCatchupIT { DummyTransactionalEventProcessor.class, EventStreamsInProgressList.class, ConsumeEventQueueBean.class, - EventQueueConsumerFactory.class, LoggerProducer.class, DummySystemCommandStore.class, ConcurrentEventStreamConsumerManager.class, @@ -61,7 +60,8 @@ public class EventStreamCatchupIT { ConsumeEventQueueTaskManager.class, ConsumeEventQueueTaskFactory.class, EventsInProcessCounterProvider.class, - DummyEventQueueProcessingConfig.class + DummyEventQueueProcessingConfig.class, + EventQueueConsumer.class }) public WebApp war() { return new WebApp() diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java index 348ea2ed5..0b4121eaa 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java @@ -37,9 +37,6 @@ public class ConcurrentEventStreamConsumerManagerTest { @Mock private ConsumeEventQueueTaskManager consumeEventQueueTaskManager; - @Mock - private EventQueueConsumerFactory eventQueueConsumerFactory; - @Spy private EventStreamsInProgressList eventStreamsInProgressList = new EventStreamsInProgressList(); @@ -63,12 +60,11 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() { when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); - when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent.getStreamId()).thenReturn(streamId); concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName, catchupCommand, commandId); - verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId)); + verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId)); final Queue events = eventQueueCaptor.getValue(); assertThat(events.size(), is(1)); @@ -92,14 +88,13 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() { when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); - when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId); when(publishedEvent_2.getStreamId()).thenReturn(streamId); concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId); concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId); - verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId)); + verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId)); final Queue eventsStream = eventQueueCaptor.getValue(); assertThat(eventsStream.size(), is(2)); @@ -125,14 +120,13 @@ public void shouldCreateQueueForEachStreamId() { when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); - when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId); concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId); - verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId)); + verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId)); final List> allValues = eventQueueCaptor.getAllValues(); @@ -163,13 +157,12 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); - when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId); - verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId)); + verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId)); final Queue eventsStream_1 = eventQueueCaptor.getValue(); assertThat(eventsStream_1.size(), is(1)); @@ -178,7 +171,7 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp concurrentEventStreamConsumerManager.isEventConsumptionComplete(new FinishedProcessingMessage(eventsStream_1)); concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId); - verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId)); + verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId)); final Queue eventsStream_2 = eventQueueCaptor.getValue(); assertThat(eventsStream_2.size(), is(1)); @@ -206,4 +199,17 @@ public void shouldDecrementTheEventsInProcessCount() throws Exception { verify(eventsInProcessCounter).decrementEventsInProcessCount(); } + + @Test + public void shouldDecrementTheEventsInProcessCountByGivenNumber() throws Exception { + + final int count = 2; + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + + concurrentEventStreamConsumerManager.decrementEventsInProcessCountBy(count); + + verify(eventsInProcessCounter).decrementEventsInProcessCountBy(count); + } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java deleted file mode 100644 index b6fef6e1c..000000000 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField; - -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; -import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; -import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class EventQueueConsumerFactoryTest { - - @Mock - private TransactionalEventProcessor transactionalEventProcessor; - - @Mock - private EventProcessingFailedHandler eventProcessingFailedHandler; - - @InjectMocks - private EventQueueConsumerFactory eventQueueConsumerFactory; - - @Test - public void shouldCreateEventQueueConsumerFactory() throws Exception { - - final EventStreamConsumptionResolver eventStreamConsumptionResolver = mock(EventStreamConsumptionResolver.class); - - final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(eventStreamConsumptionResolver); - - assertThat(getValueOfField(eventQueueConsumer, "transactionalEventProcessor", TransactionalEventProcessor.class), is(transactionalEventProcessor)); - assertThat(getValueOfField(eventQueueConsumer, "eventStreamConsumptionResolver", EventStreamConsumptionResolver.class), is(eventStreamConsumptionResolver)); - assertThat(getValueOfField(eventQueueConsumer, "eventProcessingFailedHandler", EventProcessingFailedHandler.class), is(eventProcessingFailedHandler)); - } -} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java new file mode 100644 index 000000000..3b5b72fd8 --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java @@ -0,0 +1,57 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class EventsInProcessCounterTest { + + @Test + public void shouldIncrementAndDecrementEventsInProcessCounterByOne() { + + final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(1); + final AtomicInteger eventInProcessCount = ReflectionUtil.getValueOfField(eventsInProcessCounter, "eventInProcessCount", AtomicInteger.class); + + eventsInProcessCounter.incrementEventsInProcessCount(); + + assertThat(eventInProcessCount.get(), is(1)); + + eventsInProcessCounter.decrementEventsInProcessCount(); + + assertThat(eventInProcessCount.get(), is(0)); + } + + @Test + public void shouldDecrementEventsInProcessCounterByGivenNumber() { + + final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(3); + final AtomicInteger eventInProcessCount = ReflectionUtil.getValueOfField(eventsInProcessCounter, "eventInProcessCount", AtomicInteger.class); + + eventsInProcessCounter.incrementEventsInProcessCount(); + eventsInProcessCounter.incrementEventsInProcessCount(); + eventsInProcessCounter.incrementEventsInProcessCount(); + + assertThat(eventInProcessCount.get(), is(3)); + + eventsInProcessCounter.decrementEventsInProcessCountBy(3); + + assertThat(eventInProcessCount.get(), is(0)); + } + + @Test + public void shouldReturnTrueIfMaxCountReached() { + + final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(3); + + eventsInProcessCounter.incrementEventsInProcessCount(); + eventsInProcessCounter.incrementEventsInProcessCount(); + eventsInProcessCounter.incrementEventsInProcessCount(); + + assertThat(eventsInProcessCounter.maxNumberOfEventsInProcess(), is(true)); + } +} \ No newline at end of file diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java index e9d5b3dab..3c5863030 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java @@ -2,27 +2,46 @@ import static java.util.Collections.singletonList; import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import javax.enterprise.event.Event; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class ConsumeEventQueueBeanTest { + @Mock + private EventProcessingFailedHandler eventProcessingFailedHandler; + + @Mock + private Event catchupProcessingOfEventFailedEventFirer; + + @Mock + private EventStreamConsumptionResolver eventStreamConsumptionResolver; + + @Mock + private EventQueueConsumer eventQueueConsumer; + @InjectMocks private ConsumeEventQueueBean consumeEventQueueBean; @@ -30,8 +49,7 @@ public class ConsumeEventQueueBeanTest { public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Exception { final UUID commandId = randomUUID(); - final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class))) ; - final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); + final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class))); final String subscriptionName = "subscriptionName"; final CatchupCommand eventCatchupCommand = new EventCatchupCommand(); @@ -39,7 +57,6 @@ public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Excepti consumeEventQueueBean.consume( events, - eventQueueConsumer, subscriptionName, eventCatchupCommand, commandId @@ -47,4 +64,35 @@ public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Excepti verify(eventQueueConsumer, times(3)).consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand); } + + @Test + public void shouldCatchTheHiddenExceptionAndClearTheEventQueueIfTheTransactionFails() throws Exception { + + final RuntimeException runtimeException = new RuntimeException( + "In reality this will be a javax.transaction.RollbackException" + ); + + final UUID commandId = randomUUID(); + final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class))); + final String subscriptionName = "subscriptionName"; + final CatchupCommand eventCatchupCommand = new EventCatchupCommand(); + + when(eventQueueConsumer.consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand)) + .thenThrow(runtimeException) + .thenReturn(true); + + assertThat(events.isEmpty(), is(false)); + + consumeEventQueueBean.consume( + events, + subscriptionName, + eventCatchupCommand, + commandId + ); + + verify(eventStreamConsumptionResolver).decrementEventsInProcessCountBy(1); + verify(eventQueueConsumer, times(2)).consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand); + verify(eventProcessingFailedHandler).handleStreamFailure(runtimeException, subscriptionName, eventCatchupCommand, commandId); + assertThat(events.isEmpty(), is(true)); + } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java index 161f337b2..7424c1a73 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java @@ -34,14 +34,12 @@ public class ConsumeEventQueueTaskFactoryTest { public void shouldCreateConsumeEventQueueTask() throws Exception { final Queue events = mock(Queue.class); - final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); final String subscriptionName = "subscription name"; final CatchupCommand catchupCommand = new EventCatchupCommand(); final UUID commandId = randomUUID(); final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId @@ -49,7 +47,6 @@ public void shouldCreateConsumeEventQueueTask() throws Exception { assertThat(getValueOfField(consumeEventQueueTask, "consumeEventQueueBean", ConsumeEventQueueBean.class), is(consumeEventQueueBean)); assertThat(getValueOfField(consumeEventQueueTask, "events", Queue.class), is(events)); - assertThat(getValueOfField(consumeEventQueueTask, "eventQueueConsumer", EventQueueConsumer.class), is(eventQueueConsumer)); assertThat(getValueOfField(consumeEventQueueTask, "subscriptionName", String.class), is(subscriptionName)); assertThat(getValueOfField(consumeEventQueueTask, "catchupCommand", CatchupCommand.class), is(catchupCommand)); assertThat(getValueOfField(consumeEventQueueTask, "commandId", UUID.class), is(commandId)); diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java index c709e173d..e70e73cbb 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java @@ -45,7 +45,6 @@ public void shouldAsynchronouslyRunConsumeEventQueue() throws Exception { when(consumeEventQueueTaskFactory.createConsumeEventQueueTask( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId @@ -53,7 +52,6 @@ public void shouldAsynchronouslyRunConsumeEventQueue() throws Exception { consumeEventQueueTaskManager.consume( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java index 24ff06e28..84f137565 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java @@ -31,17 +31,15 @@ public void shouldCallTheConsumeEventQueueBean() throws Exception { final ConsumeEventQueueTask consumeEventQueueTask = new ConsumeEventQueueTask( consumeEventQueueBean, events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId ); - + consumeEventQueueTask.run(); verify(consumeEventQueueBean).consume( events, - eventQueueConsumer, subscriptionName, catchupCommand, commandId diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java index ba0668610..3ac42c9cc 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java @@ -1,5 +1,6 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; +import static java.util.UUID.fromString; import static java.util.UUID.randomUUID; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -41,35 +42,71 @@ public class EventProcessingFailedHandlerTest { private ArgumentCaptor catchupProcessingOfEventFailedEventCaptor; @Test - public void shouldLogExceptionAndFireFailureEvent() throws Exception { + public void shouldLogExceptionAndFireFailureEventOnEventFailure() throws Exception { final NullPointerException nullPointerException = new NullPointerException("Ooops"); final UUID commandId = randomUUID(); final String subscriptionName = "subscriptionName"; - final String metadata = "{some: metadata}"; - final UUID eventId = randomUUID(); + final String eventName = "events.some-event"; + final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e"); + final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829"); final CatchupCommand catchupCommand = new EventCatchupCommand(); final PublishedEvent publishedEvent = mock(PublishedEvent.class); + when(publishedEvent.getName()).thenReturn(eventName); when(publishedEvent.getId()).thenReturn(eventId); - when(publishedEvent.getMetadata()).thenReturn(metadata); + when(publishedEvent.getStreamId()).thenReturn(streamId); - eventProcessingFailedHandler.handle( + eventProcessingFailedHandler.handleEventFailure( nullPointerException, publishedEvent, subscriptionName, catchupCommand, commandId); - verify(logger).error("Failed to process publishedEvent with metadata: {some: metadata}", nullPointerException); + verify(logger).error("Failed to process publishedEvent: name: 'events.some-event', id: '1c68536e-1fdf-4891-9c74-85661a9c0f9e', streamId: 'b1834ed9-e084-41c7-ae7f-74f1227cc829'", nullPointerException); verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture()); final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue(); - assertThat(catchupProcessingOfEventFailedEvent.getEventId(), is(eventId)); - assertThat(catchupProcessingOfEventFailedEvent.getMetadata(), is(metadata)); + assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to process publishedEvent: name: 'events.some-event', id: '1c68536e-1fdf-4891-9c74-85661a9c0f9e', streamId: 'b1834ed9-e084-41c7-ae7f-74f1227cc829': NullPointerException: Ooops")); + assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand)); + assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException)); + assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName)); + } + + @Test + public void shouldLogExceptionAndFireFailureEventOnStreamFailure() throws Exception { + + final NullPointerException nullPointerException = new NullPointerException("Ooops"); + final UUID commandId = randomUUID(); + final String subscriptionName = "subscriptionName"; + final String eventName = "events.some-event"; + final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e"); + final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829"); + final CatchupCommand catchupCommand = new EventCatchupCommand(); + + final PublishedEvent publishedEvent = mock(PublishedEvent.class); + + when(publishedEvent.getName()).thenReturn(eventName); + when(publishedEvent.getId()).thenReturn(eventId); + when(publishedEvent.getStreamId()).thenReturn(streamId); + + eventProcessingFailedHandler.handleStreamFailure( + nullPointerException, + subscriptionName, + catchupCommand, + commandId); + + verify(logger).error("Failed to consume stream of events. Aborting...", nullPointerException); + + verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture()); + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue(); + + assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to consume stream of events. Aborting...: NullPointerException: Ooops")); assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand)); assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException)); assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName)); diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java index a23949a88..625f5e171 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java @@ -47,7 +47,7 @@ public void shouldProcessAllEventsOnQueueAndReturnTrueIfComplete() throws Except final UUID commandId = randomUUID(); final CatchupCommand catchupCommand = new EventCatchupCommand(); - + final PublishedEvent event_1 = mock(PublishedEvent.class); final PublishedEvent event_2 = mock(PublishedEvent.class); @@ -95,7 +95,7 @@ public void shouldHandleExceptionsThrownWhilstProcessing() throws Exception { verify(transactionalEventProcessor).processWithEventBuffer(event_2, subscriptionName); - verify(eventProcessingFailedHandler).handle( + verify(eventProcessingFailedHandler).handleEventFailure( nullPointerException, event_1, subscriptionName, diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java index cb31edd76..79f6a192d 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java @@ -120,8 +120,7 @@ public void onCatchupProcessingOfEventFailed(@Observes final CatchupProcessingOf final CatchupCommand catchupCommand = catchupProcessingOfEventFailedEvent.getCatchupCommand(); final CatchupError catchupError = new CatchupError( - catchupProcessingOfEventFailedEvent.getEventId(), - catchupProcessingOfEventFailedEvent.getMetadata(), + catchupProcessingOfEventFailedEvent.getMessage(), catchupProcessingOfEventFailedEvent.getSubscriptionName(), catchupCommand, catchupProcessingOfEventFailedEvent.getException() diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java index 748789cb3..62570150c 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java @@ -3,35 +3,27 @@ import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; import java.util.Objects; -import java.util.UUID; public class CatchupError { - private final UUID eventId; - private final String metadata; + private final String message; private final String subscriptionName; private final CatchupCommand catchupCommand; private final Throwable exception; public CatchupError( - final UUID eventId, - final String metadata, + final String message, final String subscriptionName, final CatchupCommand catchupCommand, final Throwable exception) { - this.eventId = eventId; - this.metadata = metadata; + this.message = message; this.subscriptionName = subscriptionName; this.catchupCommand = catchupCommand; this.exception = exception; } - public UUID getEventId() { - return eventId; - } - - public String getMetadata() { - return metadata; + public String getMessage() { + return message; } public String getSubscriptionName() { @@ -51,8 +43,7 @@ public boolean equals(final Object o) { if (this == o) return true; if (!(o instanceof CatchupError)) return false; final CatchupError that = (CatchupError) o; - return Objects.equals(eventId, that.eventId) && - Objects.equals(metadata, that.metadata) && + return Objects.equals(message, that.message) && Objects.equals(subscriptionName, that.subscriptionName) && Objects.equals(catchupCommand, that.catchupCommand) && Objects.equals(exception, that.exception); @@ -60,14 +51,13 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(eventId, metadata, subscriptionName, catchupCommand, exception); + return Objects.hash(message, subscriptionName, catchupCommand, exception); } @Override public String toString() { return "CatchupError{" + - "eventId=" + eventId + - ", metadata='" + metadata + '\'' + + "message=" + message + ", subscriptionName='" + subscriptionName + '\'' + ", catchupCommand=" + catchupCommand + ", exception=" + exception + diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java index 4eda863c9..44ee6feb4 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java @@ -7,6 +7,9 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import javax.inject.Singleton; + +@Singleton public class CatchupErrorStateManager { private final List eventCatchupErrors = new CopyOnWriteArrayList<>(); diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java index b5a5d1e21..bb6b9bbf8 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java @@ -244,16 +244,14 @@ public void shouldCompleteTheCatchupIfAllCatchupsForSubscriptionsComplete() thro public void shouldHandleCatchupProcessingOfEventFailed() throws Exception { final UUID commandId = randomUUID(); - final UUID eventId = randomUUID(); - final String metadata = "{some: metadata}"; + final String message = "oh gosh"; final NullPointerException exception = new NullPointerException("Ooops"); final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand(); final String subscriptionName = "subscriptionName"; final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent( commandId, - eventId, - metadata, + message, exception, eventCatchupCommand, subscriptionName @@ -265,9 +263,8 @@ public void shouldHandleCatchupProcessingOfEventFailed() throws Exception { final CatchupError catchupError = catchupErrorCaptor.getValue(); - assertThat(catchupError.getEventId(), is(eventId)); + assertThat(catchupError.getMessage(), is(message)); assertThat(catchupError.getException(), is(exception)); - assertThat(catchupError.getMetadata(), is(metadata)); assertThat(catchupError.getCatchupCommand(), is(eventCatchupCommand)); assertThat(catchupError.getSubscriptionName(), is(subscriptionName)); } diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java index a8a6ee47c..99ca871ff 100644 --- a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java +++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java @@ -8,22 +8,19 @@ public class CatchupProcessingOfEventFailedEvent { private final UUID commandId; - private final UUID eventId; - private final String metadata; + private final String message; private final Throwable exception; private final CatchupCommand catchupCommand; private final String subscriptionName; public CatchupProcessingOfEventFailedEvent( final UUID commandId, - final UUID eventId, - final String metadata, + final String message, final Throwable exception, final CatchupCommand catchupCommand, final String subscriptionName) { this.commandId = commandId; - this.eventId = eventId; - this.metadata = metadata; + this.message = message; this.exception = exception; this.catchupCommand = catchupCommand; this.subscriptionName = subscriptionName; @@ -33,12 +30,8 @@ public UUID getCommandId() { return commandId; } - public UUID getEventId() { - return eventId; - } - - public String getMetadata() { - return metadata; + public String getMessage() { + return message; } public Throwable getException() { @@ -59,8 +52,7 @@ public boolean equals(final Object o) { if (!(o instanceof CatchupProcessingOfEventFailedEvent)) return false; final CatchupProcessingOfEventFailedEvent that = (CatchupProcessingOfEventFailedEvent) o; return Objects.equals(commandId, that.commandId) && - Objects.equals(eventId, that.eventId) && - Objects.equals(metadata, that.metadata) && + Objects.equals(message, that.message) && Objects.equals(exception, that.exception) && Objects.equals(catchupCommand, that.catchupCommand) && Objects.equals(subscriptionName, that.subscriptionName); @@ -68,15 +60,14 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(commandId, eventId, metadata, exception, catchupCommand, subscriptionName); + return Objects.hash(commandId, message, exception, catchupCommand, subscriptionName); } @Override public String toString() { return "CatchupProcessingOfEventFailedEvent{" + "commandId=" + commandId + - ", eventId=" + eventId + - ", metadata='" + metadata + '\'' + + ", message=" + message + ", exception=" + exception + ", catchupCommand=" + catchupCommand + ", subscriptionName='" + subscriptionName + '\'' +