Skip to content

Commit

Permalink
Merge 5a71ec0 into 7caf605
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Nov 6, 2019
2 parents 7caf605 + 5a71ec0 commit 762817f
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 192 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Added event_id to the processed_event table to aid debugging of publishing

## [2.2.7] - 2019-12-04
### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet id="subscription-repository-005" author="TechPod"
logicalFilePath="005-add-event-id-to-processed_event-table.changelog.xml">

<addColumn tableName="processed_event">
<column name="event_id" type="uuid"/>
</addColumn>

<rollback>
<dropColumn columnName="event_id" tableName="processed_event"/>
</rollback>

</changeSet>

</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
package uk.gov.justice.services.subscription;

import java.util.Objects;
import java.util.UUID;

public class ProcessedEventTrackItem {
public class ProcessedEvent {

private final UUID eventId;
private final long previousEventNumber;
private final long eventNumber;
private final String source;
private final String componentName;

public ProcessedEventTrackItem(
public ProcessedEvent(
final UUID eventId,
final long previousEventNumber,
final long eventNumber,
final String source,
final String componentName) {
this.eventId = eventId;
this.previousEventNumber = previousEventNumber;
this.eventNumber = eventNumber;
this.source = source;
this.componentName = componentName;
}

public UUID getEventId() {
return eventId;
}

public long getPreviousEventNumber() {
return previousEventNumber;
}
Expand All @@ -39,23 +47,25 @@ public String getComponentName() {
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof ProcessedEventTrackItem)) return false;
final ProcessedEventTrackItem that = (ProcessedEventTrackItem) o;
if (!(o instanceof ProcessedEvent)) return false;
final ProcessedEvent that = (ProcessedEvent) o;
return previousEventNumber == that.previousEventNumber &&
eventNumber == that.eventNumber &&
Objects.equals(eventId, that.eventId) &&
Objects.equals(source, that.source) &&
Objects.equals(componentName, that.componentName);
}

@Override
public int hashCode() {
return Objects.hash(previousEventNumber, eventNumber, source, componentName);
return Objects.hash(eventId, previousEventNumber, eventNumber, source, componentName);
}

@Override
public String toString() {
return "ProcessedEventTrackItem{" +
"previousEventNumber=" + previousEventNumber +
return "ProcessedEvent{" +
"eventId=" + eventId +
", previousEventNumber=" + previousEventNumber +
", eventNumber=" + eventNumber +
", source='" + source + '\'' +
", componentName='" + componentName + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;

import javax.inject.Inject;

public class ProcessedEventTrackingRepository {

private static final String INSERT_SQL =
"INSERT INTO processed_event (event_number, previous_event_number, source, component) " +
"VALUES (?, ?, ?, ?)";
"INSERT INTO processed_event (event_id, event_number, previous_event_number, source, component) " +
"VALUES (?, ?, ?, ?, ?)";

private static final String SELECT_MAX_SQL =
"SELECT event_number, previous_event_number, source, component " +
"SELECT event_id, event_number, previous_event_number, source, component " +
"FROM processed_event " +
"WHERE source = ? " +
"AND component = ? " +
"ORDER BY event_number DESC LIMIT 1";

private static final String SELECT_ALL_DESCENDING_ORDER_SQL =
"SELECT event_number, previous_event_number " +
"SELECT event_id, event_number, previous_event_number " +
"FROM processed_event " +
"WHERE source = ? " +
"AND component = ? " +
Expand All @@ -46,16 +47,17 @@ public class ProcessedEventTrackingRepository {
@Inject
private ViewStoreJdbcDataSourceProvider viewStoreJdbcDataSourceProvider;

public void save(final ProcessedEventTrackItem processedEventTrackItem) {
public void save(final ProcessedEvent processedEvent) {

try (
final Connection connection = viewStoreJdbcDataSourceProvider.getDataSource().getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {

preparedStatement.setLong(1, processedEventTrackItem.getEventNumber());
preparedStatement.setLong(2, processedEventTrackItem.getPreviousEventNumber());
preparedStatement.setString(3, processedEventTrackItem.getSource());
preparedStatement.setString(4, processedEventTrackItem.getComponentName());
preparedStatement.setObject(1, processedEvent.getEventId());
preparedStatement.setLong(2, processedEvent.getEventNumber());
preparedStatement.setLong(3, processedEvent.getPreviousEventNumber());
preparedStatement.setString(4, processedEvent.getSource());
preparedStatement.setString(5, processedEvent.getComponentName());

preparedStatement.executeUpdate();

Expand All @@ -64,7 +66,7 @@ public void save(final ProcessedEventTrackItem processedEventTrackItem) {
}
}

public Stream<ProcessedEventTrackItem> getAllProcessedEventsDescendingOrder(final String source, final String componentName) {
public Stream<ProcessedEvent> getAllProcessedEventsDescendingOrder(final String source, final String componentName) {

try {
final PreparedStatementWrapper preparedStatement = preparedStatementWrapperFactory.preparedStatementWrapperOf(
Expand All @@ -76,9 +78,10 @@ public Stream<ProcessedEventTrackItem> getAllProcessedEventsDescendingOrder(fina
return jdbcResultSetStreamer.streamOf(preparedStatement, resultSet -> {

try {
final UUID eventId = (UUID) resultSet.getObject("event_id");
final long eventNumber = resultSet.getLong("event_number");
final long previousEventNumber = resultSet.getLong("previous_event_number");
return new ProcessedEventTrackItem(previousEventNumber, eventNumber, source, componentName);
return new ProcessedEvent(eventId, previousEventNumber, eventNumber, source, componentName);
} catch (final SQLException e) {
throw new ProcessedEventTrackingException("Failed to get row from processed_event table", e);
}
Expand All @@ -89,7 +92,7 @@ public Stream<ProcessedEventTrackItem> getAllProcessedEventsDescendingOrder(fina
}
}

public Optional<ProcessedEventTrackItem> getLatestProcessedEvent(final String source, final String componentName) {
public Optional<ProcessedEvent> getLatestProcessedEvent(final String source, final String componentName) {

try (
final Connection connection = viewStoreJdbcDataSourceProvider.getDataSource().getConnection();
Expand All @@ -100,10 +103,11 @@ public Optional<ProcessedEventTrackItem> getLatestProcessedEvent(final String so

try (final ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
final UUID eventId = (UUID) resultSet.getObject("event_id");
final long eventNumber = resultSet.getLong("event_number");
final long previousEventNumber = resultSet.getLong("previous_event_number");

return of(new ProcessedEventTrackItem(previousEventNumber, eventNumber, source, componentName));
return of(new ProcessedEvent(eventId, previousEventNumber, eventNumber, source, componentName));
}

return empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,39 @@ public class ProcessedEventTrackingService {
public void trackProcessedEvent(final JsonEnvelope event, final String componentName) {

final Metadata metadata = event.metadata();
final UUID id = metadata.id();
final UUID eventId = metadata.id();
final Long previousEventNumber = metadata
.previousEventNumber()
.orElseThrow(() -> new ProcessedEventTrackingException(format("Missing previous event number for event with id '%s'", id)));
.orElseThrow(() -> new ProcessedEventTrackingException(format("Missing previous event number for event with id '%s'", eventId)));
final Long eventNumber = metadata
.eventNumber()
.orElseThrow(() -> new ProcessedEventTrackingException(format("Missing event number for event with id '%s'", id)));
.orElseThrow(() -> new ProcessedEventTrackingException(format("Missing event number for event with id '%s'", eventId)));

final String source = eventSourceNameCalculator.getSource(event);

final ProcessedEventTrackItem processedEventTrackItem = new ProcessedEventTrackItem(
previousEventNumber,
final ProcessedEvent processedEvent = new ProcessedEvent(
eventId, previousEventNumber,
eventNumber,
source,
componentName
);

processedEventTrackingRepository.save(processedEventTrackItem);
processedEventTrackingRepository.save(processedEvent);
}

public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceName, final String componentName) {

final EventNumberAccumulator eventNumberAccumulator = new EventNumberAccumulator();

final Optional<ProcessedEventTrackItem> latestProcessedEvent = processedEventTrackingRepository.getLatestProcessedEvent(eventSourceName, componentName);
final Optional<ProcessedEvent> latestProcessedEvent = processedEventTrackingRepository.getLatestProcessedEvent(eventSourceName, componentName);

if (latestProcessedEvent.isPresent()) {
notSeenEventsRange(latestProcessedEvent.get().getPreviousEventNumber(), eventNumberAccumulator);
} else {
notSeenEventsRange(1L, eventNumberAccumulator);
}

try (final Stream<ProcessedEventTrackItem> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(eventSourceName, componentName)) {
try (final Stream<ProcessedEvent> allProcessedEvents = processedEventTrackingRepository.getAllProcessedEventsDescendingOrder(eventSourceName, componentName)) {
allProcessedEvents
.forEach(processedEventTrackItem -> findMissingRange(processedEventTrackItem, eventNumberAccumulator));
}
Expand All @@ -83,7 +83,7 @@ public Stream<MissingEventRange> getAllMissingEvents(final String eventSourceNam
public Long getLatestProcessedEventNumber(final String source, final String componentName) {

return processedEventTrackingRepository.getLatestProcessedEvent(source, componentName)
.map(ProcessedEventTrackItem::getEventNumber)
.map(ProcessedEvent::getEventNumber)
.orElse(FIRST_POSSIBLE_EVENT_NUMBER);
}

Expand All @@ -98,10 +98,10 @@ private void notSeenEventsRange(final long currentPreviousEventNumber, final Eve
eventNumberAccumulator.set(currentPreviousEventNumber, currentEventNumber);
}

private void findMissingRange(final ProcessedEventTrackItem processedEventTrackItem, final EventNumberAccumulator eventNumberAccumulator) {
private void findMissingRange(final ProcessedEvent processedEvent, final EventNumberAccumulator eventNumberAccumulator) {

final long currentEventNumber = processedEventTrackItem.getEventNumber();
final long currentPreviousEventNumber = processedEventTrackItem.getPreviousEventNumber();
final long currentEventNumber = processedEvent.getEventNumber();
final long currentPreviousEventNumber = processedEvent.getPreviousEventNumber();

if (eventNumberAccumulator.isInitialised() && eventNumberAccumulator.getLastPreviousEventNumber() != currentEventNumber) {
eventNumberAccumulator.addRangeFrom(currentEventNumber);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package uk.gov.justice.services.subscription;

import java.util.UUID;

public class ProcessedEventBuilder {

private UUID eventId;
private long previousEventNumber;
private long eventNumber;
private String source;
private String componentName;

private ProcessedEventBuilder() {}

public static ProcessedEventBuilder processedEventTrackItem() {
return new ProcessedEventBuilder();
}

public ProcessedEventBuilder withEventId(final UUID eventId) {
this.eventId = eventId;
return this;
}

public ProcessedEventBuilder withPreviousEventNumber(final long previousEventNumber) {
this.previousEventNumber = previousEventNumber;
return this;
}

public ProcessedEventBuilder withEventNumber(final long eventNumber) {
this.eventNumber = eventNumber;
return this;
}

public ProcessedEventBuilder withSource(final String source) {
this.source = source;
return this;
}

public ProcessedEventBuilder withComponentName(final String componentName) {
this.componentName = componentName;
return this;
}

public ProcessedEvent build() {
return new ProcessedEvent(eventId, previousEventNumber, eventNumber, source, componentName);
}
}

This file was deleted.

Loading

0 comments on commit 762817f

Please sign in to comment.