diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/publishing/MqttPublisherActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/publishing/MqttPublisherActor.java index b9bc8eff0e..acae6ba5ca 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/publishing/MqttPublisherActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/publishing/MqttPublisherActor.java @@ -38,15 +38,14 @@ import akka.actor.Props; import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; +import akka.stream.BoundedSourceQueue; import akka.stream.KillSwitch; import akka.stream.KillSwitches; -import akka.stream.OverflowStrategy; import akka.stream.QueueOfferResult; import akka.stream.UniqueKillSwitch; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import akka.stream.javadsl.SourceQueueWithComplete; /** * Actor responsible for converting outbound {@code Signal}s to a {@code GenericMqttPublish} and sending them to an @@ -57,7 +56,7 @@ public final class MqttPublisherActor extends BasePublisherActor sourceQueue; + private BoundedSourceQueue sourceQueue; private KillSwitch killSwitch; @SuppressWarnings("java:S1144") @@ -135,11 +134,11 @@ public void preStart() throws Exception { killSwitch = sourceQueueWithCompleteUniqueKillSwitchPair.second(); } - private Pair, UniqueKillSwitch> initializeSourceQueue() { + private Pair, UniqueKillSwitch> initializeSourceQueue() { final var connectionConfig = connectivityConfig.getConnectionConfig(); final var mqttConfig = connectionConfig.getMqttConfig(); - return Source.queue(mqttConfig.getMaxQueueSize(), OverflowStrategy.dropNew()) + return Source.queue(mqttConfig.getMaxQueueSize()) .viaMat(KillSwitches.single(), Keep.both()) .to(Sink.foreach(this::sendMqttPublishMessageToBroker)) .run(getContext().getSystem()); @@ -250,38 +249,43 @@ protected CompletionStage publishMessage(final Signal signal, } private CompletionStage offerToSourceQueue(final MqttPublishingContext mqttPublishingContext) { - return sourceQueue.offer(mqttPublishingContext) - .thenCompose(queueOfferResult -> { - if (isEnqueued(queueOfferResult)) { - return mqttPublishingContext.getSendResultCompletableFuture(); - } else if (isDropped(queueOfferResult)) { - throw getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext); - } else { - throw getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext); - } - }) - .whenComplete((sendResult, error) -> { - if (null != error) { - final var errorDescription = MessageFormat.format( - "Failed to enqueue MQTT message to topic <{0}> for sending to broker.", - mqttPublishingContext.getGenericMqttPublish().getTopic() - ); - logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders()) - .error(error, errorDescription); - - escalate(error, errorDescription); - } - }); - } - - private static boolean isEnqueued(final QueueOfferResult queueOfferResult) { - return Objects.equals(queueOfferResult, QueueOfferResult.enqueued()); + final CompletionStage result; + final var queueOfferResult = sourceQueue.offer(mqttPublishingContext); + if (isDropped(queueOfferResult)) { + result = CompletableFuture.failedFuture( + getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext) + ); + } else if (isQueueClosed(queueOfferResult)) { + result = CompletableFuture.failedFuture( + getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext) + ); + } else if (queueOfferResult instanceof QueueOfferResult.Failure failure) { + result = CompletableFuture.failedFuture(failure.cause()); + } else { + result = mqttPublishingContext.getSendResultCompletableFuture(); + } + return result.whenComplete((sendResult, error) -> { + if (null != error) { + final var errorDescription = MessageFormat.format( + "Failed to enqueue MQTT message to topic <{0}> for sending to broker.", + mqttPublishingContext.getGenericMqttPublish().getTopic() + ); + logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders()) + .error(error, errorDescription); + + escalate(error, errorDescription); + } + }); } private static boolean isDropped(final QueueOfferResult queueOfferResult) { return Objects.equals(queueOfferResult, QueueOfferResult.dropped()); } + private static boolean isQueueClosed(final QueueOfferResult queueOfferResult) { + return queueOfferResult instanceof QueueOfferResult.QueueClosed$; + } + private MessageSendingFailedException getMessageSendingFailedExceptionBecauseDroppedBySourceQueue( final MqttPublishingContext mqttPublishingContext ) {