Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential problem of a transaction failing during catchup causing… #208

Merged
merged 1 commit into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
## [2.4.4] - 2020-01-06
### Added
- Added mechanism to also drop/add trigger to event_log table on SUSPEND/UNSUSPEND commands
### Fixed
- Fixed potential problem of a transaction failing during catchup causing catchup to never complete

## [2.4.3] - 2019-12-06
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static java.lang.Thread.currentThread;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;

Expand All @@ -13,15 +12,17 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.inject.Inject;
import javax.inject.Singleton;

/**
* A concurrent implementation of EventStreamConsumerManager and EventStreamConsumerListener.
*
* <p>
* This uses the ManagedExecutorService for concurrency and Queues events according to the Stream
* Id.
*
* <p>
* When the add method is called
*/
@Singleton
public class ConcurrentEventStreamConsumerManager implements EventStreamConsumerManager, EventStreamConsumptionResolver {

private static final Object EXCLUSIVE_LOCK = new Object();
Expand All @@ -37,17 +38,14 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer
@Inject
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;

@Inject
private EventQueueConsumerFactory eventQueueConsumerFactory;

/**
* A ConcurrentLinkedQueue is created for each Stream Id and added to a ConcurrentHashMap. An
* event is added to the Queue for a Stream Id.
*
* <p>
* If the Queue is not currently being processed a new ConsumeEventQueueTask is created and
* submitted to the ManagedExecutorService. The Queue is then added to the
* eventStreamsInProgress Queue.
*
* <p>
* If the Queue is currently being processed no further action is taken, as the event will be
* processed by the current ConsumeEventQueueTask.
*
Expand Down Expand Up @@ -124,9 +122,19 @@ public void decrementEventsInProcessCount() {
final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
}

@Override
public void decrementEventsInProcessCountBy(final int count) {
final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCountBy(count);
EXCLUSIVE_LOCK.notify();
}
}

private boolean notInProgress(final Queue<PublishedEvent> eventStream) {
Expand All @@ -141,10 +149,8 @@ private void createAndSubmitTaskFor(

eventStreamsInProgressList.add(eventStream);

final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this);
consumeEventQueueTaskManager.consume(
eventStream,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
* Interface for listening to Event Stream Consumers.
*
* <p>
* An instance of the an EventStreamConsumerListener is passed to the consumer. The consumer will
* call the finishedConsuming method once complete.
*/
Expand All @@ -12,10 +12,12 @@ public interface EventStreamConsumptionResolver {
* Called by a consumer when finish is expected.
*
* @param finishedProcessingMessage - the message containing the Queue that has been consumed.
*
* @return true if all events are consumed, false if there are still events remaining in the queue.
* @return true if all events are consumed, false if there are still events remaining in the
* queue.
*/
boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage);

void decrementEventsInProcessCount();

void decrementEventsInProcessCountBy(final int count);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public synchronized void decrementEventsInProcessCount() {
eventInProcessCount.decrementAndGet();
}

public synchronized void decrementEventsInProcessCountBy(final int count) {
eventInProcessCount.addAndGet(count * -1);
}

public synchronized boolean maxNumberOfEventsInProcess() {
return eventInProcessCount.get() >= maxTotalEventsInProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import static javax.ejb.TransactionManagementType.CONTAINER;
import static javax.transaction.Transactional.TxType.NEVER;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;

import java.util.Queue;
import java.util.UUID;
Expand All @@ -13,28 +15,47 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ejb.TransactionManagement;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;

@Stateless
@TransactionManagement(CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NEVER)
public class ConsumeEventQueueBean {

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

@Inject
private Event<CatchupProcessingOfEventFailedEvent> catchupProcessingOfEventFailedEventFirer;

@Inject
private EventStreamConsumptionResolver eventStreamConsumptionResolver;

@Inject
private EventQueueConsumer eventQueueConsumer;

@Transactional(NEVER)
public void consume(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

boolean consumed = false;
while(! consumed) {
consumed = eventQueueConsumer.consumeEventQueue(
commandId,
events,
subscriptionName,
catchupCommand);
while (!consumed) {
try {
consumed = eventQueueConsumer.consumeEventQueue(
commandId,
events,
subscriptionName,
catchupCommand);
} catch (final Exception e) {
eventStreamConsumptionResolver.decrementEventsInProcessCountBy(events.size());
events.clear();
eventProcessingFailedHandler.handleStreamFailure(e, subscriptionName, catchupCommand, commandId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,18 @@ public class ConsumeEventQueueTask implements Runnable {

private final ConsumeEventQueueBean consumeEventQueueBean;
private final Queue<PublishedEvent> events;
private final EventQueueConsumer eventQueueConsumer;
private final String subscriptionName;
private final CatchupCommand catchupCommand;
private final UUID commandId;

public ConsumeEventQueueTask(
final ConsumeEventQueueBean consumeEventQueueBean,
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
this.consumeEventQueueBean = consumeEventQueueBean;
this.events = events;
this.eventQueueConsumer = eventQueueConsumer;
this.subscriptionName = subscriptionName;
this.catchupCommand = catchupCommand;
this.commandId = commandId;
Expand All @@ -35,7 +32,6 @@ public void run() {

consumeEventQueueBean.consume(
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ public class ConsumeEventQueueTaskFactory {

public ConsumeEventQueueTask createConsumeEventQueueTask(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

return new ConsumeEventQueueTask(
consumeEventQueueBean,
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ public class ConsumeEventQueueTaskManager {

public void consume(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {


final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask(
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,44 @@ public class EventProcessingFailedHandler {
@Inject
private Logger logger;

public void handle(
final RuntimeException exception,
public void handleEventFailure(
final Exception exception,
final PublishedEvent publishedEvent,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata());
final String logMessage = format(
"Failed to process publishedEvent: name: '%s', id: '%s', streamId: '%s'",
publishedEvent.getName(),
publishedEvent.getId(),
publishedEvent.getStreamId()
);

handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

public void handleStreamFailure(
final Exception exception,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

final String logMessage = "Failed to consume stream of events. Aborting...";

handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) {
logger.error(
logMessage,
exception);

final String errorMessage = format("%s: %s: %s", logMessage, exception.getClass().getSimpleName(), exception.getMessage());

final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent(
commandId,
publishedEvent.getId(),
publishedEvent.getMetadata(),
errorMessage,
exception,
catchupCommand,
subscriptionName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@
import java.util.Queue;
import java.util.UUID;

import javax.inject.Inject;

public class EventQueueConsumer {

private final TransactionalEventProcessor transactionalEventProcessor;
private final EventStreamConsumptionResolver eventStreamConsumptionResolver;
private final EventProcessingFailedHandler eventProcessingFailedHandler;

public EventQueueConsumer(
final TransactionalEventProcessor transactionalEventProcessor,
final EventStreamConsumptionResolver eventStreamConsumptionResolver,
final EventProcessingFailedHandler eventProcessingFailedHandler) {
this.transactionalEventProcessor = transactionalEventProcessor;
this.eventStreamConsumptionResolver = eventStreamConsumptionResolver;
this.eventProcessingFailedHandler = eventProcessingFailedHandler;
}
@Inject
private TransactionalEventProcessor transactionalEventProcessor;

@Inject
private EventStreamConsumptionResolver eventStreamConsumptionResolver;

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

public boolean consumeEventQueue(
final UUID commandId,
Expand All @@ -35,8 +33,8 @@ public boolean consumeEventQueue(
final PublishedEvent publishedEvent = events.poll();
try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
} catch (final RuntimeException e) {
eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} catch (final Exception e) {
eventProcessingFailedHandler.handleEventFailure(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} finally {
eventStreamConsumptionResolver.decrementEventsInProcessCount();
}
Expand Down
Loading