Skip to content

Commit

Permalink
Replace deprecated Source.queue method in MqttPublisherActor.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Jun 8, 2022
1 parent 8d0c6ee commit 2e756ef
Showing 1 changed file with 35 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@
import akka.actor.Props;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.BoundedSourceQueue;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.OverflowStrategy;
import akka.stream.QueueOfferResult;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

/**
* Actor responsible for converting outbound {@code Signal}s to a {@code GenericMqttPublish} and sending them to an
Expand All @@ -57,7 +56,7 @@ public final class MqttPublisherActor extends BasePublisherActor<MqttPublishTarg
private final GenericMqttPublishingClient genericMqttPublishingClient;
private final OperationMode operationMode;

private SourceQueueWithComplete<MqttPublishingContext> sourceQueue;
private BoundedSourceQueue<MqttPublishingContext> sourceQueue;
private KillSwitch killSwitch;

@SuppressWarnings("java:S1144")
Expand Down Expand Up @@ -135,11 +134,11 @@ public void preStart() throws Exception {
killSwitch = sourceQueueWithCompleteUniqueKillSwitchPair.second();
}

private Pair<SourceQueueWithComplete<MqttPublishingContext>, UniqueKillSwitch> initializeSourceQueue() {
private Pair<BoundedSourceQueue<MqttPublishingContext>, UniqueKillSwitch> initializeSourceQueue() {
final var connectionConfig = connectivityConfig.getConnectionConfig();
final var mqttConfig = connectionConfig.getMqttConfig();

return Source.<MqttPublishingContext>queue(mqttConfig.getMaxQueueSize(), OverflowStrategy.dropNew())
return Source.<MqttPublishingContext>queue(mqttConfig.getMaxQueueSize())
.viaMat(KillSwitches.single(), Keep.both())
.to(Sink.foreach(this::sendMqttPublishMessageToBroker))
.run(getContext().getSystem());
Expand Down Expand Up @@ -250,38 +249,43 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
}

private CompletionStage<SendResult> offerToSourceQueue(final MqttPublishingContext mqttPublishingContext) {
return sourceQueue.offer(mqttPublishingContext)
.thenCompose(queueOfferResult -> {
if (isEnqueued(queueOfferResult)) {
return mqttPublishingContext.getSendResultCompletableFuture();
} else if (isDropped(queueOfferResult)) {
throw getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext);
} else {
throw getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext);
}
})
.whenComplete((sendResult, error) -> {
if (null != error) {
final var errorDescription = MessageFormat.format(
"Failed to enqueue MQTT message to topic <{0}> for sending to broker.",
mqttPublishingContext.getGenericMqttPublish().getTopic()
);
logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders())
.error(error, errorDescription);

escalate(error, errorDescription);
}
});
}

private static boolean isEnqueued(final QueueOfferResult queueOfferResult) {
return Objects.equals(queueOfferResult, QueueOfferResult.enqueued());
final CompletionStage<SendResult> result;
final var queueOfferResult = sourceQueue.offer(mqttPublishingContext);
if (isDropped(queueOfferResult)) {
result = CompletableFuture.failedFuture(
getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(mqttPublishingContext)
);
} else if (isQueueClosed(queueOfferResult)) {
result = CompletableFuture.failedFuture(
getMessageSendingFailedExceptionBecauseSourceQueueClosed(mqttPublishingContext)
);
} else if (queueOfferResult instanceof QueueOfferResult.Failure failure) {
result = CompletableFuture.failedFuture(failure.cause());
} else {
result = mqttPublishingContext.getSendResultCompletableFuture();
}
return result.whenComplete((sendResult, error) -> {
if (null != error) {
final var errorDescription = MessageFormat.format(
"Failed to enqueue MQTT message to topic <{0}> for sending to broker.",
mqttPublishingContext.getGenericMqttPublish().getTopic()
);
logger.withCorrelationId(mqttPublishingContext.getSignalDittoHeaders())
.error(error, errorDescription);

escalate(error, errorDescription);
}
});
}

private static boolean isDropped(final QueueOfferResult queueOfferResult) {
return Objects.equals(queueOfferResult, QueueOfferResult.dropped());
}

private static boolean isQueueClosed(final QueueOfferResult queueOfferResult) {
return queueOfferResult instanceof QueueOfferResult.QueueClosed$;
}

private MessageSendingFailedException getMessageSendingFailedExceptionBecauseDroppedBySourceQueue(
final MqttPublishingContext mqttPublishingContext
) {
Expand Down

0 comments on commit 2e756ef

Please sign in to comment.