Skip to content

Commit

Permalink
Merge c245e09 into 9ad16f0
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahesh Subramanian committed Jun 25, 2019
2 parents 9ad16f0 + c245e09 commit 002289c
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class StartTransformation implements ManagedTaskListener {
@ConfigurationValue("streamCountReportingInterval")
private int streamCountReportingInterval;

@Inject
@ConfigurationValue("processAllStreams")
private boolean processAllStreams;

@Inject
private Logger logger;

Expand Down Expand Up @@ -85,7 +89,9 @@ private void createTransformationTasks(final int pass) {
throw new IllegalArgumentException("Invalid streamCountReportingInterval argument");
}

final Stream<UUID> activeStreams = eventRepository.getAllActiveStreamIds();
final Stream<UUID> activeStreams = processAllStreams ? eventRepository.getAllStreamIds() : eventRepository.getAllActiveStreamIds();

logger.info(format("Processing %s streams", processAllStreams ? "all" : "active"));

activeStreams
.forEach(streamId -> {
Expand Down Expand Up @@ -126,7 +132,7 @@ public void taskDone(final Future<?> futureTask, final ManagedExecutorService ma
}

public void taskAborted(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task, final Throwable throwable) {
logger.error(String.format("Aborted Transformation task: '%s'", throwable.getMessage()));
logger.error("Aborted Transformation task", throwable);
removeOutstandingTask(futureTask);
shutDownIfFinished();
truncateAndPopulateLinkedEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN

startTransformation.go();

verify(logger).info("-------------- Invoke Event Streams Transformation -------------");
verify(logger).info("-------------- Invocation of Event Streams Transformation Completed --------------");
verify(logger).info("Processing active streams");
verify(logger).info("Pass 1 - Streams count: 4 - time(ms): 12222222");
startTransformation.taskDone(future1, null, null, null);
startTransformation.taskDone(future2, null, null, null);
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<cpp.repo.name>stream-transformation-tool</cpp.repo.name>
<framework-api.version>3.1.1</framework-api.version>
<framework.version>5.1.2</framework.version>
<event-store.version>1.1.9</event-store.version>
<event-store.version>1.1.11</event-store.version>
<common-bom.version>1.28.0</common-bom.version>
<utilities.version>1.16.2</utilities.version>
<test-utils.version>1.18.1</test-utils.version>
Expand Down Expand Up @@ -103,11 +103,12 @@
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>event-repository-liquibase</artifactId>
<version>${framework.version}</version>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>event-repository-jdbc</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package uk.gov.justice.framework.tools.transformation;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static uk.gov.justice.framework.tools.transformation.EventLogBuilder.eventLogFrom;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;

import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

import javax.sql.DataSource;
Expand All @@ -27,25 +30,34 @@ public DatabaseUtils() throws SQLException, LiquibaseException {
}

public DataSource getDataSource() {
return dataSource;
return dataSource;
}

public void dropAndUpdateLiquibase() throws SQLException, LiquibaseException {
liquibaseUtil.dropAndUpdate();
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt) throws InvalidPositionException {
final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt);

eventLogJdbcRepository.insert(event);
eventStreamJdbcRepository.insert(streamId);
insertEventLogData(eventName, streamId, sequenceId, createdAt, empty());
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final long eventNumber) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, of(eventNumber));
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final Optional<Long> eventNumber) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, eventNumber, true);
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, boolean streamStatus) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, empty(), streamStatus);
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final Optional<Long> eventNumber, boolean streamStatus) throws InvalidPositionException {
final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt, eventNumber);

eventLogJdbcRepository.insert(event);
eventStreamJdbcRepository.insert(streamId);
eventStreamJdbcRepository.insert(streamId, streamStatus);
}

public TestEventLogJdbcRepository getEventLogJdbcRepository() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.framework.tools.transformation;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder.envelope;
import static uk.gov.justice.services.test.utils.core.messaging.MetadataBuilderFactory.metadataWithRandomUUID;
Expand All @@ -9,42 +10,23 @@
import uk.gov.justice.services.messaging.Metadata;

import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

public class EventLogBuilder {

private EventLogBuilder() {

// should not create an instance of this class
}

public static Event eventLogFrom(
final String eventName,
final Long sequenceId,
final UUID streamId,
final ZonedDateTime createdAt) {
final JsonEnvelope jsonEnvelope = envelope()
.with(metadataWithRandomUUID(eventName)
.createdAt(createdAt)
.withVersion(sequenceId)
.withStreamId(streamId)
.withSource("sample")
)
.withPayloadOf("test", "a string")
.build();

final Metadata metadata = jsonEnvelope.metadata();
final UUID id = metadata.id();

final String name = metadata.name();
final String payload = jsonEnvelope.payloadAsJsonObject().toString();
return eventLogFrom(eventName, sequenceId, streamId, createdAt, empty());

return new Event(
id,
streamId,
sequenceId,
name,
metadata.asJsonObject().toString(),
payload,
createdAt);
}

public static Event eventLogFrom(
Expand All @@ -53,6 +35,17 @@ public static Event eventLogFrom(
final UUID streamId,
final ZonedDateTime createdAt,
final long eventNumber) {

return eventLogFrom(eventName, sequenceId, streamId, createdAt, of(eventNumber));
}

public static Event eventLogFrom(
final String eventName,
final Long sequenceId,
final UUID streamId,
final ZonedDateTime createdAt,
final Optional<Long> eventNumber) {

final JsonEnvelope jsonEnvelope = envelope()
.with(metadataWithRandomUUID(eventName)
.createdAt(createdAt)
Expand All @@ -77,6 +70,6 @@ public static Event eventLogFrom(
metadata.asJsonObject().toString(),
payload,
createdAt,
of(eventNumber));
eventNumber);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.framework.tools.transformation;

import static com.google.common.base.Joiner.on;
import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -20,10 +21,16 @@ public class SwarmStarterUtil {
private static final Logger LOGGER = getLogger(SwarmStarterUtil.class);

public void runCommand(final boolean enableRemoteDebugging, final long timeoutInSeconds, final long streamCountReportingInterval, final String memoryOptions) throws IOException {
runCommand(enableRemoteDebugging, false, timeoutInSeconds, streamCountReportingInterval, memoryOptions);
}

public void runCommand(final boolean enableRemoteDebugging, final boolean processAllStreams, final long timeoutInSeconds, final long streamCountReportingInterval, final String memoryOptions) throws IOException {
final String memoryParmeter = format("-DXmx=%s", memoryOptions);
final String streamCountReportingIntervalParameter = format("-DstreamCountReportingInterval=%s", streamCountReportingInterval);
final String processAllStreamsParameter = format("-DprocessAllStreams=%s", processAllStreams);

final String command = createCommandToExecuteTransformationTool(enableRemoteDebugging, streamCountReportingIntervalParameter, memoryParmeter);
final String command = createCommandToExecuteTransformationTool(enableRemoteDebugging, processAllStreamsParameter, streamCountReportingIntervalParameter, memoryParmeter);
LOGGER.info("Executing command: {}", command);
final Process exec = execute(command);
final BufferedReader reader =
new BufferedReader(new InputStreamReader(exec.getInputStream()));
Expand All @@ -38,6 +45,7 @@ public void runCommand(final boolean enableRemoteDebugging, final long timeoutIn
}

private String createCommandToExecuteTransformationTool(final boolean enableRemoteDebugging,
final String processAllStreamsParameter,
final String streamCountReportingIntervalParameter,
final String memoryParmeter) throws IOException {
final String eventToolJarLocation = getResource("event-tool*.jar");
Expand All @@ -52,7 +60,7 @@ private String createCommandToExecuteTransformationTool(final boolean enableRemo
debug = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005";
}

return commandFrom(debug, mainProcessFilePath, streamJarLocation, eventToolJarLocation, anonymiseJarLocation, standaloneDSLocation, streamCountReportingIntervalParameter, memoryParmeter);
return commandFrom(debug, mainProcessFilePath, streamJarLocation, eventToolJarLocation, anonymiseJarLocation, standaloneDSLocation, streamCountReportingIntervalParameter, memoryParmeter, processAllStreamsParameter);
}

private String commandFrom(final String debug,
Expand All @@ -61,17 +69,15 @@ private String commandFrom(final String debug,
final String eventToolJarLocation,
final String anonymiseJarLocation,
final String standaloneDSLocation,
final String streamCountReportingIntervalParameter,
final String memoryParmeter) throws IOException {
return format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s %s -c %s %s %s",
final String... environmentParameters) throws IOException {
final String command = format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s %s -c %s ",
debug,
mainProcessFilePath,
streamJarLocation,
eventToolJarLocation,
anonymiseJarLocation,
standaloneDSLocation,
streamCountReportingIntervalParameter,
memoryParmeter);
standaloneDSLocation);
return command + on(" ").join(environmentParameters);
}

private String getResource(final String pattern) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static javax.json.Json.createReader;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -18,7 +19,6 @@
import java.util.UUID;
import java.util.stream.Stream;

import javax.json.Json;
import javax.json.JsonObject;

import org.junit.Before;
Expand All @@ -30,9 +30,9 @@ public class StreamAnonymisationTransformationIT {
private static final long STREAM_COUNT_REPORTING_INTERVAL = 10L;
private static final String MEMORY_OPTIONS_PARAMETER = "2048M";
private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false;
private static final Boolean PROCESS_ALL_STREAMS = true;
private static final int WILDFLY_TIMEOUT_IN_SECONDS = 60;

private static final UUID STREAM_ID = randomUUID();
private static final String EVENT_TO_ANONYMISE = "sample.transformation.anonymise";

private SwarmStarterUtil swarmStarterUtil;
Expand All @@ -48,26 +48,84 @@ public void setUp() throws Exception {


@Test
public void shouldAnonymiseEventData() throws Exception {
public void shouldAnonymiseActiveStreamEventData() throws Exception {

final UUID activeStreamId = randomUUID();

final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, STREAM_ID, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().filter(e -> e.getStreamId().equals(STREAM_ID)).collect(toList());
final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().filter(e -> e.getStreamId().equals(activeStreamId)).collect(toList());

assertThat(events, hasSize(1));

final Event event = retrieveEvent(STREAM_ID, EVENT_TO_ANONYMISE);
final Event event = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(event);
JsonObject payload = Json.createReader(new StringReader(event.getPayload())).readObject();
JsonObject payload = createReader(new StringReader(event.getPayload())).readObject();
assertFalse(payload.getString("a string").equalsIgnoreCase("test"));
assertThat(payload.getString("a string").length(), is("test".length()));

}

@Test
public void shouldAnonymiseActiveAndInactiveStreamEventDataAsEnvironmentVariableProcessAllStreamsSet() throws Exception {

final UUID activeStreamId = randomUUID();
final UUID inactiveStreamId = randomUUID();
final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, inactiveStreamId, 1L, createdAt, false);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, PROCESS_ALL_STREAMS, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().collect(toList());
assertThat(events, hasSize(2));

final Event activeStreamEvent = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(activeStreamEvent);
JsonObject activeStreamEventPayload = createReader(new StringReader(activeStreamEvent.getPayload())).readObject();
assertFalse(activeStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(activeStreamEventPayload.getString("a string").length(), is("test".length()));

final Event inactiveStreamEvent = retrieveEvent(inactiveStreamId, EVENT_TO_ANONYMISE);
assertNotNull(inactiveStreamEvent);
JsonObject inactiveStreamEventPayload = createReader(new StringReader(inactiveStreamEvent.getPayload())).readObject();
assertFalse(inactiveStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(inactiveStreamEventPayload.getString("a string").length(), is("test".length()));

}

@Test
public void shouldOnlyAnonymiseActiveStreamEventDataAsEnvironmentVariableProcessAllStreamsNotSet() throws Exception {

final UUID activeStreamId = randomUUID();
final UUID inactiveStreamId = randomUUID();
final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, inactiveStreamId, 1L, createdAt, false);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().collect(toList());
assertThat(events, hasSize(2));

final Event activeStreamEvent = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(activeStreamEvent);
JsonObject activeStreamEventPayload = createReader(new StringReader(activeStreamEvent.getPayload())).readObject();
assertFalse(activeStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(activeStreamEventPayload.getString("a string").length(), is("test".length()));

final Event inactiveStreamEvent = retrieveEvent(inactiveStreamId, EVENT_TO_ANONYMISE);
assertNotNull(inactiveStreamEvent);
JsonObject inactiveStreamEventPayload = createReader(new StringReader(inactiveStreamEvent.getPayload())).readObject();
assertThat(inactiveStreamEventPayload.getString("a string"), is("test"));
}

private Event retrieveEvent(final UUID streamId, final String eventName) {
final Stream<Event> eventLogs = databaseUtils.getEventLogJdbcRepository().findAll();
final Optional<Event> event = eventLogs
Expand Down

0 comments on commit 002289c

Please sign in to comment.