From a3af2b2aa03ba9ce535dd8f541744bcd42a463a7 Mon Sep 17 00:00:00 2001 From: amckenzie Date: Tue, 13 Nov 2018 15:02:55 +0000 Subject: [PATCH] Remove ansi buffer initialisation strategy --- ...iSQLBasedBufferInitialisationStrategy.java | 33 ------- .../service/BufferInitialisationStrategy.java | 18 ---- .../BufferInitialisationStrategyProducer.java | 42 --------- .../ConsecutiveEventBufferService.java | 6 +- ...ategy.java => EventBufferInitialiser.java} | 20 +++-- ...BasedBufferInitialisationStrategyTest.java | 80 ----------------- ...ferInitialisationStrategyProducerTest.java | 86 ------------------- .../ConsecutiveEventBufferServiceTest.java | 16 ++-- ...t.java => EventBufferInitialiserTest.java} | 18 ++-- .../publishing/EventDeQueuerTimerBean.java | 1 - .../it/EventBufferAndFilterChainIT.java | 7 +- .../interceptors/it/EventBufferIT.java | 7 +- 12 files changed, 39 insertions(+), 295 deletions(-) delete mode 100644 event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategy.java delete mode 100644 event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategy.java delete mode 100644 event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducer.java rename event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/{PostgreSQLBasedBufferInitialisationStrategy.java => EventBufferInitialiser.java} (66%) delete mode 100644 event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategyTest.java delete mode 100644 event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducerTest.java rename event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/{PostgreSQLBasedBufferInitialisationStrategyTest.java => EventBufferInitialiserTest.java} (76%) diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategy.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategy.java deleted file mode 100644 index 63f0dcc48..000000000 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -package uk.gov.justice.services.event.buffer.core.service; - -import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription; -import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository; - -import java.util.Optional; -import java.util.UUID; - -public class AnsiSQLBasedBufferInitialisationStrategy implements BufferInitialisationStrategy { - - private static final long INITIAL_VERSION = 0L; - private final SubscriptionJdbcRepository subscriptionJdbcRepository; - - public AnsiSQLBasedBufferInitialisationStrategy(final SubscriptionJdbcRepository subscriptionJdbcRepository) { - this.subscriptionJdbcRepository = subscriptionJdbcRepository; - } - - @Override - public long initialiseBuffer(final UUID streamId, final String source) { - subscriptionJdbcRepository.updateSource(streamId,source); - final Optional currentStatus = subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source); - - if (!currentStatus.isPresent()) { - //this is to address race condition - //in case of primary key violation the exception gets thrown, event goes back into topic and the transaction gets retried - subscriptionJdbcRepository.insert(new Subscription(streamId, INITIAL_VERSION, source)); - return INITIAL_VERSION; - - } else { - return currentStatus.get().getPosition(); - } - } -} diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategy.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategy.java deleted file mode 100644 index 69b53aa89..000000000 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategy.java +++ /dev/null @@ -1,18 +0,0 @@ -package uk.gov.justice.services.event.buffer.core.service; - -import java.util.UUID; - -public interface BufferInitialisationStrategy { - - - /** - * Initialises buffer (if not already intialised) and returns the current version of the buffer - * status - * - * @param streamId - id of the stream to be initialised - * @return - version of the last event that was in order - */ - long initialiseBuffer(final UUID streamId, final String source); - - -} diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducer.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducer.java deleted file mode 100644 index bad44b819..000000000 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducer.java +++ /dev/null @@ -1,42 +0,0 @@ -package uk.gov.justice.services.event.buffer.core.service; - - -import uk.gov.justice.services.common.configuration.GlobalValue; -import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; - -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Produces; -import javax.inject.Inject; - -import org.slf4j.Logger; - -@ApplicationScoped -public class BufferInitialisationStrategyProducer { - - private static final String INSTANTIATION_ERROR_MSG = "Could not instantiate buffer initialisation strategy."; - - @Inject - @GlobalValue(key = "event.buffer.init.strategy", defaultValue = "uk.gov.justice.services.event.buffer.core.service.PostgreSQLBasedBufferInitialisationStrategy") - String strategyClass; - - @Inject - Logger logger; - - @Inject - SubscriptionJdbcRepository subscriptionJdbcRepository; - - @Produces - public BufferInitialisationStrategy bufferInitialisationStrategy() { - logger.info("Instantiating {}", strategyClass); - try { - Class clazz = Class.forName(strategyClass); - Constructor constructor = clazz.getConstructor(SubscriptionJdbcRepository.class); - return (BufferInitialisationStrategy) constructor.newInstance(subscriptionJdbcRepository); - } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { - throw new IllegalArgumentException(INSTANTIATION_ERROR_MSG, e); - } - } -} diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java index f7449c86b..12403155b 100644 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java +++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java @@ -42,7 +42,7 @@ public class ConsecutiveEventBufferService implements EventBufferService { private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter; @Inject - private BufferInitialisationStrategy bufferInitialisationStrategy; + private EventBufferInitialiser eventBufferInitialiser; /** @@ -64,7 +64,7 @@ public Stream currentOrderedEventsWith(final JsonEnvelope incoming final long incomingEventVersion = versionOf(incomingEvent); final String source = getSource(incomingEvent); - final long currentVersion = bufferInitialisationStrategy.initialiseBuffer(streamId, source); + final long currentVersion = eventBufferInitialiser.initialiseBuffer(streamId, source); if (incomingEventObsolete(incomingEventVersion, currentVersion)) { logger.warn("Message : {} is an obsolete version", incomingEvent); @@ -115,7 +115,7 @@ private void addToBuffer(final JsonEnvelope incomingEvent, final UUID streamId, } private Stream consecutiveEventStreamFromBuffer(final Stream messageBuffer, final long currentVersion) { - return stream(new ConsecutiveEventsSpliterator(messageBuffer, currentVersion), false).onClose(() -> messageBuffer.close()); + return stream(new ConsecutiveEventsSpliterator(messageBuffer, currentVersion), false).onClose(messageBuffer::close); } private boolean incomingEventNotInOrder(final long incomingEventVersion, final long currentVersion) { diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategy.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiser.java similarity index 66% rename from event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategy.java rename to event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiser.java index b88aac70e..03f632d81 100644 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategy.java +++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiser.java @@ -5,16 +5,22 @@ import java.util.UUID; -public class PostgreSQLBasedBufferInitialisationStrategy implements BufferInitialisationStrategy { - private static final long INITIAL_VERSION = 0L; +import javax.inject.Inject; - private final SubscriptionJdbcRepository subscriptionJdbcRepository; +public class EventBufferInitialiser { - public PostgreSQLBasedBufferInitialisationStrategy(final SubscriptionJdbcRepository subscriptionJdbcRepository) { - this.subscriptionJdbcRepository = subscriptionJdbcRepository; - } + private static final long INITIAL_VERSION = 0L; + + @Inject + private SubscriptionJdbcRepository subscriptionJdbcRepository; - @Override + /** + * Initialises buffer (if not already intialised) and returns the current version of the buffer + * status + * + * @param streamId - id of the stream to be initialised + * @return - version of the last event that was in order + */ public long initialiseBuffer(final UUID streamId, final String source) { subscriptionJdbcRepository.updateSource(streamId,source); subscriptionJdbcRepository.insertOrDoNothing(new Subscription(streamId, INITIAL_VERSION, source)); diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategyTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategyTest.java deleted file mode 100644 index c8a9e5695..000000000 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/AnsiSQLBasedBufferInitialisationStrategyTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package uk.gov.justice.services.event.buffer.core.service; - -import static java.util.Optional.of; -import static java.util.UUID.randomUUID; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription; -import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository; - -import java.util.Optional; -import java.util.UUID; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class AnsiSQLBasedBufferInitialisationStrategyTest { - - @Mock - private SubscriptionJdbcRepository subscriptionJdbcRepository; - - - private BufferInitialisationStrategy bufferInitialisationStrategy; - - @Before - public void setUp() throws Exception { - bufferInitialisationStrategy = new AnsiSQLBasedBufferInitialisationStrategy(subscriptionJdbcRepository); - } - - @Test - public void shouldAddZeroStatusIfItDoesNotExist() throws Exception { - - final UUID streamId = randomUUID(); - final String source = "a source"; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.empty()); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); - - verify(subscriptionJdbcRepository).insert(new Subscription(streamId, 0L, source)); - } - - @Test - public void shouldReturnVersionZeroIfBufferStatusDoesNotExist() throws Exception { - - final UUID streamId = randomUUID(); - final String source = "a source"; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.empty()); - assertThat(bufferInitialisationStrategy.initialiseBuffer(streamId, source), is(0L)); - } - - @Test - public void shouldNotAddStatusIfItExists() throws Exception { - final UUID streamId = randomUUID(); - final String source = "a source"; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 3L, source))); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); - - verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source); - verify(subscriptionJdbcRepository).updateSource(streamId, source); - - verifyNoMoreInteractions(subscriptionJdbcRepository); - } - - @Test - public void shouldReturnCurrentVersionIfItExists() throws Exception { - final UUID streamId = randomUUID(); - final String source = "a source"; - final long currentVersion = 3L; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, currentVersion, source))); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); - - assertThat(bufferInitialisationStrategy.initialiseBuffer(streamId, source), is(currentVersion)); - } -} diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducerTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducerTest.java deleted file mode 100644 index 42eee80df..000000000 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/BufferInitialisationStrategyProducerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package uk.gov.justice.services.event.buffer.core.service; - - -import static java.util.UUID.randomUUID; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription; -import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository; - -import java.util.Optional; -import java.util.UUID; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; -import org.slf4j.Logger; - -@RunWith(MockitoJUnitRunner.class) -public class BufferInitialisationStrategyProducerTest { - - @Mock - private Logger logger; - - @Mock - private SubscriptionJdbcRepository subscriptionJdbcRepository; - - @InjectMocks - private BufferInitialisationStrategyProducer strategyProducer; - - - @Test - public void shouldProducePostgresStrategy() throws Exception { - strategyProducer.strategyClass = "uk.gov.justice.services.event.buffer.core.service.PostgreSQLBasedBufferInitialisationStrategy"; - assertThat(strategyProducer.bufferInitialisationStrategy(), instanceOf(PostgreSQLBasedBufferInitialisationStrategy.class)); - } - - @Test - public void shouldProduceAnsiSQLStrategy() throws Exception { - strategyProducer.strategyClass = "uk.gov.justice.services.event.buffer.core.service.AnsiSQLBasedBufferInitialisationStrategy"; - assertThat(strategyProducer.bufferInitialisationStrategy(), instanceOf(AnsiSQLBasedBufferInitialisationStrategy.class)); - } - - @Test - public void shouldPassRepositoryToPostgresStrategy() throws Exception { - strategyProducer.strategyClass = "uk.gov.justice.services.event.buffer.core.service.PostgreSQLBasedBufferInitialisationStrategy"; - final BufferInitialisationStrategy bufferInitialisationStrategy = strategyProducer.bufferInitialisationStrategy(); - final UUID streamId = randomUUID(); - final String source = "a source"; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.of(new Subscription(streamId, 0L, source))); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); - verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source); - } - - @Test - public void shouldPassRepositoryToAnsiStrategy() throws Exception { - strategyProducer.strategyClass = "uk.gov.justice.services.event.buffer.core.service.AnsiSQLBasedBufferInitialisationStrategy"; - final BufferInitialisationStrategy bufferInitialisationStrategy = strategyProducer.bufferInitialisationStrategy(); - final UUID streamId = randomUUID(); - final String source = "a source"; - when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.of(new Subscription(streamId, 0L, source))); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); - verify(subscriptionJdbcRepository).findByStreamIdAndSource(streamId, source); - } - - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Test - public void shouldThrowExceptionIfClassDoesNotExist() { - strategyProducer.strategyClass = "uk.gov.justice.services.event.buffer.core.service.SomeUnknowClazzz"; - - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Could not instantiate buffer initialisation strategy"); - - strategyProducer.bufferInitialisationStrategy(); - - } - -} diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java index 7dcfe0b80..fd38c530c 100644 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java +++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java @@ -50,7 +50,7 @@ public class ConsecutiveEventBufferServiceTest { private JsonObjectEnvelopeConverter jsonObjectEnvelopeConverter; @Mock - private BufferInitialisationStrategy bufferInitialisationStrategy; + private EventBufferInitialiser eventBufferInitialiser; @InjectMocks private ConsecutiveEventBufferService bufferService; @@ -95,7 +95,7 @@ public void shouldIgnoreObsoleteEvent() { final String source = "source"; final String eventName = "source.event.name"; - when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L); + when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L); final JsonEnvelope event_3 = envelopeFrom( metadataBuilder().withId(randomUUID()).withName(eventName).withStreamId(streamId).withVersion(3L), @@ -121,7 +121,7 @@ public void shouldReturnEventThatIsInCorrectOrder() { final String source = "source"; final String eventName = "source.event.name"; - when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L); + when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(4L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty()); @@ -145,7 +145,7 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() { createObjectBuilder() ); - when(bufferInitialisationStrategy.initialiseBuffer(streamId, source)).thenReturn(4L); + when(eventBufferInitialiser.initialiseBuffer(streamId, source)).thenReturn(4L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty()); bufferService.currentOrderedEventsWith(incomingEvent); @@ -164,7 +164,7 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() { createObjectBuilder() ); - when(bufferInitialisationStrategy.initialiseBuffer(streamId, source)).thenReturn(4L); + when(eventBufferInitialiser.initialiseBuffer(streamId, source)).thenReturn(4L); when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(Optional.of(new Subscription(streamId, 4L, source))); when(jsonObjectEnvelopeConverter.asJsonString(incomingEvent)).thenReturn("someStringRepresentation"); @@ -183,7 +183,7 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG final String source = "source"; final String eventName = "source.event.name"; - when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); + when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn( Stream.of(new EventBufferEvent(streamId, 4L, "someEventContent4", "source_4"), @@ -218,7 +218,7 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() { final String source = "source"; final String eventName = "source.event.name"; - when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); + when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); final StreamCloseSpy sourceStreamSpy = new StreamCloseSpy(); @@ -247,7 +247,7 @@ public void shouldRemoveEventsFromBufferOnceStreamed() { final UUID streamId = randomUUID(); final String source = "source"; final String eventName = "source.event.name"; - when(bufferInitialisationStrategy.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); + when(eventBufferInitialiser.initialiseBuffer(eq(streamId), eq(source))).thenReturn(2L); final EventBufferEvent event4 = new EventBufferEvent(streamId, 4L, "someEventContent4", "source_1"); diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategyTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiserTest.java similarity index 76% rename from event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategyTest.java rename to event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiserTest.java index b467b1825..ad8faddac 100644 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategyTest.java +++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/EventBufferInitialiserTest.java @@ -13,31 +13,27 @@ import java.util.UUID; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class PostgreSQLBasedBufferInitialisationStrategyTest { +public class EventBufferInitialiserTest { @Mock private SubscriptionJdbcRepository subscriptionJdbcRepository; - private BufferInitialisationStrategy bufferInitialisationStrategy; - - @Before - public void setUp() throws Exception { - bufferInitialisationStrategy = new PostgreSQLBasedBufferInitialisationStrategy(subscriptionJdbcRepository); - } + @InjectMocks + private EventBufferInitialiser eventBufferInitialiser; @Test public void shouldTryInsertingZeroBufferStatus() throws Exception { final UUID streamId = randomUUID(); final String source = "a source"; when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 3, source))); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); + eventBufferInitialiser.initialiseBuffer(streamId, source); verify(subscriptionJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0, source)); } @@ -48,7 +44,7 @@ public void shouldReturnCurrentVersion() { final String source = "a source"; when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 3, source))); - final long currentVersion = bufferInitialisationStrategy.initialiseBuffer(streamId, source); + final long currentVersion = eventBufferInitialiser.initialiseBuffer(streamId, source); assertThat(currentVersion, is(3L)); } @@ -58,7 +54,7 @@ public void shouldThrowExceptionIfStatusNotFound() throws Exception { final String source = "a source"; when(subscriptionJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(empty()); - bufferInitialisationStrategy.initialiseBuffer(streamId, source); + eventBufferInitialiser.initialiseBuffer(streamId, source); } } diff --git a/event-sourcing/event-publisher/event-publisher-process/src/main/java/uk/gov/justice/services/eventsourcing/publishing/EventDeQueuerTimerBean.java b/event-sourcing/event-publisher/event-publisher-process/src/main/java/uk/gov/justice/services/eventsourcing/publishing/EventDeQueuerTimerBean.java index 508c75edb..4fd256e51 100644 --- a/event-sourcing/event-publisher/event-publisher-process/src/main/java/uk/gov/justice/services/eventsourcing/publishing/EventDeQueuerTimerBean.java +++ b/event-sourcing/event-publisher/event-publisher-process/src/main/java/uk/gov/justice/services/eventsourcing/publishing/EventDeQueuerTimerBean.java @@ -45,7 +45,6 @@ public void startTimerService() { } @Timeout - @SuppressWarnings({"StatementWithEmptyBody", "unused"}) public void doDeQueueAndPublish() { while (eventDeQueuerAndPublisher.deQueueAndPublish()) { diff --git a/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferAndFilterChainIT.java b/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferAndFilterChainIT.java index 5e96b828a..4f8f11fb6 100644 --- a/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferAndFilterChainIT.java +++ b/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferAndFilterChainIT.java @@ -68,7 +68,7 @@ import uk.gov.justice.services.core.sender.SenderProducer; import uk.gov.justice.services.event.buffer.api.AbstractEventFilter; import uk.gov.justice.services.event.buffer.api.AllowAllEventFilter; -import uk.gov.justice.services.event.buffer.core.service.BufferInitialisationStrategyProducer; +import uk.gov.justice.services.event.buffer.core.service.EventBufferInitialiser; import uk.gov.justice.services.event.buffer.core.service.ConsecutiveEventBufferService; import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper; import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider; @@ -160,7 +160,6 @@ public class EventBufferAndFilterChainIT { ConsecutiveEventBufferService.class, EventBufferInterceptor.class, - BufferInitialisationStrategyProducer.class, LoggerProducer.class, EmptySystemUserProvider.class, SystemUserUtil.class, @@ -199,7 +198,9 @@ public class EventBufferAndFilterChainIT { BackwardsCompatibleJsonSchemaValidator.class, MediaTypesMappingCacheInitialiser.class, - SchemaIdMappingCacheInitialiser.class + SchemaIdMappingCacheInitialiser.class, + + EventBufferInitialiser.class }) public WebApp war() { return new WebApp() diff --git a/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferIT.java b/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferIT.java index cce78cab1..557a19dd3 100644 --- a/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferIT.java +++ b/interceptors/event-listener-interceptors/src/test/java/uk/gov/justice/services/components/event/listener/interceptors/it/EventBufferIT.java @@ -68,7 +68,7 @@ import uk.gov.justice.services.event.buffer.core.repository.streambuffer.EventBufferJdbcRepository; import uk.gov.justice.services.event.buffer.core.repository.subscription.Subscription; import uk.gov.justice.services.event.buffer.core.repository.subscription.SubscriptionJdbcRepository; -import uk.gov.justice.services.event.buffer.core.service.BufferInitialisationStrategyProducer; +import uk.gov.justice.services.event.buffer.core.service.EventBufferInitialiser; import uk.gov.justice.services.event.buffer.core.service.ConsecutiveEventBufferService; import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryHelper; import uk.gov.justice.services.jdbc.persistence.ViewStoreJdbcDataSourceProvider; @@ -176,7 +176,6 @@ public Properties postgresqlConfiguration() { PolicyEvaluator.class, ConsecutiveEventBufferService.class, - BufferInitialisationStrategyProducer.class, LoggerProducer.class, EmptySystemUserProvider.class, SystemUserUtil.class, @@ -214,7 +213,9 @@ public Properties postgresqlConfiguration() { BackwardsCompatibleJsonSchemaValidator.class, MediaTypesMappingCacheInitialiser.class, - SchemaIdMappingCacheInitialiser.class + SchemaIdMappingCacheInitialiser.class, + + EventBufferInitialiser.class }) public WebApp war() {