Skip to content

Commit

Permalink
Merge a8d9bc4 into 13aeca1
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Dec 5, 2019
2 parents 13aeca1 + a8d9bc4 commit 2c1fd4b
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- Backpressure added to the event processing queues during catchup
### Fixed
- Verification completion log message now correctly logs if verification of Catchup or of Rebuild

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static java.lang.Thread.currentThread;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.EventQueueConsumer;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
Expand All @@ -26,6 +28,9 @@ public class ConcurrentEventStreamConsumerManager implements EventStreamConsumer

private final ConcurrentHashMap<UUID, Queue<PublishedEvent>> allEventStreams = new ConcurrentHashMap<>();

@Inject
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Inject
private EventStreamsInProgressList eventStreamsInProgressList;

Expand Down Expand Up @@ -61,12 +66,26 @@ public int add(

final Queue<PublishedEvent> events = allEventStreams.computeIfAbsent(streamId, id -> new ConcurrentLinkedQueue<>());

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {

while (eventsInProcessCounter.maxNumberOfEventsInProcess()) {
try {
EXCLUSIVE_LOCK.wait();
} catch (final InterruptedException e) {
currentThread().interrupt();
break;
}
}

events.offer(publishedEvent);

if (notInProgress(events)) {
createAndSubmitTaskFor(events, subscriptionName, catchupCommand, commandId);
}

eventsInProcessCounter.incrementEventsInProcessCount();
}

return 1;
Expand Down Expand Up @@ -99,6 +118,17 @@ public void waitForCompletion() {
eventStreamsInProgressList.blockUntilEmpty();
}

@Override
public void decrementEventsInProcessCount() {

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

synchronized (EXCLUSIVE_LOCK) {
eventsInProcessCounter.decrementEventsInProcessCount();
EXCLUSIVE_LOCK.notify();
}
}

private boolean notInProgress(final Queue<PublishedEvent> eventStream) {
return !eventStreamsInProgressList.contains(eventStream);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static java.lang.Integer.parseInt;

import uk.gov.justice.services.common.configuration.GlobalValue;

import javax.annotation.Priority;
import javax.enterprise.inject.Alternative;
import javax.inject.Inject;

@Alternative
@Priority(100)
public class DefaultEventQueueProcessingConfig implements EventQueueProcessingConfig {

@Inject
@GlobalValue(key = "catchup.event.processing.max.total.events.in.process", defaultValue = "100000")
private String maxTotalEventsInProcess;

@Override
public int getMaxTotalEventsInProcess() {
return parseInt(maxTotalEventsInProcess);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

public interface EventQueueProcessingConfig {
int getMaxTotalEventsInProcess();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface EventStreamConsumptionResolver {
* @return true if all events are consumed, false if there are still events remaining in the queue.
*/
boolean isEventConsumptionComplete(final FinishedProcessingMessage finishedProcessingMessage);

void decrementEventsInProcessCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import java.util.concurrent.atomic.AtomicInteger;

public class EventsInProcessCounter {

private final int maxTotalEventsInProcess;
private AtomicInteger eventInProcessCount = new AtomicInteger(0);

public EventsInProcessCounter(final int maxTotalEventsInProcess) {
this.maxTotalEventsInProcess = maxTotalEventsInProcess;
}

public synchronized void incrementEventsInProcessCount() {
eventInProcessCount.incrementAndGet();
}

public synchronized void decrementEventsInProcessCount() {
eventInProcessCount.decrementAndGet();
}

public synchronized boolean maxNumberOfEventsInProcess() {
return eventInProcessCount.get() >= maxTotalEventsInProcess;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import java.util.concurrent.ConcurrentHashMap;

import javax.inject.Inject;

public class EventsInProcessCounterProvider {

@Inject
private EventQueueProcessingConfig eventQueueProcessingConfig;

private ConcurrentHashMap<String, EventsInProcessCounter> concurrentHashMap = new ConcurrentHashMap<>();

public EventsInProcessCounter getInstance() {
return concurrentHashMap.computeIfAbsent("default", this::newInstance);
}

private EventsInProcessCounter newInstance(String s) {
return new EventsInProcessCounter(eventQueueProcessingConfig.getMaxTotalEventsInProcess());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public boolean consumeEventQueue(
final Queue<PublishedEvent> events,
final String subscriptionName,
final CatchupCommand catchupCommand) {

while (!events.isEmpty()) {
final PublishedEvent publishedEvent = events.poll();

final PublishedEvent publishedEvent = events.poll();
try {
transactionalEventProcessor.processWithEventBuffer(publishedEvent, subscriptionName);
} catch (final RuntimeException e) {
eventProcessingFailedHandler.handle(e, publishedEvent, subscriptionName, catchupCommand, commandId);
} finally {
eventStreamConsumptionResolver.decrementEventsInProcessCount();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import static uk.gov.justice.services.core.postgres.OpenEjbConfigurationBuilder.createOpenEjbConfigurationBuilder;

import uk.gov.justice.services.cdi.LoggerProducer;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.DummyEventQueueProcessingConfig;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.ConcurrentEventStreamConsumerManager;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueConsumerFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventStreamsInProgressList;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventsInProcessCounterProvider;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueBean;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskFactory;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.task.ConsumeEventQueueTaskManager;
Expand Down Expand Up @@ -57,7 +59,9 @@ public class EventStreamCatchupIT {
ConcurrentEventStreamConsumerManager.class,
EventProcessingFailedHandler.class,
ConsumeEventQueueTaskManager.class,
ConsumeEventQueueTaskFactory.class
ConsumeEventQueueTaskFactory.class,
EventsInProcessCounterProvider.class,
DummyEventQueueProcessingConfig.class
})
public WebApp war() {
return new WebApp()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer;

import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.DefaultEventQueueProcessingConfig;
import uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager.EventQueueProcessingConfig;

import org.apache.openejb.testing.Default;

@Default
public class DummyEventQueueProcessingConfig implements EventQueueProcessingConfig {

@Override
public int getMaxTotalEventsInProcess() {
return 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
@RunWith(MockitoJUnitRunner.class)
public class ConcurrentEventStreamConsumerManagerTest {

@Mock
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Mock
private ConsumeEventQueueTaskManager consumeEventQueueTaskManager;

Expand All @@ -56,7 +59,10 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
final PublishedEvent publishedEvent = mock(PublishedEvent.class);

final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent.getStreamId()).thenReturn(streamId);

Expand All @@ -68,6 +74,7 @@ public void shouldCreateQueueAndCreateTaskToConsumeQueueForNewStreamId() {
assertThat(events.size(), is(1));
assertThat(events.poll(), is(publishedEvent));

verify(eventsInProcessCounter).incrementEventsInProcessCount();
}

@Test
Expand All @@ -81,6 +88,10 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);
final CatchupCommand catchupCommand = new EventCatchupCommand();

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId);
when(publishedEvent_2.getStreamId()).thenReturn(streamId);
Expand All @@ -94,6 +105,8 @@ public void shouldNotCreateQueueOrCreateTaskIfEventIsSameStreamId() {
assertThat(eventsStream.size(), is(2));
assertThat(eventsStream.poll(), is(publishedEvent_1));
assertThat(eventsStream.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -108,6 +121,10 @@ public void shouldCreateQueueForEachStreamId() {
final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class);
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
Expand All @@ -126,6 +143,8 @@ public void shouldCreateQueueForEachStreamId() {
final Queue<PublishedEvent> eventsStream_2 = allValues.get(1);
assertThat(eventsStream_2.size(), is(1));
assertThat(eventsStream_2.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -140,6 +159,10 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
final PublishedEvent publishedEvent_2 = mock(PublishedEvent.class);
final EventQueueConsumer eventQueueConsumer = mock(EventQueueConsumer.class);

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);
when(eventsInProcessCounter.maxNumberOfEventsInProcess()).thenReturn(false);
when(eventQueueConsumerFactory.create(concurrentEventStreamConsumerManager)).thenReturn(eventQueueConsumer);
when(publishedEvent_1.getStreamId()).thenReturn(streamId_1);
when(publishedEvent_2.getStreamId()).thenReturn(streamId_2);
Expand All @@ -160,6 +183,8 @@ public void shouldBeAbleToFinishQueueAndAllowAnotherProcessToPickupQueueIfNotEmp
final Queue<PublishedEvent> eventsStream_2 = eventQueueCaptor.getValue();
assertThat(eventsStream_2.size(), is(1));
assertThat(eventsStream_2.poll(), is(publishedEvent_2));

verify(eventsInProcessCounter, times(2)).incrementEventsInProcessCount();
}

@Test
Expand All @@ -169,4 +194,16 @@ public void shouldBlockOnTheEventsStreamInProgressListWhenWaitingForCompletion()

verify(eventStreamsInProgressList).blockUntilEmpty();
}

@Test
public void shouldDecrementTheEventsInProcessCount() throws Exception {

final EventsInProcessCounter eventsInProcessCounter = mock(EventsInProcessCounter.class);

when(eventsInProcessCounterProvider.getInstance()).thenReturn(eventsInProcessCounter);

concurrentEventStreamConsumerManager.decrementEventsInProcessCount();

verify(eventsInProcessCounter).decrementEventsInProcessCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.setField;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class DefaultEventQueueProcessingConfigTest {

@InjectMocks
private DefaultEventQueueProcessingConfig defaultEventQueueProcessingConfig;

@Test
public void shouldGetTheInjectedJndiValue() throws Exception {

setField(defaultEventQueueProcessingConfig, "maxTotalEventsInProcess", "23");

assertThat(defaultEventQueueProcessingConfig.getMaxTotalEventsInProcess(), is(23));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package uk.gov.justice.services.event.sourcing.subscription.catchup.consumer.manager;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class EventsInProcessCounterProviderTest {

@Mock
private EventQueueProcessingConfig eventQueueProcessingConfig;

@InjectMocks
private EventsInProcessCounterProvider eventsInProcessCounterProvider;

@Test
public void shouldCreateWithCorrectMaxTotalEventsInProcess() throws Exception {

final int maxTotalEventsInProcess = 982734;

when(eventQueueProcessingConfig.getMaxTotalEventsInProcess()).thenReturn(maxTotalEventsInProcess);

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

assertThat(getValueOfField(eventsInProcessCounter, "maxTotalEventsInProcess", Integer.class), is(maxTotalEventsInProcess));
}

@Test
public void shouldAlwaysReturnTheSameInstance() throws Exception {

final EventsInProcessCounter eventsInProcessCounter = eventsInProcessCounterProvider.getInstance();

assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
assertThat(eventsInProcessCounterProvider.getInstance(), is(sameInstance(eventsInProcessCounter)));
}
}
Loading

0 comments on commit 2c1fd4b

Please sign in to comment.