Skip to content

Commit

Permalink
Fire CatchupStartedEvent at the correct time to allow for multiple co…
Browse files Browse the repository at this point in the history
…mponent catchups
  • Loading branch information
amckenzie committed Jan 21, 2020
1 parent b51f738 commit a744f70
Show file tree
Hide file tree
Showing 20 changed files with 451 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventstore.management.catchup.process.CatchupDurationCalculator;
import uk.gov.justice.services.eventstore.management.catchup.process.CatchupFor;
import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress;
import uk.gov.justice.services.eventstore.management.catchup.process.EventCatchupRunner;
import uk.gov.justice.services.eventstore.management.catchup.state.CatchupError;
Expand All @@ -15,12 +14,14 @@
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import uk.gov.justice.services.jmx.logging.MdcLoggerInterceptor;
import uk.gov.justice.services.jmx.state.events.SystemCommandStateChangedEvent;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -65,10 +66,10 @@ public void onCatchupRequested(@Observes final CatchupRequestedEvent catchupRequ

final ZonedDateTime catchupStartedAt = clock.now();

catchupStateManager.clear(catchupCommand);
catchupStateManager.clear();
catchupErrorStateManager.clear(catchupCommand);

final String message = format("%s started at %s", catchupCommand.getName(), catchupStartedAt);
final String message = format("%s requested at %s", catchupCommand.getName(), catchupStartedAt);
systemCommandStateChangedEventFirer.fire(new SystemCommandStateChangedEvent(
commandId,
catchupCommand,
Expand All @@ -82,45 +83,45 @@ public void onCatchupRequested(@Observes final CatchupRequestedEvent catchupRequ
eventCatchupRunner.runEventCatchup(commandId, catchupCommand);
}

public void onCatchupStartedForSubscription(@Observes final CatchupStartedForSubscriptionEvent catchupStartedForSubscriptionEvent) {
public void onCatchupStarted(@Observes final CatchupStartedEvent catchupStartedEvent) {

final String subscriptionName = catchupStartedForSubscriptionEvent.getSubscriptionName();
final String componentName = catchupStartedForSubscriptionEvent.getComponentName();
final ZonedDateTime catchupStartedAt = catchupStartedForSubscriptionEvent.getCatchupStartedAt();
final CatchupCommand catchupCommand = catchupStartedForSubscriptionEvent.getCatchupCommand();
final List<SubscriptionCatchupDetails> subscriptionCatchupDetailsList = catchupStartedEvent
.getSubscriptionCatchupDefinition();

final CatchupInProgress catchupInProgress = new CatchupInProgress(
new CatchupFor(subscriptionName, componentName),
catchupStartedAt);
final ZonedDateTime catchupStartedAt = catchupStartedEvent.getCatchupStartedAt();
final CatchupCommand catchupCommand = catchupStartedEvent.getCatchupCommand();

catchupStateManager.addCatchupInProgress(catchupInProgress, catchupCommand);
catchupStateManager.newCatchupInProgress(
subscriptionCatchupDetailsList,
catchupStartedAt);

logger.info(format("%s for subscription '%s' started at %s", catchupCommand.getName(), subscriptionName, catchupStartedAt));
logger.info(format("%s started at %s", catchupCommand.getName(), catchupStartedAt));
}

public void onCatchupCompleteForSubscription(@Observes final CatchupCompletedForSubscriptionEvent catchupCompletedForSubscriptionEvent) {
final UUID commandId = catchupCompletedForSubscriptionEvent.getCommandId();
final String subscriptionName = catchupCompletedForSubscriptionEvent.getSubscriptionName();
final String eventSourceName = catchupCompletedForSubscriptionEvent.getEventSourceName();
final String componentName = catchupCompletedForSubscriptionEvent.getComponentName();

final ZonedDateTime catchupCompletedAt = catchupCompletedForSubscriptionEvent.getCatchupCompletedAt();
final int totalNumberOfEvents = catchupCompletedForSubscriptionEvent.getTotalNumberOfEvents();
final CatchupCommand catchupCommand = catchupCompletedForSubscriptionEvent.getCatchupCommand();

final CatchupFor catchupFor = new CatchupFor(subscriptionName, componentName);
final SubscriptionCatchupDetails subscriptionCatchupDefinition = new SubscriptionCatchupDetails(subscriptionName, eventSourceName, componentName);

logger.info(format("%s for '%s' '%s' completed at %s", catchupCommand.getName(), componentName, subscriptionName, catchupCompletedAt));
logger.info(format("%s for '%s' '%s' caught up %d events", catchupCommand.getName(), componentName, subscriptionName, totalNumberOfEvents));

final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(catchupFor, catchupCommand);
final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(subscriptionCatchupDefinition);

final Duration catchupDuration = catchupDurationCalculator.calculate(
catchupInProgress.getStartedAt(),
catchupCompletedForSubscriptionEvent.getCatchupCompletedAt());

logger.info(format("%s for '%s' '%s' took %d milliseconds", catchupCommand.getName(), componentName, subscriptionName, catchupDuration.toMillis()));

if (catchupStateManager.noCatchupsInProgress(catchupCommand)) {
if (catchupStateManager.noCatchupsInProgress()) {
catchupProcessCompleter.handleCatchupComplete(commandId, catchupCommand);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;

import javax.inject.Inject;

public class RunCatchupForComponentSelector {
public class CanCatchupFilter {

@Inject
private CatchupTypeSelector catchupTypeSelector;

public boolean shouldRunForThisComponentAndType(final String componentName, final CatchupCommand catchupCommand) {
public boolean canCatchup(final SubscriptionsDescriptor subscriptionsDescriptor, final CatchupCommand catchupCommand) {

final String componentName = subscriptionsDescriptor.getServiceComponent();
final boolean eventCatchup = catchupTypeSelector.isEventCatchup(componentName, catchupCommand);
final boolean indexerCatchup = catchupTypeSelector.isIndexerCatchup(componentName, catchupCommand);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;

import java.time.ZonedDateTime;
import java.util.Objects;

public class CatchupInProgress {

private final CatchupFor catchupFor;
private final SubscriptionCatchupDetails subscriptionCatchupDetails;
private final ZonedDateTime startedAt;

public CatchupInProgress(final CatchupFor catchupFor, final ZonedDateTime startedAt) {
this.catchupFor = catchupFor;
public CatchupInProgress(final SubscriptionCatchupDetails subscriptionCatchupDetails, final ZonedDateTime startedAt) {
this.subscriptionCatchupDetails = subscriptionCatchupDetails;
this.startedAt = startedAt;
}

public CatchupFor getCatchupFor() {
return catchupFor;
public SubscriptionCatchupDetails getSubscriptionCatchupDetails() {
return subscriptionCatchupDetails;
}

public ZonedDateTime getStartedAt() {
Expand All @@ -26,19 +28,19 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof CatchupInProgress)) return false;
final CatchupInProgress that = (CatchupInProgress) o;
return Objects.equals(catchupFor, that.catchupFor) &&
return Objects.equals(subscriptionCatchupDetails, that.subscriptionCatchupDetails) &&
Objects.equals(startedAt, that.startedAt);
}

@Override
public int hashCode() {
return Objects.hash(catchupFor, startedAt);
return Objects.hash(subscriptionCatchupDetails, startedAt);
}

@Override
public String toString() {
return "CatchupInProgress{" +
"catchupFor=" + catchupFor +
"catchupFor=" + subscriptionCatchupDetails +
", startedAt=" + startedAt +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;

import java.util.Objects;
import java.util.UUID;
Expand All @@ -10,17 +10,17 @@ public class CatchupSubscriptionContext {

private final UUID commandId;
private final String componentName;
private final Subscription subscription;
private final SubscriptionCatchupDetails subscriptionCatchupDefinition;
private final CatchupCommand catchupCommand;

public CatchupSubscriptionContext(
final UUID commandId,
final String componentName,
final Subscription subscription,
final SubscriptionCatchupDetails subscriptionCatchupDefinition,
final CatchupCommand catchupCommand) {
this.commandId = commandId;
this.componentName = componentName;
this.subscription = subscription;
this.subscriptionCatchupDefinition = subscriptionCatchupDefinition;
this.catchupCommand = catchupCommand;
}

Expand All @@ -32,8 +32,8 @@ public String getComponentName() {
return componentName;
}

public Subscription getSubscription() {
return subscription;
public SubscriptionCatchupDetails getSubscriptionCatchupDefinition() {
return subscriptionCatchupDefinition;
}

public CatchupCommand getCatchupCommand() {
Expand All @@ -47,21 +47,21 @@ public boolean equals(final Object o) {
final CatchupSubscriptionContext that = (CatchupSubscriptionContext) o;
return Objects.equals(commandId, that.commandId) &&
Objects.equals(componentName, that.componentName) &&
Objects.equals(subscription, that.subscription) &&
Objects.equals(subscriptionCatchupDefinition, that.subscriptionCatchupDefinition) &&
Objects.equals(catchupCommand, that.catchupCommand);
}

@Override
public int hashCode() {
return Objects.hash(commandId, componentName, subscription, catchupCommand);
return Objects.hash(commandId, componentName, subscriptionCatchupDefinition, catchupCommand);
}

@Override
public String toString() {
return "CatchupSubscriptionContext{" +
"commandId=" + commandId +
", componentName='" + componentName + '\'' +
", subscription=" + subscription +
", catchupFor=" + subscriptionCatchupDefinition +
", catchupCommand=" + catchupCommand +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import static java.lang.String.format;

import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;

import java.util.UUID;

Expand All @@ -14,41 +13,26 @@

public class EventCatchupByComponentRunner {

@Inject
private RunCatchupForComponentSelector runCatchupForComponentSelector;

@Inject
private EventCatchupProcessorBean eventCatchupProcessorBean;

@Inject
private Logger logger;

public void runEventCatchupForComponent(
final SubscriptionCatchupDetails subscriptionCatchupDefinition,
final UUID commandId,
final SubscriptionsDescriptor subscriptionsDescriptor,
final CatchupCommand catchupCommand) {

final String componentName = subscriptionsDescriptor.getServiceComponent();

if (runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, catchupCommand)) {
subscriptionsDescriptor
.getSubscriptions()
.forEach(subscription -> runEventCatchupForSubscription(commandId, catchupCommand, componentName, subscription));
}
}

private void runEventCatchupForSubscription(
final UUID commandId,
final CatchupCommand catchupCommand,
final String componentName,
final Subscription subscription) {
final String componentName = subscriptionCatchupDefinition.getComponentName();
final String subscriptionName = subscriptionCatchupDefinition.getSubscriptionName();

logger.info(format("Running %s for Component '%s', Subscription '%s'", catchupCommand.getName(), componentName, subscription.getName()));
logger.info(format("Running %s for Component '%s', Subscription '%s'", catchupCommand.getName(), componentName, subscriptionName));

final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(
commandId,
componentName,
subscription,
subscriptionCatchupDefinition,
catchupCommand);

eventCatchupProcessorBean.performEventCatchup(catchupSubscriptionContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;

import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -29,9 +28,6 @@ public class EventCatchupProcessor {
@Inject
private MissingEventStreamer missingEventStreamer;

@Inject
private Event<CatchupStartedForSubscriptionEvent> catchupStartedForSubscriptionEventFirer;

@Inject
private Event<CatchupCompletedForSubscriptionEvent> catchupCompletedForSubscriptionEventFirer;

Expand All @@ -45,19 +41,12 @@ public class EventCatchupProcessor {
public void performEventCatchup(final CatchupSubscriptionContext catchupSubscriptionContext) {

final UUID commandId = catchupSubscriptionContext.getCommandId();
final Subscription subscription = catchupSubscriptionContext.getSubscription();
final String subscriptionName = subscription.getName();
final String eventSourceName = subscription.getEventSourceName();
final SubscriptionCatchupDetails subscriptionCatchupDefinition = catchupSubscriptionContext.getSubscriptionCatchupDefinition();
final String subscriptionName = subscriptionCatchupDefinition.getSubscriptionName();
final String eventSourceName = subscriptionCatchupDefinition.getEventSourceName();
final String componentName = catchupSubscriptionContext.getComponentName();
final CatchupCommand catchupCommand = catchupSubscriptionContext.getCatchupCommand();

catchupStartedForSubscriptionEventFirer.fire(new CatchupStartedForSubscriptionEvent(
commandId,
subscriptionName,
componentName,
catchupCommand,
clock.now()));

logger.info(format("Finding all missing events for event source '%s', component '%s", eventSourceName, componentName));
final Stream<PublishedEvent> events = missingEventStreamer.getMissingEvents(eventSourceName, componentName);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,44 @@
package uk.gov.justice.services.eventstore.management.catchup.process;

import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.subscription.registry.SubscriptionsDescriptorsRegistry;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;

import java.util.List;
import java.util.UUID;

import javax.enterprise.event.Event;
import javax.inject.Inject;

public class EventCatchupRunner {

@Inject
private SubscriptionsDescriptorsRegistry subscriptionsDescriptorsRegistry;
private EventCatchupByComponentRunner eventCatchupByComponentRunner;

@Inject
private EventCatchupByComponentRunner eventCatchupByComponentRunner;
private Event<CatchupStartedEvent> catchupStartedEventFirer;

@Inject
private SubscriptionCatchupProvider subscriptionCatchupProvider;

@Inject
private UtcClock clock;

public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCommand) {

subscriptionsDescriptorsRegistry
.getAll()
.forEach(subscriptionsDescriptor -> eventCatchupByComponentRunner.runEventCatchupForComponent(
commandId,
subscriptionsDescriptor,
catchupCommand));
final List<SubscriptionCatchupDetails> subscriptionCatchupDefinitions = subscriptionCatchupProvider.getBySubscription(catchupCommand);

catchupStartedEventFirer.fire(new CatchupStartedEvent(
commandId,
catchupCommand,
subscriptionCatchupDefinitions,
clock.now()
));

subscriptionCatchupDefinitions.forEach(catchupFor -> eventCatchupByComponentRunner.runEventCatchupForComponent(
catchupFor,
commandId,
catchupCommand));
}
}
Loading

0 comments on commit a744f70

Please sign in to comment.