diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3c271b20c6..26573854ea 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/publisher_base.cpp src/rclcpp/qos.cpp src/rclcpp/qos_event.cpp + src/rclcpp/serialization.cpp + src/rclcpp/serialized_message.cpp src/rclcpp/service.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription_base.cpp @@ -405,6 +407,15 @@ if(BUILD_TESTING) ${PROJECT_NAME} ) endif() + ament_add_gtest(test_serialized_message test/test_serialized_message.cpp) + if(TARGET test_serialized_message) + ament_target_dependencies(test_serialized_message + test_msgs + ) + target_link_libraries(test_serialized_message + ${PROJECT_NAME} + ) + endif() ament_add_gtest(test_service test/test_service.cpp) if(TARGET test_service) ament_target_dependencies(test_service diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 89536d0e98..b677293ec3 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -85,8 +85,7 @@ create_publisher( const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); return create_publisher( - node, topic_name, type_support, - qos, options); + node, topic_name, type_support, qos, options); } } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index fb9ffea744..33a5c7b4b2 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -54,12 +54,10 @@ create_subscription( const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, CallbackT && callback, - const rclcpp::SubscriptionOptionsWithAllocator & options = ( - rclcpp::SubscriptionOptionsWithAllocator() - ), - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = MessageMemoryStrategyT::create_default() - ) ) { using rclcpp::node_interfaces::get_node_topics_interface; @@ -96,12 +94,10 @@ create_subscription( const std::string & topic_name, const rclcpp::QoS & qos, CallbackT && callback, - const rclcpp::SubscriptionOptionsWithAllocator & options = ( - rclcpp::SubscriptionOptionsWithAllocator() - ), - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = MessageMemoryStrategyT::create_default() - ) ) { const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 04508af46e..8fc9a67ff5 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -110,14 +110,11 @@ class IntraProcessManager * In addition this generates a unique intra process id for the subscription. * * \param subscription the SubscriptionIntraProcess to register. - * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the subscription's unique id. */ RCLCPP_PUBLIC uint64_t - add_subscription( - rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription, - const bool is_serialized = false); + add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); /// Unregister a subscription using the subscription's unique id. /** @@ -137,12 +134,11 @@ class IntraProcessManager * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. - * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false); + add_publisher(rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized = false); /// Unregister a publisher using the publisher's unique id. /** diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0172b0d036..8988eedea5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -73,13 +73,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase const std::string & topic_name, rmw_qos_profile_t qos_profile, rclcpp::IntraProcessBufferType buffer_type, - std::shared_ptr serializer) - : SubscriptionIntraProcessBase(topic_name, qos_profile), + std::shared_ptr serializer, + bool is_serialized = false) + : SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized), any_callback_(callback), serializer_(serializer) { if (!std::is_same::value && - !std::is_same::value && - !std::is_same::value) + !std::is_base_of::value) { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } @@ -157,11 +157,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // convert from ROS2 message to rcl_serialized_message_t (serilizatino needed) template typename std::enable_if< - std::is_same::value && - !std::is_same::value, - void>::type + !std::is_base_of::value && // tx not a SerializedMessage + std::is_base_of::value, // rx a SerializedMessage + void>::type execute_impl() { + fprintf(stderr, "tx ROS2, rx serialized\n"); if (nullptr == serializer_) { throw std::runtime_error("Subscription intra-process can't handle serialized messages"); } @@ -170,15 +171,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase msg_info.from_intra_process = true; ConstMessageSharedPtr msg = buffer_->consume_shared(); - auto serialized_msg = - serializer_->serialize_message(reinterpret_cast(msg.get())); + auto serialized_msg = std::make_shared(); + serializer_->serialize_message(reinterpret_cast(msg.get()), serialized_msg.get()); - if (nullptr == serialized_msg) { + if (0u == serialized_msg->buffer_length) { throw std::runtime_error("Subscription intra-process could not serialize message"); } if (any_callback_.use_take_shared_method()) { - any_callback_.dispatch_intra_process(serialized_msg, msg_info); + any_callback_.dispatch_intra_process(std::static_pointer_cast(serialized_msg), msg_info); } else { throw std::runtime_error( "Subscription intra-process for serialized " @@ -189,11 +190,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // forward from ROS2 message to ROS2 message (same type) template typename std::enable_if< - !std::is_same::value && - !std::is_same::value, + !std::is_base_of::value && // tx not a SerializedMessage + !std::is_base_of::value, // rx not a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx ROS2, rx ROS2\n"); rmw_message_info_t msg_info = {}; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -207,14 +209,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } - // forward from rcl_serialized_message_t to SerializationMessage (no conversion needed) + // forward from rcl_serialized_message_t to rcl_serialized_message_t (no conversion needed) template typename std::enable_if< - std::is_same::value && - std::is_same::value, + std::is_base_of::value && // tx a SerializedMessage + std::is_base_of::value, // rx a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx serialize, rx serialize\n"); rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; @@ -234,17 +237,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // convert from rcl_serialized_message_t to ROS2 message (deserialization needed) template typename std::enable_if< - !std::is_same::value && - std::is_same::value, + std::is_base_of::value && // tx a SerializedMessage + !std::is_base_of::value, // rx not a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx rcl, rx ROS2\n"); if (nullptr == serializer_) { throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); } - ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); - if (nullptr == serialized_container) { + ConstMessageSharedPtr serialized_message = buffer_->consume_shared(); + if (nullptr == serialized_message) { throw std::runtime_error("Subscription intra-process could not get serialized message"); } @@ -254,13 +258,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase if (any_callback_.use_take_shared_method()) { CallbackMessageSharedPtr msg = construct_unique(); serializer_->deserialize_message( - serialized_container.get(), + serialized_message.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(msg, msg_info); } else { + // construct unique might not initialize the ros message? CallbackMessageUniquePtr msg = construct_unique(); serializer_->deserialize_message( - serialized_container.get(), + serialized_message.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(std::move(msg), msg_info); } diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..6f23216ad9 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -39,8 +39,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) RCLCPP_PUBLIC - SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile) - : topic_name_(topic_name), qos_profile_(qos_profile) + SubscriptionIntraProcessBase( + const std::string & topic_name, + rmw_qos_profile_t qos_profile, + bool is_serialized) + : topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized) {} virtual ~SubscriptionIntraProcessBase() = default; @@ -70,6 +73,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC + bool + is_serialized() const; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; @@ -80,6 +87,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable std::string topic_name_; rmw_qos_profile_t qos_profile_; + bool is_serialized_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index fd53e727d1..122e9e1c08 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -69,12 +69,14 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + bool is_serialized = false) : PublisherBase( node_base, topic, *rosidl_typesupport_cpp::get_message_type_support_handle(), - options.template to_rcl_publisher_options(qos)), + options.template to_rcl_publisher_options(qos), + is_serialized), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())), message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) @@ -87,12 +89,14 @@ class Publisher : public PublisherBase const std::string & topic, const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options, - const rosidl_message_type_support_t & type_support) + const rosidl_message_type_support_t & type_support, + bool is_serialized = false) : PublisherBase( node_base, topic, type_support, - options.template to_rcl_publisher_options(qos)), + options.template to_rcl_publisher_options(qos), + is_serialized), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())), message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) @@ -131,13 +135,9 @@ class Publisher : public PublisherBase throw std::invalid_argument( "intraprocess communication allowed only with volatile durability"); } - uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); - uint64_t intra_process_publisher_id_serialized = ipm->add_publisher( - this->shared_from_this(), true); - this->setup_intra_process( - intra_process_publisher_id, - intra_process_publisher_id_serialized, - ipm); + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), false); + uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(this->shared_from_this(), true); + this->setup_intra_process(intra_process_publisher_id, intra_process_publisher_id_serialized, ipm); } } @@ -173,10 +173,10 @@ class Publisher : public PublisherBase virtual void publish(std::unique_ptr msg) { - if (std::is_same::value) { - this->template publish(std::move(msg)); - return; - } + //if (std::is_same::value) { + // this->template publish(std::move(msg)); + // return; + //} if (!intra_process_is_enabled_) { this->do_inter_process_publish(*msg); @@ -207,7 +207,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value, void>::type publish(const rcl_serialized_message_t & serialized_msg) { this->do_publish_message(serialized_msg); @@ -222,9 +222,16 @@ class Publisher : public PublisherBase "publishing unique_ptr with custom deleter only supported for serialized messages"); } + /// Publish a serialized message. + //template + //void publish(std::unique_ptr serialized_msg) + //{ + // this->do_serialized_publish(*serialized_msg); + //} + /// Publish a serialized message. template - void publish(std::unique_ptr serialized_msg) + void publish(std::unique_ptr serialized_msg) { this->do_serialized_publish(*serialized_msg); } @@ -309,7 +316,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value>::type do_publish_message(const T & msg) { // Avoid allocating when not using intra process. @@ -327,7 +334,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value>::type do_publish_message(const T & msg) { // Kept for backwards compatibility. Copies compelete memory! @@ -355,7 +362,7 @@ class Publisher : public PublisherBase } void - do_serialized_publish(rcl_serialized_message_t serialized_msg) + do_serialized_publish(const SerializedMessage & serialized_msg) { bool inter_process_publish_needed = get_subscription_count() > get_intra_process_subscription_count(); @@ -368,10 +375,10 @@ class Publisher : public PublisherBase } } - auto msg = std::make_unique( - std::move(serialized_msg)); - if (intra_process_is_enabled_) { + auto msg = std::make_unique(); + *msg = serialized_msg; + do_intra_process_publish(std::move(msg), message_allocator_serialized_); } } @@ -411,12 +418,8 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - const uint64_t intra_process_publisher_id = std::is_same::value ? - intra_process_publisher_id_serialized_ : intra_process_publisher_id_; - ipm->template do_intra_process_publish( - intra_process_publisher_id, + intra_process_publisher_id_, std::move(msg), message_allocator); } @@ -436,12 +439,8 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - const uint64_t intra_process_publisher_id = std::is_same::value ? - intra_process_publisher_id_serialized_ : intra_process_publisher_id_; - return ipm->template do_intra_process_publish_and_return_shared( - intra_process_publisher_id, + intra_process_publisher_id_, std::move(msg), message_allocator); } diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 4f678ec0bb..a061dc60d3 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -73,7 +73,8 @@ class PublisherBase : public std::enable_shared_from_this rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rosidl_message_type_support_t & type_support, - const rcl_publisher_options_t & publisher_options); + const rcl_publisher_options_t & publisher_options, + bool is_serializd = false); RCLCPP_PUBLIC virtual ~PublisherBase(); @@ -194,6 +195,10 @@ class PublisherBase : public std::enable_shared_from_this uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm); + RCLCPP_PUBLIC + bool + is_serialized() const; + protected: template void @@ -226,6 +231,9 @@ class PublisherBase : public std::enable_shared_from_this uint64_t intra_process_publisher_id_serialized_; rmw_gid_t rmw_gid_; + +private: + bool is_serialized_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index f287d8e49d..84b45aad6b 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -75,9 +75,9 @@ create_publisher_factory( const rclcpp::QoS & qos ) -> std::shared_ptr { + auto is_serialized = std::is_base_of::value; auto publisher = std::make_shared( - node_base, topic_name, qos, options, - type_support); + node_base, topic_name, qos, options, type_support, is_serialized); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index ddc122cb43..65f81f05e9 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -15,16 +15,16 @@ #ifndef RCLCPP__SERIALIZATION_HPP_ #define RCLCPP__SERIALIZATION_HPP_ -#include - #include #include -#include "rcl/error_handling.h" +#include "rosidl_runtime_c/message_type_support_struct.h" namespace rclcpp { +class SerializedMessage; + /// Interface to (de)serialize a message class SerializationBase { @@ -35,7 +35,8 @@ class SerializationBase /** * \param[in] message The ROS2 message which is read and serialized by rmw. */ - virtual std::shared_ptr serialize_message(const void * message) = 0; + virtual void serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const = 0; /// Deserialize a serialized stream to a ROS message /** @@ -43,79 +44,23 @@ class SerializationBase * \param[out] message The deserialized ROS2 message. */ virtual void deserialize_message( - const rcl_serialized_message_t * serialized_message, - void * msg) = 0; + const SerializedMessage * serialized_message, void * ros_message) const = 0; }; /// Default implementation to (de)serialize a message by using rmw_(de)serialize class Serialization : public SerializationBase { public: - Serialization( - const rosidl_message_type_support_t & type_support, - const rcutils_allocator_t allocator = rcutils_get_default_allocator()) - : type_support_(type_support), rcutils_allocator_(allocator) - {} - - std::shared_ptr serialize_message(const void * message) override - { - auto serialized_message = new rcl_serialized_message_t; - *serialized_message = rmw_get_zero_initialized_serialized_message(); - const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_); - if (ret != RCUTILS_RET_OK) { - throw std::runtime_error( - "Error allocating resources for serialized message: " + - std::string(rcutils_get_error_string().str)); - } - - if (nullptr == message) { - delete serialized_message; - throw std::runtime_error("Message is nullpointer while serialization."); - } - - const auto error = rmw_serialize( - message, - &type_support_, - serialized_message); - if (error != RCL_RET_OK) { - delete serialized_message; - throw std::runtime_error("Failed to serialize."); - } - - auto shared_serialized_msg = std::shared_ptr( - serialized_message, - [](rcl_serialized_message_t * msg) { - auto fini_ret = rmw_serialized_message_fini(msg); - delete msg; - if (fini_ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Failed to destroy serialized message: %s", rcl_get_error_string().str); - } - }); - - return shared_serialized_msg; - } + Serialization(const rosidl_message_type_support_t & type_support); - void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override - { - if (nullptr == serialized_message || - serialized_message->buffer_capacity == 0 || - serialized_message->buffer_length == 0 || - !serialized_message->buffer) - { - throw std::runtime_error("Failed to deserialize nullptr serialized message."); - } + void serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const override; - const auto ret = rmw_deserialize(serialized_message, &type_support_, msg); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Failed to deserialize serialized message."); - } - } + void deserialize_message( + const SerializedMessage * serialized_message, void * ros_message) const override; private: rosidl_message_type_support_t type_support_; - rcutils_allocator_t rcutils_allocator_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 8e4dbf837f..3e7cff7f0b 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -15,13 +15,8 @@ #ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_ #define RCLCPP__SERIALIZED_MESSAGE_HPP_ -#include - -#include - -#include "rcutils/logging_macros.h" - -#include "rmw/serialized_message.h" +#include "rcl/allocator.h" +#include "rcl/types.h" namespace rclcpp { @@ -30,49 +25,50 @@ namespace rclcpp class SerializedMessage : public rcl_serialized_message_t { public: - SerializedMessage() - : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) - {} - - explicit SerializedMessage(const SerializedMessage & serialized_message) - : SerializedMessage(static_cast(serialized_message)) - {} - - explicit SerializedMessage(const rcl_serialized_message_t & serialized_message) - : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) - { - const auto ret = rmw_serialized_message_init( - this, serialized_message.buffer_length, - &serialized_message.allocator); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } - - // do not call memcpy if the pointer is "static" - if (buffer != serialized_message.buffer) { - std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); - } - buffer_length = serialized_message.buffer_length; - } - - explicit SerializedMessage(rcl_serialized_message_t && msg) - : rcl_serialized_message_t(msg) - { - // reset buffer to prevent double free - msg = rmw_get_zero_initialized_serialized_message(); - } - - ~SerializedMessage() - { - if (nullptr != buffer) { - const auto fini_ret = rmw_serialized_message_fini(this); - if (fini_ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Failed to destroy serialized message: %s", rcl_get_error_string().str); - } - } - } + /// Default constructor for a SerializedMessage + /** + * Default constructs a serialized message and initalizes its + * capacity with 0. + * + * \param[in] allocator The allocator to be used for the initialzation. + */ + explicit SerializedMessage( + const rcl_allocator_t & allocator = rcl_get_default_allocator()); + + /// Default constructor for a SerializedMessage + /** + * Default constructs a serialized message and initalizes its + * capacity with 0. + * + * \param[in] initial_capacity The amount of memory to be allocated. + * \param[in] allocator The allocator to be used for the initialzation. + */ + SerializedMessage( + size_t initial_capacity, + const rcl_allocator_t & allocator = rcl_get_default_allocator()); + + /// Copy Constructor for a SerializedMessage + SerializedMessage(const SerializedMessage & serialized_message); + + /// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t + SerializedMessage(const rcl_serialized_message_t & serialized_message); + + /// Move Constructor for a SerializedMessage + SerializedMessage(SerializedMessage && serialized_message); + + /// Move Constructor for a SerializedMessage from a rcl_serialized_message_t + SerializedMessage(rcl_serialized_message_t && serialized_message); + + SerializedMessage & operator=(const SerializedMessage & other); + + SerializedMessage & operator=(const rcl_serialized_message_t & other); + + SerializedMessage & operator=(SerializedMessage && other); + + SerializedMessage & operator=(rcl_serialized_message_t && other); + + /// Destructor for a SerializedMessage + ~SerializedMessage(); }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 9568027e9e..1e6a46604d 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -25,7 +25,6 @@ #include #include - #include "rcl/error_handling.h" #include "rcl/subscription.h" @@ -177,9 +176,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( - type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared(type_support_handle) ); TRACEPOINT( rclcpp_subscription_init, @@ -216,9 +213,8 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( - type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared(type_support_handle), + is_serialized() ); TRACEPOINT( rclcpp_subscription_init, @@ -227,8 +223,7 @@ class Subscription : public SubscriptionBase // Add it to the intra process manager. intra_process_subscription_id_serialized = ipm->add_subscription( - subscription_intra_process, - true); + subscription_intra_process); } this->setup_intra_process( diff --git a/rclcpp/include/rclcpp/subscription_traits.hpp b/rclcpp/include/rclcpp/subscription_traits.hpp index ecab458bd7..2d8772e3a3 100644 --- a/rclcpp/include/rclcpp/subscription_traits.hpp +++ b/rclcpp/include/rclcpp/subscription_traits.hpp @@ -18,6 +18,8 @@ #include #include "rclcpp/function_traits.hpp" +#include "rclcpp/serialized_message.hpp" +#include "rclcpp/subscription_options.hpp" #include "rcl/types.h" namespace rclcpp @@ -49,6 +51,15 @@ struct is_serialized_subscription_argument +struct is_serialized_subscription_argument: std::true_type +{}; + +template<> +struct is_serialized_subscription_argument> + : std::true_type +{}; + template struct is_serialized_subscription : is_serialized_subscription_argument {}; @@ -75,6 +86,7 @@ struct extract_message_type>: extract_message template< typename CallbackT, + typename AllocatorT = std::allocator, // Do not attempt if CallbackT is an integer (mistaken for depth) typename = std::enable_if_t>>::value>, @@ -85,6 +97,10 @@ template< // Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile) typename = std::enable_if_t>>::value>, + // Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator + typename = std::enable_if_t, std::remove_cv_t>>::value> > struct has_message_type : extract_message_type< diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 3b0b4d1e19..0493a13a71 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -33,8 +33,7 @@ IntraProcessManager::~IntraProcessManager() uint64_t IntraProcessManager::add_publisher( - rclcpp::PublisherBase::SharedPtr publisher, - const bool is_serialized) + rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized) { std::unique_lock lock(mutex_); @@ -59,9 +58,7 @@ IntraProcessManager::add_publisher( } uint64_t -IntraProcessManager::add_subscription( - SubscriptionIntraProcessBase::SharedPtr subscription, - const bool is_serialized) +IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) { std::unique_lock lock(mutex_); @@ -71,7 +68,7 @@ IntraProcessManager::add_subscription( subscriptions_[id].topic_name = subscription->get_topic_name(); subscriptions_[id].qos = subscription->get_actual_qos(); subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); - subscriptions_[id].is_serialized = is_serialized; + subscriptions_[id].is_serialized = subscription->is_serialized(); // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 0f2258881a..adb94d02e2 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -42,10 +42,12 @@ PublisherBase::PublisherBase( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rosidl_message_type_support_t & type_support, - const rcl_publisher_options_t & publisher_options) + const rcl_publisher_options_t & publisher_options, + bool is_serialized) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0), - intra_process_publisher_id_serialized_(0) + intra_process_is_enabled_(false), + intra_process_publisher_id_(0), + is_serialized_(is_serialized) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -106,7 +108,6 @@ PublisherBase::~PublisherBase() return; } ipm->remove_publisher(intra_process_publisher_id_); - ipm->remove_publisher(intra_process_publisher_id_serialized_); } const char * @@ -249,6 +250,12 @@ PublisherBase::setup_intra_process( intra_process_is_enabled_ = true; } +bool +PublisherBase::is_serialized() const +{ + return is_serialized_; +} + void PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const { diff --git a/rclcpp/src/rclcpp/serialization.cpp b/rclcpp/src/rclcpp/serialization.cpp new file mode 100644 index 0000000000..b7245bf0ba --- /dev/null +++ b/rclcpp/src/rclcpp/serialization.cpp @@ -0,0 +1,66 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/serialization.hpp" + +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/serialized_message.hpp" + +#include "rcpputils/asserts.hpp" + +#include "rmw/rmw.h" + +namespace rclcpp +{ + +Serialization::Serialization(const rosidl_message_type_support_t & type_support) +: type_support_(type_support) +{} + +void Serialization::serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const +{ + rcpputils::check_true(ros_message != nullptr, "ROS message is nullpointer."); + rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer."); + + const auto ret = rmw_serialize( + ros_message, + &type_support_, + serialized_message); + if (ret != RMW_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to serialize ROS message."); + } +} + +void +Serialization::deserialize_message( + const SerializedMessage * serialized_message, void * ros_message) const +{ + rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer."); + rcpputils::check_true( + serialized_message->buffer_capacity != 0 && + serialized_message->buffer_length != 0 && + serialized_message->buffer != nullptr, "Serialized message is wrongly initialized."); + rcpputils::check_true(ros_message != nullptr, "ROS message is a nullpointer."); + + const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message); + if (ret != RMW_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to deserialize ROS message."); + } +} + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp new file mode 100644 index 0000000000..99b20d4d5c --- /dev/null +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -0,0 +1,135 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/serialized_message.hpp" + +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/logging.hpp" + +#include "rmw/types.h" + +namespace rclcpp +{ + +/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks +SerializedMessage::SerializedMessage(const rcl_allocator_t & allocator) +: SerializedMessage(0u, allocator) +{} + +SerializedMessage::SerializedMessage( + size_t initial_capacity, const rcl_allocator_t & allocator) +: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) +{ + const auto ret = rmw_serialized_message_init( + this, initial_capacity, &allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } +} + +SerializedMessage::SerializedMessage(const SerializedMessage & serialized_message) +: SerializedMessage(static_cast(serialized_message)) +{} + +SerializedMessage::SerializedMessage(const rcl_serialized_message_t & serialized_message) +: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) +{ + fprintf(stderr, "copy constructor called\n"); + const auto ret = rmw_serialized_message_init( + this, serialized_message.buffer_capacity, &serialized_message.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != serialized_message.buffer) { + std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); + } + buffer_length = serialized_message.buffer_length; +} + +SerializedMessage::SerializedMessage(SerializedMessage && serialized_message) +: SerializedMessage( + std::forward( + static_cast(serialized_message))) +{} + +SerializedMessage::SerializedMessage(rcl_serialized_message_t && serialized_message) +: rcl_serialized_message_t(serialized_message) +{ + // reset buffer to prevent double free + serialized_message = rmw_get_zero_initialized_serialized_message(); +} + +SerializedMessage & SerializedMessage::operator=(const SerializedMessage & other) +{ + *this = static_cast(other); + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(const rcl_serialized_message_t & other) +{ + *this = static_cast(rmw_get_zero_initialized_serialized_message()); + + const auto ret = rmw_serialized_message_init( + this, other.buffer_capacity, &other.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != other.buffer) { + std::memcpy(buffer, other.buffer, other.buffer_length); + } + buffer_length = other.buffer_length; + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(SerializedMessage && other) +{ + *this = static_cast(other); + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other) +{ + this->buffer = other.buffer; + this->buffer_capacity = other.buffer_length; + this->buffer_length = other.buffer_length; + this->allocator = other.allocator; + + // reset original to prevent double free + other = rmw_get_zero_initialized_serialized_message(); + + return *this; +} + +SerializedMessage::~SerializedMessage() +{ + if (nullptr != buffer) { + const auto fini_ret = rmw_serialized_message_fini(this); + if (fini_ret != RCL_RET_OK) { + RCLCPP_ERROR( + get_logger("rclcpp"), + "Failed to destroy serialized message: %s", rcl_get_error_string().str); + } + } +} + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 3b951f34de..abd382a2ba 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -36,3 +36,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +bool +SubscriptionIntraProcessBase::is_serialized() const +{ + return is_serialized_; +} diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index cc702da0e8..65ab3f010f 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -14,8 +14,6 @@ #include -#include - #include #include #include @@ -26,8 +24,11 @@ #include "rclcpp/publisher.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" +#include "test_msgs/message_fixtures.hpp" + int32_t & get_test_allocation_counter() { static int32_t counter = 0; @@ -36,6 +37,7 @@ int32_t & get_test_allocation_counter() void * custom_allocate(size_t size, void * state) { + fprintf(stderr, "calling custom allocate\n"); static auto m_allocator = rcutils_get_default_allocator(); ++get_test_allocation_counter(); @@ -45,6 +47,7 @@ void * custom_allocate(size_t size, void * state) void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) { + fprintf(stderr, "calling custom zero allocate\n"); static auto m_allocator = rcutils_get_default_allocator(); ++get_test_allocation_counter(); @@ -54,6 +57,7 @@ void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, v void * custom_reallocate(void * pointer, size_t size, void * state) { + fprintf(stderr, "calling custom reallocate\n"); static auto m_allocator = rcutils_get_default_allocator(); if (pointer == nullptr) { @@ -66,14 +70,15 @@ void * custom_reallocate(void * pointer, size_t size, void * state) void custom_deallocate(void * pointer, void * state) { + fprintf(stderr, "calling custom deallocate\n"); static auto m_allocator = rcutils_get_default_allocator(); --get_test_allocation_counter(); m_allocator.deallocate(pointer, state); } -rcl_serialized_message_t make_serialized_string_msg( - const std::shared_ptr & stringMsg) +rclcpp::SerializedMessage make_serialized_string_msg( + const std::shared_ptr & string_msg) { auto m_allocator = rcutils_get_default_allocator(); @@ -83,21 +88,13 @@ rcl_serialized_message_t make_serialized_string_msg( m_allocator.reallocate = &custom_reallocate; m_allocator.zero_allocate = &custom_zero_allocate; - rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); - auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } + rclcpp::SerializedMessage msg(m_allocator); - static auto type = - rosidl_typesupport_cpp::get_message_type_support_handle - (); - auto error = rmw_serialize(stringMsg.get(), type, &msg); - if (error != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "test_intra_process_communication", - "Something went wrong preparing the serialized message"); - } + static auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + rclcpp::Serialization serializer(*type_support); + EXPECT_NO_THROW(serializer.serialize_message(string_msg.get(), &msg)); return msg; } @@ -121,22 +118,94 @@ std::ostream & operator<<(std::ostream & out, const TestParameters & params) return out; } -class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +class TestPublisherSubscriptionSerialized : public ::testing::Test { public: static void SetUpTestCase() { - if (!rclcpp::ok()) { - rclcpp::init(0, nullptr); - } + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); } protected: - static std::chrono::milliseconds offset; + std::chrono::milliseconds offset_ = std::chrono::milliseconds(250); + + std::vector parameters_ = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false), + rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + { + { + rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false), + rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two subscriptions, one using intra-process comm and the other not using it. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().use_intra_process_comms(false) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_one_intraprocess_one_not" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both using intra-process. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_in_two_contexts_with_intraprocess_comm" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both of them not using intra-process comm. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(false), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + // }, + // {0u, 0u}, + // 1, + // "two_subscriptions_in_two_contexts_without_intraprocess_comm" + //} + }; }; -std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( - 250); std::array counts; void OnMessageSerialized(const std::shared_ptr msg) @@ -144,12 +213,23 @@ void OnMessageSerialized(const std::shared_ptr m EXPECT_NE(msg->buffer, nullptr); EXPECT_GT(msg->buffer_capacity, 0u); + static auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + test_msgs::msg::Strings ros_msg; + rclcpp::Serialization serializer(*type_support); + EXPECT_NO_THROW( + serializer.deserialize_message( + static_cast(msg.get()), &ros_msg)); + EXPECT_EQ(ros_msg.string_value.front(), '0'); + EXPECT_EQ(ros_msg.string_value[6], '6'); + EXPECT_EQ(ros_msg.string_value.back(), '9'); ++counts[0]; } void OnMessageConst(std::shared_ptr msg) { - EXPECT_EQ(msg->string_value.back(), '9'); + EXPECT_EQ(msg->string_value[6], '6'); ++counts[1]; } @@ -163,189 +243,130 @@ void OnMessageUniquePtr(std::unique_ptr msg) void OnMessage(std::shared_ptr msg) { - EXPECT_EQ(msg->string_value.back(), '9'); + EXPECT_EQ(msg->string_value.front(), '0'); + EXPECT_EQ(msg->string_value[6], '6'); + EXPECT_EQ(msg->string_value.back(), '\0'); ++counts[1]; } -TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) { get_test_allocation_counter() = 0; - TestParameters parameters = GetParam(); - { + for (const auto & parameter : parameters_) { rclcpp::Node::SharedPtr node = std::make_shared( - "my_node", - "/ns", - parameters.node_options[0]); + "my_node", + "/ns", + parameter.node_options[0]); auto publisher = node->create_publisher("/topic", 10); auto sub_shared = node->create_subscription( - "/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription( - "/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription( - "/topic", 10, - &OnMessageConst); + "/topic", 10, + &OnMessage); + //auto sub_unique = node->create_subscription( + // "/topic", 10, + // &OnMessageUniquePtr); + //auto sub_const_shared = node->create_subscription( + // "/topic", 10, + // &OnMessageConst); auto sub_serialized = node->create_subscription( - "/topic", 10, - &OnMessageSerialized); + "/topic", 10, + &OnMessageSerialized); - rclcpp::sleep_for(offset); + rclcpp::sleep_for(offset_); counts.fill(0); - auto stringMsg = get_messages_strings()[3]; + std::shared_ptr string_msg = get_messages_strings()[3]; - for (size_t i = 0; i < parameters.runs; i++) { - auto msg0 = make_serialized_string_msg(stringMsg); + for (size_t i = 0; i < parameter.runs; i++) { + auto msg0 = make_serialized_string_msg(string_msg); - std::unique_ptr stringMsgU( - new test_msgs::msg::Strings( - *stringMsg)); + auto unique_string_msg = std::make_unique(*string_msg); - publisher->publish(std::make_unique(msg0)); - publisher->publish(*stringMsg); - publisher->publish(std::move(stringMsgU)); + { + auto unique_serialized_msg = std::make_unique(std::move(msg0)); + publisher->publish(std::move(unique_serialized_msg)); + } + //publisher->publish(*string_msg); + //publisher->publish(std::move(unique_string_msg)); } for (uint32_t i = 0; i < 3; ++i) { rclcpp::spin_some(node); - rclcpp::sleep_for(offset); + rclcpp::sleep_for(offset_); } rclcpp::spin_some(node); - } - - if (parameters.runs == 1) { - EXPECT_EQ(counts[0], 3u); - EXPECT_EQ(counts[1], 9u); - } - - EXPECT_EQ(get_test_allocation_counter(), 0); -} - -TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) -{ - get_test_allocation_counter() = 0; - - TestParameters parameters = GetParam(); - { - rclcpp::Node::SharedPtr node = std::make_shared( - "my_node", - "/ns", - parameters.node_options[0]); - auto publisher = rclcpp::create_publisher( - node, - "/topic", - *rosidl_typesupport_cpp::get_message_type_support_handle(), - rclcpp::QoS(10)); - - auto sub_gen_serialized = rclcpp::create_subscription( - node, - "/topic", - *rosidl_typesupport_cpp::get_message_type_support_handle(), - rclcpp::QoS(10), - &OnMessageSerialized); - - auto sub_shared = node->create_subscription( - "/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription( - "/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription( - "/topic", 10, - &OnMessageConst); - auto sub_serialized = node->create_subscription( - "/topic", 10, - &OnMessageSerialized); - - rclcpp::sleep_for(offset); - counts.fill(0); - auto stringMsg = get_messages_strings()[3]; - - for (size_t i = 0; i < parameters.runs; i++) { - auto msg0 = make_serialized_string_msg(stringMsg); - - publisher->publish(std::make_unique(msg0)); + if (parameter.runs == 1) { + EXPECT_EQ(counts[0], 3u); // count serialized message callbacks + EXPECT_EQ(counts[1], 9u); // count unique + shared message callbacks } - rclcpp::spin_some(node); - rclcpp::sleep_for(offset); - - rclcpp::spin_some(node); - } - - if (parameters.runs == 1) { - EXPECT_EQ(counts[0], 2u); - EXPECT_EQ(counts[1], 3u); } - EXPECT_EQ(get_test_allocation_counter(), 0); } -auto get_new_context() -{ - auto context = rclcpp::Context::make_shared(); - context->init(0, nullptr); - return context; -} +//TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +//{ +// get_test_allocation_counter() = 0; +// +// TestParameters parameters = GetParam(); +// { +// rclcpp::Node::SharedPtr node = std::make_shared( +// "my_node", +// "/ns", +// parameters.node_options[0]); +// auto publisher = rclcpp::create_publisher( +// node, +// "/topic", +// *rosidl_typesupport_cpp::get_message_type_support_handle(), +// rclcpp::QoS(10)); +// +// auto sub_gen_serialized = rclcpp::create_subscription( +// node, +// "/topic", +// *rosidl_typesupport_cpp::get_message_type_support_handle(), +// rclcpp::QoS(10), +// &OnMessageSerialized); +// +// auto sub_shared = node->create_subscription( +// "/topic", 10, +// &OnMessage); +// auto sub_unique = node->create_subscription( +// "/topic", 10, +// &OnMessageUniquePtr); +// auto sub_const_shared = node->create_subscription( +// "/topic", 10, +// &OnMessageConst); +// auto sub_serialized = node->create_subscription( +// "/topic", 10, +// &OnMessageSerialized); +// +// rclcpp::sleep_for(offset); +// +// counts.fill(0); +// auto stringMsg = get_messages_strings()[3]; +// +// for (size_t i = 0; i < parameters.runs; i++) { +// auto msg0 = make_serialized_string_msg(stringMsg); +// +// publisher->publish(std::make_unique(msg0)); +// } +// rclcpp::spin_some(node); +// rclcpp::sleep_for(offset); +// +// rclcpp::spin_some(node); +// } +// +// if (parameters.runs == 1) { +// EXPECT_EQ(counts[0], 2u); +// EXPECT_EQ(counts[1], 3u); +// } +// +// EXPECT_EQ(get_test_allocation_counter(), 0); +//} + -std::vector parameters = { - /* - Testing publisher subscription count api and internal process subscription count. - Two subscriptions in the same topic, both using intraprocess comm. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().use_intra_process_comms(true) - }, - {1u, 2u}, - 1, - "two_subscriptions_intraprocess_comm" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two subscriptions, one using intra-process comm and the other not using it. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().use_intra_process_comms(false) - }, - {1u, 1u}, - 1, - "two_subscriptions_one_intraprocess_one_not" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both using intra-process. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) - }, - {1u, 1u}, - 1, - "two_subscriptions_in_two_contexts_with_intraprocess_comm" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both of them not using intra-process comm. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(false), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) - }, - {0u, 0u}, - 1, - "two_subscriptions_in_two_contexts_without_intraprocess_comm" - } -}; std::vector setRuns(const std::vector & in, const size_t runs) { @@ -356,12 +377,12 @@ std::vector setRuns(const std::vector & in, cons return out; } -INSTANTIATE_TEST_CASE_P( - TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, - ::testing::ValuesIn(parameters), - ::testing::PrintToStringParamName()); +//INSTANTIATE_TEST_CASE_P( +// TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, +// ::testing::ValuesIn(parameters), +// ::testing::PrintToStringParamName()); -INSTANTIATE_TEST_CASE_P( - TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, - ::testing::ValuesIn(setRuns(parameters, 1000)), - ::testing::PrintToStringParamName()); +//INSTANTIATE_TEST_CASE_P( +// TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, +// ::testing::ValuesIn(setRuns(parameters, 1000)), +// ::testing::PrintToStringParamName()); diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index b71fb1d061..7eb860a985 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -93,6 +93,12 @@ class PublisherBase return false; } + bool + is_serialized() const + { + return false; + } + rclcpp::QoS qos; std::string topic_name; uint64_t intra_process_publisher_id_; @@ -212,6 +218,12 @@ class SubscriptionIntraProcessBase return topic_name; } + bool + is_serialized() const + { + return false; + } + rmw_qos_profile_t qos_profile; const char * topic_name; }; diff --git a/rclcpp/test/test_publisher.cpp b/rclcpp/test/test_publisher.cpp index dce0b8caa8..7af7868a04 100644 --- a/rclcpp/test/test_publisher.cpp +++ b/rclcpp/test/test_publisher.cpp @@ -20,6 +20,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/serialized_message.hpp" #include "test_msgs/msg/empty.hpp" @@ -143,6 +144,34 @@ TEST_F(TestPublisher, various_creation_signatures) { } } +/* + Testing for serialized publishers. + */ +TEST_F(TestPublisher, test_is_serialized) { + initialize(); + using test_msgs::msg::Empty; + { + auto publisher = node->create_publisher("topic", 42); + EXPECT_FALSE(publisher->is_serialized()); + } + { + auto publisher = rclcpp::create_publisher(node, "topic", 42, rclcpp::PublisherOptions()); + EXPECT_FALSE(publisher->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto publisher = rclcpp::create_publisher( + node, "topic", ts, 42, rclcpp::PublisherOptions()); + EXPECT_TRUE(publisher->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto publisher = rclcpp::create_publisher( + node, "topic", ts, 42, rclcpp::PublisherOptions()); + EXPECT_TRUE(publisher->is_serialized()); + } +} + /* Testing publisher with intraprocess enabled and invalid QoS */ diff --git a/rclcpp/test/test_serialized_message.cpp b/rclcpp/test/test_serialized_message.cpp new file mode 100644 index 0000000000..f42d8b4fd6 --- /dev/null +++ b/rclcpp/test/test_serialized_message.cpp @@ -0,0 +1,137 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rcpputils/asserts.hpp" + +#include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/basic_types.hpp" + +TEST(TestSerializedMessage, empty_initialize) { + rclcpp::SerializedMessage serialized_message; + EXPECT_TRUE(serialized_message.buffer == nullptr); + EXPECT_EQ(0u, serialized_message.buffer_length); + EXPECT_EQ(0u, serialized_message.buffer_capacity); +} + +TEST(TestSerializedMessage, initialize_with_capacity) { + rclcpp::SerializedMessage serialized_message(13); + EXPECT_TRUE(serialized_message.buffer != nullptr); + EXPECT_EQ(0u, serialized_message.buffer_length); + EXPECT_EQ(13u, serialized_message.buffer_capacity); +} + +TEST(TestSerializedMessage, various_constructors) { + std::string content = "Hello World"; + auto content_size = content.size() + 1; // accounting for null terminator + + rclcpp::SerializedMessage serialized_message(content_size); + // manually copy some content + std::memcpy(serialized_message.buffer, content.c_str(), content.size()); + serialized_message.buffer[content.size()] = '\0'; + serialized_message.buffer_length = content_size; + EXPECT_STREQ(content.c_str(), reinterpret_cast(serialized_message.buffer)); + EXPECT_EQ(content_size, serialized_message.buffer_capacity); + + // Copy Constructor + rclcpp::SerializedMessage other_serialized_message(serialized_message); + EXPECT_EQ(content_size, other_serialized_message.buffer_capacity); + EXPECT_EQ(content_size, other_serialized_message.buffer_length); + EXPECT_STREQ( + reinterpret_cast(serialized_message.buffer), + reinterpret_cast(other_serialized_message.buffer)); + + // Move Constructor + rclcpp::SerializedMessage yet_another_serialized_message(std::move(other_serialized_message)); + EXPECT_TRUE(other_serialized_message.buffer == nullptr); + EXPECT_EQ(0u, other_serialized_message.buffer_capacity); + EXPECT_EQ(0u, other_serialized_message.buffer_length); + + auto default_allocator = rcl_get_default_allocator(); + auto rcl_serialized_msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&rcl_serialized_msg, 13, &default_allocator); + ASSERT_EQ(RCL_RET_OK, ret); + + // manually copy some content + std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size()); + rcl_serialized_msg.buffer[content.size()] = '\0'; + rcl_serialized_msg.buffer_length = content_size; + EXPECT_EQ(13u, rcl_serialized_msg.buffer_capacity); + + // Copy Constructor from rcl_serialized_message_t + rclcpp::SerializedMessage from_rcl_msg(rcl_serialized_msg); + EXPECT_EQ(13u, from_rcl_msg.buffer_capacity); + EXPECT_EQ(content_size, from_rcl_msg.buffer_length); + + // Verify that despite being fini'd, the copy is real + ret = rmw_serialized_message_fini(&rcl_serialized_msg); + ASSERT_EQ(RCL_RET_OK, ret); + EXPECT_EQ(nullptr, rcl_serialized_msg.buffer); + EXPECT_EQ(0u, rcl_serialized_msg.buffer_capacity); + EXPECT_EQ(0u, rcl_serialized_msg.buffer_length); + EXPECT_TRUE(nullptr != from_rcl_msg.buffer); + EXPECT_EQ(13u, from_rcl_msg.buffer_capacity); + EXPECT_EQ(content_size, from_rcl_msg.buffer_length); +} + +TEST(TestSerializedMessage, serialization) { + auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + rclcpp::Serialization serializer(*type_support); + + auto basic_type_ros_msgs = get_messages_basic_types(); + for (const auto & ros_msg : basic_type_ros_msgs) { + // convert ros msg to serialized msg + rclcpp::SerializedMessage serialized_msg; + serializer.serialize_message(ros_msg.get(), &serialized_msg); + + // convert serialized msg back to ros msg + test_msgs::msg::BasicTypes deserialized_ros_msg; + serializer.deserialize_message(&serialized_msg, &deserialized_ros_msg); + + EXPECT_EQ(*ros_msg, deserialized_ros_msg); + } +} + +TEST(TestSerializedMessage, serialization_into_nullptr) { + auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + rclcpp::Serialization serializer(*type_support); + + auto basic_type_ros_msgs = get_messages_basic_types(); + for (const auto & ros_msg : basic_type_ros_msgs) { + rclcpp::SerializedMessage serialized_msg; + test_msgs::msg::BasicTypes deserialized_ros_msg; + + EXPECT_THROW( + serializer.serialize_message(ros_msg.get(), nullptr), + rcpputils::IllegalStateException); + EXPECT_THROW( + serializer.serialize_message(nullptr, &serialized_msg), + rcpputils::IllegalStateException); + + EXPECT_THROW( + serializer.deserialize_message(&serialized_msg, nullptr), + rcpputils::IllegalStateException); + EXPECT_THROW( + serializer.deserialize_message(nullptr, &deserialized_ros_msg), + rcpputils::IllegalStateException); + } +} diff --git a/rclcpp/test/test_subscription.cpp b/rclcpp/test/test_subscription.cpp index 95f9b96448..6f21d42f40 100644 --- a/rclcpp/test/test_subscription.cpp +++ b/rclcpp/test/test_subscription.cpp @@ -231,6 +231,40 @@ TEST_F(TestSubscription, various_creation_signatures) { } } +/* + Testing for serialized subscriptions + */ +TEST_F(TestSubscription, test_is_serialized) { + initialize(); + using test_msgs::msg::Empty; + auto cb = [](test_msgs::msg::Empty::SharedPtr) {}; + auto cb_rcl_serialized = [](std::shared_ptr) {}; + auto cb_rclcpp_serialized = [](std::shared_ptr) {}; + { + auto sub = node->create_subscription("topic", 1, cb); + EXPECT_FALSE(sub->is_serialized()); + } + { + auto sub = rclcpp::create_subscription(node, "topic", rclcpp::QoS(1), cb); + EXPECT_FALSE(sub->is_serialized()); + } + { + auto sub = rclcpp::create_subscription(node, "topic", rclcpp::QoS(1), cb_rcl_serialized); + EXPECT_TRUE(sub->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto sub = rclcpp::create_subscription( + node, "topic", ts, rclcpp::QoS(1), cb_rcl_serialized); + EXPECT_TRUE(sub->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto sub = rclcpp::create_subscription( + node, "topic", ts, rclcpp::QoS(1), cb_rclcpp_serialized); + EXPECT_TRUE(sub->is_serialized()); + } +} /* Testing subscriptions using std::bind. */ diff --git a/rclcpp/test/test_subscription_traits.cpp b/rclcpp/test/test_subscription_traits.cpp index 3112cafffd..926b157a72 100644 --- a/rclcpp/test/test_subscription_traits.cpp +++ b/rclcpp/test/test_subscription_traits.cpp @@ -52,6 +52,16 @@ void not_serialized_unique_ptr_callback( (void) unused; } +void rclcpp_serialized_callback_copy(rclcpp::SerializedMessage unused) +{ + (void) unused; +} + +void rclcpp_serialized_callback_shared_ptr(std::shared_ptr unused) +{ + (void) unused; +} + TEST(TestSubscriptionTraits, is_serialized_callback) { // Test regular functions auto cb1 = &serialized_callback_copy; @@ -97,6 +107,16 @@ TEST(TestSubscriptionTraits, is_serialized_callback) { static_assert( rclcpp::subscription_traits::is_serialized_callback::value == false, "passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback"); + + auto cb8 = &rclcpp_serialized_callback_copy; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "rclcpp::SerializedMessage in a first argument callback makes it a serialized callback"); + + auto cb9 = &rclcpp_serialized_callback_shared_ptr; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "std::shared_ptr in a callback makes it a serialized callback"); } TEST(TestSubscriptionTraits, callback_messages) {