Skip to content

Commit

Permalink
Fix timeout causing partial replay of stream
Browse files Browse the repository at this point in the history
  • Loading branch information
amckenzie committed May 25, 2018
1 parent e019fd9 commit 7916f49
Show file tree
Hide file tree
Showing 23 changed files with 735 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -1,85 +1,71 @@
package uk.gov.justice.framework.tools.replay;

import static org.apache.commons.lang3.StringUtils.substringBefore;
import static java.util.stream.IntStream.range;
import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED;
import static javax.ejb.TransactionAttributeType.REQUIRED;

import uk.gov.justice.services.core.handler.exception.MissingHandlerException;
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus;
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.Metadata;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Stateless
public class AsyncStreamDispatcher {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncStreamDispatcher.class);

@Inject
private TransactionalEnvelopeDispatcher envelopeDispatcher;

@Inject
private StreamStatusJdbcRepository streamStatusRepository;

public UUID dispatch(final Stream<JsonEnvelope> stream) {
final int[] noOfProcessedElements = {0};
final JsonEnvelope[] envelopes = {null};

try (final Stream<JsonEnvelope> stream1 = stream) {
stream1.forEach(envelope -> {
if (firstElement(noOfProcessedElements)) {
LOGGER.info("Starting processing of stream: {}", streamIdOf(envelope));
}
try {
envelopeDispatcher.dispatch(envelope);
} catch (MissingHandlerException ex) {
final Metadata metadata = envelope.metadata();
LOGGER.warn("Missing handler for stream Id: {}, event name: {}, version: {}", metadata.streamId().get(),
metadata.name(), metadata.version().get());
}
noOfProcessedElements[0]++;
if (shouldLogProgress(noOfProcessedElements)) {
LOGGER.info("Processed {} elements of stream: {}", noOfProcessedElements[0], streamIdOf(envelope));
}
envelopes[0] = envelope;
});
final UUID streamId = streamIdOf(envelopes[0]);
streamStatusRepository.insert(new StreamStatus(streamId, versionOf(envelopes[0]), sourceOf(envelopes[0])));
LOGGER.info("Finished processing of stream: {}, elements processed: {}", streamId, noOfProcessedElements[0]);
return streamId;
}
}
@Inject
private StreamEnvelopeProvider streamEnvelopeProvider;

private UUID streamIdOf(final JsonEnvelope envelope) {
return envelope
.metadata()
.streamId()
.orElseThrow(() -> new IllegalArgumentException(String.format("Stream id not found in the envelope: %s", envelope.toString())));
}
@Inject
private StreamStatusFactory streamStatusFactory;

private Long versionOf(final JsonEnvelope envelope) {
return envelope
.metadata()
.version()
.orElseThrow(() -> new IllegalArgumentException(String.format("Version not found in the envelope: %s", envelope.toString())));
}
@Inject
private ProgressLogger progressLogger;

@TransactionAttribute(NOT_SUPPORTED)
public UUID dispatch(final UUID streamId) {

progressLogger.logStart(streamId);

final List<JsonEnvelope> envelopes = streamEnvelopeProvider.getStreamAsList(streamId);

private String sourceOf(final JsonEnvelope envelope) {
return substringBefore(envelope.metadata().name(), ".");
range(0, envelopes.size())
.forEach(index -> dispatchEnvelope(
envelopes.get(index),
streamId,
index));

insertStreamStatus(streamStatusFactory.create(envelopes, streamId));

progressLogger.logCompletion(streamId);

return streamId;
}

private boolean shouldLogProgress(final int[] i) {
return i[0] % 100 == 0;
@TransactionAttribute(REQUIRED)
private void dispatchEnvelope(final JsonEnvelope jsonEnvelope, final UUID streamId, final int index) {
try {
envelopeDispatcher.dispatch(jsonEnvelope);
progressLogger.logSuccess(streamId, index);
} catch (final MissingHandlerException ex) {
progressLogger.logFailure(streamId, jsonEnvelope);
}
}

private boolean firstElement(final int[] i) {
return i[0] == 0;
@TransactionAttribute(REQUIRED)
private void insertStreamStatus(final StreamStatus streamStatus) {
streamStatusRepository.insert(streamStatus);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package uk.gov.justice.framework.tools.replay;

public class ProgressChecker {

private static final int PROGRESS_INTERVAL = 100;

public boolean shouldLogProgress(final int index) {
return index % PROGRESS_INTERVAL == 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package uk.gov.justice.framework.tools.replay;

import uk.gov.justice.services.messaging.JsonEnvelope;
import uk.gov.justice.services.messaging.Metadata;

import java.util.UUID;

import javax.inject.Inject;

import org.slf4j.Logger;

public class ProgressLogger {

@Inject
private ProgressChecker progressChecker;

@Inject
private Logger logger;

private int sucessCount = 0;

public void logStart(final UUID streamId) {
logger.info("Starting processing of stream: {}", streamId);
}

public void logSuccess(final UUID streamId, final int index) {

sucessCount++;

if (progressChecker.shouldLogProgress(index)) {
logger.info("Processed {} element(s) of stream: {}", sucessCount, streamId);
}
}

public void logFailure(final UUID streamId, final JsonEnvelope jsonEnvelope) {
final Metadata metadata = jsonEnvelope.metadata();
logger.warn("Missing handler for stream Id: {}, event name: {}, version: {}",
streamId,
metadata.name(),
metadata.version().map(theVersion -> "" + theVersion).orElse("Not set")
);
}

public void logCompletion(final UUID streamId) {
logger.info("Finished processing of stream: {}. Processed {} elements", streamId, sucessCount);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.framework.tools.replay;

import static java.util.stream.Collectors.toList;
import static org.wildfly.swarm.bootstrap.Main.MAIN_PROCESS_FILE;

import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository;
Expand All @@ -8,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.Deque;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -33,12 +35,14 @@ public class StartReplay implements ManagedTaskListener {
private Logger logger;

@Resource
private ManagedExecutorService executorService;
private ManagedExecutorService managedExecutorService;

@Inject
private JdbcEventRepository jdbcEventRepository;

@Inject
private AsyncStreamDispatcher asyncStreamDispatcher;

private Deque<Future<UUID>> outstandingTasks = new LinkedBlockingDeque<>();

private boolean allTasksCreated = false;
Expand All @@ -47,11 +51,13 @@ public class StartReplay implements ManagedTaskListener {
void go() {
logger.info("-------------- Invoke Event Streams Replay-------------!");
checkForMainProcessFile();
jdbcEventRepository.getStreamOfAllActiveEventStreams()
.forEach(eventStream -> {
StreamDispatchTask dispatchTask = new StreamDispatchTask(eventStream, asyncStreamDispatcher, this);
outstandingTasks.add(executorService.submit(dispatchTask));
});

final List<UUID> activeStreamIds = jdbcEventRepository.getAllActiveStreamIds().collect(toList());
activeStreamIds.forEach(uuid -> {
StreamDispatchTask dispatchTask = new StreamDispatchTask(uuid, asyncStreamDispatcher, this);
outstandingTasks.add(managedExecutorService.submit(dispatchTask));
});

allTasksCreated = true;
if (outstandingTasks.isEmpty()) shutdown();
logger.info("-------------- Invocation of Event Streams Replay Completed --------------");
Expand Down Expand Up @@ -107,4 +113,4 @@ private void checkForMainProcessFile() {
logger.warn(NO_PROCESS_FILE_WARNING);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
package uk.gov.justice.framework.tools.replay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.justice.services.messaging.JsonEnvelope;

import javax.enterprise.concurrent.ManagedTask;
import javax.enterprise.concurrent.ManagedTaskListener;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

import javax.enterprise.concurrent.ManagedTask;
import javax.enterprise.concurrent.ManagedTaskListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class StreamDispatchTask implements Callable<UUID>, ManagedTask {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamDispatchTask.class);
private final Stream<JsonEnvelope> stream;
private final UUID streamId;
private final AsyncStreamDispatcher dispatcher;
private final ManagedTaskListener dispatchListener;

public StreamDispatchTask(final Stream<JsonEnvelope> stream, final AsyncStreamDispatcher dispatcher, final ManagedTaskListener dispatchListener) {
public StreamDispatchTask(
final UUID streamId,
final AsyncStreamDispatcher dispatcher,
final ManagedTaskListener dispatchListener) {
this.streamId = streamId;
this.dispatcher = dispatcher;
this.dispatchListener = dispatchListener;
this.stream = stream;
}

@Override
public UUID call() {
LOGGER.debug("---------- Dispatching stream -------------");
return dispatcher.dispatch(this.stream);

return dispatcher.dispatch(streamId);
}

@Override
Expand All @@ -39,4 +42,4 @@ public Map<String, String> getExecutionProperties() {
public ManagedTaskListener getManagedTaskListener() {
return dispatchListener;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.gov.justice.framework.tools.replay;

import static java.util.stream.Collectors.toList;

import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import javax.inject.Inject;

public class StreamEnvelopeProvider {

@Inject
private JdbcEventRepository jdbcEventRepository;

public List<JsonEnvelope> getStreamAsList(final UUID streamId) {
try (final Stream<JsonEnvelope> stream = jdbcEventRepository.getByStreamId(streamId)) {
return stream.collect(toList());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package uk.gov.justice.framework.tools.replay;

import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.substringBefore;

import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
import java.util.UUID;

public class StreamStatusFactory {

public StreamStatus create(final List<JsonEnvelope> envelopes, final UUID streamId) {
final JsonEnvelope jsonEnvelope = envelopes.get(0);
final Long version = getVersionFrom(jsonEnvelope);
final String source = getSourceFrom(jsonEnvelope);

return new StreamStatus(streamId, version, source);
}

private Long getVersionFrom(final JsonEnvelope envelope) {
return envelope
.metadata()
.version()
.orElseThrow(() -> new IllegalArgumentException(format("Version not found in the envelope: %s", envelope.toString())));
}

private String getSourceFrom(final JsonEnvelope envelope) {
return substringBefore(envelope.metadata().name(), ".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,4 @@ public void init() {
public void dispatch(JsonEnvelope envelope) {
dispatcher.dispatch(envelope);
}


}
4 changes: 2 additions & 2 deletions framework-tools-replay/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
log4j.rootLogger=INFO, A1 file
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Loading

0 comments on commit 7916f49

Please sign in to comment.