Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt multi device #238

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_STATION_ON:

if (os.mqtt.enabled()) {
sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval);
sprintf_P(topic, PSTR("station/%d"), lval);
sprintf_P(payload, PSTR("{\"state\":1,\"duration\":%d}"), (int)fval);
}

Expand All @@ -1407,7 +1407,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_STATION_OFF:

if (os.mqtt.enabled()) {
sprintf_P(topic, PSTR("opensprinkler/station/%d"), lval);
sprintf_P(topic, PSTR("station/%d"), lval);
if (os.iopts[IOPT_SENSOR1_TYPE]==SENSOR_TYPE_FLOW) {
sprintf_P(payload, PSTR("{\"state\":0,\"duration\":%d,\"flow\":%d.%02d}"), (int)fval, (int)flow_last_gpm, (int)(flow_last_gpm*100)%100);
} else {
Expand Down Expand Up @@ -1444,7 +1444,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_SENSOR1:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor1"));
strcpy_P(topic, PSTR("sensor1"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1456,7 +1456,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_SENSOR2:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor2"));
strcpy_P(topic, PSTR("sensor2"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1468,7 +1468,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_RAINDELAY:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/raindelay"));
strcpy_P(topic, PSTR("raindelay"));
sprintf_P(payload, PSTR("{\"state\":%d}"), (int)fval);
}
if (ifttt_enabled) {
Expand All @@ -1483,7 +1483,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
volume = (volume<<8)+os.iopts[IOPT_PULSE_RATE_0];
volume = lval*volume;
if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/sensor/flow"));
strcpy_P(topic, PSTR("sensor/flow"));
sprintf_P(payload, PSTR("{\"count\":%u,\"volume\":%d.%02d}"), lval, (int)volume/100, (int)volume%100);
}
if (ifttt_enabled) {
Expand Down Expand Up @@ -1511,7 +1511,7 @@ void push_message(int type, uint32_t lval, float fval, const char* sval) {
case NOTIFY_REBOOT:

if (os.mqtt.enabled()) {
strcpy_P(topic, PSTR("opensprinkler/system"));
strcpy_P(topic, PSTR("system"));
strcpy_P(payload, PSTR("{\"state\":\"started\"}"));
}
if (ifttt_enabled) {
Expand Down
212 changes: 117 additions & 95 deletions mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@
#if defined(ENABLE_DEBUG)
#if defined(ARDUINO)
#include "TimeLib.h"
#define DEBUG_PRINTF(msg, ...) {Serial.printf(msg, ##__VA_ARGS__);}
#define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); Serial.printf("%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t));}
#define DEBUG_PRINTF(msg, ...) {char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, msg, ##__VA_ARGS__); Serial.print(buffer);}
#define DEBUG_TIMESTAMP(msg, ...) {time_t t = os.now_tz(); char buffer[TMP_BUFFER_SIZE]; sprintf(buffer, "%02d-%02d-%02d %02d:%02d:%02d - ", year(t), month(t), day(t), hour(t), minute(t), second(t)); Serial.print(buffer);}
repa6 marked this conversation as resolved.
Show resolved Hide resolved
#else
#include <sys/time.h>
#define DEBUG_PRINTF(msg, ...) {printf(msg, ##__VA_ARGS__);}
#define DEBUG_TIMESTAMP() {char tstr[21]; time_t t = time(NULL); struct tm *tm = localtime(&t); strftime(tstr, 21, "%y-%m-%d %H:%M:%S - ", tm);printf("%s", tstr);}
#define DEBUG_PRINTF(msg, ...) {printf(msg, ##__VA_ARGS__);}
#define DEBUG_TIMESTAMP() {char tstr[21]; time_t t = time(NULL); struct tm *tm = localtime(&t); strftime(tstr, 21, "%y-%m-%d %H:%M:%S - ", tm);printf("%s", tstr);}
#endif
#define DEBUG_LOGF(msg, ...) {DEBUG_TIMESTAMP(); DEBUG_PRINTF(msg, ##__VA_ARGS__);}
#define DEBUG_LOGF(msg, ...) {DEBUG_TIMESTAMP(); DEBUG_PRINTF(msg, ##__VA_ARGS__);}

static unsigned long _lastMillis = 0; // Holds the timestamp associated with the last call to DEBUG_DURATION()
inline unsigned long DEBUG_DURATION() {unsigned long dur = millis() - _lastMillis; _lastMillis = millis(); return dur;}
static unsigned long _lastMillis = 0; // Holds the timestamp associated with the last call to DEBUG_DURATION()
inline unsigned long DEBUG_DURATION() {unsigned long dur = millis() - _lastMillis; _lastMillis = millis(); return dur;}
#else
#define DEBUG_PRINTF(msg, ...) {}
#define DEBUG_LOGF(msg, ...) {}
#define DEBUG_DURATION() {}
#define DEBUG_PRINTF(msg, ...) {}
#define DEBUG_LOGF(msg, ...) {}
#define DEBUG_DURATION() {}
#endif

#define str(s) #s
Expand All @@ -71,28 +71,46 @@
extern OpenSprinkler os;
extern char tmp_buffer[];

#define MQTT_KEEPALIVE 60
#define MQTT_DEFAULT_PORT 1883 // Default port for MQTT. Can be overwritten through App config
#define MQTT_MAX_HOST_LEN 50 // Note: App is set to max 50 chars for broker name
#define MQTT_MAX_USERNAME_LEN 32 // Note: App is set to max 32 chars for username
#define MQTT_MAX_PASSWORD_LEN 32 // Note: App is set to max 32 chars for password
#define MQTT_MAX_ID_LEN 16 // MQTT Client Id to uniquely reference this unit
#define MQTT_RECONNECT_DELAY 120 // Minumum of 60 seconds between reconnect attempts

#define MQTT_ROOT_TOPIC "opensprinkler"
#define MQTT_AVAILABILITY_TOPIC MQTT_ROOT_TOPIC "/availability"
#define MQTT_ONLINE_PAYLOAD "online"
#define MQTT_OFFLINE_PAYLOAD "offline"

#define MQTT_SUCCESS 0 // Returned when function operated successfully
#define MQTT_ERROR 1 // Returned whan function failed

char OSMqtt::_id[MQTT_MAX_ID_LEN + 1] = {0}; // Id to identify the client to the broker
char OSMqtt::_host[MQTT_MAX_HOST_LEN + 1] = {0}; // IP or host name of the broker
char OSMqtt::_username[MQTT_MAX_USERNAME_LEN + 1] = {0}; // username to connect to the broker
char OSMqtt::_password[MQTT_MAX_PASSWORD_LEN + 1] = {0}; // password to connect to the broker
int OSMqtt::_port = MQTT_DEFAULT_PORT; // Port of the broker (default 1883)
bool OSMqtt::_enabled = false; // Flag indicating whether MQTT is enabled
#define MQTT_KEEPALIVE 60
#define MQTT_DEFAULT_PORT 1883 // Default port for MQTT. Can be overwritten through App config
#define MQTT_MAX_HOST_LEN 50 // Note: App is set to max 50 chars for broker name
#define MQTT_MAX_USERNAME_LEN 32 // Note: App is set to max 32 chars for username
#define MQTT_MAX_PASSWORD_LEN 32 // Note: App is set to max 32 chars for password
#define MQTT_MAX_ID_LEN 16 // MQTT Client Id to uniquely reference this unit
#define MQTT_MAX_TOPIC_LEN 60 // Includes ROOT_TOPIC, DEVICE_TOPIC and endpoint
repa6 marked this conversation as resolved.
Show resolved Hide resolved
#define MQTT_RECONNECT_DELAY 120 // Minumum of 60 seconds between reconnect attempts

#define MQTT_ROOT_TOPIC "opensprinkler"
#define MQTT_AVAILABILITY_TOPIC "availability"
#define MQTT_ONLINE_PAYLOAD "online"
#define MQTT_OFFLINE_PAYLOAD "offline"

#define MQTT_SUCCESS 0 // Returned when function operated successfully
#define MQTT_ERROR 1 // Returned whan function failed

char OSMqtt::_id[MQTT_MAX_ID_LEN + 1] = {0}; // Id to identify the client to the broker
char OSMqtt::_host[MQTT_MAX_HOST_LEN + 1] = {0}; // IP or host name of the broker
char OSMqtt::_username[MQTT_MAX_USERNAME_LEN + 1] = {0}; // username to connect to the broker
char OSMqtt::_password[MQTT_MAX_PASSWORD_LEN + 1] = {0}; // password to connect to the broker
int OSMqtt::_port = MQTT_DEFAULT_PORT; // Port of the broker (default 1883)
bool OSMqtt::_enabled = false; // Flag indicating whether MQTT is enabled

/**************************** HELPERS ********************************************/

int OSMqtt::_make_full_topic(char * full_topic, const char * topic)
{
#if defined(ENABLE_DEBUG)
if (strlen(MQTT_ROOT_TOPIC) + strlen(_id) + strlen(topic) + 2 > MQTT_MAX_TOPIC_LEN)
DEBUG_LOGF("MQTT Topic (%s) is too large. Will be truncated\n", topic);
#endif

int len = snprintf(full_topic, MQTT_MAX_TOPIC_LEN, "%s/%s/%s", MQTT_ROOT_TOPIC, _id, topic);
full_topic[MQTT_MAX_TOPIC_LEN] = 0;

return len;
}

/**************************** PUBLIC INTERFACE ************************************/

// Initialise the client libraries and event handlers.
void OSMqtt::init(void) {
Expand Down Expand Up @@ -179,7 +197,10 @@ void OSMqtt::publish(const char *topic, const char *payload) {
return;
}

_publish(topic, payload);
char full_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(full_topic, topic);

_publish(full_topic, payload);
}

// Regularly call the loop function to ensure "keep alive" messages are sent to the broker and to reconnect if needed.
Expand Down Expand Up @@ -207,7 +228,7 @@ void OSMqtt::loop(void) {
DEBUG_LOGF("MQTT Loop: Network %s, MQTT %s, State - %s\r\n",
network ? "UP" : "DOWN",
mqtt ? "UP" : "DOWN",
_state_string(state));
_state_to_string(state));
last_state = state; last_network = network; last_mqtt = mqtt;
}
#endif
Expand All @@ -219,7 +240,7 @@ void OSMqtt::loop(void) {
#if defined(ESP8266)
WiFiClient wifiClient;
#else
EthernetClient ethClient;
EthernetClient ethClient;
#endif

int OSMqtt::_init(void) {
Expand All @@ -246,25 +267,17 @@ int OSMqtt::_init(void) {

int OSMqtt::_connect(void) {
mqtt_client->setServer(_host, _port);
boolean state;
#define MQTT_CONNECT_NTRIES 3
byte tries = 0;
do {
DEBUG_PRINT(F("mqtt: "));
DEBUG_PRINTLN(_host);
if (_username[0])
state = mqtt_client->connect(_id, _username, _password, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD);
else
state = mqtt_client->connect(_id, NULL, NULL, MQTT_AVAILABILITY_TOPIC, 0, true, MQTT_OFFLINE_PAYLOAD);
if(state) break;
tries++;
} while(tries<MQTT_CONNECT_NTRIES);

if(tries==MQTT_CONNECT_NTRIES) {
DEBUG_LOGF("MQTT Connect: Failed (%d)\r\n", mqtt_client->state());
return MQTT_ERROR;

char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);

// Note: If (username == NULL) then password is ignored for anonymous login (i.e.no username or password)
// If (username != NULL && password == NULL) then username is used without a password
if ( mqtt_client->connect(_id, _username[0] ? _username : NULL, _password[0] ? _password : NULL, availability_topic, 0, true, MQTT_OFFLINE_PAYLOAD) ) {
mqtt_client->publish(availability_topic, MQTT_ONLINE_PAYLOAD, true);
} else {
mqtt_client->publish(MQTT_AVAILABILITY_TOPIC, MQTT_ONLINE_PAYLOAD, true);
DEBUG_LOGF("MQTT Connect: Failed (%d)\n", mqtt_client->state());
return MQTT_ERROR;
}
return MQTT_SUCCESS;
}
Expand All @@ -289,19 +302,19 @@ int OSMqtt::_loop(void) {
return mqtt_client->state();
}

const char * OSMqtt::_state_string(int rc) {
const char * OSMqtt::_state_to_string(int rc) {
switch (rc) {
case MQTT_CONNECTION_TIMEOUT: return "The server didn't respond within the keepalive time";
case MQTT_CONNECTION_LOST: return "The network connection was lost";
case MQTT_CONNECT_FAILED: return "The network connection failed";
case MQTT_DISCONNECTED: return "The client has cleanly disconnected";
case MQTT_CONNECTED: return "The client is connected";
case MQTT_CONNECT_BAD_PROTOCOL: return "The server doesn't support the requested version of MQTT";
case MQTT_CONNECT_BAD_CLIENT_ID: return "The server rejected the client identifier";
case MQTT_CONNECT_UNAVAILABLE: return "The server was unavailable to accept the connection";
case MQTT_CONNECT_BAD_CREDENTIALS: return "The username/password were rejected";
case MQTT_CONNECT_UNAUTHORIZED: return "The client was not authorized to connect";
default: return "Unrecognised state";
case MQTT_CONNECTION_TIMEOUT: return "The server didn't respond within the keepalive time";
case MQTT_CONNECTION_LOST: return "The network connection was lost";
case MQTT_CONNECT_FAILED: return "The network connection failed";
case MQTT_DISCONNECTED: return "The client has cleanly disconnected";
case MQTT_CONNECTED: return "The client is connected";
case MQTT_CONNECT_BAD_PROTOCOL: return "The server doesn't support the requested version of MQTT";
case MQTT_CONNECT_BAD_CLIENT_ID: return "The server rejected the client identifier";
case MQTT_CONNECT_UNAVAILABLE: return "The server was unavailable to accept the connection";
case MQTT_CONNECT_BAD_CREDENTIALS: return "The username/password were rejected";
case MQTT_CONNECT_UNAUTHORIZED: return "The client was not authorized to connect";
default: return "Unrecognised state";
}
}
#else
Expand All @@ -310,61 +323,70 @@ const char * OSMqtt::_state_string(int rc) {

static bool _connected = false;

static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\r\n", mosquitto_strerror(reason), reason);
struct _mqtt_callbacks {

static void _mqtt_connection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Connnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);

::_connected = true;
if (reason == 0) {
::_connected = true;

if (reason == 0) {
int rc = mosquitto_publish(mqtt_client, NULL, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Publish: Failed (%s)\r\n", mosquitto_strerror(rc));
char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
OSMqtt::_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);

int rc = mosquitto_publish(mqtt_client, NULL, availability_topic, strlen(MQTT_ONLINE_PAYLOAD), MQTT_ONLINE_PAYLOAD, 0, true);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Publish: Failed (%s)\n", mosquitto_strerror(rc));
}
}
}
}

static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\r\n", mosquitto_strerror(reason), reason);
static void _mqtt_disconnection_cb(struct mosquitto *mqtt_client, void *obj, int reason) {
DEBUG_LOGF("MQTT Disconnnection Callback: %s (%d)\n", mosquitto_strerror(reason), reason);

::_connected = false;
}
::_connected = false;
}

static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){
if (level != MOSQ_LOG_DEBUG )
DEBUG_LOGF("MQTT Log Callback: %s (%d)\r\n", message, level);
}
static void _mqtt_log_cb(struct mosquitto *mqtt_client, void *obj, int level, const char *message){
if (level != MOSQ_LOG_DEBUG )
DEBUG_LOGF("MQTT Log Callback: %s (%d)\n", message, level);
}
};

int OSMqtt::_init(void) {
int major, minor, revision;

mosquitto_lib_init();
mosquitto_lib_version(&major, &minor, &revision);
DEBUG_LOGF("MQTT Init: Mosquitto Library v%d.%d.%d\r\n", major, minor, revision);
DEBUG_LOGF("MQTT Init: Mosquitto Library v%d.%d.%d\n", major, minor, revision);

if (mqtt_client) { mosquitto_destroy(mqtt_client); mqtt_client = NULL; };

mqtt_client = mosquitto_new("OS", true, NULL);
mqtt_client = mosquitto_new(_id, true, NULL);
if (mqtt_client == NULL) {
DEBUG_PRINTF("MQTT Init: Failed to initialise client\r\n");
return MQTT_ERROR;
}

mosquitto_connect_callback_set(mqtt_client, _mqtt_connection_cb);
mosquitto_disconnect_callback_set(mqtt_client, _mqtt_disconnection_cb);
mosquitto_log_callback_set(mqtt_client, _mqtt_log_cb);
mosquitto_will_set(mqtt_client, MQTT_AVAILABILITY_TOPIC, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true);
char availability_topic[MQTT_MAX_TOPIC_LEN + 1] = { 0 };
_make_full_topic(availability_topic, MQTT_AVAILABILITY_TOPIC);
mosquitto_will_set(mqtt_client, availability_topic, strlen(MQTT_OFFLINE_PAYLOAD), MQTT_OFFLINE_PAYLOAD, 0, true);

mosquitto_connect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_connection_cb);
mosquitto_disconnect_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_disconnection_cb);
mosquitto_log_callback_set(mqtt_client, _mqtt_callbacks::_mqtt_log_cb);

return MQTT_SUCCESS;
}

int OSMqtt::_connect(void) {
int rc;
if (_username[0]) {
rc = mosquitto_username_pw_set(mqtt_client, _username, _password);
if (rc != MOSQ_ERR_SUCCESS) {

// Note: If (username == NULL) then disable authentication
// If (username != NULL && password == NULL) then username is used without a password
int rc = mosquitto_username_pw_set(mqtt_client, _username[0] ? _username : NULL, _password[0] ? _password : NULL);
if (rc != MOSQ_ERR_SUCCESS) {
DEBUG_LOGF("MQTT Connect: Connection Failed (%s)\r\n", mosquitto_strerror(rc));
return MQTT_ERROR;
}
return MQTT_ERROR;
}
rc = mosquitto_connect(mqtt_client, _host, _port, MQTT_KEEPALIVE);
if (rc != MOSQ_ERR_SUCCESS) {
Expand All @@ -373,7 +395,7 @@ int OSMqtt::_connect(void) {
}

// Allow 10ms for the Broker's ack to be received. We need this on start-up so that the
// connection is registered before we attempt to send our first NOTIFY_REBOOT notification.
// connection is registered before we attempt to send our first NOTIFY_REBOOT notification.
usleep(10000);

return MQTT_SUCCESS;
Expand All @@ -399,7 +421,7 @@ int OSMqtt::_loop(void) {
return mosquitto_loop(mqtt_client, 0 , 1);
}

const char * OSMqtt::_state_string(int error) {
const char * OSMqtt::_state_to_string(int error) {
return mosquitto_strerror(error);
}
#endif
Loading