Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Subscription loosely following the ROS2 API. #186

Merged
merged 14 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions examples/OpenCyphal-Blink/OpenCyphal-Blink.ino
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static int const LED_BUILTIN = 2;
**************************************************************************************/

void onReceiveBufferFull(CanardFrame const &);
void onBit_1_0_Received (CanardRxTransfer const &, Node &);
void onBit_1_0_Received (Bit_1_0<BIT_PORT_ID> const & msg);

/**************************************************************************************
* GLOBAL VARIABLES
Expand All @@ -64,7 +64,10 @@ ArduinoMCP2515 mcp2515([]() { digitalWrite(MKRCAN_MCP2515_CS_PIN, LOW); },

CyphalHeap<Node::DEFAULT_O1HEAP_SIZE> node_heap;
Node node_hdl(node_heap.data(), node_heap.size(), micros);
Publisher<Heartbeat_1_0<>> heartbeat_pub = node_hdl.create_publisher<Heartbeat_1_0<>>(Heartbeat_1_0<>::PORT_ID, 1*1000*1000UL /* = 1 sec in usecs. */);
Publisher<Heartbeat_1_0<>> heartbeat_pub = node_hdl.create_publisher<Heartbeat_1_0<>>(
Heartbeat_1_0<>::PORT_ID, 1*1000*1000UL /* = 1 sec in usecs. */);
auto heartbeat_subscription = node_hdl.create_subscription<Bit_1_0<BIT_PORT_ID>>(
Bit_1_0<BIT_PORT_ID>::PORT_ID, CANARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, onBit_1_0_Received);

Heartbeat_1_0<> hb_msg;

Expand Down Expand Up @@ -111,9 +114,6 @@ void setup()
hb_msg = Heartbeat_1_0<>::Health::NOMINAL;
hb_msg = Heartbeat_1_0<>::Mode::INITIALIZATION;
hb_msg.data.vendor_specific_status_code = 0;

/* Subscribe to the reception of Bit message. */
node_hdl.subscribe<Bit_1_0<BIT_PORT_ID>>(onBit_1_0_Received);
}

void loop()
Expand Down Expand Up @@ -144,11 +144,9 @@ void onReceiveBufferFull(CanardFrame const & frame)
node_hdl.onCanFrameReceived(frame, micros());
}

void onBit_1_0_Received(CanardRxTransfer const & transfer, Node & /* node_hdl */)
void onBit_1_0_Received(Bit_1_0<BIT_PORT_ID> const & msg)
{
Bit_1_0<BIT_PORT_ID> const uavcan_led = Bit_1_0<BIT_PORT_ID>::deserialize(transfer);

if(uavcan_led.data.value)
if(msg.data.value)
{
#if defined(ARDUINO_EDGE_CONTROL)
Expander.digitalWrite(EXP_LED1, LOW);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static int const MKRCAN_MCP2515_INT_PIN = 7;
**************************************************************************************/

void onReceiveBufferFull (CanardFrame const &);
void onHeartbeat_1_0_Received(CanardRxTransfer const &, Node &);
void onHeartbeat_1_0_Received(Heartbeat_1_0<> const & msg);

/**************************************************************************************
* GLOBAL VARIABLES
Expand All @@ -48,6 +48,8 @@ ArduinoMCP2515 mcp2515([]() { digitalWrite(MKRCAN_MCP2515_CS_PIN, LOW); },

CyphalHeap<Node::DEFAULT_O1HEAP_SIZE> node_heap;
Node node_hdl(node_heap.data(), node_heap.size(), micros);
auto heartbeat_subscription =
node_hdl.create_subscription<Heartbeat_1_0<>>(Heartbeat_1_0<>::PORT_ID, CANARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC, onHeartbeat_1_0_Received);

/**************************************************************************************
* SETUP/LOOP
Expand All @@ -71,9 +73,6 @@ void setup()
mcp2515.begin();
mcp2515.setBitRate(CanBitRate::BR_250kBPS_16MHZ);
mcp2515.setNormalMode();

/* Subscribe to the reception of Heartbeat message. */
node_hdl.subscribe<Heartbeat_1_0<>>(onHeartbeat_1_0_Received);
}

void loop()
Expand All @@ -92,14 +91,12 @@ void onReceiveBufferFull(CanardFrame const & frame)
node_hdl.onCanFrameReceived(frame, micros());
}

void onHeartbeat_1_0_Received(CanardRxTransfer const & transfer, Node & /* node_hdl */)
void onHeartbeat_1_0_Received(Heartbeat_1_0<> const & msg)
{
Heartbeat_1_0<> const hb = Heartbeat_1_0<>::deserialize(transfer);

char msg[64];
snprintf(msg, 64,
"ID %02X, Uptime = %d, Health = %d, Mode = %d, VSSC = %d",
transfer.metadata.remote_node_id, hb.data.uptime, hb.data.health.value, hb.data.mode.value, hb.data.vendor_specific_status_code);
char msg_buf[64];
snprintf(msg_buf, sizeof(msg_buf),
"Uptime = %d, Health = %d, Mode = %d, VSSC = %d",
msg.data.uptime, msg.data.health.value, msg.data.mode.value, msg.data.vendor_specific_status_code);

Serial.println(msg);
Serial.println(msg_buf);
}
1 change: 1 addition & 0 deletions src/107-Arduino-Cyphal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Node.hpp"
#include "Types.h"
#include "Publisher.hpp"
#include "Subscription.hpp"
#include "nodeinfo/NodeInfo.h"
#include "register/RegisterList.h"
#include "utility/UniqueId16.h"
Expand Down
23 changes: 23 additions & 0 deletions src/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ void Node::onCanFrameReceived(CanardFrame const & frame, CanardMicrosecond const
_canard_rx_queue.enqueue(std::make_tuple(extended_can_id, payload_size, payload, rx_timestamp_us));
}

void Node::unsubscribe_message(CanardPortID const port_id)
{
canardRxUnsubscribe(&_canard_hdl,
CanardTransferKindMessage,
port_id);

_msg_subscription_map.erase(port_id);
}

/**************************************************************************************
* PRIVATE MEMBER FUNCTIONS
**************************************************************************************/
Expand Down Expand Up @@ -91,6 +100,20 @@ void Node::processRxQueue()

if(result == 1)
{
/* Check whether or not the incoming transfer is a message transfer,
* if the incoming port id has been subscribed too and, if all those
* preconditions hold true, invoke the required transfer received
* callback.
*/
if (transfer.metadata.transfer_kind == CanardTransferKindMessage)
{
auto const msg_sub_citer = _msg_subscription_map.find(transfer.metadata.port_id);
if (msg_sub_citer != std::end(_msg_subscription_map)) {
auto const msg_sub_ptr = msg_sub_citer->second;
msg_sub_ptr->onTransferReceived(transfer);
}
}

if (_rx_transfer_map.count(transfer.metadata.port_id) > 0)
{
OnTransferReceivedFunc transfer_received_func = _rx_transfer_map[transfer.metadata.port_id].transfer_complete_callback;
Expand Down
14 changes: 10 additions & 4 deletions src/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "Types.h"

#include "Publisher.hpp"
#include "Subscription.hpp"

#include "libo1heap/o1heap.h"
#include "libcanard/canard.h"
Expand All @@ -38,7 +39,6 @@
* TYPEDEF
**************************************************************************************/

class Node;
typedef std::function<void(CanardRxTransfer const &, Node &)> OnTransferReceivedFunc;
typedef std::function<bool(CanardFrame const &)> CanFrameTransmitFunc;

Expand Down Expand Up @@ -82,6 +82,11 @@ class Node
Publisher<T> create_publisher(CanardPortID const port_id,
CanardMicrosecond const tx_timeout_usec);

template <typename T, typename OnReceiveCb>
Subscription<T, OnReceiveCb> create_subscription(CanardPortID const port_id,
CanardMicrosecond const rx_timeout_usec,
OnReceiveCb&& on_receive_cb);

/* Must be called from the application to process
* all received CAN frames.
*/
Expand All @@ -92,14 +97,14 @@ class Node
void onCanFrameReceived(CanardFrame const & frame, CanardMicrosecond const & rx_timestamp_us);


template <typename T> bool subscribe (OnTransferReceivedFunc func);
template <typename T> bool unsubscribe ();

/* request/response API for "service" data exchange paradigm */
template <typename T_RSP> bool respond (T_RSP const & rsp, CanardNodeID const remote_node_id, CanardTransferID const transfer_id);
template <typename T_REQ, typename T_RSP> bool request (T_REQ const & req, CanardNodeID const remote_node_id, OnTransferReceivedFunc func);


void unsubscribe_message(CanardPortID const port_id);


private:

typedef struct
Expand All @@ -115,6 +120,7 @@ class Node
arduino::_107_::opencyphal::ThreadsafeRingBuffer<std::tuple<uint32_t, size_t, std::array<uint8_t, 8>, CanardMicrosecond>> _canard_rx_queue;
std::map<CanardPortID, RxTransferData> _rx_transfer_map;
std::map<CanardPortID, CanardTransferID> _tx_transfer_map;
std::map<CanardPortID, std::shared_ptr<impl::SubscriptionBase>> _msg_subscription_map;

static void * o1heap_allocate(CanardInstance * const ins, size_t const amount);
static void o1heap_free (CanardInstance * const ins, void * const pointer);
Expand Down
24 changes: 16 additions & 8 deletions src/Node.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@ Publisher<T> Node::create_publisher(CanardPortID const port_id,
return std::make_shared<impl::Publisher<T>>(_canard_hdl, _canard_tx_queue, port_id, tx_timeout_usec, _micros_func);
}

template <typename T>
bool Node::subscribe(OnTransferReceivedFunc func)
template <typename T, typename OnReceiveCb>
Subscription<T, OnReceiveCb> Node::create_subscription(CanardPortID const port_id,
CanardMicrosecond const rx_timeout_usec,
OnReceiveCb&& on_receive_cb)
{
return subscribe(T::TRANSFER_KIND, T::PORT_ID, T::MAX_PAYLOAD_SIZE, func);
}
auto sub = std::make_shared<impl::Subscription<T, OnReceiveCb>>(*this, port_id, std::forward<OnReceiveCb>(on_receive_cb));

template <typename T>
bool Node::unsubscribe()
{
return unsubscribe(T::TRANSFER_KIND, T::PORT_ID);
int8_t const rc = canardRxSubscribe(&_canard_hdl,
CanardTransferKindMessage,
port_id,
T::MAX_PAYLOAD_SIZE,
rx_timeout_usec,
&(sub->canard_rx_subscription()));
if (rc < 0)
return nullptr;

_msg_subscription_map[port_id] = sub;
return sub;
}

template <typename T_RSP>
Expand Down
92 changes: 92 additions & 0 deletions src/Subscription.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* This software is distributed under the terms of the MIT License.
* Copyright (c) 2020 LXRobotics.
* Author: Alexander Entinger <alexander.entinger@lxrobotics.com>
* Contributors: https://github.com/107-systems/107-Arduino-Cyphal/graphs/contributors.
*/

#ifndef INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP
#define INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP

/**************************************************************************************
* INCLUDE
**************************************************************************************/

#include <memory>

/**************************************************************************************
* FORWARD DECLARATION
**************************************************************************************/

class Node;

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

namespace impl
{

/**************************************************************************************
* CLASS DECLARATION
**************************************************************************************/

class SubscriptionBase
{
public:
SubscriptionBase() { }
virtual ~SubscriptionBase() { }
aentinger marked this conversation as resolved.
Show resolved Hide resolved
SubscriptionBase(SubscriptionBase const &) = delete;
SubscriptionBase(SubscriptionBase &&) = delete;

SubscriptionBase & operator = (SubscriptionBase const &) = delete;
SubscriptionBase & operator = (SubscriptionBase &&) = delete;

virtual void onTransferReceived(CanardRxTransfer const & transfer) = 0;
};

template <typename T, typename OnReceiveCb>
class Subscription : public SubscriptionBase
{
public:
Subscription(Node & node_hdl, CanardPortID const port_id, OnReceiveCb const & on_receive_cb)
: SubscriptionBase{}
, _node_hdl{node_hdl}
, _port_id{port_id}
, _on_receive_cb{on_receive_cb}
{ }
virtual ~Subscription();

virtual void onTransferReceived(CanardRxTransfer const & transfer) override;


CanardRxSubscription & canard_rx_subscription() { return _canard_rx_sub; }


private:
Node & _node_hdl;
CanardPortID const _port_id;
CanardRxSubscription _canard_rx_sub;
OnReceiveCb _on_receive_cb;
};

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

} /* impl */

/**************************************************************************************
* TYPEDEF
**************************************************************************************/

template <typename T, typename OnReceiveCb>
using Subscription = std::shared_ptr<impl::Subscription<T, OnReceiveCb>>;

/**************************************************************************************
* TEMPLATE IMPLEMENTATION
**************************************************************************************/

#include "Subscription.ipp"

#endif /* INC_107_ARDUINO_CYPHAL_SUBSCRIPTION_HPP */
41 changes: 41 additions & 0 deletions src/Subscription.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* This software is distributed under the terms of the MIT License.
* Copyright (c) 2020 LXRobotics.
* Author: Alexander Entinger <alexander.entinger@lxrobotics.com>
* Contributors: https://github.com/107-systems/107-Arduino-Cyphal/graphs/contributors.
*/

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

namespace impl {

/**************************************************************************************
* CTOR/DTOR
**************************************************************************************/

template<typename T, typename OnReceiveCb>
Subscription<T, OnReceiveCb>::~Subscription()
{
_node_hdl.unsubscribe_message(_port_id);
}

/**************************************************************************************
* PUBLIC MEMBER FUNCTIONS
**************************************************************************************/

template<typename T, typename OnReceiveCb>
void Subscription<T, OnReceiveCb>::onTransferReceived(CanardRxTransfer const & transfer)
{
T const msg = T::deserialize(transfer);
aentinger marked this conversation as resolved.
Show resolved Hide resolved

if (_on_receive_cb)
_on_receive_cb(msg);
}

/**************************************************************************************
* NAMESPACE
**************************************************************************************/

} /* impl */