Slow Initial Data Transmission Issue in MQTT Client #629

Closed
SnowCharmQ opened this issue Apr 23, 2024 · 5 comments

@SnowCharmQ

🐛 Bug Report

I wrapped an MQTT client in a service API. The API monitors the input data and publishes it to the MQTT broker whenever the data changes. However, when I measured the throughput of the service, I noticed that the transmission of the first batch of samples was extremely slow, taking 10 seconds or even longer. Subsequent batches are transmitted at the expected speed. The measured times are printed below. (They were taken on the receiver side, but the result is the same on the sender side, where Logcat contains a lot of irrelevant output.)

image

Is it a warm-up process? Or just a bug? This bug could lead to a loss of real-time performance in the program.

Code sample

    @OptIn(DelicateCoroutinesApi::class)
    fun request(
        qos: QosOption,
        data: MutableStateFlow<Pair<Long, DoubleArray>>
    ) {
        val mqttQos = qos.toQos()
        thread {
            val client: Mqtt5AsyncClient = MqttClient.builder()
                .useMqttVersion5()
                .identifier(mqttTask.identifier)
                .serverHost(GlobalConfig.mqtt.mqttIp)
                .serverPort(GlobalConfig.mqtt.mqttPort)
                .buildAsync()
            client.connect()
                .whenComplete { _: Mqtt5ConnAck?, throwable: Throwable? ->
                    if (throwable != null) {
                        return@whenComplete
                    } else {
                        var s = System.currentTimeMillis()
                        var cnt = 0
                        GlobalScope.launch(Dispatchers.IO) {
                            data.collect {
                                if (it.first == 0L) return@collect
                                client.publishWith()
                                    .topic(mqttTask.topic)
                                    .qos(mqttQos)
                                    .payload(it.toString().toByteArray())
                                    .send()
                                cnt += 1
                                if (cnt % GlobalConfig.sampleRate == 0) {
                                    val e = System.currentTimeMillis()
                                    println("${e - s} $cnt")
                                    s = e
                                }
                            }
                        }
                    }
                }
        }
    }

Environment

Windows 10

Android sdk33

Version: MQTT v5.0

MQTT Broker: EMQX

📈 Expected behavior

Every batch of data should be transferred within 1 s.

@SnowCharmQ SnowCharmQ added the bug label Apr 23, 2024
@SnowCharmQ
Author

I can confirm that there is always enough data available to transmit.

@pglombardo
Contributor

pglombardo commented Apr 24, 2024

Hi @SnowCharmQ,

My initial thought is that this is at least partially due to warm-up, but more than 10 seconds seems extreme. I also see that you connect on every call to request.

A couple ideas to potentially reduce the problem area would be:

  1. To rule out any broker issues, could you try another one? Maybe send test messages to any public broker other than EMQX?
  2. You could keep a pre-initialized and connected client that is reused on each call to request. This should avoid repeating the client initialization and TCP connection establishment.
  3. You could profile the broker connection - how long is client.connect() on the first call versus subsequent?

Visibility is likely key here. The results from any one of the above should put us on the right path...
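
Ideas 2 and 3 can be tried together. The following is only a minimal sketch, assuming Kotlin 1.9 or newer for `measureTime`. `SharedConnection` and `timeConnect` are hypothetical names, and the lambda passed in `main` is a stand-in that sleeps instead of opening a real connection; with the reporter's client it would be something like `{ client.connect() }`.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.time.Duration
import kotlin.time.measureTime

// Hypothetical holder: runs the supplied connect action at most once and
// hands the same future to every later caller.
class SharedConnection<T>(connect: () -> CompletableFuture<T>) {
    private val connection: CompletableFuture<T> by lazy { connect() }

    // Blocks until the single connection attempt has completed.
    fun awaitConnected(): T = connection.join()
}

// Measures how long one call to awaitConnected() takes.
fun <T> timeConnect(shared: SharedConnection<T>): Duration =
    measureTime { shared.awaitConnected() }

fun main() {
    var attempts = 0
    // Stand-in for a real connect call: sleeps 200 ms to simulate a handshake.
    val shared = SharedConnection {
        attempts += 1
        CompletableFuture.supplyAsync {
            Thread.sleep(200)
            "connected"
        }
    }
    println("first call:  ${timeConnect(shared)}")
    println("second call: ${timeConnect(shared)}")
    println("connect attempts: $attempts")
}
```

If the first call is slow and later calls are close to zero, the connection setup is where the time goes. If both are fast, the delay lies elsewhere.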

@LukasBrand
Contributor

@OptIn(DelicateCoroutinesApi::class) //#1
fun request(
        qos: QosOption,
        data: MutableStateFlow<Pair<Long, DoubleArray>> //#2
) {
    val mqttQos = qos.toQos()
    thread { //#3
        val client: Mqtt5AsyncClient = MqttClient.builder() //#4
                .useMqttVersion5()
                .identifier(mqttTask.identifier)
                .serverHost(GlobalConfig.mqtt.mqttIp)
                .serverPort(GlobalConfig.mqtt.mqttPort)
                .buildAsync()
        client.connect() //#5
                .whenComplete { _: Mqtt5ConnAck?, throwable: Throwable? ->
                    if (throwable != null) {
                        return@whenComplete
                    } else {
                        var s = System.currentTimeMillis() //#6
                        var cnt = 0
                        GlobalScope.launch(Dispatchers.IO) { //#1
                            data.collect { //#7
                                if (it.first == 0L) return@collect
                                client.publishWith()
                                        .topic(mqttTask.topic)
                                        .qos(mqttQos)
                                        .payload(it.toString().toByteArray())
                                        .send() //#8
                                cnt += 1
                                if (cnt % GlobalConfig.sampleRate == 0) {
                                    val e = System.currentTimeMillis()
                                    println("${e - s} $cnt")
                                    s = e
                                }
                            }
                        }
                    }
                }
    }
}

Hello @SnowCharmQ,

I strongly recommend rereading the corresponding documentation and guides.
Your code contains several wrong assumptions, dubious architecture decisions, and a wild mixture of asynchronous
APIs.
I have added numbered comments to your code and will try to point out the things that should be improved.

  1. @OptIn(DelicateCoroutinesApi::class): The use of the GlobalScope inside an Android app is really discouraged.
    Besides that, the complete function is a network action and should be in an IO scope. Have a look at:
  2. MutableStateFlow: Generally, MutableStateFlows should not be used for data streaming outside the UI Context of an
    Android application. Sending data over the network should not be coupled to UI. Instead, you should use the
    plain Flow API. See:
  3. Threads feel like the wrong technology in your tech stack and are somewhat overkill in your scenario. Instead, I
    would recommend changing the method itself to a suspendable function and leaving it up to the caller. Also, the
    client itself already uses the netty thread pool for executions.
  4. Every request generates a new instance of an MQTT client. It does not look like the configuration is changing
    during an execution so a single instance could be used for every request.
  5. Connection establishment is NOT cheap. After all, a network connection is created. I would really recommend using a
    single connection for all publishes. The client connection should be closely coupled with an application lifecycle,
    i.e. maybe a lazy initialization on first application mqtt request, and only disconnecting if the application goes
    into the background/suspend or is exited.
  6. In Kotlin you can use already existing constructs to measure time.
    See: https://kotlinlang.org/docs/time-measurement.html#measure-time
  7. The MQTT client already supports reactive stream libraries such as Reactor or RxJava. It might be a better idea to
    transform the stream using a custom adapter to Kotlin Flow instead of collecting it here to be used with the async
    API.
  8. You are using the async client and only call send(), which (as "async" implies) returns immediately and performs
    the publish in the background. It would only block if you waited on the returned CompletableFuture with get() or
    join(). So what you are actually measuring is only the time the MutableStateFlow takes to emit its first real
    (non-0L) element, not the duration of a publish. That has nothing to do with the MQTT client.
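
Points 6 and 8 can be illustrated without a broker. This is only a sketch under stated assumptions: `fakeSend` is a hypothetical stand-in for an asynchronous publish (it is not part of the MQTT client), and `measureTime` is assumed to come from Kotlin 1.9 or newer.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.time.measureTime

// Hypothetical stand-in for an asynchronous publish: returns at once and
// completes the future after roughly 300 ms of background work.
fun fakeSend(): CompletableFuture<String> =
    CompletableFuture.supplyAsync {
        Thread.sleep(300)
        "published"
    }

fun main() {
    // Times only the call that hands the work off, not the work itself.
    val handOff = measureTime { fakeSend() }

    // Times the whole operation by waiting for the future to complete.
    val completed = measureTime { fakeSend().join() }

    println("send() only:   $handOff")
    println("send().join(): $completed")
}
```

The first measurement covers only the hand-off, so it stays small no matter how long the background work takes. Only the second one includes the simulated publish.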

As to why the MutableStateFlow takes so long to emit its first element, I can only guess without the
corresponding code. As it is a hot flow, it shouldn't have anything to do with how you collect the flow in the example.

I hope my points can help you improve your application and fix your issue though I really recommend having a look
at the overall technologies you are using in this code snippet.
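
As a rough illustration of points 1 to 3 in the list above, the function could become a suspending function that takes a plain Flow and leaves scope and threading to the caller. This is a sketch, not the library's recommended pattern: it assumes kotlinx-coroutines-core is on the classpath, `publishAll` is a hypothetical name, and the `publish` parameter is a stand-in for a call such as `client.publishWith()...send()` on an already connected client.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: publishes every sample whose timestamp is not the 0L
// placeholder and returns how many samples were sent.
suspend fun <R> publishAll(
    samples: Flow<Pair<Long, DoubleArray>>,
    publish: (ByteArray) -> CompletableFuture<R>
): Int = withContext(Dispatchers.IO) {
    var sent = 0
    samples
        .filter { it.first != 0L }
        .collect { sample ->
            // contentToString() prints the array elements; toString() on a
            // DoubleArray would only print its identity.
            val text = "${sample.first} ${sample.second.contentToString()}"
            publish(text.toByteArray()).await() // suspends until this publish completes
            sent += 1
        }
    sent
}

fun main() = runBlocking {
    val samples = flowOf(
        0L to doubleArrayOf(),
        1L to doubleArrayOf(0.5),
        2L to doubleArrayOf(1.5)
    )
    // Stand-in publish action that completes immediately.
    val sent = publishAll(samples) { bytes -> CompletableFuture.completedFuture(bytes.size) }
    println("published $sent samples")
}
```

Awaiting each publish keeps the sketch simple and makes any timing around it meaningful, but it also sends the samples one at a time. Whether that is acceptable depends on the required throughput.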

@LukasBrand LukasBrand self-assigned this Jun 17, 2024
@SnowCharmQ
Author

Hi @LukasBrand,

I have read your comment carefully. The issues you pointed out are truly helpful, and I see that there are many problems I need to fix. I will study them in detail and try to improve my application.

Thanks for your time and advice!

@LukasBrand
Contributor

Thank you! Have a great day!


3 participants