diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0f6cbd327..e60517e5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,28 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
- Updated framework-api to 4.2.1
- Database cleaner updated to handle the latest system database tables
+## [2.4.8] - 2020-01-29
+### Changed
+- Inserts into the event-buffer no longer fails if there is a conflict; it just logs a warning
+
+## [2.4.7] - 2020-01-24
+### Changed
+- Event store now works with multiple event sources
+- Event store now compatible with contexts that do not have a command pillar
+- Extracted all command pillar SystemCommands into their own module
+
+## [2.4.6] - 2020-01-21
+### Added
+- Catchup for multiple components now run in order of component and subscription priority
+- Added event source name to catchup logger output
+### Fixed
+- Fixed catchup error where catchup was marked as complete after all subscriptions rather than all components
+
+## [2.4.5] - 2020-01-06
+### Removed
+- Remove mechanism to also drop/add trigger on SUSPEND/UNSUSPEND as it causes
+many strange ejb database errors
+
## [2.4.4] - 2020-01-06
### Added
- Added mechanism to also drop/add trigger to event_log table on SUSPEND/UNSUSPEND commands
diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepository.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepository.java
index 5efc3776b..40a05db6a 100644
--- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepository.java
+++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepository.java
@@ -19,10 +19,12 @@
import javax.inject.Inject;
import javax.sql.DataSource;
+import org.slf4j.Logger;
+
@ApplicationScoped
public class EventBufferJdbcRepository {
- private static final String INSERT = "INSERT INTO stream_buffer (stream_id, position, event, source, component) VALUES (?, ?, ?, ?, ?)";
+ private static final String INSERT = "INSERT INTO stream_buffer (stream_id, position, event, source, component) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING";
private static final String SELECT_STREAM_BUFFER_BY_STREAM_ID_SOURCE_AND_COMPONENT = "SELECT stream_id, position, event, source, component FROM stream_buffer WHERE stream_id=? AND source=? AND component=? ORDER BY position";
private static final String DELETE_BY_STREAM_ID_POSITION = "DELETE FROM stream_buffer WHERE stream_id=? AND position=? AND source=? AND component=?";
@@ -41,16 +43,21 @@ public class EventBufferJdbcRepository {
@Inject
private ViewStoreJdbcDataSourceProvider dataSourceProvider;
+ @Inject
+ private Logger logger;
+
private DataSource dataSource;
public EventBufferJdbcRepository() {}
public EventBufferJdbcRepository(final JdbcResultSetStreamer jdbcResultSetStreamer,
final PreparedStatementWrapperFactory preparedStatementWrapperFactory,
- final DataSource dataSource) {
+ final DataSource dataSource,
+ final Logger logger) {
this.jdbcResultSetStreamer = jdbcResultSetStreamer;
this.dataSource = dataSource;
this.preparedStatementWrapperFactory = preparedStatementWrapperFactory;
+ this.logger = logger;
}
@@ -67,7 +74,11 @@ public void insert(final EventBufferEvent bufferedEvent) {
ps.setString(3, bufferedEvent.getEvent());
ps.setString(4, bufferedEvent.getSource());
ps.setString(5, bufferedEvent.getComponent());
- ps.executeUpdate();
+ final int rowsUpdated = ps.executeUpdate();
+ if (rowsUpdated == 0){
+ logger.warn("Event already present in event buffer. Ignoring");
+ }
+
} catch (SQLException e) {
throw new JdbcRepositoryException(format("Exception while storing event in the buffer: %s", bufferedEvent), e);
}
diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryIT.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryIT.java
index 7ba952889..a10f60211 100644
--- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryIT.java
+++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryIT.java
@@ -7,6 +7,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
import uk.gov.justice.services.jdbc.persistence.JdbcResultSetStreamer;
import uk.gov.justice.services.jdbc.persistence.PreparedStatementWrapperFactory;
@@ -20,6 +21,7 @@
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
public class EventBufferJdbcRepositoryIT {
@@ -32,7 +34,8 @@ public void initDatabase() throws Exception {
eventBufferJdbcRepository = new EventBufferJdbcRepository(
new JdbcResultSetStreamer(),
new PreparedStatementWrapperFactory(),
- dataSource);
+ dataSource,
+ mock(Logger.class));
new DatabaseCleaner().cleanViewStoreTables("framework", "stream_buffer", "stream_status");
}
diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryTest.java
index c26195202..5396403ae 100644
--- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryTest.java
+++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streambuffer/EventBufferJdbcRepositoryTest.java
@@ -4,6 +4,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException;
@@ -29,11 +30,12 @@
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
+import org.slf4j.Logger;
@RunWith(MockitoJUnitRunner.class)
public class EventBufferJdbcRepositoryTest {
private static final String SELECT_STREAM_BUFFER_BY_STREAM_ID_SOURCE_AND_COMPONENT = "SELECT stream_id, position, event, source, component FROM stream_buffer WHERE stream_id=? AND source=? AND component=? ORDER BY position";
- private static final String INSERT = "INSERT INTO stream_buffer (stream_id, position, event, source, component) VALUES (?, ?, ?, ?, ?)";
+ private static final String INSERT = "INSERT INTO stream_buffer (stream_id, position, event, source, component) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO NOTHING";
private static final String DELETE_BY_STREAM_ID_POSITION = "DELETE FROM stream_buffer WHERE stream_id=? AND position=? AND source=? AND component=?";
@Spy
@@ -57,6 +59,9 @@ public class EventBufferJdbcRepositoryTest {
@Mock
private JdbcResultSetStreamer jdbcResultSetStreamer;
+ @Mock
+ private Logger logger;
+
@InjectMocks
private EventBufferJdbcRepository eventBufferJdbcRepository;
@@ -72,8 +77,29 @@ public void initDatabase() throws Exception {
public void shouldInsertEvent() throws SQLException {
final String source = "source";
- when(connection.prepareStatement(INSERT))
- .thenReturn(preparedStatement);
+ when(connection.prepareStatement(INSERT)).thenReturn(preparedStatement);
+ when(preparedStatement.executeUpdate()).thenReturn(1);
+
+ final UUID streamId = randomUUID();
+ final long position = 1l;
+ eventBufferJdbcRepository.insert(new EventBufferEvent(streamId, position, "eventVersion_2", source, EVENT_LISTENER));
+
+ verify(preparedStatement).setObject(1, streamId);
+ verify(preparedStatement).setLong(2, position);
+ verify(preparedStatement).setString(3, "eventVersion_2");
+ verify(preparedStatement).setString(4, source);
+ verify(preparedStatement).setString(5, EVENT_LISTENER);
+ verify(preparedStatement).executeUpdate();
+ verifyZeroInteractions(logger);
+ }
+
+ @Test
+ public void shouldWarnIfInsertDoesNothing() throws SQLException {
+ final String source = "source";
+
+
+ when(connection.prepareStatement(INSERT)).thenReturn(preparedStatement);
+ when(preparedStatement.executeUpdate()).thenReturn(0);
final UUID streamId = randomUUID();
final long position = 1l;
@@ -85,6 +111,7 @@ public void shouldInsertEvent() throws SQLException {
verify(preparedStatement).setString(4, source);
verify(preparedStatement).setString(5, EVENT_LISTENER);
verify(preparedStatement).executeUpdate();
+ verify(logger).warn("Event already present in event buffer. Ignoring");
}
@Test(expected = JdbcRepositoryException.class)
diff --git a/event-store-management/event-store-management-command-handler-extension/pom.xml b/event-store-management/event-store-management-command-handler-extension/pom.xml
new file mode 100644
index 000000000..39ac42650
--- /dev/null
+++ b/event-store-management/event-store-management-command-handler-extension/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+ event-store-management
+ uk.gov.justice.event-store
+ 2.5.0-SNAPSHOT
+
+ 4.0.0
+
+ event-store-management-command-handler-extension
+
+
+
+ javax
+ javaee-api
+ provided
+
+
+ uk.gov.justice.services
+ framework-management
+ ${framework.version}
+
+
+ uk.gov.justice.event-store
+ event-store-management-core
+ ${project.version}
+
+
+ uk.gov.justice.event-store
+ event-store-util
+ ${project.version}
+
+
+ uk.gov.justice.event-store
+ event-publisher-timer
+ ${project.version}
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
diff --git a/event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/AddTriggerCommand.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/AddTriggerCommand.java
similarity index 100%
rename from event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/AddTriggerCommand.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/AddTriggerCommand.java
diff --git a/event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/DisablePublishingCommand.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/DisablePublishingCommand.java
similarity index 100%
rename from event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/DisablePublishingCommand.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/DisablePublishingCommand.java
diff --git a/event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/EnablePublishingCommand.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/EnablePublishingCommand.java
similarity index 100%
rename from event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/EnablePublishingCommand.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/EnablePublishingCommand.java
diff --git a/event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/RemoveTriggerCommand.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/RemoveTriggerCommand.java
similarity index 100%
rename from event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/RemoveTriggerCommand.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/RemoveTriggerCommand.java
diff --git a/event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/ValidatePublishedEventsCommand.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/ValidatePublishedEventsCommand.java
similarity index 100%
rename from event-store-management/event-store-management-commands/src/main/java/uk/gov/justice/services/eventstore/management/commands/ValidatePublishedEventsCommand.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/commands/ValidatePublishedEventsCommand.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainer.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainer.java
similarity index 92%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainer.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainer.java
index 7670e1375..dd554784c 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainer.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainer.java
@@ -1,11 +1,10 @@
-package uk.gov.justice.services.eventstore.management.shuttering.observers;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import static java.lang.String.format;
import static uk.gov.justice.services.management.suspension.api.SuspensionResult.suspensionFailed;
import static uk.gov.justice.services.management.suspension.api.SuspensionResult.suspensionSucceeded;
import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory;
-import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator;
import uk.gov.justice.services.management.suspension.api.Suspendable;
import uk.gov.justice.services.management.suspension.api.SuspensionResult;
import uk.gov.justice.services.management.suspension.commands.SuspensionCommand;
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogator.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogator.java
similarity index 90%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogator.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogator.java
index 2276a8623..6ff5d823b 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogator.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogator.java
@@ -1,4 +1,4 @@
-package uk.gov.justice.services.eventstore.management.shuttering.process;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import uk.gov.justice.services.common.polling.MultiIteratingPoller;
import uk.gov.justice.services.common.polling.MultiIteratingPollerFactory;
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactory.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactory.java
similarity index 89%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactory.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactory.java
index 26aadd799..76d74c013 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactory.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactory.java
@@ -1,4 +1,4 @@
-package uk.gov.justice.services.eventstore.management.shuttering.process;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import uk.gov.justice.services.messaging.jms.JmsCommandHandlerDestinationNameProvider;
import uk.gov.justice.services.messaging.jms.JmsQueueBrowser;
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandler.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandler.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandler.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandler.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessor.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessor.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessor.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessor.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnabler.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnabler.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnabler.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnabler.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBean.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBean.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBean.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBean.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandler.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandler.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandler.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandler.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandler.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandler.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandler.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandler.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcess.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcess.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcess.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcess.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidator.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidator.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidator.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidator.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverter.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverter.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverter.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverter.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaException.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaException.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaException.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaException.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaIdException.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaIdException.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaIdException.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/MissingSchemaIdException.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinder.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinder.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinder.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinder.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProvider.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProvider.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProvider.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProvider.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProvider.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProvider.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProvider.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProvider.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidator.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidator.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidator.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidator.java
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/ValidationError.java b/event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/ValidationError.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/ValidationError.java
rename to event-store-management/event-store-management-command-handler-extension/src/main/java/uk/gov/justice/services/eventstore/management/validation/process/ValidationError.java
diff --git a/event-store-management/event-store-management-command-handler-extension/src/main/resources/META-INF/beans.xml b/event-store-management/event-store-management-command-handler-extension/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a0aaf4421
--- /dev/null
+++ b/event-store-management/event-store-management-command-handler-extension/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,8 @@
+
+
+
+
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainerTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainerTest.java
similarity index 94%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainerTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainerTest.java
index 4a7bf94a4..0327f9e15 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/observers/CommandHandlerQueueDrainerTest.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueDrainerTest.java
@@ -1,6 +1,7 @@
-package uk.gov.justice.services.eventstore.management.shuttering.observers;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import static java.util.Optional.empty;
+import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.inOrder;
@@ -10,7 +11,6 @@
import static uk.gov.justice.services.jmx.api.domain.CommandState.COMMAND_FAILED;
import uk.gov.justice.services.eventsourcing.util.jee.timer.StopWatchFactory;
-import uk.gov.justice.services.eventstore.management.shuttering.process.CommandHandlerQueueInterrogator;
import uk.gov.justice.services.management.suspension.api.SuspensionResult;
import uk.gov.justice.services.management.suspension.commands.SuspensionCommand;
@@ -50,7 +50,7 @@ public void shouldSuspendButNotUnsuspend() throws Exception {
@Test
public void shouldWaitForCommandHandlerQueueToDrainAndReturnSuccess() throws Exception {
- final UUID commandId = UUID.randomUUID();
+ final UUID commandId = randomUUID();
final StopWatch stopWatch = mock(StopWatch.class);
final SuspensionCommand suspensionCommand = mock(SuspensionCommand.class);
@@ -79,7 +79,7 @@ public void shouldWaitForCommandHandlerQueueToDrainAndReturnSuccess() throws Exc
@Test
public void shouldReturnFailureIfQueueDoesNotDrainInTime() throws Exception {
- final UUID commandId = UUID.randomUUID();
+ final UUID commandId = randomUUID();
final StopWatch stopWatch = mock(StopWatch.class);
final SuspensionCommand applicationShutteringCommand = mock(SuspensionCommand.class);
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogatorTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogatorTest.java
similarity index 96%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogatorTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogatorTest.java
index f3eced5c7..bba42ff90 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueInterrogatorTest.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueInterrogatorTest.java
@@ -1,4 +1,4 @@
-package uk.gov.justice.services.eventstore.management.shuttering.process;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactoryTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactoryTest.java
similarity index 95%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactoryTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactoryTest.java
index 79f6820cf..d2166acd7 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/shuttering/process/CommandHandlerQueueSupplierFactoryTest.java
+++ b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/extension/suspension/CommandHandlerQueueSupplierFactoryTest.java
@@ -1,4 +1,4 @@
-package uk.gov.justice.services.eventstore.management.shuttering.process;
+package uk.gov.justice.services.eventstore.management.extension.suspension;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandlerTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandlerTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandlerTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingCommandHandlerTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessorTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessorTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessorTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/EnablePublishingProcessorTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnablerTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnablerTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnablerTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/publishing/PublishingEnablerTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBeanTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBeanTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBeanTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/trigger/TriggerManagementStartupBeanTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandlerTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandlerTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandlerTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/trigger/commands/AddRemoveTriggerCommandHandlerTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandlerTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandlerTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandlerTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/commands/ValidatePublishedEventCommandHandlerTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcessTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcessTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcessTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidationProcessTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidatorTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidatorTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidatorTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/EventValidatorTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverterTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverterTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverterTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/JsonStringConverterTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinderTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinderTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinderTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdFinderTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProviderTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProviderTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProviderTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaIdMappingProviderTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProviderTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProviderTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProviderTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SchemaProviderTest.java
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidatorTest.java b/event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidatorTest.java
similarity index 100%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidatorTest.java
rename to event-store-management/event-store-management-command-handler-extension/src/test/java/uk/gov/justice/services/eventstore/management/validation/process/SingleEventValidatorTest.java
diff --git a/event-store-management/event-store-management-core/pom.xml b/event-store-management/event-store-management-core/pom.xml
index b57ef46c5..92f8de5d1 100644
--- a/event-store-management/event-store-management-core/pom.xml
+++ b/event-store-management/event-store-management-core/pom.xml
@@ -54,11 +54,6 @@
event-store-management-events
${project.version}
-
- uk.gov.justice.event-store
- event-publisher-timer
- ${project.version}
-
uk.gov.justice.event-store
event-tracking-service
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
index 79f6a192d..8dc2e498e 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserver.java
@@ -14,12 +14,14 @@
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent;
-import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import uk.gov.justice.services.jmx.logging.MdcLoggerInterceptor;
import uk.gov.justice.services.jmx.state.events.SystemCommandStateChangedEvent;
import java.time.Duration;
import java.time.ZonedDateTime;
+import java.util.List;
import java.util.UUID;
import javax.enterprise.context.ApplicationScoped;
@@ -64,10 +66,10 @@ public void onCatchupRequested(@Observes final CatchupRequestedEvent catchupRequ
final ZonedDateTime catchupStartedAt = clock.now();
- catchupStateManager.clear(catchupCommand);
+ catchupStateManager.clear();
catchupErrorStateManager.clear(catchupCommand);
- final String message = format("%s started at %s", catchupCommand.getName(), catchupStartedAt);
+ final String message = format("%s requested at %s", catchupCommand.getName(), catchupStartedAt);
systemCommandStateChangedEventFirer.fire(new SystemCommandStateChangedEvent(
commandId,
catchupCommand,
@@ -81,37 +83,45 @@ public void onCatchupRequested(@Observes final CatchupRequestedEvent catchupRequ
eventCatchupRunner.runEventCatchup(commandId, catchupCommand);
}
- public void onCatchupStartedForSubscription(@Observes final CatchupStartedForSubscriptionEvent catchupStartedForSubscriptionEvent) {
+ public void onCatchupStarted(@Observes final CatchupStartedEvent catchupStartedEvent) {
- final String subscriptionName = catchupStartedForSubscriptionEvent.getSubscriptionName();
- final ZonedDateTime catchupStartedAt = catchupStartedForSubscriptionEvent.getCatchupStartedAt();
- final CatchupCommand catchupCommand = catchupStartedForSubscriptionEvent.getCatchupCommand();
+ final List subscriptionCatchupDetailsList = catchupStartedEvent
+ .getSubscriptionCatchupDefinition();
- catchupStateManager.addCatchupInProgress(new CatchupInProgress(subscriptionName, catchupStartedAt), catchupCommand);
+ final ZonedDateTime catchupStartedAt = catchupStartedEvent.getCatchupStartedAt();
+ final CatchupCommand catchupCommand = catchupStartedEvent.getCatchupCommand();
- logger.info(format("%s for subscription '%s' started at %s", catchupCommand.getName(), subscriptionName, catchupStartedAt));
+ catchupStateManager.newCatchupInProgress(
+ subscriptionCatchupDetailsList,
+ catchupStartedAt);
+
+ logger.info(format("%s started at %s", catchupCommand.getName(), catchupStartedAt));
}
public void onCatchupCompleteForSubscription(@Observes final CatchupCompletedForSubscriptionEvent catchupCompletedForSubscriptionEvent) {
final UUID commandId = catchupCompletedForSubscriptionEvent.getCommandId();
final String subscriptionName = catchupCompletedForSubscriptionEvent.getSubscriptionName();
+ final String eventSourceName = catchupCompletedForSubscriptionEvent.getEventSourceName();
+ final String componentName = catchupCompletedForSubscriptionEvent.getComponentName();
final ZonedDateTime catchupCompletedAt = catchupCompletedForSubscriptionEvent.getCatchupCompletedAt();
final int totalNumberOfEvents = catchupCompletedForSubscriptionEvent.getTotalNumberOfEvents();
final CatchupCommand catchupCommand = catchupCompletedForSubscriptionEvent.getCatchupCommand();
- logger.info(format("%s for subscription '%s' completed at %s", catchupCommand.getName(), subscriptionName, catchupCompletedAt));
- logger.info(format("%s for subscription '%s' caught up %d events", catchupCommand.getName(), subscriptionName, totalNumberOfEvents));
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition = new SubscriptionCatchupDetails(subscriptionName, eventSourceName, componentName);
+
+ logger.info(format("%s for '%s' '%s' completed at %s", catchupCommand.getName(), componentName, subscriptionName, catchupCompletedAt));
+ logger.info(format("%s for '%s' '%s' caught up %d events", catchupCommand.getName(), componentName, subscriptionName, totalNumberOfEvents));
- final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(subscriptionName, catchupCommand);
+ final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(subscriptionCatchupDefinition);
final Duration catchupDuration = catchupDurationCalculator.calculate(
catchupInProgress.getStartedAt(),
catchupCompletedForSubscriptionEvent.getCatchupCompletedAt());
- logger.info(format("%s for subscription '%s' took %d milliseconds", catchupCommand.getName(), subscriptionName, catchupDuration.toMillis()));
+ logger.info(format("%s for '%s' '%s' took %d milliseconds", catchupCommand.getName(), componentName, subscriptionName, catchupDuration.toMillis()));
- if (catchupStateManager.noCatchupsInProgress(catchupCommand)) {
+ if (catchupStateManager.noCatchupsInProgress()) {
catchupProcessCompleter.handleCatchupComplete(commandId, catchupCommand);
}
}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelector.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilter.java
similarity index 61%
rename from event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelector.java
rename to event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilter.java
index f4144010d..bd6959e0a 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelector.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilter.java
@@ -1,16 +1,18 @@
package uk.gov.justice.services.eventstore.management.catchup.process;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
import javax.inject.Inject;
-public class RunCatchupForComponentSelector {
+public class CanCatchupFilter {
@Inject
private CatchupTypeSelector catchupTypeSelector;
- public boolean shouldRunForThisComponentAndType(final String componentName, final CatchupCommand catchupCommand) {
+ public boolean canCatchup(final SubscriptionsDescriptor subscriptionsDescriptor, final CatchupCommand catchupCommand) {
+ final String componentName = subscriptionsDescriptor.getServiceComponent();
final boolean eventCatchup = catchupTypeSelector.isEventCatchup(componentName, catchupCommand);
final boolean indexerCatchup = catchupTypeSelector.isIndexerCatchup(componentName, catchupCommand);
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupInProgress.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupInProgress.java
index 37d07c874..d8869a9ea 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupInProgress.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupInProgress.java
@@ -1,20 +1,22 @@
package uk.gov.justice.services.eventstore.management.catchup.process;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+
import java.time.ZonedDateTime;
import java.util.Objects;
public class CatchupInProgress {
- private final String subscriptionName;
+ private final SubscriptionCatchupDetails subscriptionCatchupDetails;
private final ZonedDateTime startedAt;
- public CatchupInProgress(final String subscriptionName, final ZonedDateTime startedAt) {
- this.subscriptionName = subscriptionName;
+ public CatchupInProgress(final SubscriptionCatchupDetails subscriptionCatchupDetails, final ZonedDateTime startedAt) {
+ this.subscriptionCatchupDetails = subscriptionCatchupDetails;
this.startedAt = startedAt;
}
- public String getSubscriptionName() {
- return subscriptionName;
+ public SubscriptionCatchupDetails getSubscriptionCatchupDetails() {
+ return subscriptionCatchupDetails;
}
public ZonedDateTime getStartedAt() {
@@ -26,19 +28,19 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof CatchupInProgress)) return false;
final CatchupInProgress that = (CatchupInProgress) o;
- return Objects.equals(subscriptionName, that.subscriptionName) &&
+ return Objects.equals(subscriptionCatchupDetails, that.subscriptionCatchupDetails) &&
Objects.equals(startedAt, that.startedAt);
}
@Override
public int hashCode() {
- return Objects.hash(subscriptionName, startedAt);
+ return Objects.hash(subscriptionCatchupDetails, startedAt);
}
@Override
public String toString() {
return "CatchupInProgress{" +
- "subscriptionName='" + subscriptionName + '\'' +
+ "catchupFor=" + subscriptionCatchupDetails +
", startedAt=" + startedAt +
'}';
}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupSubscriptionContext.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupSubscriptionContext.java
index 0afb1b7aa..703aae2d0 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupSubscriptionContext.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/CatchupSubscriptionContext.java
@@ -1,7 +1,7 @@
package uk.gov.justice.services.eventstore.management.catchup.process;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.util.Objects;
import java.util.UUID;
@@ -10,17 +10,17 @@ public class CatchupSubscriptionContext {
private final UUID commandId;
private final String componentName;
- private final Subscription subscription;
+ private final SubscriptionCatchupDetails subscriptionCatchupDefinition;
private final CatchupCommand catchupCommand;
public CatchupSubscriptionContext(
final UUID commandId,
final String componentName,
- final Subscription subscription,
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition,
final CatchupCommand catchupCommand) {
this.commandId = commandId;
this.componentName = componentName;
- this.subscription = subscription;
+ this.subscriptionCatchupDefinition = subscriptionCatchupDefinition;
this.catchupCommand = catchupCommand;
}
@@ -32,8 +32,8 @@ public String getComponentName() {
return componentName;
}
- public Subscription getSubscription() {
- return subscription;
+ public SubscriptionCatchupDetails getSubscriptionCatchupDefinition() {
+ return subscriptionCatchupDefinition;
}
public CatchupCommand getCatchupCommand() {
@@ -47,13 +47,13 @@ public boolean equals(final Object o) {
final CatchupSubscriptionContext that = (CatchupSubscriptionContext) o;
return Objects.equals(commandId, that.commandId) &&
Objects.equals(componentName, that.componentName) &&
- Objects.equals(subscription, that.subscription) &&
+ Objects.equals(subscriptionCatchupDefinition, that.subscriptionCatchupDefinition) &&
Objects.equals(catchupCommand, that.catchupCommand);
}
@Override
public int hashCode() {
- return Objects.hash(commandId, componentName, subscription, catchupCommand);
+ return Objects.hash(commandId, componentName, subscriptionCatchupDefinition, catchupCommand);
}
@Override
@@ -61,7 +61,7 @@ public String toString() {
return "CatchupSubscriptionContext{" +
"commandId=" + commandId +
", componentName='" + componentName + '\'' +
- ", subscription=" + subscription +
+ ", catchupFor=" + subscriptionCatchupDefinition +
", catchupCommand=" + catchupCommand +
'}';
}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunner.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunner.java
index ab3e3b379..e2c03a657 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunner.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunner.java
@@ -3,8 +3,7 @@
import static java.lang.String.format;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.util.UUID;
@@ -14,9 +13,6 @@
public class EventCatchupByComponentRunner {
- @Inject
- private RunCatchupForComponentSelector runCatchupForComponentSelector;
-
@Inject
private EventCatchupProcessorBean eventCatchupProcessorBean;
@@ -24,31 +20,19 @@ public class EventCatchupByComponentRunner {
private Logger logger;
public void runEventCatchupForComponent(
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition,
final UUID commandId,
- final SubscriptionsDescriptor subscriptionsDescriptor,
final CatchupCommand catchupCommand) {
- final String componentName = subscriptionsDescriptor.getServiceComponent();
-
- if (runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, catchupCommand)) {
- subscriptionsDescriptor
- .getSubscriptions()
- .forEach(subscription -> runEventCatchupForSubscription(commandId, catchupCommand, componentName, subscription));
- }
- }
-
- private void runEventCatchupForSubscription(
- final UUID commandId,
- final CatchupCommand catchupCommand,
- final String componentName,
- final Subscription subscription) {
+ final String componentName = subscriptionCatchupDefinition.getComponentName();
+ final String subscriptionName = subscriptionCatchupDefinition.getSubscriptionName();
- logger.info(format("Running %s for Component '%s', Subscription '%s'", catchupCommand.getName(), componentName, subscription.getName()));
+ logger.info(format("Running %s for Component '%s', Subscription '%s'", catchupCommand.getName(), componentName, subscriptionName));
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(
commandId,
componentName,
- subscription,
+ subscriptionCatchupDefinition,
catchupCommand);
eventCatchupProcessorBean.performEventCatchup(catchupSubscriptionContext);
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java
index d2cf4ebeb..fb9c5c23a 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessor.java
@@ -9,8 +9,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
-import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.util.UUID;
import java.util.stream.Stream;
@@ -29,9 +28,6 @@ public class EventCatchupProcessor {
@Inject
private MissingEventStreamer missingEventStreamer;
- @Inject
- private Event catchupStartedForSubscriptionEventFirer;
-
@Inject
private Event catchupCompletedForSubscriptionEventFirer;
@@ -45,18 +41,12 @@ public class EventCatchupProcessor {
public void performEventCatchup(final CatchupSubscriptionContext catchupSubscriptionContext) {
final UUID commandId = catchupSubscriptionContext.getCommandId();
- final Subscription subscription = catchupSubscriptionContext.getSubscription();
- final String subscriptionName = subscription.getName();
- final String eventSourceName = subscription.getEventSourceName();
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition = catchupSubscriptionContext.getSubscriptionCatchupDefinition();
+ final String subscriptionName = subscriptionCatchupDefinition.getSubscriptionName();
+ final String eventSourceName = subscriptionCatchupDefinition.getEventSourceName();
final String componentName = catchupSubscriptionContext.getComponentName();
final CatchupCommand catchupCommand = catchupSubscriptionContext.getCatchupCommand();
- catchupStartedForSubscriptionEventFirer.fire(new CatchupStartedForSubscriptionEvent(
- commandId,
- subscriptionName,
- catchupCommand,
- clock.now()));
-
logger.info(format("Finding all missing events for event source '%s', component '%s", eventSourceName, componentName));
final Stream events = missingEventStreamer.getMissingEvents(eventSourceName, componentName);
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java
index 7f7da095b..61d8b9dd8 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunner.java
@@ -1,27 +1,45 @@
package uk.gov.justice.services.eventstore.management.catchup.process;
+import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
-import uk.gov.justice.subscription.registry.SubscriptionsDescriptorsRegistry;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import java.util.List;
import java.util.UUID;
+import javax.enterprise.event.Event;
import javax.inject.Inject;
public class EventCatchupRunner {
@Inject
- private SubscriptionsDescriptorsRegistry subscriptionsDescriptorsRegistry;
+ private EventCatchupByComponentRunner eventCatchupByComponentRunner;
@Inject
- private EventCatchupByComponentRunner eventCatchupByComponentRunner;
+ private Event catchupStartedEventFirer;
+
+ @Inject
+ private SubscriptionCatchupProvider subscriptionCatchupProvider;
+
+ @Inject
+ private UtcClock clock;
public void runEventCatchup(final UUID commandId, final CatchupCommand catchupCommand) {
- subscriptionsDescriptorsRegistry
- .getAll()
- .forEach(subscriptionsDescriptor -> eventCatchupByComponentRunner.runEventCatchupForComponent(
- commandId,
- subscriptionsDescriptor,
- catchupCommand));
+ final List subscriptionCatchupDefinitions = subscriptionCatchupProvider.getBySubscription(catchupCommand);
+
+ catchupStartedEventFirer.fire(new CatchupStartedEvent(
+ commandId,
+ catchupCommand,
+ subscriptionCatchupDefinitions,
+ clock.now()
+ ));
+
+ subscriptionCatchupDefinitions
+ .forEach(catchupFor -> eventCatchupByComponentRunner.runEventCatchupForComponent(
+ catchupFor,
+ commandId,
+ catchupCommand));
}
}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProvider.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProvider.java
new file mode 100644
index 000000000..062f7c4e3
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProvider.java
@@ -0,0 +1,19 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import static java.util.Comparator.comparingInt;
+
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+
+import java.util.Comparator;
+
+public class PriorityComparatorProvider {
+
+ public Comparator getSubscriptionDescriptorComparator() {
+ return comparingInt(SubscriptionsDescriptor::getPrioritisation);
+ }
+
+ public Comparator getSubscriptionComparator() {
+ return comparingInt(Subscription::getPrioritisation);
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapper.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapper.java
new file mode 100644
index 000000000..ec242e93e
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapper.java
@@ -0,0 +1,30 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+public class SubscriptionCatchupDetailsMapper {
+
+ @Inject
+ private PriorityComparatorProvider priorityComparatorProvider;
+
+ public Stream toSubscriptionCatchupDetails(final SubscriptionsDescriptor subscriptionsDescriptor) {
+
+ final String componentName = subscriptionsDescriptor.getServiceComponent();
+
+ return subscriptionsDescriptor.getSubscriptions()
+ .stream()
+ .sorted(priorityComparatorProvider.getSubscriptionComparator())
+ .map(subscription ->
+ new SubscriptionCatchupDetails(
+ subscription.getName(),
+ subscription.getEventSourceName(),
+ componentName
+ )
+ );
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProvider.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProvider.java
new file mode 100644
index 000000000..39f48dcaf
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProvider.java
@@ -0,0 +1,37 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import static java.util.stream.Collectors.toList;
+
+import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import uk.gov.justice.subscription.registry.SubscriptionsDescriptorsRegistry;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+public class SubscriptionCatchupProvider {
+
+ @Inject
+ private SubscriptionsDescriptorsRegistry subscriptionsDescriptorsRegistry;
+
+ @Inject
+ private PriorityComparatorProvider priorityComparatorProvider;
+
+ @Inject
+ private CanCatchupFilter canCatchupFilter;
+
+ @Inject
+ private SubscriptionCatchupDetailsMapper subscriptionCatchupDetailsMapper;
+
+ public List getBySubscription(final CatchupCommand catchupCommand) {
+
+ return subscriptionsDescriptorsRegistry
+ .getAll()
+ .stream()
+ .filter(subscriptionsDescriptor -> canCatchupFilter.canCatchup(subscriptionsDescriptor, catchupCommand))
+ .sorted(priorityComparatorProvider.getSubscriptionDescriptorComparator())
+ .flatMap(subscriptionCatchupDetailsMapper::toSubscriptionCatchupDetails)
+ .collect(toList());
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java
index d48fe96d0..b33f1e93a 100644
--- a/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java
+++ b/event-store-management/event-store-management-core/src/main/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManager.java
@@ -1,9 +1,9 @@
package uk.gov.justice.services.eventstore.management.catchup.state;
import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress;
-import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
-import java.util.ArrayList;
+import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -13,38 +13,33 @@
@Singleton
public class CatchupStateManager {
- private final Map eventCatchupsInProgress = new ConcurrentHashMap<>();
- private final Map indexCatchupsInProgress = new ConcurrentHashMap<>();
+ private final Map catchupsInProgress = new ConcurrentHashMap<>();
- public void clear(final CatchupCommand catchupCommand) {
- getCache(catchupCommand).clear();
+ public void clear() {
+ catchupsInProgress.clear();
}
- public void addCatchupInProgress(final CatchupInProgress catchupInProgress, final CatchupCommand catchupCommand) {
- getCache(catchupCommand).put(catchupInProgress.getSubscriptionName(), catchupInProgress);
- }
+ public void newCatchupInProgress(
+ final List subscriptionCatchupDetailsList,
+ final ZonedDateTime catchupStartedAt) {
- public CatchupInProgress removeCatchupInProgress(final String subscriptionName, final CatchupCommand catchupCommand) {
- return getCache(catchupCommand).remove(subscriptionName);
+ subscriptionCatchupDetailsList.forEach(subscriptionCatchupDetails ->
+ catchupsInProgress.put(
+ subscriptionCatchupDetails,
+ new CatchupInProgress(subscriptionCatchupDetails, catchupStartedAt))
+ );
}
- public boolean isCatchupInProgress(final String subscriptionName, final CatchupCommand catchupCommand) {
- return getCache(catchupCommand).containsKey(subscriptionName);
+ public CatchupInProgress removeCatchupInProgress(final SubscriptionCatchupDetails subscriptionCatchupDefinition) {
+ return catchupsInProgress.remove(subscriptionCatchupDefinition);
}
- public List getAllCatchupsInProgress(final CatchupCommand catchupCommand) {
- return new ArrayList<>(getCache(catchupCommand).values());
+ public boolean isCatchupInProgress(final SubscriptionCatchupDetails subscriptionCatchupDefinition) {
+ return catchupsInProgress.containsKey(subscriptionCatchupDefinition);
}
- public boolean noCatchupsInProgress(final CatchupCommand catchupType) {
- return getCache(catchupType).isEmpty();
+ public boolean noCatchupsInProgress() {
+ return catchupsInProgress.isEmpty();
}
- private Map getCache(final CatchupCommand catchupCommand) {
- if (catchupCommand.isEventCatchup()) {
- return eventCatchupsInProgress;
- }
-
- return indexCatchupsInProgress;
- }
}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
index bb6b9bbf8..71dc59ef9 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/observers/CatchupObserverTest.java
@@ -4,6 +4,7 @@
import static java.time.ZonedDateTime.of;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;
+import static java.util.Arrays.asList;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -22,16 +23,17 @@
import uk.gov.justice.services.eventstore.management.catchup.state.CatchupError;
import uk.gov.justice.services.eventstore.management.catchup.state.CatchupErrorStateManager;
import uk.gov.justice.services.eventstore.management.catchup.state.CatchupStateManager;
-import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupProcessingOfEventFailedEvent;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupRequestedEvent;
-import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import uk.gov.justice.services.jmx.state.events.SystemCommandStateChangedEvent;
import java.time.Duration;
import java.time.ZonedDateTime;
+import java.util.List;
import java.util.UUID;
import javax.enterprise.event.Event;
@@ -49,7 +51,6 @@
@RunWith(MockitoJUnitRunner.class)
public class CatchupObserverTest {
-
@Mock
private EventCatchupRunner eventCatchupRunner;
@@ -74,12 +75,6 @@ public class CatchupObserverTest {
@Mock
private Logger logger;
- @Captor
- private ArgumentCaptor catchupInProgressCaptor;
-
- @Captor
- private ArgumentCaptor catchupCommandCaptor;
-
@Captor
private ArgumentCaptor catchupErrorCaptor;
@@ -114,10 +109,10 @@ public void shouldStartCatchupOnCatchupRequested() throws Exception {
systemCommandStateChangedEventFirer,
eventCatchupRunner);
- inOrder.verify(catchupStateManager).clear(eventCatchupCommand);
+ inOrder.verify(catchupStateManager).clear();
inOrder.verify(catchupErrorStateManager).clear(eventCatchupCommand);
inOrder.verify(systemCommandStateChangedEventFirer).fire(systemCommandStateChangedEventCaptor.capture());
- inOrder.verify(logger).info("CATCHUP started at 2019-02-23T17:12:23Z");
+ inOrder.verify(logger).info("CATCHUP requested at 2019-02-23T17:12:23Z");
inOrder.verify(eventCatchupRunner).runEventCatchup(commandId, eventCatchupCommand);
final SystemCommandStateChangedEvent stateChangedEvent = systemCommandStateChangedEventCaptor.getValue();
@@ -126,33 +121,33 @@ public void shouldStartCatchupOnCatchupRequested() throws Exception {
assertThat(stateChangedEvent.getCommandState(), is(COMMAND_IN_PROGRESS));
assertThat(stateChangedEvent.getStatusChangedAt(), is(catchupStartedAt));
assertThat(stateChangedEvent.getSystemCommand(), is(eventCatchupCommand));
- assertThat(stateChangedEvent.getMessage(), is("CATCHUP started at 2019-02-23T17:12:23Z"));
+ assertThat(stateChangedEvent.getMessage(), is("CATCHUP requested at 2019-02-23T17:12:23Z"));
}
@Test
- public void shouldLogCatchupStartedForSubscriptionAndStoreProgress() throws Exception {
+ public void shouldLogCatchupStartedAndStoreSubscriptionCatchupsInProgress() throws Exception {
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
final UUID commandId = randomUUID();
- final String subscriptionName = "mySubscription";
final ZonedDateTime catchupStartedAt = of(2019, 2, 23, 17, 12, 23, 0, UTC);
- final CatchupStartedForSubscriptionEvent catchupStartedForSubscriptionEvent = new CatchupStartedForSubscriptionEvent(
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_1 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_2 = mock(SubscriptionCatchupDetails.class);
+
+ final List subscriptionCatchupDetailsList = asList(subscriptionCatchupDetails_1, subscriptionCatchupDetails_2);
+
+ final CatchupStartedEvent catchupStartedForSubscriptionEvent = new CatchupStartedEvent(
commandId,
- subscriptionName,
eventCatchupCommand,
+ subscriptionCatchupDetailsList,
catchupStartedAt);
- catchupObserver.onCatchupStartedForSubscription(catchupStartedForSubscriptionEvent);
-
- verify(catchupStateManager).addCatchupInProgress(catchupInProgressCaptor.capture(), catchupCommandCaptor.capture());
- verify(logger).info("CATCHUP for subscription 'mySubscription' started at 2019-02-23T17:12:23Z");
- final CatchupInProgress catchupInProgress = catchupInProgressCaptor.getValue();
+ catchupObserver.onCatchupStarted(catchupStartedForSubscriptionEvent);
- assertThat(catchupInProgress.getSubscriptionName(), is(subscriptionName));
- assertThat(catchupInProgress.getStartedAt(), is(catchupStartedAt));
-
- assertThat(catchupCommandCaptor.getValue(), is(eventCatchupCommand));
+ verify(catchupStateManager).newCatchupInProgress(
+ subscriptionCatchupDetailsList,
+ catchupStartedAt);
+ verify(logger).info("CATCHUP started at 2019-02-23T17:12:23Z");
}
@Test
@@ -168,6 +163,12 @@ public void shouldRemoveTheCatchupForSubscriptionInProgressOnCatchupForSubscript
final int totalNumberOfEvents = 23;
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition = new SubscriptionCatchupDetails(
+ subscriptionName,
+ eventSourceName,
+ componentName
+ );
+
final Duration catchupDuration = Duration.of(23, SECONDS);
final CatchupCompletedForSubscriptionEvent catchupCompletedForSubscriptionEvent = new CatchupCompletedForSubscriptionEvent(
@@ -183,16 +184,16 @@ public void shouldRemoveTheCatchupForSubscriptionInProgressOnCatchupForSubscript
final CatchupInProgress catchupInProgress = mock(CatchupInProgress.class);
when(catchupInProgress.getStartedAt()).thenReturn(catchupStartedAt);
- when(catchupStateManager.removeCatchupInProgress(subscriptionName, eventCatchupCommand)).thenReturn(catchupInProgress);
+ when(catchupStateManager.removeCatchupInProgress(subscriptionCatchupDefinition)).thenReturn(catchupInProgress);
when(catchupDurationCalculator.calculate(catchupStartedAt, catchupCompletedAt)).thenReturn(catchupDuration);
- when(catchupStateManager.noCatchupsInProgress(eventCatchupCommand)).thenReturn(false);
+ when(catchupStateManager.noCatchupsInProgress()).thenReturn(false);
catchupObserver.onCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent);
- verify(logger).info("CATCHUP for subscription 'mySubscription' completed at 2019-02-23T17:12:23Z");
- verify(logger).info("CATCHUP for subscription 'mySubscription' caught up 23 events");
- verify(logger).info("CATCHUP for subscription 'mySubscription' took 23000 milliseconds");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' completed at 2019-02-23T17:12:23Z");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' caught up 23 events");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' took 23000 milliseconds");
verifyZeroInteractions(catchupProcessCompleter);
}
@@ -209,6 +210,12 @@ public void shouldCompleteTheCatchupIfAllCatchupsForSubscriptionsComplete() thro
final int totalNumberOfEvents = 23;
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition = new SubscriptionCatchupDetails(
+ subscriptionName,
+ eventSourceName,
+ componentName
+ );
+
final Duration catchupDuration = Duration.of(2_000, MILLIS);
final CatchupCompletedForSubscriptionEvent catchupCompletedForSubscriptionEvent = new CatchupCompletedForSubscriptionEvent(
@@ -224,18 +231,18 @@ public void shouldCompleteTheCatchupIfAllCatchupsForSubscriptionsComplete() thro
final CatchupInProgress catchupInProgress = mock(CatchupInProgress.class);
when(catchupInProgress.getStartedAt()).thenReturn(catchupStartedAt);
- when(catchupStateManager.removeCatchupInProgress(subscriptionName, eventCatchupCommand)).thenReturn(catchupInProgress);
+ when(catchupStateManager.removeCatchupInProgress(subscriptionCatchupDefinition)).thenReturn(catchupInProgress);
when(catchupDurationCalculator.calculate(
catchupStartedAt,
catchupCompletedAt)).thenReturn(catchupDuration);
- when(catchupStateManager.noCatchupsInProgress(eventCatchupCommand)).thenReturn(true);
+ when(catchupStateManager.noCatchupsInProgress()).thenReturn(true);
catchupObserver.onCatchupCompleteForSubscription(catchupCompletedForSubscriptionEvent);
- verify(logger).info("CATCHUP for subscription 'mySubscription' completed at 2019-02-23T17:12:23Z");
- verify(logger).info("CATCHUP for subscription 'mySubscription' caught up 23 events");
- verify(logger).info("CATCHUP for subscription 'mySubscription' took 2000 milliseconds");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' completed at 2019-02-23T17:12:23Z");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' caught up 23 events");
+ verify(logger).info("CATCHUP for 'EVENT_LISTENER' 'mySubscription' took 2000 milliseconds");
verify(catchupProcessCompleter).handleCatchupComplete(commandId, eventCatchupCommand);
}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelectorTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilterTest.java
similarity index 65%
rename from event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelectorTest.java
rename to event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilterTest.java
index 11bb742e5..92d1e619b 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/RunCatchupForComponentSelectorTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/CanCatchupFilterTest.java
@@ -2,9 +2,11 @@
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -13,13 +15,13 @@
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-public class RunCatchupForComponentSelectorTest {
+public class CanCatchupFilterTest {
@Mock
private CatchupTypeSelector catchupTypeSelector;
@InjectMocks
- private RunCatchupForComponentSelector runCatchupForComponentSelector;
+ private CanCatchupFilter canCatchupFilter;
@Test
public void shouldRunIfRunningEventCatchupAndTheComponentIsEventListener() throws Exception {
@@ -27,10 +29,13 @@ public void shouldRunIfRunningEventCatchupAndTheComponentIsEventListener() throw
final String componentName = "EVENT_LISTENER";
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
+
+ when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
when(catchupTypeSelector.isEventCatchup(componentName, eventCatchupCommand)).thenReturn(true);
when(catchupTypeSelector.isIndexerCatchup(componentName, eventCatchupCommand)).thenReturn(false);
- final boolean shouldRun = runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, eventCatchupCommand);
+ final boolean shouldRun = canCatchupFilter.canCatchup(subscriptionsDescriptor, eventCatchupCommand);
assertThat(shouldRun, is(true));
}
@@ -40,11 +45,13 @@ public void shouldRunIfRunningIndexCatchupAndTheComponentIsEventIndexer() throws
final String componentName = "EVENT_INDEXER";
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
+ when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
when(catchupTypeSelector.isEventCatchup(componentName, eventCatchupCommand)).thenReturn(true);
when(catchupTypeSelector.isIndexerCatchup(componentName, eventCatchupCommand)).thenReturn(false);
- final boolean shouldRun = runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, eventCatchupCommand);
+ final boolean shouldRun = canCatchupFilter.canCatchup(subscriptionsDescriptor, eventCatchupCommand);
assertThat(shouldRun, is(true));
}
@@ -54,11 +61,13 @@ public void shouldNotRunIfRunningIfNeitherComponentShouldRun() throws Exception
final String componentName = "EVENT_PROCESSOR";
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
+ when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
when(catchupTypeSelector.isEventCatchup(componentName, eventCatchupCommand)).thenReturn(false);
when(catchupTypeSelector.isIndexerCatchup(componentName, eventCatchupCommand)).thenReturn(false);
- final boolean shouldRun = runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, eventCatchupCommand);
+ final boolean shouldRun = canCatchupFilter.canCatchup(subscriptionsDescriptor, eventCatchupCommand);
assertThat(shouldRun, is(false));
}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunnerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunnerTest.java
index 43732a688..1819dd215 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunnerTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupByComponentRunnerTest.java
@@ -1,15 +1,12 @@
package uk.gov.justice.services.eventstore.management.catchup.process;
-import static java.util.Arrays.asList;
import static java.util.UUID.randomUUID;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.util.UUID;
@@ -24,9 +21,6 @@
@RunWith(MockitoJUnitRunner.class)
public class EventCatchupByComponentRunnerTest {
- @Mock
- private RunCatchupForComponentSelector runCatchupForComponentSelector;
-
@Mock
private EventCatchupProcessorBean eventCatchupProcessorBean;
@@ -41,45 +35,20 @@ public void shouldGetAllSubscriptionsForTheComponentAndRunCatchupOnEach() throws
final UUID commandId = randomUUID();
final String componentName = "AN_EVENT_LISTENER";
+ final String subscriptionName = "subscriptionName";
- final String subscriptionName_1 = "subscriptionName_1";
- final String subscriptionName_2 = "subscriptionName_2";
-
- final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
- final Subscription subscription_1 = mock(Subscription.class);
- final Subscription subscription_2 = mock(Subscription.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition = mock(SubscriptionCatchupDetails.class);
+
final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
- when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
- when(runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, eventCatchupCommand)).thenReturn(true);
- when(subscriptionsDescriptor.getSubscriptions()).thenReturn(asList(subscription_1, subscription_2));
-
- when(subscription_1.getName()).thenReturn(subscriptionName_1);
- when(subscription_2.getName()).thenReturn(subscriptionName_2);
+ when(subscriptionCatchupDefinition.getComponentName()).thenReturn(componentName);
+ when(subscriptionCatchupDefinition.getSubscriptionName()).thenReturn(subscriptionName);
- eventCatchupByComponentRunner.runEventCatchupForComponent(commandId, subscriptionsDescriptor, eventCatchupCommand);
+ eventCatchupByComponentRunner.runEventCatchupForComponent(subscriptionCatchupDefinition, commandId, eventCatchupCommand);
final InOrder inOrder = inOrder(logger, eventCatchupProcessorBean);
- inOrder.verify(logger).info("Running CATCHUP for Component 'AN_EVENT_LISTENER', Subscription 'subscriptionName_1'");
- inOrder.verify(eventCatchupProcessorBean).performEventCatchup(new CatchupSubscriptionContext(commandId, componentName, subscription_1, eventCatchupCommand));
- inOrder.verify(logger).info("Running CATCHUP for Component 'AN_EVENT_LISTENER', Subscription 'subscriptionName_2'");
- inOrder.verify(eventCatchupProcessorBean).performEventCatchup(new CatchupSubscriptionContext(commandId, componentName, subscription_2, eventCatchupCommand));
- }
-
- @Test
- public void shouldNotRunCatchupForThisComponentIfTheComponentShouldNotBeRunForThisCatchupType() throws Exception {
-
- final UUID commandId = randomUUID();
- final String componentName = "AN_EVENT_PROCESSOR";
- final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
- final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
-
- when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
- when(runCatchupForComponentSelector.shouldRunForThisComponentAndType(componentName, eventCatchupCommand)).thenReturn(false);
-
- eventCatchupByComponentRunner.runEventCatchupForComponent(commandId, subscriptionsDescriptor, eventCatchupCommand);
-
- verifyZeroInteractions(eventCatchupProcessorBean);
+ inOrder.verify(logger).info("Running CATCHUP for Component 'AN_EVENT_LISTENER', Subscription 'subscriptionName'");
+ inOrder.verify(eventCatchupProcessorBean).performEventCatchup(new CatchupSubscriptionContext(commandId, componentName, subscriptionCatchupDefinition, eventCatchupCommand));
}
}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java
index a5e11186b..491501c10 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupProcessorTest.java
@@ -20,8 +20,7 @@
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
import uk.gov.justice.services.eventstore.management.events.catchup.CatchupCompletedForSubscriptionEvent;
-import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedForSubscriptionEvent;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.time.ZonedDateTime;
import java.util.List;
@@ -46,9 +45,6 @@ public class EventCatchupProcessorTest {
@Mock
private MissingEventStreamer missingEventStreamer;
- @Mock
- private Event catchupStartedForSubscriptionEventFirer;
-
@Mock
private Event catchupCompletedForSubscriptionEventFirer;
@@ -71,15 +67,15 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
final long eventNumberFrom = 999L;
final ZonedDateTime catchupStartedAt = new UtcClock().now();
- final ZonedDateTime catchupCompetedAt = catchupStartedAt.plusMinutes(23);
+ final ZonedDateTime catchupCompletedAt = catchupStartedAt.plusMinutes(23);
- final Subscription subscription = mock(Subscription.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails = mock(SubscriptionCatchupDetails.class);
final CatchupCommand catchupCommand = new EventCatchupCommand();
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(
commandId,
componentName,
- subscription,
+ subscriptionCatchupDetails,
catchupCommand);
final PublishedEvent publishedEvent_1 = mock(PublishedEvent.class);
@@ -94,9 +90,9 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
when(publishedEvent_2.getEventNumber()).thenReturn(of(eventNumberFrom + 2L));
when(publishedEvent_3.getEventNumber()).thenReturn(of(eventNumberFrom + 3L));
- when(subscription.getName()).thenReturn(subscriptionName);
- when(subscription.getEventSourceName()).thenReturn(eventSourceName);
- when(clock.now()).thenReturn(catchupStartedAt, catchupCompetedAt);
+ when(subscriptionCatchupDetails.getSubscriptionName()).thenReturn(subscriptionName);
+ when(subscriptionCatchupDetails.getEventSourceName()).thenReturn(eventSourceName);
+ when(clock.now()).thenReturn(catchupCompletedAt);
when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId)).thenReturn(1);
when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId)).thenReturn(1);
@@ -104,16 +100,7 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
eventCatchupProcessor.performEventCatchup(catchupSubscriptionContext);
- final InOrder inOrder = inOrder(
- catchupStartedForSubscriptionEventFirer,
- concurrentEventStreamConsumerManager,
- catchupCompletedForSubscriptionEventFirer);
-
- inOrder.verify(catchupStartedForSubscriptionEventFirer).fire(new CatchupStartedForSubscriptionEvent(
- commandId,
- subscriptionName,
- catchupCommand,
- catchupStartedAt));
+ final InOrder inOrder = inOrder(concurrentEventStreamConsumerManager, catchupCompletedForSubscriptionEventFirer);
inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_1, subscriptionName, catchupCommand, commandId);
inOrder.verify(concurrentEventStreamConsumerManager).add(publishedEvent_2, subscriptionName, catchupCommand, commandId);
@@ -126,7 +113,7 @@ public void shouldFetchAllMissingEventsAndProcess() throws Exception {
eventSourceName,
componentName,
catchupCommand,
- catchupCompetedAt,
+ catchupCompletedAt,
events.size()));
verify(logger).info("Finding all missing events for event source 'event source', component 'EVENT_LISTENER");
@@ -146,15 +133,15 @@ public void shouldThrowExceptionIfEventNumberIsAbsentFromPublishedEvent() throws
final UUID idOfEventWithNoEventNumber = fromString("937f9fd6-3679-4bc2-a73c-6a7b18a651e1");
final ZonedDateTime catchupStartedAt = new UtcClock().now();
- final ZonedDateTime catchupCompetedAt = catchupStartedAt.plusMinutes(23);
+ final ZonedDateTime catchupCompletedAt = catchupStartedAt.plusMinutes(23);
- final Subscription subscription = mock(Subscription.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails = mock(SubscriptionCatchupDetails.class);
final CatchupCommand catchupCommand = mock(CatchupCommand.class);
final CatchupSubscriptionContext catchupSubscriptionContext = new CatchupSubscriptionContext(
commandId,
componentName,
- subscription,
+ subscriptionCatchupDetails,
catchupCommand);
final PublishedEvent publishedEvent_1 = mock(PublishedEvent.class);
@@ -170,9 +157,9 @@ public void shouldThrowExceptionIfEventNumberIsAbsentFromPublishedEvent() throws
when(publishedEvent_3.getEventNumber()).thenReturn(empty());
when(publishedEvent_3.getId()).thenReturn(idOfEventWithNoEventNumber);
- when(subscription.getName()).thenReturn(subscriptionName);
- when(subscription.getEventSourceName()).thenReturn(eventSourceName);
- when(clock.now()).thenReturn(catchupStartedAt, catchupCompetedAt);
+ when(subscriptionCatchupDetails.getSubscriptionName()).thenReturn(subscriptionName);
+ when(subscriptionCatchupDetails.getEventSourceName()).thenReturn(eventSourceName);
+ when(clock.now()).thenReturn(catchupCompletedAt);
when(concurrentEventStreamConsumerManager.add(publishedEvent_1, subscriptionName, catchupCommand, commandId)).thenReturn(1);
when(concurrentEventStreamConsumerManager.add(publishedEvent_2, subscriptionName, catchupCommand, commandId)).thenReturn(1);
when(concurrentEventStreamConsumerManager.add(publishedEvent_3, subscriptionName, catchupCommand, commandId)).thenReturn(1);
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java
index 9bdab9edd..1cc99fc44 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/EventCatchupRunnerTest.java
@@ -6,13 +6,17 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
-import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
-import uk.gov.justice.subscription.registry.SubscriptionsDescriptorsRegistry;
+import uk.gov.justice.services.eventstore.management.events.catchup.CatchupStartedEvent;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import java.util.List;
import java.util.UUID;
+import javax.enterprise.event.Event;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
@@ -24,10 +28,16 @@
public class EventCatchupRunnerTest {
@Mock
- private SubscriptionsDescriptorsRegistry subscriptionsDescriptorsRegistry;
+ private EventCatchupByComponentRunner eventCatchupByComponentRunner;
@Mock
- private EventCatchupByComponentRunner eventCatchupByComponentRunner;
+ private Event catchupStartedEventFirer;
+
+ @Mock
+ private SubscriptionCatchupProvider subscriptionCatchupProvider;
+
+ @Mock
+ private UtcClock clock;
@InjectMocks
private EventCatchupRunner eventCatchupRunner;
@@ -37,18 +47,26 @@ public void shouldRunEventCatchupForEachSubscription() throws Exception {
final UUID commandId = randomUUID();
- final SubscriptionsDescriptor subscriptionsDescriptor_1 = mock(SubscriptionsDescriptor.class);
- final SubscriptionsDescriptor subscriptionsDescriptor_2 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition_1 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDefinition_2 = mock(SubscriptionCatchupDetails.class);
+ final List subscriptionCatchupDefinitions = asList(subscriptionCatchupDefinition_1, subscriptionCatchupDefinition_2);
final CatchupCommand catchupCommand = new EventCatchupCommand();
- when(subscriptionsDescriptorsRegistry.getAll()).thenReturn(asList(subscriptionsDescriptor_1, subscriptionsDescriptor_2));
+ when(subscriptionCatchupProvider.getBySubscription(catchupCommand)).thenReturn(subscriptionCatchupDefinitions);
eventCatchupRunner.runEventCatchup(commandId, catchupCommand);
- final InOrder inOrder = inOrder(eventCatchupByComponentRunner);
+ final InOrder inOrder = inOrder(catchupStartedEventFirer, eventCatchupByComponentRunner);
+
+ inOrder.verify(catchupStartedEventFirer).fire(new CatchupStartedEvent(
+ commandId,
+ catchupCommand,
+ subscriptionCatchupDefinitions,
+ clock.now()
+ ));
- inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(commandId, subscriptionsDescriptor_1, catchupCommand);
- inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(commandId, subscriptionsDescriptor_2, catchupCommand);
+ inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_1, commandId, catchupCommand);
+ inOrder.verify(eventCatchupByComponentRunner).runEventCatchupForComponent(subscriptionCatchupDefinition_2, commandId, catchupCommand);
}
}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProviderTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProviderTest.java
new file mode 100644
index 000000000..2de188ec8
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/PriorityComparatorProviderTest.java
@@ -0,0 +1,120 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PriorityComparatorProviderTest {
+
+ @InjectMocks
+ private PriorityComparatorProvider priorityComparatorProvider;
+
+ @Test
+ public void shouldSortByComponentPriority() throws Exception {
+
+ final SubscriptionsDescriptor subscriptionsDescriptor_1 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_2 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_3 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_4 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_5 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_6 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_7 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_8 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_9 = mock(SubscriptionsDescriptor.class);
+
+ when(subscriptionsDescriptor_1.getPrioritisation()).thenReturn(1);
+ when(subscriptionsDescriptor_2.getPrioritisation()).thenReturn(2);
+ when(subscriptionsDescriptor_3.getPrioritisation()).thenReturn(3);
+ when(subscriptionsDescriptor_4.getPrioritisation()).thenReturn(4);
+ when(subscriptionsDescriptor_5.getPrioritisation()).thenReturn(5);
+ when(subscriptionsDescriptor_6.getPrioritisation()).thenReturn(6);
+ when(subscriptionsDescriptor_7.getPrioritisation()).thenReturn(7);
+ when(subscriptionsDescriptor_8.getPrioritisation()).thenReturn(8);
+ when(subscriptionsDescriptor_9.getPrioritisation()).thenReturn(9);
+
+ final List subscriptionsDescriptors = asList(
+ subscriptionsDescriptor_3,
+ subscriptionsDescriptor_8,
+ subscriptionsDescriptor_4,
+ subscriptionsDescriptor_2,
+ subscriptionsDescriptor_9,
+ subscriptionsDescriptor_5,
+ subscriptionsDescriptor_7,
+ subscriptionsDescriptor_6,
+ subscriptionsDescriptor_1
+ );
+
+ subscriptionsDescriptors.sort(priorityComparatorProvider.getSubscriptionDescriptorComparator());
+
+ assertThat(subscriptionsDescriptors.get(0).getPrioritisation(), is(1));
+ assertThat(subscriptionsDescriptors.get(1).getPrioritisation(), is(2));
+ assertThat(subscriptionsDescriptors.get(2).getPrioritisation(), is(3));
+ assertThat(subscriptionsDescriptors.get(3).getPrioritisation(), is(4));
+ assertThat(subscriptionsDescriptors.get(4).getPrioritisation(), is(5));
+ assertThat(subscriptionsDescriptors.get(5).getPrioritisation(), is(6));
+ assertThat(subscriptionsDescriptors.get(6).getPrioritisation(), is(7));
+ assertThat(subscriptionsDescriptors.get(7).getPrioritisation(), is(8));
+ assertThat(subscriptionsDescriptors.get(8).getPrioritisation(), is(9));
+ }
+
+ @Test
+ public void shouldSortBySubscriptionPriority() throws Exception {
+
+ final Subscription subscription_1 = mock(Subscription.class);
+ final Subscription subscription_2 = mock(Subscription.class);
+ final Subscription subscription_3 = mock(Subscription.class);
+ final Subscription subscription_4 = mock(Subscription.class);
+ final Subscription subscription_5 = mock(Subscription.class);
+ final Subscription subscription_6 = mock(Subscription.class);
+ final Subscription subscription_7 = mock(Subscription.class);
+ final Subscription subscription_8 = mock(Subscription.class);
+ final Subscription subscription_9 = mock(Subscription.class);
+
+ when(subscription_1.getPrioritisation()).thenReturn(1);
+ when(subscription_2.getPrioritisation()).thenReturn(2);
+ when(subscription_3.getPrioritisation()).thenReturn(3);
+ when(subscription_4.getPrioritisation()).thenReturn(4);
+ when(subscription_5.getPrioritisation()).thenReturn(5);
+ when(subscription_6.getPrioritisation()).thenReturn(6);
+ when(subscription_7.getPrioritisation()).thenReturn(7);
+ when(subscription_8.getPrioritisation()).thenReturn(8);
+ when(subscription_9.getPrioritisation()).thenReturn(9);
+
+ final List subscriptions = asList(
+ subscription_3,
+ subscription_8,
+ subscription_4,
+ subscription_2,
+ subscription_9,
+ subscription_5,
+ subscription_7,
+ subscription_6,
+ subscription_1
+ );
+
+ subscriptions.sort(priorityComparatorProvider.getSubscriptionComparator());
+
+ assertThat(subscriptions.get(0).getPrioritisation(), is(1));
+ assertThat(subscriptions.get(1).getPrioritisation(), is(2));
+ assertThat(subscriptions.get(2).getPrioritisation(), is(3));
+ assertThat(subscriptions.get(3).getPrioritisation(), is(4));
+ assertThat(subscriptions.get(4).getPrioritisation(), is(5));
+ assertThat(subscriptions.get(5).getPrioritisation(), is(6));
+ assertThat(subscriptions.get(6).getPrioritisation(), is(7));
+ assertThat(subscriptions.get(7).getPrioritisation(), is(8));
+ assertThat(subscriptions.get(8).getPrioritisation(), is(9));
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapperTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapperTest.java
new file mode 100644
index 000000000..23e5c2df6
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupDetailsMapperTest.java
@@ -0,0 +1,66 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.Subscription;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionCatchupDetailsMapperTest {
+
+ @Spy
+ private PriorityComparatorProvider priorityComparatorProvider = new PriorityComparatorProvider();
+
+ @InjectMocks
+ private SubscriptionCatchupDetailsMapper subscriptionCatchupDetailsMapper;
+
+ @Test
+ public void shouldGetSubscriptionCatchupDetailsFromSubscriptionsDescriptor() throws Exception {
+
+ final String componentName = "EVENT_LISTENER";
+ final String subscriptionName_1 = "subscriptionName_1";
+ final String eventSourceName_1 = "eventSourceName_1";
+ final String subscriptionName_2 = "subscriptionName_2";
+ final String eventSourceName_2 = "eventSourceName_2";
+
+ final SubscriptionsDescriptor subscriptionsDescriptor = mock(SubscriptionsDescriptor.class);
+ final Subscription subscription_1 = mock(Subscription.class);
+ final Subscription subscription_2 = mock(Subscription.class);
+
+ when(subscription_1.getPrioritisation()).thenReturn(100);
+ when(subscription_2.getPrioritisation()).thenReturn(200);
+
+ when(subscriptionsDescriptor.getServiceComponent()).thenReturn(componentName);
+ when(subscriptionsDescriptor.getSubscriptions()).thenReturn(asList(subscription_2, subscription_1));
+ when(subscription_1.getName()).thenReturn(subscriptionName_1);
+ when(subscription_1.getEventSourceName()).thenReturn(eventSourceName_1);
+ when(subscription_2.getName()).thenReturn(subscriptionName_2);
+ when(subscription_2.getEventSourceName()).thenReturn(eventSourceName_2);
+
+ final List subscriptionCatchupDetails = subscriptionCatchupDetailsMapper.toSubscriptionCatchupDetails(subscriptionsDescriptor)
+ .collect(toList());
+
+ assertThat(subscriptionCatchupDetails.size(), is(2));
+
+ assertThat(subscriptionCatchupDetails.get(0).getComponentName(), is(componentName));
+ assertThat(subscriptionCatchupDetails.get(0).getSubscriptionName(), is(subscriptionName_1));
+ assertThat(subscriptionCatchupDetails.get(0).getEventSourceName(), is(eventSourceName_1));
+ assertThat(subscriptionCatchupDetails.get(1).getComponentName(), is(componentName));
+ assertThat(subscriptionCatchupDetails.get(1).getSubscriptionName(), is(subscriptionName_2));
+ assertThat(subscriptionCatchupDetails.get(1).getEventSourceName(), is(eventSourceName_2));
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProviderTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProviderTest.java
new file mode 100644
index 000000000..e1e4db0f5
--- /dev/null
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/process/SubscriptionCatchupProviderTest.java
@@ -0,0 +1,80 @@
+package uk.gov.justice.services.eventstore.management.catchup.process;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
+import uk.gov.justice.services.eventstore.management.commands.IndexerCatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
+import uk.gov.justice.subscription.domain.subscriptiondescriptor.SubscriptionsDescriptor;
+import uk.gov.justice.subscription.registry.SubscriptionsDescriptorsRegistry;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionCatchupProviderTest {
+
+ @Mock
+ private SubscriptionsDescriptorsRegistry subscriptionsDescriptorsRegistry;
+
+ @Mock
+ private CanCatchupFilter canCatchupFilter;
+
+ @Spy
+ private PriorityComparatorProvider priorityComparatorProvider = new PriorityComparatorProvider();
+
+ @Mock
+ private SubscriptionCatchupDetailsMapper subscriptionCatchupDetailsMapper;
+
+ @InjectMocks
+ private SubscriptionCatchupProvider subscriptionCatchupProvider;
+
+ @Test
+ public void shouldFindSubscriptionsToCatchup() throws Exception {
+
+ final CatchupCommand catchupCommand = new IndexerCatchupCommand();
+
+ final SubscriptionsDescriptor subscriptionsDescriptor_1 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_2 = mock(SubscriptionsDescriptor.class);
+ final SubscriptionsDescriptor subscriptionsDescriptor_3 = mock(SubscriptionsDescriptor.class);
+
+ when(subscriptionsDescriptor_1.getPrioritisation()).thenReturn(20);
+ when(subscriptionsDescriptor_2.getPrioritisation()).thenReturn(40);
+ when(subscriptionsDescriptor_3.getPrioritisation()).thenReturn(60);
+
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_1_1 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_1_2 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_3_1 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_3_2 = mock(SubscriptionCatchupDetails.class);
+
+ when(subscriptionsDescriptorsRegistry.getAll()).thenReturn(asList(
+ subscriptionsDescriptor_3,
+ subscriptionsDescriptor_1,
+ subscriptionsDescriptor_2
+ ));
+ when(canCatchupFilter.canCatchup(subscriptionsDescriptor_1, catchupCommand)).thenReturn(true);
+ when(canCatchupFilter.canCatchup(subscriptionsDescriptor_2, catchupCommand)).thenReturn(false);
+ when(canCatchupFilter.canCatchup(subscriptionsDescriptor_3, catchupCommand)).thenReturn(true);
+ when(subscriptionCatchupDetailsMapper.toSubscriptionCatchupDetails(subscriptionsDescriptor_1)).thenReturn(Stream.of(subscriptionCatchupDetails_1_1, subscriptionCatchupDetails_1_2));
+ when(subscriptionCatchupDetailsMapper.toSubscriptionCatchupDetails(subscriptionsDescriptor_3)).thenReturn(Stream.of(subscriptionCatchupDetails_3_1, subscriptionCatchupDetails_3_2));
+
+ final List subscriptionCatchupDetails = subscriptionCatchupProvider.getBySubscription(catchupCommand);
+
+ assertThat(subscriptionCatchupDetails.size(), is(4));
+ assertThat(subscriptionCatchupDetails.get(0), is(subscriptionCatchupDetails_1_1));
+ assertThat(subscriptionCatchupDetails.get(1), is(subscriptionCatchupDetails_1_2));
+ assertThat(subscriptionCatchupDetails.get(2), is(subscriptionCatchupDetails_3_1));
+ assertThat(subscriptionCatchupDetails.get(3), is(subscriptionCatchupDetails_3_2));
+ }
+}
diff --git a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java
index cc4d4c7af..30d16e339 100644
--- a/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java
+++ b/event-store-management/event-store-management-core/src/test/java/uk/gov/justice/services/eventstore/management/catchup/state/CatchupStateManagerTest.java
@@ -1,19 +1,18 @@
package uk.gov.justice.services.eventstore.management.catchup.state;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import uk.gov.justice.services.common.util.UtcClock;
import uk.gov.justice.services.eventstore.management.catchup.process.CatchupInProgress;
-import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
-import uk.gov.justice.services.eventstore.management.commands.IndexerCatchupCommand;
+import uk.gov.justice.services.eventstore.management.events.catchup.SubscriptionCatchupDetails;
import java.time.ZonedDateTime;
import java.util.List;
-import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@@ -26,102 +25,73 @@ public class CatchupStateManagerTest {
private CatchupStateManager catchupStateManager;
@Test
- public void shouldMaintainACacheOfAllEventCatchupsInProgress() throws Exception {
+ public void shouldMaintainACacheOfAllCatchupsInProgress() throws Exception {
- final EventCatchupCommand eventCatchupCommand = new EventCatchupCommand();
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(true));
- final CatchupInProgress indexCatchupInProgress = mock(CatchupInProgress.class);
- when(indexCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup");
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_1 = mock(SubscriptionCatchupDetails.class);
+ final SubscriptionCatchupDetails subscriptionCatchupDetails_2 = mock(SubscriptionCatchupDetails.class);
- catchupStateManager.addCatchupInProgress(indexCatchupInProgress, new IndexerCatchupCommand());
+ final List subscriptionCatchupDetailsList = asList(
+ subscriptionCatchupDetails_1,
+ subscriptionCatchupDetails_2
+ );
- assertThat(catchupStateManager.getAllCatchupsInProgress(eventCatchupCommand).isEmpty(), is(true));
+ final ZonedDateTime catchupStartedAt = new UtcClock().now();
- final ZonedDateTime startedAt = new UtcClock().now();
+ catchupStateManager.newCatchupInProgress(subscriptionCatchupDetailsList, catchupStartedAt);
- final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt);
- final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1));
- final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2));
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails_1), is(true));
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails_2), is(true));
- catchupStateManager.addCatchupInProgress(catchupInProgress_1, eventCatchupCommand);
- catchupStateManager.addCatchupInProgress(catchupInProgress_2, eventCatchupCommand);
- catchupStateManager.addCatchupInProgress(catchupInProgress_3, eventCatchupCommand);
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(false));
+ }
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), eventCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), eventCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), eventCatchupCommand), is(true));
+ @Test
+ public void shouldRemoveCatchupsInProgress() throws Exception {
- final List allCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(eventCatchupCommand);
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(true));
- assertThat(allCatchupsInProgress.size(), is(3));
- assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3));
+ final SubscriptionCatchupDetails subscriptionCatchupDetails = mock(SubscriptionCatchupDetails.class);
- final CatchupInProgress removedCatchupInProgress = catchupStateManager.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), eventCatchupCommand);
+ final List subscriptionCatchupDetailsList = singletonList(subscriptionCatchupDetails);
- assertThat(removedCatchupInProgress, is(catchupInProgress_2));
+ final ZonedDateTime catchupStartedAt = new UtcClock().now();
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), eventCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), eventCatchupCommand), is(false));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), eventCatchupCommand), is(true));
+ catchupStateManager.newCatchupInProgress(subscriptionCatchupDetailsList, catchupStartedAt);
- final List currentCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(eventCatchupCommand);
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails), is(true));
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(false));
- assertThat(currentCatchupsInProgress.size(), is(2));
- assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3));
+ final CatchupInProgress catchupInProgress = catchupStateManager.removeCatchupInProgress(subscriptionCatchupDetails);
- catchupStateManager.clear(eventCatchupCommand);
+ assertThat(catchupInProgress.getSubscriptionCatchupDetails(), is(subscriptionCatchupDetails));
+ assertThat(catchupInProgress.getStartedAt(), is(catchupStartedAt));
- assertThat(catchupStateManager.noCatchupsInProgress(eventCatchupCommand), is(true));
- assertThat(catchupStateManager.getAllCatchupsInProgress(eventCatchupCommand).isEmpty(), is(true));
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails), is(false));
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(true));
}
@Test
- public void shouldMaintainACacheOfAllIndexCatchupsInProgress() throws Exception {
-
- final IndexerCatchupCommand indexerCatchupCommand = new IndexerCatchupCommand();
-
- final CatchupInProgress eventCatchupInProgress = mock(CatchupInProgress.class);
- when(eventCatchupInProgress.getSubscriptionName()).thenReturn("different_catchup");
-
- catchupStateManager.addCatchupInProgress(eventCatchupInProgress, new EventCatchupCommand());
-
- assertThat(catchupStateManager.getAllCatchupsInProgress(indexerCatchupCommand).isEmpty(), is(true));
-
- final ZonedDateTime startedAt = new UtcClock().now();
-
- final CatchupInProgress catchupInProgress_1 = new CatchupInProgress("subscription_1", startedAt);
- final CatchupInProgress catchupInProgress_2 = new CatchupInProgress("subscription_2", startedAt.plusMinutes(1));
- final CatchupInProgress catchupInProgress_3 = new CatchupInProgress("subscription_3", startedAt.plusMinutes(2));
-
- catchupStateManager.addCatchupInProgress(catchupInProgress_1, indexerCatchupCommand);
- catchupStateManager.addCatchupInProgress(catchupInProgress_2, indexerCatchupCommand);
- catchupStateManager.addCatchupInProgress(catchupInProgress_3, indexerCatchupCommand);
-
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), indexerCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), indexerCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), indexerCatchupCommand), is(true));
+ public void shouldClearCatchups() throws Exception {
- final List allCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(indexerCatchupCommand);
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(true));
- assertThat(allCatchupsInProgress.size(), is(3));
- assertThat(allCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_2, catchupInProgress_3));
+ final SubscriptionCatchupDetails subscriptionCatchupDetails = mock(SubscriptionCatchupDetails.class);
- final CatchupInProgress removedCatchupInProgress = catchupStateManager.removeCatchupInProgress(catchupInProgress_2.getSubscriptionName(), indexerCatchupCommand);
+ final List subscriptionCatchupDetailsList = singletonList(subscriptionCatchupDetails);
- assertThat(removedCatchupInProgress, is(catchupInProgress_2));
+ final ZonedDateTime catchupStartedAt = new UtcClock().now();
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_1.getSubscriptionName(), indexerCatchupCommand), is(true));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_2.getSubscriptionName(), indexerCatchupCommand), is(false));
- assertThat(catchupStateManager.isCatchupInProgress(catchupInProgress_3.getSubscriptionName(), indexerCatchupCommand), is(true));
+ catchupStateManager.newCatchupInProgress(subscriptionCatchupDetailsList, catchupStartedAt);
- final List currentCatchupsInProgress = catchupStateManager.getAllCatchupsInProgress(indexerCatchupCommand);
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails), is(true));
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(false));
- assertThat(currentCatchupsInProgress.size(), is(2));
- assertThat(currentCatchupsInProgress, CoreMatchers.hasItems(catchupInProgress_1, catchupInProgress_3));
+ catchupStateManager.clear();
- catchupStateManager.clear(indexerCatchupCommand);
+ assertThat(catchupStateManager.isCatchupInProgress(subscriptionCatchupDetails), is(false));
+ assertThat(catchupStateManager.noCatchupsInProgress(), is(true));
- assertThat(catchupStateManager.noCatchupsInProgress(indexerCatchupCommand), is(true));
- assertThat(catchupStateManager.getAllCatchupsInProgress(indexerCatchupCommand).isEmpty(), is(true));
}
}
diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedForSubscriptionEvent.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedEvent.java
similarity index 61%
rename from event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedForSubscriptionEvent.java
rename to event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedEvent.java
index 0067e745f..afc05cfb8 100644
--- a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedForSubscriptionEvent.java
+++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/CatchupStartedEvent.java
@@ -3,24 +3,25 @@
import uk.gov.justice.services.eventstore.management.commands.CatchupCommand;
import java.time.ZonedDateTime;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
-public class CatchupStartedForSubscriptionEvent {
+public class CatchupStartedEvent {
private final UUID commandId;
- private final String subscriptionName;
private final CatchupCommand catchupCommand;
+ private final List subscriptionCatchupDefinition;
private final ZonedDateTime catchupStartedAt;
- public CatchupStartedForSubscriptionEvent(
+ public CatchupStartedEvent(
final UUID commandId,
- final String subscriptionName,
final CatchupCommand catchupCommand,
+ final List subscriptionCatchupDefinition,
final ZonedDateTime catchupStartedAt) {
this.commandId = commandId;
- this.subscriptionName = subscriptionName;
this.catchupCommand = catchupCommand;
+ this.subscriptionCatchupDefinition = subscriptionCatchupDefinition;
this.catchupStartedAt = catchupStartedAt;
}
@@ -28,14 +29,14 @@ public UUID getCommandId() {
return commandId;
}
- public String getSubscriptionName() {
- return subscriptionName;
- }
-
public CatchupCommand getCatchupCommand() {
return catchupCommand;
}
+ public List getSubscriptionCatchupDefinition() {
+ return subscriptionCatchupDefinition;
+ }
+
public ZonedDateTime getCatchupStartedAt() {
return catchupStartedAt;
}
@@ -43,25 +44,25 @@ public ZonedDateTime getCatchupStartedAt() {
@Override
public boolean equals(final Object o) {
if (this == o) return true;
- if (!(o instanceof CatchupStartedForSubscriptionEvent)) return false;
- final CatchupStartedForSubscriptionEvent that = (CatchupStartedForSubscriptionEvent) o;
+ if (!(o instanceof CatchupStartedEvent)) return false;
+ final CatchupStartedEvent that = (CatchupStartedEvent) o;
return Objects.equals(commandId, that.commandId) &&
- Objects.equals(subscriptionName, that.subscriptionName) &&
Objects.equals(catchupCommand, that.catchupCommand) &&
+ Objects.equals(subscriptionCatchupDefinition, that.subscriptionCatchupDefinition) &&
Objects.equals(catchupStartedAt, that.catchupStartedAt);
}
@Override
public int hashCode() {
- return Objects.hash(commandId, subscriptionName, catchupCommand, catchupStartedAt);
+ return Objects.hash(commandId, catchupCommand, subscriptionCatchupDefinition, catchupStartedAt);
}
@Override
public String toString() {
- return "CatchupStartedForSubscriptionEvent{" +
+ return "CatchupStartedEvent{" +
"commandId=" + commandId +
- ", subscriptionName='" + subscriptionName + '\'' +
", catchupCommand=" + catchupCommand +
+ ", subscriptionCatchupDefinition=" + subscriptionCatchupDefinition +
", catchupStartedAt=" + catchupStartedAt +
'}';
}
diff --git a/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/SubscriptionCatchupDetails.java b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/SubscriptionCatchupDetails.java
new file mode 100644
index 000000000..654f2e72a
--- /dev/null
+++ b/event-store-management/event-store-management-events/src/main/java/uk/gov/justice/services/eventstore/management/events/catchup/SubscriptionCatchupDetails.java
@@ -0,0 +1,54 @@
+package uk.gov.justice.services.eventstore.management.events.catchup;
+
+import java.util.Objects;
+
+public class SubscriptionCatchupDetails {
+
+ private final String subscriptionName;
+ private final String eventSourceName;
+ private final String componentName;
+
+ public SubscriptionCatchupDetails(final String subscriptionName,
+ final String eventSourceName,
+ final String componentName) {
+ this.subscriptionName = subscriptionName;
+ this.eventSourceName = eventSourceName;
+ this.componentName = componentName;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public String getEventSourceName() {
+ return eventSourceName;
+ }
+
+ public String getComponentName() {
+ return componentName;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SubscriptionCatchupDetails)) return false;
+ final SubscriptionCatchupDetails that = (SubscriptionCatchupDetails) o;
+ return Objects.equals(subscriptionName, that.subscriptionName) &&
+ Objects.equals(eventSourceName, that.eventSourceName) &&
+ Objects.equals(componentName, that.componentName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subscriptionName, eventSourceName, componentName);
+ }
+
+ @Override
+ public String toString() {
+ return "CatchupFor{" +
+ "subscriptionName='" + subscriptionName + '\'' +
+ ", eventSourceName='" + eventSourceName + '\'' +
+ ", componentName='" + componentName + '\'' +
+ '}';
+ }
+}
diff --git a/event-store-management/pom.xml b/event-store-management/pom.xml
index 21c5dbca3..810496747 100644
--- a/event-store-management/pom.xml
+++ b/event-store-management/pom.xml
@@ -16,6 +16,7 @@
event-store-management-core
event-store-management-events
event-store-management-commands
+ event-store-management-command-handler-extension
diff --git a/test-utils-event-store/test-utils-event/src/main/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCaller.java b/test-utils-event-store/test-utils-event/src/main/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCaller.java
deleted file mode 100644
index ab1560013..000000000
--- a/test-utils-event-store/test-utils-event/src/main/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCaller.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package uk.gov.justice.services.test.utils.jmx;
-
-import static uk.gov.justice.services.jmx.system.command.client.connection.JmxParametersBuilder.jmxParameters;
-import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;
-
-import uk.gov.justice.services.eventstore.management.commands.AddTriggerCommand;
-import uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand;
-import uk.gov.justice.services.eventstore.management.commands.IndexerCatchupCommand;
-import uk.gov.justice.services.eventstore.management.commands.RebuildCommand;
-import uk.gov.justice.services.eventstore.management.commands.RemoveTriggerCommand;
-import uk.gov.justice.services.eventstore.management.commands.ValidatePublishedEventsCommand;
-import uk.gov.justice.services.eventstore.management.commands.VerifyCatchupCommand;
-import uk.gov.justice.services.jmx.api.command.SystemCommand;
-import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClient;
-import uk.gov.justice.services.jmx.system.command.client.TestSystemCommanderClientFactory;
-import uk.gov.justice.services.jmx.system.command.client.connection.JmxParameters;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class EventStoreSystemCommandCaller {
-
- private static final String HOST = getHost();
- private static final int JMX_PORT = 9990;
- private static final String USERNAME = "admin";
-
- @SuppressWarnings("squid:S2068")
- private static final String PASSWORD = "admin";
-
-
- private final TestSystemCommanderClientFactory testSystemCommanderClientFactory;
- private final JmxParameters jmxParameters;
-
- public EventStoreSystemCommandCaller(final String contextName) {
- this(jmxParameters()
- .withContextName(contextName)
- .withHost(HOST)
- .withPort(JMX_PORT)
- .withUsername(USERNAME)
- .withPassword(PASSWORD)
- .build());
- }
-
- public EventStoreSystemCommandCaller(final JmxParameters jmxParameters) {
- this(jmxParameters, new TestSystemCommanderClientFactory());
- }
-
- @VisibleForTesting
- EventStoreSystemCommandCaller(final JmxParameters jmxParameters, final TestSystemCommanderClientFactory testSystemCommanderClientFactory) {
- this.jmxParameters = jmxParameters;
- this.testSystemCommanderClientFactory = testSystemCommanderClientFactory;
- }
-
- public void callRebuild() {
- callSystemCommand(new RebuildCommand());
- }
-
- public void callCatchup() {
- callSystemCommand(new EventCatchupCommand());
- }
-
- public void callIndexerCatchup() {
- callSystemCommand(new IndexerCatchupCommand());
- }
-
- public void callAddTrigger() {
- callSystemCommand(new AddTriggerCommand());
- }
-
- public void callRemoveTrigger() {
- callSystemCommand(new RemoveTriggerCommand());
- }
-
- public void callValidateCatchup() {
- callSystemCommand(new VerifyCatchupCommand());
- }
-
- public void callValidatePublishedEvents() {
- callSystemCommand(new ValidatePublishedEventsCommand());
- }
-
- private void callSystemCommand(final SystemCommand systemCommand) {
- try (final SystemCommanderClient systemCommanderClient = testSystemCommanderClientFactory.create(jmxParameters)) {
- systemCommanderClient.getRemote(jmxParameters.getContextName()).call(systemCommand.getName());
- }
- }
-}
diff --git a/test-utils-event-store/test-utils-event/src/test/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCallerTest.java b/test-utils-event-store/test-utils-event/src/test/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCallerTest.java
deleted file mode 100644
index 51bc370f9..000000000
--- a/test-utils-event-store/test-utils-event/src/test/java/uk/gov/justice/services/test/utils/jmx/EventStoreSystemCommandCallerTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package uk.gov.justice.services.test.utils.jmx;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static uk.gov.justice.services.eventstore.management.commands.AddTriggerCommand.ADD_TRIGGER;
-import static uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand.CATCHUP;
-import static uk.gov.justice.services.eventstore.management.commands.IndexerCatchupCommand.INDEXER_CATCHUP;
-import static uk.gov.justice.services.eventstore.management.commands.RebuildCommand.REBUILD;
-import static uk.gov.justice.services.eventstore.management.commands.RemoveTriggerCommand.REMOVE_TRIGGER;
-import static uk.gov.justice.services.eventstore.management.commands.ValidatePublishedEventsCommand.VALIDATE_EVENTS;
-import static uk.gov.justice.services.eventstore.management.commands.VerifyCatchupCommand.VERIFY_CATCHUP;
-import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;
-import static uk.gov.justice.services.test.utils.core.reflection.ReflectionUtil.getValueOfField;
-
-import uk.gov.justice.services.jmx.api.mbean.SystemCommanderMBean;
-import uk.gov.justice.services.jmx.system.command.client.SystemCommanderClient;
-import uk.gov.justice.services.jmx.system.command.client.TestSystemCommanderClientFactory;
-import uk.gov.justice.services.jmx.system.command.client.connection.Credentials;
-import uk.gov.justice.services.jmx.system.command.client.connection.JmxParameters;
-
-import java.util.Optional;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventStoreSystemCommandCallerTest {
-
- @Mock
- private TestSystemCommanderClientFactory testSystemCommanderClientFactory;
-
- @Test
- public void shouldCallRebuild() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callRebuild();
-
- verify(systemCommanderMBean).call(REBUILD);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallCatchup() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callCatchup();
-
- verify(systemCommanderMBean).call(CATCHUP);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallIndexerCatchup() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callIndexerCatchup();
-
- verify(systemCommanderMBean).call(INDEXER_CATCHUP);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallAddTrigger() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callAddTrigger();
-
- verify(systemCommanderMBean).call(ADD_TRIGGER);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallRemoveTrigger() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callRemoveTrigger();
-
- verify(systemCommanderMBean).call(REMOVE_TRIGGER);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallValidateCatchup() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callValidateCatchup();
-
- verify(systemCommanderMBean).call(VERIFY_CATCHUP);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCallValidatePublishedEvents() throws Exception {
-
- final String contextName = "contextName";
-
- final JmxParameters jmxParameters = mock(JmxParameters.class);
- final SystemCommanderClient systemCommanderClient = mock(SystemCommanderClient.class);
- final SystemCommanderMBean systemCommanderMBean = mock(SystemCommanderMBean.class);
-
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(jmxParameters, testSystemCommanderClientFactory);
-
- when(jmxParameters.getContextName()).thenReturn(contextName);
- when(testSystemCommanderClientFactory.create(jmxParameters)).thenReturn(systemCommanderClient);
- when(systemCommanderClient.getRemote(contextName)).thenReturn(systemCommanderMBean);
-
- eventStoreSystemCommandCaller.callValidatePublishedEvents();
-
- verify(systemCommanderMBean).call(VALIDATE_EVENTS);
- verify(systemCommanderClient).close();
- }
-
- @Test
- public void shouldCreateWithCorrectDefaultParametersIfInstantiatingUsingTheContextName() throws Exception {
-
- final String contextName = "contextName";
- final EventStoreSystemCommandCaller eventStoreSystemCommandCaller = new EventStoreSystemCommandCaller(contextName);
-
- final JmxParameters jmxParameters = getValueOfField(eventStoreSystemCommandCaller, "jmxParameters", JmxParameters.class);
-
- assertThat(jmxParameters.getContextName(), is(contextName));
- assertThat(jmxParameters.getHost(), is(getHost()));
- assertThat(jmxParameters.getPort(), is(9990));
-
- final Optional credentials = jmxParameters.getCredentials();
-
- if (credentials.isPresent()) {
- assertThat(credentials.get().getUsername(), is("admin"));
- assertThat(credentials.get().getPassword(), is("admin"));
- } else {
- fail();
- }
- }
-}