From b27fb13e1500f8ccba0f17585f88b269339738b0 Mon Sep 17 00:00:00 2001 From: Benster Nzewi Date: Tue, 28 May 2019 15:20:27 +0100 Subject: [PATCH] Add implementation for updating StreamStatus table with component column --- CHANGELOG.md | 4 + .../StreamStatusJdbcRepository.java | 35 +++++--- .../repository/subscription/Subscription.java | 33 ++++---- .../ConsecutiveEventBufferService.java | 15 ++-- .../StreamStatusJdbcRepositoryIT.java | 84 ++++++++++--------- .../StreamStatusJdbcRepositoryTest.java | 6 +- .../ConsecutiveEventBufferServiceTest.java | 62 +++++++------- ...d-component-to-stream-status.changelog.xml | 51 +++++++++++ 8 files changed, 177 insertions(+), 113 deletions(-) create mode 100644 event-buffer/event-buffer-liquibase/src/main/resources/liquibase/event-buffer-changesets/006-add-component-to-stream-status.changelog.xml diff --git a/CHANGELOG.md b/CHANGELOG.md index 31b089385..f8d666714 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.0.0-M31] - 2019-05-29 +### Added +- Added implementation for updating StreamStatus table with component column + ## [2.0.0-M30] - 2019-05-28 ### Fixed - Re-release of 2.0.0-M28 as deployment to Bintray.com failed due to network issues diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepository.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepository.java index 99328b77c..4e6cc250a 100644 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepository.java +++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepository.java @@ -26,15 +26,16 @@ public class StreamStatusJdbcRepository { private static final String PRIMARY_KEY_ID = "stream_id"; private static final String LATEST_POSITION_COLUMN = "position"; private static final String SOURCE = "source"; + private static final String COMPONENT = "component"; /** * Statements */ - private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, position, source FROM stream_status WHERE stream_id=? AND source in (?,'unknown') FOR UPDATE"; - private static final String INSERT = "INSERT INTO stream_status (position, stream_id, source) VALUES (?, ?, ?)"; + private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, position, source, component FROM stream_status WHERE stream_id=? AND component=? AND source in (?,'unknown') FOR UPDATE"; + private static final String INSERT = "INSERT INTO stream_status (position, stream_id, source, component) VALUES (?, ?, ?, ?)"; private static final String INSERT_ON_CONFLICT_DO_NOTHING = INSERT + " ON CONFLICT DO NOTHING"; - private static final String UPDATE = "UPDATE stream_status SET position=?,source=? WHERE stream_id=? and source in (?,'unknown')"; - private static final String UPDATE_UNKNOWN_SOURCE = "UPDATE stream_status SET source=? WHERE stream_id=? and source = 'unknown'"; + private static final String UPDATE = "UPDATE stream_status SET position=?,source=?,component=? WHERE stream_id=? and component=? and source in (?,'unknown')"; + private static final String UPDATE_UNKNOWN_SOURCE = "UPDATE stream_status SET source=?, component=? WHERE stream_id=? and source = 'unknown'"; @Inject private PreparedStatementWrapperFactory preparedStatementWrapperFactory; @@ -67,23 +68,26 @@ public void insert(final Subscription subscription) { ps.setLong(1, subscription.getPosition()); ps.setObject(2, subscription.getStreamId()); ps.setString(3, subscription.getSource()); + ps.setString(4, subscription.getComponent()); ps.executeUpdate(); } catch (SQLException e) { throw new JdbcRepositoryException(format("Exception while storing status of the stream: %s", subscription), e); } } + /** * Tries to insert if database is PostgresSQL and version>=9.5. Uses PostgreSQl-specific sql * clause. Does not fail if status for the given stream already exists + * @param subscription the status of the stream to insert * - * @param subscription the status of the stream to insert */ public void insertOrDoNothing(final Subscription subscription) { try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, INSERT_ON_CONFLICT_DO_NOTHING)) { ps.setLong(1, subscription.getPosition()); ps.setObject(2, subscription.getStreamId()); ps.setString(3, subscription.getSource()); + ps.setString(4, subscription.getComponent()); ps.executeUpdate(); } catch (SQLException e) { throw new JdbcRepositoryException(format("Exception while storing status of the stream in PostgreSQL: %s", subscription), e); @@ -93,15 +97,17 @@ public void insertOrDoNothing(final Subscription subscription) { /** * Insert the given Subscription into the stream status table. + * @param subscription the event to insert * - * @param subscription the event to insert */ public void update(final Subscription subscription) { try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, UPDATE)) { ps.setLong(1, subscription.getPosition()); ps.setString(2, subscription.getSource()); - ps.setObject(3, subscription.getStreamId()); - ps.setString(4, subscription.getSource()); + ps.setObject(3, subscription.getComponent()); + ps.setObject(4, subscription.getStreamId()); + ps.setObject(5, subscription.getComponent()); + ps.setString(6, subscription.getSource()); ps.executeUpdate(); } catch (SQLException e) { throw new JdbcRepositoryException(format("Exception while updating status of the stream: %s", subscription), e); @@ -112,12 +118,14 @@ public void update(final Subscription subscription) { * Returns a Stream of {@link Subscription} for the given stream streamId. * * @param streamId streamId of the stream. + * @param component * @return a {@link Subscription}. */ - public Optional findByStreamIdAndSource(final UUID streamId, final String source) { + public Optional findByStreamIdAndSource(final UUID streamId, final String source, final String component) { try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, SELECT_BY_STREAM_ID_AND_SOURCE)) { ps.setObject(1, streamId); - ps.setObject(2, source); + ps.setObject(2, component); + ps.setObject(3, source); return subscriptionFrom(ps); } catch (SQLException e) { @@ -134,13 +142,14 @@ private Optional subscriptionFrom(final PreparedStatementWrapper p } protected Subscription entityFrom(final ResultSet rs) throws SQLException { - return new Subscription((UUID) rs.getObject(PRIMARY_KEY_ID), rs.getLong(LATEST_POSITION_COLUMN), rs.getString(SOURCE)); + return new Subscription((UUID) rs.getObject(PRIMARY_KEY_ID), rs.getLong(LATEST_POSITION_COLUMN), rs.getString(SOURCE), rs.getString(COMPONENT)); } - public void updateSource(final UUID streamId, final String source) { + public void updateSource(final UUID streamId, final String source, final String component) { try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, UPDATE_UNKNOWN_SOURCE)) { ps.setString(1, source); - ps.setObject(2, streamId); + ps.setObject(2, component); + ps.setObject(3, streamId); ps.executeUpdate(); } catch (SQLException e) { throw new JdbcRepositoryException(format("Exception while updating unknown source of the stream: %s", streamId), e); diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/Subscription.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/Subscription.java index 44028fa4b..bb3d81c37 100644 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/Subscription.java +++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/subscription/Subscription.java @@ -1,10 +1,8 @@ package uk.gov.justice.services.event.buffer.core.repository.subscription; +import java.util.Objects; import java.util.UUID; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; - /** * Entity to represent event subscription */ @@ -13,11 +11,13 @@ public class Subscription { private final UUID streamId; private final long position; private final String source; + private final String component; - public Subscription(final UUID streamId, final long position, final String source) { + public Subscription(final UUID streamId, final long position, final String source, final String component) { this.streamId = streamId; this.position = position; this.source = source; + this.component = component; } public UUID getStreamId() { @@ -32,36 +32,31 @@ public String getSource() { return source; } + public String getComponent() { return component; } + @Override public String toString() { return "Subscription{" + "streamId=" + streamId + ", position=" + position + ", source='" + source + '\'' + + ", component='" + component + '\'' + '}'; } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Subscription that = (Subscription) o; - - return new EqualsBuilder() - .append(streamId, that.streamId) - .append(position, that.position) - .append(source, that.source) - .isEquals(); + final Subscription that = (Subscription) o; + return position == that.position && + Objects.equals(streamId, that.streamId) && + Objects.equals(source, that.source) && + Objects.equals(component, that.component); } @Override public int hashCode() { - return new HashCodeBuilder(17, 37) - .append(streamId) - .append(position) - .append(source) - .toHashCode(); + return Objects.hash(streamId, position, source, component); } } diff --git a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java index ef998098a..d678019de 100644 --- a/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java +++ b/event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferService.java @@ -61,9 +61,9 @@ public Stream currentOrderedEventsWith(final JsonEnvelope incoming final long incomingEventVersion = versionOf(incomingEvent); final String source = getSource(incomingEvent); - streamStatusJdbcRepository.updateSource(streamId, source); - streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, 0L, source)); - final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source) + streamStatusJdbcRepository.updateSource(streamId, source, component); + streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, 0L, source, component)); + final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, component) .orElseThrow(() -> new IllegalStateException("stream status cannot be empty")) .getPosition(); @@ -78,8 +78,8 @@ public Stream currentOrderedEventsWith(final JsonEnvelope incoming } else { logger.trace("Message : {} version is valid sending stream to dispatcher", incomingEvent); - streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source)); - return bufferedEvents(streamId, incomingEvent, incomingEventVersion); + streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source, component)); + return bufferedEvents(streamId, incomingEvent, incomingEventVersion, component); } } @@ -93,14 +93,15 @@ private long versionOf(final JsonEnvelope event) { return incomingEventVersion; } - private Stream bufferedEvents(final UUID streamId, final JsonEnvelope incomingEvent, final long incomingEventVersion) { + private Stream bufferedEvents(final UUID streamId, final JsonEnvelope incomingEvent, final long incomingEventVersion, final String component) { final String source = getSource(incomingEvent); return concat(Stream.of(incomingEvent), consecutiveEventStreamFromBuffer(streamBufferRepository.findStreamByIdAndSource(streamId, source), incomingEventVersion) .peek(streamBufferEvent -> streamBufferRepository.remove(streamBufferEvent)) .peek(streamBufferEvent -> streamStatusJdbcRepository.update(new Subscription( streamBufferEvent.getStreamId(), streamBufferEvent.getPosition(), - source))) + source, + component))) .map(streamBufferEvent -> jsonObjectEnvelopeConverter.asEnvelope(streamBufferEvent.getEvent()))); } diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryIT.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryIT.java index b1c84468a..3dae13788 100644 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryIT.java +++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryIT.java @@ -25,6 +25,8 @@ public class StreamStatusJdbcRepositoryIT { + public static final String EVENT_LISTENER = "EVENT_LISTENER"; + private static final String COUNT_BY_STREAM_ID = "SELECT count(*) FROM stream_status WHERE stream_id=?"; private static final long INITIAL_POSITION = 0L; @@ -50,7 +52,7 @@ public void shouldNotCreateSeparateInitialSubscriptinForTheNewSourceWhenWeHaveEx final UUID streamId = randomUUID(); initialiseBuffer(streamId, source); - final Subscription subscription = new Subscription(streamId, 2L, source); + final Subscription subscription = new Subscription(streamId, 2L, source, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); @@ -58,7 +60,7 @@ public void shouldNotCreateSeparateInitialSubscriptinForTheNewSourceWhenWeHaveEx final int count = countByStreamId(streamId); assertThat(count, is(1)); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, "sjp"); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, "sjp", EVENT_LISTENER); assertThat(result.get().getSource(), is("sjp")); assertThat(result.get().getPosition(), is(2L)); @@ -73,7 +75,7 @@ public void shouldCreateSeparateInitialSubscriptionForTheNewSourceWhenWeHaveNoEx final int count = countByStreamId(streamId); assertThat(count, is(1)); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER); assertThat(result.get().getSource(), is(source)); assertThat(result.get().getPosition(), is(0L)); @@ -85,7 +87,7 @@ public void shouldAppendToExistingSubscriptionForTheNewSourceWhenWeHaveExistingE final UUID streamId = randomUUID(); initialiseBuffer(streamId, source); - final Subscription subscription = new Subscription(streamId, 2L, source); + final Subscription subscription = new Subscription(streamId, 2L, source, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); @@ -93,7 +95,7 @@ public void shouldAppendToExistingSubscriptionForTheNewSourceWhenWeHaveExistingE final int count = countByStreamId(streamId); assertThat(count, is(1)); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER); assertThat(result.get().getSource(), is("sjp")); assertThat(result.get().getPosition(), is(2L)); @@ -105,12 +107,12 @@ public void shouldUpdateSourceWhenUnknown() throws Exception { final String source = "unknown"; final UUID streamId = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, source)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, source, EVENT_LISTENER)); - final Subscription subscription = new Subscription(streamId, 2L, source); + final Subscription subscription = new Subscription(streamId, 2L, source, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER); assertThat(result.get().getSource(), is(source)); assertThat(result.get().getPosition(), is(2L)); } @@ -119,13 +121,13 @@ public void shouldUpdateSourceWhenUnknown() throws Exception { public void shouldUpdateSourceWhenNotUnknown() throws Exception { final UUID streamId = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "unknown")); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "unknown", EVENT_LISTENER)); final String source = "sjp"; - final Subscription subscription = new Subscription(streamId, 2L, source); + final Subscription subscription = new Subscription(streamId, 2L, source, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER); assertThat(result.get().getSource(), is(source)); assertThat(result.get().getPosition(), is(2L)); } @@ -136,9 +138,9 @@ public void shouldInsertAndReturnSubscription() throws Exception { final long version = 4L; final String source = "source"; - streamStatusJdbcRepository.insert(subscriptionOf(id, version, source)); + streamStatusJdbcRepository.insert(subscriptionOf(id, version, source, EVENT_LISTENER)); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(id, source); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(id, source, EVENT_LISTENER); assertTrue(result.isPresent()); assertThat(result.get().getPosition(), is(version)); assertThat(result.get().getSource(), is(source)); @@ -147,19 +149,19 @@ public void shouldInsertAndReturnSubscription() throws Exception { @Test public void shouldReturnOptionalNotPresentIfStatusNotFound() throws Exception { - Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(randomUUID(), "source"); + Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(randomUUID(), "source", EVENT_LISTENER); assertFalse(result.isPresent()); } @Test public void shouldUpdateVersionForSameStreamIdWithMultipleSources() throws Exception { final UUID id = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(id, 4L, "source 4")); - streamStatusJdbcRepository.update(subscriptionOf(id, 5L, "source 4")); - streamStatusJdbcRepository.insert(subscriptionOf(id, 4L, "source 5")); - streamStatusJdbcRepository.update(subscriptionOf(id, 5L, "source 5")); + streamStatusJdbcRepository.insert(subscriptionOf(id, 4L, "source 4", EVENT_LISTENER)); + streamStatusJdbcRepository.update(subscriptionOf(id, 5L, "source 4", EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(id, 4L, "source 5", EVENT_LISTENER)); + streamStatusJdbcRepository.update(subscriptionOf(id, 5L, "source 5", EVENT_LISTENER)); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(id, "source 5"); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(id, "source 5", EVENT_LISTENER); assertTrue(result.isPresent()); assertThat(result.get().getPosition(), is(5L)); assertThat(result.get().getSource(), is("source 5")); @@ -169,13 +171,13 @@ public void shouldUpdateVersionForSameStreamIdWithMultipleSources() throws Excep @Test public void shouldNotUpdateVersionForANewSourceField() throws Exception { final UUID streamId = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2")); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2", EVENT_LISTENER)); final String source3 = "source3"; - final Subscription subscription = new Subscription(streamId, 1L, source3); + final Subscription subscription = new Subscription(streamId, 1L, source3, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source3); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source3, EVENT_LISTENER); assertThat(result, is(Optional.empty())); } @@ -183,13 +185,13 @@ public void shouldNotUpdateVersionForANewSourceField() throws Exception { public void shouldUpdateVersionForAnExistingSourceField() throws Exception { final UUID streamId = randomUUID(); final String source3 = "source3"; - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 4L, source3)); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source4")); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 4L, source3, EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source4", EVENT_LISTENER)); - final Subscription subscription = new Subscription(streamId, 5L, source3); + final Subscription subscription = new Subscription(streamId, 5L, source3, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source3); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source3, EVENT_LISTENER); assertTrue(result.isPresent()); assertThat(result.get().getPosition(), is(5L)); assertThat(result.get().getSource(), is(source3)); @@ -199,15 +201,15 @@ public void shouldUpdateVersionForAnExistingSourceField() throws Exception { @Test public void shouldUpdateNewVersionNumberForExistingSourceWhenMultipleSourceEventsExist() throws Exception { final UUID streamId = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source1")); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2")); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 2L, "source3")); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source1", EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2", EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 2L, "source3", EVENT_LISTENER)); final String existingSource = "source2"; - final Subscription subscription = new Subscription(streamId, 2L, existingSource); + final Subscription subscription = new Subscription(streamId, 2L, existingSource, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, existingSource); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, existingSource, EVENT_LISTENER); assertTrue(result.isPresent()); assertThat(result.get().getPosition(), is(2L)); assertThat(result.get().getSource(), is(existingSource)); @@ -216,31 +218,31 @@ public void shouldUpdateNewVersionNumberForExistingSourceWhenMultipleSourceEvent @Test public void shouldNotUpdateNewVersionNumberForNewSourceWhenMultipleSourceEventsExist() throws Exception { final UUID streamId = randomUUID(); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source1")); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2")); - streamStatusJdbcRepository.insert(subscriptionOf(streamId, 2L, "source3")); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source1", EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 1L, "source2", EVENT_LISTENER)); + streamStatusJdbcRepository.insert(subscriptionOf(streamId, 2L, "source3", EVENT_LISTENER)); final String newSource = "source4"; - final Subscription subscription = new Subscription(streamId, 1L, newSource); + final Subscription subscription = new Subscription(streamId, 1L, newSource, EVENT_LISTENER); streamStatusJdbcRepository.update(subscription); - final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, newSource); + final Optional result = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, newSource, EVENT_LISTENER); assertThat(result, is(Optional.empty())); } - private Subscription subscriptionOf(final UUID id, final Long version, final String source) { - return new Subscription(id, version, source); + private Subscription subscriptionOf(final UUID id, final Long version, final String source, final String component) { + return new Subscription(id, version, source, component); } private long initialiseBuffer(final UUID streamId, final String source) { - streamStatusJdbcRepository.updateSource(streamId, source); - final Optional currentStatus = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source); + streamStatusJdbcRepository.updateSource(streamId, source, EVENT_LISTENER); + final Optional currentStatus = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER); if (!currentStatus.isPresent()) { //this is to address race condition //in case of primary key violation the exception gets thrown, event goes back into topic and the transaction gets retried streamStatusJdbcRepository - .insert(new Subscription(streamId, INITIAL_POSITION, source)); + .insert(new Subscription(streamId, INITIAL_POSITION, source, EVENT_LISTENER)); return INITIAL_POSITION; } else { diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryTest.java index b32772b6d..7964a5fd5 100644 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryTest.java +++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/subscription/StreamStatusJdbcRepositoryTest.java @@ -24,6 +24,8 @@ @RunWith(MockitoJUnitRunner.class) public class StreamStatusJdbcRepositoryTest { + public static final String EVENT_LISTENER = "EVENT_LISTENER"; + @SuppressWarnings("unused") @Spy private PreparedStatementWrapperFactory preparedStatementWrapperFactory = new PreparedStatementWrapperFactory(); @@ -55,12 +57,12 @@ public void shouldAttemptToInsert() throws Exception { final String source = "a source"; - when(connection.prepareStatement("INSERT INTO stream_status (position, stream_id, source) VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) + when(connection.prepareStatement("INSERT INTO stream_status (position, stream_id, source, component) VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING")) .thenReturn(preparedStatement); final UUID streamId = randomUUID(); final long position = 1l; - streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, position, source)); + streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, position, source, EVENT_LISTENER)); verify(preparedStatement).setLong(1, position); verify(preparedStatement).setObject(2, streamId); diff --git a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java index 4b36e4da8..cfcf28823 100644 --- a/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java +++ b/event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/service/ConsecutiveEventBufferServiceTest.java @@ -37,7 +37,7 @@ @RunWith(MockitoJUnitRunner.class) public class ConsecutiveEventBufferServiceTest { - public static final String EVENT_LISTENER = "event_listener"; + public static final String EVENT_LISTENER = "EVENT_LISTENER"; @Mock @SuppressWarnings("unused") private Logger logger; @@ -97,7 +97,7 @@ public void shouldIgnoreObsoleteEvent() { final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(4L); final JsonEnvelope event_3 = envelopeFrom( @@ -118,9 +118,9 @@ public void shouldIgnoreObsoleteEvent() { final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); } @@ -132,7 +132,7 @@ public void shouldReturnEventThatIsInCorrectOrder() { final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(4L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty()); @@ -147,9 +147,9 @@ public void shouldReturnEventThatIsInCorrectOrder() { final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); } @@ -165,7 +165,7 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() { final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(4L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn(Stream.empty()); @@ -173,10 +173,10 @@ public void shouldIncrementVersionOnIncomingEventInCorrectOrder() { final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).update(new Subscription(streamId, 5L, source)); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).update(new Subscription(streamId, 5L, source, EVENT_LISTENER)); } @@ -193,9 +193,9 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() { final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(4L); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(new Subscription(streamId, 4L, source))); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(new Subscription(streamId, 4L, source, EVENT_LISTENER))); when(jsonObjectEnvelopeConverter.asJsonString(incomingEvent)).thenReturn("someStringRepresentation"); @@ -203,9 +203,9 @@ public void shouldStoreEventIncomingNotInOrderAndReturnEmpty() { final InOrder inOrder = inOrder(streamStatusJdbcRepository, streamBufferRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); inOrder.verify(streamBufferRepository).insert(new EventBufferEvent(streamId, 6L, "someStringRepresentation", source, EVENT_LISTENER)); assertThat(returnedEvents, empty()); @@ -220,7 +220,7 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(2L); when(streamBufferRepository.findStreamByIdAndSource(streamId, source)).thenReturn( @@ -250,9 +250,9 @@ public void shouldReturnConsecutiveBufferedEventsIfIncomingEventFillsTheVersionG final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); } @@ -265,7 +265,7 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() { final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(2L); final StreamCloseSpy sourceStreamSpy = new StreamCloseSpy(); @@ -290,9 +290,9 @@ public void shoulCloseSourceStreamOnConsecutiveStreamClose() { final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); } @@ -304,7 +304,7 @@ public void shouldRemoveEventsFromBufferOnceStreamed() { final String eventName = "source.event.name"; final Subscription subscription = mock(Subscription.class); - when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)).thenReturn(of(subscription)); + when(streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, EVENT_LISTENER)).thenReturn(of(subscription)); when(subscription.getPosition()).thenReturn(2L); @@ -334,9 +334,9 @@ public void shouldRemoveEventsFromBufferOnceStreamed() { final InOrder inOrder = inOrder(streamStatusJdbcRepository); - inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source); - inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source)); - inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source); + inOrder.verify(streamStatusJdbcRepository).updateSource(streamId, source, EVENT_LISTENER); + inOrder.verify(streamStatusJdbcRepository).insertOrDoNothing(new Subscription(streamId, 0L, source, EVENT_LISTENER)); + inOrder.verify(streamStatusJdbcRepository).findByStreamIdAndSource(streamId, source, EVENT_LISTENER); verify(streamBufferRepository).remove(event4); verify(streamBufferRepository).remove(event5); diff --git a/event-buffer/event-buffer-liquibase/src/main/resources/liquibase/event-buffer-changesets/006-add-component-to-stream-status.changelog.xml b/event-buffer/event-buffer-liquibase/src/main/resources/liquibase/event-buffer-changesets/006-add-component-to-stream-status.changelog.xml new file mode 100644 index 000000000..d0ce46df0 --- /dev/null +++ b/event-buffer/event-buffer-liquibase/src/main/resources/liquibase/event-buffer-changesets/006-add-component-to-stream-status.changelog.xml @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + + + + + + + +