Skip to content

Commit

Permalink
improved logging in ConnectionIdsRetrievalActor, pt.2
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Oct 9, 2023
1 parent 95e6d16 commit 7f3bbac
Showing 1 changed file with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -170,50 +171,58 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) {
final DittoDiagnosticLoggingAdapter logger = log.withCorrelationId(cmd);
logger.info("Retrieving all connection IDs ...");
try {
final Source<String, NotUsed> idsFromSnapshots = getIdsFromSnapshotsSource()
final Source<String, NotUsed> idsFromSnapshots = persistenceIdsFromSnapshotSourceSupplier.get()
.via(Flow.fromFunction(result -> {
logger.debug("getIdsFromSnapshotsSource element: <{}>", result);
return result;
}))
.map(this::extractPersistenceIdFromDocument)
.filter(Optional::isPresent)
.map(Optional::get)
.via(Flow.fromFunction(result -> {
logger.debug("idsFromSnapshots element: <{}>", result);
return result;
}));
final Source<String, NotUsed> idsFromJournal = persistenceIdsFromJournalSourceSupplier.get()
.via(Flow.fromFunction(result -> {
logger.debug("idsFromJournalSource element: <{}>", result);
return result;
}))
.filter(ConnectionIdsRetrievalActor::isNotDeleted)
.filter(ConnectionIdsRetrievalActor::isNotEmptyEvent)
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID))
.via(Flow.fromFunction(result -> {
logger.debug("idsFromJournal element: <{}>", result);
return result;
}));

final CompletionStage<CommandResponse> retrieveAllConnectionIdsResponse =
persistenceIdsFromJournalSourceSupplier.get()
.filter(ConnectionIdsRetrievalActor::isDeleted)
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID))
final CompletionStage<List<String>> deletedPidsStage = persistenceIdsFromJournalSourceSupplier.get()
.filter(ConnectionIdsRetrievalActor::isDeleted)
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID))
.runWith(Sink.seq(), materializer);

final CompletionStage<CommandResponse> retrieveAllConnectionIdsResponse = deletedPidsStage
.thenApply(deletedIdsFromJournal -> {
logger.debug("deletedIdsFromJournal element: <{}>", deletedIdsFromJournal);
return deletedIdsFromJournal;
})
.thenCompose(deletedIdsFromJournal -> idsFromSnapshots.concat(idsFromJournal)
.filter(pid -> !deletedIdsFromJournal.contains(pid))
.filter(pid -> pid.startsWith(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX))
.map(pid -> pid.substring(
ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length()))
.runWith(Sink.seq(), materializer)
.thenCompose(deletedIdsFromJournal -> idsFromSnapshots.concat(idsFromJournal)
.filter(pid -> !deletedIdsFromJournal.contains(pid))
.filter(pid -> pid.startsWith(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX))
.map(pid -> pid.substring(
ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length()))
.runWith(Sink.seq(), materializer)
)
.thenApply(idList -> idList.stream().sorted().toList())
.thenApply(LinkedHashSet::new)
.thenApply(ids -> buildResponse(cmd, ids))
.exceptionally(throwable -> buildErrorResponse(throwable, cmd.getDittoHeaders()));
)
.thenApply(idList -> idList.stream().sorted().toList())
.thenApply(LinkedHashSet::new)
.thenApply(ids -> buildResponse(cmd, ids))
.exceptionally(throwable -> buildErrorResponse(throwable, cmd.getDittoHeaders()));
Patterns.pipe(retrieveAllConnectionIdsResponse, getContext().dispatcher()).to(getSender());
} catch (final Exception e) {
log.info("Failed to load persistence ids from journal/snapshots.", e);
getSender().tell(buildErrorResponse(e, cmd.getDittoHeaders()), getSelf());
}
}

private Source<String, NotUsed> getIdsFromSnapshotsSource() {
return persistenceIdsFromSnapshotSourceSupplier.get()
.map(this::extractPersistenceIdFromDocument)
.filter(Optional::isPresent)
.map(Optional::get);
}

@SuppressWarnings({"rawtypes", "java:S3740"})
private CommandResponse buildResponse(final WithDittoHeaders cmd, final Set<String> ids) {
return RetrieveAllConnectionIdsResponse.of(ids, cmd.getDittoHeaders());
Expand Down

0 comments on commit 7f3bbac

Please sign in to comment.