-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
30 changed files
with
775 additions
and
291 deletions.
There are no files selected for viewing
6 changes: 3 additions & 3 deletions
6
...ion/src/main/java/uk/gov/justice/framework/tools/fraction/runtime/FrameworkLibraries.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
96 changes: 41 additions & 55 deletions
96
...ols-replay/src/main/java/uk/gov/justice/framework/tools/replay/AsyncStreamDispatcher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,85 +1,71 @@ | ||
package uk.gov.justice.framework.tools.replay; | ||
|
||
import static org.apache.commons.lang3.StringUtils.substringBefore; | ||
import static java.util.stream.IntStream.range; | ||
import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED; | ||
import static javax.ejb.TransactionAttributeType.REQUIRED; | ||
|
||
import uk.gov.justice.services.core.handler.exception.MissingHandlerException; | ||
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; | ||
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository; | ||
import uk.gov.justice.services.messaging.JsonEnvelope; | ||
import uk.gov.justice.services.messaging.Metadata; | ||
|
||
import java.util.List; | ||
import java.util.UUID; | ||
import java.util.stream.Stream; | ||
|
||
import javax.ejb.Stateless; | ||
import javax.ejb.TransactionAttribute; | ||
import javax.inject.Inject; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@Stateless | ||
public class AsyncStreamDispatcher { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncStreamDispatcher.class); | ||
|
||
@Inject | ||
private TransactionalEnvelopeDispatcher envelopeDispatcher; | ||
|
||
@Inject | ||
private StreamStatusJdbcRepository streamStatusRepository; | ||
|
||
public UUID dispatch(final Stream<JsonEnvelope> stream) { | ||
final int[] noOfProcessedElements = {0}; | ||
final JsonEnvelope[] envelopes = {null}; | ||
|
||
try (final Stream<JsonEnvelope> stream1 = stream) { | ||
stream1.forEach(envelope -> { | ||
if (firstElement(noOfProcessedElements)) { | ||
LOGGER.info("Starting processing of stream: {}", streamIdOf(envelope)); | ||
} | ||
try { | ||
envelopeDispatcher.dispatch(envelope); | ||
} catch (MissingHandlerException ex) { | ||
final Metadata metadata = envelope.metadata(); | ||
LOGGER.warn("Missing handler for stream Id: {}, event name: {}, version: {}", metadata.streamId().get(), | ||
metadata.name(), metadata.version().get()); | ||
} | ||
noOfProcessedElements[0]++; | ||
if (shouldLogProgress(noOfProcessedElements)) { | ||
LOGGER.info("Processed {} elements of stream: {}", noOfProcessedElements[0], streamIdOf(envelope)); | ||
} | ||
envelopes[0] = envelope; | ||
}); | ||
final UUID streamId = streamIdOf(envelopes[0]); | ||
streamStatusRepository.insert(new StreamStatus(streamId, versionOf(envelopes[0]), sourceOf(envelopes[0]))); | ||
LOGGER.info("Finished processing of stream: {}, elements processed: {}", streamId, noOfProcessedElements[0]); | ||
return streamId; | ||
} | ||
} | ||
@Inject | ||
private StreamEnvelopeProvider streamEnvelopeProvider; | ||
|
||
private UUID streamIdOf(final JsonEnvelope envelope) { | ||
return envelope | ||
.metadata() | ||
.streamId() | ||
.orElseThrow(() -> new IllegalArgumentException(String.format("Stream id not found in the envelope: %s", envelope.toString()))); | ||
} | ||
@Inject | ||
private StreamStatusFactory streamStatusFactory; | ||
|
||
private Long versionOf(final JsonEnvelope envelope) { | ||
return envelope | ||
.metadata() | ||
.version() | ||
.orElseThrow(() -> new IllegalArgumentException(String.format("Version not found in the envelope: %s", envelope.toString()))); | ||
} | ||
@Inject | ||
private ProgressLogger progressLogger; | ||
|
||
@TransactionAttribute(NOT_SUPPORTED) | ||
public UUID dispatch(final UUID streamId) { | ||
|
||
progressLogger.logStart(streamId); | ||
|
||
final List<JsonEnvelope> envelopes = streamEnvelopeProvider.getStreamAsList(streamId); | ||
|
||
private String sourceOf(final JsonEnvelope envelope) { | ||
return substringBefore(envelope.metadata().name(), "."); | ||
range(0, envelopes.size()) | ||
.forEach(index -> dispatchEnvelope( | ||
envelopes.get(index), | ||
streamId, | ||
index)); | ||
|
||
insertStreamStatus(streamStatusFactory.create(envelopes, streamId)); | ||
|
||
progressLogger.logCompletion(streamId); | ||
|
||
return streamId; | ||
} | ||
|
||
private boolean shouldLogProgress(final int[] i) { | ||
return i[0] % 100 == 0; | ||
@TransactionAttribute(REQUIRED) | ||
private void dispatchEnvelope(final JsonEnvelope jsonEnvelope, final UUID streamId, final int index) { | ||
try { | ||
envelopeDispatcher.dispatch(jsonEnvelope); | ||
progressLogger.logSuccess(streamId, index); | ||
} catch (final MissingHandlerException ex) { | ||
progressLogger.logFailure(streamId, jsonEnvelope); | ||
} | ||
} | ||
|
||
private boolean firstElement(final int[] i) { | ||
return i[0] == 0; | ||
@TransactionAttribute(REQUIRED) | ||
private void insertStreamStatus(final StreamStatus streamStatus) { | ||
streamStatusRepository.insert(streamStatus); | ||
} | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
...ork-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressChecker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package uk.gov.justice.framework.tools.replay; | ||
|
||
public class ProgressChecker { | ||
|
||
private static final int PROGRESS_INTERVAL = 100; | ||
|
||
public boolean shouldLogProgress(final int index) { | ||
return index % PROGRESS_INTERVAL == 0; | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
...work-tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/ProgressLogger.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package uk.gov.justice.framework.tools.replay; | ||
|
||
import uk.gov.justice.services.messaging.JsonEnvelope; | ||
import uk.gov.justice.services.messaging.Metadata; | ||
|
||
import java.util.UUID; | ||
|
||
import javax.inject.Inject; | ||
|
||
import org.slf4j.Logger; | ||
|
||
public class ProgressLogger { | ||
|
||
@Inject | ||
private ProgressChecker progressChecker; | ||
|
||
@Inject | ||
private Logger logger; | ||
|
||
private int sucessCount = 0; | ||
|
||
public void logStart(final UUID streamId) { | ||
logger.info("Starting processing of stream: {}", streamId); | ||
} | ||
|
||
public void logSuccess(final UUID streamId, final int index) { | ||
|
||
sucessCount++; | ||
|
||
if (progressChecker.shouldLogProgress(index)) { | ||
logger.info("Processed {} element(s) of stream: {}", sucessCount, streamId); | ||
} | ||
} | ||
|
||
public void logFailure(final UUID streamId, final JsonEnvelope jsonEnvelope) { | ||
final Metadata metadata = jsonEnvelope.metadata(); | ||
logger.warn("Missing handler for stream Id: {}, event name: {}, version: {}", | ||
streamId, | ||
metadata.name(), | ||
metadata.version().map(theVersion -> "" + theVersion).orElse("Not set") | ||
); | ||
} | ||
|
||
public void logCompletion(final UUID streamId) { | ||
logger.info("Finished processing of stream: {}. Processed {} elements", streamId, sucessCount); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
...ls-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamEnvelopeProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package uk.gov.justice.framework.tools.replay; | ||
|
||
import static java.util.stream.Collectors.toList; | ||
|
||
import uk.gov.justice.services.eventsourcing.repository.jdbc.JdbcEventRepository; | ||
import uk.gov.justice.services.messaging.JsonEnvelope; | ||
|
||
import java.util.List; | ||
import java.util.UUID; | ||
import java.util.stream.Stream; | ||
|
||
import javax.inject.Inject; | ||
|
||
public class StreamEnvelopeProvider { | ||
|
||
@Inject | ||
private JdbcEventRepository jdbcEventRepository; | ||
|
||
public List<JsonEnvelope> getStreamAsList(final UUID streamId) { | ||
try (final Stream<JsonEnvelope> stream = jdbcEventRepository.getByStreamId(streamId)) { | ||
return stream.collect(toList()); | ||
} | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
...tools-replay/src/main/java/uk/gov/justice/framework/tools/replay/StreamStatusFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package uk.gov.justice.framework.tools.replay; | ||
|
||
import static java.lang.String.format; | ||
import static org.apache.commons.lang3.StringUtils.substringBefore; | ||
|
||
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus; | ||
import uk.gov.justice.services.messaging.JsonEnvelope; | ||
|
||
import java.util.List; | ||
import java.util.UUID; | ||
|
||
public class StreamStatusFactory { | ||
|
||
public StreamStatus create(final List<JsonEnvelope> envelopes, final UUID streamId) { | ||
final JsonEnvelope jsonEnvelope = envelopes.get(0); | ||
final Long version = getVersionFrom(jsonEnvelope); | ||
final String source = getSourceFrom(jsonEnvelope); | ||
|
||
return new StreamStatus(streamId, version, source); | ||
} | ||
|
||
private Long getVersionFrom(final JsonEnvelope envelope) { | ||
return envelope | ||
.metadata() | ||
.version() | ||
.orElseThrow(() -> new IllegalArgumentException(format("Version not found in the envelope: %s", envelope.toString()))); | ||
} | ||
|
||
private String getSourceFrom(final JsonEnvelope envelope) { | ||
return substringBefore(envelope.metadata().name(), "."); | ||
} | ||
} |
Oops, something went wrong.