try to fix mqtt issue with concurrent connections #905
Conversation
Moved mqtt->disconnect() out of onConnect(). new Mosquitto\Client() now randomises the clientID and cleans out connections.
Fundamentally I don't think this is the right solution. I'd rather see the data to be published pushed onto a queue (with a date stamp) as a JSON payload (or make that configurable), and the MQTT input script altered to pick up that data and publish it. Any 'fix' like this is, I think, simply perpetuating the problem. IMO MQTT IO should all be handled from a single point, so the connection/disconnection is a one-off event for EmonCMS.
Just tried this 'fix' and it's no better here (probably worse). I've attached a MQTT log below, and you can see that it's constantly connecting and disconnecting, with the result that no messages are being received. As Brian has already said, connection/disconnection should be a one-off event, not occurring with every message. Surely it should connect once and stay connected, probably with frequent checks to ensure that it's still connected. Although a different library and OS, I'm using this to connect an ESP8266 device to mosquitto, and it performs brilliantly. Don't know if you can glean anything from that example.
Good idea @borpin. An item to be published could be pushed onto a redis queue with rpush in the process list. Then a check of $redis->llen and then $redis->pop could be called here, followed by $mqtt->publish:
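The queue pattern described above could look roughly like this. This is a minimal Python sketch where a plain list stands in for the Redis list (RPUSH appends, LPOP takes the head), since the real implementation would use phpredis and the Mosquitto client; all names here are illustrative, not emoncms code:

```python
import json
import time

# Stand-in for the Redis list: RPUSH appends to the tail, LPOP takes the head.
publish_queue = []

def rpush_publish(topic, value):
    # Input-process side: queue the item (with a timestamp) instead of
    # opening an MQTT connection per value.
    publish_queue.append(json.dumps({"topic": topic, "value": value,
                                     "time": int(time.time())}))

def drain_queue(publish):
    # phpmqtt_input side: check the queue length, then pop and publish
    # each queued item over the one persistent connection.
    while len(publish_queue) > 0:                  # $redis->llen
        item = json.loads(publish_queue.pop(0))    # $redis->lpop
        publish(item["topic"], item["value"])

rpush_publish("emon/emonpi/power1", 230)
rpush_publish("emon/emonpi/power2", 115)
sent = []
drain_queue(lambda topic, value: sent.append((topic, value)))
# sent now holds both queued values in FIFO order
```

The key point is that the input process never touches MQTT directly; only the single long-running script holds the broker connection.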
It would mean that phpmqtt_input would need to be running even if data is being posted to emoncms via the HTTP API.
Only if the publish facility was being used. Could we roll my PR #821 into this work, so a JSON object can be used, providing a timestamp for MQTT data? Perhaps also the setting of the base topics in the settings file would not go amiss either.
Thanks @borpin, best to keep these features separate for now, so that we can test each one step at a time. Any objections to moving publishing to phpmqtt_input, @Paul-Reed @pb66?
I already suggested moving the publishing to phpmqtt_input.php and using a Redis queue just a couple of days ago, and it attracted the usual level of interest. https://community.openenergymonitor.org/t/mqtt-log-entries/7784/4?u=pb66

There are two additional points to consider. One is the QoS level, which is essentially 0 currently due to the fact it keeps reconnecting as a new client. This sort of "fits" with the fact no timestamps are published, so either it must remain QoS 0 or we need to think about the publish format(s).

And two, there needs to be some way of limiting the number of publishes. Imagine a network is down for half an hour: that would result in 360 frames being buffered and then posted using the bulk API, which means 360 "publish to MQTT" processes being run PER PUBLISHED VALUE. Using @Paul-Reed's example, when the network comes up there will be 3x360 publishes to MQTT, all within a couple of seconds. Even though the datapoints are timestamped coming in via HTTP, they will be published as a mad rush of values with no time references. This will of course also impact decisions made about point one.

We still need a wider discussion about emoncms and MQTT, more specifically about QoS and timestamps, which will most likely dictate a new (or additional?) topic structure and/or payload format.
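The burst arithmetic above can be checked directly. A quick sketch, assuming the figures implied in the comment (a frame posted every 5 s, buffered for half an hour, three "publish to MQTT" processes per frame):

```python
# Figures implied in the comment: a frame posted every 5 s, buffered for
# half an hour, with three "publish to MQTT" processes per frame.
outage_s = 30 * 60
post_interval_s = 5
values_per_frame = 3

frames = outage_s // post_interval_s       # frames buffered during the outage
publishes = frames * values_per_frame      # publish calls fired on reconnect
print(frames, publishes)  # prints: 360 1080
```

So even a modest half-hour outage produces over a thousand publish calls in one burst, all delivered within seconds and none carrying a timestamp.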
No, it would mean that "phpmqtt_input.php" would always need to be running if MQTT is enabled; the publish to MQTT process should only be made available if MQTT is enabled. Ie the "phpmqtt_input.php" script is pretty much the whole MQTT implementation, and when that is enabled then MQTT-dependent features become available elsewhere, eg the eventp module perhaps in time.

To avoid confusion with both the roll-out and with future "why does the phpmqtt_input.php script do output?" questions, I think we should rename the script "emoncms_mqtt.php". "emoncms" helps identify what the script belongs to, "mqtt" is a wider description than "mqtt_input", and the extension tells us it's PHP, so no need to include that in the title. Plus I'm sure I will remember "emoncms_mqtt.php", whereas every time I have had to write "phpmqtt_input.php" in this post I have had to look back at @TrystanLea's post to make sure I had the right name to avoid causing further confusion; it doesn't exactly roll off the tongue!
Although the MQTT standard does have a persistent session feature that in theory should buffer messages, I am not convinced this has been well implemented by brokers, and it could put quite a strain on resources. For now, I'd favour a simple implementation of publish that works at QoS 0 but allows the payload to be configured either as a simple value or as a timestamped JSON payload. I agree that the script should be renamed. Initially its use could be limited to those needing publish, as providing an update path for users will be a little messy.
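The configurable-payload idea might be sketched as follows. The `make_payload` helper here is hypothetical (not part of emoncms); it just shows the two proposed forms, a bare value and a timestamped JSON object:

```python
import json
import time

def make_payload(value, timestamped=False, t=None):
    # Simple form: the bare value, as emoncms publishes today.
    if not timestamped:
        return str(value)
    # Timestamped form: a JSON object carrying a unix time alongside the value.
    return json.dumps({"time": int(t if t is not None else time.time()),
                       "value": value})

assert make_payload(230.5) == "230.5"
assert make_payload(230.5, timestamped=True, t=1530695441) == \
    '{"time": 1530695441, "value": 230.5}'
```

A single config flag choosing between the two forms would keep the default behaviour backward compatible for existing subscribers.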
No objections, it sounds like the way forward. Until then, IMO, leave the MQTT format as it is. Paul
Moved the mqtt publish into phpmqtt_input.php; reads from the redis queue before publishing all the queued messages to mqtt.
…connections-not-closing
scripts/phpmqtt_input.php
```php
$mqtt_client->setCredentials($mqtt_server['user'],$mqtt_server['password']);
$mqtt_client->connect($mqtt_server['host'], $mqtt_server['port'], 5);
$topic = $mqtt_server['basetopic']."/#";
//echo "Subscribing to: ".$topic."\n";
$log->info("Subscribing to: ".$topic);
$mqtt_client->subscribe($topic,2);

// PUBLISH
```
@TrystanLea - it doesn't seem to loop continually? It only grabs the redis data on "first run"... any ideas?
Is it looping the whole queue in the first run or just the first item in the queue?
I suspect the former; I think the publish will only get called once unless there are some connection issues.
I'm not convinced that is the right place for the publish. Surely it needs to be in the outer loop does it not? Where it is now it will only get called when there is no connection and it's been at least 5s since the last attempted reconnection.
Also I think you need to loop faster and just pop the first item in the queue each loop, otherwise if there is a backlog of data to be published it will block new inputs arriving. Eg after a reconnection, a flood of processing ends up with hundreds of publishes to MQTT; if you loop through the whole list it will take a while. This is especially important as the incoming data is not currently timestamped until emoncms processes it, so any delays will alter the datapoint timestamps. Or, if there is a disconnection (possibly due to too many publishes), the incoming data queued at the broker will be lost, as emoncms will reconnect with a new client id, ie a clean slate.
Incoming data should take priority over outgoing at this stage.
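The "pop one item per loop" suggestion could be sketched like this. This is a hypothetical service loop (names are illustrative, not emoncms code) where incoming messages always drain first and at most one queued publish happens per pass:

```python
def service_loop_once(connected, incoming, queue, publish, handle_input):
    # Incoming messages always drain first: input takes priority over output.
    while incoming:
        handle_input(incoming.pop(0))
    # Publish at most ONE queued item per pass, so a post-outage backlog
    # can never block new inputs from being processed.
    if connected and queue:
        topic, value = queue.pop(0)
        publish(topic, value)

queue = [("emon/a", 1), ("emon/b", 2), ("emon/c", 3)]
incoming = ["msg1", "msg2"]
published, handled = [], []
service_loop_once(True, incoming, queue,
                  lambda t, v: published.append((t, v)), handled.append)
# After one pass both inputs are handled but only one item is published.
```

A backlog then drains gradually, one item per loop iteration, instead of in one blocking burst.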
Thank you @emrysr!
Looking good....
Should;
Not sure if this is the right way to do this (comment on a merged PR), but there is a problem with this - I was surprised it was merged so quickly.

Firstly, this still has the issue that if the connect fails, the subscribe is still attempted and so an exception will be raised. The 'right' way to do it is to do the subscribe in the onConnect callback if the CONNACK return code is zero.

Secondly, the publish will be attempted whether or not there is a connection. This will again raise an exception (I suspect) which will not be caught (I think). Is this designed such that all values are loaded onto the queue, or just the most recent value? Multiple values are pointless at a QoS of 2 without a persistent connection.

In addition, I think the publish call may be a blocking call. If it is and there is a lot in the queue, there might be the potential for an incoming message to be missed. Was this tested with the broker being switched off for say 30 mins while the publish queue filled up?
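The "subscribe only in onConnect when the CONNACK code is zero" pattern looks roughly like this. This is a Python sketch with a stub client standing in for the real php-mosquitto `Mosquitto\Client`, so the callback flow can be shown without a live broker; the class and topic are illustrative:

```python
class StubClient:
    """Broker-free stand-in for an MQTT client, to show the pattern only."""
    def __init__(self):
        self.subscriptions = []
        self.on_connect = None

    def subscribe(self, topic, qos=0):
        self.subscriptions.append((topic, qos))

    def simulate_connack(self, rc):
        # A real library invokes on_connect with the CONNACK return code.
        if self.on_connect:
            self.on_connect(self, rc)

def on_connect(client, rc):
    # Subscribe only once the broker has accepted the connection (rc == 0);
    # on a refused CONNACK, don't attempt to subscribe at all.
    if rc == 0:
        client.subscribe("emon/#", qos=2)

client = StubClient()
client.on_connect = on_connect
client.simulate_connack(5)   # connection refused: no subscribe attempted
client.simulate_connack(0)   # connection accepted: subscribe happens once
```

Moving the subscribe into the callback guarantees it can never run against a failed connection, which is the exception path described above.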
All messages. Have a look at the attached log, where I stopped mosquitto for 5 minutes and then restarted it in verbose mode. I don't know if it's something to do with line 40 -
I had already pointed this out as an issue before it was merged
More to the point, they are pointless without a timestamp. The lack of a persistent connection is less of an issue when publishing, as the buffering occurs between the topic and the subscribing client; emoncms could effectively publish 10 different values as 10 different clientids and all 10 messages will still land in the topic and be buffered for any clients that are subscribed to that topic (assuming of course that emoncms doesn't continue to publish them whilst there is no connection).

After a network outage there may be 100s of "publish to MQTT" processed, and in the time it takes to publish all those there could be a new value to "publish to MQTT", so there may not be a "last value" to focus on; the last of the batch will be the most current and valid, but it will not be identifiable.

If you only wanted to publish "status" QoS 0 fresh data, then the data coming in via HTTP could be filtered so that only data that is up to 30 sec or 30 min old is published to MQTT, but A) this will not work with non-timestamped data, eg MQTT inputs, B) what is the right cut-off? emonTHs only transmit every 60 s so it cannot be less than that, which means you could have over 12 emonPi frames slip through, and C) this is a very messy and inconsistent idea that doesn't fix the underlying issue.

Another potential side effect of clearing the whole queue whilst blocking the subscribe side is that the blocked messages may still arrive late and get timestamped incorrectly, due to the broker buffering the data if the connection is solid enough. Or worse, once you finish publishing the queue there is a backlog batch of new inputs, which results in another publish-to-MQTT queue, so you end up with a very blocky "publish a batch, receive a batch" cycle where none of the data in or out reflects the time it was sampled.
Although I do not currently use MQTT on my live servers, I do use emonhub to ensure I have no data loss due to network issues. I currently run buffers of 70,000 to 100,000 frames, so that if someone switches off a router for the school half-term hols (for example), emonhub can buffer for weeks. Using @Paul-Reed's processing example again, that could result in 300,000 "publish to MQTT" being pushed through in a matter of 3 to 4 hrs, although all that publishing may well slow down the input processing so much that that time increases significantly. This is an extreme case, yes, but it does help demonstrate the potential issues.

Those 300,000 publishes over 4 hrs will bog down emoncms and the broker, and most likely anything that is subscribed to those topics, but all that data is useless: it is no use as historical data as it has no timestamp, and it is no use as a status because it is from another time. Imagine an MQTT-controlled EV charger acting on that data; it will be switched on and off in line with a super-accelerated and condensed "view" of the last few days over just a couple of hours.

Some of these issues were true of the old "publish to mqtt" implementation, so I'm not necessarily picking fault with this PR or @emrysr's (or anyone else's) work, just with the way emoncms currently does MQTT, for whatever reason. I am also not campaigning for a timestamped solution in particular; we just need to make a choice between untimestamped QoS 0 "status" data with a single-value payload and timestamped QoS 2 "datapoint" data with a more complex payload structure. Unless we accept they are not the same thing and implement both, separately, as we cannot have a single solution that does both jobs; that simply isn't possible.
Got to say though, that under normal operating conditions, this merge has really improved performance. |
That's good, I suspect it is solely down to no longer constantly connecting/disconnecting.
Absolutely, as I said above, my comments are not directed at anyone; they are just observations and opinions to hopefully help find and implement the right solution(s).
Are we looking at the same file? That is line 18 in the attached file. I think it must be the wrong file, as it doesn't cover the 1530695441 timestamp period.

This block shows emoncms connecting to the broker and then subscribing to emon/#; it also shows emoncms publishing a single "power" value. The next block shows another client connecting (node-red?) and then subscribing to 9 topics (each QoS 1 except the greenhouse/rssi, which is QoS 0). The lines that follow show each published value from emoncms being received and re-published to node-red (mqtt_298f26c0.b46dca). There were 212 queued values passed to node-red; I think the first item in the queue was published by emoncms before node-red connected, so that first item wasn't passed to node-red. But all of these transactions - including emoncms connecting, subscribing, publishing 213 values, and node-red connecting, subscribing to 9 topics and receiving 212 values - happened in a single second. From there on you can see 3 values published by emoncms and received by (or rather sent to) node-red every 5 secs.

It's also interesting that despite all the traffic, the php-mosquitto extension still seems to feel there's a need to PING the broker every 5 secs to keep the connection alive; that's only supposed to happen if it goes 5 s without a control message. (I wonder if that's an "if less than 5" type issue, and if set to 6 it would get a 5 sec timeout?)
Me neither; I can see this discussion getting buried. Looks like the discussion has moved to #907. [edit] - Although that PR is now merged, and therefore effectively closed too now!
I said:

> Multiple values are pointless at a QoS of 2 without a persistent connection.

@pb66 said:
Yes, absolutely. I was oversimplifying the problem. On the input side, I did propose a reasonably simple solution to the timestamped data input issue a while back in #821. It's backward compatible, so it should not upset anything, and it works for both the API and the MQTT input.
Have I misunderstood this (which is quite possible), but why the need for the ... in ... ? I think the ... Could the publish function check the size of the queue and generate a warning if over a certain size?
I've finally had a chance to read through. My initial thoughts on the QoS level are that it should be 0 (fire and forget) - reminding myself of the QoS levels with this useful post: https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels. It looks like @emrysr has chosen 0 for publish for now (the default). That said, we are using QoS 2 for subscribe. I'm sure I looked into this, probably last year, but I can't remember why we settled on 2 now. Perhaps it was in relation to issues trying to debug MQTT reliability.

I'm aware of your suggestions @pb66 regarding timestamps; do you think you could create a specific emoncms issue that links to your various posts discussing the idea, and we can try to factor it in to the development plan? I've had a mental note of it for a while!

Thanks for pointing out the issue with the queue and the likely burst of MQTT messages if the broker goes down. It would be good to find a solution to this; perhaps a starting point could be a maximum queue length of ~1000? Can anyone think of a better way to solve the buffering issue? It's a shame to add this complication... if only we could just do a synchronous publish using the mqtt library within process_processlist.php, to avoid the whole need for a redis stage to start with.

> "Have I misunderstood this (which is quite possible) but why the need for the

@borpin good point, this can now be removed.
Maybe an ill-thought-out suggestion... but here goes: before adding MQTT data to the buffer, could a check be made to see if we are connected to the broker, and only cache data if connected, otherwise dump it?
I think at the process point it is better to just check whether MQTT is enabled, then check the size of the queue, and then decide what to do. I'd be inclined to always push the data onto the queue and let the publish end work it out (perhaps with a max size as a backstop against the publish script crashing). If you just check the connection, you might check during a short outage.

At the publish end you could check the size of the queue and potentially remove old points once the queue is over a certain size. You could also mitigate the burst issue by limiting the number of publish requests done per cycle. Into this would also factor the QoS, as the handshaking will take longer, I suspect. Any limit on the publish rate should be clear.
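The "always push, but cap the queue" idea could be sketched as follows. The 1000-item cap is the hypothetical figure floated in the thread, and the oldest entries are trimmed first, since untimestamped old points are the least valuable:

```python
MAX_QUEUE = 1000   # the cap floated in the thread; purely illustrative

def push_bounded(queue, item, max_len=MAX_QUEUE):
    # Always enqueue, but trim the OLDEST entries once over the cap, so a
    # dead broker cannot grow the queue without bound. Untimestamped old
    # points are the least valuable, so they are the ones dropped.
    queue.append(item)
    while len(queue) > max_len:
        queue.pop(0)

q = []
for i in range(1005):
    push_bounded(q, i)
# q now holds only the newest 1000 items: 5 .. 1004
```

With Redis this trimming could be done with a single LTRIM after each RPUSH, keeping only the newest `max_len` entries.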
If this was done by limiting the size of the queue, then 1000 (as suggested above) is way too high. Without the data being timestamped, the vast majority of datapoints would be worthless and unnecessary. Paul
Doh, yes you are quite right. It goes back to the QoS question. Rather than a queue, it just needs to be a 'latest' data value for now, until the timestamp option is available.
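The "latest value only" approach could be sketched like this. A per-topic dictionary replaces the queue, so only the newest untimestamped reading is ever published (all names here are illustrative):

```python
# topic -> most recent value; replaces the queue entirely for QoS 0
# "status" data, where only the newest untimestamped reading matters.
latest = {}

def set_latest(topic, value):
    # Overwrite rather than enqueue: older readings are superseded.
    latest[topic] = value

def publish_latest(publish):
    # Called from the MQTT loop: emit each topic's current value once.
    for topic, value in latest.items():
        publish(topic, value)
    latest.clear()

set_latest("emon/emonpi/power1", 230)
set_latest("emon/emonpi/power1", 245)   # supersedes the earlier reading
sent = []
publish_latest(lambda t, v: sent.append((t, v)))
# sent contains a single entry: ("emon/emonpi/power1", 245)
```

This sidesteps the burst problem entirely: after any outage there is exactly one publish per topic, with Redis hashes (HSET/HGETALL) being one natural way to hold the per-topic values.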

!!need to test my changes on a rpi to see performance issues!!
Not 100% sure these changes will fix the issue? Not totally sure what the issue is... can't re-create it!?
could be..
I averaged a 30 ms delay on my laptop between php publishing to the mqtt broker and the script completing (for each of 50 iterations), so I set the php script to hold for 2000 ms to ensure it's all done on slower devices...
THIS APPROACH WILL MAKE THE INPUT PROCESS MUCH SLOWER!
The fixed delay will keep a connection open for 2 s (10x200ms); however, using loopForever() with the disconnect() function disconnects once the publish is done (1.2 s for 50 publishes).
The test I ran as a gist: https://gist.github.com/emrysr/d7342a262efdaf2ba61ffd1208fc0d28

Clone it with:

```
git clone https://gist.github.com/d7342a262efdaf2ba61ffd1208fc0d28.git
```