Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backpressure to the event processing queues in catchup #202

Merged
merged 1 commit into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Backpressure added to the event processing queues during catchup
### Fixed
- Verification completion log message now correctly logs if verification of Catchup or of Rebuild

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static java.lang.Thread.currentThread;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
Expand All @@ -26,6 +28,9 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer

private final ConcurrentHashMap<UUID, Queue<PublishedEvent>> allEventStreams = new ConcurrentHashMap<>();

@Inject
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Inject
private EventStreamsInProgressList eventStreamsInProgressList;

Expand Down Expand Up @@ -61,12 +66,26 @@ public int add(

final Queue<PublishedEvent> events = allEventStreams.computeIfAbsent(streamId, id -> new ConcurrentLinkedQueue<>());

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {

while (eventsInProcessCounter.maxNumberOfEventsInProcess()) {
try {
EXCLUSIVE_LOCK.wait();
} catch (final InterruptedException e) {
currentThread().interrupt();
break;
}
}

events.offer(publishedEvent);

if (notInProgress(events)) {
createAndSubmitTaskFor(events, subscriptionName, catchupCommand, commandId);
}

eventsInProcessCounter.incrementEventsInProcessCount();
}

return 1;
Expand Down Expand Up @@ -99,6 +118,17 @@ public void waitForCompletion() {
eventStreamsInProgressList.blockUntilEmpty();
}

@Override
public void decrementEventsInProcessCount() {

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
}

private boolean notInProgress(final Queue<PublishedEvent> eventStream) {
return !eventStreamsInProgressList.contains(eventStream);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static java.lang.Integer.parseInt;

import uk.gov.justice.services.common.configuration.GlobalValue;

import javax.annotation.Priority;
import javax.enterprise.inject.Alternative;
import javax.inject.Inject;

@Alternative
@Priority(100)
public class DefaultEventQueueProcessingConfig implements EventQueueProcessingConfig {

@Inject
@GlobalValue(key = "catchup.event.processing.max.total.events.in.process", defaultValue = "100000")
private String maxTotalEventsInProcess;

@Override
public int getMaxTotalEventsInProcess() {
return parseInt(maxTotalEventsInProcess);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

public interface EventQueueProcessingConfig {
int getMaxTotalEventsInProcess();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface EventStreamConsumptionResolver {
* @return true if all events are consumed, false if there are still events remaining in the queue.
*/
boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage);

void decrementEventsInProcessCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import java.util.concurrent.atomic.AtomicInteger;

public class EventsInProcessCounter {

private final int maxTotalEventsInProcess;
private AtomicInteger eventInProcessCount = new AtomicInteger(0);

public EventsInProcessCounter(final int maxTotalEventsInProcess) {
this.maxTotalEventsInProcess = maxTotalEventsInProcess;
}

public synchronized void incrementEventsInProcessCount() {
eventInProcessCount.incrementAndGet();
}

public synchronized void decrementEventsInProcessCount() {
eventInProcessCount.decrementAndGet();
}

public synchronized boolean maxNumberOfEventsInProcess() {
return eventInProcessCount.get() >= maxTotalEventsInProcess;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import java.util.concurrent.ConcurrentHashMap;

import javax.inject.Inject;

public class EventsInProcessCounterProvider {

@Inject
private EventQueueProcessingConfig eventQueueProcessingConfig;

private ConcurrentHashMap<String, EventsInProcessCounter> concurrentHashMap = new ConcurrentHashMap<>();

public EventsInProcessCounter getInstance() {
return concurrentHashMap.computeIfAbsent("default", this::newInstance);
}

private EventsInProcessCounter newInstance(String s) {
return new EventsInProcessCounter(eventQueueProcessingConfig.getMaxTotalEventsInProcess());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public boolean consumeEventQueue(
final Queue<PublishedEvent> events,
final String subscriptionName,
final CatchupCommand catchupCommand) {

while (!events.isEmpty()) {
final PublishedEvent publishedEvent = events.poll();

final PublishedEvent publishedEvent = events.poll();
try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
} catch (final RuntimeException e) {
eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} finally {
eventStreamConsumptionResolver.decrementEventsInProcessCount();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import static uk.gov.justice.services.core.postgres.OpenEjbConfigurationBuilder.createOpenEjbConfigurationBuilder;

import uk.gov.justice.services.cdi.LoggerProducer;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.DummyEventQueueProcessingConfig;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventsInProcessCounterProvider;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
Expand Down Expand Up @@ -57,7 +59,9 @@ public class EventStreamCatchupIT {
ConcurrentEventStreamConsumerManager.class,
EventProcessingFailedHandler.class,
ConsumeEventQueueTaskManager.class,
ConsumeEventQueueTaskFactory.class
ConsumeEventQueueTaskFactory.class,
EventsInProcessCounterProvider.class,
DummyEventQueueProcessingConfig.class
})
public WebApp war() {
return new WebApp()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.DefaultEventQueueProcessingConfig;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueProcessingConfig;

import org.apache.openejb.testing.Default;

@Default
public class DummyEventQueueProcessingConfig implements EventQueueProcessingConfig {

@Override
public int getMaxTotalEventsInProcess() {
return 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
@RunWith(MockitoJUnitRunner.class)
public class ConcurrentEventStreamConsumerManagerTest {

@Mock
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Mock
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;

Expand All @@ -56,7 +59,10 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
final PublishedEvent publishedEvent = mock(PublishedEvent.class);

final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent.getStreamId()).thenReturn(streamId);

Expand All @@ -68,6 +74,7 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
assertThat(events.size(), is(1));
assertThat(events.poll(), is(publishedEvent));

verify(eventsInProcessCounter).incrementEventsInProcessCount();
}

@Test
Expand All @@ -81,6 +88,10 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
final CatchupCommand catchupCommand = new EventCatchupCommand();

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId);
when(publishedEvent_2.getStreamId()).thenReturn(streamId);
Expand All @@ -94,6 +105,8 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
assertThat(eventsStream.size(), is(2));
assertThat(eventsStream.poll(), is(publishedEvent_1));
assertThat(eventsStream.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -108,6 +121,10 @@ public void shouldCreateQueueForEachStreamId() {
final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class);
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
Expand All @@ -126,6 +143,8 @@ public void shouldCreateQueueForEachStreamId() {
final Queue<PublishedEvent> eventsStream_2 = allValues.get(1);
assertThat(eventsStream_2.size(), is(1));
assertThat(eventsStream_2.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -140,6 +159,10 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class);
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
Expand All @@ -160,6 +183,8 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
final Queue<PublishedEvent> eventsStream_2 = eventQueueCaptor.getValue();
assertThat(eventsStream_2.size(), is(1));
assertThat(eventsStream_2.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -169,4 +194,16 @@ public void shouldBlockOnTheEventsStreamInProgressListWhenWaitingForCompletion()

verify(eventStreamsInProgressList).blockUntilEmpty();
}

@Test
public void shouldDecrementTheEventsInProcessCount() throws Exception {

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);

concurrentEventStreamConsumerManager.decrementEventsInProcessCount();

verify(eventsInProcessCounter).decrementEventsInProcessCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class DefaultEventQueueProcessingConfigTest {

@InjectMocks
private DefaultEventQueueProcessingConfig defaultEventQueueProcessingConfig;

@Test
public void shouldGetTheInjectedJndiValue() throws Exception {

setField(defaultEventQueueProcessingConfig, "maxTotalEventsInProcess", "23");

assertThat(defaultEventQueueProcessingConfig.getMaxTotalEventsInProcess(), is(23));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class EventsInProcessCounterProviderTest {

@Mock
private EventQueueProcessingConfig eventQueueProcessingConfig;

@InjectMocks
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Test
public void shouldCreateWithCorrectMaxTotalEventsInProcess() throws Exception {

final int maxTotalEventsInProcess = 982734;

when(eventQueueProcessingConfig.getMaxTotalEventsInProcess()).thenReturn(maxTotalEventsInProcess);

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

assertThat(getValueOfField(eventsInProcessCounter, "maxTotalEventsInProcess", Integer.class), is(maxTotalEventsInProcess));
}

@Test
public void shouldAlwaysReturnTheSameInstance() throws Exception {

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
}
}
Loading