From c6ca388779fe6d83038c532459c54125dccafa6f Mon Sep 17 00:00:00 2001 From: amckenzie Date: Sun, 6 Oct 2019 10:38:18 +0100 Subject: [PATCH] Remove events from shuttering --- CHANGELOG.md | 5 + ...HandlerQueueDrainedShutteringExecutor.java | 74 ++++++++++++ ...PublishQueueDrainedShutteringExecutor.java | 74 ++++++++++++ ...PublishQueueDrainedShutteringObserver.java | 62 ---------- .../ShutterCommandHandlerObserver.java | 60 ---------- .../commands/RebuildCommandHandlerTest.java | 2 - ...lerQueueDrainedShutteringExecutorTest.java | 109 ++++++++++++++++++ ...ishQueueDrainedShutteringExecutorTest.java | 103 +++++++++++++++++ ...ishQueueDrainedShutteringObserverTest.java | 99 ---------------- .../ShutterCommandHandlerObserverTest.java | 107 ----------------- pom.xml | 2 +- 11 files changed, 366 insertions(+), 331 deletions(-) create mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutor.java create mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutor.java delete mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserver.java delete mode 100644 event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserver.java create mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutorTest.java create mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutorTest.java delete mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserverTest.java delete mode 100644 event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserverTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e958d1d66..ee051f451 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +## [2.2.0-M3] - 2018-10-09 +### Changed +- Updated framework t0 6.2.0-M3 +- Converted ShutteringExecutors to use the new ShutteringExecutor interface + ## [2.2.0-M2] - 2018-10-02 ### Added - Added commandId to all SystemEvents diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutor.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutor.java new file mode 100644 index 000000000..0c78d877e --- /dev/null +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutor.java @@ -0,0 +1,74 @@ +package uk.gov.justice.services.eventstore.management.shuttering.observers; + +import static java.lang.String.format; +import static uk.gov.justice.services.management.shuttering.api.ShutteringResult.shutteringFailed; +import static uk.gov.justice.services.management.shuttering.api.ShutteringResult.shutteringSucceeded; + +import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; +import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator; +import uk.gov.justice.services.jmx.api.command.SystemCommand; +import uk.gov.justice.services.management.shuttering.api.ShutteringExecutor; +import uk.gov.justice.services.management.shuttering.api.ShutteringResult; + +import java.util.UUID; + +import javax.inject.Inject; + +import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; + +public class CommandHandlerQueueDrainedShutteringExecutor implements ShutteringExecutor { + + @Inject + private CommandHandlerQueueInterrogator commandHandlerQueueInterrogator; + + @Inject + private StopWatchFactory stopWatchFactory; + + @Inject + private Logger logger; + + @Override + public boolean shouldShutter() { + return true; + } + + @Override + public ShutteringResult shutter(final UUID commandId, final SystemCommand systemCommand) { + + logger.info("Shuttering Command Handler. Waiting for queue to drain"); + + final StopWatch stopWatch = stopWatchFactory.createStartedStopWatch(); + + try { + final boolean queueEmpty = commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue(); + + if (queueEmpty) { + + final String message = "Command Handler Queue drained successfully"; + logger.info(message); + + return shutteringSucceeded( + getName(), + commandId, + message, + systemCommand + ); + } + + stopWatch.stop(); + + final String message = format("Failed to drain command handler queue in %d milliseconds", stopWatch.getTime()); + logger.error(message); + + return shutteringFailed( + getName(), + commandId, + message, + systemCommand + ); + } finally { + stopWatch.stop(); + } + } +} diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutor.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutor.java new file mode 100644 index 000000000..18ace6921 --- /dev/null +++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutor.java @@ -0,0 +1,74 @@ +package uk.gov.justice.services.eventstore.management.shuttering.observers; + +import static java.lang.String.format; +import static uk.gov.justice.services.management.shuttering.api.ShutteringResult.shutteringFailed; +import static uk.gov.justice.services.management.shuttering.api.ShutteringResult.shutteringSucceeded; + +import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; +import uk.gov.justice.services.eventstore.management.shuttering.process.PublishQueueInterrogator; +import uk.gov.justice.services.jmx.api.command.SystemCommand; +import uk.gov.justice.services.management.shuttering.api.ShutteringExecutor; +import uk.gov.justice.services.management.shuttering.api.ShutteringResult; + +import java.util.UUID; + +import javax.inject.Inject; + +import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; + +public class PublishQueueDrainedShutteringExecutor implements ShutteringExecutor { + + @Inject + private PublishQueueInterrogator publishQueueInterrogator; + + @Inject + private StopWatchFactory stopWatchFactory; + + @Inject + private Logger logger; + + @Override + public boolean shouldShutter() { + return true; + } + + @Override + public ShutteringResult shutter(final UUID commandId, final SystemCommand systemCommand) { + + logger.info("Waiting for Publish Queue to empty"); + final StopWatch stopWatch = stopWatchFactory.createStartedStopWatch(); + + try { + final boolean queueEmpty = publishQueueInterrogator.pollUntilPublishQueueEmpty(); + + if (queueEmpty) { + final String message = "Publish Queue drained successfully"; + logger.info(message); + + return shutteringSucceeded( + getName(), + commandId, + message, + systemCommand + ); + } + + stopWatch.stop(); + + final String message = format("PublishQueue failed to drain after %d milliseconds", stopWatch.getTime()); + + logger.error(message); + + return shutteringFailed( + getName(), + commandId, + message, + systemCommand + ); + + } finally { + stopWatch.stop(); + } + } +} diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserver.java deleted file mode 100644 index fcfd7f7c3..000000000 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserver.java +++ /dev/null @@ -1,62 +0,0 @@ -package uk.gov.justice.services.eventstore.management.shuttering.observers; - -import static java.lang.String.format; - -import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; -import uk.gov.justice.services.eventstore.management.shuttering.process.PublishQueueInterrogator; -import uk.gov.justice.services.jmx.logging.MdcLogger; -import uk.gov.justice.services.management.shuttering.events.ShutteringProcessStartedEvent; -import uk.gov.justice.services.management.shuttering.observers.shuttering.ShutteringRegistry; -import uk.gov.justice.services.management.shuttering.startup.ShutteringExecutor; - -import java.util.UUID; - -import javax.enterprise.event.Observes; -import javax.inject.Inject; - -import org.apache.commons.lang3.time.StopWatch; -import org.slf4j.Logger; - -@ShutteringExecutor -public class PublishQueueDrainedShutteringObserver { - - @Inject - private PublishQueueInterrogator publishQueueInterrogator; - - @Inject - private StopWatchFactory stopWatchFactory; - - @Inject - private ShutteringRegistry shutteringRegistry; - - @Inject - private MdcLogger mdcLogger; - - @Inject - private Logger logger; - - - // TODO: error handling here, extract to its own class - public void waitForPublishQueueToEmpty(@Observes final ShutteringProcessStartedEvent shutteringProcessStartedEvent) { - - mdcLogger.mdcLoggerConsumer().accept(() -> { - - final UUID commandId = shutteringProcessStartedEvent.getCommandId(); - logger.info("Waiting for Publish Queue to empty"); - - final StopWatch stopWatch = stopWatchFactory.createStartedStopWatch(); - final boolean queueEmpty = publishQueueInterrogator.pollUntilPublishQueueEmpty(); - - if (!queueEmpty) { - stopWatch.stop(); - throw new ShutteringException(format("PublishQueue failed to drain after %d milliseconds", stopWatch.getTime())); - } - - logger.info("Publish Queue empty"); - shutteringRegistry.markShutteringCompleteFor( - commandId, - getClass(), - shutteringProcessStartedEvent.getTarget()); - }); - } -} diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserver.java deleted file mode 100644 index fef5cd8b8..000000000 --- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserver.java +++ /dev/null @@ -1,60 +0,0 @@ -package uk.gov.justice.services.eventstore.management.shuttering.observers; - -import static java.lang.String.format; - -import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; -import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator; -import uk.gov.justice.services.jmx.logging.MdcLogger; -import uk.gov.justice.services.management.shuttering.events.ShutteringProcessStartedEvent; -import uk.gov.justice.services.management.shuttering.observers.shuttering.ShutteringRegistry; -import uk.gov.justice.services.management.shuttering.startup.ShutteringExecutor; - -import java.util.UUID; - -import javax.enterprise.event.Observes; -import javax.inject.Inject; - -import org.apache.commons.lang3.time.StopWatch; -import org.slf4j.Logger; - -@ShutteringExecutor -public class ShutterCommandHandlerObserver { - - @Inject - private ShutteringRegistry shutteringRegistry; - - @Inject - private CommandHandlerQueueInterrogator commandHandlerQueueInterrogator; - - @Inject - private StopWatchFactory stopWatchFactory; - - @Inject - private MdcLogger mdcLogger; - - @Inject - private Logger logger; - - public void waitForCommandQueueToEmpty(@Observes final ShutteringProcessStartedEvent shutteringProcessStartedEvent) { - - mdcLogger.mdcLoggerConsumer().accept(() -> { - - final UUID commandId = shutteringProcessStartedEvent.getCommandId(); - logger.info("Shuttering Command Handler. Waiting for queue to drain"); - - final StopWatch stopWatch = stopWatchFactory.createStartedStopWatch(); - - final boolean queueEmpty = commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue(); - if (!queueEmpty) { - stopWatch.stop(); - throw new ShutteringException(format("Failed to drain command handler queue in %d milliseconds", stopWatch.getTime())); - } - - logger.info("Command Handler Queue empty"); - shutteringRegistry.markShutteringCompleteFor( - commandId, - getClass(), - shutteringProcessStartedEvent.getTarget()); - }); - } -} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/rebuild/commands/RebuildCommandHandlerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/rebuild/commands/RebuildCommandHandlerTest.java index ab4e87754..fe97040ac 100644 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/rebuild/commands/RebuildCommandHandlerTest.java +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/rebuild/commands/RebuildCommandHandlerTest.java @@ -9,8 +9,6 @@ import uk.gov.justice.services.common.util.UtcClock; import uk.gov.justice.services.eventstore.management.events.rebuild.RebuildRequestedEvent; import uk.gov.justice.services.jmx.api.command.RebuildCommand; -import uk.gov.justice.services.management.shuttering.events.ShutteringRequestedEvent; -import uk.gov.justice.services.management.shuttering.events.UnshutteringRequestedEvent; import java.time.ZonedDateTime; import java.util.UUID; diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutorTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutorTest.java new file mode 100644 index 000000000..fb9835c65 --- /dev/null +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainedShutteringExecutorTest.java @@ -0,0 +1,109 @@ +package uk.gov.justice.services.eventstore.management.shuttering.observers; + +import static java.util.Optional.empty; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_COMPLETE; +import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_FAILED; + +import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; +import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator; +import uk.gov.justice.services.jmx.api.command.SystemCommand; +import uk.gov.justice.services.management.shuttering.api.ShutteringResult; + +import java.util.UUID; + +import org.apache.commons.lang3.time.StopWatch; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + +@RunWith(MockitoJUnitRunner.class) +public class CommandHandlerQueueDrainedShutteringExecutorTest { + + @Mock + private CommandHandlerQueueInterrogator commandHandlerQueueInterrogator; + + @Mock + private StopWatchFactory stopWatchFactory; + + @Mock + private Logger logger; + + @InjectMocks + private CommandHandlerQueueDrainedShutteringExecutor commandHandlerQueueDrainedShutteringExecutor; + + @Test + public void shouldShutterButNotUnshutter() throws Exception { + + assertThat(commandHandlerQueueDrainedShutteringExecutor.shouldShutter(), is(true)); + assertThat(commandHandlerQueueDrainedShutteringExecutor.shouldUnshutter(), is(false)); + } + + @Test + public void shouldWaitForCommandHandlerQueueToDrainAndReturnSuccess() throws Exception { + + final UUID commandId = UUID.randomUUID(); + final StopWatch stopWatch = mock(StopWatch.class); + final SystemCommand systemCommand = mock(SystemCommand.class); + + when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); + when(commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue()).thenReturn(true); + + + final ShutteringResult shutteringResult = commandHandlerQueueDrainedShutteringExecutor.shutter(commandId, systemCommand); + + assertThat(shutteringResult.getCommandId(), is(commandId)); + assertThat(shutteringResult.getCommandState(), is(COMMAND_COMPLETE)); + assertThat(shutteringResult.getSystemCommand(), is(systemCommand)); + assertThat(shutteringResult.getShutteringExecutorName(), is("CommandHandlerQueueDrainedShutteringExecutor")); + assertThat(shutteringResult.getMessage(), is("Command Handler Queue drained successfully")); + + final InOrder inOrder = inOrder( + logger, + stopWatch, + commandHandlerQueueInterrogator); + + inOrder.verify(logger).info("Shuttering Command Handler. Waiting for queue to drain"); + inOrder.verify(commandHandlerQueueInterrogator).pollUntilEmptyHandlerQueue(); + inOrder.verify(logger).info("Command Handler Queue drained successfully"); + } + + @Test + public void shouldReturnFailureIfQueueDoesNotDrainInTime() throws Exception { + + final UUID commandId = UUID.randomUUID(); + final StopWatch stopWatch = mock(StopWatch.class); + final SystemCommand systemCommand = mock(SystemCommand.class); + + when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); + when(commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue()).thenReturn(false); + when(stopWatch.getTime()).thenReturn(12345L); + + final ShutteringResult shutteringResult = commandHandlerQueueDrainedShutteringExecutor.shutter(commandId, systemCommand); + + assertThat(shutteringResult.getCommandId(), is(commandId)); + assertThat(shutteringResult.getCommandState(), is(COMMAND_FAILED)); + assertThat(shutteringResult.getSystemCommand(), is(systemCommand)); + assertThat(shutteringResult.getShutteringExecutorName(), is("CommandHandlerQueueDrainedShutteringExecutor")); + assertThat(shutteringResult.getMessage(), is("Failed to drain command handler queue in 12345 milliseconds")); + assertThat(shutteringResult.getException(), is(empty())); + + final InOrder inOrder = inOrder( + logger, + stopWatch, + commandHandlerQueueInterrogator); + + inOrder.verify(logger).info("Shuttering Command Handler. Waiting for queue to drain"); + inOrder.verify(commandHandlerQueueInterrogator).pollUntilEmptyHandlerQueue(); + inOrder.verify(stopWatch).stop(); + inOrder.verify(logger).error("Failed to drain command handler queue in 12345 milliseconds"); + } +} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutorTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutorTest.java new file mode 100644 index 000000000..3fde139fd --- /dev/null +++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringExecutorTest.java @@ -0,0 +1,103 @@ +package uk.gov.justice.services.eventstore.management.shuttering.observers; + +import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_COMPLETE; +import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_FAILED; + +import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; +import uk.gov.justice.services.eventstore.management.shuttering.process.PublishQueueInterrogator; +import uk.gov.justice.services.jmx.api.command.SystemCommand; +import uk.gov.justice.services.management.shuttering.api.ShutteringResult; + +import java.util.UUID; + +import org.apache.commons.lang3.time.StopWatch; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.slf4j.Logger; + +@RunWith(MockitoJUnitRunner.class) +public class PublishQueueDrainedShutteringExecutorTest { + + @Mock + private PublishQueueInterrogator publishQueueInterrogator; + + @Mock + private StopWatchFactory stopWatchFactory; + + @Mock + private Logger logger; + + @InjectMocks + private PublishQueueDrainedShutteringExecutor publishQueueDrainedShutteringExecutor; + + @Test + public void shouldShutterButNotUnshutter() throws Exception { + + assertThat(publishQueueDrainedShutteringExecutor.shouldShutter(), is(true)); + assertThat(publishQueueDrainedShutteringExecutor.shouldUnshutter(), is(false)); + } + + @Test + public void shouldWaitForPublishQueueToEmptyAndReturnSuccess() throws Exception { + + final UUID commandId = randomUUID(); + final SystemCommand systemCommand = mock(SystemCommand.class); + final StopWatch stopWatch = mock(StopWatch.class); + + when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); + when(publishQueueInterrogator.pollUntilPublishQueueEmpty()).thenReturn(true); + + final ShutteringResult shutteringResult = publishQueueDrainedShutteringExecutor.shutter(commandId, systemCommand); + + assertThat(shutteringResult.getCommandId(), is(commandId)); + assertThat(shutteringResult.getCommandState(), is(COMMAND_COMPLETE)); + assertThat(shutteringResult.getSystemCommand(), is(systemCommand)); + assertThat(shutteringResult.getShutteringExecutorName(), is("PublishQueueDrainedShutteringExecutor")); + assertThat(shutteringResult.getMessage(), is("Publish Queue drained successfully")); + + final InOrder inOrder = inOrder(logger, publishQueueInterrogator); + + inOrder.verify(logger).info("Waiting for Publish Queue to empty"); + inOrder.verify(publishQueueInterrogator).pollUntilPublishQueueEmpty(); + inOrder.verify(logger).info("Publish Queue drained successfully"); + + verify(stopWatch).stop(); + } + + @Test + public void shouldReturnFailureIfThePublishQueueFailsToDrainInTime() throws Exception { + + final UUID commandId = randomUUID(); + final SystemCommand systemCommand = mock(SystemCommand.class); + final StopWatch stopWatch = mock(StopWatch.class); + + when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); + when(publishQueueInterrogator.pollUntilPublishQueueEmpty()).thenReturn(false); + when(stopWatch.getTime()).thenReturn(1234L); + + final ShutteringResult shutteringResult = publishQueueDrainedShutteringExecutor.shutter(commandId, systemCommand); + + assertThat(shutteringResult.getCommandId(), is(commandId)); + assertThat(shutteringResult.getCommandState(), is(COMMAND_FAILED)); + assertThat(shutteringResult.getSystemCommand(), is(systemCommand)); + assertThat(shutteringResult.getShutteringExecutorName(), is("PublishQueueDrainedShutteringExecutor")); + assertThat(shutteringResult.getMessage(), is("PublishQueue failed to drain after 1234 milliseconds")); + + final InOrder inOrder = inOrder(stopWatch, logger); + + inOrder.verify(stopWatch).stop(); + inOrder.verify(stopWatch).getTime(); + inOrder.verify(logger).error("PublishQueue failed to drain after 1234 milliseconds"); + } +} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserverTest.java deleted file mode 100644 index 7f70789f3..000000000 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/PublishQueueDrainedShutteringObserverTest.java +++ /dev/null @@ -1,99 +0,0 @@ -package uk.gov.justice.services.eventstore.management.shuttering.observers; - -import static java.util.UUID.randomUUID; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; -import uk.gov.justice.services.eventstore.management.shuttering.process.PublishQueueInterrogator; -import uk.gov.justice.services.jmx.api.command.SystemCommand; -import uk.gov.justice.services.jmx.logging.MdcLogger; -import uk.gov.justice.services.management.shuttering.events.ShutteringProcessStartedEvent; -import uk.gov.justice.services.management.shuttering.observers.shuttering.ShutteringRegistry; - -import java.util.UUID; -import java.util.function.Consumer; - -import org.apache.commons.lang3.time.StopWatch; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InOrder; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; -import org.slf4j.Logger; - - -@RunWith(MockitoJUnitRunner.class) -public class PublishQueueDrainedShutteringObserverTest { - - @Mock - private PublishQueueInterrogator publishQueueInterrogator; - - @Mock - private StopWatchFactory stopWatchFactory; - - @Mock - private ShutteringRegistry shutteringRegistry; - - @Mock - private MdcLogger mdcLogger; - - @Mock - private Logger logger; - - @InjectMocks - private PublishQueueDrainedShutteringObserver publishQueueDrainedShutteringObserver; - - private Consumer testConsumer = Runnable::run; - - @Test - public void shouldWaitForPublishQueueToEmpty() throws Exception { - - final UUID commandId = randomUUID(); - final SystemCommand systemCommand = mock(SystemCommand.class); - final ShutteringProcessStartedEvent shutteringProcessStartedEvent = mock(ShutteringProcessStartedEvent.class); - - when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); - when(shutteringProcessStartedEvent.getCommandId()).thenReturn(commandId); - when(shutteringProcessStartedEvent.getTarget()).thenReturn(systemCommand); - when(publishQueueInterrogator.pollUntilPublishQueueEmpty()).thenReturn(true); - - publishQueueDrainedShutteringObserver.waitForPublishQueueToEmpty(shutteringProcessStartedEvent); - - final InOrder inOrder = inOrder(logger, publishQueueInterrogator, shutteringRegistry); - - inOrder.verify(logger).info("Waiting for Publish Queue to empty"); - inOrder.verify(publishQueueInterrogator).pollUntilPublishQueueEmpty(); - inOrder.verify(logger).info("Publish Queue empty"); - inOrder.verify(shutteringRegistry).markShutteringCompleteFor(commandId, PublishQueueDrainedShutteringObserver.class, systemCommand); - } - - @Test - public void shouldThrowExceptionIfThePublishQueueFailsToDrain() throws Exception { - - final SystemCommand systemCommand = mock(SystemCommand.class); - final ShutteringProcessStartedEvent shutteringProcessStartedEvent = mock(ShutteringProcessStartedEvent.class); - final StopWatch stopWatch = mock(StopWatch.class); - - when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); - when(shutteringProcessStartedEvent.getTarget()).thenReturn(systemCommand); - when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); - when(publishQueueInterrogator.pollUntilPublishQueueEmpty()).thenReturn(false); - when(stopWatch.getTime()).thenReturn(1234L); - - try { - publishQueueDrainedShutteringObserver.waitForPublishQueueToEmpty(shutteringProcessStartedEvent); - fail(); - } catch (final ShutteringException expected) { - assertThat(expected.getMessage(), is("PublishQueue failed to drain after 1234 milliseconds")); - } - - verify(stopWatch).stop(); - } -} diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserverTest.java deleted file mode 100644 index afbd325b3..000000000 --- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/ShutterCommandHandlerObserverTest.java +++ /dev/null @@ -1,107 +0,0 @@ -package uk.gov.justice.services.eventstore.management.shuttering.observers; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory; -import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator; -import uk.gov.justice.services.jmx.api.command.SystemCommand; -import uk.gov.justice.services.jmx.logging.MdcLogger; -import uk.gov.justice.services.management.shuttering.events.ShutteringProcessStartedEvent; -import uk.gov.justice.services.management.shuttering.observers.shuttering.ShutteringRegistry; - -import java.util.UUID; -import java.util.function.Consumer; - -import org.apache.commons.lang3.time.StopWatch; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InOrder; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; -import org.slf4j.Logger; - -@RunWith(MockitoJUnitRunner.class) -public class ShutterCommandHandlerObserverTest { - - @Mock - private ShutteringRegistry shutteringRegistry; - - @Mock - private CommandHandlerQueueInterrogator commandHandlerQueueInterrogator; - - @Mock - private StopWatchFactory stopWatchFactory; - - @Mock - private MdcLogger mdcLogger; - - @Mock - private Logger logger; - - @InjectMocks - private ShutterCommandHandlerObserver shutterCommandHandlerObserver; - - private Consumer testConsumer = Runnable::run; - - @Test - public void shouldPollUntilQueueEmptyThenInformTheShutteringRegistry() throws Exception { - - final UUID commandId = UUID.randomUUID(); - final StopWatch stopWatch = mock(StopWatch.class); - final SystemCommand systemCommand = mock(SystemCommand.class); - final ShutteringProcessStartedEvent shutteringProcessStartedEvent = mock(ShutteringProcessStartedEvent.class); - - when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); - when(shutteringProcessStartedEvent.getCommandId()).thenReturn(commandId); - when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); - when(commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue()).thenReturn(true); - when(shutteringProcessStartedEvent.getTarget()).thenReturn(systemCommand); - - - shutterCommandHandlerObserver.waitForCommandQueueToEmpty(shutteringProcessStartedEvent); - - final InOrder inOrder = inOrder( - logger, - stopWatch, - commandHandlerQueueInterrogator, - shutteringRegistry); - - inOrder.verify(logger).info("Shuttering Command Handler. Waiting for queue to drain"); - inOrder.verify(commandHandlerQueueInterrogator).pollUntilEmptyHandlerQueue(); - inOrder.verify(logger).info("Command Handler Queue empty"); - inOrder.verify(shutteringRegistry).markShutteringCompleteFor(commandId, ShutterCommandHandlerObserver.class, systemCommand); - } - - @Test - public void shouldThrowExceptionIfQueueDoesNotDrain() throws Exception { - - final StopWatch stopWatch = mock(StopWatch.class); - final SystemCommand systemCommand = mock(SystemCommand.class); - final ShutteringProcessStartedEvent shutteringProcessStartedEvent = mock(ShutteringProcessStartedEvent.class); - - when(mdcLogger.mdcLoggerConsumer()).thenReturn(testConsumer); - when(stopWatchFactory.createStartedStopWatch()).thenReturn(stopWatch); - when(commandHandlerQueueInterrogator.pollUntilEmptyHandlerQueue()).thenReturn(false); - when(shutteringProcessStartedEvent.getTarget()).thenReturn(systemCommand); - when(stopWatch.getTime()).thenReturn(12345L); - - try { - shutterCommandHandlerObserver.waitForCommandQueueToEmpty(shutteringProcessStartedEvent); - fail(); - } catch (final ShutteringException expected) { - assertThat(expected.getMessage(), is("Failed to drain command handler queue in 12345 milliseconds")); - } - - verify(stopWatch).stop(); - - verifyZeroInteractions(shutteringRegistry); - } -} diff --git a/pom.xml b/pom.xml index 93aca0778..f3183cfd1 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ event-store 4.1.0 - 6.2.0-M2 + 6.2.0-M3 2.4.0 1.20.1 1.24.3