diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f2dd4485..ec9829759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +### Changed +- Fail cleanly if exception occurs while accessing subscription event source + ## [2.5.0-M3] - 2020-02-11 ### Removed - Remove mechanism to also drop/add trigger on SUSPEND/UNSUSPEND as it causes diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java index 2a3dd3577..0aec038b0 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandler.java @@ -49,6 +49,16 @@ public void handleStreamFailure( handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception); } + public void handleSubscriptionFailure( + final Exception exception, + final String subscriptionName, + final UUID commandId, final CatchupCommand catchupCommand) { + + final String logMessage = String.format("Failed to subscribe to '%s'. Aborting...", subscriptionName); + + handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception); + } + private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) { logger.error( logMessage, diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java index 3ac42c9cc..9ab3d1aed 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventProcessingFailedHandlerTest.java @@ -111,4 +111,39 @@ public void shouldLogExceptionAndFireFailureEventOnStreamFailure() throws Except assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException)); assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName)); } + + @Test + public void shouldLogExceptionAndFireFailureEventOnSubscriptionFailure() throws Exception { + + final NullPointerException nullPointerException = new NullPointerException("Ooops"); + final UUID commandId = randomUUID(); + final String subscriptionName = "subscriptionName"; + final String eventName = "events.some-event"; + final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e"); + final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829"); + final CatchupCommand catchupCommand = new EventCatchupCommand(); + + final PublishedEvent publishedEvent = mock(PublishedEvent.class); + + when(publishedEvent.getName()).thenReturn(eventName); + when(publishedEvent.getId()).thenReturn(eventId); + when(publishedEvent.getStreamId()).thenReturn(streamId); + + eventProcessingFailedHandler.handleSubscriptionFailure( + nullPointerException, + subscriptionName, + commandId, catchupCommand + ); + + verify(logger).error("Failed to subscribe to 'subscriptionName'. Aborting...", nullPointerException); + + verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture()); + + final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue(); + + assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to subscribe to 'subscriptionName'. Aborting...: NullPointerException: Ooops")); + assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand)); + assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException)); + assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName)); + } } diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java index 61d8b9dd8..6ebe0cfca 100644 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java @@ -1,6 +1,7 @@ package uk.gov.justice.services.eventstore.management.catchup.process; import uk.gov.justice.services.common.util.UtcClock; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails; @@ -25,6 +26,9 @@ public class EventCatchupRunner { @Inject private UtcClock clock; + @Inject + private EventProcessingFailedHandler eventProcessingFailedHandler; + public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCommand) { final List subscriptionCatchupDefinitions = subscriptionCatchupProvider.getBySubscription(catchupCommand); @@ -37,9 +41,19 @@ public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCo )); subscriptionCatchupDefinitions - .forEach(catchupFor -> eventCatchupByComponentRunner.runEventCatchupForComponent( - catchupFor, - commandId, - catchupCommand)); + .forEach(subscriptionCatchupDetails -> + catchupSubscription(subscriptionCatchupDetails, commandId, catchupCommand)); + } + + private void catchupSubscription(final SubscriptionCatchupDetails subscriptionCatchupDetails, final UUID commandId, final CatchupCommand catchupCommand) { + + try { + eventCatchupByComponentRunner.runEventCatchupForComponent( + subscriptionCatchupDetails, + commandId, + catchupCommand); + } catch (final Exception e) { + eventProcessingFailedHandler.handleSubscriptionFailure(e, subscriptionCatchupDetails.getSubscriptionName(), commandId, catchupCommand); + } } } diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java index 1cc99fc44..82ffd1324 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java @@ -1,12 +1,16 @@ package uk.gov.justice.services.eventstore.management.catchup.process; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.UUID.randomUUID; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import uk.gov.justice.services.common.util.UtcClock; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler; import uk.gov.justice.services.eventstore.management.commands.CatchupCommand; import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand; import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent; @@ -39,6 +43,9 @@ public class EventCatchupRunnerTest { @Mock private UtcClock clock; + @Mock + private EventProcessingFailedHandler eventProcessingFailedHandler; + @InjectMocks private EventCatchupRunner eventCatchupRunner; @@ -69,4 +76,34 @@ public void shouldRunEventCatchupForEachSubscription() throws Exception { inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_1, commandId, catchupCommand); inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_2, commandId, catchupCommand); } + + @Test + public void shouldHandleExceptionsByFailingGracefully() { + + final UUID commandId = randomUUID(); + final RuntimeException runtimeException = new RuntimeException(); + + final String subscriptionName = "subscription_1"; + final SubscriptionCatchupDetails subscriptionCatchupDefinition_1 = mock(SubscriptionCatchupDetails.class); + final List subscriptionCatchupDefinitions = singletonList(subscriptionCatchupDefinition_1); + + final CatchupCommand catchupCommand = new EventCatchupCommand(); + + when(subscriptionCatchupProvider.getBySubscription(catchupCommand)).thenReturn(subscriptionCatchupDefinitions); + doThrow(runtimeException).when(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_1, commandId, catchupCommand); + when(subscriptionCatchupDefinition_1.getSubscriptionName()).thenReturn(subscriptionName); + + eventCatchupRunner.runEventCatchup(commandId, catchupCommand); + + final InOrder inOrder = inOrder(catchupStartedEventFirer, eventProcessingFailedHandler); + + inOrder.verify(catchupStartedEventFirer).fire(new CatchupStartedEvent( + commandId, + catchupCommand, + subscriptionCatchupDefinitions, + clock.now() + )); + + verify(eventProcessingFailedHandler).handleSubscriptionFailure(runtimeException, subscriptionName, commandId, catchupCommand); + } }