Skip to content

Commit

Permalink
Moved ExternalMessageWithSender to its own file.
Browse files Browse the repository at this point in the history
This makes it easier to access.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Apr 14, 2022
1 parent 5bbba61 commit 79302cb
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 96 deletions.
Expand Up @@ -30,7 +30,6 @@
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageBuilder;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand All @@ -40,7 +39,6 @@
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.InboundMappingSink.ExternalMessageWithSender;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.CounterKey;
Expand All @@ -49,7 +47,6 @@
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.TracingTags;

Expand Down Expand Up @@ -100,12 +97,14 @@ protected BaseConsumerActor(final Connection connection,
this.connectivityConfig = connectivityConfig;
acknowledgementConfig = connectivityConfig.getAcknowledgementConfig();

inboundMonitor = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig)
.forInboundConsumed(connection, sourceAddress);
final var connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig);
inboundMonitor = connectionMonitorRegistry.forInboundConsumed(connection, sourceAddress);
inboundAcknowledgedMonitor = connectionMonitorRegistry.forInboundAcknowledged(connection, sourceAddress);
}

inboundAcknowledgedMonitor =
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig)
.forInboundAcknowledged(connection, sourceAddress);
protected void resetResourceStatus() {
resourceStatus = ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
ConnectivityStatus.OPEN, sourceAddress, "Consumer started.", Instant.now());
}

/**
Expand All @@ -125,36 +124,37 @@ private ExternalMessageWithSender withSender(final AcknowledgeableMessage acknow
final ActorRef responseCollector = getContext().actorOf(ResponseCollectorActor.props(collectorLifetime));
prepareResponseHandler(acknowledgeableMessage, responseCollector);

final ExternalMessage messageWithSourceAndReplyTarget =
addSourceAndReplyTarget(acknowledgeableMessage.getMessage());
final var messageWithSourceAndReplyTarget = addSourceAndReplyTarget(acknowledgeableMessage.getMessage());
log().debug("Forwarding incoming message for mapping. ResponseCollector=<{}>", responseCollector);
return new ExternalMessageWithSender(messageWithSourceAndReplyTarget, responseCollector);
}

private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeableMessage,
final ActorRef responseCollector) {

final StartedTimer ackTimer = DittoMetrics.timer(TIMER_ACK_HANDLING)
final var ackTimer = DittoMetrics.timer(TIMER_ACK_HANDLING)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.getName())
.start();
final var ackCounter = MetricAlertRegistry.getMetricsAlertGaugeOrDefault(
CounterKey.of(connectionId, sourceAddress), MetricAlertRegistry.COUNTER_ACK_HANDLING,
connectionType, connectivityConfig)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.toString())
.increment();
final var ackCounter =
MetricAlertRegistry.getMetricsAlertGaugeOrDefault(CounterKey.of(connectionId, sourceAddress),
MetricAlertRegistry.COUNTER_ACK_HANDLING,
connectionType,
connectivityConfig)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.toString())
.increment();
final Context traceContext = DittoTracing.extractTraceContext(acknowledgeableMessage.getMessage().getHeaders());
DittoTracing.wrapTimer(traceContext, ackTimer);

final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout();
// Ask response collector actor to get the collected responses in a future
Patterns.ask(responseCollector, ResponseCollectorActor.query(), askTimeout).thenCompose(output -> {
if (output instanceof ResponseCollectorActor.Output) {
return CompletableFuture.completedFuture((ResponseCollectorActor.Output) output);
} else if (output instanceof Throwable) {
if (output instanceof ResponseCollectorActor.Output o) {
return CompletableFuture.completedFuture(o);
} else if (output instanceof Throwable t) {
log().debug("Patterns.ask failed. ResponseCollector=<{}>", responseCollector);
return CompletableFuture.failedFuture((Throwable) output);
return CompletableFuture.failedFuture(t);
} else {
log().error("Expect ResponseCollectorActor.Output, got: <{}>. ResponseCollector=<{}>", output,
responseCollector);
Expand Down Expand Up @@ -184,18 +184,18 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
} else {
// don't count this as "failure" in the "source consumed" metric as the consumption
// itself was successful
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(error, rootCause -> {
// Redeliver and pray this unexpected error goes away
log().debug("Rejecting [redeliver=true] due to error <{}>. " +
"ResponseCollector=<{}>", rootCause, responseCollector);
ackTimer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, true)
.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(true);
return null;
});
final var dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(error, rootCause -> {

// Redeliver and pray this unexpected error goes away
log().debug("Rejecting [redeliver=true] due to error <{}>. " +
"ResponseCollector=<{}>", rootCause, responseCollector);
ackTimer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, true)
.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(true);
return null;
});
if (dittoRuntimeException != null) {
if (isConsideredSuccess(dittoRuntimeException)) {
ackTimer.tag(TracingTags.ACK_SUCCESS, true).stop();
Expand All @@ -215,30 +215,34 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
}
return null;
}).exceptionally(e -> {
log().error(e, "Unexpected error during manual acknowledgement. ResponseCollector=<{}>",
responseCollector);
log().error(e, "Unexpected error during manual acknowledgement. ResponseCollector=<{}>", responseCollector);
return null;
});
}

private ExternalMessage addSourceAndReplyTarget(final ExternalMessage message) {
return ExternalMessageFactory.newExternalMessageBuilder(message)
.withSource(source)
.withInternalHeaders(enrichHeadersWithReplyInformation(message.getInternalHeaders()))
.build();
}

/**
* Send an error to the mapping sink to be published in the reply-target.
*/
protected final Sink<DittoRuntimeException, ?> getDittoRuntimeExceptionSink() {
return Flow.<DittoRuntimeException, DittoRuntimeException>fromFunction(
message -> message.setDittoHeaders(enrichHeadersWithReplyInformation(message.getDittoHeaders())))
.via(Flow.<DittoRuntimeException, Object>fromFunction(value -> {
inboundMonitor.failure(value.getDittoHeaders(), value);
return value;
dittoRuntimeException -> dittoRuntimeException.setDittoHeaders(
enrichHeadersWithReplyInformation(dittoRuntimeException.getDittoHeaders())
)
)
.via(Flow.<DittoRuntimeException, Object>fromFunction(dittoRuntimeException -> {
inboundMonitor.failure(dittoRuntimeException.getDittoHeaders(), dittoRuntimeException);
return dittoRuntimeException;
}))
.to(inboundMappingSink);
}

protected void resetResourceStatus() {
resourceStatus = ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
ConnectivityStatus.OPEN, sourceAddress, "Consumer started.", Instant.now());
}

protected ResourceStatus getCurrentSourceStatus() {
final Optional<ResourceStatus> statusOptional = Optional.ofNullable(resourceStatus);
return ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
Expand All @@ -259,14 +263,6 @@ protected void handleAddressStatus(final ResourceStatus resourceStatus) {
}
}

private ExternalMessage addSourceAndReplyTarget(final ExternalMessage message) {
final ExternalMessageBuilder externalMessageBuilder =
ExternalMessageFactory.newExternalMessageBuilder(message)
.withSource(source);
externalMessageBuilder.withInternalHeaders(enrichHeadersWithReplyInformation(message.getInternalHeaders()));
return externalMessageBuilder.build();
}

private DittoHeaders enrichHeadersWithReplyInformation(final DittoHeaders headers) {
return source.getReplyTarget()
.<DittoHeaders>map(replyTarget -> headers.toBuilder()
Expand Down
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import org.eclipse.ditto.connectivity.api.ExternalMessage;

import akka.actor.ActorRef;

/**
* This record bundles an {@link ExternalMessage} with the sender of this message.
*/
public record ExternalMessageWithSender(ExternalMessage externalMessage, ActorRef sender) {}
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
Expand All @@ -32,7 +31,6 @@
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.dispatch.MessageDispatcher;
import akka.stream.javadsl.Flow;
Expand Down Expand Up @@ -169,38 +167,27 @@ private int determinePoolSize(final int connectionPoolSize, final int maxPoolSiz

private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender withSender,
final InboundMappingProcessor inboundMappingProcessor) {
final var externalMessage = withSender.externalMessage;
final String correlationId =
externalMessage.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey());

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, externalMessage, inboundMappingProcessor);
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
final var outcomes =
InboundMappingOutcomes.of(withSender.externalMessage, e, withSender.sender);
logger.withCorrelationId(correlationId)
.error("Handling exception when mapping external message: {}", e.getMessage());
return outcomes;
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
}

private InboundMappingOutcomes mapExternalMessageToSignal(final ExternalMessageWithSender withSender,
final ExternalMessage externalMessage, final InboundMappingProcessor inboundMappingProcessor) {
return InboundMappingOutcomes.of(inboundMappingProcessor.process(withSender.externalMessage),
externalMessage, withSender.sender);
}

static final class ExternalMessageWithSender {

private final ExternalMessage externalMessage;
private final ActorRef sender;

ExternalMessageWithSender(final ExternalMessage externalMessage, final ActorRef sender) {
this.externalMessage = externalMessage;
this.sender = sender;
}
private static InboundMappingOutcomes mapExternalMessageToSignal(final ExternalMessageWithSender withSender,
final InboundMappingProcessor inboundMappingProcessor) {

return InboundMappingOutcomes.of(inboundMappingProcessor.process(withSender.externalMessage()),
withSender.externalMessage(),
withSender.sender());
}

}
Expand Up @@ -211,7 +211,7 @@ void testExternalMessageInDittoProtocolIsProcessed(

TestProbe collectorProbe = TestProbe.apply("collector", actorSystem);
inboundMappingProcessorActor.tell(
new InboundMappingSink.ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
new ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
ActorRef.noSender());

if (expectSuccess) {
Expand Down Expand Up @@ -284,7 +284,7 @@ <T> void testMessageMappingWithoutCorrelationId(

final TestProbe collectorProbe = TestProbe.apply("collector", actorSystem);
inboundMappingProcessorActor.tell(
new InboundMappingSink.ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
new ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
ActorRef.noSender());

final T received = expectMsgClass(expectedMessageClass);
Expand Down Expand Up @@ -318,7 +318,7 @@ <T> void testMessageMapping(final String correlationId,

final TestProbe collectorProbe = TestProbe.apply("collector", actorSystem);
inboundMappingProcessorActor.tell(
new InboundMappingSink.ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
new ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
ActorRef.noSender());

final T received = expectMsgClass(expectedMessageClass);
Expand Down
Expand Up @@ -91,10 +91,7 @@ public void onError() {
// attach non-null payload mapping to avoid using the default mapper
.withPayloadMapping(Mockito.mock(PayloadMapping.class))
.build();
underTest.tell(
new InboundMappingSink.ExternalMessageWithSender(message, getRef()),
ActorRef.noSender()
);
underTest.tell(new ExternalMessageWithSender(message, getRef()), ActorRef.noSender());

// THEN: InboundDispatchingActor receives 1 error outcome with the exception thrown.
final InboundMappingOutcomes outcomes =
Expand Down
Expand Up @@ -96,7 +96,7 @@ processorPoolSize, sink, getMappingConfig(),
.withText(string)
.withPayloadMapping(ConnectivityModelFactory.newPayloadMapping("javascript"))
.build();
return new InboundMappingSink.ExternalMessageWithSender(message, testActor());
return new ExternalMessageWithSender(message, testActor());
})
.collect(Collectors.toList());

Expand Down
Expand Up @@ -91,8 +91,7 @@ public void run() {
final boolean settleImmediately = modifyThing.getDittoHeaders().getAcknowledgementRequests().isEmpty();

inboundMappingProcessorActor.tell(
new InboundMappingSink.ExternalMessageWithSender(toExternalMessage(modifyThing),
collectorProbe.ref()),
new ExternalMessageWithSender(toExternalMessage(modifyThing), collectorProbe.ref()),
ActorRef.noSender()
);

Expand Down

0 comments on commit 79302cb

Please sign in to comment.