Skip to content

Commit

Permalink
Added error report callback to the gateway session.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed May 20, 2024
1 parent 3a20a06 commit f4179c8
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 25 deletions.
6 changes: 6 additions & 0 deletions gateway/app/gateway/GatewaySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ bool GatewaySession::startSession()
});
});

m_session->setErrorReportCb(
[this](const char* msg)
{
m_logger.error() << msg << std::endl;
});

if (!m_sessionPtr) {
// Forwarder encapsulated session
return true;
Expand Down
25 changes: 25 additions & 0 deletions gateway/lib/doc/session.dox
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,28 @@
/// broker and report the @ref cc_mqttsn_gw_session_page_broker_conn "connectivity"
/// later on. The @b important part is that broker connectivity must be reported
/// @b after the callback function returns.
///
/// @section cc_mqttsn_gw_session_page_error_report Errors Report
/// The gateway library can detect and report some unexpected behavior from the
/// application, client, and/or broker. The @b Session object reports such
/// errors via callback:
///
/// @b C++ interface
/// @code
/// session->setErrorReportCb(
/// [](const char* msg)
/// {
/// std::cerr << "ERROR: " << msg << std::endl;
/// });
/// @endcode
///
/// @b C interface
/// @code
/// void my_error_report(void* userData, CC_MqttsnSessionHandle session, const char* msg)
/// {
/// printf("ERROR: %s\n", msg);
/// }
///
/// cc_mqttsn_gw_session_set_error_report_cb(handle, &my_error_report, someUserData);
/// @endcode
///
8 changes: 8 additions & 0 deletions gateway/lib/include/cc_mqttsn_gateway/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class Session
/// @return Authentication information
using AuthInfoReqCb = std::function<AuthInfo (const std::string& clientId)>;

/// @brief Type of the callback used to report detected errors.
/// @param[in] msg Error message
using ErrorReportCb = std::function<void (const char* msg)>;

/// @brief Type of callback used to notify the application about forwarding encapsulation session being created.
/// @details The application is responsible to perform the necessary session configuration
/// as well as set all the callbacks except the one set by the @ref setSendDataClientReqCb()
Expand Down Expand Up @@ -168,6 +172,10 @@ class Session
/// @param[in] func R-value reference to the callback object
void setAuthInfoReqCb(AuthInfoReqCb&& func);

/// @brief Set the callback to be used to report detected errors
/// @param[in] func R-value reference to the callback object
void setErrorReportCb(ErrorReportCb&& func);

/// @brief Set the callback to be invoked when the forwarding encapsulation session
/// is detected and to notify application about such session creation.
/// @details When not set, the forwarding enapsulation messages will be ignored
Expand Down
49 changes: 33 additions & 16 deletions gateway/lib/include/cc_mqttsn_gateway/gateway_all.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ typedef void (*CC_MqttsnSessionBrokerReconnectReqCb)(void* userData, CC_MqttsnSe
/// @param[in] clientId Client ID
typedef void (*CC_MqttsnSessionClientConnectReportCb)(void* userData, CC_MqttsnSessionHandle session, const char* clientId);

/// @brief Type of callback used to request authentication information of
/// the client that is trying to connect.
/// @param[in] userData User data passed as the last parameter to the setting function.
/// @param[in] session Handle of session performing the request
/// @param[in] clientId Client ID
/// @param[out] username Username string
/// @param[out] password Binary password buffer
/// @param[out] passwordLen Length of the binary password
typedef void (*CC_MqttsnSessionAuthInfoReqCb)(
void* userData,
CC_MqttsnSessionHandle session,
const char* clientId,
const char** username,
const unsigned char** password,
unsigned* passwordLen);

/// @brief Type of callback used to report error messages detected by the session.
/// @param[in] userData User data passed as the last parameter to the setting function.
/// @param[in] session Handle of session performing the request
/// @param[in] msg Error message
typedef void (*CC_MqttsnSessionErrorReportCb)(void* userData, CC_MqttsnSessionHandle session, const char* msg);

/// @brief Type of callback used to report forwarding encapsulated session creation.
/// @details The application is responsible to perform the necessary session configuration
/// as well as set all the callbacks except the one set by the @ref cc_mqttsn_gw_session_set_send_data_to_client_cb()
Expand All @@ -198,22 +220,6 @@ typedef bool (*CC_MqttsnSessionFwdEncSessionCreatedCb)(void* userData, CC_Mqttsn
/// @param[in] session Handle of session object about to be deleted
typedef void (*CC_MqttsnSessionFwdEncSessionDeletedCb)(void* userData, CC_MqttsnSessionHandle session);

/// @brief Type of callback used to request authentication information of
/// the client that is trying to connect.
/// @param[in] userData User data passed as the last parameter to the setting function.
/// @param[in] session Handle of session performing the request
/// @param[in] clientId Client ID
/// @param[out] username Username string
/// @param[out] password Binary password buffer
/// @param[out] passwordLen Length of the binary password
typedef void (*CC_MqttsnSessionAuthInfoReqCb)(
void* userData,
CC_MqttsnSessionHandle session,
const char* clientId,
const char** username,
const unsigned char** password,
unsigned* passwordLen);

/// @brief Allocate @b Session object.
/// @details The returned handle need to be passed as first parameter
/// to all relevant functions. Note that the @b Session object is
Expand Down Expand Up @@ -331,6 +337,17 @@ void cc_mqttsn_gw_session_set_auth_info_req_cb(
CC_MqttsnSessionAuthInfoReqCb cb,
void* data);

/// @brief Set the callback to be used to report error messages detected by the session.
/// @details This is an optional callback.
/// @param[in] session Handle returned by cc_mqttsn_gw_session_alloc() function.
/// @param[in] cb Pointer to callback function
/// @param[in] data Pointer to any user data, will be passed back as first
/// parameter to the callback.
void cc_mqttsn_gw_session_set_error_report_cb(
CC_MqttsnSessionHandle session,
CC_MqttsnSessionErrorReportCb cb,
void* data);

/// @brief Set the callback to be invoked when the forwarding encapsulation session
/// is detected and to notify application about such session creation.
/// @details When not set, the forwarding enapsulation messages will be ignored
Expand Down
5 changes: 5 additions & 0 deletions gateway/lib/src/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ void Session::setAuthInfoReqCb(AuthInfoReqCb&& func)
m_pImpl->setAuthInfoReqCb(std::move(func));
}

void Session::setErrorReportCb(ErrorReportCb&& func)
{
m_pImpl->setErrorReportCb(std::move(func));
}

void Session::setFwdEncSessionCreatedReportCb(FwdEncSessionCreatedReportCb&& func)
{
m_pImpl->setFwdEncSessionCreatedReportCb(std::move(func));
Expand Down
15 changes: 13 additions & 2 deletions gateway/lib/src/SessionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ bool SessionImpl::start()
(!m_sendToBrokerCb) ||
(!m_termReqCb) ||
(!m_brokerReconnectReqCb)) {
reportError("Not all \"must have\" callbacks have been set");
return false;
}

if (static_cast<bool>(m_fwdEncSessionCreatedReportCb) != static_cast<bool>(m_fwdEncSessionDeletedReportCb)) {
reportError("Not all forward encapsulated session management callbacks have been set");
return false;
}

Expand Down Expand Up @@ -127,8 +129,8 @@ std::size_t SessionImpl::dataFromClient(const std::uint8_t* buf, std::size_t len
}

if (es == comms::ErrorStatus::ProtocolError) {
++bufTmp;
continue;
reportError("Protocol error from the client side is detected");
return len;
}

if (es == comms::ErrorStatus::Success) {
Expand Down Expand Up @@ -273,6 +275,15 @@ SessionImpl::AuthInfo SessionImpl::authInfoRequest(const std::string& clientId)
return m_authInfoReqCb(clientId);
}

void SessionImpl::reportError(const char* str)
{
if (!m_errorReportCb) {
return;
}

m_errorReportCb(str);
}

void SessionImpl::handle(SearchgwMsg_SN& msg)
{
GwinfoMsg_SN respMsg;
Expand Down
9 changes: 9 additions & 0 deletions gateway/lib/src/SessionImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SessionImpl : public MsgHandler
using BrokerReconnectReqCb = Session::BrokerReconnectReqCb;
using ClientConnectedReportCb = Session::ClientConnectedReportCb;
using AuthInfoReqCb = Session::AuthInfoReqCb;
using ErrorReportCb = Session::ErrorReportCb;
using FwdEncSessionCreatedReportCb = Session::FwdEncSessionCreatedReportCb;
using FwdEncSessionDeletedReportCb = Session::FwdEncSessionDeletedReportCb;

Expand Down Expand Up @@ -91,6 +92,12 @@ class SessionImpl : public MsgHandler
m_authInfoReqCb = std::forward<TFunc>(func);
}

template <typename TFunc>
void setErrorReportCb(TFunc&& func)
{
m_errorReportCb = std::forward<TFunc>(func);
}

template <typename TFunc>
void setFwdEncSessionCreatedReportCb(TFunc&& func)
{
Expand Down Expand Up @@ -210,6 +217,7 @@ class SessionImpl : public MsgHandler
void brokerReconnectRequest();
void clientConnectedReport(const std::string& clientId);
AuthInfo authInfoRequest(const std::string& clientId);
void reportError(const char* str);

private:

Expand Down Expand Up @@ -248,6 +256,7 @@ class SessionImpl : public MsgHandler
BrokerReconnectReqCb m_brokerReconnectReqCb;
ClientConnectedReportCb m_clientConnectedCb;
AuthInfoReqCb m_authInfoReqCb;
ErrorReportCb m_errorReportCb;
FwdEncSessionCreatedReportCb m_fwdEncSessionCreatedReportCb;
FwdEncSessionDeletedReportCb m_fwdEncSessionDeletedReportCb;

Expand Down
16 changes: 16 additions & 0 deletions gateway/lib/src/gateway_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,22 @@ void cc_mqttsn_gw_session_set_auth_info_req_cb(
});
}

void cc_mqttsn_gw_session_set_error_report_cb(
CC_MqttsnSessionHandle session,
CC_MqttsnSessionErrorReportCb cb,
void* data)
{
if (session.obj == nullptr) {
return;
}

reinterpret_cast<Session*>(session.obj)->setErrorReportCb(
[cb, data, session](const char* msg)
{
cb(data, session, msg);
});
}

void cc_mqttsn_gw_session_set_fwd_enc_session_created_cb(
CC_MqttsnSessionHandle session,
CC_MqttsnSessionFwdEncSessionCreatedCb cb,
Expand Down
1 change: 1 addition & 0 deletions gateway/lib/src/session_op/Connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void Connect::handle(ConnectMsg_SN& msg)

if ((st.m_connStatus != ConnectionStatus::Disconnected) &&
(reqClientId != st.m_clientId)) {
session().reportError("Different client id reconnection in the same session");
sendDisconnectToClient();
state().m_connStatus = ConnectionStatus::Disconnected;
termRequest();
Expand Down
16 changes: 13 additions & 3 deletions gateway/lib/src/session_op/Forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@

#include "Forward.h"

#include "SessionImpl.h"

#include "comms/util/ScopeGuard.h"
#include "comms/util/assign.h"

#include <cassert>
#include <algorithm>

#include "comms/util/ScopeGuard.h"

namespace cc_mqttsn_gateway
{
Expand Down Expand Up @@ -49,6 +53,7 @@ void Forward::handle(PublishMsg_SN& msg)
} while (false);

if (st.m_connStatus != ConnectionStatus::Connected) {
session().reportError("PUBLISH from client when not connected");
sendPubackToClient(
msg.field_topicId().value(),
msg.field_msgId().value(),
Expand All @@ -66,6 +71,7 @@ void Forward::handle(PublishMsg_SN& msg)

auto& topic = st.m_regMgr.mapTopicId(msg.field_topicId().value());
if (topic.empty()) {
session().reportError("Invalid topic ID in PUBLISH");
sendPubackToClient(
msg.field_topicId().value(),
msg.field_msgId().value(),
Expand All @@ -87,7 +93,7 @@ void Forward::handle(PublishMsg_SN& msg)
fwdMsg.field_topic().value() = topic;
fwdMsg.field_packetId().field().value() = msg.field_msgId().value();
auto& data = msg.field_data().value();
fwdMsg.field_payload().value().assign(data.begin(), data.end());
comms::util::assign(fwdMsg.field_payload().value(), data.begin(), data.end());
fwdMsg.doRefresh();
sendToBroker(fwdMsg);
}
Expand Down Expand Up @@ -128,6 +134,7 @@ void Forward::handle(SubscribeMsg_SN& msg)
};

if (state().m_connStatus != ConnectionStatus::Connected) {
session().reportError("SUBSCRIBE from client when not connected");
sendSubackFunc(ReturnCodeVal::NotSupported);
return;
}
Expand All @@ -140,9 +147,10 @@ void Forward::handle(SubscribeMsg_SN& msg)
assert(msg.field_topicId().isMissing());
assert(msg.field_flags().field_topicIdType().value() == TopicIdTypeVal::Normal);
auto& topicStorage = msg.field_topicName().field().value();
topic.assign(topicStorage.begin(), topicStorage.end());
comms::util::assign(topic, topicStorage.begin(), topicStorage.end());

if (topic.empty()) {
session().reportError("Empty topic in the SUBSCRIBE message from the client");
sendSubackFunc(ReturnCodeVal::NotSupported);
sendToBroker(PingreqMsg());
return;
Expand Down Expand Up @@ -174,6 +182,7 @@ void Forward::handle(SubscribeMsg_SN& msg)
}

if (msg.field_flags().field_topicIdType().value() != TopicIdTypeVal::PredefinedTopicId) {
session().reportError("Unknown topic type in SUBSCRIBE message from the client");
sendSubackFunc(ReturnCodeVal::NotSupported);
sendToBroker(PingreqMsg());
return;
Expand All @@ -189,6 +198,7 @@ void Forward::handle(SubscribeMsg_SN& msg)
break;
}

session().reportError("Invalid topic topic ID in SUBSCRIBE message from the client");
sendSubackFunc(ReturnCodeVal::InvalidTopicId);
sendToBroker(PingreqMsg());
return;
Expand Down
7 changes: 3 additions & 4 deletions gateway/lib/src/session_op/PubSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "PubSend.h"

#include "comms/util/assign.h"

#include <cassert>
#include <algorithm>

Expand Down Expand Up @@ -241,11 +243,8 @@ void PubSend::doSend()
}

if (!m_currPub->m_msg.empty()) {
auto& dataStorage = msg.field_data().value();
using DataStorageType = typename std::decay<decltype(dataStorage)>::type;
dataStorage = DataStorageType(&(*m_currPub->m_msg.begin()), m_currPub->m_msg.size());
comms::util::assign(msg.field_data().value(), m_currPub->m_msg.begin(), m_currPub->m_msg.end());
}
//msg.field_data().value().assign(m_currPub->m_msg.begin(), m_currPub->m_msg.end());
sendToClient(msg);

if (m_currPub->m_qos == QoS_AtMostOnceDelivery) {
Expand Down
4 changes: 4 additions & 0 deletions gateway/lib/src/session_op/WillUpdate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "WillUpdate.h"

#include "SessionImpl.h"

#include <cassert>
#include <algorithm>

Expand Down Expand Up @@ -65,6 +67,7 @@ void WillUpdate::handle([[maybe_unused]] DisconnectMsg_SN& msg)
void WillUpdate::handle(WilltopicupdMsg_SN& msg)
{
if (state().m_connStatus != ConnectionStatus::Connected) {
session().reportError("WILLTOPICUPD message from the client when not connected");
sendTopicResp(ReturnCodeVal::NotSupported);
return;
}
Expand Down Expand Up @@ -103,6 +106,7 @@ void WillUpdate::handle(WilltopicupdMsg_SN& msg)
void WillUpdate::handle(WillmsgupdMsg_SN& msg)
{
if (state().m_connStatus != ConnectionStatus::Connected) {
session().reportError("WILLMSGUPD message from the client when not connected");
sendMsgResp(ReturnCodeVal::NotSupported);
return;
}
Expand Down

0 comments on commit f4179c8

Please sign in to comment.