Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Karsten/serialized ipm touchups #2

Draft
wants to merge 9 commits into
base: dnae_adas/serialized_ipm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/publisher_base.cpp
src/rclcpp/qos.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
Expand Down Expand Up @@ -405,6 +407,15 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message test/test_serialized_message.cpp)
if(TARGET test_serialized_message)
ament_target_dependencies(test_serialized_message
test_msgs
)
target_link_libraries(test_serialized_message
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
ament_target_dependencies(test_service
Expand Down
3 changes: 1 addition & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ create_publisher(
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();

return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
node, topic_name, type_support,
qos, options);
node, topic_name, type_support, qos, options);
}

} // namespace rclcpp
Expand Down
16 changes: 6 additions & 10 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ create_subscription(
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
Expand Down Expand Up @@ -96,12 +94,10 @@ create_subscription(
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
MessageMemoryStrategyT::create_default()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
Expand Down
8 changes: 2 additions & 6 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,11 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the subscription.
*
* \param subscription the SubscriptionIntraProcess to register.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_subscription(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription,
const bool is_serialized = false);
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);

/// Unregister a subscription using the subscription's unique id.
/**
Expand All @@ -137,12 +134,11 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the publisher.
*
* \param publisher publisher to be registered with the manager.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false);
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized = false);

/// Unregister a publisher using the publisher's unique id.
/**
Expand Down
49 changes: 27 additions & 22 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type,
std::shared_ptr<SerializationBase> serializer)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
std::shared_ptr<SerializationBase> serializer,
bool is_serialized = false)
: SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized),
any_callback_(callback), serializer_(serializer)
{
if (!std::is_same<MessageT, CallbackMessageT>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value &&
!std::is_same<CallbackMessageT, rcl_serialized_message_t>::value)
!std::is_base_of<rcl_serialized_message_t, MessageT>::value)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't exactly sure why this check is happening. Also, I wouldn't compile if we check for the CallbackMessageT, which I believe is the right thing to check for though.

{
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}
Expand Down Expand Up @@ -157,11 +157,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
template<typename T>
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx ROS2, rx serialized\n");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}
Expand All @@ -170,15 +171,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
msg_info.from_intra_process = true;

ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg =
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()), serialized_msg.get());

if (nullptr == serialized_msg) {
if (0u == serialized_msg->buffer_length) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}

if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
any_callback_.dispatch_intra_process(std::static_pointer_cast<rcl_serialized_message_t>(serialized_msg), msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
Expand All @@ -189,11 +190,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
// forward from ROS2 message to ROS2 message (same type)
template<class T>
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx ROS2, rx ROS2\n");
rmw_message_info_t msg_info = {};
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;
Expand All @@ -207,14 +209,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
}
}

// forward from rcl_serialized_message_t to SerializationMessage (no conversion needed)
// forward from rcl_serialized_message_t to rcl_serialized_message_t (no conversion needed)
template<typename T>
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx serialize, rx serialize\n");
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;

Expand All @@ -234,17 +237,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
template<class T>
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
void>::type
execute_impl()
{
fprintf(stderr, "tx rcl, rx ROS2\n");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}

ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
if (nullptr == serialized_container) {
ConstMessageSharedPtr serialized_message = buffer_->consume_shared();
if (nullptr == serialized_message) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}

Expand All @@ -254,13 +258,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
// construct unique might not initialize the ros message?
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
serialized_message.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
SubscriptionIntraProcessBase(
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
bool is_serialized)
: topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized)
{}

virtual ~SubscriptionIntraProcessBase() = default;
Expand Down Expand Up @@ -70,6 +73,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
rmw_qos_profile_t
get_actual_qos() const;

RCLCPP_PUBLIC
bool
is_serialized() const;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
Expand All @@ -80,6 +87,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

std::string topic_name_;
rmw_qos_profile_t qos_profile_;
bool is_serialized_;
};

} // namespace experimental
Expand Down
Loading