Skip to content

Commit

Permalink
improved logging in ConnectionIdsRetrievalActor
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Oct 9, 2023
1 parent b9f0ca4 commit 95e6d16
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 167 deletions.
Expand Up @@ -15,13 +15,24 @@
import static org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_NAME;

import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.Document;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand All @@ -36,23 +47,12 @@
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionDeleted;
import org.eclipse.ditto.connectivity.service.config.ConnectionIdsRetrievalConfig;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
* Actor handling messages related to connections e.g. retrieving all connections ids.
*/
Expand Down Expand Up @@ -167,14 +167,22 @@ private void getConnectionIDsByTag(final SudoRetrieveConnectionIdsByTag sudoRetr
}

private void getAllConnectionIDs(final WithDittoHeaders cmd) {
log.withCorrelationId(cmd)
.info("Retrieving all connection IDs ...");
final DittoDiagnosticLoggingAdapter logger = log.withCorrelationId(cmd);
logger.info("Retrieving all connection IDs ...");
try {
final Source<String, NotUsed> idsFromSnapshots = getIdsFromSnapshotsSource();
final Source<String, NotUsed> idsFromSnapshots = getIdsFromSnapshotsSource()
.via(Flow.fromFunction(result -> {
logger.debug("idsFromSnapshots element: <{}>", result);
return result;
}));
final Source<String, NotUsed> idsFromJournal = persistenceIdsFromJournalSourceSupplier.get()
.filter(ConnectionIdsRetrievalActor::isNotDeleted)
.filter(ConnectionIdsRetrievalActor::isNotEmptyEvent)
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID));
.map(document -> document.getString(MongoReadJournal.J_EVENT_PID))
.via(Flow.fromFunction(result -> {
logger.debug("idsFromJournal element: <{}>", result);
return result;
}));

final CompletionStage<CommandResponse> retrieveAllConnectionIdsResponse =
persistenceIdsFromJournalSourceSupplier.get()
Expand All @@ -186,8 +194,10 @@ private void getAllConnectionIDs(final WithDittoHeaders cmd) {
.filter(pid -> pid.startsWith(ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX))
.map(pid -> pid.substring(
ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX.length()))
.runWith(Sink.seq(), materializer))
.thenApply(HashSet::new)
.runWith(Sink.seq(), materializer)
)
.thenApply(idList -> idList.stream().sorted().toList())
.thenApply(LinkedHashSet::new)
.thenApply(ids -> buildResponse(cmd, ids))
.exceptionally(throwable -> buildErrorResponse(throwable, cmd.getDittoHeaders()));
Patterns.pipe(retrieveAllConnectionIdsResponse, getContext().dispatcher()).to(getSender());
Expand Down
Expand Up @@ -20,10 +20,10 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;

import org.apache.pekko.event.DiagnosticLoggingAdapter;
import scala.collection.immutable.Seq;

/**
Expand All @@ -34,15 +34,10 @@
final class DefaultDittoDiagnosticLoggingAdapter extends DittoDiagnosticLoggingAdapter {

private final AbstractDiagnosticLoggingAdapter loggingAdapter;
private final AbstractDiagnosticLoggingAdapter autoDiscardingLoggingAdapter;
private AbstractDiagnosticLoggingAdapter currentLogger;

private DefaultDittoDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter,
final AbstractDiagnosticLoggingAdapter autoDiscardingLoggingAdapter) {
private DefaultDittoDiagnosticLoggingAdapter(final AbstractDiagnosticLoggingAdapter loggingAdapter) {

this.loggingAdapter = loggingAdapter;
this.autoDiscardingLoggingAdapter = autoDiscardingLoggingAdapter;
currentLogger = autoDiscardingLoggingAdapter;
}

/**
Expand All @@ -60,14 +55,12 @@ public static DefaultDittoDiagnosticLoggingAdapter of(final DiagnosticLoggingAda
final DefaultDiagnosticLoggingAdapter loggingAdapter =
DefaultDiagnosticLoggingAdapter.of(diagnosticLoggingAdapter, loggerName);

return new DefaultDittoDiagnosticLoggingAdapter(loggingAdapter,
AutoDiscardingDiagnosticLoggingAdapter.of(loggingAdapter));
return new DefaultDittoDiagnosticLoggingAdapter(loggingAdapter);
}

@Override
public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final CharSequence correlationId) {
currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId);
loggingAdapter.putMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId);
return this;
}

Expand All @@ -91,78 +84,37 @@ public void discardCorrelationId() {
discardMdcEntry(CommonMdcEntryKey.CORRELATION_ID);
}

@Override
public DefaultDittoDiagnosticLoggingAdapter setMdcEntry(final CharSequence key,
@Nullable final CharSequence value) {

putToMdcOfAllLoggerStates(key, value);
currentLogger = loggingAdapter;
return this;
}

private void putToMdcOfAllLoggerStates(final CharSequence key, @Nullable final CharSequence value) {
loggingAdapter.putMdcEntry(key, value);
autoDiscardingLoggingAdapter.putMdcEntry(key, value);
}

@Override
public DefaultDittoDiagnosticLoggingAdapter setMdcEntries(final CharSequence k1, @Nullable final CharSequence v1,
final CharSequence k2, @Nullable final CharSequence v2) {

putToMdcOfAllLoggerStates(k1, v1);
putToMdcOfAllLoggerStates(k2, v2);
currentLogger = loggingAdapter;
return this;
}

@Override
public DefaultDittoDiagnosticLoggingAdapter setMdcEntries(final CharSequence k1, @Nullable final CharSequence v1,
final CharSequence k2, @Nullable final CharSequence v2,
final CharSequence k3, @Nullable final CharSequence v3) {

putToMdcOfAllLoggerStates(k1, v1);
putToMdcOfAllLoggerStates(k2, v2);
putToMdcOfAllLoggerStates(k3, v3);
currentLogger = loggingAdapter;
return this;
}

@Override
public void discardMdcEntry(final CharSequence key) {
removeFromMdcOfAllLoggerStates(key);
currentLogger = loggingAdapter;
}

private void removeFromMdcOfAllLoggerStates(final CharSequence key) {
loggingAdapter.removeMdcEntry(key);
autoDiscardingLoggingAdapter.removeMdcEntry(key);
}

@Override
public DefaultDittoDiagnosticLoggingAdapter putMdcEntry(final CharSequence key,
@Nullable final CharSequence value) {

currentLogger = loggingAdapter;
currentLogger.putMdcEntry(key, value);
loggingAdapter.putMdcEntry(key, value);
return this;
}

@Override
public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final CharSequence key,
@Nullable final CharSequence value) {

currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(key, value);
loggingAdapter.putMdcEntry(key, value);
return this;
}

@Override
public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final CharSequence k1, @Nullable final CharSequence v1,
final CharSequence k2, @Nullable final CharSequence v2) {

currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(k1, v1);
currentLogger.putMdcEntry(k2, v2);
loggingAdapter.putMdcEntry(k1, v1);
loggingAdapter.putMdcEntry(k2, v2);
return this;
}

Expand All @@ -171,21 +123,19 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final CharSequence k1
final CharSequence k2, @Nullable final CharSequence v2,
final CharSequence k3, @Nullable final CharSequence v3) {

currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(k1, v1);
currentLogger.putMdcEntry(k2, v2);
currentLogger.putMdcEntry(k3, v3);
loggingAdapter.putMdcEntry(k1, v1);
loggingAdapter.putMdcEntry(k2, v2);
loggingAdapter.putMdcEntry(k3, v3);
return this;
}

@Override
public DittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final MdcEntry... furtherMdcEntries) {
checkNotNull(furtherMdcEntries, "furtherMdcEntries");

currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull());
loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull());
for (final MdcEntry furtherMdcEntry : furtherMdcEntries) {
currentLogger.putMdcEntry(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull());
loggingAdapter.putMdcEntry(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull());
}
return this;
}
Expand All @@ -194,8 +144,7 @@ public DittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final
public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final Collection<MdcEntry> mdcEntries) {
checkNotNull(mdcEntries, "mdcEntries");

currentLogger = autoDiscardingLoggingAdapter;
mdcEntries.forEach(mdcEntry -> currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()));
mdcEntries.forEach(mdcEntry -> loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull()));
return this;
}

Expand All @@ -205,99 +154,97 @@ public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry

checkNotNull(furtherMdcEntries, "furtherMdcEntries");

currentLogger = autoDiscardingLoggingAdapter;
currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull());
furtherMdcEntries.foreach(furtherMdcEntry -> currentLogger.putMdcEntry(furtherMdcEntry.getKey(),
loggingAdapter.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull());
furtherMdcEntries.foreach(furtherMdcEntry -> loggingAdapter.putMdcEntry(furtherMdcEntry.getKey(),
furtherMdcEntry.getValueOrNull()));
return this;
}

@Override
public DefaultDittoDiagnosticLoggingAdapter removeMdcEntry(final CharSequence key) {
currentLogger.removeMdcEntry(key);
loggingAdapter.removeMdcEntry(key);
return this;
}

@Override
public DefaultDittoDiagnosticLoggingAdapter discardMdcEntries() {
currentLogger.discardMdcEntries();
currentLogger = autoDiscardingLoggingAdapter;
loggingAdapter.discardMdcEntries();
return this;
}

@Override
public String getName() {
return currentLogger.getName();
return loggingAdapter.getName();
}

@Override
public boolean isErrorEnabled() {
return currentLogger.isErrorEnabled();
return loggingAdapter.isErrorEnabled();
}

@Override
public boolean isWarningEnabled() {
return currentLogger.isWarningEnabled();
return loggingAdapter.isWarningEnabled();
}

@Override
public boolean isInfoEnabled() {
return currentLogger.isInfoEnabled();
return loggingAdapter.isInfoEnabled();
}

@Override
public boolean isDebugEnabled() {
return currentLogger.isDebugEnabled();
return loggingAdapter.isDebugEnabled();
}

@Override
public void notifyError(final String message) {
currentLogger.notifyError(message);
loggingAdapter.notifyError(message);
}

@Override
public void notifyError(final Throwable cause, final String message) {
currentLogger.notifyError(cause, message);
loggingAdapter.notifyError(cause, message);
}

@Override
public void notifyWarning(final String message) {
currentLogger.notifyWarning(message);
loggingAdapter.notifyWarning(message);
}

@Override
public void notifyInfo(final String message) {
currentLogger.notifyInfo(message);
loggingAdapter.notifyInfo(message);
}

@Override
public void notifyDebug(final String message) {
currentLogger.notifyDebug(message);
loggingAdapter.notifyDebug(message);
}

@Override
public scala.collection.immutable.Map<String, Object> mdc() {
return currentLogger.mdc();
return loggingAdapter.mdc();
}

@Override
public void mdc(final scala.collection.immutable.Map<String, Object> mdc) {
currentLogger.mdc(mdc);
loggingAdapter.mdc(mdc);
}

@Override
public Map<String, Object> getMDC() {
return currentLogger.getMDC();
return loggingAdapter.getMDC();
}

@Override
public void setMDC(final Map<String, Object> jMdc) {
currentLogger.setMDC(jMdc);
loggingAdapter.setMDC(jMdc);
}

@Override
public void clearMDC() {
currentLogger.clearMDC();
loggingAdapter.clearMDC();
}

}

0 comments on commit 95e6d16

Please sign in to comment.