Skip to content

Commit

Permalink
Implement Subscription loosely following the ROS2 API.
Browse files Browse the repository at this point in the history
  • Loading branch information
aentinger committed Dec 28, 2022
1 parent ba796a8 commit d2cb7c4
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static int const MKRCAN_MCP2515_INT_PIN = 7;
**************************************************************************************/

void onReceiveBufferFull (CanardFrame const &);
void onHeartbeat_1_0_Received(CanardRxTransfer const &, Node &);
void onHeartbeat_1_0_Received(Heartbeat_1_0<> const & msg);

/**************************************************************************************
* GLOBAL VARIABLES
Expand All @@ -48,6 +48,9 @@ ArduinoMCP2515 mcp2515([]() { digitalWrite(MKRCAN_MCP2515_CS_PIN, LOW); },

CyphalHeap<Node::DEFAULT_O1HEAP_SIZE> node_heap;
Node node_hdl(node_heap.data(), node_heap.size(), micros);
Subscription<Heartbeat_1_0<>> heartbeat_subscription = node_hdl.create_subscription<Heartbeat_1_0<>>(Heartbeat_1_0<>::PORT_ID,
CANARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC,
onHeartbeat_1_0_Received);

/**************************************************************************************
* SETUP/LOOP
Expand All @@ -71,9 +74,6 @@ void setup()
mcp2515.begin();
mcp2515.setBitRate(CanBitRate::BR_250kBPS_16MHZ);
mcp2515.setNormalMode();

/* Subscribe to the reception of Heartbeat message. */
node_hdl.subscribe<Heartbeat_1_0<>>(onHeartbeat_1_0_Received);
}

void loop()
Expand All @@ -92,14 +92,12 @@ void onReceiveBufferFull(CanardFrame const & frame)
node_hdl.onCanFrameReceived(frame, micros());
}

void onHeartbeat_1_0_Received(CanardRxTransfer const & transfer, Node & /* node_hdl */)
void onHeartbeat_1_0_Received(Heartbeat_1_0<> const & msg)
{
Heartbeat_1_0<> const hb = Heartbeat_1_0<>::deserialize(transfer);

char msg[64];
snprintf(msg, 64,
"ID %02X, Uptime = %d, Health = %d, Mode = %d, VSSC = %d",
transfer.metadata.remote_node_id, hb.data.uptime, hb.data.health.value, hb.data.mode.value, hb.data.vendor_specific_status_code);
char msg_buf[64];
snprintf(msg_buf, sizeof(msg_buf),
"Uptime = %d, Health = %d, Mode = %d, VSSC = %d",
msg.data.uptime, msg.data.health.value, msg.data.mode.value, msg.data.vendor_specific_status_code);

Serial.println(msg);
Serial.println(msg_buf);
}
1 change: 1 addition & 0 deletions src/107-Arduino-Cyphal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Node.hpp"
#include "Types.h"
#include "Publisher.hpp"
#include "Subscription.hpp"
#include "nodeinfo/NodeInfo.h"
#include "register/RegisterList.h"
#include "utility/UniqueId16.h"
Expand Down
20 changes: 20 additions & 0 deletions src/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ void Node::processRxQueue()

if(result == 1)
{
/* Check whether or not the incoming transfer is a message transfer,
* if the incoming port id has been subscribed too and, if all those
* preconditions hold true, invoke the required transfer received
* callback.
*/
if (transfer.metadata.transfer_kind == CanardTransferKindMessage)
{
if (_subscription_map.count(transfer.metadata.port_id) > 0)
_subscription_map.at(transfer.metadata.port_id)->onTransferReceived(transfer);
}

if (_rx_transfer_map.count(transfer.metadata.port_id) > 0)
{
OnTransferReceivedFunc transfer_received_func = _rx_transfer_map[transfer.metadata.port_id].transfer_complete_callback;
Expand Down Expand Up @@ -133,6 +144,15 @@ void Node::processTxQueue(CanFrameTransmitFunc const tx_func)
}
}

void Node::unsubscribe_subscription(CanardPortID const port_id)
{
canardRxUnsubscribe(&_canard_hdl,
CanardTransferKindMessage,
port_id);

_subscription_map.erase(port_id);
}

CanardTransferID Node::getNextTransferId(CanardPortID const port_id)
{
CanardTransferID const next_transfer_id = (_tx_transfer_map.count(port_id) > 0) ? ((_tx_transfer_map[port_id] + 1) % CANARD_TRANSFER_ID_MAX) : 0;
Expand Down
11 changes: 8 additions & 3 deletions src/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "Types.h"

#include "Publisher.hpp"
#include "Subscription.hpp"

#include "libo1heap/o1heap.h"
#include "libcanard/canard.h"
Expand Down Expand Up @@ -82,6 +83,11 @@ class Node
Publisher<T> create_publisher(CanardPortID const port_id,
CanardMicrosecond const tx_timeout_usec);

template <typename T>
Subscription<T> create_subscription(CanardPortID const port_id,
CanardMicrosecond const rx_timeout_usec,
std::function<void(T const &)> on_receive_cb);

/* Must be called from the application to process
* all received CAN frames.
*/
Expand All @@ -92,9 +98,6 @@ class Node
void onCanFrameReceived(CanardFrame const & frame, CanardMicrosecond const & rx_timestamp_us);


template <typename T> bool subscribe (OnTransferReceivedFunc func);
template <typename T> bool unsubscribe ();

/* request/response API for "service" data exchange paradigm */
template <typename T_RSP> bool respond (T_RSP const & rsp, CanardNodeID const remote_node_id, CanardTransferID const transfer_id);
template <typename T_REQ, typename T_RSP> bool request (T_REQ const & req, CanardNodeID const remote_node_id, OnTransferReceivedFunc func);
Expand All @@ -115,13 +118,15 @@ class Node
arduino::_107_::opencyphal::ThreadsafeRingBuffer<std::tuple<uint32_t, size_t, std::array<uint8_t, 8>, CanardMicrosecond>> _canard_rx_queue;
std::map<CanardPortID, RxTransferData> _rx_transfer_map;
std::map<CanardPortID, CanardTransferID> _tx_transfer_map;
std::map<CanardPortID, impl::SubscriptionBase *> _subscription_map;

static void * o1heap_allocate(CanardInstance * const ins, size_t const amount);
static void o1heap_free (CanardInstance * const ins, void * const pointer);

void processRxQueue();
void processTxQueue(CanFrameTransmitFunc const tx_func);

void unsubscribe_subscription(CanardPortID const port_id);
CanardTransferID getNextTransferId(CanardPortID const port_id);
bool subscribe (CanardTransferKind const transfer_kind, CanardPortID const port_id, size_t const payload_size_max, OnTransferReceivedFunc func);
bool unsubscribe (CanardTransferKind const transfer_kind, CanardPortID const port_id);
Expand Down
23 changes: 16 additions & 7 deletions src/Node.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@ Publisher<T> Node::create_publisher(CanardPortID const port_id,
}

template <typename T>
bool Node::subscribe(OnTransferReceivedFunc func)
Subscription<T> Node::create_subscription(CanardPortID const port_id,
CanardMicrosecond const rx_timeout_usec,
std::function<void(T const &)> on_receive_cb)
{
return subscribe(T::TRANSFER_KIND, T::PORT_ID, T::MAX_PAYLOAD_SIZE, func);
}
auto sub = std::make_shared<impl::Subscription<T>>(on_receive_cb,
[this, port_id]() { unsubscribe_subscription(port_id); });

template <typename T>
bool Node::unsubscribe()
{
return unsubscribe(T::TRANSFER_KIND, T::PORT_ID);
int8_t const rc = canardRxSubscribe(&_canard_hdl,
CanardTransferKindMessage,
port_id,
T::MAX_PAYLOAD_SIZE,
rx_timeout_usec,
&(sub->canard_rx_subscription()));
if (rc < 0)
return nullptr;

_subscription_map[port_id] = dynamic_cast<impl::SubscriptionBase *>(sub.get());
return sub;
}

template <typename T_RSP>
Expand Down
78 changes: 78 additions & 0 deletions src/Subscription.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* This software is distributed under the terms of the MIT License.
* Copyright (c) 2020 LXRobotics.
* Author: Alexander Entinger <alexander.entinger@lxrobotics.com>
* Contributors: https://github.com/107-systems/107-Arduino-Cyphal/graphs/contributors.
*/

#ifndef INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP
#define INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP

/**************************************************************************************
* INCLUDE
**************************************************************************************/

#include <memory>

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

namespace impl
{

/**************************************************************************************
* CLASS DECLARATION
**************************************************************************************/

class SubscriptionBase
{
public:
virtual ~SubscriptionBase() { }
virtual void onTransferReceived(CanardRxTransfer const & transfer) = 0;
};

template <typename T>
class Subscription : public SubscriptionBase
{
public:
Subscription(std::function<void(T const &)> on_receive_cb, std::function<void(void)> on_destruction_cb)
: SubscriptionBase{}
, _on_receive_cb{on_receive_cb}
, _on_destruction_cb{on_destruction_cb}
{ }
virtual ~Subscription();

virtual void onTransferReceived(CanardRxTransfer const & transfer) override;


inline CanardRxSubscription & canard_rx_subscription() { return _canard_rx_sub; }


private:
CanardRxSubscription _canard_rx_sub;
std::function<void(T const &)> _on_receive_cb;
std::function<void(void)> _on_destruction_cb;
};

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

} /* impl */

/**************************************************************************************
* TYPEDEF
**************************************************************************************/

template <typename T>
using Subscription = std::shared_ptr<impl::Subscription<T>>;

/**************************************************************************************
* TEMPLATE IMPLEMENTATION
**************************************************************************************/

#include "Subscription.ipp"

#endif /* INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP */

41 changes: 41 additions & 0 deletions src/Subscription.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* This software is distributed under the terms of the MIT License.
* Copyright (c) 2020 LXRobotics.
* Author: Alexander Entinger <alexander.entinger@lxrobotics.com>
* Contributors: https://github.com/107-systems/107-Arduino-Cyphal/graphs/contributors.
*/

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

namespace impl {

/**************************************************************************************
* CTOR/DTOR
**************************************************************************************/

template<typename T>
Subscription<T>::~Subscription()
{
_on_destruction_cb();
}

/**************************************************************************************
* PUBLIC MEMBER FUNCTIONS
**************************************************************************************/

template<typename T>
void Subscription<T>::onTransferReceived(CanardRxTransfer const & transfer)
{
T const msg = T::deserialize(transfer);

if (_on_receive_cb)
_on_receive_cb(msg);
}

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

} /* impl */

0 comments on commit d2cb7c4

Please sign in to comment.