diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ed26cf5a..0f6cbd327 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
## [2.4.4] - 2020-01-06
### Added
- Added mechanism to also drop/add trigger to event_log table on SUSPEND/UNSUSPEND commands
+### Fixed
+- Fixed potential problem of a transaction failing during catchup causing catchup to never complete
## [2.4.3] - 2019-12-06
### Changed
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java
index e513017ab..fa88b9178 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java
@@ -3,7 +3,6 @@
import static java.lang.Thread.currentThread;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
@@ -13,15 +12,17 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.inject.Inject;
+import javax.inject.Singleton;
/**
* A concurrent implementation of EventStreamConsumerManager and EventStreamConsumerListener.
- *
+ *
* This uses the ManagedExecutorService for concurrency and Queues events according to the Stream
* Id.
- *
+ *
* When the add method is called
*/
+@Singleton
public class ConcurrentEventStreamConsumerManager implements EventStreamConsumerManager, EventStreamConsumptionResolver {
private static final Object EXCLUSIVE_LOCK = new Object();
@@ -37,17 +38,14 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer
@Inject
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;
- @Inject
- private EventQueueConsumerFactory eventQueueConsumerFactory;
-
/**
* A ConcurrentLinkedQueue is created for each Stream Id and added to a ConcurrentHashMap. An
* event is added to the Queue for a Stream Id.
- *
+ *
* If the Queue is not currently being processed a new ConsumeEventQueueTask is created and
* submitted to the ManagedExecutorService. The Queue is then added to the
* eventStreamsInProgress Queue.
- *
+ *
* If the Queue is currently being processed no further action is taken, as the event will be
* processed by the current ConsumeEventQueueTask.
*
@@ -124,9 +122,19 @@ public void decrementEventsInProcessCount() {
final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();
synchronized (EXCLUSIVE_LOCK) {
- eventsInProcessCounter.decrementEventsInProcessCount();
- EXCLUSIVE_LOCK.notify();
- }
+ eventsInProcessCounter.decrementEventsInProcessCount();
+ EXCLUSIVE_LOCK.notify();
+ }
+ }
+
+ @Override
+ public void decrementEventsInProcessCountBy(final int count) {
+ final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();
+
+ synchronized (EXCLUSIVE_LOCK) {
+ eventsInProcessCounter.decrementEventsInProcessCountBy(count);
+ EXCLUSIVE_LOCK.notify();
+ }
}
private boolean notInProgress(final Queue eventStream) {
@@ -141,10 +149,8 @@ private void createAndSubmitTaskFor(
eventStreamsInProgressList.add(eventStream);
- final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this);
consumeEventQueueTaskManager.consume(
eventStream,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId);
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java
deleted file mode 100644
index c9ed3c2c5..000000000
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;
-
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
-import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor;
-
-import javax.inject.Inject;
-
-public class EventQueueConsumerFactory {
-
- @Inject
- private TransactionalEventProcessor transactionalEventProcessor;
-
- @Inject
- private EventProcessingFailedHandler eventProcessingFailedHandler;
-
- public EventQueueConsumer create(final EventStreamConsumptionResolver eventStreamConsumptionResolver) {
- return new EventQueueConsumer(
- transactionalEventProcessor,
- eventStreamConsumptionResolver,
- eventProcessingFailedHandler);
- }
-}
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java
index c04c16f75..aae260687 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java
@@ -2,7 +2,7 @@
/**
* Interface for listening to Event Stream Consumers.
- *
+ *
* An instance of the an EventStreamConsumerListener is passed to the consumer. The consumer will
* call the finishedConsuming method once complete.
*/
@@ -12,10 +12,12 @@ public interface EventStreamConsumptionResolver {
* Called by a consumer when finish is expected.
*
* @param finishedProcessingMessage - the message containing the Queue that has been consumed.
- *
- * @return true if all events are consumed, false if there are still events remaining in the queue.
+ * @return true if all events are consumed, false if there are still events remaining in the
+ * queue.
*/
boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage);
void decrementEventsInProcessCount();
+
+ void decrementEventsInProcessCountBy(final int count);
}
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java
index 38aa7da9c..f8ab86903 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java
@@ -19,6 +19,10 @@ public synchronized void decrementEventsInProcessCount() {
eventInProcessCount.decrementAndGet();
}
+ public synchronized void decrementEventsInProcessCountBy(final int count) {
+ eventInProcessCount.addAndGet(count * -1);
+ }
+
public synchronized boolean maxNumberOfEventsInProcess() {
return eventInProcessCount.get() >= maxTotalEventsInProcess;
}
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java
index d3eea3806..2cac4795a 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java
@@ -3,8 +3,10 @@
import static javax.ejb.TransactionManagementType.CONTAINER;
import static javax.transaction.Transactional.TxType.NEVER;
+import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import java.util.Queue;
import java.util.UUID;
@@ -13,6 +15,8 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ejb.TransactionManagement;
+import javax.enterprise.event.Event;
+import javax.inject.Inject;
import javax.transaction.Transactional;
@Stateless
@@ -20,21 +24,38 @@
@TransactionAttribute(value = TransactionAttributeType.NEVER)
public class ConsumeEventQueueBean {
+ @Inject
+ private EventProcessingFailedHandler eventProcessingFailedHandler;
+
+ @Inject
+ private Event catchupProcessingOfEventFailedEventFirer;
+
+ @Inject
+ private EventStreamConsumptionResolver eventStreamConsumptionResolver;
+
+ @Inject
+ private EventQueueConsumer eventQueueConsumer;
+
@Transactional(NEVER)
public void consume(
final Queue events,
- final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
boolean consumed = false;
- while(! consumed) {
- consumed = eventQueueConsumer.consumeEventQueue(
- commandId,
- events,
- subscriptionName,
- catchupCommand);
+ while (!consumed) {
+ try {
+ consumed = eventQueueConsumer.consumeEventQueue(
+ commandId,
+ events,
+ subscriptionName,
+ catchupCommand);
+ } catch (final Exception e) {
+ eventStreamConsumptionResolver.decrementEventsInProcessCountBy(events.size());
+ events.clear();
+ eventProcessingFailedHandler.handleStreamFailure(e, subscriptionName, catchupCommand, commandId);
+ }
}
}
}
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java
index b29344657..b9ce5df09 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTask.java
@@ -10,7 +10,6 @@ public class ConsumeEventQueueTask implements Runnable {
private final ConsumeEventQueueBean consumeEventQueueBean;
private final Queue events;
- private final EventQueueConsumer eventQueueConsumer;
private final String subscriptionName;
private final CatchupCommand catchupCommand;
private final UUID commandId;
@@ -18,13 +17,11 @@ public class ConsumeEventQueueTask implements Runnable {
public ConsumeEventQueueTask(
final ConsumeEventQueueBean consumeEventQueueBean,
final Queue events,
- final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
this.consumeEventQueueBean = consumeEventQueueBean;
this.events = events;
- this.eventQueueConsumer = eventQueueConsumer;
this.subscriptionName = subscriptionName;
this.catchupCommand = catchupCommand;
this.commandId = commandId;
@@ -35,7 +32,6 @@ public void run() {
consumeEventQueueBean.consume(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java
index 75347899c..ab24b54fd 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactory.java
@@ -15,7 +15,6 @@ public class ConsumeEventQueueTaskFactory {
public ConsumeEventQueueTask createConsumeEventQueueTask(
final Queue events,
- final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
@@ -23,7 +22,6 @@ public ConsumeEventQueueTask createConsumeEventQueueTask(
return new ConsumeEventQueueTask(
consumeEventQueueBean,
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java
index de701a1f6..289a49521 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManager.java
@@ -20,7 +20,6 @@ public class ConsumeEventQueueTaskManager {
public void consume(
final Queue events,
- final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
@@ -28,7 +27,6 @@ public void consume(
final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java
index 7926b7a73..2a3dd3577 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java
@@ -21,22 +21,44 @@ public class EventProcessingFailedHandler {
@Inject
private Logger logger;
- public void handle(
- final RuntimeException exception,
+ public void handleEventFailure(
+ final Exception exception,
final PublishedEvent publishedEvent,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
- final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata());
+ final String logMessage = format(
+ "Failed to process publishedEvent: name: '%s', id: '%s', streamId: '%s'",
+ publishedEvent.getName(),
+ publishedEvent.getId(),
+ publishedEvent.getStreamId()
+ );
+
+ handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
+ }
+
+ public void handleStreamFailure(
+ final Exception exception,
+ final String subscriptionName,
+ final CatchupCommand catchupCommand,
+ final UUID commandId) {
+
+ final String logMessage = "Failed to consume stream of events. Aborting...";
+
+ handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
+ }
+
+ private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) {
logger.error(
logMessage,
exception);
+ final String errorMessage = format("%s: %s: %s", logMessage, exception.getClass().getSimpleName(), exception.getMessage());
+
final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent(
commandId,
- publishedEvent.getId(),
- publishedEvent.getMetadata(),
+ errorMessage,
exception,
catchupCommand,
subscriptionName
diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java
index 25ecb8965..2cc2da315 100644
--- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java
+++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java
@@ -9,20 +9,18 @@
import java.util.Queue;
import java.util.UUID;
+import javax.inject.Inject;
+
public class EventQueueConsumer {
- private final TransactionalEventProcessor transactionalEventProcessor;
- private final EventStreamConsumptionResolver eventStreamConsumptionResolver;
- private final EventProcessingFailedHandler eventProcessingFailedHandler;
-
- public EventQueueConsumer(
- final TransactionalEventProcessor transactionalEventProcessor,
- final EventStreamConsumptionResolver eventStreamConsumptionResolver,
- final EventProcessingFailedHandler eventProcessingFailedHandler) {
- this.transactionalEventProcessor = transactionalEventProcessor;
- this.eventStreamConsumptionResolver = eventStreamConsumptionResolver;
- this.eventProcessingFailedHandler = eventProcessingFailedHandler;
- }
+ @Inject
+ private TransactionalEventProcessor transactionalEventProcessor;
+
+ @Inject
+ private EventStreamConsumptionResolver eventStreamConsumptionResolver;
+
+ @Inject
+ private EventProcessingFailedHandler eventProcessingFailedHandler;
public boolean consumeEventQueue(
final UUID commandId,
@@ -35,8 +33,8 @@ public boolean consumeEventQueue(
final PublishedEvent publishedEvent = events.poll();
try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
- } catch (final RuntimeException e) {
- eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId);
+ } catch (final Exception e) {
+ eventProcessingFailedHandler.handleEventFailure(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} finally {
eventStreamConsumptionResolver.decrementEventsInProcessCount();
}
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java
index 7086195cd..745d0561b 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java
@@ -8,13 +8,13 @@
import uk.gov.justice.services.cdi.LoggerProducer;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.DummyEventQueueProcessingConfig;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager;
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventsInProcessCounterProvider;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
+import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.DummyTransactionalEventProcessor;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.TestCatchupBean;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
@@ -53,7 +53,6 @@ public class EventStreamCatchupIT {
DummyTransactionalEventProcessor.class,
EventStreamsInProgressList.class,
ConsumeEventQueueBean.class,
- EventQueueConsumerFactory.class,
LoggerProducer.class,
DummySystemCommandStore.class,
ConcurrentEventStreamConsumerManager.class,
@@ -61,7 +60,8 @@ public class EventStreamCatchupIT {
ConsumeEventQueueTaskManager.class,
ConsumeEventQueueTaskFactory.class,
EventsInProcessCounterProvider.class,
- DummyEventQueueProcessingConfig.class
+ DummyEventQueueProcessingConfig.class,
+ EventQueueConsumer.class
})
public WebApp war() {
return new WebApp()
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java
index 348ea2ed5..0b4121eaa 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java
@@ -37,9 +37,6 @@ public class ConcurrentEventStreamConsumerManagerTest {
@Mock
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;
- @Mock
- private EventQueueConsumerFactory eventQueueConsumerFactory;
-
@Spy
private EventStreamsInProgressList eventStreamsInProgressList = new EventStreamsInProgressList();
@@ -63,12 +60,11 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
- when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent.getStreamId()).thenReturn(streamId);
concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName, catchupCommand, commandId);
- verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId));
+ verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId));
final Queue events = eventQueueCaptor.getValue();
assertThat(events.size(), is(1));
@@ -92,14 +88,13 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
- when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId);
when(publishedEvent_2.getStreamId()).thenReturn(streamId);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId);
- verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId));
+ verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId));
final Queue eventsStream = eventQueueCaptor.getValue();
assertThat(eventsStream.size(), is(2));
@@ -125,14 +120,13 @@ public void shouldCreateQueueForEachStreamId() {
when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
- when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId);
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId);
- verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId));
+ verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId));
final List> allValues = eventQueueCaptor.getAllValues();
@@ -163,13 +157,12 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
- when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId);
- verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId));
+ verify(consumeEventQueueTaskManager).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId));
final Queue eventsStream_1 = eventQueueCaptor.getValue();
assertThat(eventsStream_1.size(), is(1));
@@ -178,7 +171,7 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
concurrentEventStreamConsumerManager.isEventConsumptionComplete(new FinishedProcessingMessage(eventsStream_1));
concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId);
- verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(catchupCommand), eq(commandId));
+ verify(consumeEventQueueTaskManager, times(2)).consume(eventQueueCaptor.capture(), eq(subscriptionName), eq(catchupCommand), eq(commandId));
final Queue eventsStream_2 = eventQueueCaptor.getValue();
assertThat(eventsStream_2.size(), is(1));
@@ -206,4 +199,17 @@ public void shouldDecrementTheEventsInProcessCount() throws Exception {
verify(eventsInProcessCounter).decrementEventsInProcessCount();
}
+
+ @Test
+ public void shouldDecrementTheEventsInProcessCountByGivenNumber() throws Exception {
+
+ final int count = 2;
+ final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);
+
+ when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
+
+ concurrentEventStreamConsumerManager.decrementEventsInProcessCountBy(count);
+
+ verify(eventsInProcessCounter).decrementEventsInProcessCountBy(count);
+ }
}
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java
deleted file mode 100644
index b6fef6e1c..000000000
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField;
-
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
-import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
-import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventQueueConsumerFactoryTest {
-
- @Mock
- private TransactionalEventProcessor transactionalEventProcessor;
-
- @Mock
- private EventProcessingFailedHandler eventProcessingFailedHandler;
-
- @InjectMocks
- private EventQueueConsumerFactory eventQueueConsumerFactory;
-
- @Test
- public void shouldCreateEventQueueConsumerFactory() throws Exception {
-
- final EventStreamConsumptionResolver eventStreamConsumptionResolver = mock(EventStreamConsumptionResolver.class);
-
- final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(eventStreamConsumptionResolver);
-
- assertThat(getValueOfField(eventQueueConsumer, "transactionalEventProcessor", TransactionalEventProcessor.class), is(transactionalEventProcessor));
- assertThat(getValueOfField(eventQueueConsumer, "eventStreamConsumptionResolver", EventStreamConsumptionResolver.class), is(eventStreamConsumptionResolver));
- assertThat(getValueOfField(eventQueueConsumer, "eventProcessingFailedHandler", EventProcessingFailedHandler.class), is(eventProcessingFailedHandler));
- }
-}
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java
new file mode 100644
index 000000000..3b5b72fd8
--- /dev/null
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterTest.java
@@ -0,0 +1,57 @@
+package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class EventsInProcessCounterTest {
+
+ @Test
+ public void shouldIncrementAndDecrementEventsInProcessCounterByOne() {
+
+ final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(1);
+ final AtomicInteger eventInProcessCount = ReflectionUtil.getValueOfField(eventsInProcessCounter, "eventInProcessCount", AtomicInteger.class);
+
+ eventsInProcessCounter.incrementEventsInProcessCount();
+
+ assertThat(eventInProcessCount.get(), is(1));
+
+ eventsInProcessCounter.decrementEventsInProcessCount();
+
+ assertThat(eventInProcessCount.get(), is(0));
+ }
+
+ @Test
+ public void shouldDecrementEventsInProcessCounterByGivenNumber() {
+
+ final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(3);
+ final AtomicInteger eventInProcessCount = ReflectionUtil.getValueOfField(eventsInProcessCounter, "eventInProcessCount", AtomicInteger.class);
+
+ eventsInProcessCounter.incrementEventsInProcessCount();
+ eventsInProcessCounter.incrementEventsInProcessCount();
+ eventsInProcessCounter.incrementEventsInProcessCount();
+
+ assertThat(eventInProcessCount.get(), is(3));
+
+ eventsInProcessCounter.decrementEventsInProcessCountBy(3);
+
+ assertThat(eventInProcessCount.get(), is(0));
+ }
+
+ @Test
+ public void shouldReturnTrueIfMaxCountReached() {
+
+ final EventsInProcessCounter eventsInProcessCounter = new EventsInProcessCounter(3);
+
+ eventsInProcessCounter.incrementEventsInProcessCount();
+ eventsInProcessCounter.incrementEventsInProcessCount();
+ eventsInProcessCounter.incrementEventsInProcessCount();
+
+ assertThat(eventsInProcessCounter.maxNumberOfEventsInProcess(), is(true));
+ }
+}
\ No newline at end of file
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java
index e9d5b3dab..3c5863030 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java
@@ -2,27 +2,46 @@
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.enterprise.event.Event;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ConsumeEventQueueBeanTest {
+ @Mock
+ private EventProcessingFailedHandler eventProcessingFailedHandler;
+
+ @Mock
+ private Event catchupProcessingOfEventFailedEventFirer;
+
+ @Mock
+ private EventStreamConsumptionResolver eventStreamConsumptionResolver;
+
+ @Mock
+ private EventQueueConsumer eventQueueConsumer;
+
@InjectMocks
private ConsumeEventQueueBean consumeEventQueueBean;
@@ -30,8 +49,7 @@ public class ConsumeEventQueueBeanTest {
public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Exception {
final UUID commandId = randomUUID();
- final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class))) ;
- final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
+ final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class)));
final String subscriptionName = "subscriptionName";
final CatchupCommand eventCatchupCommand = new EventCatchupCommand();
@@ -39,7 +57,6 @@ public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Excepti
consumeEventQueueBean.consume(
events,
- eventQueueConsumer,
subscriptionName,
eventCatchupCommand,
commandId
@@ -47,4 +64,35 @@ public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Excepti
verify(eventQueueConsumer, times(3)).consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand);
}
+
+ @Test
+ public void shouldCatchTheHiddenExceptionAndClearTheEventQueueIfTheTransactionFails() throws Exception {
+
+ final RuntimeException runtimeException = new RuntimeException(
+ "In reality this will be a javax.transaction.RollbackException"
+ );
+
+ final UUID commandId = randomUUID();
+ final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class)));
+ final String subscriptionName = "subscriptionName";
+ final CatchupCommand eventCatchupCommand = new EventCatchupCommand();
+
+ when(eventQueueConsumer.consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand))
+ .thenThrow(runtimeException)
+ .thenReturn(true);
+
+ assertThat(events.isEmpty(), is(false));
+
+ consumeEventQueueBean.consume(
+ events,
+ subscriptionName,
+ eventCatchupCommand,
+ commandId
+ );
+
+ verify(eventStreamConsumptionResolver).decrementEventsInProcessCountBy(1);
+ verify(eventQueueConsumer, times(2)).consumeEventQueue(commandId, events, subscriptionName, eventCatchupCommand);
+ verify(eventProcessingFailedHandler).handleStreamFailure(runtimeException, subscriptionName, eventCatchupCommand, commandId);
+ assertThat(events.isEmpty(), is(true));
+ }
}
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java
index 161f337b2..7424c1a73 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskFactoryTest.java
@@ -34,14 +34,12 @@ public class ConsumeEventQueueTaskFactoryTest {
public void shouldCreateConsumeEventQueueTask() throws Exception {
final Queue events = mock(Queue.class);
- final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
final String subscriptionName = "subscription name";
final CatchupCommand catchupCommand = new EventCatchupCommand();
final UUID commandId = randomUUID();
final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
@@ -49,7 +47,6 @@ public void shouldCreateConsumeEventQueueTask() throws Exception {
assertThat(getValueOfField(consumeEventQueueTask, "consumeEventQueueBean", ConsumeEventQueueBean.class), is(consumeEventQueueBean));
assertThat(getValueOfField(consumeEventQueueTask, "events", Queue.class), is(events));
- assertThat(getValueOfField(consumeEventQueueTask, "eventQueueConsumer", EventQueueConsumer.class), is(eventQueueConsumer));
assertThat(getValueOfField(consumeEventQueueTask, "subscriptionName", String.class), is(subscriptionName));
assertThat(getValueOfField(consumeEventQueueTask, "catchupCommand", CatchupCommand.class), is(catchupCommand));
assertThat(getValueOfField(consumeEventQueueTask, "commandId", UUID.class), is(commandId));
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java
index c709e173d..e70e73cbb 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskManagerTest.java
@@ -45,7 +45,6 @@ public void shouldAsynchronouslyRunConsumeEventQueue() throws Exception {
when(consumeEventQueueTaskFactory.createConsumeEventQueueTask(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
@@ -53,7 +52,6 @@ public void shouldAsynchronouslyRunConsumeEventQueue() throws Exception {
consumeEventQueueTaskManager.consume(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java
index 24ff06e28..84f137565 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueTaskTest.java
@@ -31,17 +31,15 @@ public void shouldCallTheConsumeEventQueueBean() throws Exception {
final ConsumeEventQueueTask consumeEventQueueTask = new ConsumeEventQueueTask(
consumeEventQueueBean,
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
);
-
+
consumeEventQueueTask.run();
verify(consumeEventQueueBean).consume(
events,
- eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java
index ba0668610..3ac42c9cc 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java
@@ -1,5 +1,6 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task;
+import static java.util.UUID.fromString;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -41,35 +42,71 @@ public class EventProcessingFailedHandlerTest {
private ArgumentCaptor catchupProcessingOfEventFailedEventCaptor;
@Test
- public void shouldLogExceptionAndFireFailureEvent() throws Exception {
+ public void shouldLogExceptionAndFireFailureEventOnEventFailure() throws Exception {
final NullPointerException nullPointerException = new NullPointerException("Ooops");
final UUID commandId = randomUUID();
final String subscriptionName = "subscriptionName";
- final String metadata = "{some: metadata}";
- final UUID eventId = randomUUID();
+ final String eventName = "events.some-event";
+ final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e");
+ final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829");
final CatchupCommand catchupCommand = new EventCatchupCommand();
final PublishedEvent publishedEvent = mock(PublishedEvent.class);
+ when(publishedEvent.getName()).thenReturn(eventName);
when(publishedEvent.getId()).thenReturn(eventId);
- when(publishedEvent.getMetadata()).thenReturn(metadata);
+ when(publishedEvent.getStreamId()).thenReturn(streamId);
- eventProcessingFailedHandler.handle(
+ eventProcessingFailedHandler.handleEventFailure(
nullPointerException,
publishedEvent,
subscriptionName,
catchupCommand,
commandId);
- verify(logger).error("Failed to process publishedEvent with metadata: {some: metadata}", nullPointerException);
+ verify(logger).error("Failed to process publishedEvent: name: 'events.some-event', id: '1c68536e-1fdf-4891-9c74-85661a9c0f9e', streamId: 'b1834ed9-e084-41c7-ae7f-74f1227cc829'", nullPointerException);
verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture());
final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue();
- assertThat(catchupProcessingOfEventFailedEvent.getEventId(), is(eventId));
- assertThat(catchupProcessingOfEventFailedEvent.getMetadata(), is(metadata));
+ assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to process publishedEvent: name: 'events.some-event', id: '1c68536e-1fdf-4891-9c74-85661a9c0f9e', streamId: 'b1834ed9-e084-41c7-ae7f-74f1227cc829': NullPointerException: Ooops"));
+ assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand));
+ assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException));
+ assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName));
+ }
+
+ @Test
+ public void shouldLogExceptionAndFireFailureEventOnStreamFailure() throws Exception {
+
+ final NullPointerException nullPointerException = new NullPointerException("Ooops");
+ final UUID commandId = randomUUID();
+ final String subscriptionName = "subscriptionName";
+ final String eventName = "events.some-event";
+ final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e");
+ final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829");
+ final CatchupCommand catchupCommand = new EventCatchupCommand();
+
+ final PublishedEvent publishedEvent = mock(PublishedEvent.class);
+
+ when(publishedEvent.getName()).thenReturn(eventName);
+ when(publishedEvent.getId()).thenReturn(eventId);
+ when(publishedEvent.getStreamId()).thenReturn(streamId);
+
+ eventProcessingFailedHandler.handleStreamFailure(
+ nullPointerException,
+ subscriptionName,
+ catchupCommand,
+ commandId);
+
+ verify(logger).error("Failed to consume stream of events. Aborting...", nullPointerException);
+
+ verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture());
+
+ final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue();
+
+ assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to consume stream of events. Aborting...: NullPointerException: Ooops"));
assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand));
assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException));
assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName));
diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java
index a23949a88..625f5e171 100644
--- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java
+++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java
@@ -47,7 +47,7 @@ public void shouldProcessAllEventsOnQueueAndReturnTrueIfComplete() throws Except
final UUID commandId = randomUUID();
final CatchupCommand catchupCommand = new EventCatchupCommand();
-
+
final PublishedEvent event_1 = mock(PublishedEvent.class);
final PublishedEvent event_2 = mock(PublishedEvent.class);
@@ -95,7 +95,7 @@ public void shouldHandleExceptionsThrownWhilstProcessing() throws Exception {
verify(transactionalEventProcessor).processWithEventBuffer(event_2, subscriptionName);
- verify(eventProcessingFailedHandler).handle(
+ verify(eventProcessingFailedHandler).handleEventFailure(
nullPointerException,
event_1,
subscriptionName,
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
index cb31edd76..79f6a192d 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
@@ -120,8 +120,7 @@ public void onCatchupProcessingOfEventFailed(@Observes final CatchupProcessingOf
final CatchupCommand catchupCommand = catchupProcessingOfEventFailedEvent.getCatchupCommand();
final CatchupError catchupError = new CatchupError(
- catchupProcessingOfEventFailedEvent.getEventId(),
- catchupProcessingOfEventFailedEvent.getMetadata(),
+ catchupProcessingOfEventFailedEvent.getMessage(),
catchupProcessingOfEventFailedEvent.getSubscriptionName(),
catchupCommand,
catchupProcessingOfEventFailedEvent.getException()
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java
index 748789cb3..62570150c 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java
@@ -3,35 +3,27 @@
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import java.util.Objects;
-import java.util.UUID;
public class CatchupError {
- private final UUID eventId;
- private final String metadata;
+ private final String message;
private final String subscriptionName;
private final CatchupCommand catchupCommand;
private final Throwable exception;
public CatchupError(
- final UUID eventId,
- final String metadata,
+ final String message,
final String subscriptionName,
final CatchupCommand catchupCommand,
final Throwable exception) {
- this.eventId = eventId;
- this.metadata = metadata;
+ this.message = message;
this.subscriptionName = subscriptionName;
this.catchupCommand = catchupCommand;
this.exception = exception;
}
- public UUID getEventId() {
- return eventId;
- }
-
- public String getMetadata() {
- return metadata;
+ public String getMessage() {
+ return message;
}
public String getSubscriptionName() {
@@ -51,8 +43,7 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof CatchupError)) return false;
final CatchupError that = (CatchupError) o;
- return Objects.equals(eventId, that.eventId) &&
- Objects.equals(metadata, that.metadata) &&
+ return Objects.equals(message, that.message) &&
Objects.equals(subscriptionName, that.subscriptionName) &&
Objects.equals(catchupCommand, that.catchupCommand) &&
Objects.equals(exception, that.exception);
@@ -60,14 +51,13 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
- return Objects.hash(eventId, metadata, subscriptionName, catchupCommand, exception);
+ return Objects.hash(message, subscriptionName, catchupCommand, exception);
}
@Override
public String toString() {
return "CatchupError{" +
- "eventId=" + eventId +
- ", metadata='" + metadata + '\'' +
+ "message=" + message +
", subscriptionName='" + subscriptionName + '\'' +
", catchupCommand=" + catchupCommand +
", exception=" + exception +
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java
index 4eda863c9..44ee6feb4 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java
@@ -7,6 +7,9 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import javax.inject.Singleton;
+
+@Singleton
public class CatchupErrorStateManager {
private final List eventCatchupErrors = new CopyOnWriteArrayList<>();
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
index b5a5d1e21..bb6b9bbf8 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
@@ -244,16 +244,14 @@ public void shouldCompleteTheCatchupIfAllCatchupsForSubscriptionsComplete() thro
public void shouldHandleCatchupProcessingOfEventFailed() throws Exception {
final UUID commandId = randomUUID();
- final UUID eventId = randomUUID();
- final String metadata = "{some: metadata}";
+ final String message = "oh gosh";
final NullPointerException exception = new NullPointerException("Ooops");
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
final String subscriptionName = "subscriptionName";
final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent(
commandId,
- eventId,
- metadata,
+ message,
exception,
eventCatchupCommand,
subscriptionName
@@ -265,9 +263,8 @@ public void shouldHandleCatchupProcessingOfEventFailed() throws Exception {
final CatchupError catchupError = catchupErrorCaptor.getValue();
- assertThat(catchupError.getEventId(), is(eventId));
+ assertThat(catchupError.getMessage(), is(message));
assertThat(catchupError.getException(), is(exception));
- assertThat(catchupError.getMetadata(), is(metadata));
assertThat(catchupError.getCatchupCommand(), is(eventCatchupCommand));
assertThat(catchupError.getSubscriptionName(), is(subscriptionName));
}
diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java
index a8a6ee47c..99ca871ff 100644
--- a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java
+++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java
@@ -8,22 +8,19 @@
public class CatchupProcessingOfEventFailedEvent {
private final UUID commandId;
- private final UUID eventId;
- private final String metadata;
+ private final String message;
private final Throwable exception;
private final CatchupCommand catchupCommand;
private final String subscriptionName;
public CatchupProcessingOfEventFailedEvent(
final UUID commandId,
- final UUID eventId,
- final String metadata,
+ final String message,
final Throwable exception,
final CatchupCommand catchupCommand,
final String subscriptionName) {
this.commandId = commandId;
- this.eventId = eventId;
- this.metadata = metadata;
+ this.message = message;
this.exception = exception;
this.catchupCommand = catchupCommand;
this.subscriptionName = subscriptionName;
@@ -33,12 +30,8 @@ public UUID getCommandId() {
return commandId;
}
- public UUID getEventId() {
- return eventId;
- }
-
- public String getMetadata() {
- return metadata;
+ public String getMessage() {
+ return message;
}
public Throwable getException() {
@@ -59,8 +52,7 @@ public boolean equals(final Object o) {
if (!(o instanceof CatchupProcessingOfEventFailedEvent)) return false;
final CatchupProcessingOfEventFailedEvent that = (CatchupProcessingOfEventFailedEvent) o;
return Objects.equals(commandId, that.commandId) &&
- Objects.equals(eventId, that.eventId) &&
- Objects.equals(metadata, that.metadata) &&
+ Objects.equals(message, that.message) &&
Objects.equals(exception, that.exception) &&
Objects.equals(catchupCommand, that.catchupCommand) &&
Objects.equals(subscriptionName, that.subscriptionName);
@@ -68,15 +60,14 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
- return Objects.hash(commandId, eventId, metadata, exception, catchupCommand, subscriptionName);
+ return Objects.hash(commandId, message, exception, catchupCommand, subscriptionName);
}
@Override
public String toString() {
return "CatchupProcessingOfEventFailedEvent{" +
"commandId=" + commandId +
- ", eventId=" + eventId +
- ", metadata='" + metadata + '\'' +
+ ", message=" + message +
", exception=" + exception +
", catchupCommand=" + catchupCommand +
", subscriptionName='" + subscriptionName + '\'' +