Skip to content

Commit

Permalink
Merge b27fb13 into 66a8139
Browse files Browse the repository at this point in the history
  • Loading branch information
BenNzewi committed May 29, 2019
2 parents 66a8139 + b27fb13 commit 7129515
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 113 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
## [2.0.0-M31] - 2019-05-29
### Added
- Added implementation for updating StreamStatus table with component column

## [2.0.0-M30] - 2019-05-28
### Fixed
- Re-release of 2.0.0-M28 as deployment to Bintray.com failed due to network issues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ public class StreamStatusJdbcRepository {
private static final String PRIMARY_KEY_ID = "stream_id";
private static final String LATEST_POSITION_COLUMN = "position";
private static final String SOURCE = "source";
private static final String COMPONENT = "component";

/**
* Statements
*/
private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, position, source FROM stream_status WHERE stream_id=? AND source in (?,'unknown') FOR UPDATE";
private static final String INSERT = "INSERT INTO stream_status (position, stream_id, source) VALUES (?, ?, ?)";
private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, position, source, component FROM stream_status WHERE stream_id=? AND component=? AND source in (?,'unknown') FOR UPDATE";
private static final String INSERT = "INSERT INTO stream_status (position, stream_id, source, component) VALUES (?, ?, ?, ?)";
private static final String INSERT_ON_CONFLICT_DO_NOTHING = INSERT + " ON CONFLICT DO NOTHING";
private static final String UPDATE = "UPDATE stream_status SET position=?,source=? WHERE stream_id=? and source in (?,'unknown')";
private static final String UPDATE_UNKNOWN_SOURCE = "UPDATE stream_status SET source=? WHERE stream_id=? and source = 'unknown'";
private static final String UPDATE = "UPDATE stream_status SET position=?,source=?,component=? WHERE stream_id=? and component=? and source in (?,'unknown')";
private static final String UPDATE_UNKNOWN_SOURCE = "UPDATE stream_status SET source=?, component=? WHERE stream_id=? and source = 'unknown'";

@Inject
private PreparedStatementWrapperFactory preparedStatementWrapperFactory;
Expand Down Expand Up @@ -67,23 +68,26 @@ public void insert(final Subscription subscription) {
ps.setLong(1, subscription.getPosition());
ps.setObject(2, subscription.getStreamId());
ps.setString(3, subscription.getSource());
ps.setString(4, subscription.getComponent());
ps.executeUpdate();
} catch (SQLException e) {
throw new JdbcRepositoryException(format("Exception while storing status of the stream: %s", subscription), e);
}
}


/**
* Tries to insert if database is PostgresSQL and version>=9.5. Uses PostgreSQl-specific sql
* clause. Does not fail if status for the given stream already exists
* @param subscription the status of the stream to insert
*
* @param subscription the status of the stream to insert
*/
public void insertOrDoNothing(final Subscription subscription) {
try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, INSERT_ON_CONFLICT_DO_NOTHING)) {
ps.setLong(1, subscription.getPosition());
ps.setObject(2, subscription.getStreamId());
ps.setString(3, subscription.getSource());
ps.setString(4, subscription.getComponent());
ps.executeUpdate();
} catch (SQLException e) {
throw new JdbcRepositoryException(format("Exception while storing status of the stream in PostgreSQL: %s", subscription), e);
Expand All @@ -93,15 +97,17 @@ public void insertOrDoNothing(final Subscription subscription) {

/**
* Insert the given Subscription into the stream status table.
* @param subscription the event to insert
*
* @param subscription the event to insert
*/
public void update(final Subscription subscription) {
try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, UPDATE)) {
ps.setLong(1, subscription.getPosition());
ps.setString(2, subscription.getSource());
ps.setObject(3, subscription.getStreamId());
ps.setString(4, subscription.getSource());
ps.setObject(3, subscription.getComponent());
ps.setObject(4, subscription.getStreamId());
ps.setObject(5, subscription.getComponent());
ps.setString(6, subscription.getSource());
ps.executeUpdate();
} catch (SQLException e) {
throw new JdbcRepositoryException(format("Exception while updating status of the stream: %s", subscription), e);
Expand All @@ -112,12 +118,14 @@ public void update(final Subscription subscription) {
* Returns a Stream of {@link Subscription} for the given stream streamId.
*
* @param streamId streamId of the stream.
* @param component
* @return a {@link Subscription}.
*/
public Optional<Subscription> findByStreamIdAndSource(final UUID streamId, final String source) {
public Optional<Subscription> findByStreamIdAndSource(final UUID streamId, final String source, final String component) {
try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, SELECT_BY_STREAM_ID_AND_SOURCE)) {
ps.setObject(1, streamId);
ps.setObject(2, source);
ps.setObject(2, component);
ps.setObject(3, source);
return subscriptionFrom(ps);

} catch (SQLException e) {
Expand All @@ -134,13 +142,14 @@ private Optional<Subscription> subscriptionFrom(final PreparedStatementWrapper p
}

protected Subscription entityFrom(final ResultSet rs) throws SQLException {
return new Subscription((UUID) rs.getObject(PRIMARY_KEY_ID), rs.getLong(LATEST_POSITION_COLUMN), rs.getString(SOURCE));
return new Subscription((UUID) rs.getObject(PRIMARY_KEY_ID), rs.getLong(LATEST_POSITION_COLUMN), rs.getString(SOURCE), rs.getString(COMPONENT));
}

public void updateSource(final UUID streamId, final String source) {
public void updateSource(final UUID streamId, final String source, final String component) {
try (final PreparedStatementWrapper ps = preparedStatementWrapperFactory.preparedStatementWrapperOf(dataSource, UPDATE_UNKNOWN_SOURCE)) {
ps.setString(1, source);
ps.setObject(2, streamId);
ps.setObject(2, component);
ps.setObject(3, streamId);
ps.executeUpdate();
} catch (SQLException e) {
throw new JdbcRepositoryException(format("Exception while updating unknown source of the stream: %s", streamId), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package uk.gov.justice.services.event.buffer.core.repository.subscription;

import java.util.Objects;
import java.util.UUID;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

/**
* Entity to represent event subscription
*/
Expand All @@ -13,11 +11,13 @@ public class Subscription {
private final UUID streamId;
private final long position;
private final String source;
private final String component;

public Subscription(final UUID streamId, final long position, final String source) {
public Subscription(final UUID streamId, final long position, final String source, final String component) {
this.streamId = streamId;
this.position = position;
this.source = source;
this.component = component;
}

public UUID getStreamId() {
Expand All @@ -32,36 +32,31 @@ public String getSource() {
return source;
}

public String getComponent() { return component; }

@Override
public String toString() {
return "Subscription{" +
"streamId=" + streamId +
", position=" + position +
", source='" + source + '\'' +
", component='" + component + '\'' +
'}';
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) return true;

if (o == null || getClass() != o.getClass()) return false;

Subscription that = (Subscription) o;

return new EqualsBuilder()
.append(streamId, that.streamId)
.append(position, that.position)
.append(source, that.source)
.isEquals();
final Subscription that = (Subscription) o;
return position == that.position &&
Objects.equals(streamId, that.streamId) &&
Objects.equals(source, that.source) &&
Objects.equals(component, that.component);
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(streamId)
.append(position)
.append(source)
.toHashCode();
return Objects.hash(streamId, position, source, component);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming
final long incomingEventVersion = versionOf(incomingEvent);
final String source = getSource(incomingEvent);

streamStatusJdbcRepository.updateSource(streamId, source);
streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, 0L, source));
final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source)
streamStatusJdbcRepository.updateSource(streamId, source, component);
streamStatusJdbcRepository.insertOrDoNothing(new Subscription(streamId, 0L, source, component));
final long currentVersion = streamStatusJdbcRepository.findByStreamIdAndSource(streamId, source, component)
.orElseThrow(() -> new IllegalStateException("stream status cannot be empty"))
.getPosition();

Expand All @@ -78,8 +78,8 @@ public Stream<JsonEnvelope> currentOrderedEventsWith(final JsonEnvelope incoming

} else {
logger.trace("Message : {} version is valid sending stream to dispatcher", incomingEvent);
streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source));
return bufferedEvents(streamId, incomingEvent, incomingEventVersion);
streamStatusJdbcRepository.update(new Subscription(streamId, incomingEventVersion, source, component));
return bufferedEvents(streamId, incomingEvent, incomingEventVersion, component);
}
}

Expand All @@ -93,14 +93,15 @@ private long versionOf(final JsonEnvelope event) {
return incomingEventVersion;
}

private Stream<JsonEnvelope> bufferedEvents(final UUID streamId, final JsonEnvelope incomingEvent, final long incomingEventVersion) {
private Stream<JsonEnvelope> bufferedEvents(final UUID streamId, final JsonEnvelope incomingEvent, final long incomingEventVersion, final String component) {
final String source = getSource(incomingEvent);
return concat(Stream.of(incomingEvent), consecutiveEventStreamFromBuffer(streamBufferRepository.findStreamByIdAndSource(streamId, source), incomingEventVersion)
.peek(streamBufferEvent -> streamBufferRepository.remove(streamBufferEvent))
.peek(streamBufferEvent -> streamStatusJdbcRepository.update(new Subscription(
streamBufferEvent.getStreamId(),
streamBufferEvent.getPosition(),
source)))
source,
component)))
.map(streamBufferEvent -> jsonObjectEnvelopeConverter.asEnvelope(streamBufferEvent.getEvent())));
}

Expand Down
Loading

0 comments on commit 7129515

Please sign in to comment.