Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT support for multiple devices #138

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,15 +1301,15 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {

// todo: add IFTTT support for this event as well
if (os.mqtt.enabled()) {
sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval);
sprintf_P(topic, PSTR("station/%d"), lval);
strcpy_P(payload, PSTR("{\"state\":1}"));
}
break;

case NOTIFY_STATION_OFF:

if (os.mqtt.enabled()) {
sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval);
sprintf_P(topic, PSTR("station/%d"), lval);
if (os.iopts[IOPT_SENSOR1_TYPE]==SENSOR_TYPE_FLOW) {
sprintf_P(payload, PSTR("{\"state\":0,\"duration\":%d,\"flow\":%d.%02d}"), (int)fval, (int)flow_last_gpm, (int)(flow_last_gpm*100)%100);
} else {
Expand Down Expand Up @@ -1345,7 +1345,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_SENSOR1:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor1"));
strcpy_P(topic, PSTR("sensor1"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1357,7 +1357,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_SENSOR2:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor2"));
strcpy_P(topic, PSTR("sensor2"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1369,7 +1369,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_RAINDELAY:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/raindelay"));
strcpy_P(topic, PSTR("raindelay"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1384,7 +1384,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
volume = (volume<<8)+os.iopts[IOPT_PULSE_RATE_0];
volume = lval*volume;
if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor/flow"));
strcpy_P(topic, PSTR("sensor/flow"));
sprintf_P(payload, PSTR("{\"count\":%d,\"volume\":%d.%02d}"), lval, (int)volume/100, (int)volume%100);
}
if (ifttt_enabled) {
Expand Down Expand Up @@ -1412,7 +1412,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_REBOOT:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/system"));
strcpy_P(topic, PSTR("system"));
strcpy_P(payload, PSTR("{\"state\":\"started\"}"));
}
if (ifttt_enabled) {
Expand Down
121 changes: 77 additions & 44 deletions mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
#if defined(ENABLE_DEBUG)
#if defined(ARDUINO)
#include "TimeLib.h"
#define DEBUG_PRINTF(msg, ...) {char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, msg, ##__VA_ARGS__); Serial.println(buffer);}
#define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, "%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t)); Serial.println(buffer);}
#define DEBUG_PRINTF(msg, ...) {char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, msg, ##__VA_ARGS__); Serial.print(buffer);}
#define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, "%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t)); Serial.print(buffer);}
#else
#include <sys/time.h>
#define DEBUG_PRINTF(msg, ...) {printf(msg, ##__VA_ARGS__);}
Expand Down Expand Up @@ -76,10 +76,11 @@ extern char tmp_buffer[];
#define MQTT_MAX_USERNAME_LEN 32 // Note: App is set to max 32 chars for username
#define MQTT_MAX_PASSWORD_LEN 32 // Note: App is set to max 32 chars for password
#define MQTT_MAX_ID_LEN 16 // MQTT Client Id to uniquely reference this unit
#define MQTT_MAX_TOPIC_LEN 60 // Includes ROOT_TOPIC, DEVICE_TOPIC and endpoint
#define MQTT_RECONNECT_DELAY 120 // Minumum of 60 seconds between reconnect attempts

#define MQTT_ROOT_TOPIC "opensprinkler"
#define MQTT_AVAILABILITY_TOPIC MQTT_ROOT_TOPIC "/availability"
#define MQTT_AVAILABILITY_TOPIC "availability"
#define MQTT_ONLINE_PAYLOAD "online"
#define MQTT_OFFLINE_PAYLOAD "offline"

Expand All @@ -93,16 +94,31 @@ char OSMqtt::_password[MQTT_MAX_PASSWORD_LEN + 1] = {0}; // password to connect
int OSMqtt::_port = MQTT_DEFAULT_PORT; // Port of the broker (default 1883)
bool OSMqtt::_enabled = false; // Flag indicating whether MQTT is enabled

/**************************** HELPERS ********************************************/

int OSMqtt::_make_full_topic(char * full_topic, const char * topic)
{
#if defined(ENABLE_DEBUG)
if (strlen(MQTT_ROOT_TOPIC) + strlen(_id) + strlen(topic) + 2 > MQTT_MAX_TOPIC_LEN)
DEBUG_LOGF("MQTT Topic (%s) is too large. Will be truncated\n", topic);
#endif

int len = snprintf(full_topic, MQTT_MAX_TOPIC_LEN, "%s/%s/%s", MQTT_ROOT_TOPIC, _id, topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The root topic should be configurable through the app (like host, username, password). The MAC address should not part of the topic !!!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So again thanks for the feedback and let me try to explain some of my thinking by taking this in two parts.

Firstly, the use of "opensprinkler" as the root topic. This is currently how the code exists today and available in the master branch. So this hasn't changed, I have just moved the definition out of main.cpp and into mqtt.cpp in order to better compartmentalise the code. I agree that in the future this should be configurable via the App and this PR moves us a little bit in that direction.

The second part is to address the situation where there are two or more opensprinkler devices on the network. This is the case for me and others. The current master branch is problematic as a user could enable mqtt on multiple devices with conflicting messages causing unexpected behaviour. So this is a first step to guarding against that with the information readily at hand. Again, a future PR could make this configurable from the App and the structure that this PR puts in place would make that a bit easier.

Note that the load_hardware_mac() routine produces a pretty stable identifier. Only recent OS3.0 have an option to change network interfaces and on OSPi I search the interface list in a particular order to minimise risk of change. In the future, I think leveraging the existing Site Name that App uses would be a good choice to differentiate opensprinkler devices as people are already familiar with it. Unfortunately, at the moment, it isn't accessible over the rest api and would require some changes to make that possible. I explored this some time ago here and it looks to be a viable option.

Let me know your thoughts as I was keen to get something that makes the current master branch a bit safer while the bigger questions in #130 and #134 get worked through.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For enabling mqtt, you also have to provide a host. So let's show and propose to change the root topic as we did for username and password. This will allow multiple devices. And if you want to put your mac address in your topic, you are free to configure it like this.
But don't always append the root topic with the MAC address. For me, it is a bad design, and should not be the default behavior.
So default root topic:
"opensprinkler"
can be changed in the config to any other value like for example:
"opensprinkler1"
"opensprinkler/1"
"opensprinkler/00:0a:95:9d:68:16"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two quite notes:

  1. We do plan to add a 'device name', mainly to make it easy for users (with multiple OS) to tell which controller it's coming from when using, say IFTTT notifications (i.e. the device name will be included in the message). It's probably not appropriate to use 'device name' as mqtt root topic, because the device name can have all sorts of special characters and I am not sure if that's a good fit for root topic.
  2. When I was working on integrating the mqtt branch, I did think about whether we should add an additional parameter - root topic - to give users the choice of deciding what topic to use. Related to my point 1 above -- what are the restrictions on the root topic name? For example, is it ok to have the 'space' character? What about other special characters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An mqtt topic is any utf-8, case-sensitive string. In many implementations the topic defaults to a slug of the device name or a readily available unique identifier (eg. mac).

Including a "device" leaf in the topic allows for multiple device support and having a "product" leaf in the topic allows for broadcast and discovery. So topics like "opensprinkler/device_id/endpoint" are quite common. However, there are alternative conventions and many sophisticated users will want to customise their own topics to their particular preference.

My intent with this PR is two fold: 1) safeguard multi-device owners so that devices don't clash on the same topic and cause unpredictable behaviour, and 2) do it in a way that doesn't constrain the design of the message syntax under discussion in #134. Hence this PR doesn't expose the topic structure to end-user customisation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many sophisticated users will want to customise their own topics to their particular preference.

Exactly. So expose the root topic to end-user customization :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am in favor of exposing the root topic to the user for customization. The topic can have an initial default value (it could either be 'opensprinkler' as we currently have, or it could be the device ID that @PeteBa proposed), and then user can change it to something else if they want.

full_topic[MQTT_MAX_TOPIC_LEN] = 0;

return len;
}

/**************************** PUBLIC INTERFACE ************************************/

// Initialise the client libraries and event handlers.
void OSMqtt::init(void) {
DEBUG_LOGF("MQTT Init\n");
char id[MQTT_MAX_ID_LEN + 1] = {0};

#if defined(ARDUINO)
uint8_t mac[6] = {0};
os.load_hardware_mac(mac, m_server!=NULL);
snprintf(id, MQTT_MAX_ID_LEN, "OS-%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
#endif

init(id);
};
Expand All @@ -113,6 +129,7 @@ void OSMqtt::init(const char * clientId) {

strncpy(_id, clientId, MQTT_MAX_ID_LEN);
_id[MQTT_MAX_ID_LEN] = 0;

_init();
};

Expand Down Expand Up @@ -174,7 +191,10 @@ void OSMqtt::publish(const char *topic, const char *payload) {
return;
}

_publish(topic, payload);
char full_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(full_topic, topic);

_publish(full_topic, payload);
}

// Regularly call the loop function to ensure "keep alive" messages are sent to the broker and to reconnect if needed.
Expand Down Expand Up @@ -202,7 +222,7 @@ void OSMqtt::loop(void) {
DEBUG_LOGF("MQTT Loop: Network %s, MQTT %s, State - %s\n",
network ? "UP" : "DOWN",
mqtt ? "UP" : "DOWN",
_state_string(state));
_state_to_string(state));
last_state = state; last_network = network; last_mqtt = mqtt;
}
#endif
Expand Down Expand Up @@ -240,18 +260,21 @@ int OSMqtt::_init(void) {
}

int OSMqtt::_connect(void) {

mqtt_client->setServer(_host, _port);
boolean state;
if (_username[0])
state = mqtt_client->connect(_id, _username, _password, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD);
else
state = mqtt_client->connect(_id, NULL, NULL, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD);
if (state) {
mqtt_client->publish(MQTT_AVAILABILITY_TOPIC, MQTT_ONLINE_PAYLOAD, true);

char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);

// Note: If (username == NULL) then password is ignored for anonymous login (i.e.no username or password)
// If (username != NULL && password == NULL) then username is used without a password
if ( mqtt_client->connect(_id, _username[0] ? _username : NULL, _password[0] ? _password : NULL, availability_topic, 0, true, MQTT_OFFLINE_PAYLOAD) ) {
mqtt_client->publish(availability_topic, MQTT_ONLINE_PAYLOAD, true);
} else {
DEBUG_LOGF("MQTT Connect: Failed (%d)\n", mqtt_client->state());
return MQTT_ERROR;
}

return MQTT_SUCCESS;
}

Expand All @@ -275,7 +298,7 @@ int OSMqtt::_loop(void) {
return mqtt_client->state();
}

const char * OSMqtt::_state_string(int rc) {
const char * OSMqtt::_state_to_string(int rc) {
switch (rc) {
case MQTT_CONNECTION_TIMEOUT: return "The server didn't respond within the keepalive time";
case MQTT_CONNECTION_LOST: return "The network connection was lost";
Expand All @@ -296,29 +319,35 @@ const char * OSMqtt::_state_string(int rc) {

static bool _connected = false;

static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);
struct _mqtt_callbacks {

::_connected = true;
static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);

if (reason == 0) {
int rc = mosquitto_publish(mqtt_client, NULL, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Publish: Failed (%s)\n", mosquitto_strerror(rc));
if (reason == 0) {
::_connected = true;

char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
OSMqtt::_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);

int rc = mosquitto_publish(mqtt_client, NULL, availability_topic, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Publish: Failed (%s)\n", mosquitto_strerror(rc));
}
}
}
}

static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);
static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);

::_connected = false;
}
::_connected = false;
}

static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){
if (level != MOSQ_LOG_DEBUG )
DEBUG_LOGF("MQTT Log Callback: %s (%d)\n", message, level);
}
static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){
if (level != MOSQ_LOG_DEBUG )
DEBUG_LOGF("MQTT Log Callback: %s (%d)\n", message, level);
}
};

int OSMqtt::_init(void) {
int major, minor, revision;
Expand All @@ -329,29 +358,33 @@ int OSMqtt::_init(void) {

if (mqtt_client) { mosquitto_destroy(mqtt_client); mqtt_client = NULL; };

mqtt_client = mosquitto_new("OS", true, NULL);
mqtt_client = mosquitto_new(_id, true, NULL);
if (mqtt_client == NULL) {
DEBUG_PRINTF("MQTT Init: Failed to initialise client\n");
return MQTT_ERROR;
}

mosquitto_connect_callback_set(mqtt_client, _mqtt_connection_cb);
mosquitto_disconnect_callback_set(mqtt_client, _mqtt_disconnection_cb);
mosquitto_log_callback_set(mqtt_client, _mqtt_log_cb);
mosquitto_will_set(mqtt_client, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true);
char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);
mosquitto_will_set(mqtt_client, availability_topic, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true);

mosquitto_connect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_connection_cb);
mosquitto_disconnect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_disconnection_cb);
mosquitto_log_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_log_cb);

return MQTT_SUCCESS;
}

int OSMqtt::_connect(void) {
int rc;
if (_username[0]) {
rc = mosquitto_username_pw_set(mqtt_client, _username, _password);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Connect: Connection Failed (%s)\n", mosquitto_strerror(rc));
return MQTT_ERROR;
}

// Note: If (username == NULL) then disable authentication
// If (username != NULL && password == NULL) then username is used without a password
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you cannot do that. See https://github.com/eclipse/mosquitto/blob/master/lib/options.c#L69
Please revert this change

Copy link
Contributor Author

@PeteBa PeteBa Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. This is getting interesting as it looks like the latest version of mosquitto does not match the documentation! But we should probably reference this link here as this is the version 1.3.4-1.4.10 which is available by apt for OSPi (i.e. much earlier). But lets dig into this as the code isnt operating as I would expect.

The motivation behind this change was to ensure we deal with three use cases: 1) username not provided (un-authenticated), 2) both username and password are provided (authenticated); and 3) username is provided but password is not (unprotected user login).

It is this third case that I was considering as according to the mosquitto documentation and code I linked above then we should be calling mosquitto_username_pw_set(mqtt_client, _username, NULL) rather than passing an empty string as _password.

Appreciate your thoughts on this as it feels like a decision between coding for the currently distributed version vs what might come down the road.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a wrong reading of source code. Apparently, it does not change anything but documentation says to implement it as you did.
https://mosquitto.org/api/files/mosquitto-h.html#mosquitto_username_pw_set
So, I agree with this change ;-)
Thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also pondered on this and so far I've assumed that if user name is non-empty then password is also non-empty. Would anyone intentionally set only a user name but empty password in practice?

Copy link
Contributor Author

@PeteBa PeteBa Jun 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, in normal practice you would want to use either case 1 or 2 but the spec does allow for case 3 as well. I can imagine scenarios in a trusted network where you want to track usage but aren't too worried about rogue agents !

Having said that, if we want to restrict against this scenario then we should do it earlier in the process i.e. in the App UI or api response so we can provide user feedback.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As documentation says to do it like this and you implemented it, let's merge this improvement

int rc = mosquitto_username_pw_set(mqtt_client, _username[0] ? _username : NULL, _password[0] ? _password : NULL);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Connect: Connection Failed (%s)\n", mosquitto_strerror(rc));
return MQTT_ERROR;
}

rc = mosquitto_connect(mqtt_client, _host, _port, MQTT_KEEPALIVE);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Connect: Connection Failed (%s)\n", mosquitto_strerror(rc));
Expand Down Expand Up @@ -385,7 +418,7 @@ int OSMqtt::_loop(void) {
return mosquitto_loop(mqtt_client, 0 , 1);
}

const char * OSMqtt::_state_string(int error) {
const char * OSMqtt::_state_to_string(int error) {
return mosquitto_strerror(error);
}
#endif
8 changes: 7 additions & 1 deletion mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

class OSMqtt {
private:
friend struct _mqtt_callbacks;

static char _id[];
static char _host[];
static int _port;
Expand All @@ -40,7 +42,11 @@ class OSMqtt {
static bool _connected(void);
static int _publish(const char *topic, const char *payload);
static int _loop(void);
static const char * _state_string(int state);

// Helper functions
static const char * _state_to_string(int state);
static int _make_full_topic(char * full_topic, const char * topic);

public:
static void init(void);
static void init(const char * id);
Expand Down