Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CPI-53: Extending transformation tool for anonymisation activity #32

Merged
merged 1 commit into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class StartTransformation implements ManagedTaskListener {
@ConfigurationValue("streamCountReportingInterval")
private int streamCountReportingInterval;

@Inject
@ConfigurationValue("processAllStreams")
private boolean processAllStreams;

@Inject
private Logger logger;

Expand Down Expand Up @@ -85,7 +89,9 @@ private void createTransformationTasks(final int pass) {
throw new IllegalArgumentException("Invalid streamCountReportingInterval argument");
}

final Stream<UUID> activeStreams = eventRepository.getAllActiveStreamIds();
final Stream<UUID> activeStreams = processAllStreams ? eventRepository.getAllStreamIds() : eventRepository.getAllActiveStreamIds();

logger.info(format("Processing %s streams", processAllStreams ? "all" : "active"));

activeStreams
.forEach(streamId -> {
Expand Down Expand Up @@ -126,7 +132,7 @@ public void taskDone(final Future<?> futureTask, final ManagedExecutorService ma
}

public void taskAborted(final Future<?> futureTask, final ManagedExecutorService managedExecutorService, final Object task, final Throwable throwable) {
logger.error(String.format("Aborted Transformation task: '%s'", throwable.getMessage()));
logger.error("Aborted Transformation task", throwable);
removeOutstandingTask(futureTask);
shutDownIfFinished();
truncateAndPopulateLinkedEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ public void shouldLogOutputBasedOnStreamsProcessedCountStepInfoWhenStreamDoneIsN

startTransformation.go();

verify(logger).info("-------------- Invoke Event Streams Transformation -------------");
verify(logger).info("-------------- Invocation of Event Streams Transformation Completed --------------");
verify(logger).info("Processing active streams");
verify(logger).info("Pass 1 - Streams count: 4 - time(ms): 12222222");
startTransformation.taskDone(future1, null, null, null);
startTransformation.taskDone(future2, null, null, null);
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<cpp.repo.name>stream-transformation-tool</cpp.repo.name>
<framework-api.version>3.1.1</framework-api.version>
<framework.version>5.1.2</framework.version>
<event-store.version>1.1.9</event-store.version>
<event-store.version>1.1.11</event-store.version>
<common-bom.version>1.28.0</common-bom.version>
<utilities.version>1.16.2</utilities.version>
<test-utils.version>1.18.1</test-utils.version>
Expand Down Expand Up @@ -103,11 +103,12 @@
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>event-repository-liquibase</artifactId>
<version>${framework.version}</version>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
<artifactId>event-repository-jdbc</artifactId>
<version>${event-store.version}</version>
</dependency>
<dependency>
<groupId>uk.gov.justice.utils</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package uk.gov.justice.framework.tools.transformation;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static uk.gov.justice.framework.tools.transformation.EventLogBuilder.eventLogFrom;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event;
import uk.gov.justice.services.eventsourcing.repository.jdbc.exception.InvalidPositionException;

import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

import javax.sql.DataSource;
Expand All @@ -27,25 +30,34 @@ public DatabaseUtils() throws SQLException, LiquibaseException {
}

public DataSource getDataSource() {
return dataSource;
return dataSource;
}

public void dropAndUpdateLiquibase() throws SQLException, LiquibaseException {
liquibaseUtil.dropAndUpdate();
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt) throws InvalidPositionException {
final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt);

eventLogJdbcRepository.insert(event);
eventStreamJdbcRepository.insert(streamId);
insertEventLogData(eventName, streamId, sequenceId, createdAt, empty());
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final long eventNumber) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, of(eventNumber));
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final Optional<Long> eventNumber) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, eventNumber, true);
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, boolean streamStatus) throws InvalidPositionException {
insertEventLogData(eventName, streamId, sequenceId, createdAt, empty(), streamStatus);
}

public void insertEventLogData(final String eventName, final UUID streamId, final long sequenceId, final ZonedDateTime createdAt, final Optional<Long> eventNumber, boolean streamStatus) throws InvalidPositionException {
final Event event = eventLogFrom(eventName, sequenceId, streamId, createdAt, eventNumber);

eventLogJdbcRepository.insert(event);
eventStreamJdbcRepository.insert(streamId);
eventStreamJdbcRepository.insert(streamId, streamStatus);
}

public TestEventLogJdbcRepository getEventLogJdbcRepository() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.framework.tools.transformation;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static uk.gov.justice.services.test.utils.core.messaging.JsonEnvelopeBuilder.envelope;
import static uk.gov.justice.services.test.utils.core.messaging.MetadataBuilderFactory.metadataWithRandomUUID;
Expand All @@ -9,42 +10,23 @@
import uk.gov.justice.services.messaging.Metadata;

import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

public class EventLogBuilder {

private EventLogBuilder() {

// should not create an instance of this class
}

public static Event eventLogFrom(
final String eventName,
final Long sequenceId,
final UUID streamId,
final ZonedDateTime createdAt) {
final JsonEnvelope jsonEnvelope = envelope()
.with(metadataWithRandomUUID(eventName)
.createdAt(createdAt)
.withVersion(sequenceId)
.withStreamId(streamId)
.withSource("sample")
)
.withPayloadOf("test", "a string")
.build();

final Metadata metadata = jsonEnvelope.metadata();
final UUID id = metadata.id();

final String name = metadata.name();
final String payload = jsonEnvelope.payloadAsJsonObject().toString();
return eventLogFrom(eventName, sequenceId, streamId, createdAt, empty());

return new Event(
id,
streamId,
sequenceId,
name,
metadata.asJsonObject().toString(),
payload,
createdAt);
}

public static Event eventLogFrom(
Expand All @@ -53,6 +35,17 @@ public static Event eventLogFrom(
final UUID streamId,
final ZonedDateTime createdAt,
final long eventNumber) {

return eventLogFrom(eventName, sequenceId, streamId, createdAt, of(eventNumber));
}

public static Event eventLogFrom(
final String eventName,
final Long sequenceId,
final UUID streamId,
final ZonedDateTime createdAt,
final Optional<Long> eventNumber) {

final JsonEnvelope jsonEnvelope = envelope()
.with(metadataWithRandomUUID(eventName)
.createdAt(createdAt)
Expand All @@ -77,6 +70,6 @@ public static Event eventLogFrom(
metadata.asJsonObject().toString(),
payload,
createdAt,
of(eventNumber));
eventNumber);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.framework.tools.transformation;

import static com.google.common.base.Joiner.on;
import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -20,10 +21,16 @@ public class SwarmStarterUtil {
private static final Logger LOGGER = getLogger(SwarmStarterUtil.class);

public void runCommand(final boolean enableRemoteDebugging, final long timeoutInSeconds, final long streamCountReportingInterval, final String memoryOptions) throws IOException {
runCommand(enableRemoteDebugging, false, timeoutInSeconds, streamCountReportingInterval, memoryOptions);
}

public void runCommand(final boolean enableRemoteDebugging, final boolean processAllStreams, final long timeoutInSeconds, final long streamCountReportingInterval, final String memoryOptions) throws IOException {
final String memoryParmeter = format("-DXmx=%s", memoryOptions);
final String streamCountReportingIntervalParameter = format("-DstreamCountReportingInterval=%s", streamCountReportingInterval);
final String processAllStreamsParameter = format("-DprocessAllStreams=%s", processAllStreams);

final String command = createCommandToExecuteTransformationTool(enableRemoteDebugging, streamCountReportingIntervalParameter, memoryParmeter);
final String command = createCommandToExecuteTransformationTool(enableRemoteDebugging, processAllStreamsParameter, streamCountReportingIntervalParameter, memoryParmeter);
LOGGER.info("Executing command: {}", command);
final Process exec = execute(command);
final BufferedReader reader =
new BufferedReader(new InputStreamReader(exec.getInputStream()));
Expand All @@ -38,6 +45,7 @@ public void runCommand(final boolean enableRemoteDebugging, final long timeoutIn
}

private String createCommandToExecuteTransformationTool(final boolean enableRemoteDebugging,
final String processAllStreamsParameter,
final String streamCountReportingIntervalParameter,
final String memoryParmeter) throws IOException {
final String eventToolJarLocation = getResource("event-tool*.jar");
Expand All @@ -52,7 +60,7 @@ private String createCommandToExecuteTransformationTool(final boolean enableRemo
debug = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005";
}

return commandFrom(debug, mainProcessFilePath, streamJarLocation, eventToolJarLocation, anonymiseJarLocation, standaloneDSLocation, streamCountReportingIntervalParameter, memoryParmeter);
return commandFrom(debug, mainProcessFilePath, streamJarLocation, eventToolJarLocation, anonymiseJarLocation, standaloneDSLocation, streamCountReportingIntervalParameter, memoryParmeter, processAllStreamsParameter);
}

private String commandFrom(final String debug,
Expand All @@ -61,17 +69,15 @@ private String commandFrom(final String debug,
final String eventToolJarLocation,
final String anonymiseJarLocation,
final String standaloneDSLocation,
final String streamCountReportingIntervalParameter,
final String memoryParmeter) throws IOException {
return format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s %s -c %s %s %s",
final String... environmentParameters) throws IOException {
final String command = format("java %s -jar -Dorg.wildfly.swarm.mainProcessFile=%s -Devent.transformation.jar=%s %s %s -c %s ",
debug,
mainProcessFilePath,
streamJarLocation,
eventToolJarLocation,
anonymiseJarLocation,
standaloneDSLocation,
streamCountReportingIntervalParameter,
memoryParmeter);
standaloneDSLocation);
return command + on(" ").join(environmentParameters);
}

private String getResource(final String pattern) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static javax.json.Json.createReader;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -18,7 +19,6 @@
import java.util.UUID;
import java.util.stream.Stream;

import javax.json.Json;
import javax.json.JsonObject;

import org.junit.Before;
Expand All @@ -30,9 +30,9 @@ public class StreamAnonymisationTransformationIT {
private static final long STREAM_COUNT_REPORTING_INTERVAL = 10L;
private static final String MEMORY_OPTIONS_PARAMETER = "2048M";
private static final Boolean ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY = false;
private static final Boolean PROCESS_ALL_STREAMS = true;
private static final int WILDFLY_TIMEOUT_IN_SECONDS = 60;

private static final UUID STREAM_ID = randomUUID();
private static final String EVENT_TO_ANONYMISE = "sample.transformation.anonymise";

private SwarmStarterUtil swarmStarterUtil;
Expand All @@ -48,26 +48,84 @@ public void setUp() throws Exception {


@Test
public void shouldAnonymiseEventData() throws Exception {
public void shouldAnonymiseActiveStreamEventData() throws Exception {

final UUID activeStreamId = randomUUID();

final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, STREAM_ID, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().filter(e -> e.getStreamId().equals(STREAM_ID)).collect(toList());
final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().filter(e -> e.getStreamId().equals(activeStreamId)).collect(toList());

assertThat(events, hasSize(1));

final Event event = retrieveEvent(STREAM_ID, EVENT_TO_ANONYMISE);
final Event event = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(event);
JsonObject payload = Json.createReader(new StringReader(event.getPayload())).readObject();
JsonObject payload = createReader(new StringReader(event.getPayload())).readObject();
assertFalse(payload.getString("a string").equalsIgnoreCase("test"));
assertThat(payload.getString("a string").length(), is("test".length()));

}

@Test
public void shouldAnonymiseActiveAndInactiveStreamEventDataAsEnvironmentVariableProcessAllStreamsSet() throws Exception {

final UUID activeStreamId = randomUUID();
final UUID inactiveStreamId = randomUUID();
final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, inactiveStreamId, 1L, createdAt, false);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, PROCESS_ALL_STREAMS, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().collect(toList());
assertThat(events, hasSize(2));

final Event activeStreamEvent = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(activeStreamEvent);
JsonObject activeStreamEventPayload = createReader(new StringReader(activeStreamEvent.getPayload())).readObject();
assertFalse(activeStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(activeStreamEventPayload.getString("a string").length(), is("test".length()));

final Event inactiveStreamEvent = retrieveEvent(inactiveStreamId, EVENT_TO_ANONYMISE);
assertNotNull(inactiveStreamEvent);
JsonObject inactiveStreamEventPayload = createReader(new StringReader(inactiveStreamEvent.getPayload())).readObject();
assertFalse(inactiveStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(inactiveStreamEventPayload.getString("a string").length(), is("test".length()));

}

@Test
public void shouldOnlyAnonymiseActiveStreamEventDataAsEnvironmentVariableProcessAllStreamsNotSet() throws Exception {

final UUID activeStreamId = randomUUID();
final UUID inactiveStreamId = randomUUID();
final ZonedDateTime createdAt = new UtcClock().now().minusMonths(1);

databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, activeStreamId, 1L, createdAt);
databaseUtils.insertEventLogData(EVENT_TO_ANONYMISE, inactiveStreamId, 1L, createdAt, false);

swarmStarterUtil.runCommand(ENABLE_REMOTE_DEBUGGING_FOR_WILDFLY, WILDFLY_TIMEOUT_IN_SECONDS, STREAM_COUNT_REPORTING_INTERVAL, MEMORY_OPTIONS_PARAMETER);

final List<Event> events = databaseUtils.getEventLogJdbcRepository().findAll().collect(toList());
assertThat(events, hasSize(2));

final Event activeStreamEvent = retrieveEvent(activeStreamId, EVENT_TO_ANONYMISE);
assertNotNull(activeStreamEvent);
JsonObject activeStreamEventPayload = createReader(new StringReader(activeStreamEvent.getPayload())).readObject();
assertFalse(activeStreamEventPayload.getString("a string").equalsIgnoreCase("test"));
assertThat(activeStreamEventPayload.getString("a string").length(), is("test".length()));

final Event inactiveStreamEvent = retrieveEvent(inactiveStreamId, EVENT_TO_ANONYMISE);
assertNotNull(inactiveStreamEvent);
JsonObject inactiveStreamEventPayload = createReader(new StringReader(inactiveStreamEvent.getPayload())).readObject();
assertThat(inactiveStreamEventPayload.getString("a string"), is("test"));
}

private Event retrieveEvent(final UUID streamId, final String eventName) {
final Stream<Event> eventLogs = databaseUtils.getEventLogJdbcRepository().findAll();
final Optional<Event> event = eventLogs
Expand Down