Skip to content

Commit

Permalink
Run all subscription catchups in series
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed Sep 24, 2019
1 parent d69cc85 commit 5458847
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 150 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- Subscriptions are no longer run asynchronously during catchup

## [2.0.22] - 2019-09-24
### Changed
- Catchup verification logging now runs in MDC context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

import java.util.Objects;

public class CatchupContext {
public class CatchupSubscriptionContext {

private final String componentName;
private final Subscription subscription;
private final CatchupRequestedEvent catchupRequestedEvent;

public CatchupContext(final String componentName, final Subscription subscription, final CatchupRequestedEvent catchupRequestedEvent) {
public CatchupSubscriptionContext(final String componentName, final Subscription subscription, final CatchupRequestedEvent catchupRequestedEvent) {
this.componentName = componentName;
this.subscription = subscription;
this.catchupRequestedEvent = catchupRequestedEvent;
Expand All @@ -32,8 +32,8 @@ public CatchupRequestedEvent getCatchupRequestedEvent() {
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof CatchupContext)) return false;
final CatchupContext that = (CatchupContext) o;
if (!(o instanceof CatchupSubscriptionContext)) return false;
final CatchupSubscriptionContext that = (CatchupSubscriptionContext) o;
return Objects.equals(componentName, that.componentName) &&
Objects.equals(subscription, that.subscription) &&
Objects.equals(catchupRequestedEvent, that.catchupRequestedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class EventCatchupByComponentRunner {

@Inject
private EventCatchupBySubscriptionRunner eventCatchupBySubscriptionRunner;
private EventCatchupProcessorBean eventCatchupProcessorBean;

@Inject
private Logger logger;
Expand All @@ -37,11 +37,11 @@ private void runEventCatchupForSubscription(

logger.info(format("Running catchup for Component '%s', Subscription '%s'", componentName, subscription.getName()));

final CatchupContext catchupContext = new CatchupContext(
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(
componentName,
subscription,
catchupRequestedEvent);

eventCatchupBySubscriptionRunner.runEventCatchupForSubscription(catchupContext);
eventCatchupProcessorBean.performEventCatchup(catchupSubscriptionContext);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public EventCatchupProcessor(
}

@Transactional(NEVER)
public void performEventCatchup(final CatchupContext catchupContext) {
public void performEventCatchup(final CatchupSubscriptionContext catchupSubscriptionContext) {

final Subscription subscription = catchupContext.getSubscription();
final Subscription subscription = catchupSubscriptionContext.getSubscription();
final String subscriptionName = subscription.getName();
final String eventSourceName = subscription.getEventSourceName();
final String componentName = catchupContext.getComponentName();
final CatchupRequestedEvent catchupRequestedEvent = catchupContext.getCatchupRequestedEvent();
final String componentName = catchupSubscriptionContext.getComponentName();
final CatchupRequestedEvent catchupRequestedEvent = catchupSubscriptionContext.getCatchupRequestedEvent();

final PublishedEventSource eventSource = publishedEventSourceProvider.getPublishedEventSource(eventSourceName);
final Long latestProcessedEventNumber = processedEventTrackingService.getLatestProcessedEventNumber(eventSourceName, componentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public class EventCatchupProcessorBean {
@Inject
EventCatchupProcessorFactory eventCatchupProcessorFactory;

public void performEventCatchup(final CatchupContext catchupContext) {
public void performEventCatchup(final CatchupSubscriptionContext catchupSubscriptionContext) {

final EventCatchupProcessor eventCatchupProcessor = eventCatchupProcessorFactory.create();

eventCatchupProcessor.performEventCatchup(catchupContext);
eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class EventCatchupByComponentRunnerTest {

@Mock
private EventCatchupBySubscriptionRunner eventCatchupBySubscriptionRunner;
private EventCatchupProcessorBean eventCatchupProcessorBean;

@Mock
private Logger logger;
Expand Down Expand Up @@ -51,12 +51,12 @@ public void shouldGetAllSubscriptionsForTheComponentAndRunCatchupOnEach() throws

eventCatchupByComponentRunner.runEventCatchupForComponent(subscriptionsDescriptor, catchupRequestedEvent);

final InOrder inOrder = inOrder(logger, eventCatchupBySubscriptionRunner);
final InOrder inOrder = inOrder(logger, eventCatchupProcessorBean);

inOrder.verify(logger).info("Running catchup for Component 'AN_EVENT_LISTENER', Subscription 'subscriptionName_1'");
inOrder.verify(eventCatchupBySubscriptionRunner).runEventCatchupForSubscription(new CatchupContext(componentName, subscription_1, catchupRequestedEvent));
inOrder.verify(eventCatchupProcessorBean).performEventCatchup(new CatchupSubscriptionContext(componentName, subscription_1, catchupRequestedEvent));
inOrder.verify(logger).info("Running catchup for Component 'AN_EVENT_LISTENER', Subscription 'subscriptionName_2'");
inOrder.verify(eventCatchupBySubscriptionRunner).runEventCatchupForSubscription(new CatchupContext(componentName, subscription_2, catchupRequestedEvent));
inOrder.verify(eventCatchupProcessorBean).performEventCatchup(new CatchupSubscriptionContext(componentName, subscription_2, catchupRequestedEvent));
}

@Test
Expand All @@ -70,6 +70,6 @@ public void shouldNotRunCatchupForThisComponentIfTheComponentIsNotAnEventListene

eventCatchupByComponentRunner.runEventCatchupForComponent(subscriptionsDescriptor, catchupRequestedEvent);

verifyZeroInteractions(eventCatchupBySubscriptionRunner);
verifyZeroInteractions(eventCatchupProcessorBean);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public class EventCatchupProcessorBeanTest {
@Test
public void shouldPerformEventCatchup() throws Exception {

final CatchupContext catchupContext = mock(CatchupContext.class);
final CatchupSubscriptionContext catchupSubscriptionContext = mock(CatchupSubscriptionContext.class);
final EventCatchupProcessor eventCatchupProcessor = mock(EventCatchupProcessor.class);

when(eventCatchupProcessorFactory.create()).thenReturn(eventCatchupProcessor);

eventCatchupProcessorBean.performEventCatchup(catchupContext);
eventCatchupProcessorBean.performEventCatchup(catchupSubscriptionContext);

verify(eventCatchupProcessor).performEventCatchup(catchupContext);
verify(eventCatchupProcessor).performEventCatchup(catchupSubscriptionContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
final Subscription subscription = mock(Subscription.class);
final PublishedEventSource publishedEventSource = mock(PublishedEventSource.class);
final CatchupRequestedEvent catchupRequestedEvent = mock(CatchupRequestedEvent.class);
final CatchupContext catchupContext = new CatchupContext(componentName, subscription, catchupRequestedEvent);
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(componentName, subscription, catchupRequestedEvent);
final SystemCommand systemCommand = mock(SystemCommand.class);

final PublishedEvent publishedEvent_1 = mock(PublishedEvent.class);
Expand All @@ -116,7 +116,7 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
when(eventStreamConsumerManager.add(publishedEvent_3, subscriptionName)).thenReturn(1);
when(catchupRequestedEvent.getTarget()).thenReturn(systemCommand);

eventCatchupProcessor.performEventCatchup(catchupContext);
eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext);

final InOrder inOrder = inOrder(
catchupStartedForSubscriptionEventFirer,
Expand Down Expand Up @@ -159,7 +159,7 @@ public void shouldThrowExceptionIfEventNumberIsAbsentFromPublishedEvent() throws
final Subscription subscription = mock(Subscription.class);
final PublishedEventSource publishedEventSource = mock(PublishedEventSource.class);
final CatchupRequestedEvent catchupRequestedEvent = mock(CatchupRequestedEvent.class);
final CatchupContext catchupContext = new CatchupContext(componentName, subscription, catchupRequestedEvent);
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(componentName, subscription, catchupRequestedEvent);
final SystemCommand systemCommand = mock(SystemCommand.class);

final PublishedEvent publishedEvent_1 = mock(PublishedEvent.class);
Expand All @@ -185,7 +185,7 @@ public void shouldThrowExceptionIfEventNumberIsAbsentFromPublishedEvent() throws
when(catchupRequestedEvent.getTarget()).thenReturn(systemCommand);

try {
eventCatchupProcessor.performEventCatchup(catchupContext);
eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext);
fail();
} catch (final MissingEventNumberException expected) {
assertThat(expected.getMessage(), is("PublishedEvent with id '937f9fd6-3679-4bc2-a73c-6a7b18a651e1' is missing its event number"));
Expand Down

This file was deleted.

0 comments on commit 5458847

Please sign in to comment.