Skip to content

Commit

Permalink
Merge 3437850 into 1e98600
Browse files Browse the repository at this point in the history
  • Loading branch information
Allan Mckenzie committed Feb 8, 2019
2 parents 1e98600 + 3437850 commit 567010c
Show file tree
Hide file tree
Showing 29 changed files with 974 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static java.util.Optional.of;
import static java.util.UUID.fromString;
import static javax.transaction.Transactional.TxType.MANDATORY;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;
Expand All @@ -14,7 +13,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -30,13 +28,12 @@ public class EventDeQueuer {
public static final String PUBLISH_TABLE_NAME = "publish_queue";

private static final String SELECT_FROM_PUBLISH_TABLE_QUERY_PATTERN = "SELECT id, event_log_id FROM %s ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED ";
private static final String SELECT_FROM_EVENT_LOG_QUERY = "SELECT stream_id, position_in_stream, name, payload, metadata, date_created " +
"FROM event_log WHERE id = ?";
private static final String DELETE_FROM_PUBLISH_TABLE_QUERY_PATTERN = "DELETE FROM %s where id = ?";

@Inject
SubscriptionDataSourceProvider subscriptionDataSourceProvider;


/**
* Method that gets the next event to process by
* querying the pre_publish_queue table for id & event_log_id,
Expand All @@ -46,7 +43,7 @@ public class EventDeQueuer {
* @return Optional<Event>
*/
@Transactional(MANDATORY)
public Optional<Event> popNextEvent(final String tableName) {
public Optional<UUID> popNextEventId(final String tableName) {

final String sql = format(SELECT_FROM_PUBLISH_TABLE_QUERY_PATTERN, tableName);
try (final Connection connection = subscriptionDataSourceProvider.getEventStoreDataSource().getConnection();
Expand All @@ -59,7 +56,7 @@ public Optional<Event> popNextEvent(final String tableName) {

deletePublishQueueRow(publishQueueId, tableName, connection);

return getEventFromEventLogTable(eventLogId, connection);
return of(eventLogId);
}
} catch (final SQLException e) {
throw new PublishQueueException(format("Failed to publish from %s table", tableName), e);
Expand All @@ -68,42 +65,7 @@ public Optional<Event> popNextEvent(final String tableName) {
return empty();
}

/**
* Method that gets the next event from the event_log table using the event_log_id.
*
* @return Optional<Event>
*/
private Optional<Event> getEventFromEventLogTable(final UUID eventLogId, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_EVENT_LOG_QUERY)) {

preparedStatement.setObject(1, eventLogId);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {

if (resultSet.next()) {
final UUID streamId = fromString(resultSet.getString("stream_id"));
final Long positionInStream = resultSet.getLong("position_in_stream");
final String name = resultSet.getString("name");
final String metadata = resultSet.getString("metadata");
final String payload = resultSet.getString("payload");
final ZonedDateTime createdAt = fromSqlTimestamp(resultSet.getTimestamp("date_created"));

return of(new Event(
eventLogId,
streamId,
positionInStream,
name,
metadata,
payload,
createdAt)
);
}
}
}

return empty();
}

/**
* Method that deletes the next event from the pre_publish_queue table using the event_log_id.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package uk.gov.justice.services.eventsourcing;

import static java.lang.String.format;
import static java.util.Optional.empty;
import static java.util.Optional.of;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;

import javax.inject.Inject;
import javax.sql.DataSource;

public class EventFetcher {

@Inject
SubscriptionDataSourceProvider subscriptionDataSourceProvider;

@Inject
EventFetcherRepository eventFetcherRepository;

/**
* Method that gets an event from the event_log table by id.
*
* @return Optional<Event>
*/
public Optional<Event> getEvent(final UUID id) {

final DataSource eventStoreDataSource = subscriptionDataSourceProvider.getEventStoreDataSource();
try (final Connection connection = eventStoreDataSource.getConnection()) {
return eventFetcherRepository.getEvent(id, connection);
} catch (final SQLException e) {
throw new EventFetchingException(format("Failed to get Event with id '%s'", id), e);
}
}

/**
* Method that gets an LinkedEvent from the linked_event table by id.
*
* @return Optional<Event>
*/
public Optional<LinkedEvent> getLinkedEvent(final UUID id) {

final DataSource eventStoreDataSource = subscriptionDataSourceProvider.getEventStoreDataSource();
try (final Connection connection = eventStoreDataSource.getConnection()) {
return eventFetcherRepository.getLinkedEvent(id, connection);
} catch (final SQLException e) {
throw new EventFetchingException(format("Failed to get LinkedEvent with id '%s'", id), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package uk.gov.justice.services.eventsourcing;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.UUID.fromString;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

public class EventFetcherRepository {

private static final String SELECT_FROM_EVENT_LOG_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created " +
"FROM event_log " +
"WHERE id = ?";

private static final String SELECT_FROM_LINKED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM linked_event " +
"WHERE id = ?";

/**
* Method that gets an event from the event_log table by id.
*
* @return Optional<Event>
*/
public Optional<Event> getEvent(final UUID id, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_EVENT_LOG_QUERY)) {

preparedStatement.setObject(1, id);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {

if (resultSet.next()) {
final UUID streamId = fromString(resultSet.getString("stream_id"));
final Long positionInStream = resultSet.getLong("position_in_stream");
final String name = resultSet.getString("name");
final String metadata = resultSet.getString("metadata");
final String payload = resultSet.getString("payload");
final ZonedDateTime createdAt = fromSqlTimestamp(resultSet.getTimestamp("date_created"));

return of(new Event(
id,
streamId,
positionInStream,
name,
metadata,
payload,
createdAt)
);
}
}
}

return empty();
}

public Optional<LinkedEvent> getLinkedEvent(final UUID id, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_LINKED_EVENT_QUERY)) {

preparedStatement.setObject(1, id);

try (final ResultSet resultSet = preparedStatement.executeQuery()) {

if (resultSet.next()) {
final UUID streamId = fromString(resultSet.getString("stream_id"));
final Long positionInStream = resultSet.getLong("position_in_stream");
final String name = resultSet.getString("name");
final String metadata = resultSet.getString("metadata");
final String payload = resultSet.getString("payload");
final ZonedDateTime createdAt = fromSqlTimestamp(resultSet.getTimestamp("date_created"));
final long eventNumber = resultSet.getLong("event_number");
final long previousEventNumber = resultSet.getLong("previous_event_number");

return of(new LinkedEvent(
id,
streamId,
positionInStream,
name,
metadata,
payload,
createdAt,
eventNumber,
previousEventNumber)
);
}
}
}

return empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package uk.gov.justice.services.eventsourcing;

public class EventFetchingException extends RuntimeException {

public EventFetchingException(final String message) {
super(message);
}

public EventFetchingException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uk.gov.justice.services.eventsourcing.PublishQueueException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.messaging.Metadata;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;

Expand All @@ -23,7 +24,7 @@ public class EventPrePublisher {
SubscriptionDataSourceProvider subscriptionDataSourceProvider;

@Inject
MetadataSequenceNumberUpdater metadataSequenceNumberUpdater;
MetadataEventNumberUpdater metadataEventNumberUpdater;

@Inject
PrePublishRepository prePublishRepository;
Expand All @@ -34,21 +35,30 @@ public class EventPrePublisher {
@Inject
EventConverter eventConverter;

@Inject
LinkedEventFactory linkedEventFactory;

@Transactional(MANDATORY)
public void prePublish(final Event event) {

final UUID eventId = event.getId();

try (final Connection connection = subscriptionDataSourceProvider.getEventStoreDataSource().getConnection()) {
final long sequenceNumber = prePublishRepository.getSequenceNumber(eventId, connection);
final long previousSequenceNumber = prePublishRepository.getPreviousSequenceNumber(sequenceNumber, connection);
final long eventNumber = prePublishRepository.getEventNumber(eventId, connection);
final long previousEventNumber = prePublishRepository.getPreviousEventNumber(eventNumber, connection);

final Metadata updatedMetadata = metadataSequenceNumberUpdater.updateMetadataJson(
final Metadata updatedMetadata = metadataEventNumberUpdater.updateMetadataJson(
eventConverter.metadataOf(event),
previousSequenceNumber,
sequenceNumber);
previousEventNumber,
eventNumber);

final LinkedEvent linkedEvent = linkedEventFactory.create(
event,
updatedMetadata,
eventNumber,
previousEventNumber);

prePublishRepository.updateMetadata(eventId, updatedMetadata.asJsonObject().toString(), connection);
prePublishRepository.insertLinkedEvent(linkedEvent, connection);
prePublishRepository.addToPublishQueueTable(eventId, clock.now(), connection);

} catch (final SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package uk.gov.justice.services.eventsourcing.prepublish;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.messaging.Metadata;

public class LinkedEventFactory {

public LinkedEvent create(
final Event event,
final Metadata updatedMetadata,
final long eventNumber,
final Long previousEventNumber) {

return new LinkedEvent(
event.getId(),
event.getStreamId(),
event.getSequenceId(),
event.getName(),
updatedMetadata.asJsonObject().toString(),
event.getPayload(),
event.getCreatedAt(),
eventNumber,
previousEventNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import uk.gov.justice.services.messaging.Metadata;

public class MetadataSequenceNumberUpdater {
public class MetadataEventNumberUpdater {

public Metadata updateMetadataJson(final Metadata metadata, final long previousSequenceNumber, final long sequenceNumber) {
return metadataFrom(metadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.gov.justice.services.eventsourcing.prepublish;

public class MissingEventNumberException extends RuntimeException {

public MissingEventNumberException(final String message) {
super(message);
}
}
Loading

0 comments on commit 567010c

Please sign in to comment.