Skip to content

Commit

Permalink
Initial preparation for the "connect" operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Jun 4, 2024
1 parent 66477f7 commit 46b9120
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 24 deletions.
1 change: 1 addition & 0 deletions client/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ function (gen_lib_mqttsn_client config_file)

message (STATUS "Defining library ${lib_name}")
set (src
src/op/ConnectOp.cpp
src/op/Op.cpp
src/op/SearchOp.cpp
src/ClientImpl.cpp
Expand Down
52 changes: 44 additions & 8 deletions client/lib/include/cc_mqttsn_client/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extern "C" {
#define CC_MQTTSN_CLIENT_VERSION CC_MQTTSN_CLIENT_MAKE_VERSION(CC_MQTTSN_CLIENT_MAJOR_VERSION, CC_MQTTSN_CLIENT_MINOR_VERSION, CC_MQTTSN_CLIENT_PATCH_VERSION)

/// @brief Quality of Service
/// @ingroup client
typedef enum
{
CC_MqttsnQoS_NoGwPublish = -1, ///< QoS=-1. No gateway publish, used by publish only clients.
Expand All @@ -47,6 +48,7 @@ typedef enum
} CC_MqttsnQoS;

/// @brief Error code returned by various API functions.
/// @ingroup client
typedef enum
{
CC_MqttsnErrorCode_Success = 0, ///< The requested operation was successfully started.
Expand All @@ -68,6 +70,7 @@ typedef enum
} CC_MqttsnErrorCode;

/// @brief Status of the gateway
/// @ingroup client
typedef enum
{
CC_MqttsnGwStatus_AddedByGateway = 0, ///< Added by the @b ADVERTISE or @b GWINFO sent by the gateway messages
Expand All @@ -80,6 +83,7 @@ typedef enum
} CC_MqttsnGwStatus;

/// @brief Status of the asynchronous operation
/// @ingroup client
typedef enum
{
CC_MqttsnAsyncOpStatus_Complete = 0, ///< The requested operation has been completed, refer to reported extra details for information
Expand All @@ -100,6 +104,17 @@ typedef enum
CC_MqttsnGatewayDisconnectReason_ValuesLimit ///< Limit for the values
} CC_MqttsnGatewayDisconnectReason;

/// @brief Return code as per MQTT-SN specification
/// @ingroup client
typedef enum
{
CC_MqttsnReturnCode_Accepted = 0, ///< Accepted
CC_MqttsnReturnCode_Conjestion = 1, ///< Rejected due to conjesion
CC_MqttsnReturnCode_InvalidTopicId = 2, ///< Rejected due to invalid topic ID
CC_MqttsnReturnCode_NotSupported = 3, ///< Rejected as not supported
CC_MqttsnReturnCode_ValuesLimit ///< Limit for the values
} CC_MqttsnReturnCode;

/// @brief Declaration of struct for the @ref CC_MqttsnClientHandle;
struct CC_MqttsnClient;

Expand All @@ -116,6 +131,16 @@ struct CC_MqttsnSearch;
/// @ingroup "search".
typedef struct CC_MqttsnSearch* CC_MqttsnSearchHandle;

/// @brief Declaration of the hidden structure used to define @ref CC_MqttsnConnectHandle
/// @ingroup connect
struct CC_MqttsnConnect;

/// @brief Handle for "connect" operation.
/// @details Returned by @b cc_mqttsn_client_connect_prepare() function.
/// @ingroup "connect".
typedef struct CC_MqttsnConnect* CC_MqttsnConnectHandle;


/// @brief Type used to hold Topic ID value.
typedef unsigned short CC_MqttsnTopicId;

Expand All @@ -141,13 +166,21 @@ typedef struct
} CC_MqttsnMessageInfo;

/// @brief Gateway information
/// @ingroup client
typedef struct
{
unsigned char m_gwId; ///< Gateway ID
const unsigned char* m_addr; ///< Address of the gateway if known, NULL if not.
unsigned m_addrLen; ///< Length of the address
unsigned char m_gwId; ///< Gateway ID
const unsigned char* m_addr; ///< Address of the gateway if known, NULL if not.
unsigned m_addrLen; ///< Length of the address
} CC_MqttsnGatewayInfo;

/// @brief Information on the "connect" operation completion
/// @ingroup connect
typedef struct
{
CC_MqttsnReturnCode m_returnCode; ///< Return code reported by the @b CONNACK message
} CC_MqttsnConnectInfo;

/// @brief Callback used to request time measurement.
/// @details The callback is set using
/// cc_mqttsn_client_set_next_tick_program_callback() function.
Expand Down Expand Up @@ -225,12 +258,15 @@ typedef unsigned (*CC_MqttsnGwinfoDelayRequestCb)(void* data);
/// @ingroup search
typedef void (*CC_MqttsnSearchCompleteCb)(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnGatewayInfo* info);

/// @brief Callback used to report completion of the subscribe operation.
/// @brief Callback used to report completion of the connect operation.
/// @param[in] data Pointer to user data object, passed as the last parameter to
/// the subscribe request.
/// @param[in] status Status of the subscribe operation.
/// @param[in] qos Maximal level of quality of service, the gateway/gateway is going to use to publish incoming messages.
typedef void (*CC_MqttsnSubscribeCompleteReportCb)(void* data, CC_MqttsnAsyncOpStatus status, CC_MqttsnQoS qos);
/// the request call.
/// @param[in] status Status of the "connect" operation.
/// @param[in] info Information about op completion. Not-NULL is reported <b>if and onfly if</b>
/// the "status" is equal to @ref CC_MqttsnAsyncOpStatus_Complete.
/// @post The data members of the reported response can NOT be accessed after the function returns.
/// @ingroup connect
typedef void (*CC_MqttsnConnectCompleteCb)(void* data, CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info);


#ifdef __cplusplus
Expand Down
64 changes: 59 additions & 5 deletions client/lib/src/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,60 @@ op::SearchOp* ClientImpl::searchPrepare(CC_MqttsnErrorCode* ec)
}
}

op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec)
{
op::ConnectOp* op = nullptr;
do {
if (!m_clientState.m_initialized) {
if (m_apiEnterCount > 0U) {
errorLog("Cannot prepare connect from within callback");
updateEc(ec, CC_MqttsnErrorCode_RetryLater);
break;
}

auto initEc = initInternal();
if (initEc != CC_MqttsnErrorCode_Success) {
updateEc(ec, initEc);
break;
}
}

if (!m_connectOps.empty()) {
// Already allocated
errorLog("Another connect operation is in progress.");
updateEc(ec, CC_MqttsnErrorCode_Busy);
break;
}

if (m_ops.max_size() <= m_ops.size()) {
errorLog("Cannot start connect operation, retry in next event loop iteration.");
updateEc(ec, CC_MqttsnErrorCode_RetryLater);
break;
}

if (m_preparationLocked) {
errorLog("Another operation is being prepared, cannot prepare \"connect\" without \"send\" or \"cancel\" of the previous.");
updateEc(ec, CC_MqttsnErrorCode_PreparationLocked);
break;
}

auto ptr = m_connectOpAlloc.alloc(*this);
if (!ptr) {
errorLog("Cannot allocate new connect operation.");
updateEc(ec, CC_MqttsnErrorCode_OutOfMemory);
break;
}

m_preparationLocked = true;
m_ops.push_back(ptr.get());
m_connectOps.push_back(std::move(ptr));
op = m_connectOps.back().get();
updateEc(ec, CC_MqttsnErrorCode_Success);
} while (false);

return op;
}

// op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec)
// {
// op::ConnectOp* connectOp = nullptr;
Expand Down Expand Up @@ -828,7 +882,7 @@ void ClientImpl::opComplete(const op::Op* op)
using ExtraCompleteFunc = void (ClientImpl::*)(const op::Op*);
static const ExtraCompleteFunc Map[] = {
/* Type_Search */ &ClientImpl::opComplete_Search,
// /* Type_Connect */ &ClientImpl::opComplete_Connect,
/* Type_Connect */ &ClientImpl::opComplete_Connect,
// /* Type_KeepAlive */ &ClientImpl::opComplete_KeepAlive,
// /* Type_Disconnect */ &ClientImpl::opComplete_Disconnect,
// /* Type_Subscribe */ &ClientImpl::opComplete_Subscribe,
Expand Down Expand Up @@ -1218,10 +1272,10 @@ void ClientImpl::opComplete_Search(const op::Op* op)
eraseFromList(op, m_searchOps);
}

// void ClientImpl::opComplete_Connect(const op::Op* op)
// {
// eraseFromList(op, m_connectOps);
// }
void ClientImpl::opComplete_Connect(const op::Op* op)
{
eraseFromList(op, m_connectOps);
}

// void ClientImpl::opComplete_KeepAlive(const op::Op* op)
// {
Expand Down
14 changes: 7 additions & 7 deletions client/lib/src/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// #include "SessionState.h"
#include "TimerMgr.h"

// #include "op/ConnectOp.h"
#include "op/ConnectOp.h"
// #include "op/DisconnectOp.h"
// #include "op/KeepAliveOp.h"
#include "op/Op.h"
Expand Down Expand Up @@ -69,7 +69,7 @@ class ClientImpl final : public ProtMsgHandler
// bool isNetworkDisconnected() const;

op::SearchOp* searchPrepare(CC_MqttsnErrorCode* ec);
// op::ConnectOp* connectPrepare(CC_MqttsnErrorCode* ec);
op::ConnectOp* connectPrepare(CC_MqttsnErrorCode* ec);
// op::DisconnectOp* disconnectPrepare(CC_MqttsnErrorCode* ec);
// op::SubscribeOp* subscribePrepare(CC_MqttsnErrorCode* ec);
// op::UnsubscribeOp* unsubscribePrepare(CC_MqttsnErrorCode* ec);
Expand Down Expand Up @@ -231,8 +231,8 @@ class ClientImpl final : public ProtMsgHandler
using SearchOpAlloc = ObjAllocator<op::SearchOp, ExtConfig::SearchOpsLimit>;
using SearchOpsList = ObjListType<SearchOpAlloc::Ptr, ExtConfig::SearchOpsLimit>;

// using ConnectOpAlloc = ObjAllocator<op::ConnectOp, ExtConfig::ConnectOpsLimit>;
// using ConnectOpsList = ObjListType<ConnectOpAlloc::Ptr, ExtConfig::ConnectOpsLimit>;
using ConnectOpAlloc = ObjAllocator<op::ConnectOp, ExtConfig::ConnectOpsLimit>;
using ConnectOpsList = ObjListType<ConnectOpAlloc::Ptr, ExtConfig::ConnectOpsLimit>;

// using KeepAliveOpAlloc = ObjAllocator<op::KeepAliveOp, ExtConfig::KeepAliveOpsLimit>;
// using KeepAliveOpsList = ObjListType<KeepAliveOpAlloc::Ptr, ExtConfig::KeepAliveOpsLimit>;
Expand Down Expand Up @@ -277,7 +277,7 @@ class ClientImpl final : public ProtMsgHandler
// bool processPublishAckMsg(ProtMessage& msg, std::uint16_t packetId, bool pubcompAck = false);

void opComplete_Search(const op::Op* op);
// void opComplete_Connect(const op::Op* op);
void opComplete_Connect(const op::Op* op);
// void opComplete_KeepAlive(const op::Op* op);
// void opComplete_Disconnect(const op::Op* op);
// void opComplete_Subscribe(const op::Op* op);
Expand Down Expand Up @@ -335,8 +335,8 @@ class ClientImpl final : public ProtMsgHandler
SearchOpAlloc m_searchOpAlloc;
SearchOpsList m_searchOps;

// ConnectOpAlloc m_connectOpAlloc;
// ConnectOpsList m_connectOps;
ConnectOpAlloc m_connectOpAlloc;
ConnectOpsList m_connectOps;

// KeepAliveOpAlloc m_keepAliveOpsAlloc;
// KeepAliveOpsList m_keepAliveOps;
Expand Down
128 changes: 128 additions & 0 deletions client/lib/src/op/ConnectOp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//
// Copyright 2024 - 2024 (C). Alex Robenko. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#include "op/ConnectOp.h"
#include "ClientImpl.h"

#include "comms/util/assign.h"
#include "comms/util/ScopeGuard.h"
#include "comms/units.h"

#include <algorithm>
#include <limits>

namespace cc_mqttsn_client
{

namespace op
{

namespace
{

inline ConnectOp* asConnectOp(void* data)
{
return reinterpret_cast<ConnectOp*>(data);
}

} // namespace


ConnectOp::ConnectOp(ClientImpl& client) :
Base(client),
m_timer(client.timerMgr().allocTimer())
{
}

CC_MqttsnErrorCode ConnectOp::send(CC_MqttsnConnectCompleteCb cb, void* cbData)
{
client().allowNextPrepare();
auto completeOnError =
comms::util::makeScopeGuard(
[this]()
{
opComplete();
});

if (cb == nullptr) {
errorLog("Connect completion callback is not provided.");
return CC_MqttsnErrorCode_BadParam;
}

if (!m_timer.isValid()) {
errorLog("The library cannot allocate required number of timers.");
return CC_MqttsnErrorCode_InternalError;
}

m_cb = cb;
m_cbData = cbData;

auto ec = sendInternal();
if (ec == CC_MqttsnErrorCode_Success) {
completeOnError.release();
}

return ec;
}

CC_MqttsnErrorCode ConnectOp::cancel()
{
if (m_cb == nullptr) {
// hasn't been sent yet
client().allowNextPrepare();
}

opComplete();
return CC_MqttsnErrorCode_Success;
}

Op::Type ConnectOp::typeImpl() const
{
return Type_Connect;
}

void ConnectOp::terminateOpImpl(CC_MqttsnAsyncOpStatus status)
{
completeOpInternal(status);
}

void ConnectOp::completeOpInternal(CC_MqttsnAsyncOpStatus status, const CC_MqttsnConnectInfo* info)
{
auto cb = m_cb;
auto* cbData = m_cbData;
opComplete(); // mustn't access data members after destruction
if (cb != nullptr) {
cb(cbData, status, info);
}
}

void ConnectOp::restartTimer()
{
m_timer.wait(getRetryPeriod(), &ConnectOp::opTimeoutCb, this);
}

CC_MqttsnErrorCode ConnectOp::sendInternal()
{
if (getRetryCount() == 0U) {
errorLog("All retries of the connect operation have been exhausted.");
completeOpInternal(CC_MqttsnAsyncOpStatus_Timeout);
return CC_MqttsnErrorCode_InternalError;
}

decRetryCount();

return sendMessage(m_connectMsg);
}

void ConnectOp::opTimeoutCb(void* data)
{
asConnectOp(data)->sendInternal();
}

} // namespace op

} // namespace cc_mqttsn_client
Loading

0 comments on commit 46b9120

Please sign in to comment.