Skip to content

Commit

Permalink
Handling pings in the new client.
Browse files Browse the repository at this point in the history
  • Loading branch information
arobenko committed Jun 13, 2024
1 parent cf995e5 commit 18e5c8c
Show file tree
Hide file tree
Showing 16 changed files with 347 additions and 209 deletions.
1 change: 1 addition & 0 deletions client/app/common/AppClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ std::string AppClient::toString(CC_MqttsnAsyncOpStatus val)
/* CC_MqttsnAsyncOpStatus_Aborted */ "Aborted",
/* CC_MqttsnAsyncOpStatus_OutOfMemory */ "Out of Memory",
/* CC_MqttsnAsyncOpStatus_BadParam */ "Bad Param",
/* CC_MqttsnAsyncOpStatus_GatewayDisconnected */ "Gateway Disconnected",
};

static constexpr std::size_t MapSize = std::extent<decltype(Map)>::value;
Expand Down
1 change: 1 addition & 0 deletions client/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ function (gen_lib_mqttsn_client config_file)
message (STATUS "Defining library ${lib_name}")
set (src
src/op/ConnectOp.cpp
src/op/KeepAliveOp.cpp
src/op/Op.cpp
src/op/SearchOp.cpp
src/ClientImpl.cpp
Expand Down
1 change: 1 addition & 0 deletions client/lib/include/cc_mqttsn_client/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ typedef enum
CC_MqttsnAsyncOpStatus_Aborted = 3, ///< The operation has been aborted before completion due to client's side operation.
CC_MqttsnAsyncOpStatus_OutOfMemory = 4, ///< The client library wasn't able to allocate necessary memory.
CC_MqttsnAsyncOpStatus_BadParam = 5, ///< Bad value has been returned from the relevant callback.
CC_MqttsnAsyncOpStatus_GatewayDisconnected = 6, ///< Gateway disconnection detected during the operation execution.
CC_MqttsnAsyncOpStatus_ValuesLimit ///< Limit for the values
} CC_MqttsnAsyncOpStatus;

Expand Down
262 changes: 79 additions & 183 deletions client/lib/src/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,18 @@ op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec)
break;
}

if (m_sessionState.m_disconnecting) {
errorLog("Session disconnection is in progress, cannot initiate connection.");
updateEc(ec, CC_MqttsnErrorCode_Disconnecting);
break;
}

if (m_sessionState.m_connected) {
errorLog("Client is already connected.");
updateEc(ec, CC_MqttsnErrorCode_AlreadyConnected);
break;
}

if (m_ops.max_size() <= m_ops.size()) {
errorLog("Cannot start connect operation, retry in next event loop iteration.");
updateEc(ec, CC_MqttsnErrorCode_RetryLater);
Expand Down Expand Up @@ -198,74 +210,6 @@ op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec)
return op;
}

// op::ConnectOp* ClientImpl::connectPrepare(CC_MqttsnErrorCode* ec)
// {
// op::ConnectOp* connectOp = nullptr;
// do {
// m_clientState.m_networkDisconnected = false;

// if (!m_clientState.m_initialized) {
// if (m_apiEnterCount > 0U) {
// errorLog("Cannot prepare connect from within callback");
// updateEc(ec, CC_MqttsnErrorCode_RetryLater);
// break;
// }

// auto initEc = initInternal();
// if (initEc != CC_MqttsnErrorCode_Success) {
// updateEc(ec, initEc);
// break;
// }
// }

// if (!m_connectOps.empty()) {
// // Already allocated
// errorLog("Another connect operation is in progress.");
// updateEc(ec, CC_MqttsnErrorCode_Busy);
// break;
// }

// if (m_sessionState.m_disconnecting) {
// errorLog("Session disconnection is in progress, cannot initiate connection.");
// updateEc(ec, CC_MqttsnErrorCode_Disconnecting);
// break;
// }

// if (m_sessionState.m_connected) {
// errorLog("Client is already connected.");
// updateEc(ec, CC_MqttsnErrorCode_AlreadyConnected);
// break;
// }

// if (m_ops.max_size() <= m_ops.size()) {
// errorLog("Cannot start connect operation, retry in next event loop iteration.");
// updateEc(ec, CC_MqttsnErrorCode_RetryLater);
// break;
// }

// if (m_preparationLocked) {
// errorLog("Another operation is being prepared, cannot prepare \"connect\" without \"send\" or \"cancel\" of the previous.");
// updateEc(ec, CC_MqttsnErrorCode_PreparationLocked);
// break;
// }

// auto ptr = m_connectOpAlloc.alloc(*this);
// if (!ptr) {
// errorLog("Cannot allocate new connect operation.");
// updateEc(ec, CC_MqttsnErrorCode_OutOfMemory);
// break;
// }

// m_preparationLocked = true;
// m_ops.push_back(ptr.get());
// m_connectOps.push_back(std::move(ptr));
// connectOp = m_connectOps.back().get();
// updateEc(ec, CC_MqttsnErrorCode_Success);
// } while (false);

// return connectOp;
// }

// op::DisconnectOp* ClientImpl::disconnectPrepare(CC_MqttsnErrorCode* ec)
// {
// op::DisconnectOp* disconnectOp = nullptr;
Expand Down Expand Up @@ -729,7 +673,7 @@ void ClientImpl::handle(GwinfoMsg& msg)

// if (!msg.transportField_flags().field_dup().getBitValue_bit()) {
// errorLog("Non duplicate PUBLISH with packet ID in use");
// brokerDisconnected(CC_MqttsnBrokerDisconnectReason_ProtocolError);
// gatewayDisconnected(CC_MqttsnGatewayDisconnectReason_ProtocolError);
// return;
// }
// else {
Expand Down Expand Up @@ -800,12 +744,21 @@ void ClientImpl::handle(GwinfoMsg& msg)

// #endif // #if CC_MQTTSN_CLIENT_MAX_QOS >= 2

void ClientImpl::handle(ProtMessage& msg)
void ClientImpl::handle([[maybe_unused]] PingreqMsg& msg)
{
if ((m_sessionState.m_disconnecting) || (!m_sessionState.m_connected)) {
return;
}

PingrespMsg respMsg;
sendMessage(respMsg);
}

void ClientImpl::handle([[maybe_unused]] ProtMessage& msg)
{
static_cast<void>(msg);
// if (m_sessionState.m_disconnecting) {
// return;
// }
if (m_sessionState.m_disconnecting) {
return;
}

// During the dispatch to callbacks can be called and new ops issues,
// the m_ops vector can be resized and iterators invalidated.
Expand All @@ -826,9 +779,9 @@ void ClientImpl::handle(ProtMessage& msg)
// After message dispatching the whole session may be in terminating state
// Don't continue iteration

// if (m_sessionState.m_disconnecting) {
// break;
// }
if (m_sessionState.m_disconnecting) {
break;
}
}
}

Expand All @@ -853,9 +806,9 @@ CC_MqttsnErrorCode ClientImpl::sendMessage(const ProtMessage& msg, unsigned broa
COMMS_ASSERT(m_sendOutputDataCb != nullptr);
m_sendOutputDataCb(m_sendOutputDataData, &m_buf[0], static_cast<unsigned>(len), broadcastRadius);

// for (auto& opPtr : m_keepAliveOps) {
// opPtr->messageSent();
// }
for (auto& opPtr : m_keepAliveOps) {
opPtr->messageSent();
}

return CC_MqttsnErrorCode_Success;
}
Expand All @@ -875,7 +828,7 @@ void ClientImpl::opComplete(const op::Op* op)
static const ExtraCompleteFunc Map[] = {
/* Type_Search */ &ClientImpl::opComplete_Search,
/* Type_Connect */ &ClientImpl::opComplete_Connect,
// /* Type_KeepAlive */ &ClientImpl::opComplete_KeepAlive,
/* Type_KeepAlive */ &ClientImpl::opComplete_KeepAlive,
// /* Type_Disconnect */ &ClientImpl::opComplete_Disconnect,
// /* Type_Subscribe */ &ClientImpl::opComplete_Subscribe,
// /* Type_Unsubscribe */ &ClientImpl::opComplete_Unsubscribe,
Expand All @@ -895,77 +848,28 @@ void ClientImpl::opComplete(const op::Op* op)
(this->*func)(op);
}

// void ClientImpl::brokerConnected(bool sessionPresent)
// {
// static_cast<void>(sessionPresent);
// m_clientState.m_firstConnect = false;
// m_sessionState.m_connected = true;

// do {
// if (sessionPresent) {
// for (auto& sendOpPtr : m_sendOps) {
// sendOpPtr->postReconnectionResend();
// }

// for (auto& recvOpPtr : m_recvOps) {
// recvOpPtr->postReconnectionResume();
// }

// auto resumeUntilIdx = m_sendOps.size();
// auto resumeFromIdx = resumeUntilIdx;
// for (auto count = resumeUntilIdx; count > 0U; --count) {
// auto idx = count - 1U;
// auto& sendOpPtr = m_sendOps[idx];
// if (!sendOpPtr->isPaused()) {
// break;
// }

// resumeFromIdx = idx;
// }

// if (resumeFromIdx < resumeUntilIdx) {
// resumeSendOpsSince(static_cast<unsigned>(resumeFromIdx));
// }

// break;
// }

// // Old stored session, terminate pending ops
// for (auto* op : m_ops) {
// auto opType = op->type();
// if ((opType != op::Op::Type::Type_Send) &&
// (opType != op::Op::Type::Type_Recv)) {
// continue;
// }

// op->terminateOp(CC_MqttsnAsyncOpStatus_Aborted);
// }
// } while (false);

// createKeepAliveOpIfNeeded();
// }

// void ClientImpl::brokerDisconnected(
// CC_MqttsnBrokerDisconnectReason reason,
// CC_MqttsnAsyncOpStatus status)
// {
// m_clientState.m_initialized = false; // Require re-initialization
// m_sessionState.m_connected = false;
void ClientImpl::gatewayConnected()
{
m_clientState.m_firstConnect = false;
m_sessionState.m_connected = true;
createKeepAliveOpIfNeeded();
}

// m_sessionState.m_disconnecting = true;
// terminateOps(status, TerminateMode_KeepSendRecvOps);
void ClientImpl::gatewayDisconnected(
CC_MqttsnGatewayDisconnectReason reason,
CC_MqttsnAsyncOpStatus status)
{
m_clientState.m_initialized = false; // Require re-initialization
m_sessionState.m_connected = false;

// for (auto* op : m_ops) {
// if (op != nullptr) {
// op->connectivityChanged();
// }
// }
m_sessionState.m_disconnecting = true;
terminateOps(status);

// if (reason < CC_MqttsnBrokerDisconnectReason_ValuesLimit) {
// COMMS_ASSERT(m_brokerDisconnectReportCb != nullptr);
// m_brokerDisconnectReportCb(m_brokerDisconnectReportData, reason);
// }
// }
if (reason < CC_MqttsnGatewayDisconnectReason_ValuesLimit) {
COMMS_ASSERT(m_gatewayDisconnectedReportCb != nullptr);
m_gatewayDisconnectedReportCb(m_gatewayDisconnectedReportData, reason);
}
}

// void ClientImpl::reportMsgInfo(const CC_MqttsnMessageInfo& info)
// {
Expand Down Expand Up @@ -1060,40 +964,32 @@ void ClientImpl::doApiExit()
m_nextTickProgramCb(m_nextTickProgramData, nextWait);
}

// void ClientImpl::createKeepAliveOpIfNeeded()
// {
// if (!m_keepAliveOps.empty()) {
// return;
// }

// auto ptr = m_keepAliveOpsAlloc.alloc(*this);
// if (!ptr) {
// COMMS_ASSERT(false); // Should not happen
// return;
// }

// m_ops.push_back(ptr.get());
// m_keepAliveOps.push_back(std::move(ptr));
// }
void ClientImpl::createKeepAliveOpIfNeeded()
{
if (!m_keepAliveOps.empty()) {
return;
}

// void ClientImpl::terminateOps(CC_MqttsnAsyncOpStatus status, TerminateMode mode)
// {
// for (auto* op : m_ops) {
// if (op == nullptr) {
// continue;
// }
auto ptr = m_keepAliveOpsAlloc.alloc(*this);
if (!ptr) {
COMMS_ASSERT(false); // Should not happen
return;
}

// if (mode == TerminateMode_KeepSendRecvOps) {
// auto opType = op->type();
m_ops.push_back(ptr.get());
m_keepAliveOps.push_back(std::move(ptr));
}

// if ((opType == op::Op::Type_Recv) || (opType == op::Op::Type_Send)) {
// continue;
// }
// }
void ClientImpl::terminateOps(CC_MqttsnAsyncOpStatus status)
{
for (auto* op : m_ops) {
if (op == nullptr) {
continue;
}

// op->terminateOp(status);
// }
// }
op->terminateOp(status);
}
}

void ClientImpl::cleanOps()
{
Expand Down Expand Up @@ -1271,10 +1167,10 @@ void ClientImpl::opComplete_Connect(const op::Op* op)
eraseFromList(op, m_connectOps);
}

// void ClientImpl::opComplete_KeepAlive(const op::Op* op)
// {
// eraseFromList(op, m_keepAliveOps);
// }
void ClientImpl::opComplete_KeepAlive(const op::Op* op)
{
eraseFromList(op, m_keepAliveOps);
}

// void ClientImpl::opComplete_Disconnect(const op::Op* op)
// {
Expand Down
Loading

0 comments on commit 18e5c8c

Please sign in to comment.