Skip to content

Commit

Permalink
Saving work on gateway discovery functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed May 24, 2024
1 parent a661cf8 commit 32268f8
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 73 deletions.
21 changes: 8 additions & 13 deletions client/lib/include/cc_mqttsn_client/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ typedef enum
/// @brief Status of the gateway
typedef enum
{
CC_MqttsnGwStatus_Invalid, ///< Invalid value, should never be used
CC_MqttsnGwStatus_Available, ///< The gateway is available.
CC_MqttsnGwStatus_TimedOut, ///< The gateway hasn't advertised its presence in time, assumed disconnected.
CC_MqttsnGwStatus_Discarded ///< The gateway info was discarded using cc_mqttsn_client_discard_gw() or cc_mqttsn_client_discard_all_gw().
CC_MqttsnGwStatus_AddedByGateway, ///< Added by the @b ADVERTISE or @b GWINFO sent by the gateway messages
CC_MqttsnGwStatus_AddedByClient, ///< Added by the @b GWINFO message sent by another client.
CC_MqttsnGwStatus_UpdatedByClient, ///< The gateway's address was updated by another client.
CC_MqttsnGwStatus_Alive, ///< The @b ADVERTISE or @b GWINFO message have been received from the gateway indicating it's alive.
CC_MqttsnGwStatus_Removed, ///< The gateway hasn't advertised its presence in time, assumed no longer available.
} CC_MqttsnGwStatus;

/// @brief Status of the asynchronous operation
Expand Down Expand Up @@ -124,13 +125,6 @@ typedef struct
bool retain; ///< Retain flag of the message.
} CC_MqttsnMessageInfo;

/// @brief Tracked gateway status information
typedef struct
{
unsigned char m_gwId; ///< Gateway ID
CC_MqttsnGwStatus m_status; ///< Gateway status
} CC_MqttsnGatewayStatusInfo;

/// @brief Gateway information
typedef struct
{
Expand Down Expand Up @@ -175,8 +169,9 @@ typedef void (*CC_MqttsnSendOutputDataCb)(void* data, const unsigned char* buf,
/// cc_mqttsn_client_set_gw_status_report_callback() function.
/// @param[in] data Pointer to user data object, passed as last parameter to
/// cc_mqttsn_client_set_gw_status_report_callback() function.
/// @param[in] info Gateway status info.
typedef void (*CC_MqttsnGwStatusReportCb)(void* data, const CC_MqttsnGatewayStatusInfo* info);
/// @param[in] status Current status of the gateway.
/// @param[in] info Currently stored gateway information.
typedef void (*CC_MqttsnGwStatusReportCb)(void* data, CC_MqttsnGwStatus status, const CC_MqttsnGatewayInfo* info);

/// @brief Callback used to report unsolicited disconnection of the gateway.
/// @param[in] data Pointer to user data object, passed as the last parameter to
Expand Down
196 changes: 140 additions & 56 deletions client/lib/src/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "comms/process.h"
#include "comms/units.h"
#include "comms/util/ScopeGuard.h"
#include "comms/util/assign.h"

#include <algorithm>
#include <type_traits>
Expand Down Expand Up @@ -53,7 +54,8 @@ void updateEc(CC_MqttsnErrorCode* ec, CC_MqttsnErrorCode val)
} // namespace

ClientImpl::ClientImpl() :
m_gwDiscoveryTimer(m_timerMgr.allocTimer())
m_gwDiscoveryTimer(m_timerMgr.allocTimer()),
m_sendGwinfoTimer(m_timerMgr.allocTimer())
{
// TODO: check validity of timer in during intialization
static_cast<void>(m_searchOpAlloc);
Expand Down Expand Up @@ -461,11 +463,13 @@ void ClientImpl::handle(AdvertiseMsg& msg)

if (iter != m_clientState.m_gwInfos.end()) {
iter->m_expiryTimestamp = m_clientState.m_timestamp + comms::units::getMilliseconds<unsigned>(msg.field_duration());
reportGwStatus(CC_MqttsnGwStatus_Alive, *iter);
return;
}

if (m_clientState.m_gwInfos.max_size() <= m_clientState.m_gwInfos.size()) {
// Ignore new gateways if they cannot be stored
errorLog("Failed to store the new gateway information, due to insufficient storage");
return;
}

Expand All @@ -474,25 +478,38 @@ void ClientImpl::handle(AdvertiseMsg& msg)
info.m_gwId = msg.field_gwId().value();
info.m_expiryTimestamp = m_clientState.m_timestamp + comms::units::getMilliseconds<unsigned>(msg.field_duration());

if (m_gatewayStatusReportCb != nullptr) {
auto cbInfo = CC_MqttsnGatewayStatusInfo();
cbInfo.m_gwId = info.m_gwId;
cbInfo.m_status = CC_MqttsnGwStatus_Available;
m_gatewayStatusReportCb(m_gatewayStatusReportData, &cbInfo);
}
reportGwStatus(CC_MqttsnGwStatus_AddedByGateway, info);
}

void ClientImpl::handle(SearchgwMsg& msg)
{
static_assert(Config::HasGatewayDiscovery);
if (m_gwinfoDelayReqCb == nullptr) {
// The application didn't provide a callback to inquire about the delay for resonditing to SEARCHGW
return;
}

// TODO: check active

auto delay = m_gwinfoDelayReqCb(m_gwinfoDelayReqData);
if (delay == 0U) {
// The application rejected sending GWINFO on behalf of gateway
return;
}

m_pendingGwinfoBroadcastRadius = msg.field_radius().value();
m_sendGwinfoTimer.wait(delay, &ClientImpl::sendGwinfoCb, this);
}

void ClientImpl::handle(GwinfoMsg& msg)
{
auto onExit =
comms::util::makeScopeGuard(
[this, &msg]()
{
for (auto& op : m_searchOps) {
COMMS_ASSERT(op);
op->handle(msg);
}
});
static_assert(Config::HasGatewayDiscovery);
m_sendGwinfoTimer.cancel(); // Do not send GWINFO if pending

for (auto& op : m_searchOps) {
COMMS_ASSERT(op);
op->handle(msg);
}

auto iter =
std::find_if(
Expand All @@ -509,29 +526,41 @@ void ClientImpl::handle(GwinfoMsg& msg)
}

if (iter->m_addr.max_size() < addr.size()) {
iter->m_addr.clear();
errorLog("The gateway address reported by the client doesn't fit into the dedicated address storage, ignoring");
return;
}

if ((addr.size() == iter->m_addr.size()) &&
(std::equal(addr.begin(), addr.end(), iter->m_addr.begin()))) {
// The address is already recorded.
return;
}

iter->m_addr.assign(addr.begin(), addr.end());
reportGwStatus(CC_MqttsnGwStatus_UpdatedByClient, *iter);
return;
}

auto& addr = msg.field_gwAdd().value();
if (addr.empty()) {
reportGwStatus(CC_MqttsnGwStatus_Alive, *iter);
return;
}

if (m_clientState.m_gwInfos.max_size() <= m_clientState.m_gwInfos.size()) {
// Not enough space
errorLog("Failed to store the new gateway information, due to insufficient storage");
return;
}

m_clientState.m_gwInfos.resize(m_clientState.m_gwInfos.size() + 1U);
auto& info = m_clientState.m_gwInfos.back();
info.m_gwId = msg.field_gwId().value();
info.m_addr.assign(addr.begin(), addr.end());
// TODO: report gateway discovery
info.m_expiryTimestamp = m_clientState.m_timestamp + m_configState.m_gwAdvTimeoutMs;
monitorGatewayExpiry();

reportGwStatus(CC_MqttsnGwStatus_AddedByClient, info);
}
#endif // #if CC_MQTTSN_HAS_GATEWAY_DISCOVERY

Expand Down Expand Up @@ -658,36 +687,37 @@ void ClientImpl::handle(GwinfoMsg& msg)

// #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2

// void ClientImpl::handle(ProtMessage& msg)
// {
// static_cast<void>(msg);
// if (m_sessionState.m_disconnecting) {
// return;
// }
void ClientImpl::handle(ProtMessage& msg)
{
static_cast<void>(msg);
// if (m_sessionState.m_disconnecting) {
// return;
// }

// // During the dispatch to callbacks can be called and new ops issues,
// // the m_ops vector can be resized and iterators invalidated.
// // As the result, the iteration needs to be performed using indices
// // instead of iterators.
// // Also do not dispatch the message to new ops.
// auto count = m_ops.size();
// for (auto idx = 0U; idx < count; ++idx) {
// auto* op = m_ops[idx];
// if (op == nullptr) {
// // ops can be deleted, but the pointer will be nullified
// // until last api guard.
// continue;
// }
// During the dispatch to callbacks can be called and new ops issues,
// the m_ops vector can be resized and iterators invalidated.
// As the result, the iteration needs to be performed using indices
// instead of iterators.
// Also do not dispatch the message to new ops.
auto count = m_ops.size();
for (auto idx = 0U; idx < count; ++idx) {
auto* op = m_ops[idx];
if (op == nullptr) {
// ops can be deleted, but the pointer will be nullified
// until last api guard.
continue;
}

// msg.dispatch(*op);
msg.dispatch(*op);

// // After message dispatching the whole session may be in terminating state
// // Don't continue iteration
// if (m_sessionState.m_disconnecting) {
// break;
// }
// }
// }
// After message dispatching the whole session may be in terminating state
// Don't continue iteration

// if (m_sessionState.m_disconnecting) {
// break;
// }
}
}

CC_MqttsnErrorCode ClientImpl::sendMessage(const ProtMessage& msg, unsigned broadcastRadius)
{
Expand Down Expand Up @@ -1169,15 +1199,13 @@ void ClientImpl::monitorGatewayExpiry()
m_clientState.m_gwInfos.begin(), m_clientState.m_gwInfos.end(),
[](const auto& first, const auto& second)
{
if (first.m_expiryTimestamp == 0) {
return false;
}
COMMS_ASSERT(first.m_expiryTimestamp != 0);
COMMS_ASSERT(second.m_expiryTimestamp != 0);

return first.m_expiryTimestamp < second.m_expiryTimestamp;
});

if ((iter == m_clientState.m_gwInfos.end()) ||
(iter->m_expiryTimestamp == 0U)) {
if (iter == m_clientState.m_gwInfos.end()) {
return;
}

Expand All @@ -1194,12 +1222,7 @@ void ClientImpl::gwExpiryTimeout()
continue;
}

if (m_gatewayStatusReportCb != nullptr) {
auto cbInfo = CC_MqttsnGatewayStatusInfo();
cbInfo.m_gwId = info.m_gwId;
cbInfo.m_status = CC_MqttsnGwStatus_TimedOut;
m_gatewayStatusReportCb(m_gatewayStatusReportData, &cbInfo);
}
reportGwStatus(CC_MqttsnGwStatus_Removed, info);
}

m_clientState.m_gwInfos.erase(
Expand All @@ -1215,11 +1238,72 @@ void ClientImpl::gwExpiryTimeout()
}
}

void ClientImpl::reportGwStatus(CC_MqttsnGwStatus status, const ClientState::GwInfo& info)
{
if constexpr (Config::HasGatewayDiscovery) {
if (m_gatewayStatusReportCb == nullptr) {
return;
}

auto gwInfo = CC_MqttsnGatewayInfo();
gwInfo.m_gwId = info.m_gwId;
gwInfo.m_addr = info.m_addr.data();
comms::cast_assign(gwInfo.m_addrLen) = info.m_addr.size();

m_gatewayStatusReportCb(m_gatewayStatusReportData, status, &gwInfo);
}
}

void ClientImpl::sendGwinfo()
{
if constexpr (Config::HasGatewayDiscovery) {
auto iter =
std::max_element(
m_clientState.m_gwInfos.begin(), m_clientState.m_gwInfos.end(),
[](auto& first, auto& second)
{
if (first.m_addr.empty()) {
// Prefer one with the address
return !second.m_addr.empty();
}

if (second.m_addr.empty()) {
return false;
}

return first.m_expiryTimestamp < second.m_expiryTimestamp;
});

if ((iter == m_clientState.m_gwInfos.end()) ||
(iter->m_addr.empty())) {
// None of the gateways have known address
return;
}

GwinfoMsg msg;
if (msg.field_gwAdd().value().max_size() < iter->m_addr.size()) {
errorLog("Cannot fit the known gateway address into the GWINFO message address storage");
return;
}

msg.field_gwId().setValue(iter->m_gwId);
comms::util::assign(msg.field_gwAdd().value(), iter->m_addr.begin(), iter->m_addr.end());
sendMessage(msg, std::max(m_pendingGwinfoBroadcastRadius, 1U));
}
}

void ClientImpl::gwExpiryTimeoutCb(void* data)
{
if constexpr (Config::HasGatewayDiscovery) {
reinterpret_cast<ClientImpl*>(data)->gwExpiryTimeout();
}
}

void ClientImpl::sendGwinfoCb(void* data)
{
if constexpr (Config::HasGatewayDiscovery) {
reinterpret_cast<ClientImpl*>(data)->sendGwinfo();
}
}

} // namespace cc_mqttsn_client
10 changes: 8 additions & 2 deletions client/lib/src/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class ClientImpl final : public ProtMsgHandler
using Base::handle;
#if CC_MQTTSN_HAS_GATEWAY_DISCOVERY
virtual void handle(AdvertiseMsg& msg) override;
virtual void handle(SearchgwMsg& msg) override;
virtual void handle(GwinfoMsg& msg) override;
#endif // #if CC_MQTTSN_HAS_GATEWAY_DISCOVERY
// virtual void handle(PublishMsg& msg) override;
Expand All @@ -159,7 +160,7 @@ class ClientImpl final : public ProtMsgHandler
// virtual void handle(PubcompMsg& msg) override;
// #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2

// virtual void handle(ProtMessage& msg) override;
virtual void handle(ProtMessage& msg) override;

// -------------------- Ops Access API -----------------------------

Expand Down Expand Up @@ -286,7 +287,11 @@ class ClientImpl final : public ProtMsgHandler

void monitorGatewayExpiry();
void gwExpiryTimeout();
void reportGwStatus(CC_MqttsnGwStatus status, const ClientState::GwInfo& info);
void sendGwinfo();

static void gwExpiryTimeoutCb(void* data);
static void sendGwinfoCb(void* data);

friend class ApiEnterGuard;

Expand Down Expand Up @@ -317,10 +322,10 @@ class ClientImpl final : public ProtMsgHandler
ConfigState m_configState;
ClientState m_clientState;
// SessionState m_sessionState;
// ReuseState m_reuseState;

TimerMgr m_timerMgr;
TimerMgr::Timer m_gwDiscoveryTimer;
TimerMgr::Timer m_sendGwinfoTimer;
unsigned m_apiEnterCount = 0U;

OutputBuf m_buf;
Expand Down Expand Up @@ -352,6 +357,7 @@ class ClientImpl final : public ProtMsgHandler
// SendOpsList m_sendOps;

OpPtrsList m_ops;
unsigned m_pendingGwinfoBroadcastRadius = 0U;
bool m_opsDeleted = false;
bool m_preparationLocked = false;
};
Expand Down
Loading

0 comments on commit 32268f8

Please sign in to comment.