Skip to content

Commit

Permalink
Merge 10fa43a into 7457df8
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Jan 6, 2020
2 parents 7457df8 + 10fa43a commit 03cb9a5
Show file tree
Hide file tree
Showing 26 changed files with 297 additions and 193 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
## [2.4.4] - 2020-01-06
### Added
- Added mechanism to also drop/add trigger to event_log table on SUSPEND/UNSUSPEND commands
### Fixed
- Fixed potential problem of a transaction failing during catchup causing catchup to never complete

## [2.4.3] - 2019-12-06
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static java.lang.Thread.currentThread;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;

Expand All @@ -13,15 +12,17 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.inject.Inject;
import javax.inject.Singleton;

/**
* A concurrent implementation of EventStreamConsumerManager and EventStreamConsumerListener.
*
* <p>
* This uses the ManagedExecutorService for concurrency and Queues events according to the Stream
* Id.
*
* <p>
* When the add method is called
*/
@Singleton
public class ConcurrentEventStreamConsumerManager implements EventStreamConsumerManager, EventStreamConsumptionResolver {

private static final Object EXCLUSIVE_LOCK = new Object();
Expand All @@ -37,17 +38,14 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer
@Inject
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;

@Inject
private EventQueueConsumerFactory eventQueueConsumerFactory;

/**
* A ConcurrentLinkedQueue is created for each Stream Id and added to a ConcurrentHashMap. An
* event is added to the Queue for a Stream Id.
*
* <p>
* If the Queue is not currently being processed a new ConsumeEventQueueTask is created and
* submitted to the ManagedExecutorService. The Queue is then added to the
* eventStreamsInProgress Queue.
*
* <p>
* If the Queue is currently being processed no further action is taken, as the event will be
* processed by the current ConsumeEventQueueTask.
*
Expand Down Expand Up @@ -124,9 +122,19 @@ public void decrementEventsInProcessCount() {
final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
}

@Override
public void decrementEventsInProcessCountBy(final int count) {
final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCountBy(count);
EXCLUSIVE_LOCK.notify();
}
}

private boolean notInProgress(final Queue<PublishedEvent> eventStream) {
Expand All @@ -141,10 +149,8 @@ private void createAndSubmitTaskFor(

eventStreamsInProgressList.add(eventStream);

final EventQueueConsumer eventQueueConsumer = eventQueueConsumerFactory.create(this);
consumeEventQueueTaskManager.consume(
eventStream,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
* Interface for listening to Event Stream Consumers.
*
* <p>
* An instance of the an EventStreamConsumerListener is passed to the consumer. The consumer will
* call the finishedConsuming method once complete.
*/
Expand All @@ -12,10 +12,12 @@ public interface EventStreamConsumptionResolver {
* Called by a consumer when finish is expected.
*
* @param finishedProcessingMessage - the message containing the Queue that has been consumed.
*
* @return true if all events are consumed, false if there are still events remaining in the queue.
* @return true if all events are consumed, false if there are still events remaining in the
* queue.
*/
boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage);

void decrementEventsInProcessCount();

void decrementEventsInProcessCountBy(final int count);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public synchronized void decrementEventsInProcessCount() {
eventInProcessCount.decrementAndGet();
}

public synchronized void decrementEventsInProcessCountBy(final int count) {
eventInProcessCount.addAndGet(count * -1);
}

public synchronized boolean maxNumberOfEventsInProcess() {
return eventInProcessCount.get() >= maxTotalEventsInProcess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import static javax.ejb.TransactionManagementType.CONTAINER;
import static javax.transaction.Transactional.TxType.NEVER;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamConsumptionResolver;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;

import java.util.Queue;
import java.util.UUID;
Expand All @@ -13,28 +15,47 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ejb.TransactionManagement;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;

@Stateless
@TransactionManagement(CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NEVER)
public class ConsumeEventQueueBean {

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

@Inject
private Event<CatchupProcessingOfEventFailedEvent> catchupProcessingOfEventFailedEventFirer;

@Inject
private EventStreamConsumptionResolver eventStreamConsumptionResolver;

@Inject
private EventQueueConsumer eventQueueConsumer;

@Transactional(NEVER)
public void consume(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

boolean consumed = false;
while(! consumed) {
consumed = eventQueueConsumer.consumeEventQueue(
commandId,
events,
subscriptionName,
catchupCommand);
while (!consumed) {
try {
consumed = eventQueueConsumer.consumeEventQueue(
commandId,
events,
subscriptionName,
catchupCommand);
} catch (final Exception e) {
eventStreamConsumptionResolver.decrementEventsInProcessCountBy(events.size());
events.clear();
eventProcessingFailedHandler.handleStreamFailure(e, subscriptionName, catchupCommand, commandId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,18 @@ public class ConsumeEventQueueTask implements Runnable {

private final ConsumeEventQueueBean consumeEventQueueBean;
private final Queue<PublishedEvent> events;
private final EventQueueConsumer eventQueueConsumer;
private final String subscriptionName;
private final CatchupCommand catchupCommand;
private final UUID commandId;

public ConsumeEventQueueTask(
final ConsumeEventQueueBean consumeEventQueueBean,
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {
this.consumeEventQueueBean = consumeEventQueueBean;
this.events = events;
this.eventQueueConsumer = eventQueueConsumer;
this.subscriptionName = subscriptionName;
this.catchupCommand = catchupCommand;
this.commandId = commandId;
Expand All @@ -35,7 +32,6 @@ public void run() {

consumeEventQueueBean.consume(
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ public class ConsumeEventQueueTaskFactory {

public ConsumeEventQueueTask createConsumeEventQueueTask(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

return new ConsumeEventQueueTask(
consumeEventQueueBean,
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ public class ConsumeEventQueueTaskManager {

public void consume(
final Queue<PublishedEvent> events,
final EventQueueConsumer eventQueueConsumer,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {


final ConsumeEventQueueTask consumeEventQueueTask = consumeEventQueueTaskFactory.createConsumeEventQueueTask(
events,
eventQueueConsumer,
subscriptionName,
catchupCommand,
commandId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,44 @@ public class EventProcessingFailedHandler {
@Inject
private Logger logger;

public void handle(
final RuntimeException exception,
public void handleEventFailure(
final Exception exception,
final PublishedEvent publishedEvent,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

final String logMessage = format("Failed to process publishedEvent with metadata: %s", publishedEvent.getMetadata());
final String logMessage = format(
"Failed to process publishedEvent: name: '%s', id: '%s', streamId: '%s'",
publishedEvent.getName(),
publishedEvent.getId(),
publishedEvent.getStreamId()
);

handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

public void handleStreamFailure(
final Exception exception,
final String subscriptionName,
final CatchupCommand catchupCommand,
final UUID commandId) {

final String logMessage = "Failed to consume stream of events. Aborting...";

handleFailure(catchupCommand, commandId, logMessage, subscriptionName, exception);
}

private void handleFailure(final CatchupCommand catchupCommand, final UUID commandId, final String logMessage, final String subscriptionName, final Exception exception) {
logger.error(
logMessage,
exception);

final String errorMessage = format("%s: %s: %s", logMessage, exception.getClass().getSimpleName(), exception.getMessage());

final CatchupProcessingOfEventFailedEvent catchupProcessingOfEventFailedEvent = new CatchupProcessingOfEventFailedEvent(
commandId,
publishedEvent.getId(),
publishedEvent.getMetadata(),
errorMessage,
exception,
catchupCommand,
subscriptionName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@
import java.util.Queue;
import java.util.UUID;

import javax.inject.Inject;

public class EventQueueConsumer {

private final TransactionalEventProcessor transactionalEventProcessor;
private final EventStreamConsumptionResolver eventStreamConsumptionResolver;
private final EventProcessingFailedHandler eventProcessingFailedHandler;

public EventQueueConsumer(
final TransactionalEventProcessor transactionalEventProcessor,
final EventStreamConsumptionResolver eventStreamConsumptionResolver,
final EventProcessingFailedHandler eventProcessingFailedHandler) {
this.transactionalEventProcessor = transactionalEventProcessor;
this.eventStreamConsumptionResolver = eventStreamConsumptionResolver;
this.eventProcessingFailedHandler = eventProcessingFailedHandler;
}
@Inject
private TransactionalEventProcessor transactionalEventProcessor;

@Inject
private EventStreamConsumptionResolver eventStreamConsumptionResolver;

@Inject
private EventProcessingFailedHandler eventProcessingFailedHandler;

public boolean consumeEventQueue(
final UUID commandId,
Expand All @@ -35,8 +33,8 @@ public boolean consumeEventQueue(
final PublishedEvent publishedEvent = events.poll();
try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
} catch (final RuntimeException e) {
eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} catch (final Exception e) {
eventProcessingFailedHandler.handleEventFailure(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} finally {
eventStreamConsumptionResolver.decrementEventsInProcessCount();
}
Expand Down
Loading

0 comments on commit 03cb9a5

Please sign in to comment.