Skip to content

Commit

Permalink
Implement deliver message kind (ros2#2168)
Browse files Browse the repository at this point in the history
* Implement deliver message kind

Signed-off-by: methylDragon <methylDragon@gmail.com>

* Add examples to docstring

Signed-off-by: methylDragon <methylDragon@gmail.com>

---------

Signed-off-by: methylDragon <methylDragon@gmail.com>
  • Loading branch information
methylDragon authored and Barry-Xu-2018 committed Jan 12, 2024
1 parent 269c258 commit d8fd4b8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 39 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
options.to_rcl_subscription_options(qos),
options.event_callbacks,
options.use_default_callbacks,
SubscriptionType::SERIALIZED_MESSAGE),
DeliveredMessageKind::SERIALIZED_MESSAGE),
callback_(callback),
ts_lib_(ts_lib)
{}
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class Subscription : public SubscriptionBase
// NOTE(methylDragon): Passing these args separately is necessary for event binding
options.event_callbacks,
options.use_default_callbacks,
callback.is_serialized_message_callback() ? SubscriptionType::SERIALIZED_MESSAGE : SubscriptionType::ROS_MESSAGE), // NOLINT
callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT
any_callback_(callback),
options_(options),
message_memory_strategy_(message_memory_strategy)
Expand Down
35 changes: 24 additions & 11 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,25 @@ namespace experimental
class IntraProcessManager;
} // namespace experimental

enum class SubscriptionType : uint8_t
/// The kind of message that the subscription delivers in its callback, used by the executor
/**
* This enum needs to exist because the callback handle is not accessible to the executor's scope.
*
* "Kind" is used since what is being delivered is a category of messages, for example, there are
* different ROS message types that can be delivered, but they're all ROS messages.
*
* As a concrete example, all of the following callbacks will be considered ROS_MESSAGE for
* DeliveredMessageKind:
* - void callback(const std_msgs::msg::String &)
* - void callback(const std::string &) // type adaption
* - void callback(std::unique_ptr<std_msgs::msg::String>)
*/
enum class DeliveredMessageKind : uint8_t
{
INVALID = 0, // The subscription type is most likely uninitialized
ROS_MESSAGE = 1, // take message as ROS message and handle as ROS message
SERIALIZED_MESSAGE = 2, // take message as serialized and handle as serialized
DYNAMIC_MESSAGE_DIRECT = 3, // take message as DynamicMessage and handle as DynamicMessage
DYNAMIC_MESSAGE_FROM_SERIALIZED = 4 // take message as serialized and handle as DynamicMessage
INVALID = 0,
ROS_MESSAGE = 1, // The subscription delivers a ROS message to its callback
SERIALIZED_MESSAGE = 2, // The subscription delivers a serialized message to its callback
DYNAMIC_MESSAGE = 3, // The subscription delivers a dynamic message to its callback
};

/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
Expand All @@ -88,7 +100,8 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options Options for the subscription.
* \param[in] subscription_type Enum flag to change how the message will be received and delivered
* \param[in] delivered_message_kind Enum flag to change how the message will be received and
* delivered
*/
RCLCPP_PUBLIC
SubscriptionBase(
Expand All @@ -98,7 +111,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
const rcl_subscription_options_t & subscription_options,
const SubscriptionEventCallbacks & event_callbacks,
bool use_default_callbacks,
SubscriptionType subscription_type = SubscriptionType::ROS_MESSAGE);
DeliveredMessageKind delivered_message_kind = DeliveredMessageKind::ROS_MESSAGE);

/// Destructor.
RCLCPP_PUBLIC
Expand Down Expand Up @@ -249,10 +262,10 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

/// Return the type of the subscription.
/**
* \return `SubscriptionType`, which adjusts how messages are received and delivered.
* \return `DeliveredMessageKind`, which adjusts how messages are received and delivered.
*/
RCLCPP_PUBLIC
SubscriptionType
DeliveredMessageKind
get_subscription_type() const;

/// Get matching publisher count.
Expand Down Expand Up @@ -650,7 +663,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_DISABLE_COPY(SubscriptionBase)

rosidl_message_type_support_t type_support_;
SubscriptionType subscription_type_;
DeliveredMessageKind delivered_message_type_;

std::atomic<bool> subscription_in_use_by_wait_set_{false};
std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
Expand Down
20 changes: 7 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
message_info.get_rmw_message_info().from_intra_process = false;

switch (subscription->get_subscription_type()) {
// Take ROS message
case rclcpp::SubscriptionType::ROS_MESSAGE:
// Deliver ROS message
case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
{
if (subscription->can_loan_messages()) {
// This is the case where a loaned message is taken from the middleware via
Expand Down Expand Up @@ -659,8 +659,8 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
break;
}

// Take serialized message
case rclcpp::SubscriptionType::SERIALIZED_MESSAGE:
// Deliver serialized message
case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
{
// This is the case where a copy of the serialized message is taken from
// the middleware via inter-process communication.
Expand All @@ -679,21 +679,15 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}

// DYNAMIC SUBSCRIPTION ========================================================================
// Take dynamic message directly from the middleware
case rclcpp::SubscriptionType::DYNAMIC_MESSAGE_DIRECT:
{
throw std::runtime_error("Unimplemented");
}

// Take serialized and then convert to dynamic message
case rclcpp::SubscriptionType::DYNAMIC_MESSAGE_FROM_SERIALIZED:
// Deliver dynamic message
case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
{
throw std::runtime_error("Unimplemented");
}

default:
{
throw std::runtime_error("Subscription type is not supported");
throw std::runtime_error("Delivered message kind is not supported");
}
}
return;
Expand Down
18 changes: 5 additions & 13 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,16 @@ SubscriptionBase::SubscriptionBase(
const rcl_subscription_options_t & subscription_options,
const SubscriptionEventCallbacks & event_callbacks,
bool use_default_callbacks,
SubscriptionType subscription_type)
DeliveredMessageKind delivered_message_kind)
: node_base_(node_base),
node_handle_(node_base_->get_shared_rcl_node_handle()),
node_logger_(rclcpp::get_node_logger(node_handle_.get())),
use_intra_process_(false),
intra_process_subscription_id_(0),
event_callbacks_(event_callbacks),
type_support_(type_support_handle),
subscription_type_(subscription_type)
delivered_message_type_(delivered_message_kind)
{
if (!rmw_feature_supported(RMW_MIDDLEWARE_CAN_TAKE_DYNAMIC_MESSAGE) &&
subscription_type == rclcpp::SubscriptionType::DYNAMIC_MESSAGE_DIRECT)
{
throw std::runtime_error(
"Cannot set subscription to take dynamic message directly, feature not supported in rmw"
);
}

auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
{
if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) {
Expand Down Expand Up @@ -269,13 +261,13 @@ SubscriptionBase::get_message_type_support_handle() const
bool
SubscriptionBase::is_serialized() const
{
return subscription_type_ == rclcpp::SubscriptionType::SERIALIZED_MESSAGE;
return delivered_message_type_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
}

rclcpp::SubscriptionType
rclcpp::DeliveredMessageKind
SubscriptionBase::get_subscription_type() const
{
return subscription_type_;
return delivered_message_type_;
}

size_t
Expand Down

0 comments on commit d8fd4b8

Please sign in to comment.