From a8d9bc45bce0e0345a992c234b13a2ac47edb4f8 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Thu, 5 Dec 2019 16:10:29 +0000 Subject: [PATCH] Add backpressure to the event processing queues in catchup --- CHANGELOG.md | 2 + .../ConcurrentEventStreamConsumerManager.java | 30 ++++++++++++ .../DefaultEventQueueProcessingConfig.java | 23 +++++++++ .../manager/EventQueueProcessingConfig.java | 5 ++ .../EventStreamConsumptionResolver.java | 2 + .../manager/EventsInProcessCounter.java | 25 ++++++++++ .../EventsInProcessCounterProvider.java | 21 +++++++++ .../consumer/task/EventQueueConsumer.java | 5 +- .../catchup/EventStreamCatchupIT.java | 6 ++- .../DummyEventQueueProcessingConfig.java | 15 ++++++ ...currentEventStreamConsumerManagerTest.java | 37 +++++++++++++++ ...DefaultEventQueueProcessingConfigTest.java | 25 ++++++++++ .../EventsInProcessCounterProviderTest.java | 47 +++++++++++++++++++ .../consumer/task/EventQueueConsumerTest.java | 7 ++- 14 files changed, 247 insertions(+), 3 deletions(-) create mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfig.java create mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueProcessingConfig.java create mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java create mode 100644 event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProvider.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/DummyEventQueueProcessingConfig.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfigTest.java create mode 100644 event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProviderTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 02a04436e..d452de566 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- Backpressure added to the event processing queues during catchup ### Fixed - Verification completion log message now correctly logs if verification of Catchup or of Rebuild diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java index 9fdda2dcd..e513017ab 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManager.java @@ -1,5 +1,7 @@ package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; +import static java.lang.Thread.currentThread; + import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer; import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; @@ -26,6 +28,9 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer private final ConcurrentHashMap> allEventStreams = new ConcurrentHashMap<>(); + @Inject + private EventsInProcessCounterProvider eventsInProcessCounterProvider; + @Inject private EventStreamsInProgressList eventStreamsInProgressList; @@ -61,12 +66,26 @@ public int add( final Queue events = allEventStreams.computeIfAbsent(streamId, id -> new ConcurrentLinkedQueue<>()); + final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); + synchronized (EXCLUSIVE_LOCK) { + + while (eventsInProcessCounter.maxNumberOfEventsInProcess()) { + try { + EXCLUSIVE_LOCK.wait(); + } catch (final InterruptedException e) { + currentThread().interrupt(); + break; + } + } + events.offer(publishedEvent); if (notInProgress(events)) { createAndSubmitTaskFor(events, subscriptionName, catchupCommand, commandId); } + + eventsInProcessCounter.incrementEventsInProcessCount(); } return 1; @@ -99,6 +118,17 @@ public void waitForCompletion() { eventStreamsInProgressList.blockUntilEmpty(); } + @Override + public void decrementEventsInProcessCount() { + + final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); + + synchronized (EXCLUSIVE_LOCK) { + eventsInProcessCounter.decrementEventsInProcessCount(); + EXCLUSIVE_LOCK.notify(); + } + } + private boolean notInProgress(final Queue eventStream) { return !eventStreamsInProgressList.contains(eventStream); } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfig.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfig.java new file mode 100644 index 000000000..ffcc6e30e --- /dev/null +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfig.java @@ -0,0 +1,23 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import static java.lang.Integer.parseInt; + +import uk.gov.justice.services.common.configuration.GlobalValue; + +import javax.annotation.Priority; +import javax.enterprise.inject.Alternative; +import javax.inject.Inject; + +@Alternative +@Priority(100) +public class DefaultEventQueueProcessingConfig implements EventQueueProcessingConfig { + + @Inject + @GlobalValue(key = "catchup.event.processing.max.total.events.in.process", defaultValue = "100000") + private String maxTotalEventsInProcess; + + @Override + public int getMaxTotalEventsInProcess() { + return parseInt(maxTotalEventsInProcess); + } +} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueProcessingConfig.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueProcessingConfig.java new file mode 100644 index 000000000..a18f8c6d1 --- /dev/null +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventQueueProcessingConfig.java @@ -0,0 +1,5 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +public interface EventQueueProcessingConfig { + int getMaxTotalEventsInProcess(); +} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java index 0c026aa48..c04c16f75 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventStreamConsumptionResolver.java @@ -16,4 +16,6 @@ public interface EventStreamConsumptionResolver { * @return true if all events are consumed, false if there are still events remaining in the queue. */ boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage); + + void decrementEventsInProcessCount(); } diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java new file mode 100644 index 000000000..38aa7da9c --- /dev/null +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounter.java @@ -0,0 +1,25 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import java.util.concurrent.atomic.AtomicInteger; + +public class EventsInProcessCounter { + + private final int maxTotalEventsInProcess; + private AtomicInteger eventInProcessCount = new AtomicInteger(0); + + public EventsInProcessCounter(final int maxTotalEventsInProcess) { + this.maxTotalEventsInProcess = maxTotalEventsInProcess; + } + + public synchronized void incrementEventsInProcessCount() { + eventInProcessCount.incrementAndGet(); + } + + public synchronized void decrementEventsInProcessCount() { + eventInProcessCount.decrementAndGet(); + } + + public synchronized boolean maxNumberOfEventsInProcess() { + return eventInProcessCount.get() >= maxTotalEventsInProcess; + } +} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProvider.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProvider.java new file mode 100644 index 000000000..9ce79f229 --- /dev/null +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProvider.java @@ -0,0 +1,21 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import java.util.concurrent.ConcurrentHashMap; + +import javax.inject.Inject; + +public class EventsInProcessCounterProvider { + + @Inject + private EventQueueProcessingConfig eventQueueProcessingConfig; + + private ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap<>(); + + public EventsInProcessCounter getInstance() { + return concurrentHashMap.computeIfAbsent("default", this::newInstance); + } + + private EventsInProcessCounter newInstance(String s) { + return new EventsInProcessCounter(eventQueueProcessingConfig.getMaxTotalEventsInProcess()); + } +} diff --git a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java index b103d8c96..25ecb8965 100644 --- a/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java +++ b/event-sourcing/subscription-manager/src/main/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumer.java @@ -29,13 +29,16 @@ public boolean consumeEventQueue( final Queue events, final String subscriptionName, final CatchupCommand catchupCommand) { + while (!events.isEmpty()) { - final PublishedEvent publishedEvent = events.poll(); + final PublishedEvent publishedEvent = events.poll(); try { transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName); } catch (final RuntimeException e) { eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId); + } finally { + eventStreamConsumptionResolver.decrementEventsInProcessCount(); } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java index 73c219eb4..7086195cd 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/EventStreamCatchupIT.java @@ -6,9 +6,11 @@ import static uk.gov.justice.services.core.postgres.OpenEjbConfigurationBuilder.createOpenEjbConfigurationBuilder; import uk.gov.justice.services.cdi.LoggerProducer; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.DummyEventQueueProcessingConfig; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventsInProcessCounterProvider; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskFactory; import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager; @@ -57,7 +59,9 @@ public class EventStreamCatchupIT { ConcurrentEventStreamConsumerManager.class, EventProcessingFailedHandler.class, ConsumeEventQueueTaskManager.class, - ConsumeEventQueueTaskFactory.class + ConsumeEventQueueTaskFactory.class, + EventsInProcessCounterProvider.class, + DummyEventQueueProcessingConfig.class }) public WebApp war() { return new WebApp() diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/DummyEventQueueProcessingConfig.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/DummyEventQueueProcessingConfig.java new file mode 100644 index 000000000..4cca15e0b --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/DummyEventQueueProcessingConfig.java @@ -0,0 +1,15 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer; + +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.DefaultEventQueueProcessingConfig; +import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueProcessingConfig; + +import org.apache.openejb.testing.Default; + +@Default +public class DummyEventQueueProcessingConfig implements EventQueueProcessingConfig { + + @Override + public int getMaxTotalEventsInProcess() { + return 100; + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java index 1ccfbe19b..348ea2ed5 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/ConcurrentEventStreamConsumerManagerTest.java @@ -31,6 +31,9 @@ @RunWith(MockitoJUnitRunner.class) public class ConcurrentEventStreamConsumerManagerTest { + @Mock + private EventsInProcessCounterProvider eventsInProcessCounterProvider; + @Mock private ConsumeEventQueueTaskManager consumeEventQueueTaskManager; @@ -56,7 +59,10 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() { final PublishedEvent publishedEvent = mock(PublishedEvent.class); final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent.getStreamId()).thenReturn(streamId); @@ -68,6 +74,7 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() { assertThat(events.size(), is(1)); assertThat(events.poll(), is(publishedEvent)); + verify(eventsInProcessCounter).incrementEventsInProcessCount(); } @Test @@ -81,6 +88,10 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() { final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); final CatchupCommand catchupCommand = new EventCatchupCommand(); + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId); when(publishedEvent_2.getStreamId()).thenReturn(streamId); @@ -94,6 +105,8 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() { assertThat(eventsStream.size(), is(2)); assertThat(eventsStream.poll(), is(publishedEvent_1)); assertThat(eventsStream.poll(), is(publishedEvent_2)); + + verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount(); } @Test @@ -108,6 +121,10 @@ public void shouldCreateQueueForEachStreamId() { final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class); final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); @@ -126,6 +143,8 @@ public void shouldCreateQueueForEachStreamId() { final Queue eventsStream_2 = allValues.get(1); assertThat(eventsStream_2.size(), is(1)); assertThat(eventsStream_2.poll(), is(publishedEvent_2)); + + verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount(); } @Test @@ -140,6 +159,10 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class); final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class); + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false); when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer); when(publishedEvent_1.getStreamId()).thenReturn(streamId_1); when(publishedEvent_2.getStreamId()).thenReturn(streamId_2); @@ -160,6 +183,8 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp final Queue eventsStream_2 = eventQueueCaptor.getValue(); assertThat(eventsStream_2.size(), is(1)); assertThat(eventsStream_2.poll(), is(publishedEvent_2)); + + verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount(); } @Test @@ -169,4 +194,16 @@ public void shouldBlockOnTheEventsStreamInProgressListWhenWaitingForCompletion() verify(eventStreamsInProgressList).blockUntilEmpty(); } + + @Test + public void shouldDecrementTheEventsInProcessCount() throws Exception { + + final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class); + + when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter); + + concurrentEventStreamConsumerManager.decrementEventsInProcessCount(); + + verify(eventsInProcessCounter).decrementEventsInProcessCount(); + } } diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfigTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfigTest.java new file mode 100644 index 000000000..419f03e9c --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/DefaultEventQueueProcessingConfigTest.java @@ -0,0 +1,25 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultEventQueueProcessingConfigTest { + + @InjectMocks + private DefaultEventQueueProcessingConfig defaultEventQueueProcessingConfig; + + @Test + public void shouldGetTheInjectedJndiValue() throws Exception { + + setField(defaultEventQueueProcessingConfig, "maxTotalEventsInProcess", "23"); + + assertThat(defaultEventQueueProcessingConfig.getMaxTotalEventsInProcess(), is(23)); + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProviderTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProviderTest.java new file mode 100644 index 000000000..45eace500 --- /dev/null +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/manager/EventsInProcessCounterProviderTest.java @@ -0,0 +1,47 @@ +package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class EventsInProcessCounterProviderTest { + + @Mock + private EventQueueProcessingConfig eventQueueProcessingConfig; + + @InjectMocks + private EventsInProcessCounterProvider eventsInProcessCounterProvider; + + @Test + public void shouldCreateWithCorrectMaxTotalEventsInProcess() throws Exception { + + final int maxTotalEventsInProcess = 982734; + + when(eventQueueProcessingConfig.getMaxTotalEventsInProcess()).thenReturn(maxTotalEventsInProcess); + + final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); + + assertThat(getValueOfField(eventsInProcessCounter, "maxTotalEventsInProcess", Integer.class), is(maxTotalEventsInProcess)); + } + + @Test + public void shouldAlwaysReturnTheSameInstance() throws Exception { + + final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance(); + + assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter))); + assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter))); + assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter))); + assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter))); + assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter))); + } +} diff --git a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java index 0f1fdabaa..a23949a88 100644 --- a/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java +++ b/event-sourcing/subscription-manager/src/test/java/uk/gov/justice/services/event/sourcing/subscription/catchup/consumer/task/EventQueueConsumerTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -60,10 +61,12 @@ public void shouldProcessAllEventsOnQueueAndReturnTrueIfComplete() throws Except eventQueueConsumer.consumeEventQueue(commandId, eventQueue, subscriptionName, catchupCommand); - final InOrder inOrder = inOrder(transactionalEventProcessor); + final InOrder inOrder = inOrder(transactionalEventProcessor, eventStreamConsumptionResolver); inOrder.verify(transactionalEventProcessor).processWithEventBuffer(event_1, subscriptionName); + inOrder.verify(eventStreamConsumptionResolver).decrementEventsInProcessCount(); inOrder.verify(transactionalEventProcessor).processWithEventBuffer(event_2, subscriptionName); + inOrder.verify(eventStreamConsumptionResolver).decrementEventsInProcessCount(); } @Test @@ -99,5 +102,7 @@ public void shouldHandleExceptionsThrownWhilstProcessing() throws Exception { catchupCommand, commandId ); + + verify(eventStreamConsumptionResolver, times(2)).decrementEventsInProcessCount(); } }