Skip to content

Commit

Permalink
review: minor javadoc, logging fixes
Browse files Browse the repository at this point in the history
* added another testcase in MongoReadJournalIT testing for considerOnlyLatest=false as well

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Aug 22, 2022
1 parent 6e8561b commit b187d1b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class AcknowledgementAggregatorActorStarter {

private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(AcknowledgementAggregatorActorStarter.class);

private final ActorRefFactory actorRefFactory;
private final Duration maxTimeout;
private final HeaderTranslator headerTranslator;
Expand Down Expand Up @@ -267,8 +268,9 @@ private <C extends Command<?>> ActorRef startAckAggregatorActor(final EntityId e
final CommandResponseAcknowledgementProvider<C> acknowledgementProvider =
findRelevantAcknowledgementProvider(command)
.orElseThrow(() -> {
LOGGER.error("Tried to start acknowledgement aggregator for command <{}> " +
"but don't know any applicable acknowledgement providers for this.", command);
LOGGER.withCorrelationId(command)
.error("Tried to start acknowledgement aggregator for command <{}> but don't " +
"know any applicable acknowledgement providers for this.", command);
return DittoInternalErrorException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.build();
Expand All @@ -285,8 +287,9 @@ private <C extends Command<?>> ActorRef startAckAggregatorActor(final EntityId e
return actorRefFactory.actorOf(props, getNextActorName(command.getDittoHeaders()));
}

@SuppressWarnings("unchecked")
private <C extends Command<?>> Optional<CommandResponseAcknowledgementProvider<C>> findRelevantAcknowledgementProvider(
C signal) {
final C signal) {
return responseAcknowledgementProviders.stream()
.filter(provider -> provider.getCommandClass().isAssignableFrom(signal.getClass()))
.map(provider -> (CommandResponseAcknowledgementProvider<C>) provider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ private Source<List<Document>, NotUsed> listLatestJournalEntries(final MongoColl
* @param maxIdleTime how long the stream is allowed to idle without sending any element. Bounds the number of
* retries with exponential back-off.
* @param mat the actor materializer to run the query streams.
* @param considerOnlyLatest whether only the latest available journal entry should have the provided {@code tag},
* or if any still available journal entry should be considered. If set to {@code true} (only the latest available
* journal entry must have the tag), this Source needs an additional DB query per found {@code batchSize} PIDs. So
* e.g. one additional DB query for each 500 (if that is the {@code batchSize}) PIDs containing the {@code tag} in
* any journal entry.
* @return Source of all persistence IDs such that each element contains the persistence IDs in {@code batchSize}
* events that do not occur in prior buckets.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,19 @@ public void extractJournalPidsInOrderOfTagsOfNewestEvent() {
assertThat(pids).containsExactly("pid4", "pid3", "pid2", "pid1");
}

@Test
public void extractJournalPidsInOrderOfTagsOfNewestEventWhenNewestDoesNotContainTagAnymoreConsideringOnlyLatest() {
final List<String> pids = findPidsWithTestTagWithDifferentConsiderOnlyLatest(true);
assertThat(pids).containsExactly("pid2", "pid3");
}

@Test
public void extractJournalPidsInOrderOfTagsOfNewestEventWhenNewestDoesNotContainTagAnymore() {
final List<String> pids = findPidsWithTestTagWithDifferentConsiderOnlyLatest(false);
assertThat(pids).containsExactly("pid1", "pid2", "pid3");
}

private List<String> findPidsWithTestTagWithDifferentConsiderOnlyLatest(final boolean considerOnlyLatest) {
insert("test_journal",
new JournalEntry("pid1").withSn(1L).withTags(Set.of("test")).getDocument());
insert("test_journal",
Expand All @@ -346,12 +357,9 @@ public void extractJournalPidsInOrderOfTagsOfNewestEventWhenNewestDoesNotContain
insert("test_journal",
new JournalEntry("pid4").withSn(2L).withTags(Set.of()).getDocument());

final List<String> pids =
readJournal.getJournalPidsWithTag("test", 5, Duration.ZERO, materializer, true)
return readJournal.getJournalPidsWithTag("test", 5, Duration.ZERO, materializer, considerOnlyLatest)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().join();

assertThat(pids).containsExactly("pid2", "pid3");
}

@Test
Expand Down

0 comments on commit b187d1b

Please sign in to comment.