Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Streaming #1225

Merged
merged 108 commits into from Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
74aa7ee
Baseline project
huntc Sep 22, 2018
19d3892
Some initial codec stuff
huntc Sep 22, 2018
13ac5bd
Completed CONNECT codec along with tests
huntc Sep 23, 2018
3dc6a4c
ConnAck including tests
huntc Sep 23, 2018
4296318
Publish including tests
huntc Sep 23, 2018
5aa91b3
Publish ack and tests
huntc Sep 23, 2018
2e8e65d
PUBREC and tests
huntc Sep 23, 2018
d7e5ab6
PUBREL and tests
huntc Sep 23, 2018
1ddf549
PUB_COMP and friends
huntc Sep 23, 2018
872c626
The beginnings of SUBSCRIBE
huntc Sep 23, 2018
35d9186
SUBSCRIBE tests
huntc Sep 23, 2018
b0889d1
Now considers a max packet size when decoding - helps to avoid memory…
huntc Sep 23, 2018
60443e0
SUBACK and tests
huntc Sep 23, 2018
7c190af
UNSUBSCRIBE and tests
huntc Sep 23, 2018
6915a8e
UNSUBACK and tests
huntc Sep 23, 2018
f84c031
The last of the message types, including their tests
huntc Sep 23, 2018
2640290
I believe that packet id is like a correlation id for subscribe/unsub…
huntc Sep 24, 2018
230f015
The beginnings of the session flow - the test works too. :-)
huntc Sep 24, 2018
e7f484a
Some PR feedback - bit operation conveniences and some explicit lengt…
huntc Sep 25, 2018
be3c8fc
Implemented MQTT's special framing
huntc Sep 25, 2018
176d7a1
Embarked on the Java API - I've ran out of time today, and so the Jav…
huntc Sep 25, 2018
740ebd9
Fully working Java code including tests
huntc Sep 26, 2018
6b419f6
Completed the Java API
huntc Sep 26, 2018
59d62cd
An additional test for when packets are non-contiguous.
huntc Sep 26, 2018
e61f7ec
Corrected remaining length decoding and included the correct encoding
huntc Sep 26, 2018
6d51c83
PacketId is now an internal concern and will be managed by the sessio…
huntc Sep 27, 2018
78391b4
PR feedback - vectors are faster to append to
huntc Sep 27, 2018
3d3a2e0
Keep variable scope minimal
huntc Sep 27, 2018
ae68f7e
Some settings validation
huntc Sep 27, 2018
b988383
Some bitwise conveniences for AND
huntc Sep 27, 2018
42cdf40
Split the session into client session and server session as there are…
huntc Sep 28, 2018
a43c762
A little code saving
huntc Sep 28, 2018
c243b94
Generalised a convention for mapping Either in Java
huntc Sep 28, 2018
8c153b5
Separated out session state from the session flow as session state ha…
huntc Sep 28, 2018
2ba67cf
An initial cut of the Connector FSM for client-side operations. Uses …
huntc Sep 28, 2018
d2b66a5
Some tidying-up and styling for Akka Typed
huntc Sep 28, 2018
dfc5406
Improved the construction of the client session using apply and pass …
huntc Sep 29, 2018
75600b9
The beginnings of having connected up flows with the session actor
huntc Sep 29, 2018
974ad71
Some type aliasing to clarify what the option types stand for
huntc Sep 29, 2018
828a10d
Slight improvement on the type
huntc Sep 29, 2018
969d294
Some PR feedback
huntc Sep 30, 2018
0abe99b
All codec concerns are now with the session - the connector shouldn't…
huntc Sep 30, 2018
ad7bc73
Test coverage for connection management
huntc Oct 1, 2018
4107d0c
Improved session tests to cater for session management timings
huntc Oct 1, 2018
203bf15
Treat losing the event or command streams as losing the connection i.…
huntc Oct 1, 2018
ff21106
Upgrading of Akka so that some Akka Typed testing stuff can be utilised
huntc Oct 1, 2018
8ad6f42
Lots of progress here - there's a packet id allocator and most of the…
huntc Oct 1, 2018
fc686ef
Correctly deal with subscribers as children given the loss of type sa…
huntc Oct 2, 2018
8dd6c9d
Topic name filtering
huntc Oct 2, 2018
3023737
Publish received from a remote publication on a subscription
huntc Oct 2, 2018
a326747
Subscriber publish acknowledgements
huntc Oct 2, 2018
be8bbe6
Most of publishing is now in place - QoS of 0 and 1 work - will imple…
huntc Oct 2, 2018
961d03e
Introduces packet routing (formally the packet id allocator)
huntc Oct 3, 2018
ef8a168
Local and remote packet routing
huntc Oct 3, 2018
b8abf00
Safe actor naming
huntc Oct 3, 2018
0372f1a
Added command and event parallelism
huntc Oct 3, 2018
94241c5
Lossless actor name hashing
huntc Oct 3, 2018
ac5e2eb
Stash publications for the same topic
huntc Oct 4, 2018
5501448
Allow for multiple replies to a command
huntc Oct 5, 2018
8bda6d1
Indefinite retries
huntc Oct 5, 2018
20a757f
Publish timeout test
huntc Oct 5, 2018
ca415c7
QoS 2 publishing now supported from a client perspective
huntc Oct 5, 2018
a799e5b
Set the DUP flag when sending out duplicate publications
huntc Oct 5, 2018
9f3872a
Slightly improved pattern matching
huntc Oct 5, 2018
65e353b
Stash events while connecting
huntc Oct 5, 2018
3c7de8a
Defaults for settings
huntc Oct 5, 2018
091eecd
Tiny code improvement
huntc Oct 5, 2018
a9d9c96
PINGREQ test
huntc Oct 7, 2018
104a966
Client side PINGREQ and PINGRESP
huntc Oct 8, 2018
f012dbd
Client shutdown
huntc Oct 8, 2018
afe0de1
Implemented clean session
huntc Oct 8, 2018
4c06e43
Documentation
huntc Oct 8, 2018
543b310
UNSUBSCRIBE
huntc Oct 8, 2018
f9eb599
Test tidy-up
huntc Oct 9, 2018
acc59dd
Server implementation
huntc Oct 10, 2018
39284bf
Flailing test fix
huntc Oct 10, 2018
31aab77
More server progress
huntc Oct 12, 2018
1f75296
Unsubscribe tests
huntc Oct 12, 2018
f1617f8
Handle ping requests
huntc Oct 13, 2018
52d4aeb
Server side PINGREQ handling
huntc Oct 14, 2018
19360ba
Server side PUBLISH
huntc Oct 15, 2018
8ba4e59
Fail actors if required
huntc Oct 16, 2018
76fa928
Cleaned up test
huntc Oct 16, 2018
e0d3ac7
Tolerate PUBLISH being received locally while SUBSCRIBE is in motion
huntc Oct 17, 2018
12aabb8
Client termination notifications
huntc Oct 17, 2018
e3d574c
Documentation clarifications
huntc Oct 17, 2018
ba29585
Eradicate unwanted log messages
huntc Oct 17, 2018
a42c186
Use just `watch` instead of `watchWith`
huntc Oct 18, 2018
6065f8d
Correctly supervise
huntc Oct 18, 2018
f26106d
Stack tracings unrequired for exceptions
huntc Oct 18, 2018
d23f22d
Supervision loses the failure field
huntc Oct 18, 2018
ad2a2e1
Nicer actor names given URL encoding
huntc Oct 19, 2018
23819cb
Share a materialiser
huntc Oct 19, 2018
e13ae77
Small cleanup
huntc Oct 19, 2018
e924b5f
Small test cleanup
huntc Oct 19, 2018
ff5490a
Reduced unnecessary settings code
huntc Oct 19, 2018
495f823
Doco for settings
huntc Oct 19, 2018
689661e
Improved Java DSL with Pair usage
huntc Oct 19, 2018
ad03a2f
Doco explanations
huntc Oct 21, 2018
fc05ed8
Correctly scope test dependency
huntc Oct 21, 2018
abb20a9
Java create methods for API consistency
huntc Oct 21, 2018
de86e1a
Reduce API surface
huntc Oct 22, 2018
8f0f1fb
JMH Benchmarking
huntc Oct 22, 2018
f4b9d0a
Fixed a PUBACK issue
huntc Oct 24, 2018
e1aff25
The integration tests need to clean up the session each time
huntc Oct 24, 2018
9a62e05
Implement the cleaning of a session
huntc Oct 24, 2018
2f71093
Corrected max packet size
huntc Oct 24, 2018
5aca136
Small optimisation
huntc Oct 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.sbt
Expand Up @@ -22,6 +22,7 @@ lazy val modules: Seq[ProjectReference] = Seq(
kudu,
mongodb,
mqtt,
mqttStreaming,
orientdb,
reference,
s3,
Expand Down Expand Up @@ -160,6 +161,11 @@ lazy val mongodb = alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb)

lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt)

lazy val mqttStreaming = alpakkaProject("mqtt-streaming", "mqttStreaming", Dependencies.MqttStreaming)
lazy val mqttStreamingBench = alpakkaProject("mqtt-streaming-bench", "mqttStreamingBench", Seq.empty)
.enablePlugins(JmhPlugin)
.dependsOn(mqtt, mqttStreaming)
ennru marked this conversation as resolved.
Show resolved Hide resolved

lazy val orientdb = alpakkaProject("orientdb",
"orientdb",
Dependencies.OrientDB,
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Expand Up @@ -42,6 +42,7 @@ The [Alpakka project](https://developer.lightbend.com/docs/alpakka/current/) is
* [JMS](jms.md)
* [MongoDB](mongodb.md)
* [MQTT](mqtt.md)
* [MQTT Streaming](mqtt-streaming.md)
* [OrientDB](orientdb.md)
* [Pulsar](external/pulsar.md)
* [Server-sent Events (SSE)](sse.md)
Expand Down
93 changes: 93 additions & 0 deletions docs/src/main/paradox/mqtt-streaming.md
@@ -0,0 +1,93 @@
# MQTT Streaming

@@@ note { title="MQTT Streaming" }

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Further information on [mqtt.org](https://mqtt.org/).

@@@

@@@ note { title="Paho Differences" }

Alpakka contains @ref[another MQTT connector](mqtt.md) which is based on the Eclipse Paho client. Unlike the Paho version, this library has no dependencies other than those of Akka Streams i.e. it is entirely reactive. As such, there should be a significant performance advantage given its pure-Akka foundations, particularly in terms of memory usage given its diligent use of threads.

This library also differs in that it separates out the concern of how MQTT is connected. Unlike Paho, where TCP is assumed, this library can join in any flow. The end result is that by using this library, Unix Domain Sockets, TCP, UDP or anything else can be used to transport MQTT.

@@@

The Alpakka MQTT connector provides an Akka Stream flow to connect to MQTT brokers. In addition, a flow is provided so that you can implement your own MQTT server in the case where you do not wish to use a broker--MQTT is a fine protocol for directed client/server interactions, as well as having an intermediary broker.

### Reported issues

[Tagged issues at Github](https://github.com/akka/alpakka/labels/p%3Amqtt-streaming)

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-mqtt-streaming_$scala.binary.version$
version=$project.version$
}

## Flow through a client session

The following code illustrates how to establish an MQTT client session and join it with a TCP connection:

Scala
: @@snip [snip](/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) { #create-streaming-flow }

Java
: @@snip [snip](/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java) { #create-streaming-flow }

The resulting flow's type shows how `Command`s are received and `Event`s are emitted. With `Event`, they can
be either decoded successfully or not.

Run the flow by connecting a source of messages to be published via a queue:

Scala
: @@snip [snip](/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) { #run-streaming-flow }

Java
: @@snip [snip](/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java) { #run-streaming-flow }

We drop the first 3 events received as they will be ACKs to our connect, subscribe and publish. The next event
received is the publication to the topic we just subscribed to.

## Flow through a server session

The following code illustrates how to establish an MQTT server session and join it with a TCP binding:

Scala
: @@snip [snip](/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) { #create-streaming-bind-flow }

Java
: @@snip [snip](/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java) { #create-streaming-bind-flow }

The resulting source's type shows how `Event`s are received and `Command`s are queued in reply. Our example
acknowledges a connection, subscription and publication. Upon receiving a publication, it is re-published
from the server so that any client that is subscribed will receive it.

Run the flow:

Scala
: @@snip [snip](/mqtt-streaming/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) { #run-streaming-bind-flow }

Java
: @@snip [snip](/mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java) { #run-streaming-bind-flow }

## Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> mqtt-streaming/testOnly *.MqttFlowSpec
```

Java
: ```
sbt
> mqtt-streaming/testOnly *.MqttFlowTest
```
6 changes: 6 additions & 0 deletions docs/src/main/paradox/mqtt.md
Expand Up @@ -8,6 +8,12 @@ Further information on [mqtt.org](https://mqtt.org/).

@@@

@@@ note { title="Streaming Differences" }

Alpakka contains @ref[another MQTT connector](mqtt-streaming.md) which is _not_ based on the Eclipse Paho client, unlike this one. Please refer to the other connector where the differences are expanded on.

@@@

The Alpakka MQTT connector provides an Akka Stream source, sink and flow to connect to MQTT brokers. It is based on the [Eclipse Paho Java client](https://www.eclipse.org/paho/clients/java/).

### Reported issues
Expand Down
5 changes: 5 additions & 0 deletions mqtt-streaming-bench/src/main/resources/application.conf
@@ -0,0 +1,5 @@
akka {
loggers = []
stdout-loglevel = "OFF"
loglevel = "OFF"
}
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.mqtt

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

import akka.actor.ActorSystem
import akka.stream.alpakka.mqtt.scaladsl.MqttFlow
import akka.stream.alpakka.mqtt.streaming.scaladsl.{ActorMqttServerSession, Mqtt}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.util.ByteString
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.openjdk.jmh.annotations._

import scala.concurrent.duration._
import scala.concurrent.Await

object MqttPerf {
/*
* An entry point for debugging purposes - invoke whatever you need to debug
*/
def main(args: Array[String]): Unit = {
val test = new MqttPerf()
test.setup()
try {
for (_ <- 0 until 10000) test.serverPublish()
} finally {
test.tearDown()
}
}
}

@State(Scope.Benchmark)
class MqttPerf {

import streaming.MqttCodec._

private implicit val system: ActorSystem = ActorSystem("mqttperf")
private implicit val mat: Materializer = ActorMaterializer()

private val (_, clientSource) = Source
.queue[MqttMessage](2, OverflowStrategy.backpressure)
.toMat(BroadcastHub.sink)(Keep.both)
.run()

private val (server, serverSource) = Source
.queue[streaming.Command[_]](1, OverflowStrategy.backpressure)
.toMat(BroadcastHub.sink)(Keep.both)
.run()

private val pubAckReceivedLock = new ReentrantLock()
private val pubAckReceived = pubAckReceivedLock.newCondition()

@Setup
def setup(): Unit = {
val host = "localhost"
val port = 9883

val connectionSettings = MqttConnectionSettings(s"tcp://$host:$port", "some-client-id", new MemoryPersistence)

val connAck = streaming.ConnAck(streaming.ConnAckFlags.None, streaming.ConnAckReturnCode.ConnectionAccepted)
val subAck = streaming.SubAck(streaming.PacketId(1), List(streaming.ControlPacketFlags.QoSAtLeastOnceDelivery))

val serverSession = ActorMqttServerSession(streaming.MqttSessionSettings())

val bound = Tcp()
.bind(host, port)
.flatMapMerge(
1, { connection =>
Source
.fromGraph(serverSource)
.via(
Mqtt
.serverSessionFlow(serverSession, ByteString(connection.remoteAddress.getAddress.getAddress))
.join(connection.flow)
)
.wireTap(Sink.foreach[Either[DecodeError, streaming.Event[_]]] {
case Right(streaming.Event(_: streaming.Connect, _)) =>
server.offer(streaming.Command(connAck))
case Right(streaming.Event(s: streaming.Subscribe, _)) =>
server.offer(streaming.Command(subAck.copy(packetId = s.packetId)))
case Right(streaming.Event(_: streaming.PubAck, _)) =>
pubAckReceivedLock.lock()
try {
pubAckReceived.signal()
} finally {
pubAckReceivedLock.unlock()
}
case _ =>
})
}
)
.toMat(Sink.ignore)(Keep.left)
.run()
Await.ready(bound, 3.seconds)

Source
.fromGraph(clientSource)
.via(
MqttFlow.atLeastOnce(
connectionSettings,
MqttSubscriptions("some-topic", MqttQoS.AtLeastOnce),
bufferSize = 8,
MqttQoS.AtLeastOnce
)
)
.mapAsync(1)(_.ack())
.runWith(Sink.ignore)
}

@Benchmark
def serverPublish(): Unit = {
server.offer(streaming.Command(streaming.Publish("some-topic", ByteString("some-payload"))))
pubAckReceivedLock.lock()
try {
pubAckReceived.await(3, TimeUnit.SECONDS)
} finally {
pubAckReceivedLock.unlock()
}
}

@TearDown
def tearDown(): Unit =
system.terminate()
}