Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alpakka MQTT Streaming not receiving message #3109

Open
kirankbs opened this issue Feb 16, 2024 · 0 comments
Open

Alpakka MQTT Streaming not receiving message #3109

kirankbs opened this issue Feb 16, 2024 · 0 comments

Comments

@kirankbs
Copy link

I am trying to setting up small example to stream MQTT messages but not successful.

import akka.Done
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError
import akka.stream.alpakka.mqtt.streaming._
import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttClientSession, Mqtt}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source, Tcp}
import akka.util.ByteString

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Promise}

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system: ActorSystem = ActorSystem("MqttClient")

    val host = "localhost"
    val port = 1883

    val connect = Connect("some-client-id", ConnectFlags.None)
    val subscribe = Subscribe("measurements")
    val pubAck = PubAck(PacketId(1))

    val settings = MqttSessionSettings()
    val clientSession = ActorMqttClientSession(settings)

    val (client, clientSource) = Source
      .queue[Command[Nothing]](2, OverflowStrategy.backpressure)
      .toMat(BroadcastHub.sink)(Keep.both)
      .run()

    val subscribed = Promise[Done]()

    Source
      .fromGraph(clientSource)
      .via(
        Mqtt
          .clientSessionFlow(clientSession, ByteString("1"))
          .join(Tcp().outgoingConnection(host, port))
      )
      .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
        case Right(Event(conAck: ConnAck, carry: Option[_])) =>
          println(s"--> Client Connected $conAck and $carry")
          subscribed.success(Done)
        case Right(Event(sub: SubAck, carry: Option[_])) =>
          println(s"--> Client Subscribed $sub and carry: $carry")
          subscribed.success(Done)
        case Right(Event(p: Publish, carry: Option[_])) =>
          println(s"--> Client published $p and carry: $carry")
          client.offer(Command(pubAck.copy(packetId = p.packetId.get)))
        case Right(event: Event[_]) => println(s"unknown event $event")
        case Left(error) => println(s"failure $error")
      })
      .runWith(Sink.ignore)

    client.offer(Command(connect))
    client.offer(Command(subscribe))
    Await.ready(subscribed.future, 3.seconds)

  }
}

Response:

--> Client Connected ConnAck(ConnAckFlags(1),ConnAckReturnCode(0)) and None
--> Client Subscribed SubAck(PacketId(1),Vector(ControlPacketFlags(1))) and carry: None

Command:

mosquitto_pub -d  -h localhost -p 1883 -u 7f9a441e-11f9-4e87-9b6a-1bfa2ce67329 -t 'measurements' -m '[{"name": "measurement1", "value": 10000}]'

I am unable to receive messages.

Please help me to spot anything I am missing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants