improve logging of ThingUpdater etc. in order to be better able to find out which change caused a "failed patch update"

* also use "trace" logging for logging potentially huge complete "writeModels"

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
thjaeckle committed Feb 22, 2022
1 parent dfca359 commit 2410aa9
Showing 5 changed files with 75 additions and 18 deletions.
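The recurring pattern in the diff below: keep "debug" statements short and payload-free, move complete dumps of potentially huge objects to "trace", and guard expensive log-argument construction behind an enabled-check. A minimal sketch of that split, using only the Ditto logging calls that appear in the diff (the class name and the expensiveDump helper are illustrative, not part of the commit):

import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

final class LoggingSplitExample {

    private static final DittoLogger LOG = DittoLoggerFactory.getLogger(LoggingSplitExample.class);

    void logWriteModel(final Object writeModel) {
        // debug: short, fixed-size message - safe for production log volumes
        LOG.debug("Using incremental update");
        // trace: the complete, potentially huge object - disabled by default
        LOG.trace("Using incremental update <{}>", writeModel);
        if (LOG.isTraceEnabled()) {
            // guard when merely building the log argument is already expensive
            LOG.trace("Full dump: <{}>", expensiveDump(writeModel));
        }
    }

    private static String expensiveDump(final Object writeModel) {
        // placeholder for a costly serialization
        return String.valueOf(writeModel);
    }
}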
@@ -25,6 +25,8 @@
 
 import org.eclipse.ditto.base.model.headers.DittoHeaders;
 import org.eclipse.ditto.base.model.signals.ShardedMessageEnvelope;
+import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
+import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
 import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
 import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
 import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
@@ -47,6 +49,9 @@
  */
 final class BulkWriteResultAckFlow {
 
+    private static final ThreadSafeDittoLogger LOGGER =
+            DittoLoggerFactory.getThreadSafeLogger(BulkWriteResultAckFlow.class);
+
     private static final String ERRORS_COUNTER_NAME = "search-index-update-errors";
 
     private static final DittoHeaders INCORRECT_PATCH_HEADERS = DittoHeaders.newBuilder()
@@ -104,7 +109,11 @@ private void reportIncorrectPatch(final WriteResultAndErrors writeResultAndErrors
         // It is not possible to identify which patches are not applied; therefore request all patch updates to retry.
         writeResultAndErrors.getWriteModels().forEach(model -> {
             final var response =
-                    createFailureResponse(model.getMetadata(), INCORRECT_PATCH_HEADERS);
+                    createFailureResponse(model.getMetadata(), INCORRECT_PATCH_HEADERS.toBuilder()
+                            .correlationId(writeResultAndErrors.getBulkWriteCorrelationId()).build());
+            LOGGER.withCorrelationId(writeResultAndErrors.getBulkWriteCorrelationId())
+                    .warn("Encountered incorrect patch update for metadata: <{}> and filter: <{}>",
+                            model.getMetadata(), model.getFilter());
             model.getMetadata().getOrigin().ifPresent(updater -> updater.tell(response, ActorRef.noSender()));
         });
     }

@@ -16,6 +16,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
+import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
 import org.eclipse.ditto.things.model.ThingId;
 import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
 
@@ -41,6 +43,8 @@ public final class ChangeQueueActor extends AbstractActor {
 
     private static final Duration ASK_SELF_TIMEOUT = Duration.ofSeconds(5L);
 
+    private static final DittoLogger LOG = DittoLoggerFactory.getLogger(ChangeQueueActor.class);
+
     /**
      * Caching changes of 1 Thing per key.
      * Change type values according to caching strategy;
@@ -107,7 +111,20 @@ public static Source<Map<ThingId, Metadata>, NotUsed> createSource(
         }
         return repeat
                 .flatMapConcat(ChangeQueueActor.askSelf(changeQueueActor))
-                .filter(map -> !map.isEmpty());
+                .filter(map -> {
+                    final boolean notEmpty = !map.isEmpty();
+                    if (notEmpty) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Emitting dumped map for shouldAcknowledge <{}> and [thingId:<[updateReasons]>]: {}",
+                                    shouldAcknowledge, map.entrySet().stream()
+                                            .map(e -> e.getKey() + ":<" + e.getValue().getUpdateReasons() + ">")
+                                            .toList()
+                            );
+                        }
+                        LOG.trace("Emitting dumped map for shouldAcknowledge <{}>: {}", shouldAcknowledge, map);
+                    }
+                    return notEmpty;
+                });
     }
 
     private void dump(final Control dump) {

@@ -29,6 +29,11 @@
 
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.WriteModel;
 import com.mongodb.reactivestreams.client.MongoCollection;
 import com.mongodb.reactivestreams.client.MongoDatabase;
@@ -138,12 +143,12 @@ private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean shouldAcknowledge
         final String bulkWriteCorrelationId = UUID.randomUUID().toString();
         if (LOGGER.isDebugEnabled()) {
             LOGGER.withCorrelationId(bulkWriteCorrelationId)
-                    .debug("Executing BulkWrite containing [<thingId>:{correlationIds}:<filter>]: {}", abstractWriteModels.stream()
-                            .map(writeModel -> "<" + writeModel.getMetadata().getThingId() + ">:" +
-                                    writeModel.getMetadata().getEventsCorrelationIds()
+                    .debug("Executing BulkWrite containing [<thingId>:{correlationIds}:<filter>]: {}", pairs.stream()
+                            .map(writeModelPair -> "<" + writeModelPair.first().getMetadata().getThingId() + ">:" +
+                                    writeModelPair.first().getMetadata().getEventsCorrelationIds()
                                             .stream()
                                             .collect(Collectors.joining(",", "{", "}"))
-                                    + ":<" + writeModel.getFilter() + ">"
+                                    + ":<" + extractFilterBson(writeModelPair.second()) + ">"
                             )
                             .toList());
 
@@ -174,6 +179,21 @@
         });
     }
 
+    private static String extractFilterBson(final WriteModel<BsonDocument> writeModel) {
+        if (writeModel instanceof UpdateManyModel) {
+            return ((UpdateManyModel<BsonDocument>) writeModel).getFilter().toString();
+        } else if (writeModel instanceof UpdateOneModel) {
+            return ((UpdateOneModel<BsonDocument>) writeModel).getFilter().toString();
+        } else if (writeModel instanceof ReplaceOneModel) {
+            return ((ReplaceOneModel<BsonDocument>) writeModel).getFilter().toString();
+        } else if (writeModel instanceof DeleteOneModel) {
+            return ((DeleteOneModel<BsonDocument>) writeModel).getFilter().toString();
+        } else if (writeModel instanceof DeleteManyModel) {
+            return ((DeleteManyModel<BsonDocument>) writeModel).getFilter().toString();
+        }
+        return "no filter";
+    }
+
     private static StartedTimer startBulkWriteTimer(final List<?> writeModels) {
         DittoMetrics.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record((long) writeModels.size());
         return DittoMetrics.timer(TRACE_THING_BULK_UPDATE).tag(UPDATE_TYPE_TAG, "bulkUpdate").start();

@@ -33,6 +33,7 @@
 import org.eclipse.ditto.internal.models.streaming.IdentifiableStreamingMessage;
 import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
 import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
+import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
 import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
 import org.eclipse.ditto.internal.utils.akka.streaming.StreamAck;
 import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
@@ -90,6 +91,8 @@ final class ThingUpdater extends AbstractActorWithStashWithTimers {
     private static final Counter PATCH_SKIP_COUNT = DittoMetrics.counter("search_patch_skips");
     private static final Counter FULL_UPDATE_COUNT = DittoMetrics.counter("search_full_updates");
 
+    private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(ThingUpdater.class); // logger for "trace" statements
+
     private final DittoDiagnosticLoggingAdapter log;
     private final ThingId thingId;
     private final ShutdownBehaviour shutdownBehaviour;
@@ -187,9 +190,10 @@ public Receive createReceive() {
     }
 
     private void recoveryComplete(final AbstractWriteModel writeModel) {
-        log.debug("Recovered: <{}>", writeModel);
+        log.debug("Recovered");
+        LOGGER.trace("Recovered: <{}>", writeModel);
         thingRevision = writeModel.getMetadata().getThingRevision();
-        writeModel.getMetadata().getPolicyRevision().ifPresent(policyRevision -> this.policyRevision = policyRevision);
+        writeModel.getMetadata().getPolicyRevision().ifPresent(rev -> this.policyRevision = rev);
         lastWriteModel = writeModel;
         getContext().become(recoveredBehavior());
         unstashAll();
@@ -234,7 +238,8 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
         if (diff.isPresent() && diff.get().isDiffSmaller()) {
             final var aggregationPipeline = diff.get().consumeAndExport();
             if (aggregationPipeline.isEmpty()) {
-                log.debug("Skipping update due to empty diff <{}>", nextWriteModel);
+                log.debug("Skipping update due to empty diff");
+                LOGGER.trace("Skipping update due to empty diff <{}>", nextWriteModel);
                 getSender().tell(Done.getInstance(), getSelf());
                 PATCH_SKIP_COUNT.increment();
 
@@ -244,24 +249,28 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
                         .asPatchUpdate(lastWriteModel.getMetadata().getThingRevision())
                         .getFilter();
                 mongoWriteModel = new UpdateOneModel<>(filter, aggregationPipeline);
-                log.debug("Using incremental update <{}>", mongoWriteModel);
+                log.debug("Using incremental update");
+                LOGGER.trace("Using incremental update <{}>", mongoWriteModel);
                 PATCH_UPDATE_COUNT.increment();
             } else {
                 mongoWriteModel = nextWriteModel.toMongo();
-                if (log.isDebugEnabled()) {
-                    log.debug("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
+                log.debug("Using replacement because diff is bigger or nonexistent.");
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("Using replacement because diff is bigger or nonexistent. Diff=<{}>",
                             diff.map(BsonDiff::consumeAndExport));
                 }
                 FULL_UPDATE_COUNT.increment();
             }
         } else {
             mongoWriteModel = nextWriteModel.toMongo();
             if (forceUpdate) {
-                log.debug("Using replacement (forceUpdate) <{}> - forceNextUpdate was: <{}>", mongoWriteModel,
+                log.debug("Using replacement (forceUpdate) - forceNextUpdate was: <{}>", forceNextUpdate);
+                LOGGER.trace("Using replacement (forceUpdate) <{}> - forceNextUpdate was: <{}>", mongoWriteModel,
                         forceNextUpdate);
                 forceNextUpdate = false;
             } else {
-                log.debug("Using replacement <{}>", mongoWriteModel);
+                log.debug("Using replacement");
+                LOGGER.trace("Using replacement <{}>", mongoWriteModel);
             }
             FULL_UPDATE_COUNT.increment();
         }
@@ -272,7 +281,7 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
     private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
         try {
             return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
-        } catch (BsonInvalidOperationException e) {
+        } catch (final BsonInvalidOperationException e) {
             log.error(e, "Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend);
 
             return Optional.empty();
@@ -354,7 +363,8 @@ private void processUpdateThingResponse(final UpdateThingResponse response) {
                 warningTemplate = "Got negative acknowledgement for <{}>; updating to <{}>.";
                 UPDATE_FAILURE_COUNT.increment();
             }
-            log.warning(warningTemplate, Metadata.fromResponse(response), metadata);
+            log.withCorrelationId(response)
+                    .warning(warningTemplate, Metadata.fromResponse(response), metadata);
             enqueueMetadata(metadata.withUpdateReason(UpdateReason.RETRY));
         }
     }

@@ -223,10 +223,11 @@ private <M> void forwardToShardRegion(final M message,
             final Function<M, DittoHeaders> getDittoHeaders) {
 
         final ThingId id = getId.apply(message);
-        log.debug("Forwarding incoming {} to shard region of {}", message.getClass().getSimpleName(), id);
+        final DittoHeaders dittoHeaders = getDittoHeaders.apply(message);
+        log.withCorrelationId(dittoHeaders)
+                .debug("Forwarding incoming {} to shard region of {}", message.getClass().getSimpleName(), id);
         final String type = getType.apply(message);
         final JsonObject jsonObject = toJson.apply(message);
-        final DittoHeaders dittoHeaders = getDittoHeaders.apply(message);
         final ShardedMessageEnvelope messageEnvelope = ShardedMessageEnvelope.of(id, type, jsonObject, dittoHeaders);
 
         final ActorRef sender = getSender();
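Taken together, these changes make a "failed patch update" traceable end to end: one correlation id is generated per bulk write, attached to the failure response headers, and repeated in every warning. A minimal sketch of that flow, assuming only the DittoHeaders builder and logger calls visible in the diff above (the class and method names here are illustrative):

import java.util.UUID;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;

final class CorrelatedFailureExample {

    private static final ThreadSafeDittoLogger LOGGER =
            DittoLoggerFactory.getThreadSafeLogger(CorrelatedFailureExample.class);

    DittoHeaders reportIncorrectPatch(final DittoHeaders baseHeaders, final Object metadata) {
        // one id per bulk write; responses and log lines all carry it
        final String bulkWriteCorrelationId = UUID.randomUUID().toString();
        final DittoHeaders headers = baseHeaders.toBuilder()
                .correlationId(bulkWriteCorrelationId)
                .build();
        LOGGER.withCorrelationId(bulkWriteCorrelationId)
                .warn("Encountered incorrect patch update for metadata: <{}>", metadata);
        // the returned headers would go on the failure response sent back to the ThingUpdater
        return headers;
    }
}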
