Skip to content

Commit

Permalink
further stabilize connectionIds retrieval
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Oct 9, 2023
1 parent 7f3bbac commit 97a42da
Showing 1 changed file with 7 additions and 6 deletions.
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_NAME;
import static org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal.LIFECYCLE;

import java.time.Duration;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -52,7 +53,6 @@
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent;

/**
* Actor handling messages related to connections e.g. retrieving all connections ids.
Expand Down Expand Up @@ -116,15 +116,15 @@ private static boolean isDeleted(final Document document) {
.orElse(true);
}

private static boolean isNotEmptyEvent(final Document document) {
private static boolean isNotDeleted(final Document document) {
return Optional.ofNullable(document.getString(MongoReadJournal.J_EVENT_MANIFEST))
.map(manifest -> !EmptyEvent.TYPE.equals(manifest))
.map(manifest -> !ConnectionDeleted.TYPE.equals(manifest))
.orElse(false);
}

private static boolean isNotDeleted(final Document document) {
return Optional.ofNullable(document.getString(MongoReadJournal.J_EVENT_MANIFEST))
.map(manifest -> !ConnectionDeleted.TYPE.equals(manifest))
private static boolean snapshotIsNotDeleted(final Document document) {
return Optional.ofNullable(document.getString(LIFECYCLE))
.map(lifecycle -> !"DELETED".equals(lifecycle))
.orElse(false);
}

Expand Down Expand Up @@ -176,6 +176,7 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) {
logger.debug("getIdsFromSnapshotsSource element: <{}>", result);
return result;
}))
.filter(ConnectionIdsRetrievalActor::snapshotIsNotDeleted)
.map(this::extractPersistenceIdFromDocument)
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down

0 comments on commit 97a42da

Please sign in to comment.