From 29bb39e81e87978c9c10ccd879e2ab0e1bebc2c5 Mon Sep 17 00:00:00 2001 From: repa6 Date: Sun, 29 Jan 2023 13:25:26 +0100 Subject: [PATCH] added mac address to MQTT root topic --- main.cpp | 14 ++-- mqtt.cpp | 212 ++++++++++++++++++++++++++++++------------------------- mqtt.h | 8 ++- 3 files changed, 131 insertions(+), 103 deletions(-) diff --git a/main.cpp b/main.cpp index 32086901..d8435b56 100644 --- a/main.cpp +++ b/main.cpp @@ -1394,7 +1394,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_STATION_ON: if (os.mqtt.enabled()) { - sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval); + sprintf_P(topic, PSTR("station/%d"), lval); sprintf_P(payload, PSTR("{\"state\":1,\"duration\":%d}"), (int)fval); } @@ -1405,7 +1405,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_STATION_OFF: if (os.mqtt.enabled()) { - sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval); + sprintf_P(topic, PSTR("station/%d"), lval); if (os.iopts[IOPT_SENSOR1_TYPE]==SENSOR_TYPE_FLOW) { sprintf_P(payload, PSTR("{\"state\":0,\"duration\":%d,\"flow\":%d.%02d}"), (int)fval, (int)flow_last_gpm, (int)(flow_last_gpm*100)%100); } else { @@ -1442,7 +1442,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_SENSOR1: if (os.mqtt.enabled()) { - strcpy_P(topic, PSTR("opensprinkler/sensor1")); + strcpy_P(topic, PSTR("sensor1")); sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval); } if (ifttt_enabled) { @@ -1454,7 +1454,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_SENSOR2: if (os.mqtt.enabled()) { - strcpy_P(topic, PSTR("opensprinkler/sensor2")); + strcpy_P(topic, PSTR("sensor2")); sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval); } if (ifttt_enabled) { @@ -1466,7 +1466,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_RAINDELAY: if (os.mqtt.enabled()) { - strcpy_P(topic, PSTR("opensprinkler/raindelay")); + strcpy_P(topic, PSTR("raindelay")); sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval); } if (ifttt_enabled) { @@ -1481,7 +1481,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { volume = (volume<<8)+os.iopts[IOPT_PULSE_RATE_0]; volume = lval*volume; if (os.mqtt.enabled()) { - strcpy_P(topic, PSTR("opensprinkler/sensor/flow")); + strcpy_P(topic, PSTR("sensor/flow")); sprintf_P(payload, PSTR("{\"count\":%u,\"volume\":%d.%02d}"), lval, (int)volume/100, (int)volume%100); } if (ifttt_enabled) { @@ -1509,7 +1509,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) { case NOTIFY_REBOOT: if (os.mqtt.enabled()) { - strcpy_P(topic, PSTR("opensprinkler/system")); + strcpy_P(topic, PSTR("system")); strcpy_P(payload, PSTR("{\"state\":\"started\"}")); } if (ifttt_enabled) { diff --git a/mqtt.cpp b/mqtt.cpp index 6732e707..ba050dc1 100644 --- a/mqtt.cpp +++ b/mqtt.cpp @@ -48,21 +48,21 @@ #if defined(ENABLE_DEBUG) #if defined(ARDUINO) #include "TimeLib.h" - #define DEBUG_PRINTF(msg, ...) {Serial.printf(msg, ##__VA_ARGS__);} - #define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); Serial.printf("%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t));} + #define DEBUG_PRINTF(msg, ...) {char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, msg, ##__VA_ARGS__); Serial.print(buffer);} + #define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, "%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t)); Serial.print(buffer);} #else #include - #define DEBUG_PRINTF(msg, ...) {printf(msg, ##__VA_ARGS__);} - #define DEBUG_TIMESTAMP() {char tstr[21]; time_t t = time(NULL); struct tm *tm = localtime(&t); strftime(tstr, 21, "%y-%m-%d %H:%M:%S - ", tm);printf("%s", tstr);} + #define DEBUG_PRINTF(msg, ...) {printf(msg, ##__VA_ARGS__);} + #define DEBUG_TIMESTAMP() {char tstr[21]; time_t t = time(NULL); struct tm *tm = localtime(&t); strftime(tstr, 21, "%y-%m-%d %H:%M:%S - ", tm);printf("%s", tstr);} #endif - #define DEBUG_LOGF(msg, ...) {DEBUG_TIMESTAMP(); DEBUG_PRINTF(msg, ##__VA_ARGS__);} + #define DEBUG_LOGF(msg, ...) {DEBUG_TIMESTAMP(); DEBUG_PRINTF(msg, ##__VA_ARGS__);} - static unsigned long _lastMillis = 0; // Holds the timestamp associated with the last call to DEBUG_DURATION() - inline unsigned long DEBUG_DURATION() {unsigned long dur = millis() - _lastMillis; _lastMillis = millis(); return dur;} + static unsigned long _lastMillis = 0; // Holds the timestamp associated with the last call to DEBUG_DURATION() + inline unsigned long DEBUG_DURATION() {unsigned long dur = millis() - _lastMillis; _lastMillis = millis(); return dur;} #else - #define DEBUG_PRINTF(msg, ...) {} - #define DEBUG_LOGF(msg, ...) {} - #define DEBUG_DURATION() {} + #define DEBUG_PRINTF(msg, ...) {} + #define DEBUG_LOGF(msg, ...) {} + #define DEBUG_DURATION() {} #endif #define str(s) #s @@ -71,28 +71,46 @@ extern OpenSprinkler os; extern char tmp_buffer[]; -#define MQTT_KEEPALIVE 60 -#define MQTT_DEFAULT_PORT 1883 // Default port for MQTT. Can be overwritten through App config -#define MQTT_MAX_HOST_LEN 50 // Note: App is set to max 50 chars for broker name -#define MQTT_MAX_USERNAME_LEN 32 // Note: App is set to max 32 chars for username -#define MQTT_MAX_PASSWORD_LEN 32 // Note: App is set to max 32 chars for password -#define MQTT_MAX_ID_LEN 16 // MQTT Client Id to uniquely reference this unit -#define MQTT_RECONNECT_DELAY 120 // Minumum of 60 seconds between reconnect attempts - -#define MQTT_ROOT_TOPIC "opensprinkler" -#define MQTT_AVAILABILITY_TOPIC MQTT_ROOT_TOPIC "/availability" -#define MQTT_ONLINE_PAYLOAD "online" -#define MQTT_OFFLINE_PAYLOAD "offline" - -#define MQTT_SUCCESS 0 // Returned when function operated successfully -#define MQTT_ERROR 1 // Returned whan function failed - -char OSMqtt::_id[MQTT_MAX_ID_LEN + 1] = {0}; // Id to identify the client to the broker -char OSMqtt::_host[MQTT_MAX_HOST_LEN + 1] = {0}; // IP or host name of the broker -char OSMqtt::_username[MQTT_MAX_USERNAME_LEN + 1] = {0}; // username to connect to the broker -char OSMqtt::_password[MQTT_MAX_PASSWORD_LEN + 1] = {0}; // password to connect to the broker -int OSMqtt::_port = MQTT_DEFAULT_PORT; // Port of the broker (default 1883) -bool OSMqtt::_enabled = false; // Flag indicating whether MQTT is enabled +#define MQTT_KEEPALIVE 60 +#define MQTT_DEFAULT_PORT 1883 // Default port for MQTT. Can be overwritten through App config +#define MQTT_MAX_HOST_LEN 50 // Note: App is set to max 50 chars for broker name +#define MQTT_MAX_USERNAME_LEN 32 // Note: App is set to max 32 chars for username +#define MQTT_MAX_PASSWORD_LEN 32 // Note: App is set to max 32 chars for password +#define MQTT_MAX_ID_LEN 16 // MQTT Client Id to uniquely reference this unit +#define MQTT_MAX_TOPIC_LEN 60 // Includes ROOT_TOPIC, DEVICE_TOPIC and endpoint +#define MQTT_RECONNECT_DELAY 120 // Minumum of 60 seconds between reconnect attempts + +#define MQTT_ROOT_TOPIC "opensprinkler" +#define MQTT_AVAILABILITY_TOPIC "availability" +#define MQTT_ONLINE_PAYLOAD "online" +#define MQTT_OFFLINE_PAYLOAD "offline" + +#define MQTT_SUCCESS 0 // Returned when function operated successfully +#define MQTT_ERROR 1 // Returned whan function failed + +char OSMqtt::_id[MQTT_MAX_ID_LEN + 1] = {0}; // Id to identify the client to the broker +char OSMqtt::_host[MQTT_MAX_HOST_LEN + 1] = {0}; // IP or host name of the broker +char OSMqtt::_username[MQTT_MAX_USERNAME_LEN + 1] = {0}; // username to connect to the broker +char OSMqtt::_password[MQTT_MAX_PASSWORD_LEN + 1] = {0}; // password to connect to the broker +int OSMqtt::_port = MQTT_DEFAULT_PORT; // Port of the broker (default 1883) +bool OSMqtt::_enabled = false; // Flag indicating whether MQTT is enabled + +/**************************** HELPERS ********************************************/ + +int OSMqtt::_make_full_topic(char * full_topic, const char * topic) +{ +#if defined(ENABLE_DEBUG) + if (strlen(MQTT_ROOT_TOPIC) + strlen(_id) + strlen(topic) + 2 > MQTT_MAX_TOPIC_LEN) + DEBUG_LOGF("MQTT Topic (%s) is too large. Will be truncated\n", topic); +#endif + + int len = snprintf(full_topic, MQTT_MAX_TOPIC_LEN, "%s/%s/%s", MQTT_ROOT_TOPIC, _id, topic); + full_topic[MQTT_MAX_TOPIC_LEN] = 0; + + return len; +} + +/**************************** PUBLIC INTERFACE ************************************/ // Initialise the client libraries and event handlers. void OSMqtt::init(void) { @@ -179,7 +197,10 @@ void OSMqtt::publish(const char *topic, const char *payload) { return; } - _publish(topic, payload); + char full_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 }; + _make_full_topic(full_topic, topic); + + _publish(full_topic, payload); } // Regularly call the loop function to ensure "keep alive" messages are sent to the broker and to reconnect if needed. @@ -207,7 +228,7 @@ void OSMqtt::loop(void) { DEBUG_LOGF("MQTT Loop: Network %s, MQTT %s, State - %s\r\n", network ? "UP" : "DOWN", mqtt ? "UP" : "DOWN", - _state_string(state)); + _state_to_string(state)); last_state = state; last_network = network; last_mqtt = mqtt; } #endif @@ -219,7 +240,7 @@ void OSMqtt::loop(void) { #if defined(ESP8266) WiFiClient wifiClient; #else - EthernetClient ethClient; + EthernetClient ethClient; #endif int OSMqtt::_init(void) { @@ -246,25 +267,17 @@ int OSMqtt::_init(void) { int OSMqtt::_connect(void) { mqtt_client->setServer(_host, _port); - boolean state; - #define MQTT_CONNECT_NTRIES 3 - byte tries = 0; - do { - DEBUG_PRINT(F("mqtt: ")); - DEBUG_PRINTLN(_host); - if (_username[0]) - state = mqtt_client->connect(_id, _username, _password, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD); - else - state = mqtt_client->connect(_id, NULL, NULL, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD); - if(state) break; - tries++; - } while(triesstate()); - return MQTT_ERROR; + + char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 }; + _make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC); + + // Note: If (username == NULL) then password is ignored for anonymous login (i.e.no username or password) + // If (username != NULL && password == NULL) then username is used without a password + if ( mqtt_client->connect(_id, _username[0] ? _username : NULL, _password[0] ? _password : NULL, availability_topic, 0, true, MQTT_OFFLINE_PAYLOAD) ) { + mqtt_client->publish(availability_topic, MQTT_ONLINE_PAYLOAD, true); } else { - mqtt_client->publish(MQTT_AVAILABILITY_TOPIC, MQTT_ONLINE_PAYLOAD, true); + DEBUG_LOGF("MQTT Connect: Failed (%d)\n", mqtt_client->state()); + return MQTT_ERROR; } return MQTT_SUCCESS; } @@ -289,19 +302,19 @@ int OSMqtt::_loop(void) { return mqtt_client->state(); } -const char * OSMqtt::_state_string(int rc) { +const char * OSMqtt::_state_to_string(int rc) { switch (rc) { - case MQTT_CONNECTION_TIMEOUT: return "The server didn't respond within the keepalive time"; - case MQTT_CONNECTION_LOST: return "The network connection was lost"; - case MQTT_CONNECT_FAILED: return "The network connection failed"; - case MQTT_DISCONNECTED: return "The client has cleanly disconnected"; - case MQTT_CONNECTED: return "The client is connected"; - case MQTT_CONNECT_BAD_PROTOCOL: return "The server doesn't support the requested version of MQTT"; - case MQTT_CONNECT_BAD_CLIENT_ID: return "The server rejected the client identifier"; - case MQTT_CONNECT_UNAVAILABLE: return "The server was unavailable to accept the connection"; - case MQTT_CONNECT_BAD_CREDENTIALS: return "The username/password were rejected"; - case MQTT_CONNECT_UNAUTHORIZED: return "The client was not authorized to connect"; - default: return "Unrecognised state"; + case MQTT_CONNECTION_TIMEOUT: return "The server didn't respond within the keepalive time"; + case MQTT_CONNECTION_LOST: return "The network connection was lost"; + case MQTT_CONNECT_FAILED: return "The network connection failed"; + case MQTT_DISCONNECTED: return "The client has cleanly disconnected"; + case MQTT_CONNECTED: return "The client is connected"; + case MQTT_CONNECT_BAD_PROTOCOL: return "The server doesn't support the requested version of MQTT"; + case MQTT_CONNECT_BAD_CLIENT_ID: return "The server rejected the client identifier"; + case MQTT_CONNECT_UNAVAILABLE: return "The server was unavailable to accept the connection"; + case MQTT_CONNECT_BAD_CREDENTIALS: return "The username/password were rejected"; + case MQTT_CONNECT_UNAUTHORIZED: return "The client was not authorized to connect"; + default: return "Unrecognised state"; } } #else @@ -310,61 +323,70 @@ const char * OSMqtt::_state_string(int rc) { static bool _connected = false; -static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) { - DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\r\n", mosquitto_strerror(reason), reason); +struct _mqtt_callbacks { + + static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) { + DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason); - ::_connected = true; + if (reason == 0) { + ::_connected = true; - if (reason == 0) { - int rc = mosquitto_publish(mqtt_client, NULL, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true); - if (rc != MOSQ_ERR_SUCCESS) { - DEBUG_LOGF("MQTT Publish: Failed (%s)\r\n", mosquitto_strerror(rc)); + char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 }; + OSMqtt::_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC); + + int rc = mosquitto_publish(mqtt_client, NULL, availability_topic, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true); + if (rc != MOSQ_ERR_SUCCESS) { + DEBUG_LOGF("MQTT Publish: Failed (%s)\n", mosquitto_strerror(rc)); + } } } -} -static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) { - DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\r\n", mosquitto_strerror(reason), reason); + static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) { + DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason); - ::_connected = false; -} + ::_connected = false; + } -static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){ - if (level != MOSQ_LOG_DEBUG ) - DEBUG_LOGF("MQTT Log Callback: %s (%d)\r\n", message, level); -} + static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){ + if (level != MOSQ_LOG_DEBUG ) + DEBUG_LOGF("MQTT Log Callback: %s (%d)\n", message, level); + } +}; int OSMqtt::_init(void) { int major, minor, revision; mosquitto_lib_init(); mosquitto_lib_version(&major, &minor, &revision); - DEBUG_LOGF("MQTT Init: Mosquitto Library v%d.%d.%d\r\n", major, minor, revision); + DEBUG_LOGF("MQTT Init: Mosquitto Library v%d.%d.%d\n", major, minor, revision); if (mqtt_client) { mosquitto_destroy(mqtt_client); mqtt_client = NULL; }; - mqtt_client = mosquitto_new("OS", true, NULL); + mqtt_client = mosquitto_new(_id, true, NULL); if (mqtt_client == NULL) { DEBUG_PRINTF("MQTT Init: Failed to initialise client\r\n"); return MQTT_ERROR; } - mosquitto_connect_callback_set(mqtt_client, _mqtt_connection_cb); - mosquitto_disconnect_callback_set(mqtt_client, _mqtt_disconnection_cb); - mosquitto_log_callback_set(mqtt_client, _mqtt_log_cb); - mosquitto_will_set(mqtt_client, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true); + char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 }; + _make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC); + mosquitto_will_set(mqtt_client, availability_topic, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true); + + mosquitto_connect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_connection_cb); + mosquitto_disconnect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_disconnection_cb); + mosquitto_log_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_log_cb); return MQTT_SUCCESS; } int OSMqtt::_connect(void) { - int rc; - if (_username[0]) { - rc = mosquitto_username_pw_set(mqtt_client, _username, _password); - if (rc != MOSQ_ERR_SUCCESS) { + + // Note: If (username == NULL) then disable authentication + // If (username != NULL && password == NULL) then username is used without a password + int rc = mosquitto_username_pw_set(mqtt_client, _username[0] ? _username : NULL, _password[0] ? _password : NULL); + if (rc != MOSQ_ERR_SUCCESS) { DEBUG_LOGF("MQTT Connect: Connection Failed (%s)\r\n", mosquitto_strerror(rc)); - return MQTT_ERROR; - } + return MQTT_ERROR; } rc = mosquitto_connect(mqtt_client, _host, _port, MQTT_KEEPALIVE); if (rc != MOSQ_ERR_SUCCESS) { @@ -373,7 +395,7 @@ int OSMqtt::_connect(void) { } // Allow 10ms for the Broker's ack to be received. We need this on start-up so that the - // connection is registered before we attempt to send our first NOTIFY_REBOOT notification. + // connection is registered before we attempt to send our first NOTIFY_REBOOT notification. usleep(10000); return MQTT_SUCCESS; @@ -399,7 +421,7 @@ int OSMqtt::_loop(void) { return mosquitto_loop(mqtt_client, 0 , 1); } -const char * OSMqtt::_state_string(int error) { +const char * OSMqtt::_state_to_string(int error) { return mosquitto_strerror(error); } #endif diff --git a/mqtt.h b/mqtt.h index 3926d31b..1fbcf4da 100644 --- a/mqtt.h +++ b/mqtt.h @@ -26,6 +26,8 @@ class OSMqtt { private: + friend struct _mqtt_callbacks; + static char _id[]; static char _host[]; static int _port; @@ -40,7 +42,11 @@ class OSMqtt { static bool _connected(void); static int _publish(const char *topic, const char *payload); static int _loop(void); - static const char * _state_string(int state); + + // Helper functions + static const char * _state_to_string(int state); + static int _make_full_topic(char * full_topic, const char * topic); + public: static void init(void); static void init(const char * id);