Skip to content

Commit

Permalink
Merge 06ac792 into 3ca8da2
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Mar 2, 2020
2 parents 3ca8da2 + 06ac792 commit 38e0047
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

### Changed
- Fail cleanly if exception occurs while accessing subscription event source

## [2.5.0-M3] - 2020-02-11
### Removed
- Remove mechanism to also drop/add trigger on SUSPEND/UNSUSPEND as it causes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public void handleStreamFailure(
handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

public void handleSubscriptionFailure(
final Exception exception,
final String subscriptionName,
final UUID commandId, final CatchupCommand catchupCommand) {

final String logMessage = String.format("Failed to subscribe to '%s'. Aborting...", subscriptionName);

handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) {
logger.error(
logMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,39 @@ public void shouldLogExceptionAndFireFailureEventOnStreamFailure() throws Except
assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException));
assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName));
}

@Test
public void shouldLogExceptionAndFireFailureEventOnSubscriptionFailure() throws Exception {

final NullPointerException nullPointerException = new NullPointerException("Ooops");
final UUID commandId = randomUUID();
final String subscriptionName = "subscriptionName";
final String eventName = "events.some-event";
final UUID eventId = fromString("1c68536e-1fdf-4891-9c74-85661a9c0f9e");
final UUID streamId = fromString("b1834ed9-e084-41c7-ae7f-74f1227cc829");
final CatchupCommand catchupCommand = new EventCatchupCommand();

final PublishedEvent publishedEvent = mock(PublishedEvent.class);

when(publishedEvent.getName()).thenReturn(eventName);
when(publishedEvent.getId()).thenReturn(eventId);
when(publishedEvent.getStreamId()).thenReturn(streamId);

eventProcessingFailedHandler.handleSubscriptionFailure(
nullPointerException,
subscriptionName,
commandId, catchupCommand
);

verify(logger).error("Failed to subscribe to 'subscriptionName'. Aborting...", nullPointerException);

verify(catchupProcessingOfEventFailedEventFirer).fire(catchupProcessingOfEventFailedEventCaptor.capture());

final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = catchupProcessingOfEventFailedEventCaptor.getValue();

assertThat(catchupProcessingOfEventFailedEvent.getMessage(), is("Failed to subscribe to 'subscriptionName'. Aborting...: NullPointerException: Ooops"));
assertThat(catchupProcessingOfEventFailedEvent.getCatchupCommand(), is(catchupCommand));
assertThat(catchupProcessingOfEventFailedEvent.getException(), is(nullPointerException));
assertThat(catchupProcessingOfEventFailedEvent.getSubscriptionName(), is(subscriptionName));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
Expand All @@ -25,6 +26,9 @@ public class EventCatchupRunner {
@Inject
private UtcClock clock;

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCommand) {

final List<SubscriptionCatchupDetails> subscriptionCatchupDefinitions = subscriptionCatchupProvider.getBySubscription(catchupCommand);
Expand All @@ -37,9 +41,19 @@ public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCo
));

subscriptionCatchupDefinitions
.forEach(catchupFor -> eventCatchupByComponentRunner.runEventCatchupForComponent(
catchupFor,
commandId,
catchupCommand));
.forEach(subscriptionCatchupDetails ->
catchupSubscription(subscriptionCatchupDetails, commandId, catchupCommand));
}

private void catchupSubscription(final SubscriptionCatchupDetails subscriptionCatchupDetails, final UUID commandId, final CatchupCommand catchupCommand) {

try {
eventCatchupByComponentRunner.runEventCatchupForComponent(
subscriptionCatchupDetails,
commandId,
catchupCommand);
} catch (final Exception e) {
eventProcessingFailedHandler.handleSubscriptionFailure(e, subscriptionCatchupDetails.getSubscriptionName(), commandId, catchupCommand);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventProcessingFailedHandler;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
Expand Down Expand Up @@ -39,6 +43,9 @@ public class EventCatchupRunnerTest {
@Mock
private UtcClock clock;

@Mock
private EventProcessingFailedHandler eventProcessingFailedHandler;

@InjectMocks
private EventCatchupRunner eventCatchupRunner;

Expand Down Expand Up @@ -69,4 +76,34 @@ public void shouldRunEventCatchupForEachSubscription() throws Exception {
inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_1, commandId, catchupCommand);
inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_2, commandId, catchupCommand);
}

@Test
public void shouldHandleExceptionsByFailingGracefully() {

final UUID commandId = randomUUID();
final RuntimeException runtimeException = new RuntimeException();

final String subscriptionName = "subscription_1";
final SubscriptionCatchupDetails subscriptionCatchupDefinition_1 = mock(SubscriptionCatchupDetails.class);
final List<SubscriptionCatchupDetails> subscriptionCatchupDefinitions = singletonList(subscriptionCatchupDefinition_1);

final CatchupCommand catchupCommand = new EventCatchupCommand();

when(subscriptionCatchupProvider.getBySubscription(catchupCommand)).thenReturn(subscriptionCatchupDefinitions);
doThrow(runtimeException).when(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_1, commandId, catchupCommand);
when(subscriptionCatchupDefinition_1.getSubscriptionName()).thenReturn(subscriptionName);

eventCatchupRunner.runEventCatchup(commandId, catchupCommand);

final InOrder inOrder = inOrder(catchupStartedEventFirer, eventProcessingFailedHandler);

inOrder.verify(catchupStartedEventFirer).fire(new CatchupStartedEvent(
commandId,
catchupCommand,
subscriptionCatchupDefinitions,
clock.now()
));

verify(eventProcessingFailedHandler).handleSubscriptionFailure(runtimeException, subscriptionName, commandId, catchupCommand);
}
}

0 comments on commit 38e0047

Please sign in to comment.