Skip to content

Commit

Permalink
Merge d30eb7a into a164e6f
Browse files Browse the repository at this point in the history
  • Loading branch information
Allan Mckenzie committed Jun 4, 2018
2 parents a164e6f + d30eb7a commit 4819675
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ public UUID dispatch(final UUID streamId) {

progressLogger.logStart(streamId);

replayAllEventsOf(streamId);
try {
replayAllEventsOf(streamId);
} finally {
final StreamStatus streamStatus = streamStatusFactory.create(
jsonEnvelopeJdbcRepository.getLatestEvent(streamId),
streamId);

insertStreamStatus(streamStatusFactory.create(
jsonEnvelopeJdbcRepository.getLatestEvent(streamId),
streamId));
insertStreamStatus(streamStatus);
}

progressLogger.logCompletion(streamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ public JsonEnvelope getLatestEvent(final UUID streamId) {
final EventStream eventStream = eventSource.getStreamById(streamId);
final long currentVersion = eventStream.getCurrentVersion();

return eventStream
.readFrom(currentVersion)
.findFirst()
.orElseThrow(() -> new MissingEventStreamHeadException(format("Unable to retrieve head Event from stream with id '%s'", streamId)));
try(final Stream<JsonEnvelope> jsonEnvelopeStream = eventStream.readFrom(currentVersion)) {
return jsonEnvelopeStream
.findFirst()
.orElseThrow(() -> new MissingEventStreamHeadException(format("Unable to retrieve head Event from stream with id '%s'", streamId)));
}
}

public long getCurrentVersion(final UUID streamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,19 @@ public void shouldGetTheLatestEventFromAStream() throws Exception {
final EventStream eventStream = mock(EventStream.class);
final JsonEnvelope latestJsonEnvelope = mock(JsonEnvelope.class);

final CloseChecker closeChecker = new CloseChecker();

final Stream<JsonEnvelope> jsonEnvelopeStream = Stream.of(latestJsonEnvelope)
.onClose(closeChecker::setClosed);

when(eventSource.getStreamById(streamId)).thenReturn(eventStream);
when(eventStream.getCurrentVersion()).thenReturn(currentVersion);
when(eventStream.readFrom(currentVersion)).thenReturn(Stream.of(latestJsonEnvelope));

when(eventStream.readFrom(currentVersion)).thenReturn(jsonEnvelopeStream);

assertThat(jsonEnvelopeJdbcRepository.getLatestEvent(streamId), is(latestJsonEnvelope));

assertThat(closeChecker.isClosed(), is(true));
}

@Test
Expand Down Expand Up @@ -128,4 +136,17 @@ public void shouldGetTheCurrentVersion() throws Exception {

assertThat(jsonEnvelopeJdbcRepository.getCurrentVersion(streamId), is(currentVersion));
}

private static class CloseChecker {

private boolean closed = false;

public void setClosed() {
closed = true;
}

public boolean isClosed() {
return closed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class EventOrderingIT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class InsertAllEventsIntoViewStoreIT {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package uk.gov.justice.framework.tools.replay;


import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import uk.gov.justice.framework.tools.replay.database.DatasourceCreator;
import uk.gov.justice.framework.tools.replay.database.EventInserter;
import uk.gov.justice.framework.tools.replay.database.LiquibaseRunner;
import uk.gov.justice.framework.tools.replay.events.User;
import uk.gov.justice.framework.tools.replay.events.UserFactory;
import uk.gov.justice.framework.tools.replay.h2.InMemoryDatabaseRunner;
import uk.gov.justice.framework.tools.replay.wildfly.WildflyRunner;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import javax.sql.DataSource;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MultipleEventSteamsIT {

private static final Boolean SHOULD_LOG_WILDFLY_PROCESS_TO_CONSOLE = true;
private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false;

private static final int SECONDS_IN_A_MINUTE = 60;

private static final int WILDFLY_TIMEOUT_IN_SECONDS = SECONDS_IN_A_MINUTE * 5;
private static final int NUMBER_OF_EVENTS_TO_INSERT = 100;
private static final int NUMBER_OF_STREAMS = 10;

private final LiquibaseRunner liquibaseRunner = new LiquibaseRunner();
private final DatasourceCreator datasourceCreator = new DatasourceCreator();
private final WildflyRunner wildflyRunner = new WildflyRunner();

private final DataSource viewStoreDataSource = datasourceCreator.createViewStoreDataSource();
private final DataSource eventStoreDataSource = datasourceCreator.createEventStoreDataSource();
private final EventInserter eventInserter = new EventInserter(eventStoreDataSource, viewStoreDataSource);
private final UserFactory userFactory = new UserFactory();
private final InMemoryDatabaseRunner inMemoryDatabaseRunner = new InMemoryDatabaseRunner();

@Before
public void startDatabase() {
inMemoryDatabaseRunner.startH2Database();
}

@Before
public void runLiquibase() throws Exception {
liquibaseRunner.createEventStoreSchema(eventStoreDataSource);
liquibaseRunner.createViewStoreSchema(viewStoreDataSource);
}

@After
public void stopDB() throws Exception {
inMemoryDatabaseRunner.stopH2Database();
}

@Test
public void shouldInsertEventsIntoMultipleStreams() throws Exception {

final String eventName = "framework.update-user";

final List<User> allInsertedUsers = new ArrayList<>();

for(int i = 0; i < NUMBER_OF_STREAMS; i++) {
final UUID streamId = randomUUID();
System.out.println(format("Inserting %d events into stream %s", NUMBER_OF_EVENTS_TO_INSERT, streamId));
final List<User> users = userFactory.createSomeUsers(NUMBER_OF_EVENTS_TO_INSERT);
final List<Event> someEvents = userFactory.convertToEvents(users, eventName, streamId);

eventInserter.insertEventsIntoVewstore(
streamId,
someEvents);

allInsertedUsers.addAll(users);
}

final List<Event> insertedEvents = eventInserter.getAllFromEventStore().collect(toList());

System.out.println(format("%d events inserted into view store in %d streams", insertedEvents.size(), NUMBER_OF_STREAMS));

final boolean wildflyRanSuccessfully = wildflyRunner.run(
WILDFLY_TIMEOUT_IN_SECONDS,
SHOULD_LOG_WILDFLY_PROCESS_TO_CONSOLE,
ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY
);

assertTrue("Wildfly process exited abnormally", wildflyRanSuccessfully);

final List<User> usersFromViewStore = eventInserter.getUsersFromViewStore();

allInsertedUsers.forEach(usersFromViewStore::remove);

assertTrue(usersFromViewStore.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@
<level name="DEBUG"/>
</logger>
<logger category="com.arjuna.ats.jta">
<level name="TRACE"/>
<level name="WARN"/>
</logger>
<logger category="org.jboss.as.jpa">
<level name="TRACE"/>
<level name="WARN"/>
</logger>
<logger category="org.jboss">
<level name="WARN"/>
<level name="INFO"/>
</logger>
<logger category="sun.rmi">
<level name="WARN"/>
Expand Down Expand Up @@ -124,7 +124,7 @@
<xa-datasource-property name="URL">jdbc:h2:tcp://localhost:8092/mem:eventstore;MVCC=true</xa-datasource-property>
<xa-pool>
<min-pool-size>3</min-pool-size>
<max-pool-size>1000</max-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>true</prefill>
</xa-pool>
<security>
Expand All @@ -138,7 +138,7 @@
<xa-datasource-property name="URL">jdbc:h2:tcp://localhost:8092/mem:viewstore;MVCC=true</xa-datasource-property>
<xa-pool>
<min-pool-size>3</min-pool-size>
<max-pool-size>1000</max-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>true</prefill>
</xa-pool>
<security>
Expand All @@ -152,7 +152,7 @@
<xa-datasource-property name="URL">jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE</xa-datasource-property>
<xa-pool>
<min-pool-size>3</min-pool-size>
<max-pool-size>1000</max-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>true</prefill>
</xa-pool>
<security>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import javax.inject.Inject;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;

@ServiceComponent(value = EVENT_LISTENER)
public class FrameworkToolsTestListener {
Expand All @@ -24,13 +25,18 @@ public class FrameworkToolsTestListener {
@Inject
private ObjectMapper objectMapper;

@Inject
private Logger logger;

@Handles("framework.update-user")
public void handle(final JsonEnvelope envelope) {
logger.info("Replaying event: " + envelope);
testViewstoreRepository.save(fromJsonEnvelope(envelope));
}

private User fromJsonEnvelope(final JsonEnvelope envelope) {


final String payload = envelope.payloadAsJsonObject().toString();

try {
Expand Down

0 comments on commit 4819675

Please sign in to comment.