diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/operations/AbstractPersistenceOperationsActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/operations/AbstractPersistenceOperationsActor.java index e1b4696348..2e2a699a74 100644 --- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/operations/AbstractPersistenceOperationsActor.java +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/operations/AbstractPersistenceOperationsActor.java @@ -25,7 +25,8 @@ import javax.annotation.Nullable; import org.eclipse.ditto.model.base.entity.id.EntityId; -import org.eclipse.ditto.services.utils.akka.LogUtil; +import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter; +import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.signals.commands.common.Shutdown; import org.eclipse.ditto.signals.commands.common.ShutdownReason; @@ -38,7 +39,6 @@ import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.cluster.pubsub.DistributedPubSubMediator; -import akka.event.DiagnosticLoggingAdapter; import akka.japi.pf.ReceiveBuilder; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; @@ -54,7 +54,7 @@ public abstract class AbstractPersistenceOperationsActor extends AbstractActor { /** * The actor's logger. */ - protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this); + protected final DittoDiagnosticLoggingAdapter logger; private final ActorRef pubSubMediator; private final String resourceType; @@ -82,6 +82,7 @@ private AbstractPersistenceOperationsActor(final ActorRef pubSubMediator, this.toCloseWhenStopped = Collections.unmodifiableCollection(toCloseWhenStopped); materializer = ActorMaterializer.create(getContext()); delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown(); + logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); } /** @@ -162,7 +163,7 @@ public void postStop() throws Exception { try { closeable.close(); } catch (final IOException e) { - log.warning("Failed to close: <{}>!", e.getMessage()); + logger.warning("Failed to close: <{}>!", e.getMessage()); } }); super.postStop(); @@ -170,7 +171,7 @@ public void postStop() throws Exception { private void subscribeForNamespaceCommands() { if (null != namespaceOps) { - log.debug("Subscribing for namespace commands."); + logger.debug("Subscribing for namespace commands."); final ActorRef self = getSelf(); final DistributedPubSubMediator.Subscribe subscribe = DistPubSubAccess.subscribeViaGroup(PurgeNamespace.TYPE, getSubscribeGroup(), self); @@ -185,7 +186,7 @@ private void subscribeForEntitiesCommands() { final DistributedPubSubMediator.Subscribe subscribe = DistPubSubAccess.subscribeViaGroup(topic, getSubscribeGroup(), self); - log.debug("Subscribing for entities commands on topic <{}>.", topic); + logger.debug("Subscribing for entities commands on topic <{}>.", topic); pubSubMediator.tell(subscribe, self); } } @@ -200,18 +201,17 @@ public Receive createReceive() { .match(PurgeNamespace.class, this::purgeNamespace) .match(PurgeEntities.class, this::purgeEntities) .match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck) - .matchAny(message -> log.warning("unhandled: <{}>", message)) + .matchAny(message -> logger.warning("unhandled: <{}>", message)) .build(); } private void purgeNamespace(final PurgeNamespace purgeNamespace) { if (null == namespaceOps) { - log.warning("Cannot handle namespace command: <{}>!", purgeNamespace); + logger.withCorrelationId(purgeNamespace).warning("Cannot handle namespace command: <{}>!", purgeNamespace); return; } - LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace); - log.info("Running <{}>.", purgeNamespace); + logger.withCorrelationId(purgeNamespace).info("Running <{}>.", purgeNamespace); final String namespace = purgeNamespace.getNamespace(); final ActorRef sender = getSender(); @@ -223,31 +223,32 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) { response = PurgeNamespaceResponse.successful(namespace, resourceType, purgeNamespace.getDittoHeaders()); } else { - LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace); - errors.forEach(error -> log.error(error, "Error purging namespace <{}>!", namespace)); + logger.setCorrelationId(purgeNamespace); + errors.forEach(error -> logger.error(error, "Error purging namespace <{}>!", namespace)); + logger.discardCorrelationId(); response = PurgeNamespaceResponse.failed(namespace, resourceType, purgeNamespace.getDittoHeaders()); } sender.tell(response, getSelf()); - log.info("Successfully purged namespace <{}>.", namespace); + logger.withCorrelationId(purgeNamespace).info("Successfully purged namespace <{}>.", namespace); }) .exceptionally(error -> { - LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace); // Reply nothing - Error should not occur (DB errors were converted to stream elements and handled) - log.error(error, "Unexpected error when purging namespace <{}>!", - purgeNamespace.getNamespace()); + logger.withCorrelationId(purgeNamespace) + .error(error, "Unexpected error when purging namespace <{}>!", + purgeNamespace.getNamespace()); return null; }); } private void purgeEntities(final PurgeEntities purgeEntities) { - if (null == entitiesOps) { - log.warning("Cannot handle entities command: <{}>.", purgeEntities); + logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>.", purgeEntities); return; } if (!resourceType.equals(purgeEntities.getEntityType())) { - log.warning("Expected command with entityType <{}>, but got: <{}>.", resourceType, purgeEntities); + logger.withCorrelationId(purgeEntities) + .warning("Expected command with entityType <{}>, but got: <{}>.", resourceType, purgeEntities); return; } @@ -271,12 +272,11 @@ private void schedulePurgingEntitiesIn(final Duration delay, final PurgeEntities private void doPurgeEntities(final PurgeEntities purgeEntities, final ActorRef initiator) { if (null == entitiesOps) { - log.warning("Cannot handle entities command: <{}>", purgeEntities); + logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>", purgeEntities); return; } - LogUtil.enhanceLogWithCorrelationId(log, purgeEntities); - log.info("Running <{}>.", purgeEntities); + logger.withCorrelationId(purgeEntities).info("Running <{}>.", purgeEntities); final String entityType = purgeEntities.getEntityType(); final List entityIds = purgeEntities.getEntityIds(); @@ -287,26 +287,27 @@ private void doPurgeEntities(final PurgeEntities purgeEntities, final ActorRef i if (errors.isEmpty()) { response = PurgeEntitiesResponse.successful(entityType, purgeEntities.getDittoHeaders()); } else { - LogUtil.enhanceLogWithCorrelationId(log, purgeEntities); - errors.forEach(error -> log.error(error, "Error purging entities of type <{}>: <{}>", + logger.setCorrelationId(purgeEntities); + errors.forEach(error -> logger.error(error, "Error purging entities of type <{}>: <{}>", entityType, entityIds)); + logger.discardCorrelationId(); response = PurgeEntitiesResponse.failed(purgeEntities.getEntityType(), purgeEntities.getDittoHeaders()); } initiator.tell(response, getSelf()); - log.info("Successfully purged entities of type <{}>: <{}>", entityType, entityIds); + logger.withCorrelationId(purgeEntities) + .info("Successfully purged entities of type <{}>: <{}>", entityType, entityIds); }) .exceptionally(error -> { - LogUtil.enhanceLogWithCorrelationId(log, purgeEntities); // Reply nothing - Error should not occur (DB errors were converted to stream elements and handled) - log.error(error, "Unexpected error when purging entities <{}>!", - purgeEntities.getEntityIds()); + logger.withCorrelationId(purgeEntities) + .error(error, "Unexpected error when purging entities <{}>!", purgeEntities.getEntityIds()); return null; }); } private void handleSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) { - log.debug("Got subscribeAck <{}>.", subscribeAck); + logger.debug("Got subscribeAck <{}>.", subscribeAck); } }