Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT streaming: Document PubAck handling #1908

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# PR #1908
# https://github.com/akka/alpakka/pull/1908
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.javadsl.MqttSession.ask")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.mqtt.streaming.scaladsl.MqttSession.?")
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package akka.stream.alpakka.mqtt.streaming
package javadsl

import java.util.concurrent.CompletionStage

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
Expand All @@ -16,6 +18,8 @@ import akka.stream.alpakka.mqtt.streaming.scaladsl.{
}
import akka.stream.javadsl.Source

import scala.compat.java8.FutureConverters._

/**
* Represents MQTT session state for both clients or servers. Session
* state can survive across connections i.e. their lifetime is
Expand All @@ -32,6 +36,18 @@ abstract class MqttSession {
*/
def tell[A](cp: Command[A]): Unit

/**
* Ask the session to perform a command regardless of the state it is
* in. This is important for sending Publish messages in particular,
* as a connection may not have been established with a session.
* @param cp The command to perform
* @tparam A The type of any carry for the command.
* @return A future indicating when the command has completed. Completion
* is defined as when it has been acknowledged by the recipient
* endpoint.
*/
def ask[A](cp: Command[A]): CompletionStage[A]

/**
* Shutdown the session gracefully
*/
Expand All @@ -47,6 +63,9 @@ abstract class MqttClientSession extends MqttSession {
override def tell[A](cp: Command[A]): Unit =
underlying ! cp

override def ask[A](cp: Command[A]): CompletionStage[A] =
(underlying ? cp).toJava

override def shutdown(): Unit =
underlying.shutdown()
}
Expand Down Expand Up @@ -91,6 +110,9 @@ abstract class MqttServerSession extends MqttSession {
override def tell[A](cp: Command[A]): Unit =
underlying ! cp

override def ask[A](cp: Command[A]): CompletionStage[A] =
(underlying ? cp).toJava

override def shutdown(): Unit =
underlying.shutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ abstract class MqttSession {
*/
def ![A](cp: Command[A]): Unit

/**
* Ask the session to perform a command regardless of the state it is
* in. This is important for sending Publish messages in particular,
* as a connection may not have been established with a session.
* @param cp The command to perform
* @tparam A The type of any carry for the command.
* @return A future indicating when the command has completed. Completion
* is defined as when it has been acknowledged by the recipient
* endpoint.
*/
final def ask[A](cp: Command[A]): Future[A] =
this ? cp

/**
* Ask the session to perform a command regardless of the state it is
* in. This is important for sending Publish messages in particular,
* as a connection may not have been established with a session.
* @param cp The command to perform
* @tparam A The type of any carry for the command.
* @return A future indicating when the command has completed. Completion
* is defined as when it has been acknowledged by the recipient
* endpoint.
*/
def ?[A](cp: Command[A]): Future[A]

/**
* Shutdown the session gracefully
*/
Expand Down Expand Up @@ -171,6 +196,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
case c: Command[A] => throw new IllegalStateException(c + " is not a client command that can be sent directly")
}

override def ?[A](cp: Command[A]): Future[A] =
???

override def shutdown(): Unit = {
system.stop(clientConnector.toClassic)
system.stop(consumerPacketRouter.toClassic)
Expand Down Expand Up @@ -513,6 +541,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
case c: Command[A] => throw new IllegalStateException(c + " is not a server command that can be sent directly")
}

override def ?[A](cp: Command[A]): Future[A] =
???

override def shutdown(): Unit = {
system.stop(serverConnector.toClassic)
system.stop(consumerPacketRouter.toClassic)
Expand Down
20 changes: 12 additions & 8 deletions mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,20 @@ public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
SourceQueueWithComplete<Command<Object>> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
new Command<>(
new Publish(
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
topic,
ByteString.fromString("ohi"))));
CompletionStage<Done> publishDone =
session.ask(
new Command<>(
new Publish(
ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
topic,
ByteString.fromString("ohi")),
Done.getInstance()));
// #run-streaming-flow

CompletionStage<Publish> event = run.second();
Publish publishEvent = event.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
publishDone.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

CompletionStage<Publish> events = run.second();
Publish publishEvent = events.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertEquals(publishEvent.topicName(), topic);
assertEquals(publishEvent.payload(), ByteString.fromString("ohi"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ trait MqttFlowSpec extends WordSpecLike with Matchers with BeforeAndAfterAll wit

commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
commands.offer(Command(Subscribe(topic)))
session ! Command(
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
)
val publishDone = session ? Command(
Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi")),
Done
)

//#run-streaming-flow

publishDone.futureValue shouldBe Done
events.futureValue match {
case Publish(_, `topic`, _, bytes) => bytes shouldBe ByteString("ohi")
case e => fail("Unexpected event: " + e)
Expand Down