Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/AgrirouterClient/inc/MqttConnectionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "Settings.h"
#include "third_party/mosquitto/mosquitto.h"

#include <mutex>

class MqttConnectionClient {

public:
Expand Down Expand Up @@ -53,6 +55,11 @@ class MqttConnectionClient {
static void subscribeCallback(struct mosquitto *mosq, void *obj, int messageId, int qosCount, const int *grantedQos);
static void unsubscribeCallback(struct mosquitto *mosq, void *obj, int messageId);
static void messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message);

static std::string globalSecret;
static std::mutex mutexSecret;
static std::string getStaticSecret();
static void setStaticSecret(std::string secret);
};

#endif // LIB_AGRIROUTERCLIENT_INC_MQTTCONNECTIONCLIENT_H_
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

#include "third_party/mosquitto/mosquitto_internal.h"

std::string MqttConnectionClient::globalSecret = "";
std::mutex MqttConnectionClient::mutexSecret;

MqttConnectionClient::MqttConnectionClient(const std::string& clientId, const std::string& host, int port, Settings *settings)
{
m_clientId = clientId;
m_port = port;
m_host = host;
m_settings = settings;
setStaticSecret(m_settings->getConnectionParameters().secret);
}

MqttConnectionClient::~MqttConnectionClient()
Expand All @@ -27,11 +31,6 @@ int MqttConnectionClient::init()

if (m_mosq != nullptr)
{
// set this to mosq for the pw_callback
// the function mosquitto_user_data_set(..) not working for this callback
// the direct set to struct, not the best solution but it works
m_mosq->userdata = this;

mosquitto_connect_callback_set(m_mosq, connectCallback);
mosquitto_disconnect_callback_set(m_mosq, disconnectCallback);
mosquitto_publish_callback_set(m_mosq, publishCallback);
Expand Down Expand Up @@ -114,7 +113,7 @@ int MqttConnectionClient::init()
(m_mqttErrorCallback) (loop, errorMessage, "", m_member);
return EXIT_FAILURE;
}

if ((connect == MOSQ_ERR_SUCCESS) && (loop == MOSQ_ERR_SUCCESS))
{
return EXIT_SUCCESS;
Expand Down Expand Up @@ -153,11 +152,7 @@ void* MqttConnectionClient::getMember() { return m_member; }

int MqttConnectionClient::onPWCallback(char *buf, int size, int rwflag, void *userdata)
{
struct mosquitto *mosq = static_cast<struct mosquitto *>(userdata);
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(mosq->userdata);
std::string secret = self->m_settings->getConnectionParameters().secret.c_str();

strncpy(buf, secret.c_str(), size);
strncpy(buf, getStaticSecret().c_str(), size);
buf[size-1] = '\0';
return strlen(buf);
}
Expand All @@ -178,7 +173,6 @@ void MqttConnectionClient::connectCallback(struct mosquitto *mosq, void *obj, in
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member);
}

}

void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj, int reasonCode)
Expand All @@ -187,13 +181,13 @@ void MqttConnectionClient::disconnectCallback(struct mosquitto *mosq, void *obj,
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: disconnect callback with result: '" + std::to_string(reasonCode) + ":" + mosquitto_strerror(reasonCode) + "'");
self->m_connected = false;

// reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client
// reasonCode 0 disconnect is called by client, so no reconnect on destruct mqtt client
if(reasonCode > 0)
{
std::string errorMessage = "MqttConnectionClient: disconnect unexpected " + std::to_string(reasonCode) + ": " + mosquitto_connack_string(reasonCode);
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (reasonCode, errorMessage, "", self->m_member);


// try to reconnect
int reconn = mosquitto_reconnect(self->m_mosq);
Expand All @@ -219,7 +213,7 @@ void MqttConnectionClient::publishCallback(struct mosquitto *mosq, void *obj, in
void MqttConnectionClient::loggingCallback(struct mosquitto *mosq, void *obj, int level, const char *message)
{
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(obj);
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) +
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: logging callback with message: '" + std::string(message) +
"' and Log level: '" + std::to_string(level) + "'");

if(std::string(message).find("tls_process_server_certificate:certificate verify failed") != std::string::npos)
Expand All @@ -243,7 +237,7 @@ void MqttConnectionClient::subscribeCallback(struct mosquitto *mosq, void *obj,
}
else
{
std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]);
std::string errorMessage = "MqttConnectionClient: subscribe callback count " + std::to_string(i) + " failed with " + std::to_string(grantedQos[i]);
self->m_settings->callOnLog(MG_LFL_ERR, errorMessage);
(self->m_mqttErrorCallback) (grantedQos[i], errorMessage, "", self->m_member);
}
Expand All @@ -259,15 +253,15 @@ void MqttConnectionClient::unsubscribeCallback(struct mosquitto *mosq, void *obj
void MqttConnectionClient::messageCallback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
MqttConnectionClient *self = static_cast<MqttConnectionClient *>(obj);
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " +
self->m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(message->mid) + "] messageCallback on topic " +
message->topic + " with qos " + std::to_string(message->qos));

(self->m_mqttCallback)(message->topic, message->payload, message->payloadlen, self->m_member);
}

void MqttConnectionClient::subscribe(const std::string& topic, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] subscribing on topic " +
topic.c_str() + " with qos " + std::to_string(qos));

mosquitto_subscribe(m_mosq, &(m_messageId), topic.c_str(), qos);
Expand All @@ -276,7 +270,7 @@ void MqttConnectionClient::subscribe(const std::string& topic, int qos)

void MqttConnectionClient::publish(const std::string& topic, const std::string& payload, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and payload-length " + std::to_string(payload.length()));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), strlen(payload.c_str()), payload.c_str(), qos, 0);
Expand All @@ -285,7 +279,7 @@ void MqttConnectionClient::publish(const std::string& topic, const std::string&

void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, false);
Expand All @@ -294,7 +288,7 @@ void MqttConnectionClient::publish(const std::string& topic, char *payload, int

void MqttConnectionClient::publish(const std::string& topic, char *payload, int size, int qos, bool retain)
{
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
m_settings->callOnLog(MG_LFL_NTC, "MqttConnectionClient: [MsgId: " + std::to_string(m_messageId) + "] publishing on topic " +
topic.c_str() + " with qos " + std::to_string(qos) + " and size " + std::to_string(size) + " and retain " + std::to_string(retain));

mosquitto_publish(m_mosq, &(m_messageId), topic.c_str(), size, payload, qos, retain);
Expand All @@ -305,3 +299,15 @@ bool MqttConnectionClient::isConnected()
{
return m_connected;
}

void MqttConnectionClient::setStaticSecret(std::string secret)
{
std::lock_guard<std::mutex> lock(mutexSecret);
globalSecret = secret;
}

std::string MqttConnectionClient::getStaticSecret()
{
std::lock_guard<std::mutex> lock(mutexSecret);
return globalSecret;
}