Skip to content

Commit

Permalink
fixed drop behavior of mapping queue in LegacyBaseConsumerActor
Browse files Browse the repository at this point in the history
* when configured with a buffer-size which was reached and backpressure was applied, new elements to the queue were silently dropped
* this dropping is now handled by
** logging an error/warning
** invoking the "reject with retry" mechanism
* pulled up logger construction to LegacyBaseConsumerActor
* increased default mapping buffer size to 500

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Sep 28, 2021
1 parent eb3c00a commit 8354a33
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@
*/
package org.eclipse.ditto.connectivity.service.messaging;

import java.util.Objects;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.QueueOfferResult;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.SourceQueueWithComplete;

Expand All @@ -34,7 +40,9 @@
@Deprecated
public abstract class LegacyBaseConsumerActor extends BaseConsumerActor {

private final SourceQueueWithComplete<AcknowledgeableMessage> messageSourceQueue;
protected final ThreadSafeDittoLoggingAdapter logger;

private final SourceQueueWithComplete<AcknowledgeableMessage> messageMappingSourceQueue;
private final SourceQueueWithComplete<DittoRuntimeException> dreSourceQueue;

protected LegacyBaseConsumerActor(final Connection connection,
Expand All @@ -44,12 +52,15 @@ protected LegacyBaseConsumerActor(final Connection connection,
final ConnectivityStatusResolver connectivityStatusResolver) {
super(connection, sourceAddress, inboundMappingSink, source, connectivityStatusResolver);

logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID.toString(), connectionId);

final ConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));

final var materializer = Materializer.createMaterializer(this::getContext);

messageSourceQueue = akka.stream.javadsl.Source
messageMappingSourceQueue = akka.stream.javadsl.Source
.<AcknowledgeableMessage>queue(connectivityConfig.getMappingConfig().getBufferSize(),
OverflowStrategy.dropNew())
.to(getMessageMappingSink())
Expand All @@ -69,18 +80,44 @@ protected LegacyBaseConsumerActor(final Connection connection,
* @param settle technically settle the incoming message. MUST be thread-safe.
* @param reject technically reject the incoming message. MUST be thread-safe.
*/
protected final void forwardToMappingActor(final ExternalMessage message, final Runnable settle,
protected final void forwardToMapping(final ExternalMessage message, final Runnable settle,
final Reject reject) {
messageSourceQueue.offer(AcknowledgeableMessage.of(message, settle, reject));
final AcknowledgeableMessage acknowledgeableMessage = AcknowledgeableMessage.of(message, settle, reject);
messageMappingSourceQueue.offer(acknowledgeableMessage)
.whenComplete((queueOfferResult, error) -> {
if (error != null) {
logger.withCorrelationId(message.getInternalHeaders())
.error(error,
"Message mapping source queue failure, invoking 'reject with redeliver'.");
acknowledgeableMessage.reject(true);
} else if (Objects.equals(queueOfferResult, QueueOfferResult.dropped())) {
logger.withCorrelationId(message.getInternalHeaders())
.warning("Message mapping source queue dropped message as part of backpressure " +
"strategy, invoking 'reject with redeliver'. Increase " +
"'ditto.connectivity.mapping.buffer-size' if this situation prevails.");
acknowledgeableMessage.reject(true);
}
});
}

/**
* Send an error to the inbound mapping sink to be published in the reply-target.
*
* @param message the error.
* @param dittoRuntimeException the error.
*/
protected final void forwardToMappingActor(final DittoRuntimeException message) {
dreSourceQueue.offer(message);
protected final void forwardToMapping(final DittoRuntimeException dittoRuntimeException) {
dreSourceQueue.offer(dittoRuntimeException)
.whenComplete((queueOfferResult, error) -> {
if (error != null) {
logger.withCorrelationId(dittoRuntimeException)
.error(error, "DRE handling source queue failure.");
} else if (Objects.equals(queueOfferResult, QueueOfferResult.dropped())) {
logger.withCorrelationId(dittoRuntimeException)
.warning("DRE handling source queue dropped dittoRuntimeException as part of " +
"backpressure strategy. Increase " +
"'ditto.connectivity.mapping.buffer-size' if this situation prevails.");
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
Expand All @@ -91,7 +89,6 @@ final class AmqpConsumerActor extends LegacyBaseConsumerActor implements Message
*/
static final String ACTOR_NAME_PREFIX = "amqpConsumerActor-";

private final ThreadSafeDittoLoggingAdapter log;
private final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory;
private final ActorRef backOffActor;

Expand All @@ -116,9 +113,6 @@ private AmqpConsumerActor(final Connection connection, final ConsumerData consum
consumerData.getSource(),
connectivityStatusResolver);

log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID.toString(), connectionId);

final ConnectionContext connectionContext = consumerData.getConnectionContext();
connectivityConfig = connectionContext.getConnectivityConfig();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
Expand Down Expand Up @@ -171,7 +165,7 @@ public Receive createReceive() {
final Receive rateLimiterBehavior = getRateLimiterBehavior();
final Receive matchAnyBehavior = ReceiveBuilder.create()
.matchAny(m -> {
log.warning("Unknown message: {}", m);
logger.warning("Unknown message: {}", m);
unhandled(m);
}).build();
return messageHandlingBehavior
Expand All @@ -186,7 +180,7 @@ public void preStart() throws Exception {
getConnectivityConfigProvider()
.registerForConnectivityConfigChanges(consumerData.getConnectionContext(), getSelf())
.exceptionally(e -> {
log.error(e, "Failed to register for connectivity config changes");
logger.error(e, "Failed to register for connectivity config changes");
return null;
});
initMessageConsumer();
Expand Down Expand Up @@ -254,10 +248,10 @@ private void initMessageConsumer() {
private void destroyMessageConsumer() {
if (messageConsumer != null) {
try {
log.debug("Closing AMQP Consumer for '{}'", sourceAddress);
logger.debug("Closing AMQP Consumer for '{}'", sourceAddress);
messageConsumer.close();
} catch (final JMSException jmsException) {
log.debug("Closing consumer failed (can be ignored if connection was closed already): {}",
logger.debug("Closing consumer failed (can be ignored if connection was closed already): {}",
jmsException.getMessage());
}
messageConsumer = null;
Expand All @@ -269,7 +263,7 @@ private boolean matchesOwnConsumer(final ConsumerClosedStatusReport event) {
}

private void handleNonMatchingConsumerClosed(final ConsumerClosedStatusReport event) {
log.debug("Received ConsumerClosedStatusReport which is handled by another consumer actor. Ignoring.");
logger.debug("Received ConsumerClosedStatusReport which is handled by another consumer actor. Ignoring.");
}

private void handleConsumerClosed(final ConsumerClosedStatusReport event) {
Expand All @@ -285,7 +279,7 @@ private void handleConsumerClosed(final ConsumerClosedStatusReport event) {
// destroy current message consumer in any case
destroyMessageConsumer();

log.info("Consumer for destination '{}' was closed. Will try to recreate after some backoff.", sourceAddress);
logger.info("Consumer for destination '{}' was closed. Will try to recreate after some backoff.", sourceAddress);
backOffActor.tell(BackOffActor.createBackOffWithAnswerMessage(Control.CREATE_CONSUMER),
getSelf());
}
Expand All @@ -302,7 +296,7 @@ private String buildStatusDetailsFromStatusReport(final ConsumerClosedStatusRepo
}

private void createMessageConsumer(final Control createConsumer) {
log.debug("Trying to create consumer for destination '{}'.", sourceAddress);
logger.debug("Trying to create consumer for destination '{}'.", sourceAddress);
/* ask JMSConnectionHandlingActor for a new consumer */
final CreateMessageConsumer createMessageConsumer = new CreateMessageConsumer(consumerData);
final CompletionStage<Object> responseFuture =
Expand All @@ -319,14 +313,14 @@ private void createMessageConsumer(final Control createConsumer) {

private void messageConsumerCreated(final CreateMessageConsumerResponse response) {
if (consumerData.equals(response.consumerData)) {
log.info("Consumer for destination '{}' created.", sourceAddress);
logger.info("Consumer for destination '{}' created.", sourceAddress);
destroyMessageConsumer();
messageConsumer = response.messageConsumer;
initMessageConsumer();
resetResourceStatus();
} else {
// got an orphaned message consumer! this is an error.
log.error("RESOURCE_LEAK! Got created MessageConsumer <{}> for <{}>, while I have <{}> for <{}>",
logger.error("RESOURCE_LEAK! Got created MessageConsumer <{}> for <{}>, while I have <{}> for <{}>",
response.messageConsumer, response.consumerData, messageConsumer, consumerData);
}
}
Expand All @@ -344,11 +338,11 @@ private void handleJmsMessage(final JmsMessage message) {
StartedTrace trace = Traces.emptyStartedTrace();
try {
recordIncomingForRateLimit(message.getJMSMessageID());
if (log.isDebugEnabled()) {
if (logger.isDebugEnabled()) {
final Integer ackType = Optional.ofNullable(message.getAcknowledgeCallback())
.map(JmsAcknowledgeCallback::getAckType)
.orElse(null);
log.debug("Received JmsMessage from AMQP 1.0: {} with Properties: {} and AckType {}",
logger.debug("Received JmsMessage from AMQP 1.0: {} with Properties: {} and AckType {}",
message.toString(),
message.getAllPropertyNames(),
ackType);
Expand All @@ -369,26 +363,26 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
log.withCorrelationId(correlationId).info("Received message from AMQP 1.0 with externalMessageHeaders: {}",
logger.withCorrelationId(correlationId).info("Received message from AMQP 1.0 with externalMessageHeaders: {}",
externalMessageHeaders);
if (log.isDebugEnabled()) {
log.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMappingActor(externalMessage,
forwardToMapping(externalMessage,
() -> acknowledge(message, true, false, externalMessageHeaders),
redeliver -> acknowledge(message, false, redeliver, externalMessageHeaders)
);
} catch (final DittoRuntimeException e) {
log.withCorrelationId(e)
logger.withCorrelationId(e)
.info("Got DittoRuntimeException '{}' when command was parsed: {}", e.getErrorCode(),
e.getMessage());
trace.fail(e);
if (headers != null) {
// forwarding to messageMappingProcessor only make sense if we were able to extract the headers,
// because we need a reply-to address to send the error response
inboundMonitor.failure(headers, e);
forwardToMappingActor(e.setDittoHeaders(DittoHeaders.of(headers)));
forwardToMapping(e.setDittoHeaders(DittoHeaders.of(headers)));
} else {
inboundMonitor.failure(e);
}
Expand All @@ -399,7 +393,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
trace.fail(e);
log.withCorrelationId(correlationId)
logger.withCorrelationId(correlationId)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
trace.finish();
Expand Down Expand Up @@ -433,7 +427,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
log.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
Expand All @@ -451,7 +445,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
inboundAcknowledgedMonitor.exception("Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final Exception e) {
log.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
}
}

Expand All @@ -472,10 +466,10 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
throw new IllegalArgumentException("Message too large...");
}
} else {
if (log.isDebugEnabled()) {
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
log.withCorrelationId(correlationId)
logger.withCorrelationId(correlationId)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Expand All @@ -489,17 +483,17 @@ private Map<String, String> extractHeadersMapFromJmsMessage(final JmsMessage mes

@Override
public ThreadSafeDittoLoggingAdapter log() {
return log;
return logger;
}

@Override
public void onConnectivityConfigModified(final ConnectivityConfig connectivityConfig) {
final Amqp10Config amqp10Config = connectivityConfig.getConnectionConfig().getAmqp10Config();
if (hasMessageRateLimiterConfigChanged(amqp10Config)) {
this.messageRateLimiter = MessageRateLimiter.of(amqp10Config, messageRateLimiter);
log.info("Built new rate limiter from existing one with modified config: {}", amqp10Config);
logger.info("Built new rate limiter from existing one with modified config: {}", amqp10Config);
} else {
log.debug("Relevant config for MessageRateLimiter unchanged, do nothing.");
logger.debug("Relevant config for MessageRateLimiter unchanged, do nothing.");
}
this.connectivityConfig = connectivityConfig;
}
Expand Down
Loading

0 comments on commit 8354a33

Please sign in to comment.