Skip to content

Commit

Permalink
Reset primal behaviour of MqttPublisherActor when offering to source …
Browse files Browse the repository at this point in the history
…queue fails.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Jun 9, 2022
1 parent 2e756ef commit f60e1f0
Showing 1 changed file with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,29 +250,25 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,

private CompletionStage<SendResult> offerToSourceQueue(final MqttPublishingContext mqttPublishingContext) {
final CompletionStage<SendResult> result;

final var queueOfferResult = sourceQueue.offer(mqttPublishingContext);
if (isDropped(queueOfferResult)) {
result = CompletableFuture.failedFuture(
getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext)
);
throw getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext);
} else if (isQueueClosed(queueOfferResult)) {
result = CompletableFuture.failedFuture(
getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext)
);
throw getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext);
} else if (queueOfferResult instanceof QueueOfferResult.Failure failure) {
result = CompletableFuture.failedFuture(failure.cause());
} else {
result = mqttPublishingContext.getSendResultCompletableFuture();
}

return result.whenComplete((sendResult, error) -> {
if (null != error) {
final var errorDescription = MessageFormat.format(
"Failed to enqueue MQTT message to topic <{0}> for sending to broker.",
mqttPublishingContext.getGenericMqttPublish().getTopic()
);
logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders())
.error(error, errorDescription);

logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders()).error(error, errorDescription);
escalate(error, errorDescription);
}
});
Expand All @@ -282,10 +278,6 @@ private static boolean isDropped(final QueueOfferResult queueOfferResult) {
return Objects.equals(queueOfferResult, QueueOfferResult.dropped());
}

private static boolean isQueueClosed(final QueueOfferResult queueOfferResult) {
return queueOfferResult instanceof QueueOfferResult.QueueClosed$;
}

private MessageSendingFailedException getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(
final MqttPublishingContext mqttPublishingContext
) {
Expand All @@ -306,6 +298,10 @@ private MessageSendingFailedException getMessageSendingFailedExceptionBecauseDro
.build();
}

private static boolean isQueueClosed(final QueueOfferResult queueOfferResult) {
return queueOfferResult instanceof QueueOfferResult.QueueClosed$;
}

private static MessageSendingFailedException getMessageSendingFailedExceptionBecauseSourceQueueClosed(
final MqttPublishingContext mqttPublishingContext
) {
Expand Down

0 comments on commit f60e1f0

Please sign in to comment.