Skip to content

Commit

Permalink
Revert "improved logging in ConnectionIdsRetrievalActor"
Browse files Browse the repository at this point in the history
This reverts commit 120a860.
  • Loading branch information
thjaeckle committed Oct 9, 2023
1 parent 120a860 commit b9f0ca4
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 62 deletions.
Expand Up @@ -15,24 +15,13 @@
import static org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_NAME;

import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.Document;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand All @@ -47,12 +36,23 @@
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionDeleted;
import org.eclipse.ditto.connectivity.service.config.ConnectionIdsRetrievalConfig;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
* Actor handling messages related to connections e.g. retrieving all connections ids.
*/
Expand Down Expand Up @@ -167,22 +167,14 @@ private void getConnectionIDsByTag(final SudoRetrieveConnectionIdsByTag sudoRetr
}

private void getAllConnectionIDs(final WithDittoHeaders cmd) {
final DittoDiagnosticLoggingAdapter logger = log.withCorrelationId(cmd);
logger.info("Retrieving all connection IDs ...");
log.withCorrelationId(cmd)
.info("Retrieving all connection IDs ...");
try {
final Source<String, NotUsed> idsFromSnapshots = getIdsFromSnapshotsSource()
.via(Flow.fromFunction(result -> {
logger.debug("idsFromSnapshots element: <{}>", result);
return result;
}));
final Source<String, NotUsed> idsFromSnapshots = getIdsFromSnapshotsSource();
final Source<String, NotUsed> idsFromJournal = persistenceIdsFromJournalSourceSupplier.get()
.filter(ConnectionIdsRetrievalActor::isNotDeleted)
.filter(ConnectionIdsRetrievalActor::isNotEmptyEvent)
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID))
.via(Flow.fromFunction(result -> {
logger.debug("idsFromJournal element: <{}>", result);
return result;
}));
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID));

final CompletionStage<CommandResponse> retrieveAllConnectionIdsResponse =
persistenceIdsFromJournalSourceSupplier.get()
Expand All @@ -194,10 +186,8 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) {
.filter(pid -> pid.startsWith(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX))
.map(pid -> pid.substring(
ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length()))
.runWith(Sink.seq(), materializer)
)
.thenApply(idList -> idList.stream().sorted().toList())
.thenApply(LinkedHashSet::new)
.runWith(Sink.seq(), materializer))
.thenApply(HashSet::new)
.thenApply(ids -> buildResponse(cmd, ids))
.exceptionally(throwable -> buildErrorResponse(throwable, cmd.getDittoHeaders()));
Patterns.pipe(retrieveAllConnectionIdsResponse, getContext().dispatcher()).to(getSender());
Expand Down
@@ -0,0 +1,153 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pekko.logging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Map;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
* This implementation of {@link AbstractDiagnosticLoggingAdapter} discards the correlation ID automatically after
* each log operation.
*/
@NotThreadSafe
final class AutoDiscardingDiagnosticLoggingAdapter extends AbstractDiagnosticLoggingAdapter {

private final AbstractDiagnosticLoggingAdapter loggingAdapter;

private AutoDiscardingDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter) {
this.loggingAdapter = checkNotNull(loggingAdapter, "loggingAdapter");
}

public static AutoDiscardingDiagnosticLoggingAdapter of(final AbstractDiagnosticLoggingAdapter loggingAdapter) {
return new AutoDiscardingDiagnosticLoggingAdapter(loggingAdapter);
}

@Override
public boolean isErrorEnabled() {
return loggingAdapter.isErrorEnabled();
}

@Override
public boolean isWarningEnabled() {
return loggingAdapter.isWarningEnabled();
}

@Override
public boolean isInfoEnabled() {
return loggingAdapter.isInfoEnabled();
}

@Override
public boolean isDebugEnabled() {
return loggingAdapter.isDebugEnabled();
}

@Override
public void notifyError(final String message) {
try {
loggingAdapter.notifyError(message);
} finally {
discardMdcEntries();
}
}

@Override
public void notifyError(final Throwable cause, final String message) {
try {
loggingAdapter.notifyError(cause, message);
} finally {
discardMdcEntries();
}
}

@Override
public void notifyWarning(final String message) {
try {
loggingAdapter.notifyWarning(message);
} finally {
discardMdcEntries();
}
}

@Override
public void notifyInfo(final String message) {
try {
loggingAdapter.notifyInfo(message);
} finally {
discardMdcEntries();
}
}

@Override
public void notifyDebug(final String message) {
try {
loggingAdapter.notifyDebug(message);
} finally {
discardMdcEntries();
}
}

@Override
public scala.collection.immutable.Map<String, Object> mdc() {
return loggingAdapter.mdc();
}

@Override
public void mdc(final scala.collection.immutable.Map<String, Object> mdc) {
loggingAdapter.mdc(mdc);
}

@Override
public Map<String, Object> getMDC() {
return loggingAdapter.getMDC();
}

@Override
public void setMDC(final Map<String, Object> jMdc) {
loggingAdapter.setMDC(jMdc);
}

@Override
public void clearMDC() {
loggingAdapter.clearMDC();
}

@Override
public AutoDiscardingDiagnosticLoggingAdapter putMdcEntry(final CharSequence key,
@Nullable final CharSequence value) {
loggingAdapter.putMdcEntry(key, value);
return this;
}

@Override
public AutoDiscardingDiagnosticLoggingAdapter removeMdcEntry(final CharSequence key) {
loggingAdapter.removeMdcEntry(key);
return this;
}

@Override
public AutoDiscardingDiagnosticLoggingAdapter discardMdcEntries() {
loggingAdapter.discardMdcEntries();
return this;
}

@Override
public String getName() {
return loggingAdapter.getName();
}

}

0 comments on commit b9f0ca4

Please sign in to comment.