Skip to content

Commit

Permalink
Connectivity: Improve debug log for incoming messages.
Browse files Browse the repository at this point in the history
- Log actor ref of response collector for message correlation.

- Add toString() method to MappingOutcome.

- Fix an IndexOutOfBoundException in BaseConsumerActor logging.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 21, 2021
1 parent 9cda28e commit 3a9e9dd
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,13 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.getName())
.start();
forwardAndAwaitAck(addSourceAndReplyTarget(message))
// Start per-inbound-signal actor to collect acks of all thing-modify-commands mapped from incoming signal
final Duration collectorLifetime = acknowledgementConfig.getCollectorFallbackLifetime();
final ActorRef responseCollector = getContext().actorOf(ResponseCollectorActor.props(collectorLifetime));
forwardAndAwaitAck(addSourceAndReplyTarget(message), responseCollector)
.handle((output, error) -> {
log().debug("Result from ResponseCollector=<{}>: output=<{}> error=<{}>",
responseCollector, output, error);
if (output != null) {
final List<CommandResponse<?>> failedResponses = output.getFailedResponses();
if (output.allExpectedResponsesArrived() && failedResponses.isEmpty()) {
Expand All @@ -127,9 +132,8 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
// empty failed responses indicate that SetCount was missing
final boolean shouldRedeliver = failedResponses.isEmpty() ||
someFailedResponseRequiresRedelivery(failedResponses);
log().withCorrelationId(failedResponses.get(0))
.debug("Rejecting [redeliver={}] due to failed responses <{}>",
shouldRedeliver, failedResponses);
log().debug("Rejecting [redeliver={}] due to failed responses <{}>. " +
"ResponseCollector=<{}>", shouldRedeliver, failedResponses, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, shouldRedeliver)
.stop();
Expand All @@ -141,7 +145,8 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(error, rootCause -> {
// Redeliver and pray this unexpected error goes away
log().debug("Rejecting [redeliver=true] due to error <{}>", rootCause);
log().debug("Rejecting [redeliver=true] due to error <{}>. " +
"ResponseCollector=<{}>", rootCause, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, true)
.stop();
Expand All @@ -154,8 +159,8 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
settle.run();
} else {
final var shouldRedeliver = requiresRedelivery(dittoRuntimeException.getHttpStatus());
log().debug("Rejecting [redeliver={}] due to error <{}>",
shouldRedeliver, dittoRuntimeException);
log().debug("Rejecting [redeliver={}] due to error <{}>. ResponseCollector=<{}>",
shouldRedeliver, dittoRuntimeException, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, shouldRedeliver)
.stop();
Expand All @@ -166,7 +171,8 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
return null;
})
.exceptionally(e -> {
log().error(e, "Unexpected error during manual acknowledgement.");
log().error(e, "Unexpected error during manual acknowledgement. ResponseCollector=<{}>",
responseCollector);
return null;
});
}
Expand Down Expand Up @@ -204,24 +210,25 @@ protected void handleAddressStatus(final ResourceStatus resourceStatus) {
}
}

private CompletionStage<ResponseCollectorActor.Output> forwardAndAwaitAck(final Object message) {
// 1. start per-inbound-signal actor to collect acks of all thing-modify-commands mapped from incoming signal
final Duration collectorLifetime = acknowledgementConfig.getCollectorFallbackLifetime();
private CompletionStage<ResponseCollectorActor.Output> forwardAndAwaitAck(final ExternalMessage message,
final ActorRef responseCollector) {
final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout();
final ActorRef responseCollector = getContext().actorOf(ResponseCollectorActor.props(collectorLifetime));
// 2. forward message to mapping processor actor with response collector actor as sender
// Forward message to mapping processor actor with response collector actor as sender
// message mapping processor actor will set the number of expected acks (can be 0)
// and start the same amount of ack aggregator actors
log().debug("Forwarding incoming message for mapping. ResponseCollector=<{}>", responseCollector);
inboundMappingProcessor.tell(message, responseCollector);
// 3. ask response collector actor to get the collected responses in a future
// Ask response collector actor to get the collected responses in a future

return Patterns.ask(responseCollector, ResponseCollectorActor.query(), askTimeout).thenCompose(output -> {
if (output instanceof ResponseCollectorActor.Output) {
return CompletableFuture.completedFuture((ResponseCollectorActor.Output) output);
} else if (output instanceof Throwable) {
log().debug("Patterns.ask failed. ResponseCollector=<{}>", responseCollector);
return CompletableFuture.failedFuture((Throwable) output);
} else {
log().error("Expect ResponseCollectorActor.Output, got: <{}>", output);
log().error("Expect ResponseCollectorActor.Output, got: <{}>. ResponseCollector=<{}>", output,
responseCollector);
return CompletableFuture.failedFuture(new ClassCastException("Unexpected acknowledgement type."));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,47 +37,47 @@
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage;
import org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityInternalErrorException;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityErrorResponse;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.TopicPathBuilder;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActor;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage;
import org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFilter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityErrorResponse;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.messages.model.signals.commands.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.TopicPathBuilder;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingLiveCommandAckRequestSetter;
Expand Down Expand Up @@ -218,6 +218,7 @@ private void dispatchMapped(final InboundMappingOutcomes outcomes) {
.flatMap(dispatchResponsesAndSearchCommands::apply)
.mapToInt(this::dispatchIncomingSignal)
.sum();
logger.debug("OnMapped from <{}>: ", sender, outcomes);
sender.tell(ResponseCollectorActor.setCount(ackRequestingSignalCount), getSelf());
}

Expand Down Expand Up @@ -282,6 +283,8 @@ public Optional<Signal<?>> onError(final String mapperId,
@Nullable final TopicPath topicPath,
@Nullable final ExternalMessage message) {

logger.debug("OnError mapperId=<{}> exception=<{}> topicPath=<{}> message=<{}> sender=<{}>",
mapperId, e, topicPath, message, getSender());
if (e instanceof DittoRuntimeException) {
final DittoRuntimeException dittoRuntimeException = (DittoRuntimeException) e;
final ErrorResponse<?> errorResponse = toErrorResponseFunction.apply(dittoRuntimeException, topicPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ private void onSetCount(final SetCount setCount) {
}

private void onCommandResponse(final CommandResponse<?> commandResponse) {
log.debug("CommandResponse <{}>", commandResponse);
log.withCorrelationId(commandResponse).debug("CommandResponse <{}>", commandResponse);
commandResponses.add(commandResponse);
reportIfAllCollected();
}

private void onDittoRuntimeException(final DittoRuntimeException dittoRuntimeException) {
log.debug("DittoRuntimeException <{}>", dittoRuntimeException);
log.withCorrelationId(dittoRuntimeException)
.debug("DittoRuntimeException <{}>", dittoRuntimeException);
error = dittoRuntimeException;
reportIfAllCollected();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public <R> R accept(final Visitor<T, R> visitor) {
return visitor.onError(mapperId.toString(), e, null, droppedMessage);
}
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[mapperId=" + mapperId +
",externalMessage=" + droppedMessage +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.protocol.TopicPath;

// private to MappingOutcome. Do NOT use directly.
final class ErrorOutcome<T> implements MappingOutcome<T> {
Expand All @@ -37,4 +37,14 @@ final class ErrorOutcome<T> implements MappingOutcome<T> {
public <R> R accept(final Visitor<T, R> visitor) {
return visitor.onError(String.valueOf(mapperId), error, topicPath, externalMessage);
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[mapperId=" + mapperId +
",error=" + error +
",topicPath=" + topicPath +
",externalMessage=" + externalMessage +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.protocol.TopicPath;

// private to MappingOutcome. Do NOT use directly.
final class MappedOutcome<T> implements MappingOutcome<T> {
Expand All @@ -41,4 +41,14 @@ public <R> R accept(final Visitor<T, R> visitor) {
return visitor.onError(mapperId.toString(), e, topicPath, externalMessage);
}
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[mapperId=" + mapperId +
",mapped=" + mapped +
",topicPath=" + topicPath +
",externalMessage=" + externalMessage +
"]";
}
}

0 comments on commit 3a9e9dd

Please sign in to comment.