From b9f0ca43d208aaa7984617f6617ff42a282af49f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Mon, 9 Oct 2023 11:58:04 +0200 Subject: [PATCH] Revert "improved logging in ConnectionIdsRetrievalActor" This reverts commit 120a860a770280c1a30ca52ec0f503afa2594432. --- .../ConnectionIdsRetrievalActor.java | 48 +++--- ...utoDiscardingDiagnosticLoggingAdapter.java | 153 ++++++++++++++++++ .../DefaultDittoDiagnosticLoggingAdapter.java | 87 ++++++---- 3 files changed, 226 insertions(+), 62 deletions(-) create mode 100644 internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/AutoDiscardingDiagnosticLoggingAdapter.java diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java index b9c6cfb8d4..58ef39d359 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ConnectionIdsRetrievalActor.java @@ -15,24 +15,13 @@ import static org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_NAME; import java.time.Duration; -import java.util.LinkedHashSet; +import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.function.Supplier; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.AbstractActor; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; -import org.apache.pekko.cluster.pubsub.DistributedPubSub; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.pattern.Patterns; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.javadsl.Flow; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; import org.bson.Document; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -47,12 +36,23 @@ import org.eclipse.ditto.connectivity.model.signals.events.ConnectionDeleted; import org.eclipse.ditto.connectivity.service.config.ConnectionIdsRetrievalConfig; import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor; -import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.AbstractActor; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; +import org.apache.pekko.cluster.pubsub.DistributedPubSub; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.pattern.Patterns; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + /** * Actor handling messages related to connections e.g. retrieving all connections ids. */ @@ -167,22 +167,14 @@ private void getConnectionIDsByTag(final SudoRetrieveConnectionIdsByTag sudoRetr } private void getAllConnectionIDs(final WithDittoHeaders cmd) { - final DittoDiagnosticLoggingAdapter logger = log.withCorrelationId(cmd); - logger.info("Retrieving all connection IDs ..."); + log.withCorrelationId(cmd) + .info("Retrieving all connection IDs ..."); try { - final Source idsFromSnapshots = getIdsFromSnapshotsSource() - .via(Flow.fromFunction(result -> { - logger.debug("idsFromSnapshots element: <{}>", result); - return result; - })); + final Source idsFromSnapshots = getIdsFromSnapshotsSource(); final Source idsFromJournal = persistenceIdsFromJournalSourceSupplier.get() .filter(ConnectionIdsRetrievalActor::isNotDeleted) .filter(ConnectionIdsRetrievalActor::isNotEmptyEvent) - .map(document -> document.getString(MongoReadJournal.J_EVENT_PID)) - .via(Flow.fromFunction(result -> { - logger.debug("idsFromJournal element: <{}>", result); - return result; - })); + .map(document -> document.getString(MongoReadJournal.J_EVENT_PID)); final CompletionStage retrieveAllConnectionIdsResponse = persistenceIdsFromJournalSourceSupplier.get() @@ -194,10 +186,8 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) { .filter(pid -> pid.startsWith(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX)) .map(pid -> pid.substring( ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length())) - .runWith(Sink.seq(), materializer) - ) - .thenApply(idList -> idList.stream().sorted().toList()) - .thenApply(LinkedHashSet::new) + .runWith(Sink.seq(), materializer)) + .thenApply(HashSet::new) .thenApply(ids -> buildResponse(cmd, ids)) .exceptionally(throwable -> buildErrorResponse(throwable, cmd.getDittoHeaders())); Patterns.pipe(retrieveAllConnectionIdsResponse, getContext().dispatcher()).to(getSender()); diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/AutoDiscardingDiagnosticLoggingAdapter.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/AutoDiscardingDiagnosticLoggingAdapter.java new file mode 100644 index 0000000000..02ed959c4e --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/AutoDiscardingDiagnosticLoggingAdapter.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.logging; + +import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; + +import java.util.Map; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * This implementation of {@link AbstractDiagnosticLoggingAdapter} discards the correlation ID automatically after + * each log operation. + */ +@NotThreadSafe +final class AutoDiscardingDiagnosticLoggingAdapter extends AbstractDiagnosticLoggingAdapter { + + private final AbstractDiagnosticLoggingAdapter loggingAdapter; + + private AutoDiscardingDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter) { + this.loggingAdapter = checkNotNull(loggingAdapter, "loggingAdapter"); + } + + public static AutoDiscardingDiagnosticLoggingAdapter of(final AbstractDiagnosticLoggingAdapter loggingAdapter) { + return new AutoDiscardingDiagnosticLoggingAdapter(loggingAdapter); + } + + @Override + public boolean isErrorEnabled() { + return loggingAdapter.isErrorEnabled(); + } + + @Override + public boolean isWarningEnabled() { + return loggingAdapter.isWarningEnabled(); + } + + @Override + public boolean isInfoEnabled() { + return loggingAdapter.isInfoEnabled(); + } + + @Override + public boolean isDebugEnabled() { + return loggingAdapter.isDebugEnabled(); + } + + @Override + public void notifyError(final String message) { + try { + loggingAdapter.notifyError(message); + } finally { + discardMdcEntries(); + } + } + + @Override + public void notifyError(final Throwable cause, final String message) { + try { + loggingAdapter.notifyError(cause, message); + } finally { + discardMdcEntries(); + } + } + + @Override + public void notifyWarning(final String message) { + try { + loggingAdapter.notifyWarning(message); + } finally { + discardMdcEntries(); + } + } + + @Override + public void notifyInfo(final String message) { + try { + loggingAdapter.notifyInfo(message); + } finally { + discardMdcEntries(); + } + } + + @Override + public void notifyDebug(final String message) { + try { + loggingAdapter.notifyDebug(message); + } finally { + discardMdcEntries(); + } + } + + @Override + public scala.collection.immutable.Map mdc() { + return loggingAdapter.mdc(); + } + + @Override + public void mdc(final scala.collection.immutable.Map mdc) { + loggingAdapter.mdc(mdc); + } + + @Override + public Map getMDC() { + return loggingAdapter.getMDC(); + } + + @Override + public void setMDC(final Map jMdc) { + loggingAdapter.setMDC(jMdc); + } + + @Override + public void clearMDC() { + loggingAdapter.clearMDC(); + } + + @Override + public AutoDiscardingDiagnosticLoggingAdapter putMdcEntry(final CharSequence key, + @Nullable final CharSequence value) { + loggingAdapter.putMdcEntry(key, value); + return this; + } + + @Override + public AutoDiscardingDiagnosticLoggingAdapter removeMdcEntry(final CharSequence key) { + loggingAdapter.removeMdcEntry(key); + return this; + } + + @Override + public AutoDiscardingDiagnosticLoggingAdapter discardMdcEntries() { + loggingAdapter.discardMdcEntries(); + return this; + } + + @Override + public String getName() { + return loggingAdapter.getName(); + } + +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/DefaultDittoDiagnosticLoggingAdapter.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/DefaultDittoDiagnosticLoggingAdapter.java index 80bf12b005..a142456169 100644 --- a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/DefaultDittoDiagnosticLoggingAdapter.java +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/logging/DefaultDittoDiagnosticLoggingAdapter.java @@ -20,10 +20,10 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import org.apache.pekko.event.DiagnosticLoggingAdapter; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.apache.pekko.event.DiagnosticLoggingAdapter; import scala.collection.immutable.Seq; /** @@ -34,10 +34,15 @@ final class DefaultDittoDiagnosticLoggingAdapter extends DittoDiagnosticLoggingAdapter { private final AbstractDiagnosticLoggingAdapter loggingAdapter; + private final AbstractDiagnosticLoggingAdapter autoDiscardingLoggingAdapter; + private AbstractDiagnosticLoggingAdapter currentLogger; - private DefaultDittoDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter) { + private DefaultDittoDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter, + final AbstractDiagnosticLoggingAdapter autoDiscardingLoggingAdapter) { this.loggingAdapter = loggingAdapter; + this.autoDiscardingLoggingAdapter = autoDiscardingLoggingAdapter; + currentLogger = autoDiscardingLoggingAdapter; } /** @@ -55,12 +60,14 @@ public static DefaultDittoDiagnosticLoggingAdapter of(final DiagnosticLoggingAda final DefaultDiagnosticLoggingAdapter loggingAdapter = DefaultDiagnosticLoggingAdapter.of(diagnosticLoggingAdapter, loggerName); - return new DefaultDittoDiagnosticLoggingAdapter(loggingAdapter); + return new DefaultDittoDiagnosticLoggingAdapter(loggingAdapter, + AutoDiscardingDiagnosticLoggingAdapter.of(loggingAdapter)); } @Override public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final CharSequence correlationId) { - loggingAdapter.putMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId); + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId); return this; } @@ -89,11 +96,13 @@ public DefaultDittoDiagnosticLoggingAdapter setMdcEntry(final CharSequence key, @Nullable final CharSequence value) { putToMdcOfAllLoggerStates(key, value); + currentLogger = loggingAdapter; return this; } private void putToMdcOfAllLoggerStates(final CharSequence key, @Nullable final CharSequence value) { loggingAdapter.putMdcEntry(key, value); + autoDiscardingLoggingAdapter.putMdcEntry(key, value); } @Override @@ -102,6 +111,7 @@ public DefaultDittoDiagnosticLoggingAdapter setMdcEntries(final CharSequence k1, putToMdcOfAllLoggerStates(k1, v1); putToMdcOfAllLoggerStates(k2, v2); + currentLogger = loggingAdapter; return this; } @@ -113,23 +123,27 @@ public DefaultDittoDiagnosticLoggingAdapter setMdcEntries(final CharSequence k1, putToMdcOfAllLoggerStates(k1, v1); putToMdcOfAllLoggerStates(k2, v2); putToMdcOfAllLoggerStates(k3, v3); + currentLogger = loggingAdapter; return this; } @Override public void discardMdcEntry(final CharSequence key) { removeFromMdcOfAllLoggerStates(key); + currentLogger = loggingAdapter; } private void removeFromMdcOfAllLoggerStates(final CharSequence key) { loggingAdapter.removeMdcEntry(key); + autoDiscardingLoggingAdapter.removeMdcEntry(key); } @Override public DefaultDittoDiagnosticLoggingAdapter putMdcEntry(final CharSequence key, @Nullable final CharSequence value) { - loggingAdapter.putMdcEntry(key, value); + currentLogger = loggingAdapter; + currentLogger.putMdcEntry(key, value); return this; } @@ -137,7 +151,8 @@ public DefaultDittoDiagnosticLoggingAdapter putMdcEntry(final CharSequence key, public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final CharSequence key, @Nullable final CharSequence value) { - loggingAdapter.putMdcEntry(key, value); + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(key, value); return this; } @@ -145,8 +160,9 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final CharSequence key, public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final CharSequence k1, @Nullable final CharSequence v1, final CharSequence k2, @Nullable final CharSequence v2) { - loggingAdapter.putMdcEntry(k1, v1); - loggingAdapter.putMdcEntry(k2, v2); + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(k1, v1); + currentLogger.putMdcEntry(k2, v2); return this; } @@ -155,9 +171,10 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final CharSequence k1 final CharSequence k2, @Nullable final CharSequence v2, final CharSequence k3, @Nullable final CharSequence v3) { - loggingAdapter.putMdcEntry(k1, v1); - loggingAdapter.putMdcEntry(k2, v2); - loggingAdapter.putMdcEntry(k3, v3); + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(k1, v1); + currentLogger.putMdcEntry(k2, v2); + currentLogger.putMdcEntry(k3, v3); return this; } @@ -165,9 +182,10 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final CharSequence k1 public DittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final MdcEntry... furtherMdcEntries) { checkNotNull(furtherMdcEntries, "furtherMdcEntries"); - loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()); + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()); for (final MdcEntry furtherMdcEntry : furtherMdcEntries) { - loggingAdapter.putMdcEntry(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull()); + currentLogger.putMdcEntry(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull()); } return this; } @@ -176,7 +194,8 @@ public DittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final Collection mdcEntries) { checkNotNull(mdcEntries, "mdcEntries"); - mdcEntries.forEach(mdcEntry -> loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull())); + currentLogger = autoDiscardingLoggingAdapter; + mdcEntries.forEach(mdcEntry -> currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull())); return this; } @@ -186,97 +205,99 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry checkNotNull(furtherMdcEntries, "furtherMdcEntries"); - loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()); - furtherMdcEntries.foreach(furtherMdcEntry -> loggingAdapter.putMdcEntry(furtherMdcEntry.getKey(), + currentLogger = autoDiscardingLoggingAdapter; + currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()); + furtherMdcEntries.foreach(furtherMdcEntry -> currentLogger.putMdcEntry(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull())); return this; } @Override public DefaultDittoDiagnosticLoggingAdapter removeMdcEntry(final CharSequence key) { - loggingAdapter.removeMdcEntry(key); + currentLogger.removeMdcEntry(key); return this; } @Override public DefaultDittoDiagnosticLoggingAdapter discardMdcEntries() { - loggingAdapter.discardMdcEntries(); + currentLogger.discardMdcEntries(); + currentLogger = autoDiscardingLoggingAdapter; return this; } @Override public String getName() { - return loggingAdapter.getName(); + return currentLogger.getName(); } @Override public boolean isErrorEnabled() { - return loggingAdapter.isErrorEnabled(); + return currentLogger.isErrorEnabled(); } @Override public boolean isWarningEnabled() { - return loggingAdapter.isWarningEnabled(); + return currentLogger.isWarningEnabled(); } @Override public boolean isInfoEnabled() { - return loggingAdapter.isInfoEnabled(); + return currentLogger.isInfoEnabled(); } @Override public boolean isDebugEnabled() { - return loggingAdapter.isDebugEnabled(); + return currentLogger.isDebugEnabled(); } @Override public void notifyError(final String message) { - loggingAdapter.notifyError(message); + currentLogger.notifyError(message); } @Override public void notifyError(final Throwable cause, final String message) { - loggingAdapter.notifyError(cause, message); + currentLogger.notifyError(cause, message); } @Override public void notifyWarning(final String message) { - loggingAdapter.notifyWarning(message); + currentLogger.notifyWarning(message); } @Override public void notifyInfo(final String message) { - loggingAdapter.notifyInfo(message); + currentLogger.notifyInfo(message); } @Override public void notifyDebug(final String message) { - loggingAdapter.notifyDebug(message); + currentLogger.notifyDebug(message); } @Override public scala.collection.immutable.Map mdc() { - return loggingAdapter.mdc(); + return currentLogger.mdc(); } @Override public void mdc(final scala.collection.immutable.Map mdc) { - loggingAdapter.mdc(mdc); + currentLogger.mdc(mdc); } @Override public Map getMDC() { - return loggingAdapter.getMDC(); + return currentLogger.getMDC(); } @Override public void setMDC(final Map jMdc) { - loggingAdapter.setMDC(jMdc); + currentLogger.setMDC(jMdc); } @Override public void clearMDC() { - loggingAdapter.clearMDC(); + currentLogger.clearMDC(); } }