Design of new MQTTv5 Java client #389

Closed
jpwsutton opened this issue Jul 28, 2017 · 12 comments

Comments

@jpwsutton
Member

jpwsutton commented Jul 28, 2017

With the latest MQTTv5 spec being available here, @icraggs and I are starting work on implementing some MQTT v5 clients. This GitHub issue should be the main focal point for any discussion around how the community would like the new client and its APIs to be designed. I'm currently intending to model it on the existing 3.1.1 client, as that seems to work well for most people. However if you can think of any improvements that we can bake in from the start, that would be great.

Some useful links:

@timothyjward

> However if you can think of any improvements that we can bake in from the start, that would be great.

One thing that I've always found a little disappointing in the existing API is its heavy use of a single callback to try to handle all of the asynchronous state in the client. A Promise model, using lambda-friendly one-time callbacks, would be a much nicer way to interact. For example, given the following method:

```java
Promise<Void> connect();
```

I can write code like:

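```java
Promise<Void> result = client.connect()
        .then(p -> client.publish("myTopic", myMessage))
        .then(p -> {
                  System.out.println("Message Sent");
                  return null; // a Success callback returns a Promise; null resolves the chain with null
              },
              p -> System.out.println("Oops... " + p.getFailure().getMessage()));
```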

The good thing about this model is that I don't need to worry about working out which message has been sent (or failed to send) as each Promise represents a single connect/publish/...

There are numerous existing Promise implementations. As a biased individual I would suggest the OSGi Promise, but there are other examples in projects such as the Netty networking framework.
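For readers without an OSGi Promise implementation to hand, the same one-time-callback style can be roughly sketched with the JDK's own `java.util.concurrent.CompletableFuture`. This is a minimal illustration under stated assumptions: `AsyncMqttClient`, `FakeMqttClient` and `PromiseStyleDemo` are hypothetical names, and the fake client completes its futures immediately instead of talking to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical async client: every operation returns its own one-shot future,
// so callers never have to correlate completions through a shared callback.
interface AsyncMqttClient {
    CompletableFuture<Void> connect();
    CompletableFuture<Void> publish(String topic, byte[] payload);
}

// Hypothetical stand-in for illustration only: no network I/O, it records
// what was "sent" and completes each future immediately.
class FakeMqttClient implements AsyncMqttClient {
    final List<String> sent = new ArrayList<>();
    private boolean connected;

    public CompletableFuture<Void> connect() {
        connected = true;
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Void> publish(String topic, byte[] payload) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        if (!connected) {
            f.completeExceptionally(new IllegalStateException("not connected"));
        } else {
            sent.add(topic);
            f.complete(null);
        }
        return f;
    }
}

class PromiseStyleDemo {
    // Chains connect -> publish and reports the outcome of this one publish.
    static CompletableFuture<String> connectAndPublish(AsyncMqttClient client) {
        return client.connect()
                .thenCompose(v -> client.publish("myTopic", "hello".getBytes()))
                .handle((v, err) -> {
                    if (err == null) {
                        return "Message Sent";
                    }
                    // Failures from an earlier stage may arrive wrapped in CompletionException.
                    Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                            ? err.getCause() : err;
                    return "Oops... " + cause.getMessage();
                });
    }
}
```

Each call returns its own future, so the result of a particular publish is observed on that future alone; `thenCompose` plays roughly the role that chaining `then()` plays on a Promise.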

@timothyjward

> However if you can think of any improvements that we can bake in from the start, that would be great.

Messages consumed from an MQTT client form an asynchronous stream of data. Currently these have to be handled by registering a single callback interface on subscription, or by having a single callback that receives the messages for every topic. A more elegant design would be to use a reactive model, allowing each subscription to be separate and making it easier to compose a data processing pipeline.

An example of such a model is covered by the upcoming OSGi PushStreams release:

```java
Promise<Optional<String>> distress = client.subscribe("myTopic")
        .map(MqttMessage::getPayload)
        .map(b -> new String(b, "UTF-8"))
        .filter(s -> "HELP".equals(s))
        .findFirst();

distress.then(p -> {
                  // p is the resolved Promise; its value is the Optional produced by findFirst()
                  p.getValue().ifPresent(x -> System.out.println("Sound the alarm!"));
                  return null;
              },
              p -> System.out.println("Something went wrong - send an inspection team"));
```
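A JDK-only approximation of per-subscription delivery might look like the sketch below. `TopicDispatcher`, `deliver()` and `findFirst()` are hypothetical; topic matching is exact (no MQTT `+` or `#` wildcards), the class is single-threaded, and there is no unsubscribe or end-of-stream handling, so the `Optional` result of `PushStream.findFirst()` is simplified to a plain future.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-process dispatcher: each subscribe() call gets its own handler,
// instead of one global callback receiving every topic's messages.
// Single-threaded sketch; a real client would need concurrent collections.
class TopicDispatcher {
    private final Map<String, List<Consumer<byte[]>>> handlers = new HashMap<>();

    void subscribe(String topic, Consumer<byte[]> handler) {
        handlers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    // Called by the (hypothetical) network layer when a message arrives.
    void deliver(String topic, byte[] payload) {
        for (Consumer<byte[]> h : handlers.getOrDefault(topic, Collections.emptyList())) {
            h.accept(payload);
        }
    }

    // A map/filter pipeline on one subscription that completes a future with the
    // first matching element: a rough analogue of map(...).filter(...).findFirst().
    <R> CompletableFuture<R> findFirst(String topic,
                                       Function<byte[], R> mapper,
                                       Predicate<? super R> filter) {
        CompletableFuture<R> first = new CompletableFuture<>();
        subscribe(topic, payload -> {
            if (first.isDone()) {
                return; // only the first match matters; later messages are ignored
            }
            R value = mapper.apply(payload);
            if (filter.test(value)) {
                first.complete(value);
            }
        });
        return first;
    }
}
```

Messages on other topics never reach this pipeline, which is the separation between subscriptions that the comment asks for.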

@timothyjward

> However if you can think of any improvements that we can bake in from the start, that would be great.

The Paho MqttMessage type has a couple of shortcomings.

Firstly, and most importantly, because the message exposes a byte[], it leaks mutable internal state unless copies are made. This is inherently unsafe in a multi-threaded system. It would make much more sense to use the NIO ByteBuffer as the payload type. ByteBuffers can easily be made read-only (and therefore safe to share between threads, provided each reader uses its own view, since position and limit remain mutable), can easily be created from a byte[], and have numerous useful methods for reading and writing data.

Secondly, a Message has no link to its topic. This means that when receiving a message I always have to pass around two references, one for the message and the other for the topic. This feels like it should be encapsulated by the message as it is an intimately related piece of data.
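One possible shape for such a message type, sketched as a fresh class rather than a change to the existing `MqttMessage`: `ImmutableMqttMessage` is a hypothetical name. It copies the payload once on construction and then hands out read-only `ByteBuffer` views, and it carries its topic, so receivers only need to pass around one reference.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical immutable message type: the topic travels with the payload,
// and the payload is only ever exposed as a read-only ByteBuffer view.
final class ImmutableMqttMessage {
    private final String topic;
    private final ByteBuffer payload; // private copy, never handed out directly

    ImmutableMqttMessage(String topic, ByteBuffer source) {
        this.topic = Objects.requireNonNull(topic, "topic");
        // Copy the caller's remaining bytes so later writes to their buffer
        // (or its backing array) cannot change this message.
        ByteBuffer copy = ByteBuffer.allocate(source.remaining());
        copy.put(source.duplicate()); // duplicate() leaves the caller's position untouched
        copy.flip();
        this.payload = copy;
    }

    static ImmutableMqttMessage of(String topic, byte[] bytes) {
        return new ImmutableMqttMessage(topic, ByteBuffer.wrap(bytes));
    }

    String getTopic() {
        return topic;
    }

    // Each call returns an independent read-only view with its own position and
    // limit, so one reader advancing its view does not affect another.
    ByteBuffer getPayload() {
        return payload.asReadOnlyBuffer();
    }
}
```

The cost is one copy per message on construction; in exchange, no receiver can observe or cause a mutation of the shared bytes.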

@fpagliughi

fpagliughi commented Aug 4, 2017 via email

@alexhelder

Will the new Java client support Android? I think it would be best not to choose any particular reactive API, but allow easy integration with the existing possibilities like RxJava 2 (which can be used on Android), for example.

@RyanRamchandar

RyanRamchandar commented Aug 8, 2017

I agree that using Reactive Streams [1] (eg. RxJava 2) makes sense here as MQTT messages and topics are inherently streams of data and subscriptions to those streams.

RxJava 2 is a reactive stream library for Java that is Reactive-Streams version 1.0.0 compliant and interoperable with other stream libraries including Java 8 streams.

In many of my projects where I'm using the Paho MQTT client, I am already wrapping the MQTT message callbacks in reactive streams. Having this built in natively to the MQTTv5 Java client would be a huge win for developers.

[1] http://reactivex.io/intro.html

@timothyjward

> Will the new Java client support Android?

Whilst I'm not aware of anyone using them on Android, OSGi Promises and OSGi PushStreams have no dependencies that aren't available in the Android SDK.

> I think it would be best not to choose any particular reactive API, but allow easy integration with the existing possibilities like RxJava 2

I think that it is necessary to make an API choice, otherwise we force every user to write their own adapter code. I also think that it would be best not to create a special API just for the Paho MQTT client. I do agree, however, that whatever API choice is made should have few dependencies and be easy to adapt to whichever other API(s) users want to use. I think this fits with your requirement?

@timothyjward

> RxJava 2 is a reactive stream library for Java that is Reactive-Streams version 1.0.0 compliant and interoperable with other stream libraries including Java 8 streams.

Whilst RxJava is a popular choice I would be a little careful about tying the Paho client to it directly. The RxJava 2 binary is 1.2 MB (release 2.1.2) and contains a lot of pieces that will be irrelevant for many of the Paho use cases. Given that this MQTT client needs to be suitable for embedded systems as well as larger ones I think a smaller, simpler API that could easily be adapted into an RxJava Reactive Stream would be a better choice.

The latest snapshots of OSGi PushStreams are about 120k (including source), and they have no dependency on the OSGi framework, so the client could still be transparently used in non-OSGi applications.

@RyanRamchandar

@timothyjward I agree that a smaller, simpler API is the right approach.

Instead of tying the Paho client directly to a specific reactive stream library like RxJava 2 (or any other), I would suggest the client conforms to a standard, so that any reactive library which conforms to that standard can be used. Reactive Streams describes itself as an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. RxJava 2, for instance, depends only on Reactive Streams.

The Paho client can depend on the Reactive Streams library (v1.0.0 is ~125KB) and this will allow the user to bring their own reactive implementation (be it RxJava2 or another).

@timothyjward

> The Paho client can depend on the Reactive Streams library (v1.0.0 is ~125KB) and this will allow the user to bring their own reactive implementation (be it RxJava2 or another).

My concern is that this approach will require the user to bring their own reactive implementation, or require the Paho client to provide its own basic implementation for when no other library is present. Without this the API would have to be split in an awkward way, segregating the preferred streaming approaches from the "core" client and emphasising other non-streaming ways to receive messages. There is also the question of what should happen when someone attempts to use a streaming method in the absence of a relevant library.

I'm further concerned that putting the onus of pluggability on Paho will lead to the creation of multiple extension jars/bundles that are needed to plug in the relevant library implementations. There's also the question of what happens when multiple extensions are present at runtime.

Finally, I'm not sure that the Reactive Streams API is the best fit for the Paho Client. Reactive Streams use a client controlled model for delivery (through requests from the subscription), which in turn forces the source of the messages to be responsible for buffering. If a slow consumer requests messages at a low rate then what should the Paho client do with the messages being pushed to it by the server?

In summary, I think that Paho does need to pick a library that it will use for streaming. Allowing further interoperability through the Reactive-Streams API should definitely be a part of that decision, but I'm not sure that it means natively using the Reactive Streams API in the Paho client is the answer.

@helins
Contributor

helins commented Nov 23, 2017

A callback-based API is certainly the right choice, as it is the simplest one. Bundling something like an MQTT client with a framework will harm composition; callbacks, on the other hand, let you build just about anything. Plus, whatever framework you decide is best will certainly not suit all the use cases you haven't imagined. Let us not try to be too smart.

If someone really wants a high-level API, they should simply build a wrapper around callbacks.
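As an illustration of that layering, a callback-only core can be wrapped into futures in a few lines. `ResultCallback`, `CallbackClient` and `FutureAdapter` are hypothetical types for this sketch, and the `Integer` result merely stands in for something like a message ID; none of this is the existing Paho API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal callback contract (not the actual Paho listener interface).
interface ResultCallback<T> {
    void onSuccess(T value);
    void onFailure(Throwable cause);
}

// Hypothetical callback-only core API: the client itself depends on no framework.
interface CallbackClient {
    void publish(String topic, byte[] payload, ResultCallback<Integer> callback);
}

// A thin adapter layered on top: users who want futures (or Rx, Reactor, ...)
// write or pick an adapter like this instead of the core choosing for them.
final class FutureAdapter {
    private FutureAdapter() {}

    static CompletableFuture<Integer> publish(CallbackClient client, String topic, byte[] payload) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        client.publish(topic, payload, new ResultCallback<Integer>() {
            @Override public void onSuccess(Integer messageId) { future.complete(messageId); }
            @Override public void onFailure(Throwable cause) { future.completeExceptionally(cause); }
        });
        return future;
    }
}
```

The same pattern could target other async or reactive libraries without the core client depending on any of them; the trade-off, raised earlier in the thread, is that every user who wants a higher-level API must write or find such an adapter.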

@RyanRamchandar

Reactive Streams has officially been included in Java 9 as Flow via JEP-266. However, I understand v5 of the Paho client is targeting Java 8.

> Reactive Streams use a client controlled model for delivery (through requests from the subscription), which in turn forces the source of the messages to be responsible for buffering. If a slow consumer requests messages at a low rate then what should the Paho client do with the messages being pushed to it by the server?

Back pressure is a feature of Reactive Streams and can be handled asynchronously in a non-blocking way. See: https://www.reactivemanifesto.org/glossary#Back-Pressure

And from http://www.reactive-streams.org:

> The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
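The bounded-queue idea can be seen in the JDK's own Flow implementation (Java 9 or later, so outside the Java 8 baseline mentioned above). In this sketch, which is an illustration rather than a proposal for Paho, a `SubmissionPublisher` has a bounded per-subscriber buffer, the subscriber requests one element at a time, and `offer()` drops messages when that buffer is full. Dropping is only one possible answer to the slow-consumer question; other policies are equally possible. `OneAtATimeSubscriber` and `BackPressureDemo` are hypothetical names, and the exact number of drops depends on thread timing (the JDK may also round the buffer size up to a power of two).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Subscriber that signals demand one element at a time (request(1)),
// which is how a Flow / Reactive Streams consumer applies back pressure.
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }
    @Override public void onNext(String item) {
        received.add(item);
        subscription.request(1); // ask for the next element only when ready
    }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}

class BackPressureDemo {
    // Pushes `count` messages through a publisher whose per-subscriber buffer is
    // bounded at `bufferSize`. When the buffer is full, offer() invokes the drop
    // handler (returning false means "do not retry") instead of growing the queue.
    // Returns {delivered, dropped}.
    static int[] run(int count, int bufferSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger dropped = new AtomicInteger();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, bufferSize)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.offer("msg-" + i, (sub, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        } // close() issues onComplete after the already-buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new int[] {subscriber.received.size(), dropped.get()};
    }
}
```

Every message is either buffered (and eventually delivered) or explicitly dropped, so memory use stays bounded no matter how slowly the subscriber consumes.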

@icraggs icraggs modified the milestones: MQTTv5, 1.3.0 Feb 20, 2019
@rdasgupt rdasgupt modified the milestones: 1.3.0, 1.2.5 Jun 23, 2020
Projects
MQTTv5
Awaiting triage
Development

No branches or pull requests

8 participants