Skip to content

Commit

Permalink
Enhanced further log statements with correlation ID.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch-si.com>
  • Loading branch information
Juergen Fickel committed Dec 2, 2019
1 parent 733ab1b commit e8e3896
Showing 1 changed file with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownReason;
Expand All @@ -38,7 +39,6 @@
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
Expand All @@ -54,7 +54,7 @@ public abstract class AbstractPersistenceOperationsActor extends AbstractActor {
/**
* The actor's logger.
*/
protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
protected final DittoDiagnosticLoggingAdapter logger;

private final ActorRef pubSubMediator;
private final String resourceType;
Expand Down Expand Up @@ -82,6 +82,7 @@ private AbstractPersistenceOperationsActor(final ActorRef pubSubMediator,
this.toCloseWhenStopped = Collections.unmodifiableCollection(toCloseWhenStopped);
materializer = ActorMaterializer.create(getContext());
delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
}

/**
Expand Down Expand Up @@ -162,15 +163,15 @@ public void postStop() throws Exception {
try {
closeable.close();
} catch (final IOException e) {
log.warning("Failed to close: <{}>!", e.getMessage());
logger.warning("Failed to close: <{}>!", e.getMessage());
}
});
super.postStop();
}

private void subscribeForNamespaceCommands() {
if (null != namespaceOps) {
log.debug("Subscribing for namespace commands.");
logger.debug("Subscribing for namespace commands.");
final ActorRef self = getSelf();
final DistributedPubSubMediator.Subscribe subscribe =
DistPubSubAccess.subscribeViaGroup(PurgeNamespace.TYPE, getSubscribeGroup(), self);
Expand All @@ -185,7 +186,7 @@ private void subscribeForEntitiesCommands() {
final DistributedPubSubMediator.Subscribe subscribe =
DistPubSubAccess.subscribeViaGroup(topic, getSubscribeGroup(), self);

log.debug("Subscribing for entities commands on topic <{}>.", topic);
logger.debug("Subscribing for entities commands on topic <{}>.", topic);
pubSubMediator.tell(subscribe, self);
}
}
Expand All @@ -200,18 +201,17 @@ public Receive createReceive() {
.match(PurgeNamespace.class, this::purgeNamespace)
.match(PurgeEntities.class, this::purgeEntities)
.match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck)
.matchAny(message -> log.warning("unhandled: <{}>", message))
.matchAny(message -> logger.warning("unhandled: <{}>", message))
.build();
}

private void purgeNamespace(final PurgeNamespace purgeNamespace) {
if (null == namespaceOps) {
log.warning("Cannot handle namespace command: <{}>!", purgeNamespace);
logger.withCorrelationId(purgeNamespace).warning("Cannot handle namespace command: <{}>!", purgeNamespace);
return;
}

LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
log.info("Running <{}>.", purgeNamespace);
logger.withCorrelationId(purgeNamespace).info("Running <{}>.", purgeNamespace);
final String namespace = purgeNamespace.getNamespace();
final ActorRef sender = getSender();

Expand All @@ -223,31 +223,32 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) {
response = PurgeNamespaceResponse.successful(namespace, resourceType,
purgeNamespace.getDittoHeaders());
} else {
LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
errors.forEach(error -> log.error(error, "Error purging namespace <{}>!", namespace));
logger.setCorrelationId(purgeNamespace);
errors.forEach(error -> logger.error(error, "Error purging namespace <{}>!", namespace));
logger.discardCorrelationId();
response = PurgeNamespaceResponse.failed(namespace, resourceType,
purgeNamespace.getDittoHeaders());
}
sender.tell(response, getSelf());
log.info("Successfully purged namespace <{}>.", namespace);
logger.withCorrelationId(purgeNamespace).info("Successfully purged namespace <{}>.", namespace);
})
.exceptionally(error -> {
LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
// Reply nothing - Error should not occur (DB errors were converted to stream elements and handled)
log.error(error, "Unexpected error when purging namespace <{}>!",
purgeNamespace.getNamespace());
logger.withCorrelationId(purgeNamespace)
.error(error, "Unexpected error when purging namespace <{}>!",
purgeNamespace.getNamespace());
return null;
});
}

private void purgeEntities(final PurgeEntities purgeEntities) {

if (null == entitiesOps) {
log.warning("Cannot handle entities command: <{}>.", purgeEntities);
logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>.", purgeEntities);
return;
}
if (!resourceType.equals(purgeEntities.getEntityType())) {
log.warning("Expected command with entityType <{}>, but got: <{}>.", resourceType, purgeEntities);
logger.withCorrelationId(purgeEntities)
.warning("Expected command with entityType <{}>, but got: <{}>.", resourceType, purgeEntities);
return;
}

Expand All @@ -271,12 +272,11 @@ private void schedulePurgingEntitiesIn(final Duration delay, final PurgeEntities

private void doPurgeEntities(final PurgeEntities purgeEntities, final ActorRef initiator) {
if (null == entitiesOps) {
log.warning("Cannot handle entities command: <{}>", purgeEntities);
logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>", purgeEntities);
return;
}

LogUtil.enhanceLogWithCorrelationId(log, purgeEntities);
log.info("Running <{}>.", purgeEntities);
logger.withCorrelationId(purgeEntities).info("Running <{}>.", purgeEntities);
final String entityType = purgeEntities.getEntityType();
final List<EntityId> entityIds = purgeEntities.getEntityIds();

Expand All @@ -287,26 +287,27 @@ private void doPurgeEntities(final PurgeEntities purgeEntities, final ActorRef i
if (errors.isEmpty()) {
response = PurgeEntitiesResponse.successful(entityType, purgeEntities.getDittoHeaders());
} else {
LogUtil.enhanceLogWithCorrelationId(log, purgeEntities);
errors.forEach(error -> log.error(error, "Error purging entities of type <{}>: <{}>",
logger.setCorrelationId(purgeEntities);
errors.forEach(error -> logger.error(error, "Error purging entities of type <{}>: <{}>",
entityType, entityIds));
logger.discardCorrelationId();
response = PurgeEntitiesResponse.failed(purgeEntities.getEntityType(),
purgeEntities.getDittoHeaders());
}
initiator.tell(response, getSelf());
log.info("Successfully purged entities of type <{}>: <{}>", entityType, entityIds);
logger.withCorrelationId(purgeEntities)
.info("Successfully purged entities of type <{}>: <{}>", entityType, entityIds);
})
.exceptionally(error -> {
LogUtil.enhanceLogWithCorrelationId(log, purgeEntities);
// Reply nothing - Error should not occur (DB errors were converted to stream elements and handled)
log.error(error, "Unexpected error when purging entities <{}>!",
purgeEntities.getEntityIds());
logger.withCorrelationId(purgeEntities)
.error(error, "Unexpected error when purging entities <{}>!", purgeEntities.getEntityIds());
return null;
});
}

private void handleSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) {
log.debug("Got subscribeAck <{}>.", subscribeAck);
logger.debug("Got subscribeAck <{}>.", subscribeAck);
}

}

0 comments on commit e8e3896

Please sign in to comment.