Skip to content

Commit

Permalink
Load events in batches of 1000 when processing stream
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo authored and amckenzie committed May 29, 2018
1 parent b179556 commit bf0eeec
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

### Changed
- Events are loaded into memory in batches of 1000 at each time

## [4.2.1] - 2018-05-29

### Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package uk.gov.justice.framework.tools.replay;

import static java.util.stream.IntStream.range;
import static javax.ejb.TransactionAttributeType.NOT_SUPPORTED;
import static javax.ejb.TransactionAttributeType.REQUIRED;

Expand All @@ -9,8 +8,8 @@
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
Expand All @@ -19,17 +18,20 @@
@Stateless
public class AsyncStreamDispatcher {

private static final int PAGE_SIZE = 1000;
private static final long FIRST_POSITION = 1L;

@Inject
private TransactionalEnvelopeDispatcher envelopeDispatcher;

@Inject
private StreamStatusJdbcRepository streamStatusRepository;

@Inject
private StreamEnvelopeProvider streamEnvelopeProvider;
private StreamStatusFactory streamStatusFactory;

@Inject
private StreamStatusFactory streamStatusFactory;
private JsonEnvelopeJdbcRepository jsonEnvelopeJdbcRepository;

@Inject
private ProgressLogger progressLogger;
Expand All @@ -39,26 +41,37 @@ public UUID dispatch(final UUID streamId) {

progressLogger.logStart(streamId);

final List<JsonEnvelope> envelopes = streamEnvelopeProvider.getStreamAsList(streamId);
replayAllEventsOf(streamId);

range(0, envelopes.size())
.forEach(index -> dispatchEnvelope(
envelopes.get(index),
streamId,
index));

insertStreamStatus(streamStatusFactory.create(envelopes, streamId));
insertStreamStatus(streamStatusFactory.create(
jsonEnvelopeJdbcRepository.head(streamId),
streamId));

progressLogger.logCompletion(streamId);

return streamId;
}

private void replayAllEventsOf(final UUID streamId) {
final long lastPosition = jsonEnvelopeJdbcRepository.getLatestSequenceIdForStream(streamId);

for (long position = FIRST_POSITION; position <= lastPosition; position = position + PAGE_SIZE) {
replayBatchOfEvents(streamId, position);
}
}


@TransactionAttribute(REQUIRED)
private void dispatchEnvelope(final JsonEnvelope jsonEnvelope, final UUID streamId, final int index) {
private void replayBatchOfEvents(final UUID streamId, final long position) {
try (final Stream<JsonEnvelope> eventStream = jsonEnvelopeJdbcRepository.forward(streamId, position, PAGE_SIZE)) {
eventStream.forEach(jsonEnvelope -> dispatchEnvelope(jsonEnvelope, streamId));
}
}

private void dispatchEnvelope(final JsonEnvelope jsonEnvelope, final UUID streamId) {
try {
envelopeDispatcher.dispatch(jsonEnvelope);
progressLogger.logSuccess(streamId, index);
progressLogger.logSuccess(streamId, jsonEnvelope);
} catch (final MissingHandlerException ex) {
progressLogger.logFailure(streamId, jsonEnvelope);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package uk.gov.justice.framework.tools.replay;

import static java.lang.String.format;

import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventConverter;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.EventJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.UUID;
import java.util.stream.Stream;

import javax.inject.Inject;

public class JsonEnvelopeJdbcRepository {

private static final long PAGE_SIZE_OF_ONE = 1L;
@Inject
private EventJdbcRepository eventJdbcRepository;

@Inject
private EventConverter eventConverter;

public Stream<JsonEnvelope> forward(final UUID streamId, final long position, final long pageSize) {
return eventJdbcRepository
.forward(streamId, position, pageSize)
.map(eventConverter::envelopeOf);
}

public JsonEnvelope head(final UUID streamId) {
return eventJdbcRepository.head(streamId, PAGE_SIZE_OF_ONE)
.findFirst()
.map(eventConverter::envelopeOf)
.orElseThrow(() -> new RuntimeException(format("Failed to get head for stream id: %s", streamId)));
}

public long getLatestSequenceIdForStream(final UUID streamId) {
return eventJdbcRepository.getLatestSequenceIdForStream(streamId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.gov.justice.framework.tools.replay;

import uk.gov.justice.services.messaging.JsonEnvelope;

public class JsonEnvelopeUtil {

public Long versionOf(final JsonEnvelope jsonEnvelope) {
return jsonEnvelope
.metadata()
.version()
.orElse(-1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class ProgressChecker {

private static final int PROGRESS_INTERVAL = 100;

public boolean shouldLogProgress(final int index) {
public boolean shouldLogProgress(final long index) {
return index % PROGRESS_INTERVAL == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class ProgressLogger {
@Inject
private ProgressChecker progressChecker;

@Inject
private JsonEnvelopeUtil jsonEnvelopeUtil;

@Inject
private Logger logger;

Expand All @@ -23,11 +26,11 @@ public void logStart(final UUID streamId) {
logger.info("Starting processing of stream: {}", streamId);
}

public void logSuccess(final UUID streamId, final int index) {
public void logSuccess(final UUID streamId, final JsonEnvelope jsonEnvelope) {

sucessCount++;

if (progressChecker.shouldLogProgress(index)) {
if (progressChecker.shouldLogProgress(jsonEnvelopeUtil.versionOf(jsonEnvelope))) {
logger.info("Processed {} element(s) of stream: {}", sucessCount, streamId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatus;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
import java.util.UUID;

public class StreamStatusFactory {

public StreamStatus create(final List<JsonEnvelope> envelopes, final UUID streamId) {
final JsonEnvelope jsonEnvelope = envelopes.get(0);
public StreamStatus create(final JsonEnvelope jsonEnvelope, final UUID streamId) {
final Long version = getVersionFrom(jsonEnvelope);
final String source = getSourceFrom(jsonEnvelope);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package uk.gov.justice.framework.tools.replay;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
Expand All @@ -15,8 +13,8 @@
import uk.gov.justice.services.event.buffer.core.repository.streamstatus.StreamStatusJdbcRepository;
import uk.gov.justice.services.messaging.JsonEnvelope;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -28,17 +26,19 @@
@RunWith(MockitoJUnitRunner.class)
public class AsyncStreamDispatcherTest {

private static final int PAGE_SIZE = 1000;

@Mock
private TransactionalEnvelopeDispatcher envelopeDispatcher;

@Mock
private StreamStatusJdbcRepository streamStatusRepository;

@Mock
private StreamEnvelopeProvider streamEnvelopeProvider;
private StreamStatusFactory streamStatusFactory;

@Mock
private StreamStatusFactory streamStatusFactory;
private JsonEnvelopeJdbcRepository jsonEnvelopeJdbcRepository;

@Mock
private ProgressLogger progressLogger;
Expand All @@ -53,24 +53,26 @@ public void shouldGetTheEventsOfAStreamDispatchThemAllThenUpdateTheStreamStatus(
final JsonEnvelope jsonEnvelope_1 = mock(JsonEnvelope.class);
final JsonEnvelope jsonEnvelope_2 = mock(JsonEnvelope.class);
final JsonEnvelope jsonEnvelope_3 = mock(JsonEnvelope.class);
final List<JsonEnvelope> envelopes = asList(jsonEnvelope_1, jsonEnvelope_2, jsonEnvelope_3);

final Stream<JsonEnvelope> envelopeStream = Stream.of(jsonEnvelope_1, jsonEnvelope_2, jsonEnvelope_3);
final StreamStatus streamStatus = mock(StreamStatus.class);

when(streamEnvelopeProvider.getStreamAsList(streamId)).thenReturn(envelopes);
when(streamStatusFactory.create(envelopes, streamId)).thenReturn(streamStatus);
when(jsonEnvelopeJdbcRepository.getLatestSequenceIdForStream(streamId)).thenReturn(3L);
when(jsonEnvelopeJdbcRepository.forward(streamId, 1L, PAGE_SIZE)).thenReturn(envelopeStream);
when(jsonEnvelopeJdbcRepository.head(streamId)).thenReturn(jsonEnvelope_3);
when(streamStatusFactory.create(jsonEnvelope_3, streamId)).thenReturn(streamStatus);

assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId));

final InOrder inOrder = inOrder(progressLogger, envelopeDispatcher, streamStatusRepository, progressLogger);

inOrder.verify(progressLogger).logStart(streamId);
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_1);
inOrder.verify(progressLogger).logSuccess(streamId, 0);
inOrder.verify(progressLogger).logSuccess(streamId, jsonEnvelope_1);
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_2);
inOrder.verify(progressLogger).logSuccess(streamId, 1);
inOrder.verify(progressLogger).logSuccess(streamId, jsonEnvelope_2);
inOrder.verify(envelopeDispatcher).dispatch(jsonEnvelope_3);
inOrder.verify(progressLogger).logSuccess(streamId, 2);
inOrder.verify(progressLogger).logSuccess(streamId, jsonEnvelope_3);
inOrder.verify(streamStatusRepository).insert(streamStatus);
inOrder.verify(progressLogger).logCompletion(streamId);
}
Expand All @@ -83,13 +85,14 @@ public void shouldLogFailureIfNoHandlerFoundForDispatch() throws Exception {
final UUID streamId = randomUUID();

final JsonEnvelope jsonEnvelope = mock(JsonEnvelope.class);
final List<JsonEnvelope> envelopes = singletonList(jsonEnvelope);
final Stream<JsonEnvelope> envelopeStream = Stream.of(jsonEnvelope);

final StreamStatus streamStatus = mock(StreamStatus.class);

when(streamEnvelopeProvider.getStreamAsList(streamId)).thenReturn(envelopes);
when(jsonEnvelopeJdbcRepository.getLatestSequenceIdForStream(streamId)).thenReturn(1L);
when(jsonEnvelopeJdbcRepository.forward(streamId, 1L, PAGE_SIZE)).thenReturn(envelopeStream);
doThrow(missingHandlerException).when(envelopeDispatcher).dispatch(jsonEnvelope);
when(streamStatusFactory.create(envelopes, streamId)).thenReturn(streamStatus);
when(streamStatusFactory.create(jsonEnvelope, streamId)).thenReturn(streamStatus);

assertThat(asyncStreamDispatcher.dispatch(streamId), is(streamId));

Expand Down
Loading

0 comments on commit bf0eeec

Please sign in to comment.