Skip to content

Commit

Permalink
Merge pull request #109 from CJSCommonPlatform/name-position-in-strea…
Browse files Browse the repository at this point in the history
…m-references-correctly

Name positionInSteam variables and methods correctly
  • Loading branch information
mapingo committed May 29, 2019
2 parents 66a8139 + b8a3b00 commit bcf5d69
Show file tree
Hide file tree
Showing 38 changed files with 160 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class AggregateSnapshot<T extends Aggregate> implements Serializable {

private final UUID streamId;
private final Long versionId;
private final Long positionInStream;
private final String type;
private final byte[] aggregateByteRepresentation;

Expand All @@ -34,7 +34,7 @@ public AggregateSnapshot(final UUID streamId, final Long versionId, final Class<

public AggregateSnapshot(final UUID streamId, final Long versionId, final String type, final byte[] aggregateByteRepresentation) {
this.streamId = streamId;
this.versionId = versionId;
this.positionInStream = versionId;
this.type = type;
this.aggregateByteRepresentation = aggregateByteRepresentation;
}
Expand All @@ -43,8 +43,8 @@ public UUID getStreamId() {
return streamId;
}

public Long getVersionId() {
return versionId;
public Long getPositionInStream() {
return positionInStream;
}

public String getType() {
Expand Down Expand Up @@ -74,7 +74,7 @@ public boolean equals(Object o) {

return new EqualsBuilder()
.append(streamId, that.streamId)
.append(versionId, that.versionId)
.append(positionInStream, that.positionInStream)
.append(type, that.type)
.append(aggregateByteRepresentation, that.aggregateByteRepresentation)
.isEquals();
Expand All @@ -84,7 +84,7 @@ public boolean equals(Object o) {
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(streamId)
.append(versionId)
.append(positionInStream)
.append(type)
.append(aggregateByteRepresentation)
.toHashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void shouldCreateAnAggregateSnapshot() throws Exception {
final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate);

assertThat(snapshot.getStreamId(), is(STREAM_ID));
assertThat(snapshot.getVersionId(), is(VERSION_ID));
assertThat(snapshot.getPositionInStream(), is(VERSION_ID));
assertThat(snapshot.getType(), is(TYPE));
assertThat(snapshot.getAggregateByteRepresentation(), is(SerializationUtils.serialize(aggregate)));
}
Expand All @@ -41,7 +41,7 @@ public void shouldGetAnAggregateSnapshot() throws Exception {
final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate);

assertThat(snapshot.getStreamId(), is(STREAM_ID));
assertThat(snapshot.getVersionId(), is(VERSION_ID));
assertThat(snapshot.getPositionInStream(), is(VERSION_ID));
assertThat(snapshot.getType(), is(TYPE));
assertThat(snapshot.getAggregate(streamStrategy), is(aggregate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public void storeSnapshot(final AggregateSnapshot aggregateSnapshot) {
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement ps = connection.prepareStatement(SQL_INSERT_EVENT_LOG)) {
ps.setObject(1, aggregateSnapshot.getStreamId());
ps.setLong(2, aggregateSnapshot.getVersionId());
ps.setLong(2, aggregateSnapshot.getPositionInStream());
ps.setString(3, aggregateSnapshot.getType());
ps.setBytes(4, aggregateSnapshot.getAggregateByteRepresentation());
ps.executeUpdate();
} catch (final SQLException e) {
logger.error("Error while storing a snapshot for {} at version {}", aggregateSnapshot.getStreamId(), aggregateSnapshot.getVersionId(), e);
logger.error("Error while storing a snapshot for {} at version {}", aggregateSnapshot.getStreamId(), aggregateSnapshot.getPositionInStream(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public <T extends Aggregate> Optional<VersionedAggregate<T>> getLatestVersionedA

if (aggregateSnapshot.isPresent()) {
final AggregateSnapshot<T> snapshotValue = aggregateSnapshot.get();
final VersionedAggregate<T> versionedAggregate = new VersionedAggregate<T>(snapshotValue.getVersionId(), snapshotValue.getAggregate(streamStrategy));
final VersionedAggregate<T> versionedAggregate = new VersionedAggregate<T>(snapshotValue.getPositionInStream(), snapshotValue.getAggregate(streamStrategy));
return Optional.of(versionedAggregate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void shouldStoreABrandNewSnapshotWhenEventCountInTheStreamReachesThreshol
assertThat(snapshot, not(nullValue()));
assertThat(snapshot.get().getType(), equalTo(TYPE));
assertThat(snapshot.get().getStreamId(), equalTo(streamId));
assertThat(snapshot.get().getVersionId(), equalTo(25L));
assertThat(snapshot.get().getPositionInStream(), equalTo(25L));

assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(25));
assertThat(aggregateFromSnapshot.numberOfAppliedEvents(), is(25));
Expand Down Expand Up @@ -284,7 +284,7 @@ public void shouldNotCreateNewSnapshotOnAggregateChangeWhenWeJustOneExistingSnap
assertThat(snapshotChanged.isPresent(), equalTo(true));
assertThat(snapshotChanged.get().getType(), equalTo(aggregateClass.getName()));
assertThat(snapshotChanged.get().getStreamId(), equalTo(streamId));
assertThat(snapshotChanged.get().getVersionId(), equalTo(25L));
assertThat(snapshotChanged.get().getPositionInStream(), equalTo(25L));

assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(48));
final TestAggregate aggregateFromSnapshot = snapshotChanged.get().getAggregate(new DefaultObjectInputStreamStrategy());
Expand Down Expand Up @@ -318,7 +318,7 @@ public void shouldCreateNewSnapshotOnAggregateChangeWhenWeHaveMultipleExistingSn
assertThat(newSnapshot.isPresent(), equalTo(true));
assertThat(newSnapshot.get().getType(), equalTo(aggregateClass.getName()));
assertThat(newSnapshot.get().getStreamId(), equalTo(streamId));
assertThat(newSnapshot.get().getVersionId(), equalTo(initialNumberOfSnapshots * SNAPSHOT_THRESHOLD));
assertThat(newSnapshot.get().getPositionInStream(), equalTo(initialNumberOfSnapshots * SNAPSHOT_THRESHOLD));
assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(123));
TestAggregate aggregateFromSnapshot2 = (TestAggregate) newSnapshot.get().getAggregate(new DefaultObjectInputStreamStrategy());
assertThat(aggregateFromSnapshot2.numberOfAppliedEvents(), is(100));
Expand Down Expand Up @@ -358,7 +358,7 @@ public void shouldNotStoreANewSnapshotOnTopOfExistingSnapshotsWhenThresholdNotMe
assertThat(snapshot.isPresent(), equalTo(true));
assertThat(snapshot.get().getType(), equalTo(TYPE));
assertThat(snapshot.get().getStreamId(), equalTo(streamId));
assertThat(snapshot.get().getVersionId(), equalTo(25L));
assertThat(snapshot.get().getPositionInStream(), equalTo(25L));
assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(48));

TestAggregate aggregateFromSnapshot = snapshot.get().getAggregate(new DefaultObjectInputStreamStrategy());
Expand All @@ -379,7 +379,7 @@ public void shouldStoreANewSnapshotOnTopOfExistingSnapshot() throws Exception {
assertThat(snapshot.isPresent(), equalTo(true));
assertThat(snapshot.get().getType(), equalTo(TYPE));
assertThat(snapshot.get().getStreamId(), equalTo(streamId));
assertThat(snapshot.get().getVersionId(), equalTo(50L));
assertThat(snapshot.get().getPositionInStream(), equalTo(50L));
assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(50));

TestAggregate aggregateFromSnapshot = snapshot.get().getAggregate(new DefaultObjectInputStreamStrategy());
Expand Down Expand Up @@ -421,7 +421,7 @@ public void shouldRebuildSnapshotOnAggregateModelChange() throws Exception {
assertThat(newSnapshot.isPresent(), equalTo(true));
assertThat(newSnapshot.get().getType(), equalTo(newAggregateClass.getName()));
assertThat(newSnapshot.get().getStreamId(), equalTo(streamId));
assertThat(newSnapshot.get().getVersionId(), equalTo(123L));
assertThat(newSnapshot.get().getPositionInStream(), equalTo(123L));
assertThat(rowCount(SQL_EVENT_LOG_COUNT_BY_STREAM_ID, streamId), is(123));
assertThat(rowCount(SQL_EVENT_STREAM_COUNT_BY_STREAM_ID, streamId), is(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void shouldCreateSnapshotIfStrategyMandatesCreation() throws AggregateCha
verify(snapshotRepository, times(1)).storeSnapshot(snapshotArgumentCaptor.capture());

assertThat(snapshotArgumentCaptor.getValue(), notNullValue());
assertThat(snapshotArgumentCaptor.getValue().getVersionId(), is(currentAggregateVersionId));
assertThat(snapshotArgumentCaptor.getValue().getPositionInStream(), is(currentAggregateVersionId));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public Properties configuration() {
@Test
public void shouldPublishEventsInTheEventLogTable() throws Exception {
final UUID streamId = randomUUID();
final Event event_1 = eventBuilder().withStreamId(streamId).withName("event_1").withEventNumber(1L).withSequenceId(1L).build();
final Event event_2 = eventBuilder().withStreamId(streamId).withName("event_2").withEventNumber(2L).withSequenceId(2L).build();
final Event event_3 = eventBuilder().withStreamId(streamId).withName("event_3").withEventNumber(3L).withSequenceId(3L).build();
final Event event_1 = eventBuilder().withStreamId(streamId).withName("event_1").withEventNumber(1L).withPositionInStream(1L).build();
final Event event_2 = eventBuilder().withStreamId(streamId).withName("event_2").withEventNumber(2L).withPositionInStream(2L).build();
final Event event_3 = eventBuilder().withStreamId(streamId).withName("event_3").withEventNumber(3L).withPositionInStream(3L).build();

testEventInserter.insertIntoEventLog(event_1);
testEventInserter.insertIntoEventLog(event_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ public long getEventNumber(final UUID eventId, final DataSource dataSource) thro
}
}

public long getPreviousEventNumber(final long sequenceId, final DataSource dataSource) throws SQLException {
public long getPreviousEventNumber(final long eventNumber, final DataSource dataSource) throws SQLException {
try (final Connection connection = dataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_PREVIOUS_EVENT_NUMBER_SQL)) {
preparedStatement.setLong(1, sequenceId);
preparedStatement.setLong(1, eventNumber);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void insertPublishedEvent(final PublishedEvent publishedEvent, final Data
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_INTO_LINKED_EVENT_SQL)) {
preparedStatement.setObject(1, publishedEvent.getId());
preparedStatement.setObject(2, publishedEvent.getStreamId());
preparedStatement.setLong(3, publishedEvent.getSequenceId());
preparedStatement.setLong(3, publishedEvent.getPositionInStream());
preparedStatement.setString(4, publishedEvent.getName());
preparedStatement.setString(5, publishedEvent.getPayload());
preparedStatement.setString(6, publishedEvent.getMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public PublishedEvent create(
return new PublishedEvent(
event.getId(),
event.getStreamId(),
event.getSequenceId(),
event.getPositionInStream(),
event.getName(),
updatedMetadata.asJsonObject().toString(),
event.getPayload(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public PublishedEvent toPublishedEvent(final Event event, final long previousEve
return new PublishedEvent(
eventId,
event.getStreamId(),
event.getSequenceId(),
event.getPositionInStream(),
event.getName(),
event.getMetadata(),
event.getPayload(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void shouldPopEventsFromThePrePublishQueue() throws Exception {

assertThat(eventDeQueuer.popNextEventId(tableName).isPresent(), is(false));

final Event event_1 = eventBuilder().withName("example.first-event").withSequenceId(1L).build();
final Event event_2 = eventBuilder().withName("example.second-event").withSequenceId(2L).build();
final Event event_3 = eventBuilder().withName("example.third-event").withSequenceId(3L).build();
final Event event_1 = eventBuilder().withName("example.first-event").withPositionInStream(1L).build();
final Event event_2 = eventBuilder().withName("example.second-event").withPositionInStream(2L).build();
final Event event_3 = eventBuilder().withName("example.third-event").withPositionInStream(3L).build();

testEventInserter.insertIntoEventLog(event_1);
testEventInserter.insertIntoEventLog(event_2);
Expand All @@ -78,9 +78,9 @@ public void shouldPopEventsFromThePublishQueue() throws Exception {

assertThat(eventDeQueuer.popNextEventId(tableName).isPresent(), is(false));

final Event event_1 = eventBuilder().withName("example.first-event").withSequenceId(1L).build();
final Event event_2 = eventBuilder().withName("example.second-event").withSequenceId(2L).build();
final Event event_3 = eventBuilder().withName("example.third-event").withSequenceId(3L).build();
final Event event_1 = eventBuilder().withName("example.first-event").withPositionInStream(1L).build();
final Event event_2 = eventBuilder().withName("example.second-event").withPositionInStream(2L).build();
final Event event_3 = eventBuilder().withName("example.third-event").withPositionInStream(3L).build();

insertInPublishQueue(event_1, event_2, event_3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public void initDatabase() throws Exception {
@Test
public void shouldGetTheSequenceNumberOfAnEvent() throws Exception {

final Event event_1 = eventBuilder().withName("event-1").withSequenceId(101L).build();
final Event event_2 = eventBuilder().withName("event-2").withSequenceId(102L).build();
final Event event_3 = eventBuilder().withName("event-3").withSequenceId(103L).build();
final Event event_4 = eventBuilder().withName("event-4").withSequenceId(104L).build();
final Event event_1 = eventBuilder().withName("event-1").withPositionInStream(101L).build();
final Event event_2 = eventBuilder().withName("event-2").withPositionInStream(102L).build();
final Event event_3 = eventBuilder().withName("event-3").withPositionInStream(103L).build();
final Event event_4 = eventBuilder().withName("event-4").withPositionInStream(104L).build();

testEventInserter.insertIntoEventLog(event_1);
testEventInserter.insertIntoEventLog(event_2);
Expand All @@ -73,10 +73,10 @@ public void shouldGetThePreviousSequenceNumberOfAnEvent() throws Exception {
final UUID streamId = randomUUID();
testEventStreamInserter.insertIntoEventStream(streamId, 1l, true, clock.now());

final Event event_1 = eventBuilder().withStreamId(streamId).withName("event-1").withEventNumber(1l).withSequenceId(101L).build();
final Event event_2 = eventBuilder().withStreamId(streamId).withName("event-2").withEventNumber(2l).withSequenceId(102L).build();
final Event event_3 = eventBuilder().withStreamId(streamId).withName("event-3").withEventNumber(3l).withSequenceId(103L).build();
final Event event_4 = eventBuilder().withStreamId(streamId).withName("event-4").withEventNumber(4l).withSequenceId(104L).build();
final Event event_1 = eventBuilder().withStreamId(streamId).withName("event-1").withEventNumber(1l).withPositionInStream(101L).build();
final Event event_2 = eventBuilder().withStreamId(streamId).withName("event-2").withEventNumber(2l).withPositionInStream(102L).build();
final Event event_3 = eventBuilder().withStreamId(streamId).withName("event-3").withEventNumber(3l).withPositionInStream(103L).build();
final Event event_4 = eventBuilder().withStreamId(streamId).withName("event-4").withEventNumber(4l).withPositionInStream(104L).build();

testEventInserter.insertIntoEventLog(event_1);
testEventInserter.insertIntoEventLog(event_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldInsertPublishedEvent() throws Exception {
if (resultSet.next()) {
assertThat(resultSet.getObject(1), is(publishedEvent.getId()));
assertThat(resultSet.getObject(2), is(publishedEvent.getStreamId()));
assertThat(resultSet.getObject(3), is(publishedEvent.getSequenceId()));
assertThat(resultSet.getObject(3), is(publishedEvent.getPositionInStream()));
assertThat(resultSet.getString(4), is(publishedEvent.getName()));
assertThat(resultSet.getString(5), is(publishedEvent.getPayload()));
assertThat(resultSet.getString(6), is(publishedEvent.getMetadata()));
Expand All @@ -82,7 +82,7 @@ public void shouldFetchPublishedEventById() throws Exception {

final PublishedEvent publishedEvent = publishedEventBuilder()
.withName("example.published-event")
.withSequenceId(1L)
.withPositionInStream(1L)
.withEventNumber(1L)
.withPreviousEventNumber(0L)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void shouldCreateAPublishedEventFromAnEvent() throws Exception {

final UUID eventId = randomUUID();
final UUID streamId = randomUUID();
final long sequenceId = 23487L;
final long positionInStream = 23487L;
final String name = "event-name";
final String payload = "payload";
final ZonedDateTime createdAt = new UtcClock().now();
Expand All @@ -51,7 +51,7 @@ public void shouldCreateAPublishedEventFromAnEvent() throws Exception {
final Event event = new Event(
eventId,
streamId,
sequenceId,
positionInStream,
name,
"some metadata",
payload,
Expand All @@ -67,7 +67,7 @@ public void shouldCreateAPublishedEventFromAnEvent() throws Exception {

assertThat(publishedEvent.getId(), is(eventId));
assertThat(publishedEvent.getStreamId(), is(streamId));
assertThat(publishedEvent.getSequenceId(), is(sequenceId));
assertThat(publishedEvent.getPositionInStream(), is(positionInStream));
assertThat(publishedEvent.getName(), is(name));
assertThat(publishedEvent.getPayload(), is(payload));
assertThat(publishedEvent.getMetadata(), is(updatedMetadata.asJsonObject().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void shouldUpdateThePublishQueueTableIfARowIsInsertedIntoTheEventLogTable

final UUID eventLogId = randomUUID();
final UUID streamId = randomUUID();
final int sequenceId = 98123674;

final ZonedDateTime now = utcClock.now();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void shouldRenumberEventsInEventLogTruncatePublishedEventsAndUpdatePublis
assertThat(publishedEvents.get(i).getPayload(), is(events.get(i).getPayload()));
assertThat(publishedEvents.get(i).getMetadata(), is(events.get(i).getMetadata()));
assertThat(publishedEvents.get(i).getCreatedAt(), is(events.get(i).getCreatedAt()));
assertThat(publishedEvents.get(i).getSequenceId(), is(events.get(i).getSequenceId()));
assertThat(publishedEvents.get(i).getPositionInStream(), is(events.get(i).getPositionInStream()));
}

assertThat(sequenceSetter.getCurrentSequenceValue("event_sequence_seq", eventStoreDataSource), is((long) numberOfEvents));
Expand Down
Loading

0 comments on commit bcf5d69

Please sign in to comment.