Skip to content

Commit

Permalink
Merge f3fc831 into da320a6
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Apr 24, 2019
2 parents da320a6 + f3fc831 commit c0609ff
Show file tree
Hide file tree
Showing 92 changed files with 1,233 additions and 2,357 deletions.
4 changes: 4 additions & 0 deletions aggregate-snapshot/aggregate-snapshot-repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<artifactId>event-subscription-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.framework-api</groupId>
<artifactId>framework-api-event-source</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

import uk.gov.justice.domain.aggregate.Aggregate;
import uk.gov.justice.domain.snapshot.AggregateSnapshot;
import uk.gov.justice.services.jdbc.persistence.JdbcDataSourceProvider;
import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider;
import uk.gov.justice.services.jdbc.persistence.JdbcRepositoryException;
import uk.gov.justice.subscription.registry.EventSourceDefinitionRegistry;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -15,10 +14,8 @@
import java.util.Optional;
import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.sql.DataSource;

import org.slf4j.Logger;

Expand All @@ -40,20 +37,15 @@ public class SnapshotJdbcRepository implements SnapshotRepository {
private static final String SQL_CURRENT_SNAPSHOT_VERSION_ID = "SELECT version_id FROM snapshot WHERE stream_id=? AND type=? ORDER BY version_id DESC";

@Inject
Logger logger;
private EventStoreDataSourceProvider eventStoreDataSourceProvider;

@Inject
JdbcDataSourceProvider jdbcDataSourceProvider;

@Inject
EventSourceDefinitionRegistry eventSourceDefinitionRegistry;

DataSource dataSource;
private Logger logger;

@Override
public void storeSnapshot(final AggregateSnapshot aggregateSnapshot) {

try (final Connection connection = dataSource.getConnection();
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement ps = connection.prepareStatement(SQL_INSERT_EVENT_LOG)) {
ps.setObject(1, aggregateSnapshot.getStreamId());
ps.setLong(2, aggregateSnapshot.getVersionId());
Expand All @@ -68,7 +60,7 @@ public void storeSnapshot(final AggregateSnapshot aggregateSnapshot) {
@Override
public <T extends Aggregate> Optional<AggregateSnapshot<T>> getLatestSnapshot(final UUID streamId, final Class<T> clazz) {

try (final Connection connection = dataSource.getConnection();
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SQL_FIND_LATEST_BY_STREAM_ID)) {

preparedStatement.setObject(1, streamId);
Expand All @@ -84,7 +76,7 @@ public <T extends Aggregate> Optional<AggregateSnapshot<T>> getLatestSnapshot(fi

@Override
public <T extends Aggregate> void removeAllSnapshots(final UUID streamId, final Class<T> clazz) {
try (final Connection connection = dataSource.getConnection();
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement ps = connection.prepareStatement(DELETE_ALL_SNAPSHOTS_FOR_STREAM_ID_AND_CLASS)) {
ps.setObject(1, streamId);
ps.setString(2, clazz.getName());
Expand All @@ -97,7 +89,7 @@ public <T extends Aggregate> void removeAllSnapshots(final UUID streamId, final
@Override
public <T extends Aggregate> long getLatestSnapshotVersion(final UUID streamId, final Class<T> clazz) {

try (final Connection connection = dataSource.getConnection();
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(SQL_CURRENT_SNAPSHOT_VERSION_ID)) {
preparedStatement.setObject(1, streamId);
preparedStatement.setObject(2, clazz.getName());
Expand All @@ -121,12 +113,6 @@ public AggregateSnapshot entityFrom(final ResultSet resultSet) throws SQLExcepti
resultSet.getBytes(COL_AGGREGATE));
}

@PostConstruct
private void initialiseDataSource() {
final String jndiName = eventSourceDefinitionRegistry.getDefaultEventSourceDefinition().getLocation().getDataSource().get();
dataSource = jdbcDataSourceProvider.getDataSource(jndiName);
}

@SuppressWarnings("unchecked")
private <T extends Aggregate> Optional<AggregateSnapshot<T>> extractResults(final PreparedStatement preparedStatement) throws SQLException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import uk.gov.justice.domain.aggregate.Aggregate;
import uk.gov.justice.domain.snapshot.AggregateSnapshot;
import uk.gov.justice.services.test.utils.persistence.TestEventStoreDataSourceFactory;
import uk.gov.justice.services.test.utils.persistence.FrameworkTestDataSourceFactory;
import uk.gov.justice.services.test.utils.persistence.SettableEventStoreDataSourceProvider;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;


@RunWith(MockitoJUnitRunner.class)
public class SnapshotRepositoryJdbcIT {

Expand All @@ -33,24 +32,22 @@ public class SnapshotRepositoryJdbcIT {
private static final Class<DifferentAggregate> OTHER_TYPE = DifferentAggregate.class;
private static final byte[] AGGREGATE = "Any String you want".getBytes();

private final SnapshotJdbcRepository snapshotJdbcRepository = new SnapshotJdbcRepository();
@SuppressWarnings("unused")
@Spy
private SettableEventStoreDataSourceProvider eventStoreDataSourceProvider = new SettableEventStoreDataSourceProvider();

@SuppressWarnings("unused")
@Mock
private Logger logger;

@InjectMocks
private SnapshotJdbcRepository snapshotJdbcRepository;

@Before
public void initialize() {
try {
snapshotJdbcRepository.dataSource = new TestEventStoreDataSourceFactory()
.createDataSource("frameworkeventstore");

snapshotJdbcRepository.logger = mock(Logger.class);
} catch (final Exception e) {
e.printStackTrace();
fail("SnapshotJdbcRepository construction failed");
}
public void setupDatabaseConnection() throws Exception {
eventStoreDataSourceProvider.setDataSource(new FrameworkTestDataSourceFactory().createEventStoreDataSource());
}

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldStoreAndRetrieveSnapshot() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ public SnapshotAwareEventSource(final EventStreamManager eventStreamManager,

@Override
public EventStream getStreamById(final UUID streamId) {
return new SnapshotAwareEnvelopeEventStream(streamId, eventStreamManager, snapshotService, eventSourceName);
return new SnapshotAwareEnvelopeEventStream(
streamId,
eventStreamManager,
snapshotService,
eventSourceName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,22 @@
package uk.gov.justice.services.eventsourcing.source.core;

import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.EventRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventFinder;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEventFinderFactory;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepository;
import uk.gov.justice.services.eventsourcing.repository.jdbc.eventstream.EventStreamJdbcRepositoryFactory;
import uk.gov.justice.services.eventsourcing.source.core.snapshot.SnapshotService;

import javax.inject.Inject;
import javax.sql.DataSource;

public class SnapshotAwareEventSourceFactory {

@Inject
private EventStreamManagerFactory eventStreamManagerFactory;
private EventStreamManager eventStreamManager;

@Inject
private EventRepositoryFactory eventRepositoryFactory;

@Inject
private EventJdbcRepositoryFactory eventJdbcRepositoryFactory;

@Inject
private EventStreamJdbcRepositoryFactory eventStreamJdbcRepositoryFactory;
private EventRepository eventRepository;

@Inject
private SnapshotService snapshotService;

@Inject
private PublishedEventFinderFactory publishedEventFinderFactory;

public EventSource create(final DataSource dataSource, final String eventSourceName) {

final EventJdbcRepository eventJdbcRepository = eventJdbcRepositoryFactory.eventJdbcRepository(dataSource);
final EventStreamJdbcRepository eventStreamJdbcRepository = eventStreamJdbcRepositoryFactory.eventStreamJdbcRepository(dataSource);
final PublishedEventFinder publishedEventFinder = publishedEventFinderFactory.create(dataSource);

final EventRepository eventRepository = eventRepositoryFactory.eventRepository(
eventJdbcRepository,
eventStreamJdbcRepository,
publishedEventFinder);

final EventStreamManager eventStreamManager = eventStreamManagerFactory.eventStreamManager(eventRepository, eventSourceName);
public EventSource create(final String eventSourceName) {

return new SnapshotAwareEventSource(
eventStreamManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import static java.lang.String.format;

import uk.gov.justice.services.cdi.QualifierAnnotationExtractor;
import uk.gov.justice.services.eventsourcing.source.core.annotation.EventSourceName;
import uk.gov.justice.services.jdbc.persistence.JdbcDataSourceProvider;
import uk.gov.justice.subscription.domain.eventsource.EventSourceDefinition;
import uk.gov.justice.subscription.domain.eventsource.Location;
import uk.gov.justice.subscription.registry.EventSourceDefinitionRegistry;
Expand All @@ -16,68 +13,36 @@
import javax.enterprise.inject.Alternative;
import javax.enterprise.inject.CreationException;
import javax.enterprise.inject.Produces;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.inject.Inject;

@ApplicationScoped
@Alternative
@Priority(100)
public class SnapshotAwareEventSourceProducer {

@Inject
private QualifierAnnotationExtractor qualifierAnnotationExtractor;

@Inject
private EventSourceDefinitionRegistry eventSourceDefinitionRegistry;

@Inject
private SnapshotAwareEventSourceFactory snapshotAwareEventSourceFactory;

@Inject
private JdbcDataSourceProvider jdbcDataSourceProvider;

/**
*
* Backwards compatible support for Unnamed EventSource injection points
* Produces Default EventSource injection point.
*
* @return {@link EventSource}
*/
@Produces
public EventSource eventSource() {
return createEventSourceFrom(eventSourceDefinitionRegistry.getDefaultEventSourceDefinition());
}

/**
* Support for Named EventSource injection points. Annotate injection point with {@code
* @EventSourceName("name")}
*
* @param injectionPoint the injection point for the EventSource
* @return {@link EventSource}
*/
@Produces
@EventSourceName
public EventSource eventSource(final InjectionPoint injectionPoint) {

final String eventSourceName = qualifierAnnotationExtractor.getFrom(injectionPoint, EventSourceName.class).value();
final Optional<EventSourceDefinition> eventSourceDefinition = eventSourceDefinitionRegistry.getEventSourceDefinitionFor(eventSourceName);

return eventSourceDefinition
.map(this::createEventSourceFrom)
.orElseThrow(() -> new CreationException(format("Failed to find EventSource named '%s' in event-sources.yaml", eventSourceName)));

}

private EventSource createEventSourceFrom(final EventSourceDefinition eventSourceDefinition) {

final EventSourceDefinition eventSourceDefinition = eventSourceDefinitionRegistry.getDefaultEventSourceDefinition();
final Location location = eventSourceDefinition.getLocation();
final Optional<String> dataSourceOptional = location.getDataSource();

return dataSourceOptional
.map(dataSource -> snapshotAwareEventSourceFactory.create(
jdbcDataSourceProvider.getDataSource(dataSource),
eventSourceDefinition.getName()))
.map(dataSource -> snapshotAwareEventSourceFactory.create(eventSourceDefinition.getName()))
.orElseThrow(() -> new CreationException(
format("No DataSource specified for EventSource '%s' specified in event-sources.yaml", eventSourceDefinition.getName())
));
}

}
Loading

0 comments on commit c0609ff

Please sign in to comment.