From db236d634375f8438e484e4cdb557320590ac1b3 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Fri, 27 Sep 2019 10:47:43 +0100 Subject: [PATCH] Fire new CatchupProcessingOfEventFailedEvent if processing of PublishedEvent during catchup fails --- CHANGELOG.md | 5 + event-sourcing/subscription-manager/pom.xml | 5 + .../ConcurrentEventStreamConsumerManager.java | 19 ++- .../manager/EventQueueConsumerFactory.java | 8 +- .../manager/EventStreamConsumerManager.java | 3 +- .../consumer/task/ConsumeEventQueueBean.java | 12 +- .../task/EventProcessingFailedHandler.java | 39 ++++++ .../consumer/task/EventQueueConsumer.java | 21 ++- .../catchup/EventStreamCatchupIT.java | 4 +- ...currentEventStreamConsumerManagerTest.java | 25 ++-- .../EventQueueConsumerFactoryTest.java | 41 ++++++ .../task/ConsumeEventQueueBeanTest.java | 46 +++++++ .../EventProcessingFailedHandlerTest.java | 73 ++++++++++ .../consumer/task/EventQueueConsumerTest.java | 21 +-- .../consumer/util/TestCatchupBean.java | 4 +- .../event-store-management-core/pom.xml | 2 +- .../catchup/observers/CatchupLifecycle.java | 59 +++++++-- .../catchup/observers/CatchupObserver.java | 24 ++++ .../process/EventCatchupProcessor.java | 5 +- .../catchup/state/CatchupError.java | 75 +++++++++++ .../state/CatchupErrorStateManager.java | 35 +++++ .../CatchupStateManager.java} | 10 +- .../observers/CatchupLifecycleTest.java | 125 ++++++++++++++++-- .../observers/CatchupObserverTest.java | 26 ++++ .../process/CatchupsInProgressCacheTest.java | 122 ----------------- .../process/EventCatchupProcessorTest.java | 18 +-- .../state/CatchupErrorStateManagerTest.java | 98 ++++++++++++++ .../state/CatchupStateManagerTest.java | 123 +++++++++++++++++ .../events/catchup/CatchupCompletedEvent.java | 18 ++- .../CatchupProcessingOfEventFailedEvent.java | 74 +++++++++++ 30 files changed, 930 insertions(+), 210 deletions(-) create mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java create mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java create mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java rename event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/{process/CatchupsInProgressCache.java => state/CatchupStateManager.java} (85%) delete mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCacheTest.java create mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManagerTest.java create mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java create mode 100644 event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a1492296b..16c36de96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,13 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] + +## [2.1.1] - 2019-09-28 +### Added +- New system event 'CatchupProcessingOfEventFailedEvent' fired if processing of any PublishedEvent during catchup fails ### Changed - All system events moved into their own module 'event-store-management-events' +- Unsuccessful catchups now logged correctly in catchup completion. ## [2.1.0] - 2019-09-26 ### Changed diff --git a/event-sourcing/subscription-manager/pom.xml b/event-sourcing/subscription-manager/pom.xml index 8e995d433..d63bb6bc5 100644 --- a/event-sourcing/subscription-manager/pom.xml +++ b/event-sourcing/subscription-manager/pom.xml @@ -63,6 +63,11 @@ jmx-command-handling ${framework.version} + + uk.gov.justice.event-store + event-store-management-events + ${project.version} + diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java index 84fbe4aa7..74272085d 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java @@ -3,6 +3,7 @@ import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; import java.util.Queue; import java.util.UUID; @@ -50,7 +51,10 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer * count the number of events consumed */ @Override - public int add(final PublishedEvent publishedEvent, final String subscriptionName) { + public int add( + final PublishedEvent publishedEvent, + final String subscriptionName, + final CatchupType catchupType) { final UUID streamId = publishedEvent.getStreamId(); @@ -60,7 +64,7 @@ public int add(final PublishedEvent publishedEvent, final String subscriptionNam events.offer(publishedEvent); if (notInProgress(events)) { - createAndSubmitTaskFor(events, subscriptionName); + createAndSubmitTaskFor(events, subscriptionName, catchupType); } } @@ -98,11 +102,18 @@ private boolean notInProgress(final Queue eventStream) { return !eventStreamsInProgressList.contains(eventStream); } - private void createAndSubmitTaskFor(final Queue eventStream, final String subscriptionName) { + private void createAndSubmitTaskFor( + final Queue eventStream, + final String subscriptionName, + final CatchupType catchupType) { eventStreamsInProgressList.add(eventStream); final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this); - consumeEventQueueBean.consume(eventStream, eventQueueConsumer, subscriptionName); + consumeEventQueueBean.consume( + eventStream, + eventQueueConsumer, + subscriptionName, + catchupType); } } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java index 4076f6272..c9ed3c2c5 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactory.java @@ -1,7 +1,6 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; -import static org.slf4j.LoggerFactory.getLogger; - +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor; @@ -12,10 +11,13 @@ public class EventQueueConsumerFactory { @Inject private TransactionalEventProcessor transactionalEventProcessor; + @Inject + private EventProcessingFailedHandler eventProcessingFailedHandler; + public EventQueueConsumer create(final EventStreamConsumptionResolver eventStreamConsumptionResolver) { return new EventQueueConsumer( transactionalEventProcessor, eventStreamConsumptionResolver, - getLogger(EventQueueConsumer.class)); + eventProcessingFailedHandler); } } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumerManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumerManager.java index dbcc17578..3e8ce8bb2 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumerManager.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumerManager.java @@ -1,6 +1,7 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; /** * Interface for managing the consuming of JsonEnvelope events from Stream of events. @@ -18,7 +19,7 @@ public interface EventStreamConsumerManager { * @return The number of events added to the stream. Note this is always one and is used * to count the number of events consumed */ - int add(final PublishedEvent publishedEvent, final String subscriptionName); + int add(final PublishedEvent publishedEvent, final String subscriptionName, final CatchupType catchupType); void waitForCompletion(); } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java index c95c8aac5..b38a0d615 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBean.java @@ -1,6 +1,7 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; import java.util.Queue; @@ -11,11 +12,18 @@ public class ConsumeEventQueueBean { @Asynchronous - public void consume(final Queue events, final EventQueueConsumer eventQueueConsumer, final String subscriptionName) { + public void consume( + final Queue events, + final EventQueueConsumer eventQueueConsumer, + final String subscriptionName, + final CatchupType catchupType) { boolean consumed = false; while(! consumed) { - consumed = eventQueueConsumer.consumeEventQueue(events, subscriptionName); + consumed = eventQueueConsumer.consumeEventQueue( + events, + subscriptionName, + catchupType); } } } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java new file mode 100644 index 000000000..37afc27a9 --- /dev/null +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java @@ -0,0 +1,39 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; + +import static java.lang.String.format; + +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; + +import javax.enterprise.event.Event; +import javax.inject.Inject; + +import org.slf4j.Logger; + +public class EventProcessingFailedHandler { + + @Inject + private Event catchupProcessingOfEventFailedEventFirer; + + @Inject + private Logger logger; + + public void handle(final RuntimeException exception, final PublishedEvent publishedEvent, final String subscriptionName, final CatchupType catchupType) { + + final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata()); + logger.error( + logMessage, + exception); + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent( + publishedEvent.getId(), + publishedEvent.getMetadata(), + exception, + catchupType, + subscriptionName + ); + + catchupProcessingOfEventFailedEventFirer.fire(catchupProcessingOfEventFailedEvent); + } +} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java index 2a0848795..8d1fdc121 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java @@ -1,42 +1,39 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; -import static java.lang.String.format; - import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.FinishedProcessingMessage; import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; import java.util.Queue; -import org.slf4j.Logger; - public class EventQueueConsumer { private final TransactionalEventProcessor transactionalEventProcessor; private final EventStreamConsumptionResolver eventStreamConsumptionResolver; - private final Logger logger; + private final EventProcessingFailedHandler eventProcessingFailedHandler; public EventQueueConsumer( final TransactionalEventProcessor transactionalEventProcessor, final EventStreamConsumptionResolver eventStreamConsumptionResolver, - final Logger logger) { + final EventProcessingFailedHandler eventProcessingFailedHandler) { this.transactionalEventProcessor = transactionalEventProcessor; this.eventStreamConsumptionResolver = eventStreamConsumptionResolver; - this.logger = logger; + this.eventProcessingFailedHandler = eventProcessingFailedHandler; } - public boolean consumeEventQueue(final Queue events, final String subscriptionName) { + public boolean consumeEventQueue( + final Queue events, + final String subscriptionName, + final CatchupType catchupType) { while (!events.isEmpty()) { final PublishedEvent publishedEvent = events.poll(); try { transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName); } catch (final RuntimeException e) { - final String message = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata()); - logger.error( - message, - e); + eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupType); } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java index cae6d625d..fabeb9850 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java @@ -10,6 +10,7 @@ import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.DummyTransactionalEventProcessor; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util.TestCatchupBean; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; @@ -51,7 +52,8 @@ public class EventStreamCatchupIT { EventQueueConsumerFactory.class, LoggerProducer.class, DummySystemCommandStore.class, - ConcurrentEventStreamConsumerManager.class + ConcurrentEventStreamConsumerManager.class, + EventProcessingFailedHandler.class }) public WebApp war() { return new WebApp() diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java index 942f8c49f..4d541cc08 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; @@ -56,9 +57,9 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() { when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent.getStreamId()).thenReturn(streamId); - concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName); + concurrentEventStreamConsumerManager.add(publishedEvent, subscriptionName, EVENT_CATCHUP); - verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName)); + verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP)); final Queue events = eventQueueCaptor.getValue(); assertThat(events.size(), is(1)); @@ -79,10 +80,10 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() { when(publishedEvent_1.getStreamId()).thenReturn(streamId); when(publishedEvent_2.getStreamId()).thenReturn(streamId); - concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName); - concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName); + concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP); + concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP); - verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName)); + verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP)); final Queue eventsStream = eventQueueCaptor.getValue(); assertThat(eventsStream.size(), is(2)); @@ -104,10 +105,10 @@ public void shouldCreateQueueForEachStreamId() { when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); - concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName); - concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName); + concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP); + concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP); - verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName)); + verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP)); final List> allValues = eventQueueCaptor.getAllValues(); @@ -134,18 +135,18 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); - concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName); + concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP); - verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName)); + verify(consumeEventQueueBean).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP)); final Queue eventsStream_1 = eventQueueCaptor.getValue(); assertThat(eventsStream_1.size(), is(1)); assertThat(eventsStream_1.poll(), is(publishedEvent_1)); concurrentEventStreamConsumerManager.isEventConsumptionComplete(new FinishedProcessingMessage(eventsStream_1)); - concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName); + concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP); - verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName)); + verify(consumeEventQueueBean, times(2)).consume(eventQueueCaptor.capture(), eq(eventQueueConsumer), eq(subscriptionName), eq(EVENT_CATCHUP)); final Queue eventsStream_2 = eventQueueCaptor.getValue(); assertThat(eventsStream_2.size(), is(1)); diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java new file mode 100644 index 000000000..b6fef6e1c --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueConsumerFactoryTest.java @@ -0,0 +1,41 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField; + +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; +import uk.gov.justice.services.event.sourcing.subscription.manager.TransactionalEventProcessor; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class EventQueueConsumerFactoryTest { + + @Mock + private TransactionalEventProcessor transactionalEventProcessor; + + @Mock + private EventProcessingFailedHandler eventProcessingFailedHandler; + + @InjectMocks + private EventQueueConsumerFactory eventQueueConsumerFactory; + + @Test + public void shouldCreateEventQueueConsumerFactory() throws Exception { + + final EventStreamConsumptionResolver eventStreamConsumptionResolver = mock(EventStreamConsumptionResolver.class); + + final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(eventStreamConsumptionResolver); + + assertThat(getValueOfField(eventQueueConsumer, "transactionalEventProcessor", TransactionalEventProcessor.class), is(transactionalEventProcessor)); + assertThat(getValueOfField(eventQueueConsumer, "eventStreamConsumptionResolver", EventStreamConsumptionResolver.class), is(eventStreamConsumptionResolver)); + assertThat(getValueOfField(eventQueueConsumer, "eventProcessingFailedHandler", EventProcessingFailedHandler.class), is(eventProcessingFailedHandler)); + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java new file mode 100644 index 000000000..e2dd1edbb --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/ConsumeEventQueueBeanTest.java @@ -0,0 +1,46 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; + +import static java.util.Collections.singletonList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; + +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumeEventQueueBeanTest { + + @InjectMocks + private ConsumeEventQueueBean consumeEventQueueBean; + + @Test + public void shouldConsumeTheEventQueueUntilEventsConsumedIsTrue() throws Exception { + + final Queue events = new ConcurrentLinkedQueue<>(singletonList(mock(PublishedEvent.class))) ; + final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); + final String subscriptionName = "subscriptionName"; + final CatchupType catchupType = EVENT_CATCHUP; + + when(eventQueueConsumer.consumeEventQueue(events, subscriptionName, catchupType)).thenReturn(false, false, true); + + consumeEventQueueBean.consume( + events, + eventQueueConsumer, + subscriptionName, + catchupType + ); + + verify(eventQueueConsumer, times(3)).consumeEventQueue(events, subscriptionName, catchupType); + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java new file mode 100644 index 000000000..bef6327d7 --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java @@ -0,0 +1,73 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task; + +import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; + +import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; + +import java.util.UUID; + +import javax.enterprise.event.Event; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + +@RunWith(MockitoJUnitRunner.class) +public class EventProcessingFailedHandlerTest { + + @Mock + private Event catchupProcessingOfEventFailedEventFirer; + + @Mock + private Logger logger; + + @InjectMocks + private EventProcessingFailedHandler eventProcessingFailedHandler; + + @Captor + private ArgumentCaptor catchupProcessingOfEventFailedEventCaptor; + + @Test + public void shouldLogExceptionAndFireFailureEvent() throws Exception { + + final NullPointerException nullPointerException = new NullPointerException("Ooops"); + final String subscriptionName = "subscriptionName"; + final String metadata = "{some: metadata}"; + final UUID eventId = randomUUID(); + + final PublishedEvent publishedEvent = mock(PublishedEvent.class); + + when(publishedEvent.getId()).thenReturn(eventId); + when(publishedEvent.getMetadata()).thenReturn(metadata); + + eventProcessingFailedHandler.handle( + nullPointerException, + publishedEvent, + subscriptionName, + EVENT_CATCHUP); + + verify(logger).error("Failed to process publishedEvent with metadata: {some: metadata}", nullPointerException); + + verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture()); + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue(); + + assertThat(catchupProcessingOfEventFailedEvent.getEventId(), is(eventId)); + assertThat(catchupProcessingOfEventFailedEvent.getMetadata(), is(metadata)); + assertThat(catchupProcessingOfEventFailedEvent.getCatchupType(), is(EVENT_CATCHUP)); + assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException)); + assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName)); + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java index 8900e17fc..98d1abdfb 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.FinishedProcessingMessage; @@ -20,7 +21,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import org.slf4j.Logger; @RunWith(MockitoJUnitRunner.class) public class EventQueueConsumerTest { @@ -32,7 +32,7 @@ public class EventQueueConsumerTest { private EventStreamConsumptionResolver eventStreamConsumptionResolver; @Mock - private Logger logger; + private EventProcessingFailedHandler eventProcessingFailedHandler; @InjectMocks @@ -52,7 +52,7 @@ public void shouldProcessAllEventsOnQueueAndReturnTrueIfComplete() throws Except eventQueue.add(event_2); final String subscriptionName = "subscriptionName"; - eventQueueConsumer.consumeEventQueue(eventQueue, subscriptionName); + eventQueueConsumer.consumeEventQueue(eventQueue, subscriptionName, EVENT_CATCHUP); final InOrder inOrder = inOrder(transactionalEventProcessor); @@ -61,7 +61,7 @@ public void shouldProcessAllEventsOnQueueAndReturnTrueIfComplete() throws Except } @Test - public void shouldLogAnyExceptionsThrownWhilstProcessing() throws Exception { + public void shouldHandleExceptionsThrownWhilstProcessing() throws Exception { final NullPointerException nullPointerException = new NullPointerException("Ooops"); @@ -78,12 +78,17 @@ public void shouldLogAnyExceptionsThrownWhilstProcessing() throws Exception { eventQueue.add(event_2); final String subscriptionName = "subscriptionName"; - doThrow(nullPointerException).when(transactionalEventProcessor).processWithEventBuffer(event_2, subscriptionName); + doThrow(nullPointerException).when(transactionalEventProcessor).processWithEventBuffer(event_1, subscriptionName); - eventQueueConsumer.consumeEventQueue(eventQueue, subscriptionName); + eventQueueConsumer.consumeEventQueue(eventQueue, subscriptionName, EVENT_CATCHUP); - verify(transactionalEventProcessor).processWithEventBuffer(event_1, subscriptionName); + verify(transactionalEventProcessor).processWithEventBuffer(event_2, subscriptionName); - verify(logger).error("Failed to process publishedEvent with metadata: {some: metadata}", nullPointerException); + verify(eventProcessingFailedHandler).handle( + nullPointerException, + event_1, + subscriptionName, + EVENT_CATCHUP + ); } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/util/TestCatchupBean.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/util/TestCatchupBean.java index 575662c27..072d7992b 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/util/TestCatchupBean.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/util/TestCatchupBean.java @@ -1,5 +1,7 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.util; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; + import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; @@ -38,7 +40,7 @@ public void run(final StopWatch stopWatch) { stopWatch.start(); final int totalEventsProcessed = eventStream.mapToInt(event -> { - concurrentEventStreamConsumerManager.add(event, ""); + concurrentEventStreamConsumerManager.add(event, "subscriptionName", EVENT_CATCHUP); return 1; }).sum(); diff --git a/event-store-management/event-store-management-core/pom.xml b/event-store-management/event-store-management-core/pom.xml index c65535949..75a677229 100644 --- a/event-store-management/event-store-management-core/pom.xml +++ b/event-store-management/event-store-management-core/pom.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - event-store + event-store-management uk.gov.justice.event-store 2.2.0-SNAPSHOT diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycle.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycle.java index 297c24d03..1390d00e4 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycle.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycle.java @@ -5,10 +5,13 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventstore.management.catchup.process.CatchupDurationCalculator; import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress; -import uk.gov.justice.services.eventstore.management.catchup.process.CatchupsInProgressCache; import uk.gov.justice.services.eventstore.management.catchup.process.EventCatchupRunner; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupError; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupErrorStateManager; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupStateManager; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent; @@ -17,6 +20,7 @@ import java.time.Duration; import java.time.ZonedDateTime; +import java.util.List; import javax.enterprise.event.Event; import javax.inject.Inject; @@ -29,7 +33,10 @@ public class CatchupLifecycle { private EventCatchupRunner eventCatchupRunner; @Inject - private CatchupsInProgressCache catchupsInProgressCache; + private CatchupStateManager catchupStateManager; + + @Inject + private CatchupErrorStateManager catchupErrorStateManager; @Inject private CatchupDurationCalculator catchupDurationCalculator; @@ -43,7 +50,6 @@ public class CatchupLifecycle { @Inject private Logger logger; - public void handleCatchupRequested(final CatchupRequestedEvent catchupRequestedEvent) { final CatchupType catchupType = catchupRequestedEvent.getCatchupType(); @@ -51,6 +57,9 @@ public void handleCatchupRequested(final CatchupRequestedEvent catchupRequestedE logger.info(format("%s catchup requested", catchupType.getName())); + catchupStateManager.clear(catchupType); + catchupErrorStateManager.clear(catchupType); + eventCatchupRunner.runEventCatchup(catchupType, target); } @@ -60,8 +69,6 @@ public void handleCatchupStarted(final CatchupStartedEvent catchupStartedEvent) final ZonedDateTime catchupStartedAt = catchupStartedEvent.getCatchupStartedAt(); logger.info(format("%s catchup started at %s", catchupType.getName(), catchupStartedAt)); - - catchupsInProgressCache.removeAll(catchupType); } public void handleCatchupStartedForSubscription(final CatchupStartedForSubscriptionEvent catchupStartedForSubscriptionEvent) { @@ -70,7 +77,7 @@ public void handleCatchupStartedForSubscription(final CatchupStartedForSubscript final ZonedDateTime catchupStartedAt = catchupStartedForSubscriptionEvent.getCatchupStartedAt(); final CatchupType catchupType = catchupStartedForSubscriptionEvent.getCatchupType(); - catchupsInProgressCache.addCatchupInProgress(new CatchupInProgress(subscriptionName, catchupStartedAt), catchupType); + catchupStateManager.addCatchupInProgress(new CatchupInProgress(subscriptionName, catchupStartedAt), catchupType); logger.info(format("%s catchup for subscription '%s' started at %s", catchupType.getName(), subscriptionName, catchupStartedAt)); } @@ -86,7 +93,7 @@ public void handleCatchupCompleteForSubscription(final CatchupCompletedForSubscr logger.info(format("%s catchup for subscription '%s' completed at %s", catchupType.getName(), subscriptionName, catchupCompletedAt)); logger.info(format("%s catchup for subscription '%s' caught up %d events", catchupType.getName(), subscriptionName, totalNumberOfEvents)); - final CatchupInProgress catchupInProgress = catchupsInProgressCache.removeCatchupInProgress(subscriptionName, catchupType); + final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(subscriptionName, catchupType); final Duration catchupDuration = catchupDurationCalculator.calculate( catchupInProgress, @@ -94,11 +101,41 @@ public void handleCatchupCompleteForSubscription(final CatchupCompletedForSubscr logger.info(format("%s catchup for subscription '%s' took %d milliseconds", catchupType.getName(), subscriptionName, catchupDuration.toMillis())); - if (catchupsInProgressCache.noCatchupsInProgress(catchupType)) { - final ZonedDateTime completedAt = clock.now(); + if (catchupStateManager.noCatchupsInProgress(catchupType)) { final SystemCommand target = catchupCompletedForSubscriptionEvent.getTarget(); - catchupCompletedEventFirer.fire(new CatchupCompletedEvent(target, completedAt)); - logger.info(format("%s catchup fully complete at %s", catchupType.getName(), completedAt)); + final ZonedDateTime completedAt = clock.now(); + + catchupCompletedEventFirer.fire(new CatchupCompletedEvent( + target, + completedAt, + catchupType)); + } + } + + public void handleCatchupComplete(final CatchupCompletedEvent catchupCompletedEvent) { + + final CatchupType catchupType = catchupCompletedEvent.getCatchupType(); + final ZonedDateTime completedAt = catchupCompletedEvent.getCompletedAt(); + + final List errors = catchupErrorStateManager.getErrors(catchupType); + if (errors.isEmpty()) { + logger.info(format("%s catchup successfully completed with 0 errors at %s", catchupType.getName(), completedAt)); + } else { + logger.error(format("%s catchup failed with %d errors", catchupType.getName(), errors.size())); } } + + public void handleCatchupProcessingOfEventFailed(final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent) { + + final CatchupType catchupType = catchupProcessingOfEventFailedEvent.getCatchupType(); + + final CatchupError catchupError = new CatchupError( + catchupProcessingOfEventFailedEvent.getEventId(), + catchupProcessingOfEventFailedEvent.getMetadata(), + catchupProcessingOfEventFailedEvent.getSubscriptionName(), + catchupType, catchupProcessingOfEventFailedEvent.getException() + ); + + catchupErrorStateManager.add(catchupError, catchupType); + } } diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java index 42bd5cb44..30905ccb8 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java @@ -1,10 +1,22 @@ package uk.gov.justice.services.eventstore.management.catchup.observers; +import static java.lang.String.format; + +import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupError; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; import uk.gov.justice.services.eventstore.management.logging.MdcLogger; +import uk.gov.justice.services.jmx.api.command.SystemCommand; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.List; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; @@ -46,4 +58,16 @@ public void onCatchupCompleteForSubscription(@Observes final CatchupCompletedFor .mdcLoggerConsumer() .accept(() -> catchupLifecycle.handleCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent)); } + + public void onCatchupComplete(@Observes final CatchupCompletedEvent catchupCompletedEvent) { + mdcLogger + .mdcLoggerConsumer() + .accept(() -> catchupLifecycle.handleCatchupComplete(catchupCompletedEvent)); + } + + public void onCatchupProcessingOfEventFailed(@Observes final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent) { + mdcLogger + .mdcLoggerConsumer() + .accept(() -> catchupLifecycle.handleCatchupProcessingOfEventFailed(catchupProcessingOfEventFailedEvent)); + } } diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java index 5ee1c9b02..ed94ef2e9 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java @@ -76,14 +76,15 @@ public void performEventCatchup(final CatchupSubscriptionContext catchupSubscrip logger.info(format("%s catch up for Event Number: %d", catchupType.getName(), eventNumber)); } - return concurrentEventStreamConsumerManager.add(event, subscriptionName); + return concurrentEventStreamConsumerManager.add(event, subscriptionName, catchupType); }).sum(); concurrentEventStreamConsumerManager.waitForCompletion(); final CatchupCompletedForSubscriptionEvent event = new CatchupCompletedForSubscriptionEvent( - catchupType, subscriptionName, + catchupType, + subscriptionName, eventSourceName, componentName, systemCommand, diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java new file mode 100644 index 000000000..5d9a94e45 --- /dev/null +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupError.java @@ -0,0 +1,75 @@ +package uk.gov.justice.services.eventstore.management.catchup.state; + +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; + +import java.util.Objects; +import java.util.UUID; + +public class CatchupError { + + private final UUID eventId; + private final String metadata; + private final String subscriptionName; + private final CatchupType catchupType; + private final Throwable exception; + + public CatchupError( + final UUID eventId, + final String metadata, + final String subscriptionName, + final CatchupType catchupType, final Throwable exception) { + this.eventId = eventId; + this.metadata = metadata; + this.subscriptionName = subscriptionName; + this.catchupType = catchupType; + this.exception = exception; + } + + public UUID getEventId() { + return eventId; + } + + public String getMetadata() { + return metadata; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public CatchupType getCatchupType() { + return catchupType; + } + + public Throwable getException() { + return exception; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof CatchupError)) return false; + final CatchupError that = (CatchupError) o; + return Objects.equals(eventId, that.eventId) && + Objects.equals(metadata, that.metadata) && + Objects.equals(subscriptionName, that.subscriptionName) && + catchupType == that.catchupType && + Objects.equals(exception, that.exception); + } + + @Override + public int hashCode() { + return Objects.hash(eventId, metadata, subscriptionName, catchupType, exception); + } + + @Override + public String toString() { + return "CatchupError{" + + "eventId=" + eventId + + ", metadata='" + metadata + '\'' + + ", subscriptionName='" + subscriptionName + '\'' + + ", catchupType=" + catchupType + + ", exception=" + exception + + '}'; + } +} diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java new file mode 100644 index 000000000..34a3e35cd --- /dev/null +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManager.java @@ -0,0 +1,35 @@ +package uk.gov.justice.services.eventstore.management.catchup.state; + +import static com.google.common.collect.ImmutableList.copyOf; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; + +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class CatchupErrorStateManager { + + private final List eventCatchupErrors = new CopyOnWriteArrayList<>(); + private final List indexCatchupErrors = new CopyOnWriteArrayList<>(); + + public void add(final CatchupError catchupError, final CatchupType catchupType) { + listFor(catchupType).add(catchupError); + } + + public List getErrors(final CatchupType catchupType) { + return copyOf(listFor(catchupType)); + } + + public void clear(final CatchupType catchupType) { + listFor(catchupType).clear(); + } + + private List listFor(final CatchupType catchupType) { + if (catchupType == EVENT_CATCHUP) { + return eventCatchupErrors; + } + + return indexCatchupErrors; + } +} diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCache.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java similarity index 85% rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCache.java rename to event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java index bbdb71801..5ea8814c9 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCache.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java @@ -1,7 +1,8 @@ -package uk.gov.justice.services.eventstore.management.catchup.process; +package uk.gov.justice.services.eventstore.management.catchup.state; import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.INDEX_CATCHUP; +import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; import java.util.ArrayList; @@ -9,12 +10,15 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class CatchupsInProgressCache { +import javax.inject.Singleton; + +@Singleton +public class CatchupStateManager { private final Map eventCatchupsInProgress = new ConcurrentHashMap<>(); private final Map indexCatchupsInProgress = new ConcurrentHashMap<>(); - public void removeAll(final CatchupType catchupType) { + public void clear(final CatchupType catchupType) { getCache(catchupType).clear(); } diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycleTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycleTest.java index 5e230d329..c172746cc 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycleTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupLifecycleTest.java @@ -3,8 +3,13 @@ import static java.time.ZoneOffset.UTC; import static java.time.ZonedDateTime.of; import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.UUID.randomUUID; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -14,10 +19,13 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventstore.management.catchup.process.CatchupDurationCalculator; import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress; -import uk.gov.justice.services.eventstore.management.catchup.process.CatchupsInProgressCache; import uk.gov.justice.services.eventstore.management.catchup.process.EventCatchupRunner; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupError; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupErrorStateManager; +import uk.gov.justice.services.eventstore.management.catchup.state.CatchupStateManager; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent; @@ -27,6 +35,9 @@ import java.time.Duration; import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.List; +import java.util.UUID; import javax.enterprise.event.Event; @@ -34,6 +45,7 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -46,7 +58,10 @@ public class CatchupLifecycleTest { private EventCatchupRunner eventCatchupRunner; @Mock - private CatchupsInProgressCache catchupsInProgressCache; + private CatchupStateManager catchupStateManager; + + @Mock + private CatchupErrorStateManager catchupErrorStateManager; @Mock private CatchupDurationCalculator catchupDurationCalculator; @@ -66,6 +81,9 @@ public class CatchupLifecycleTest { @Captor private ArgumentCaptor catchupTypeCaptor; + @Captor + private ArgumentCaptor catchupErrorCaptor; + @InjectMocks private CatchupLifecycle catchupLifecycle; @@ -73,10 +91,11 @@ public class CatchupLifecycleTest { public void shouldCallTheCatchupRunnerOnCatchupRequested() throws Exception { final ZonedDateTime catchupStartedAt = of(2019, 2, 23, 17, 12, 23, 0, UTC); + final CatchupType catchupType = EVENT_CATCHUP; final CatchupCommand catchupCommand = new CatchupCommand(); final CatchupRequestedEvent catchupRequestedEvent = new CatchupRequestedEvent( - EVENT_CATCHUP, + catchupType, catchupCommand, catchupStartedAt ); @@ -84,18 +103,25 @@ public void shouldCallTheCatchupRunnerOnCatchupRequested() throws Exception { catchupLifecycle.handleCatchupRequested(catchupRequestedEvent); verify(logger).info("Event catchup requested"); - verify(eventCatchupRunner).runEventCatchup(EVENT_CATCHUP, catchupCommand); + + final InOrder inOrder = inOrder( + catchupStateManager, + catchupErrorStateManager, + eventCatchupRunner); + + inOrder.verify(catchupStateManager).clear(catchupType); + inOrder.verify(catchupErrorStateManager).clear(catchupType); + inOrder.verify(eventCatchupRunner).runEventCatchup(catchupType, catchupCommand); } @Test - public void shouldClearEventsInProgressOnCatchupStart() throws Exception { + public void shouldHanldeCatchupStarted() throws Exception { final ZonedDateTime catchupStartedAt = of(2019, 2, 23, 17, 12, 23, 0, UTC); catchupLifecycle.handleCatchupStarted(new CatchupStartedEvent(EVENT_CATCHUP, catchupStartedAt)); verify(logger).info("Event catchup started at 2019-02-23T17:12:23Z"); - verify(catchupsInProgressCache).removeAll(EVENT_CATCHUP); } @Test @@ -110,7 +136,7 @@ public void shouldLogCatchupStartedForSubscriptionAndStoreProgress() throws Exce catchupStartedAt); catchupLifecycle.handleCatchupStartedForSubscription(catchupStartedForSubscriptionEvent); - verify(catchupsInProgressCache).addCatchupInProgress(catchupInProgressCaptor.capture(), catchupTypeCaptor.capture()); + verify(catchupStateManager).addCatchupInProgress(catchupInProgressCaptor.capture(), catchupTypeCaptor.capture()); verify(logger).info("Event catchup for subscription 'mySubscription' started at 2019-02-23T17:12:23Z"); final CatchupInProgress catchupInProgress = catchupInProgressCaptor.getValue(); @@ -146,12 +172,12 @@ public void shouldRemoveTheCatchupForSubscriptionInProgressOnCatchupForSubscript final CatchupInProgress catchupInProgress = mock(CatchupInProgress.class); - when(catchupsInProgressCache.removeCatchupInProgress(subscriptionName, EVENT_CATCHUP)).thenReturn(catchupInProgress); + when(catchupStateManager.removeCatchupInProgress(subscriptionName, EVENT_CATCHUP)).thenReturn(catchupInProgress); when(catchupDurationCalculator.calculate( catchupInProgress, catchupCompletedForSubscriptionEvent)).thenReturn(catchupDuration); - when(catchupsInProgressCache.noCatchupsInProgress(EVENT_CATCHUP)).thenReturn(false); + when(catchupStateManager.noCatchupsInProgress(EVENT_CATCHUP)).thenReturn(false); catchupLifecycle.handleCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent); @@ -187,12 +213,12 @@ public void shouldFireTheCatchupCompleteEventIfAllCatchupsForSubscriptionsComple final CatchupInProgress catchupInProgress = mock(CatchupInProgress.class); - when(catchupsInProgressCache.removeCatchupInProgress(subscriptionName, EVENT_CATCHUP)).thenReturn(catchupInProgress); + when(catchupStateManager.removeCatchupInProgress(subscriptionName, EVENT_CATCHUP)).thenReturn(catchupInProgress); when(catchupDurationCalculator.calculate( catchupInProgress, catchupCompletedForSubscriptionEvent)).thenReturn(catchupDuration); - when(catchupsInProgressCache.noCatchupsInProgress(EVENT_CATCHUP)).thenReturn(true); + when(catchupStateManager.noCatchupsInProgress(EVENT_CATCHUP)).thenReturn(true); when(clock.now()).thenReturn(allCatchupsCompletedAt); catchupLifecycle.handleCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent); @@ -201,7 +227,80 @@ public void shouldFireTheCatchupCompleteEventIfAllCatchupsForSubscriptionsComple verify(logger).info("Event catchup for subscription 'mySubscription' caught up 23 events"); verify(logger).info("Event catchup for subscription 'mySubscription' took 5000 milliseconds"); - verify(catchupCompletedEventFirer).fire(new CatchupCompletedEvent(target, allCatchupsCompletedAt)); - verify(logger).info("Event catchup fully complete at 2019-02-23T17:12:46Z"); + verify(catchupCompletedEventFirer).fire(new CatchupCompletedEvent(target, allCatchupsCompletedAt, EVENT_CATCHUP)); + } + + @Test + public void shouldHandleCatchupFailureCompletion() throws Exception { + + final ZonedDateTime catchupCompletedAt = of(2019, 2, 23, 17, 12, 23, 0, UTC); + final CatchupCommand target = new CatchupCommand(); + final CatchupType catchupType = EVENT_CATCHUP; + final CatchupCompletedEvent catchupCompletedEvent = new CatchupCompletedEvent( + target, + catchupCompletedAt, + catchupType + ); + + when(catchupErrorStateManager.getErrors(catchupType)).thenReturn(emptyList()); + + catchupLifecycle.handleCatchupComplete(catchupCompletedEvent); + + verify(logger).info("Event catchup successfully completed with 0 errors at 2019-02-23T17:12:23Z"); + } + + @Test + public void shouldHandleCatchupSuccessfulCompletion() throws Exception { + + + final List catchupErrors = asList( + mock(CatchupError.class), + mock(CatchupError.class), + mock(CatchupError.class)); + + final ZonedDateTime catchupCompletedAt = of(2019, 2, 23, 17, 12, 23, 0, UTC); + final CatchupCommand target = new CatchupCommand(); + final CatchupType catchupType = EVENT_CATCHUP; + final CatchupCompletedEvent catchupCompletedEvent = new CatchupCompletedEvent( + target, + catchupCompletedAt, + catchupType + ); + + when(catchupErrorStateManager.getErrors(catchupType)).thenReturn(catchupErrors); + + catchupLifecycle.handleCatchupComplete(catchupCompletedEvent); + + verify(logger).error("Event catchup failed with 3 errors"); + } + + @Test + public void shouldHandleCatchupProcessingOfEventFailed() throws Exception { + + final UUID eventId = randomUUID(); + final String metadata = "{some: metadata}"; + final NullPointerException exception = new NullPointerException("Ooops"); + final CatchupType catchupType = EVENT_CATCHUP; + final String subscriptionName = "subscriptionName"; + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent( + eventId, + metadata, + exception, + catchupType, + subscriptionName + ); + + catchupLifecycle.handleCatchupProcessingOfEventFailed(catchupProcessingOfEventFailedEvent); + + verify(catchupErrorStateManager).add(catchupErrorCaptor.capture(), eq(catchupType)); + + final CatchupError catchupError = catchupErrorCaptor.getValue(); + + assertThat(catchupError.getEventId(), is(eventId)); + assertThat(catchupError.getException(), is(exception)); + assertThat(catchupError.getMetadata(), is(metadata)); + assertThat(catchupError.getCatchupType(), is(catchupType)); + assertThat(catchupError.getSubscriptionName(), is(subscriptionName)); } } diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java index d6464bece..205233c66 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java @@ -4,7 +4,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent; +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent; @@ -78,4 +80,28 @@ public void shouldHandleCatchupCompleteForSubscription() throws Exception { verify(catchupLifecycle).handleCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent); } + + @Test + public void shouldHandleCatchupComplete() throws Exception { + + final CatchupCompletedEvent catchupCompletedEvent = mock(CatchupCompletedEvent.class); + + when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); + + catchupObserver.onCatchupComplete(catchupCompletedEvent); + + verify(catchupLifecycle).handleCatchupComplete(catchupCompletedEvent); + } + + @Test + public void shouldHandleCatchupProcessingOfEventFailed() throws Exception { + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = mock(CatchupProcessingOfEventFailedEvent.class); + + when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); + + catchupObserver.onCatchupProcessingOfEventFailed(catchupProcessingOfEventFailedEvent); + + verify(catchupLifecycle).handleCatchupProcessingOfEventFailed(catchupProcessingOfEventFailedEvent); + } } diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCacheTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCacheTest.java deleted file mode 100644 index 1c915517b..000000000 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupsInProgressCacheTest.java +++ /dev/null @@ -1,122 +0,0 @@ -package uk.gov.justice.services.eventstore.management.catchup.process; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; -import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.INDEX_CATCHUP; - -import uk.gov.justice.services.common.util.UtcClock; - -import java.time.ZonedDateTime; -import java.util.List; - -import org.hamcrest.CoreMatchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class CatchupsInProgressCacheTest { - - @InjectMocks - private CatchupsInProgressCache catchupsInProgressCache; - - @Test - public void shouldMaintainACacheOfAllEventCatchupsInProgress() throws Exception { - - final CatchupInProgress indexCatchupInProgress = mock(CatchupInProgress.class); - when(indexCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup"); - - catchupsInProgressCache.addCatchupInProgress(indexCatchupInProgress, INDEX_CATCHUP); - - assertThat(catchupsInProgressCache.getAllCatchupsInProgress(EVENT_CATCHUP).isEmpty(), is(true)); - - final ZonedDateTime startedAt = new UtcClock().now(); - - final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt); - final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1)); - final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2)); - - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_1, EVENT_CATCHUP); - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_2, EVENT_CATCHUP); - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_3, EVENT_CATCHUP); - - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), EVENT_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), EVENT_CATCHUP), is(true)); - - final List allCatchupsInProgress = catchupsInProgressCache.getAllCatchupsInProgress(EVENT_CATCHUP); - - assertThat(allCatchupsInProgress.size(), is(3)); - assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3)); - - final CatchupInProgress removedCatchupInProgress = catchupsInProgressCache.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP); - - assertThat(removedCatchupInProgress, is(catchupInProgress_2)); - - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), EVENT_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP), is(false)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), EVENT_CATCHUP), is(true)); - - final List currentCatchupsInProgress = catchupsInProgressCache.getAllCatchupsInProgress(EVENT_CATCHUP); - - assertThat(currentCatchupsInProgress.size(), is(2)); - assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3)); - - catchupsInProgressCache.removeAll(EVENT_CATCHUP); - - assertThat(catchupsInProgressCache.noCatchupsInProgress(EVENT_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.getAllCatchupsInProgress(EVENT_CATCHUP).isEmpty(), is(true)); - } - - @Test - public void shouldMaintainACacheOfAllIndexCatchupsInProgress() throws Exception { - - final CatchupInProgress eventCatchupInProgress = mock(CatchupInProgress.class); - when(eventCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup"); - - catchupsInProgressCache.addCatchupInProgress(eventCatchupInProgress, EVENT_CATCHUP); - - assertThat(catchupsInProgressCache.getAllCatchupsInProgress(INDEX_CATCHUP).isEmpty(), is(true)); - - final ZonedDateTime startedAt = new UtcClock().now(); - - final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt); - final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1)); - final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2)); - - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_1, INDEX_CATCHUP); - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_2, INDEX_CATCHUP); - catchupsInProgressCache.addCatchupInProgress(catchupInProgress_3, INDEX_CATCHUP); - - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), INDEX_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), INDEX_CATCHUP), is(true)); - - final List allCatchupsInProgress = catchupsInProgressCache.getAllCatchupsInProgress(INDEX_CATCHUP); - - assertThat(allCatchupsInProgress.size(), is(3)); - assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3)); - - final CatchupInProgress removedCatchupInProgress = catchupsInProgressCache.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP); - - assertThat(removedCatchupInProgress, is(catchupInProgress_2)); - - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), INDEX_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP), is(false)); - assertThat(catchupsInProgressCache.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), INDEX_CATCHUP), is(true)); - - final List currentCatchupsInProgress = catchupsInProgressCache.getAllCatchupsInProgress(INDEX_CATCHUP); - - assertThat(currentCatchupsInProgress.size(), is(2)); - assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3)); - - catchupsInProgressCache.removeAll(INDEX_CATCHUP); - - assertThat(catchupsInProgressCache.noCatchupsInProgress(INDEX_CATCHUP), is(true)); - assertThat(catchupsInProgressCache.getAllCatchupsInProgress(INDEX_CATCHUP).isEmpty(), is(true)); - } -} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java index 458db0433..364fe8adf 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java @@ -103,9 +103,9 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception { when(publishedEventSourceProvider.getPublishedEventSource(eventSourceName)).thenReturn(publishedEventSource); when(processedEventTrackingService.getLatestProcessedEventNumber(eventSourceName, componentName)).thenReturn(eventNumberFrom); when(publishedEventSource.findEventsSince(eventNumberFrom)).thenReturn(events.stream()); - when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName)).thenReturn(1); - when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName)).thenReturn(1); - when(concurrentEventStreamConsumerManager.add(publishedEvent_3, subscriptionName)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_3, subscriptionName, EVENT_CATCHUP)).thenReturn(1); eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext); @@ -119,9 +119,9 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception { EVENT_CATCHUP, catchupStartedAt)); - inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_1, subscriptionName); - inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_2, subscriptionName); - inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_3, subscriptionName); + inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_1, subscriptionName, EVENT_CATCHUP); + inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_2, subscriptionName, EVENT_CATCHUP); + inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_3, subscriptionName, EVENT_CATCHUP); inOrder.verify(concurrentEventStreamConsumerManager).waitForCompletion(); inOrder.verify(catchupCompletedForSubscriptionEventFirer).fire(new CatchupCompletedForSubscriptionEvent( @@ -176,9 +176,9 @@ public void shouldThrowExceptionIfEventNumberIsAbsentFromPublishedEvent() throws when(publishedEventSourceProvider.getPublishedEventSource(eventSourceName)).thenReturn(publishedEventSource); when(processedEventTrackingService.getLatestProcessedEventNumber(eventSourceName, componentName)).thenReturn(eventNumberFrom); when(publishedEventSource.findEventsSince(eventNumberFrom)).thenReturn(events.stream()); - when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName)).thenReturn(1); - when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName)).thenReturn(1); - when(concurrentEventStreamConsumerManager.add(publishedEvent_3, subscriptionName)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, EVENT_CATCHUP)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, EVENT_CATCHUP)).thenReturn(1); + when(concurrentEventStreamConsumerManager.add(publishedEvent_3, subscriptionName, EVENT_CATCHUP)).thenReturn(1); try { eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext); diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManagerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManagerTest.java new file mode 100644 index 000000000..f683ff3ef --- /dev/null +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupErrorStateManagerTest.java @@ -0,0 +1,98 @@ +package uk.gov.justice.services.eventstore.management.catchup.state; + +import static java.util.Collections.emptyList; +import static org.junit.Assert.*; + +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.*; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.INDEX_CATCHUP; + +import uk.gov.justice.services.eventstore.management.events.catchup.CatchupType; + +import java.util.Collections; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class CatchupErrorStateManagerTest { + + + @InjectMocks + private CatchupErrorStateManager catchupErrorStateManager; + + @Test + public void shouldKeepListOfAllCatchupErrorsForEventCatchup() throws Exception { + + final CatchupError indexCatchupError = mock(CatchupError.class); + final CatchupError eventCatchupError_1 = mock(CatchupError.class); + final CatchupError eventCatchupError_2 = mock(CatchupError.class); + final CatchupError eventCatchupError_3 = mock(CatchupError.class); + + catchupErrorStateManager.add(indexCatchupError, INDEX_CATCHUP); + + assertThat(catchupErrorStateManager.getErrors(EVENT_CATCHUP), is(emptyList())); + + catchupErrorStateManager.add(eventCatchupError_1, EVENT_CATCHUP); + catchupErrorStateManager.add(eventCatchupError_2, EVENT_CATCHUP); + catchupErrorStateManager.add(eventCatchupError_3, EVENT_CATCHUP); + + final List errors = catchupErrorStateManager.getErrors(EVENT_CATCHUP); + + assertThat(errors.size(), is(3)); + assertThat(errors.get(0), is(eventCatchupError_1)); + assertThat(errors.get(1), is(eventCatchupError_2)); + assertThat(errors.get(2), is(eventCatchupError_3)); + + catchupErrorStateManager.clear(EVENT_CATCHUP); + + assertThat(catchupErrorStateManager.getErrors(EVENT_CATCHUP), is(emptyList())); + + assertThat(catchupErrorStateManager.getErrors(INDEX_CATCHUP).size(), is(1)); + } + + @Test + public void shouldKeepListOfAllCatchupErrorsForIndexCatchup() throws Exception { + + final CatchupError eventCatchupError = mock(CatchupError.class); + final CatchupError indexCatchupError_1 = mock(CatchupError.class); + final CatchupError indexCatchupError_2 = mock(CatchupError.class); + final CatchupError indexCatchupError_3 = mock(CatchupError.class); + + catchupErrorStateManager.add(eventCatchupError, EVENT_CATCHUP); + + assertThat(catchupErrorStateManager.getErrors(INDEX_CATCHUP), is(emptyList())); + + catchupErrorStateManager.add(indexCatchupError_1, INDEX_CATCHUP); + catchupErrorStateManager.add(indexCatchupError_2, INDEX_CATCHUP); + catchupErrorStateManager.add(indexCatchupError_3, INDEX_CATCHUP); + + final List errors = catchupErrorStateManager.getErrors(INDEX_CATCHUP); + + assertThat(errors.size(), is(3)); + assertThat(errors.get(0), is(indexCatchupError_1)); + assertThat(errors.get(1), is(indexCatchupError_2)); + assertThat(errors.get(2), is(indexCatchupError_3)); + + catchupErrorStateManager.clear(INDEX_CATCHUP); + + assertThat(catchupErrorStateManager.getErrors(INDEX_CATCHUP), is(emptyList())); + + assertThat(catchupErrorStateManager.getErrors(EVENT_CATCHUP).size(), is(1)); + } +} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java new file mode 100644 index 000000000..44d242639 --- /dev/null +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java @@ -0,0 +1,123 @@ +package uk.gov.justice.services.eventstore.management.catchup.state; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.EVENT_CATCHUP; +import static uk.gov.justice.services.eventstore.management.events.catchup.CatchupType.INDEX_CATCHUP; + +import uk.gov.justice.services.common.util.UtcClock; +import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress; + +import java.time.ZonedDateTime; +import java.util.List; + +import org.hamcrest.CoreMatchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class CatchupStateManagerTest { + + @InjectMocks + private CatchupStateManager catchupStateManager; + + @Test + public void shouldMaintainACacheOfAllEventCatchupsInProgress() throws Exception { + + final CatchupInProgress indexCatchupInProgress = mock(CatchupInProgress.class); + when(indexCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup"); + + catchupStateManager.addCatchupInProgress(indexCatchupInProgress, INDEX_CATCHUP); + + assertThat(catchupStateManager.getAllCatchupsInProgress(EVENT_CATCHUP).isEmpty(), is(true)); + + final ZonedDateTime startedAt = new UtcClock().now(); + + final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt); + final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1)); + final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2)); + + catchupStateManager.addCatchupInProgress(catchupInProgress_1, EVENT_CATCHUP); + catchupStateManager.addCatchupInProgress(catchupInProgress_2, EVENT_CATCHUP); + catchupStateManager.addCatchupInProgress(catchupInProgress_3, EVENT_CATCHUP); + + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), EVENT_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), EVENT_CATCHUP), is(true)); + + final List allCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(EVENT_CATCHUP); + + assertThat(allCatchupsInProgress.size(), is(3)); + assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3)); + + final CatchupInProgress removedCatchupInProgress = catchupStateManager.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP); + + assertThat(removedCatchupInProgress, is(catchupInProgress_2)); + + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), EVENT_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), EVENT_CATCHUP), is(false)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), EVENT_CATCHUP), is(true)); + + final List currentCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(EVENT_CATCHUP); + + assertThat(currentCatchupsInProgress.size(), is(2)); + assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3)); + + catchupStateManager.clear(EVENT_CATCHUP); + + assertThat(catchupStateManager.noCatchupsInProgress(EVENT_CATCHUP), is(true)); + assertThat(catchupStateManager.getAllCatchupsInProgress(EVENT_CATCHUP).isEmpty(), is(true)); + } + + @Test + public void shouldMaintainACacheOfAllIndexCatchupsInProgress() throws Exception { + + final CatchupInProgress eventCatchupInProgress = mock(CatchupInProgress.class); + when(eventCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup"); + + catchupStateManager.addCatchupInProgress(eventCatchupInProgress, EVENT_CATCHUP); + + assertThat(catchupStateManager.getAllCatchupsInProgress(INDEX_CATCHUP).isEmpty(), is(true)); + + final ZonedDateTime startedAt = new UtcClock().now(); + + final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt); + final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1)); + final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2)); + + catchupStateManager.addCatchupInProgress(catchupInProgress_1, INDEX_CATCHUP); + catchupStateManager.addCatchupInProgress(catchupInProgress_2, INDEX_CATCHUP); + catchupStateManager.addCatchupInProgress(catchupInProgress_3, INDEX_CATCHUP); + + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), INDEX_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), INDEX_CATCHUP), is(true)); + + final List allCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(INDEX_CATCHUP); + + assertThat(allCatchupsInProgress.size(), is(3)); + assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3)); + + final CatchupInProgress removedCatchupInProgress = catchupStateManager.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP); + + assertThat(removedCatchupInProgress, is(catchupInProgress_2)); + + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), INDEX_CATCHUP), is(true)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), INDEX_CATCHUP), is(false)); + assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), INDEX_CATCHUP), is(true)); + + final List currentCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(INDEX_CATCHUP); + + assertThat(currentCatchupsInProgress.size(), is(2)); + assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3)); + + catchupStateManager.clear(INDEX_CATCHUP); + + assertThat(catchupStateManager.noCatchupsInProgress(INDEX_CATCHUP), is(true)); + assertThat(catchupStateManager.getAllCatchupsInProgress(INDEX_CATCHUP).isEmpty(), is(true)); + } +} diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupCompletedEvent.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupCompletedEvent.java index 226cde12b..1d71887ca 100644 --- a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupCompletedEvent.java +++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupCompletedEvent.java @@ -9,18 +9,24 @@ public class CatchupCompletedEvent { private final SystemCommand target; private final ZonedDateTime completedAt; + private final CatchupType catchupType; - public CatchupCompletedEvent(final SystemCommand target, final ZonedDateTime completedAt) { + public CatchupCompletedEvent(final SystemCommand target, final ZonedDateTime completedAt, final CatchupType catchupType) { this.target = target; this.completedAt = completedAt; + this.catchupType = catchupType; + } + + public SystemCommand getTarget() { + return target; } public ZonedDateTime getCompletedAt() { return completedAt; } - public SystemCommand getTarget() { - return target; + public CatchupType getCatchupType() { + return catchupType; } @Override @@ -29,12 +35,13 @@ public boolean equals(final Object o) { if (!(o instanceof CatchupCompletedEvent)) return false; final CatchupCompletedEvent that = (CatchupCompletedEvent) o; return Objects.equals(target, that.target) && - Objects.equals(completedAt, that.completedAt); + Objects.equals(completedAt, that.completedAt) && + catchupType == that.catchupType; } @Override public int hashCode() { - return Objects.hash(target, completedAt); + return Objects.hash(target, completedAt, catchupType); } @Override @@ -42,6 +49,7 @@ public String toString() { return "CatchupCompletedEvent{" + "target=" + target + ", completedAt=" + completedAt + + ", catchupType=" + catchupType + '}'; } } diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java new file mode 100644 index 000000000..64b33540c --- /dev/null +++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupProcessingOfEventFailedEvent.java @@ -0,0 +1,74 @@ +package uk.gov.justice.services.eventstore.management.events.catchup; + +import java.util.Objects; +import java.util.UUID; + +public class CatchupProcessingOfEventFailedEvent { + + private final UUID eventId; + private final String metadata; + private final Throwable exception; + private final CatchupType catchupType; + private final String subscriptionName; + + public CatchupProcessingOfEventFailedEvent( + final UUID eventId, + final String metadata, + final Throwable exception, + final CatchupType catchupType, + final String subscriptionName) { + this.eventId = eventId; + this.metadata = metadata; + this.exception = exception; + this.catchupType = catchupType; + this.subscriptionName = subscriptionName; + } + + public UUID getEventId() { + return eventId; + } + + public String getMetadata() { + return metadata; + } + + public Throwable getException() { + return exception; + } + + public CatchupType getCatchupType() { + return catchupType; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof CatchupProcessingOfEventFailedEvent)) return false; + final CatchupProcessingOfEventFailedEvent that = (CatchupProcessingOfEventFailedEvent) o; + return Objects.equals(eventId, that.eventId) && + Objects.equals(metadata, that.metadata) && + Objects.equals(exception, that.exception) && + catchupType == that.catchupType && + Objects.equals(subscriptionName, that.subscriptionName); + } + + @Override + public int hashCode() { + return Objects.hash(eventId, metadata, exception, catchupType, subscriptionName); + } + + @Override + public String toString() { + return "CatchupProcessingOfEventFailedEvent{" + + "eventId=" + eventId + + ", metadata='" + metadata + '\'' + + ", exception=" + exception + + ", catchupType=" + catchupType + + ", subscriptionName='" + subscriptionName + '\'' + + '}'; + } +}