diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java index 0e931fec73..9d8053ba48 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.connectivity.service.messaging; import static org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_NAME; +import static org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal.LIFECYCLE; import java.time.Duration; import java.util.LinkedHashSet; @@ -52,7 +53,6 @@ import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent; /** * Actor handling messages related to connections e.g. retrieving all connections ids. @@ -116,15 +116,15 @@ private static boolean isDeleted(final Document document) { .orElse(true); } - private static boolean isNotEmptyEvent(final Document document) { + private static boolean isNotDeleted(final Document document) { return Optional.ofNullable(document.getString(MongoReadJournal.J_EVENT_MANIFEST)) - .map(manifest -> !EmptyEvent.TYPE.equals(manifest)) + .map(manifest -> !ConnectionDeleted.TYPE.equals(manifest)) .orElse(false); } - private static boolean isNotDeleted(final Document document) { - return Optional.ofNullable(document.getString(MongoReadJournal.J_EVENT_MANIFEST)) - .map(manifest -> !ConnectionDeleted.TYPE.equals(manifest)) + private static boolean snapshotIsNotDeleted(final Document document) { + return Optional.ofNullable(document.getString(LIFECYCLE)) + .map(lifecycle -> !"DELETED".equals(lifecycle)) .orElse(false); } @@ -176,6 +176,7 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) { logger.debug("getIdsFromSnapshotsSource element: <{}>", result); return result; })) + .filter(ConnectionIdsRetrievalActor::snapshotIsNotDeleted) .map(this::extractPersistenceIdFromDocument) .filter(Optional::isPresent) .map(Optional::get)