Skip to content

Commit

Permalink
Merge 1579d54 into 8323225
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Apr 12, 2019
2 parents 8323225 + 1579d54 commit ec922b0
Show file tree
Hide file tree
Showing 65 changed files with 461 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.source.core.snapshot.SnapshotService;
Expand Down Expand Up @@ -33,7 +33,7 @@ public class SnapshotAwareEventSourceFactory {
private SnapshotService snapshotService;

@Inject
private LinkedEventFinder linkedEventFinder;
private PublishedEventFinder publishedEventFinder;

public EventSource create(final String jndiDatasource, final String eventSourceName) {

Expand All @@ -43,7 +43,7 @@ public EventSource create(final String jndiDatasource, final String eventSourceN
final EventRepository eventRepository = eventRepositoryFactory.eventRepository(
eventJdbcRepository,
eventStreamJdbcRepository,
linkedEventFinder);
publishedEventFinder);

final EventStreamManager eventStreamManager = eventStreamManagerFactory.eventStreamManager(eventRepository, eventSourceName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.source.core.EventAppender;
import uk.gov.justice.services.eventsourcing.source.core.EventSource;
Expand Down Expand Up @@ -209,7 +209,7 @@ public class SnapshotAwareAggregateServiceIT {
DefaultEventSourceDefinitionFactory.class,

SubscriptionHelper.class,
LinkedEventFinder.class
PublishedEventFinder.class
})

public WebApp war() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.source.core.snapshot.SnapshotService;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class SnapshotAwareEventSourceFactoryTest {
private EventConverter eventConverter;

@Mock
private LinkedEventFinder linkedEventFinder;
private PublishedEventFinder publishedEventFinder;

@InjectMocks
private SnapshotAwareEventSourceFactory snapshotAwareEventSourceFactory;
Expand All @@ -69,7 +69,7 @@ public void shouldCreateSnapshotAwareEventSource() throws Exception {
when(eventRepositoryFactory.eventRepository(
eventJdbcRepository,
eventStreamJdbcRepository,
linkedEventFinder)).thenReturn(eventRepository);
publishedEventFinder)).thenReturn(eventRepository);

when(eventStreamManagerFactory.eventStreamManager(eventRepository, EVENT_SOURCE_NAME)).thenReturn(eventStreamManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventStreamMetadata;
import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcBasedEventRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
Expand Down Expand Up @@ -87,11 +87,11 @@ public void shouldFindEventsByEventNumber() throws Exception {

final long eventNumber = 92834L;

final LinkedEvent linkedEvent = mock(LinkedEvent.class);
final PublishedEvent publishedEvent = mock(PublishedEvent.class);
final JsonEnvelope jsonEnvelope = mock(JsonEnvelope.class);

when(eventRepository.findEventsSince(eventNumber)).thenReturn(Stream.of(linkedEvent));
when(eventConverter.envelopeOf(linkedEvent)).thenReturn(jsonEnvelope);
when(eventRepository.findEventsSince(eventNumber)).thenReturn(Stream.of(publishedEvent));
when(eventConverter.envelopeOf(publishedEvent)).thenReturn(jsonEnvelope);

final List<JsonEnvelope> envelopes = snapshotAwareEventSource.findEventsSince(eventNumber).collect(toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;

import java.sql.Connection;
Expand Down Expand Up @@ -38,17 +38,17 @@ public Optional<Event> getEvent(final UUID id) {
}

/**
* Method that gets an LinkedEvent from the linked_event table by id.
* Method that gets an PublishedEvent from the published_event table by id.
*
* @return Optional<Event>
*/
public Optional<LinkedEvent> getLinkedEvent(final UUID id) {
public Optional<PublishedEvent> getPublishedEvent(final UUID id) {

final DataSource eventStoreDataSource = subscriptionDataSourceProvider.getEventStoreDataSource();
try (final Connection connection = eventStoreDataSource.getConnection()) {
return eventFetcherRepository.getLinkedEvent(id, connection);
return eventFetcherRepository.getPublishedEvent(id, connection);
} catch (final SQLException e) {
throw new EventFetchingException(format("Failed to get LinkedEvent with id '%s'", id), e);
throw new EventFetchingException(format("Failed to get PublishedEvent with id '%s'", id), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -25,7 +25,7 @@ public class EventFetcherRepository {

private static final String SELECT_FROM_LINKED_EVENT_QUERY =
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " +
"FROM linked_event " +
"FROM published_event " +
"WHERE id = ?";

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public Optional<Event> getEvent(final UUID id, final Connection connection) thro
return empty();
}

public Optional<LinkedEvent> getLinkedEvent(final UUID id, final Connection connection) throws SQLException {
public Optional<PublishedEvent> getPublishedEvent(final UUID id, final Connection connection) throws SQLException {

try (final PreparedStatement preparedStatement = connection.prepareStatement(SELECT_FROM_LINKED_EVENT_QUERY)) {

Expand All @@ -83,7 +83,7 @@ public Optional<LinkedEvent> getLinkedEvent(final UUID id, final Connection conn
final long eventNumber = resultSet.getLong("event_number");
final long previousEventNumber = resultSet.getLong("previous_event_number");

return of(new LinkedEvent(
return of(new PublishedEvent(
id,
streamId,
positionInStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import uk.gov.justice.services.eventsourcing.PublishQueueException;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventJdbcRepository;
import uk.gov.justice.services.messaging.Metadata;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;

Expand All @@ -31,7 +31,7 @@ public class EventPrePublisher {
private PrePublishRepository prePublishRepository;

@Inject
private LinkedEventJdbcRepository linkedEventJdbcRepository;
private PublishedEventJdbcRepository publishedEventJdbcRepository;

@Inject
private UtcClock clock;
Expand All @@ -40,7 +40,7 @@ public class EventPrePublisher {
private EventConverter eventConverter;

@Inject
private LinkedEventFactory linkedEventFactory;
private PublishedEventFactory publishedEventFactory;

@Transactional(MANDATORY)
public void prePublish(final Event event) {
Expand All @@ -56,13 +56,13 @@ public void prePublish(final Event event) {
previousEventNumber,
eventNumber);

final LinkedEvent linkedEvent = linkedEventFactory.create(
final PublishedEvent publishedEvent = publishedEventFactory.create(
event,
updatedMetadata,
eventNumber,
previousEventNumber);

linkedEventJdbcRepository.insertLinkedEvent(linkedEvent, connection);
publishedEventJdbcRepository.insertPublishedEvent(publishedEvent, connection);
prePublishRepository.addToPublishQueueTable(eventId, clock.now(), connection);

} catch (final SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ public class PrePublishRepository {
private static final String SELECT_EVENT_NUMBER_SQL = "SELECT event_number FROM event_log WHERE id = ?";
private static final String SELECT_PREVIOUS_EVENT_NUMBER_SQL = "SELECT event_number FROM event_log, event_stream WHERE event_number < ? and event_log.stream_id = event_stream.stream_id and event_stream.active = true ORDER BY event_number DESC LIMIT 1";
private static final String INSERT_INTO_PUBLISH_QUEUE_SQL = "INSERT INTO publish_queue (event_log_id, date_queued) VALUES (?, ?)";
private static final String INSERT_INTO_LINKED_EVENT_SQL = "INSERT into linked_event (" +
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " +
"VALUES " +
"(?, ?, ?, ?, ?, ?, ?, ?, ?)";


public long getEventNumber(final UUID eventId, final Connection connection) throws SQLException {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package uk.gov.justice.services.eventsourcing.prepublish;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.messaging.Metadata;

public class LinkedEventFactory {
public class PublishedEventFactory {

public LinkedEvent create(
public PublishedEvent create(
final Event event,
final Metadata updatedMetadata,
final long eventNumber,
final Long previousEventNumber) {

return new LinkedEvent(
return new PublishedEvent(
event.getId(),
event.getStreamId(),
event.getSequenceId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uk.gov.justice.services.eventsourcing.EventFetchingException;
import uk.gov.justice.services.eventsourcing.publisher.jms.EventPublisher;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.Optional;
Expand All @@ -22,7 +22,7 @@
* The EventDeQueuerAndPublisher class provides a method that returns an event from the EventDeQueuer
* and publishes the event.
*/
public class LinkedEventDeQueuerAndPublisher {
public class PublishedEventDeQueuerAndPublisher {

@Inject
EventDeQueuer eventDeQueuer;
Expand All @@ -48,15 +48,15 @@ public boolean deQueueAndPublish() {

final Optional<UUID> eventId = eventDeQueuer.popNextEventId(PUBLISH_TABLE_NAME);
if (eventId.isPresent()) {
final Optional<LinkedEvent> linkedEvent = eventFetcher.getLinkedEvent(eventId.get());
final Optional<PublishedEvent> publishedEvent = eventFetcher.getPublishedEvent(eventId.get());

if(linkedEvent.isPresent()) {
final JsonEnvelope jsonEnvelope = eventConverter.envelopeOf(linkedEvent.get());
if(publishedEvent.isPresent()) {
final JsonEnvelope jsonEnvelope = eventConverter.envelopeOf(publishedEvent.get());
eventPublisher.publish(jsonEnvelope);

return true;
} else {
throw new EventFetchingException(format("Failed to find LinkedEvent with id '%s'", eventId.get()));
throw new EventFetchingException(format("Failed to find PublishedEvent with id '%s'", eventId.get()));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uk.gov.justice.services.eventsourcing.publishing.helpers.EventFactory;
import uk.gov.justice.services.eventsourcing.publishing.helpers.TestEventInserter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.test.utils.core.eventsource.EventStoreInitializer;
import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory;

Expand Down Expand Up @@ -63,30 +63,30 @@ public void shouldReturnEmptyIfNoEventFound() throws Exception {
}

@Test
public void shouldFetchLinkedEventById() throws Exception {
public void shouldFetchPublishedEventById() throws Exception {

final LinkedEvent linkedEvent = eventFactory.createLinkedEvent(randomUUID(),"example.linked-event", 1L, 1L, 0L);
final PublishedEvent publishedEvent = eventFactory.createPublishedEvent(randomUUID(),"example.published-event", 1L, 1L, 0L);

testEventInserter.insertIntoLinkedEvent(linkedEvent);
testEventInserter.insertIntoPublishedEvent(publishedEvent);

try (final Connection connection = dataSource.getConnection()) {
final Optional<LinkedEvent> linkedEventOptional = eventFetcherRepository.getLinkedEvent(linkedEvent.getId(), connection);
final Optional<PublishedEvent> publishedEventOptional = eventFetcherRepository.getPublishedEvent(publishedEvent.getId(), connection);

if (linkedEventOptional.isPresent()) {
assertThat(linkedEventOptional.get(), is(linkedEvent));
if (publishedEventOptional.isPresent()) {
assertThat(publishedEventOptional.get(), is(publishedEvent));
} else {
fail();
}
}
}

@Test
public void shouldReturnEmptyIfNoLinkedEventFound() throws Exception {
public void shouldReturnEmptyIfNoPublishedEventFound() throws Exception {

final UUID unknownId = randomUUID();

try (final Connection connection = dataSource.getConnection()) {
assertThat(eventFetcherRepository.getLinkedEvent(unknownId, connection).isPresent(), is(false));
assertThat(eventFetcherRepository.getPublishedEvent(unknownId, connection).isPresent(), is(false));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static org.mockito.Mockito.when;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.LinkedEvent;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.subscription.registry.SubscriptionDataSourceProvider;

import java.sql.Connection;
Expand Down Expand Up @@ -79,24 +79,24 @@ public void shouldThowExceptionIfGettingAnEventFails() throws Exception {
}

@Test
public void shouldRetrieveLinkedEventFromTheRepository() throws Exception {
public void shouldRetrievePublishedEventFromTheRepository() throws Exception {

final UUID id = randomUUID();

final DataSource eventStoreDataSource = mock(DataSource.class);
final Connection connection = mock(Connection.class);
final Optional<LinkedEvent> linkedEvent = of(mock(LinkedEvent.class));
final Optional<PublishedEvent> publishedEvent = of(mock(PublishedEvent.class));

when(subscriptionDataSourceProvider.getEventStoreDataSource()).thenReturn(eventStoreDataSource);
when(eventStoreDataSource.getConnection()).thenReturn(connection);

when(eventFetcherRepository.getLinkedEvent(id, connection)).thenReturn(linkedEvent);
when(eventFetcherRepository.getPublishedEvent(id, connection)).thenReturn(publishedEvent);

assertThat(eventFetcher.getLinkedEvent(id), is(linkedEvent));
assertThat(eventFetcher.getPublishedEvent(id), is(publishedEvent));
}

@Test
public void shouldThowExceptionIfGettingLInkedEventFails() throws Exception {
public void shouldThowExceptionIfGettingPublishedEventFails() throws Exception {

final SQLException sqlException = new SQLException("Ooops");

Expand All @@ -108,14 +108,14 @@ public void shouldThowExceptionIfGettingLInkedEventFails() throws Exception {
when(subscriptionDataSourceProvider.getEventStoreDataSource()).thenReturn(eventStoreDataSource);
when(eventStoreDataSource.getConnection()).thenReturn(connection);

when(eventFetcherRepository.getLinkedEvent(id, connection)).thenThrow(sqlException);
when(eventFetcherRepository.getPublishedEvent(id, connection)).thenThrow(sqlException);

try {
eventFetcher.getLinkedEvent(id);
eventFetcher.getPublishedEvent(id);
fail();
} catch (final EventFetchingException expected) {
assertThat(expected.getCause(), is(sqlException));
assertThat(expected.getMessage(), is("Failed to get LinkedEvent with id '" + id + "'"));
assertThat(expected.getMessage(), is("Failed to get PublishedEvent with id '" + id + "'"));
}
}
}
Loading

0 comments on commit ec922b0

Please sign in to comment.