From 1cd811a4126fc474e58c9bdd39f1205b872da9cd Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 17 Apr 2020 15:02:13 -0700 Subject: [PATCH 1/9] make it compile on osx Signed-off-by: Karsten Knese --- rclcpp/include/rclcpp/create_subscription.hpp | 16 ++++++---------- rclcpp/include/rclcpp/subscription_traits.hpp | 6 ++++++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index fb9ffea744..33a5c7b4b2 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -54,12 +54,10 @@ create_subscription( const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, CallbackT && callback, - const rclcpp::SubscriptionOptionsWithAllocator & options = ( - rclcpp::SubscriptionOptionsWithAllocator() - ), - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = MessageMemoryStrategyT::create_default() - ) ) { using rclcpp::node_interfaces::get_node_topics_interface; @@ -96,12 +94,10 @@ create_subscription( const std::string & topic_name, const rclcpp::QoS & qos, CallbackT && callback, - const rclcpp::SubscriptionOptionsWithAllocator & options = ( - rclcpp::SubscriptionOptionsWithAllocator() - ), - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = MessageMemoryStrategyT::create_default() - ) ) { const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); diff --git a/rclcpp/include/rclcpp/subscription_traits.hpp b/rclcpp/include/rclcpp/subscription_traits.hpp index ecab458bd7..c6d35a8423 100644 --- a/rclcpp/include/rclcpp/subscription_traits.hpp +++ b/rclcpp/include/rclcpp/subscription_traits.hpp @@ -18,6 +18,7 @@ #include #include "rclcpp/function_traits.hpp" +#include "rclcpp/subscription_options.hpp" #include "rcl/types.h" namespace rclcpp @@ -75,6 +76,7 @@ struct extract_message_type>: extract_message template< typename CallbackT, + typename AllocatorT = std::allocator, // Do not attempt if CallbackT is an integer (mistaken for depth) typename = std::enable_if_t>>::value>, @@ -85,6 +87,10 @@ template< // Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile) typename = std::enable_if_t>>::value>, + // Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator + typename = std::enable_if_t, std::remove_cv_t>>::value> > struct has_message_type : extract_message_type< From d6bf9833b3bbe97cd57691c45f2edca65fa86735 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 17 Apr 2020 19:46:10 -0700 Subject: [PATCH 2/9] make serialized message a class Signed-off-by: Karsten Knese --- rclcpp/CMakeLists.txt | 11 +++ .../subscription_intra_process.hpp | 16 ++-- rclcpp/include/rclcpp/serialization.hpp | 77 +++------------- rclcpp/include/rclcpp/serialized_message.hpp | 78 +++++++--------- rclcpp/include/rclcpp/subscription.hpp | 8 +- rclcpp/src/rclcpp/serialization.cpp | 68 ++++++++++++++ rclcpp/src/rclcpp/serialized_message.cpp | 88 +++++++++++++++++++ rclcpp/test/test_serialized_message.cpp | 88 +++++++++++++++++++ 8 files changed, 309 insertions(+), 125 deletions(-) create mode 100644 rclcpp/src/rclcpp/serialization.cpp create mode 100644 rclcpp/src/rclcpp/serialized_message.cpp create mode 100644 rclcpp/test/test_serialized_message.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3c271b20c6..26573854ea 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/publisher_base.cpp src/rclcpp/qos.cpp src/rclcpp/qos_event.cpp + src/rclcpp/serialization.cpp + src/rclcpp/serialized_message.cpp src/rclcpp/service.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription_base.cpp @@ -405,6 +407,15 @@ if(BUILD_TESTING) ${PROJECT_NAME} ) endif() + ament_add_gtest(test_serialized_message test/test_serialized_message.cpp) + if(TARGET test_serialized_message) + ament_target_dependencies(test_serialized_message + test_msgs + ) + target_link_libraries(test_serialized_message + ${PROJECT_NAME} + ) + endif() ament_add_gtest(test_service test/test_service.cpp) if(TARGET test_service) ament_target_dependencies(test_service diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0172b0d036..73998463dd 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -170,15 +170,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase msg_info.from_intra_process = true; ConstMessageSharedPtr msg = buffer_->consume_shared(); - auto serialized_msg = - serializer_->serialize_message(reinterpret_cast(msg.get())); + auto serialized_msg = std::make_shared(); + serializer_->serialize_message(reinterpret_cast(msg.get()), serialized_msg.get()); - if (nullptr == serialized_msg) { + if (0u == serialized_msg->buffer_length) { throw std::runtime_error("Subscription intra-process could not serialize message"); } if (any_callback_.use_take_shared_method()) { - any_callback_.dispatch_intra_process(serialized_msg, msg_info); + any_callback_.dispatch_intra_process(std::static_pointer_cast(serialized_msg), msg_info); } else { throw std::runtime_error( "Subscription intra-process for serialized " @@ -243,8 +243,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); } - ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); - if (nullptr == serialized_container) { + ConstMessageSharedPtr serialized_message = buffer_->consume_shared(); + if (nullptr == serialized_message) { throw std::runtime_error("Subscription intra-process could not get serialized message"); } @@ -254,13 +254,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase if (any_callback_.use_take_shared_method()) { CallbackMessageSharedPtr msg = construct_unique(); serializer_->deserialize_message( - serialized_container.get(), + serialized_message.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(msg, msg_info); } else { CallbackMessageUniquePtr msg = construct_unique(); serializer_->deserialize_message( - serialized_container.get(), + serialized_message.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(std::move(msg), msg_info); } diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index ddc122cb43..65f81f05e9 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -15,16 +15,16 @@ #ifndef RCLCPP__SERIALIZATION_HPP_ #define RCLCPP__SERIALIZATION_HPP_ -#include - #include #include -#include "rcl/error_handling.h" +#include "rosidl_runtime_c/message_type_support_struct.h" namespace rclcpp { +class SerializedMessage; + /// Interface to (de)serialize a message class SerializationBase { @@ -35,7 +35,8 @@ class SerializationBase /** * \param[in] message The ROS2 message which is read and serialized by rmw. */ - virtual std::shared_ptr serialize_message(const void * message) = 0; + virtual void serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const = 0; /// Deserialize a serialized stream to a ROS message /** @@ -43,79 +44,23 @@ class SerializationBase * \param[out] message The deserialized ROS2 message. */ virtual void deserialize_message( - const rcl_serialized_message_t * serialized_message, - void * msg) = 0; + const SerializedMessage * serialized_message, void * ros_message) const = 0; }; /// Default implementation to (de)serialize a message by using rmw_(de)serialize class Serialization : public SerializationBase { public: - Serialization( - const rosidl_message_type_support_t & type_support, - const rcutils_allocator_t allocator = rcutils_get_default_allocator()) - : type_support_(type_support), rcutils_allocator_(allocator) - {} - - std::shared_ptr serialize_message(const void * message) override - { - auto serialized_message = new rcl_serialized_message_t; - *serialized_message = rmw_get_zero_initialized_serialized_message(); - const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_); - if (ret != RCUTILS_RET_OK) { - throw std::runtime_error( - "Error allocating resources for serialized message: " + - std::string(rcutils_get_error_string().str)); - } - - if (nullptr == message) { - delete serialized_message; - throw std::runtime_error("Message is nullpointer while serialization."); - } - - const auto error = rmw_serialize( - message, - &type_support_, - serialized_message); - if (error != RCL_RET_OK) { - delete serialized_message; - throw std::runtime_error("Failed to serialize."); - } - - auto shared_serialized_msg = std::shared_ptr( - serialized_message, - [](rcl_serialized_message_t * msg) { - auto fini_ret = rmw_serialized_message_fini(msg); - delete msg; - if (fini_ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Failed to destroy serialized message: %s", rcl_get_error_string().str); - } - }); - - return shared_serialized_msg; - } + Serialization(const rosidl_message_type_support_t & type_support); - void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override - { - if (nullptr == serialized_message || - serialized_message->buffer_capacity == 0 || - serialized_message->buffer_length == 0 || - !serialized_message->buffer) - { - throw std::runtime_error("Failed to deserialize nullptr serialized message."); - } + void serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const override; - const auto ret = rmw_deserialize(serialized_message, &type_support_, msg); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Failed to deserialize serialized message."); - } - } + void deserialize_message( + const SerializedMessage * serialized_message, void * ros_message) const override; private: rosidl_message_type_support_t type_support_; - rcutils_allocator_t rcutils_allocator_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 8e4dbf837f..3227a11919 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -15,13 +15,8 @@ #ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_ #define RCLCPP__SERIALIZED_MESSAGE_HPP_ -#include - -#include - -#include "rcutils/logging_macros.h" - -#include "rmw/serialized_message.h" +#include "rcl/allocator.h" +#include "rcl/types.h" namespace rclcpp { @@ -30,49 +25,42 @@ namespace rclcpp class SerializedMessage : public rcl_serialized_message_t { public: - SerializedMessage() - : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) - {} + /// Default constructor for a SerializedMessage + /** + * Default constructs a serialized message and initalizes its + * capacity with 0. + * + * \param[in] allocator The allocator to be used for the initialzation. + */ + explicit SerializedMessage( + const rcl_allocator_t & allocator = rcl_get_default_allocator()); + + /// Default constructor for a SerializedMessage + /** + * Default constructs a serialized message and initalizes its + * capacity with 0. + * + * \param[in] initial_capacity The amount of memory to be allocated. + * \param[in] allocator The allocator to be used for the initialzation. + */ + explicit SerializedMessage( + size_t initial_capacity, + const rcl_allocator_t & allocator = rcl_get_default_allocator()); - explicit SerializedMessage(const SerializedMessage & serialized_message) - : SerializedMessage(static_cast(serialized_message)) - {} + /// Copy Constructor for a SerializedMessage + explicit SerializedMessage(const SerializedMessage & serialized_message); - explicit SerializedMessage(const rcl_serialized_message_t & serialized_message) - : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) - { - const auto ret = rmw_serialized_message_init( - this, serialized_message.buffer_length, - &serialized_message.allocator); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } + /// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t + explicit SerializedMessage(const rcl_serialized_message_t & serialized_message); - // do not call memcpy if the pointer is "static" - if (buffer != serialized_message.buffer) { - std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); - } - buffer_length = serialized_message.buffer_length; - } + /// Move Constructor for a SerializedMessage + explicit SerializedMessage(SerializedMessage && serialized_message); - explicit SerializedMessage(rcl_serialized_message_t && msg) - : rcl_serialized_message_t(msg) - { - // reset buffer to prevent double free - msg = rmw_get_zero_initialized_serialized_message(); - } + /// Move Constructor for a SerializedMessage from a rcl_serialized_message_t + explicit SerializedMessage(rcl_serialized_message_t && serialized_message); - ~SerializedMessage() - { - if (nullptr != buffer) { - const auto fini_ret = rmw_serialized_message_fini(this); - if (fini_ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Failed to destroy serialized message: %s", rcl_get_error_string().str); - } - } - } + /// Destructor for a SerializedMessage + ~SerializedMessage(); }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 9568027e9e..08658159c1 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -177,9 +177,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( - type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared(type_support_handle) ); TRACEPOINT( rclcpp_subscription_init, @@ -216,9 +214,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( - type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared(type_support_handle) ); TRACEPOINT( rclcpp_subscription_init, diff --git a/rclcpp/src/rclcpp/serialization.cpp b/rclcpp/src/rclcpp/serialization.cpp new file mode 100644 index 0000000000..4f47a2de5e --- /dev/null +++ b/rclcpp/src/rclcpp/serialization.cpp @@ -0,0 +1,68 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/serialization.hpp" + +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/serialized_message.hpp" + +#include "rcpputils/asserts.hpp" + +#include "rmw/rmw.h" + +namespace rclcpp +{ + +Serialization::Serialization(const rosidl_message_type_support_t & type_support) +: type_support_(type_support) +{} + +void Serialization::serialize_message( + const void * ros_message, SerializedMessage * serialized_message) const +{ + rcpputils::check_true(ros_message != nullptr, "ROS message is nullpointer."); + rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer."); + + const auto ret = rmw_serialize( + ros_message, + &type_support_, + serialized_message); + if (ret != RMW_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to serialize ROS message."); + } +} + +void +Serialization::deserialize_message( + const SerializedMessage * serialized_message, void * ros_message) const +{ + rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer."); + rcpputils::check_true( + serialized_message->buffer_capacity != 0 && + serialized_message->buffer_length != 0 && + !serialized_message->buffer, "Serialized message is wrongly initialized."); + { + throw std::runtime_error("Failed to deserialize nullptr serialized message."); + } + + const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message); + if (ret != RMW_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to deserialize ROS message."); + } +} + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp new file mode 100644 index 0000000000..d8c9c9a61f --- /dev/null +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -0,0 +1,88 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/serialized_message.hpp" + +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/logging.hpp" + +#include "rmw/types.h" + +namespace rclcpp +{ + +/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks +SerializedMessage::SerializedMessage(const rcl_allocator_t & allocator) +: SerializedMessage(0u, allocator) +{} + +SerializedMessage::SerializedMessage( + size_t initial_capacity, const rcl_allocator_t & allocator) +: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) +{ + const auto ret = rmw_serialized_message_init( + this, initial_capacity, &allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } +} + +SerializedMessage::SerializedMessage(const SerializedMessage & serialized_message) +: SerializedMessage(static_cast(serialized_message)) +{} + +SerializedMessage::SerializedMessage(const rcl_serialized_message_t & serialized_message) +: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) +{ + const auto ret = rmw_serialized_message_init( + this, serialized_message.buffer_capacity, &serialized_message.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != serialized_message.buffer) { + std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); + } + buffer_length = serialized_message.buffer_length; +} + +SerializedMessage::SerializedMessage(SerializedMessage && serialized_message) +: SerializedMessage( + std::forward( + static_cast(serialized_message))) +{} + +SerializedMessage::SerializedMessage(rcl_serialized_message_t && serialized_message) +: rcl_serialized_message_t(serialized_message) +{ + // reset buffer to prevent double free + serialized_message = rmw_get_zero_initialized_serialized_message(); +} + +SerializedMessage::~SerializedMessage() +{ + if (nullptr != buffer) { + const auto fini_ret = rmw_serialized_message_fini(this); + if (fini_ret != RCL_RET_OK) { + RCLCPP_ERROR( + get_logger("rclcpp"), + "Failed to destroy serialized message: %s", rcl_get_error_string().str); + } + } +} + +} // namespace rclcpp diff --git a/rclcpp/test/test_serialized_message.cpp b/rclcpp/test/test_serialized_message.cpp new file mode 100644 index 0000000000..273db67d59 --- /dev/null +++ b/rclcpp/test/test_serialized_message.cpp @@ -0,0 +1,88 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/serialized_message.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "test_msgs/msg/basic_types.hpp" + +TEST(TestSerializedMessage, empty_initialize) { + rclcpp::SerializedMessage serialized_message; + EXPECT_TRUE(serialized_message.buffer == nullptr); + EXPECT_EQ(0u, serialized_message.buffer_length); + EXPECT_EQ(0u, serialized_message.buffer_capacity); +} + +TEST(TestSerializedMessage, initialize_with_capacity) { + rclcpp::SerializedMessage serialized_message(13); + EXPECT_TRUE(serialized_message.buffer != nullptr); + EXPECT_EQ(0u, serialized_message.buffer_length); + EXPECT_EQ(13u, serialized_message.buffer_capacity); +} + +TEST(TestSerializedMessage, various_constructors) { + std::string content = "Hello World"; + auto content_size = content.size() + 1; // accounting for null terminator + + rclcpp::SerializedMessage serialized_message(content_size); + // manually copy some content + std::memcpy(serialized_message.buffer, content.c_str(), content.size()); + serialized_message.buffer[content.size()] = '\0'; + serialized_message.buffer_length = content_size; + EXPECT_STREQ(content.c_str(), reinterpret_cast(serialized_message.buffer)); + EXPECT_EQ(content_size, serialized_message.buffer_capacity); + + // Copy Constructor + rclcpp::SerializedMessage other_serialized_message(serialized_message); + EXPECT_EQ(content_size, other_serialized_message.buffer_capacity); + EXPECT_EQ(content_size, other_serialized_message.buffer_length); + EXPECT_STREQ( + reinterpret_cast(serialized_message.buffer), + reinterpret_cast(other_serialized_message.buffer)); + + // Move Constructor + rclcpp::SerializedMessage yet_another_serialized_message(std::move(other_serialized_message)); + EXPECT_TRUE(other_serialized_message.buffer == nullptr); + EXPECT_EQ(0u, other_serialized_message.buffer_capacity); + EXPECT_EQ(0u, other_serialized_message.buffer_length); + + auto default_allocator = rcl_get_default_allocator(); + auto rcl_serialized_msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&rcl_serialized_msg, 13, &default_allocator); + ASSERT_EQ(RCL_RET_OK, ret); + + // manually copy some content + std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size()); + rcl_serialized_msg.buffer[content.size()] = '\0'; + rcl_serialized_msg.buffer_length = content_size; + EXPECT_EQ(13u, rcl_serialized_msg.buffer_capacity); + + // Copy Constructor from rcl_serialized_message_t + rclcpp::SerializedMessage from_rcl_msg(rcl_serialized_msg); + EXPECT_EQ(13u, from_rcl_msg.buffer_capacity); + EXPECT_EQ(content_size, from_rcl_msg.buffer_length); + + // Verify that despite being fini'd, the copy is real + ret = rmw_serialized_message_fini(&rcl_serialized_msg); + ASSERT_EQ(RCL_RET_OK, ret); + EXPECT_EQ(nullptr, rcl_serialized_msg.buffer); + EXPECT_EQ(0u, rcl_serialized_msg.buffer_capacity); + EXPECT_EQ(0u, rcl_serialized_msg.buffer_length); + EXPECT_TRUE(nullptr != from_rcl_msg.buffer); + EXPECT_EQ(13u, from_rcl_msg.buffer_capacity); + EXPECT_EQ(content_size, from_rcl_msg.buffer_length); +} From 7f04a19cb80c55e5c8aa9962a2509c47df1c9790 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 17 Apr 2020 21:56:44 -0700 Subject: [PATCH 3/9] wip: make test run successfully Signed-off-by: Karsten Knese --- rclcpp/include/rclcpp/serialized_message.hpp | 18 +- rclcpp/src/rclcpp/serialized_message.cpp | 46 ++++ .../test/test_intra_process_communication.cpp | 259 +++++++++--------- 3 files changed, 187 insertions(+), 136 deletions(-) diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 3227a11919..3e7cff7f0b 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -43,21 +43,29 @@ class SerializedMessage : public rcl_serialized_message_t * \param[in] initial_capacity The amount of memory to be allocated. * \param[in] allocator The allocator to be used for the initialzation. */ - explicit SerializedMessage( + SerializedMessage( size_t initial_capacity, const rcl_allocator_t & allocator = rcl_get_default_allocator()); /// Copy Constructor for a SerializedMessage - explicit SerializedMessage(const SerializedMessage & serialized_message); + SerializedMessage(const SerializedMessage & serialized_message); /// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t - explicit SerializedMessage(const rcl_serialized_message_t & serialized_message); + SerializedMessage(const rcl_serialized_message_t & serialized_message); /// Move Constructor for a SerializedMessage - explicit SerializedMessage(SerializedMessage && serialized_message); + SerializedMessage(SerializedMessage && serialized_message); /// Move Constructor for a SerializedMessage from a rcl_serialized_message_t - explicit SerializedMessage(rcl_serialized_message_t && serialized_message); + SerializedMessage(rcl_serialized_message_t && serialized_message); + + SerializedMessage & operator=(const SerializedMessage & other); + + SerializedMessage & operator=(const rcl_serialized_message_t & other); + + SerializedMessage & operator=(SerializedMessage && other); + + SerializedMessage & operator=(rcl_serialized_message_t && other); /// Destructor for a SerializedMessage ~SerializedMessage(); diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index d8c9c9a61f..840581cc6e 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -73,6 +73,52 @@ SerializedMessage::SerializedMessage(rcl_serialized_message_t && serialized_mess serialized_message = rmw_get_zero_initialized_serialized_message(); } +SerializedMessage & SerializedMessage::operator=(const SerializedMessage & other) +{ + *this = static_cast(other); + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(const rcl_serialized_message_t & other) +{ + *this = static_cast(rmw_get_zero_initialized_serialized_message()); + + const auto ret = rmw_serialized_message_init( + this, other.buffer_capacity, &other.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != other.buffer) { + std::memcpy(buffer, other.buffer, other.buffer_length); + } + buffer_length = other.buffer_length; + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(SerializedMessage && other) +{ + *this = static_cast(other); + + return *this; +} + +SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other) +{ + this->buffer = other.buffer; + this->buffer_capacity = other.buffer_capacity; + this->buffer_length = other.buffer_length; + this->allocator = other.allocator; + + // reset original to prevent double free + other = rmw_get_zero_initialized_serialized_message(); + + return *this; +} + SerializedMessage::~SerializedMessage() { if (nullptr != buffer) { diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index cc702da0e8..96d9d497a2 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -14,8 +14,6 @@ #include -#include - #include #include #include @@ -26,8 +24,11 @@ #include "rclcpp/publisher.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" +#include "test_msgs/message_fixtures.hpp" + int32_t & get_test_allocation_counter() { static int32_t counter = 0; @@ -72,8 +73,8 @@ void custom_deallocate(void * pointer, void * state) m_allocator.deallocate(pointer, state); } -rcl_serialized_message_t make_serialized_string_msg( - const std::shared_ptr & stringMsg) +rclcpp::SerializedMessage make_serialized_string_msg( + const std::shared_ptr & string_msg) { auto m_allocator = rcutils_get_default_allocator(); @@ -83,21 +84,13 @@ rcl_serialized_message_t make_serialized_string_msg( m_allocator.reallocate = &custom_reallocate; m_allocator.zero_allocate = &custom_zero_allocate; - rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); - auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } + rclcpp::SerializedMessage msg(m_allocator); - static auto type = - rosidl_typesupport_cpp::get_message_type_support_handle - (); - auto error = rmw_serialize(stringMsg.get(), type, &msg); - if (error != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "test_intra_process_communication", - "Something went wrong preparing the serialized message"); - } + static auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + rclcpp::Serialization serializer(*type_support); + EXPECT_NO_THROW(serializer.serialize_message(string_msg.get(), &msg)); return msg; } @@ -126,11 +119,14 @@ class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam stringMsgU( - new test_msgs::msg::Strings( - *stringMsg)); + auto unique_string_msg = std::make_unique(*string_msg); - publisher->publish(std::make_unique(msg0)); - publisher->publish(*stringMsg); - publisher->publish(std::move(stringMsgU)); + { + auto whatever = std::make_unique(msg0); + } + //publisher->publish(std::make_unique(msg0)); + //publisher->publish(*string_msg); + //publisher->publish(std::move(unique_string_msg)); } for (uint32_t i = 0; i < 3; ++i) { rclcpp::spin_some(node); @@ -225,65 +222,65 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) EXPECT_EQ(get_test_allocation_counter(), 0); } -TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) -{ - get_test_allocation_counter() = 0; - - TestParameters parameters = GetParam(); - { - rclcpp::Node::SharedPtr node = std::make_shared( - "my_node", - "/ns", - parameters.node_options[0]); - auto publisher = rclcpp::create_publisher( - node, - "/topic", - *rosidl_typesupport_cpp::get_message_type_support_handle(), - rclcpp::QoS(10)); - - auto sub_gen_serialized = rclcpp::create_subscription( - node, - "/topic", - *rosidl_typesupport_cpp::get_message_type_support_handle(), - rclcpp::QoS(10), - &OnMessageSerialized); - - auto sub_shared = node->create_subscription( - "/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription( - "/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription( - "/topic", 10, - &OnMessageConst); - auto sub_serialized = node->create_subscription( - "/topic", 10, - &OnMessageSerialized); - - rclcpp::sleep_for(offset); - - counts.fill(0); - auto stringMsg = get_messages_strings()[3]; - - for (size_t i = 0; i < parameters.runs; i++) { - auto msg0 = make_serialized_string_msg(stringMsg); - - publisher->publish(std::make_unique(msg0)); - } - rclcpp::spin_some(node); - rclcpp::sleep_for(offset); - - rclcpp::spin_some(node); - } - - if (parameters.runs == 1) { - EXPECT_EQ(counts[0], 2u); - EXPECT_EQ(counts[1], 3u); - } - - EXPECT_EQ(get_test_allocation_counter(), 0); -} +//TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +//{ +// get_test_allocation_counter() = 0; +// +// TestParameters parameters = GetParam(); +// { +// rclcpp::Node::SharedPtr node = std::make_shared( +// "my_node", +// "/ns", +// parameters.node_options[0]); +// auto publisher = rclcpp::create_publisher( +// node, +// "/topic", +// *rosidl_typesupport_cpp::get_message_type_support_handle(), +// rclcpp::QoS(10)); +// +// auto sub_gen_serialized = rclcpp::create_subscription( +// node, +// "/topic", +// *rosidl_typesupport_cpp::get_message_type_support_handle(), +// rclcpp::QoS(10), +// &OnMessageSerialized); +// +// auto sub_shared = node->create_subscription( +// "/topic", 10, +// &OnMessage); +// auto sub_unique = node->create_subscription( +// "/topic", 10, +// &OnMessageUniquePtr); +// auto sub_const_shared = node->create_subscription( +// "/topic", 10, +// &OnMessageConst); +// auto sub_serialized = node->create_subscription( +// "/topic", 10, +// &OnMessageSerialized); +// +// rclcpp::sleep_for(offset); +// +// counts.fill(0); +// auto stringMsg = get_messages_strings()[3]; +// +// for (size_t i = 0; i < parameters.runs; i++) { +// auto msg0 = make_serialized_string_msg(stringMsg); +// +// publisher->publish(std::make_unique(msg0)); +// } +// rclcpp::spin_some(node); +// rclcpp::sleep_for(offset); +// +// rclcpp::spin_some(node); +// } +// +// if (parameters.runs == 1) { +// EXPECT_EQ(counts[0], 2u); +// EXPECT_EQ(counts[1], 3u); +// } +// +// EXPECT_EQ(get_test_allocation_counter(), 0); +//} auto get_new_context() { @@ -305,46 +302,46 @@ std::vector parameters = { {1u, 2u}, 1, "two_subscriptions_intraprocess_comm" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two subscriptions, one using intra-process comm and the other not using it. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().use_intra_process_comms(false) - }, - {1u, 1u}, - 1, - "two_subscriptions_one_intraprocess_one_not" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both using intra-process. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) - }, - {1u, 1u}, - 1, - "two_subscriptions_in_two_contexts_with_intraprocess_comm" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both of them not using intra-process comm. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(false), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) - }, - {0u, 0u}, - 1, - "two_subscriptions_in_two_contexts_without_intraprocess_comm" } + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two subscriptions, one using intra-process comm and the other not using it. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().use_intra_process_comms(false) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_one_intraprocess_one_not" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both using intra-process. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_in_two_contexts_with_intraprocess_comm" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both of them not using intra-process comm. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(false), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + // }, + // {0u, 0u}, + // 1, + // "two_subscriptions_in_two_contexts_without_intraprocess_comm" + //} }; std::vector setRuns(const std::vector & in, const size_t runs) @@ -361,7 +358,7 @@ INSTANTIATE_TEST_CASE_P( ::testing::ValuesIn(parameters), ::testing::PrintToStringParamName()); -INSTANTIATE_TEST_CASE_P( - TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, - ::testing::ValuesIn(setRuns(parameters, 1000)), - ::testing::PrintToStringParamName()); +//INSTANTIATE_TEST_CASE_P( +// TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, +// ::testing::ValuesIn(setRuns(parameters, 1000)), +// ::testing::PrintToStringParamName()); From 17f2d32250e0c200662a3a85760b0ba5f4b41e96 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 17 Apr 2020 23:04:55 -0700 Subject: [PATCH 4/9] remove instantiate_p Signed-off-by: Karsten Knese --- .../test/test_intra_process_communication.cpp | 198 +++++++++--------- 1 file changed, 98 insertions(+), 100 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 96d9d497a2..9db8cf98b4 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -114,7 +114,14 @@ std::ostream & operator<<(std::ostream & out, const TestParameters & params) return out; } -class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +class TestPublisherSubscriptionSerialized : public ::testing::Test { public: static void SetUpTestCase() @@ -122,17 +129,70 @@ class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam parameters_ = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + 1, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + 1, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + 1, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } + }; }; -std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( - 250); std::array counts; void OnMessageSerialized(const std::shared_ptr msg) @@ -164,61 +224,59 @@ void OnMessage(std::shared_ptr msg) ++counts[1]; } -TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) { get_test_allocation_counter() = 0; - TestParameters parameters = GetParam(); - { + for (const auto & parameter : parameters_) { rclcpp::Node::SharedPtr node = std::make_shared( - "my_node", - "/ns", - parameters.node_options[0]); + "my_node", + "/ns", + parameter.node_options[0]); auto publisher = node->create_publisher("/topic", 10); auto sub_shared = node->create_subscription( - "/topic", 10, - &OnMessage); + "/topic", 10, + &OnMessage); auto sub_unique = node->create_subscription( - "/topic", 10, - &OnMessageUniquePtr); + "/topic", 10, + &OnMessageUniquePtr); auto sub_const_shared = node->create_subscription( - "/topic", 10, - &OnMessageConst); + "/topic", 10, + &OnMessageConst); auto sub_serialized = node->create_subscription( - "/topic", 10, - &OnMessageSerialized); + "/topic", 10, + &OnMessageSerialized); - rclcpp::sleep_for(offset); + rclcpp::sleep_for(offset_); counts.fill(0); auto string_msg = get_messages_strings()[3]; - for (size_t i = 0; i < parameters.runs; i++) { + for (size_t i = 0; i < parameter.runs; i++) { auto msg0 = make_serialized_string_msg(string_msg); - auto unique_string_msg = std::make_unique(*string_msg); + std::unique_ptr unique_string_msg( + new test_msgs::msg::Strings( + *string_msg)); + //auto unique_string_msg = std::make_unique(*string_msg); - { - auto whatever = std::make_unique(msg0); - } //publisher->publish(std::make_unique(msg0)); - //publisher->publish(*string_msg); - //publisher->publish(std::move(unique_string_msg)); + publisher->publish(*string_msg); + publisher->publish(std::move(unique_string_msg)); } for (uint32_t i = 0; i < 3; ++i) { rclcpp::spin_some(node); - rclcpp::sleep_for(offset); + rclcpp::sleep_for(offset_); } rclcpp::spin_some(node); - } - if (parameters.runs == 1) { - EXPECT_EQ(counts[0], 3u); - EXPECT_EQ(counts[1], 9u); + if (parameter.runs == 1) { + EXPECT_EQ(counts[0], 3u); // count serialized message callbacks + EXPECT_EQ(counts[1], 9u); // count unique + shared message callbacks + } } - EXPECT_EQ(get_test_allocation_counter(), 0); } @@ -282,67 +340,7 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) // EXPECT_EQ(get_test_allocation_counter(), 0); //} -auto get_new_context() -{ - auto context = rclcpp::Context::make_shared(); - context->init(0, nullptr); - return context; -} -std::vector parameters = { - /* - Testing publisher subscription count api and internal process subscription count. - Two subscriptions in the same topic, both using intraprocess comm. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().use_intra_process_comms(true) - }, - {1u, 2u}, - 1, - "two_subscriptions_intraprocess_comm" - } - ///* - // Testing publisher subscription count api and internal process subscription count. - // Two subscriptions, one using intra-process comm and the other not using it. - // */ - //{ - // { - // rclcpp::NodeOptions().use_intra_process_comms(true), - // rclcpp::NodeOptions().use_intra_process_comms(false) - // }, - // {1u, 1u}, - // 1, - // "two_subscriptions_one_intraprocess_one_not" - //}, - ///* - // Testing publisher subscription count api and internal process subscription count. - // Two contexts, both using intra-process. - // */ - //{ - // { - // rclcpp::NodeOptions().use_intra_process_comms(true), - // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) - // }, - // {1u, 1u}, - // 1, - // "two_subscriptions_in_two_contexts_with_intraprocess_comm" - //}, - ///* - // Testing publisher subscription count api and internal process subscription count. - // Two contexts, both of them not using intra-process comm. - // */ - //{ - // { - // rclcpp::NodeOptions().use_intra_process_comms(false), - // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) - // }, - // {0u, 0u}, - // 1, - // "two_subscriptions_in_two_contexts_without_intraprocess_comm" - //} -}; std::vector setRuns(const std::vector & in, const size_t runs) { @@ -353,10 +351,10 @@ std::vector setRuns(const std::vector & in, cons return out; } -INSTANTIATE_TEST_CASE_P( - TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, - ::testing::ValuesIn(parameters), - ::testing::PrintToStringParamName()); +//INSTANTIATE_TEST_CASE_P( +// TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, +// ::testing::ValuesIn(parameters), +// ::testing::PrintToStringParamName()); //INSTANTIATE_TEST_CASE_P( // TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, From d89ef5098b3b3c10b0f59bb19ddf5be7aa7fadc0 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Sat, 18 Apr 2020 12:06:22 -0700 Subject: [PATCH 5/9] use is_serialized from pub/sub base Signed-off-by: Karsten Knese --- rclcpp/include/rclcpp/create_publisher.hpp | 3 +- .../experimental/intra_process_manager.hpp | 8 +--- .../subscription_intra_process.hpp | 8 ++-- .../subscription_intra_process_base.hpp | 12 +++++- rclcpp/include/rclcpp/publisher.hpp | 42 +++++++++---------- rclcpp/include/rclcpp/publisher_base.hpp | 12 ++++-- rclcpp/include/rclcpp/publisher_factory.hpp | 4 +- rclcpp/include/rclcpp/subscription.hpp | 6 +-- rclcpp/src/rclcpp/intra_process_manager.cpp | 12 ++---- rclcpp/src/rclcpp/publisher_base.cpp | 17 +++++--- rclcpp/src/rclcpp/serialized_message.cpp | 1 + .../subscription_intra_process_base.cpp | 6 +++ .../test/test_intra_process_communication.cpp | 11 ++--- rclcpp/test/test_intra_process_manager.cpp | 12 ++++++ rclcpp/test/test_publisher.cpp | 30 +++++++++++++ 15 files changed, 121 insertions(+), 63 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 89536d0e98..b677293ec3 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -85,8 +85,7 @@ create_publisher( const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); return create_publisher( - node, topic_name, type_support, - qos, options); + node, topic_name, type_support, qos, options); } } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 04508af46e..383e4ddb11 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -110,14 +110,11 @@ class IntraProcessManager * In addition this generates a unique intra process id for the subscription. * * \param subscription the SubscriptionIntraProcess to register. - * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the subscription's unique id. */ RCLCPP_PUBLIC uint64_t - add_subscription( - rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription, - const bool is_serialized = false); + add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); /// Unregister a subscription using the subscription's unique id. /** @@ -137,12 +134,11 @@ class IntraProcessManager * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. - * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false); + add_publisher(rclcpp::PublisherBase::SharedPtr publisher); /// Unregister a publisher using the publisher's unique id. /** diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 73998463dd..fed16f2474 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -73,13 +73,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase const std::string & topic_name, rmw_qos_profile_t qos_profile, rclcpp::IntraProcessBufferType buffer_type, - std::shared_ptr serializer) - : SubscriptionIntraProcessBase(topic_name, qos_profile), + std::shared_ptr serializer, + bool is_serialized = false) + : SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized), any_callback_(callback), serializer_(serializer) { if (!std::is_same::value && - !std::is_same::value && - !std::is_same::value) + !std::is_base_of::value) { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..6f23216ad9 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -39,8 +39,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) RCLCPP_PUBLIC - SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile) - : topic_name_(topic_name), qos_profile_(qos_profile) + SubscriptionIntraProcessBase( + const std::string & topic_name, + rmw_qos_profile_t qos_profile, + bool is_serialized) + : topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized) {} virtual ~SubscriptionIntraProcessBase() = default; @@ -70,6 +73,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC + bool + is_serialized() const; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; @@ -80,6 +87,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable std::string topic_name_; rmw_qos_profile_t qos_profile_; + bool is_serialized_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index fd53e727d1..eb72d1b102 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -69,12 +69,14 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + bool is_serialized = false) : PublisherBase( node_base, topic, *rosidl_typesupport_cpp::get_message_type_support_handle(), - options.template to_rcl_publisher_options(qos)), + options.template to_rcl_publisher_options(qos), + is_serialized), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())), message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) @@ -87,12 +89,14 @@ class Publisher : public PublisherBase const std::string & topic, const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options, - const rosidl_message_type_support_t & type_support) + const rosidl_message_type_support_t & type_support, + bool is_serialized = false) : PublisherBase( node_base, topic, type_support, - options.template to_rcl_publisher_options(qos)), + options.template to_rcl_publisher_options(qos), + is_serialized), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())), message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) @@ -132,12 +136,7 @@ class Publisher : public PublisherBase "intraprocess communication allowed only with volatile durability"); } uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); - uint64_t intra_process_publisher_id_serialized = ipm->add_publisher( - this->shared_from_this(), true); - this->setup_intra_process( - intra_process_publisher_id, - intra_process_publisher_id_serialized, - ipm); + this->setup_intra_process(intra_process_publisher_id, ipm); } } @@ -229,6 +228,13 @@ class Publisher : public PublisherBase this->do_serialized_publish(*serialized_msg); } + /// Publish a serialized message. + template + void publish(std::unique_ptr serialized_msg) + { + this->do_serialized_publish(*serialized_msg); + } + /// Publish an instance of a LoanedMessage. /** * When publishing a loaned message, the memory for this ROS message will be deallocated @@ -309,7 +315,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value>::type do_publish_message(const T & msg) { // Avoid allocating when not using intra process. @@ -327,7 +333,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value>::type do_publish_message(const T & msg) { // Kept for backwards compatibility. Copies compelete memory! @@ -411,12 +417,8 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - const uint64_t intra_process_publisher_id = std::is_same::value ? - intra_process_publisher_id_serialized_ : intra_process_publisher_id_; - ipm->template do_intra_process_publish( - intra_process_publisher_id, + intra_process_publisher_id_, std::move(msg), message_allocator); } @@ -436,12 +438,8 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - const uint64_t intra_process_publisher_id = std::is_same::value ? - intra_process_publisher_id_serialized_ : intra_process_publisher_id_; - return ipm->template do_intra_process_publish_and_return_shared( - intra_process_publisher_id, + intra_process_publisher_id_, std::move(msg), message_allocator); } diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 4f678ec0bb..6c84ebab85 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -73,7 +73,8 @@ class PublisherBase : public std::enable_shared_from_this rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rosidl_message_type_support_t & type_support, - const rcl_publisher_options_t & publisher_options); + const rcl_publisher_options_t & publisher_options, + bool is_serializd = false); RCLCPP_PUBLIC virtual ~PublisherBase(); @@ -191,9 +192,12 @@ class PublisherBase : public std::enable_shared_from_this void setup_intra_process( uint64_t intra_process_publisher_id, - uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm); + RCLCPP_PUBLIC + bool + is_serialized() const; + protected: template void @@ -223,9 +227,11 @@ class PublisherBase : public std::enable_shared_from_this bool intra_process_is_enabled_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; - uint64_t intra_process_publisher_id_serialized_; rmw_gid_t rmw_gid_; + +private: + bool is_serialized_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index f287d8e49d..84b45aad6b 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -75,9 +75,9 @@ create_publisher_factory( const rclcpp::QoS & qos ) -> std::shared_ptr { + auto is_serialized = std::is_base_of::value; auto publisher = std::make_shared( - node_base, topic_name, qos, options, - type_support); + node_base, topic_name, qos, options, type_support, is_serialized); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 08658159c1..0859fe68b3 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -214,7 +214,8 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared(type_support_handle) + std::make_shared(type_support_handle), + is_serialized() ); TRACEPOINT( rclcpp_subscription_init, @@ -223,8 +224,7 @@ class Subscription : public SubscriptionBase // Add it to the intra process manager. intra_process_subscription_id_serialized = ipm->add_subscription( - subscription_intra_process, - true); + subscription_intra_process); } this->setup_intra_process( diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 3b0b4d1e19..305fe4e1b7 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,9 +32,7 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher( - rclcpp::PublisherBase::SharedPtr publisher, - const bool is_serialized) +IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) { std::unique_lock lock(mutex_); @@ -43,7 +41,7 @@ IntraProcessManager::add_publisher( publishers_[id].publisher = publisher; publishers_[id].topic_name = publisher->get_topic_name(); publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile(); - publishers_[id].is_serialized = is_serialized; + publishers_[id].is_serialized = publisher->is_serialized(); // Initialize the subscriptions storage for this publisher. pub_to_subs_[id] = SplittedSubscriptions(); @@ -59,9 +57,7 @@ IntraProcessManager::add_publisher( } uint64_t -IntraProcessManager::add_subscription( - SubscriptionIntraProcessBase::SharedPtr subscription, - const bool is_serialized) +IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) { std::unique_lock lock(mutex_); @@ -71,7 +67,7 @@ IntraProcessManager::add_subscription( subscriptions_[id].topic_name = subscription->get_topic_name(); subscriptions_[id].qos = subscription->get_actual_qos(); subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); - subscriptions_[id].is_serialized = is_serialized; + subscriptions_[id].is_serialized = subscription->is_serialized(); // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 0f2258881a..2a90fcfcef 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -42,10 +42,12 @@ PublisherBase::PublisherBase( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rosidl_message_type_support_t & type_support, - const rcl_publisher_options_t & publisher_options) + const rcl_publisher_options_t & publisher_options, + bool is_serialized) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0), - intra_process_publisher_id_serialized_(0) + intra_process_is_enabled_(false), + intra_process_publisher_id_(0), + is_serialized_(is_serialized) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -106,7 +108,6 @@ PublisherBase::~PublisherBase() return; } ipm->remove_publisher(intra_process_publisher_id_); - ipm->remove_publisher(intra_process_publisher_id_serialized_); } const char * @@ -240,15 +241,19 @@ PublisherBase::operator==(const rmw_gid_t * gid) const void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, - uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm) { intra_process_publisher_id_ = intra_process_publisher_id; - intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized; weak_ipm_ = ipm; intra_process_is_enabled_ = true; } +bool +PublisherBase::is_serialized() const +{ + return is_serialized_; +} + void PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const { diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index 840581cc6e..724b76a9a2 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -47,6 +47,7 @@ SerializedMessage::SerializedMessage(const SerializedMessage & serialized_messag SerializedMessage::SerializedMessage(const rcl_serialized_message_t & serialized_message) : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) { + fprintf(stderr, "copy constructor called\n"); const auto ret = rmw_serialized_message_init( this, serialized_message.buffer_capacity, &serialized_message.allocator); if (ret != RCL_RET_OK) { diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 3b951f34de..abd382a2ba 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -36,3 +36,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +bool +SubscriptionIntraProcessBase::is_serialized() const +{ + return is_serialized_; +} diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 9db8cf98b4..47e2e13b74 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -256,12 +256,13 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) for (size_t i = 0; i < parameter.runs; i++) { auto msg0 = make_serialized_string_msg(string_msg); - std::unique_ptr unique_string_msg( - new test_msgs::msg::Strings( - *string_msg)); - //auto unique_string_msg = std::make_unique(*string_msg); + auto unique_string_msg = std::make_unique(*string_msg); - //publisher->publish(std::make_unique(msg0)); + { + auto unique_serialized_msg = std::make_unique(msg0); + //auto unique_rcl_msg = std::make_unique(*unique_serialized_msg.release()); + //publisher->publish(std::move(unique_serialized_msg)); + } publisher->publish(*string_msg); publisher->publish(std::move(unique_string_msg)); } diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index b71fb1d061..7eb860a985 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -93,6 +93,12 @@ class PublisherBase return false; } + bool + is_serialized() const + { + return false; + } + rclcpp::QoS qos; std::string topic_name; uint64_t intra_process_publisher_id_; @@ -212,6 +218,12 @@ class SubscriptionIntraProcessBase return topic_name; } + bool + is_serialized() const + { + return false; + } + rmw_qos_profile_t qos_profile; const char * topic_name; }; diff --git a/rclcpp/test/test_publisher.cpp b/rclcpp/test/test_publisher.cpp index dce0b8caa8..fc3ca900ef 100644 --- a/rclcpp/test/test_publisher.cpp +++ b/rclcpp/test/test_publisher.cpp @@ -20,6 +20,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/serialized_message.hpp" #include "test_msgs/msg/empty.hpp" @@ -143,6 +144,35 @@ TEST_F(TestPublisher, various_creation_signatures) { } } +/* + Testing for serialized publishers. + */ +TEST_F(TestPublisher, test_is_serialized) { + initialize(); + + using test_msgs::msg::Empty; + { + auto publisher = node->create_publisher("topic", 42); + EXPECT_FALSE(publisher->is_serialized()); + } + { + auto publisher = rclcpp::create_publisher(node, "topic", 42, rclcpp::PublisherOptions()); + EXPECT_FALSE(publisher->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto publisher = rclcpp::create_publisher( + node, "topic", ts, 42, rclcpp::PublisherOptions()); + EXPECT_TRUE(publisher->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto publisher = rclcpp::create_publisher( + node, "topic", ts, 42, rclcpp::PublisherOptions()); + EXPECT_TRUE(publisher->is_serialized()); + } +} + /* Testing publisher with intraprocess enabled and invalid QoS */ From dd6564518d498e55813962d3ea5607ac1746fdd1 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Sat, 18 Apr 2020 12:52:47 -0700 Subject: [PATCH 6/9] test for rclcpp::serializedmessage Signed-off-by: Karsten Knese --- rclcpp/include/rclcpp/subscription_traits.hpp | 10 ++++++ rclcpp/test/test_publisher.cpp | 1 - rclcpp/test/test_subscription.cpp | 34 +++++++++++++++++++ rclcpp/test/test_subscription_traits.cpp | 20 +++++++++++ 4 files changed, 64 insertions(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/subscription_traits.hpp b/rclcpp/include/rclcpp/subscription_traits.hpp index c6d35a8423..2d8772e3a3 100644 --- a/rclcpp/include/rclcpp/subscription_traits.hpp +++ b/rclcpp/include/rclcpp/subscription_traits.hpp @@ -18,6 +18,7 @@ #include #include "rclcpp/function_traits.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/subscription_options.hpp" #include "rcl/types.h" @@ -50,6 +51,15 @@ struct is_serialized_subscription_argument +struct is_serialized_subscription_argument: std::true_type +{}; + +template<> +struct is_serialized_subscription_argument> + : std::true_type +{}; + template struct is_serialized_subscription : is_serialized_subscription_argument {}; diff --git a/rclcpp/test/test_publisher.cpp b/rclcpp/test/test_publisher.cpp index fc3ca900ef..7af7868a04 100644 --- a/rclcpp/test/test_publisher.cpp +++ b/rclcpp/test/test_publisher.cpp @@ -149,7 +149,6 @@ TEST_F(TestPublisher, various_creation_signatures) { */ TEST_F(TestPublisher, test_is_serialized) { initialize(); - using test_msgs::msg::Empty; { auto publisher = node->create_publisher("topic", 42); diff --git a/rclcpp/test/test_subscription.cpp b/rclcpp/test/test_subscription.cpp index 95f9b96448..6f21d42f40 100644 --- a/rclcpp/test/test_subscription.cpp +++ b/rclcpp/test/test_subscription.cpp @@ -231,6 +231,40 @@ TEST_F(TestSubscription, various_creation_signatures) { } } +/* + Testing for serialized subscriptions + */ +TEST_F(TestSubscription, test_is_serialized) { + initialize(); + using test_msgs::msg::Empty; + auto cb = [](test_msgs::msg::Empty::SharedPtr) {}; + auto cb_rcl_serialized = [](std::shared_ptr) {}; + auto cb_rclcpp_serialized = [](std::shared_ptr) {}; + { + auto sub = node->create_subscription("topic", 1, cb); + EXPECT_FALSE(sub->is_serialized()); + } + { + auto sub = rclcpp::create_subscription(node, "topic", rclcpp::QoS(1), cb); + EXPECT_FALSE(sub->is_serialized()); + } + { + auto sub = rclcpp::create_subscription(node, "topic", rclcpp::QoS(1), cb_rcl_serialized); + EXPECT_TRUE(sub->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto sub = rclcpp::create_subscription( + node, "topic", ts, rclcpp::QoS(1), cb_rcl_serialized); + EXPECT_TRUE(sub->is_serialized()); + } + { + auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle(); + auto sub = rclcpp::create_subscription( + node, "topic", ts, rclcpp::QoS(1), cb_rclcpp_serialized); + EXPECT_TRUE(sub->is_serialized()); + } +} /* Testing subscriptions using std::bind. */ diff --git a/rclcpp/test/test_subscription_traits.cpp b/rclcpp/test/test_subscription_traits.cpp index 3112cafffd..926b157a72 100644 --- a/rclcpp/test/test_subscription_traits.cpp +++ b/rclcpp/test/test_subscription_traits.cpp @@ -52,6 +52,16 @@ void not_serialized_unique_ptr_callback( (void) unused; } +void rclcpp_serialized_callback_copy(rclcpp::SerializedMessage unused) +{ + (void) unused; +} + +void rclcpp_serialized_callback_shared_ptr(std::shared_ptr unused) +{ + (void) unused; +} + TEST(TestSubscriptionTraits, is_serialized_callback) { // Test regular functions auto cb1 = &serialized_callback_copy; @@ -97,6 +107,16 @@ TEST(TestSubscriptionTraits, is_serialized_callback) { static_assert( rclcpp::subscription_traits::is_serialized_callback::value == false, "passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback"); + + auto cb8 = &rclcpp_serialized_callback_copy; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "rclcpp::SerializedMessage in a first argument callback makes it a serialized callback"); + + auto cb9 = &rclcpp_serialized_callback_shared_ptr; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "std::shared_ptr in a callback makes it a serialized callback"); } TEST(TestSubscriptionTraits, callback_messages) { From e0fb0a4f5c2f0935813f0e7b663a031ffecc6a9b Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Sat, 18 Apr 2020 15:09:07 -0700 Subject: [PATCH 7/9] more tests Signed-off-by: Karsten Knese --- rclcpp/src/rclcpp/serialization.cpp | 8 ++-- rclcpp/test/test_serialized_message.cpp | 49 +++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/rclcpp/src/rclcpp/serialization.cpp b/rclcpp/src/rclcpp/serialization.cpp index 4f47a2de5e..b7245bf0ba 100644 --- a/rclcpp/src/rclcpp/serialization.cpp +++ b/rclcpp/src/rclcpp/serialization.cpp @@ -50,14 +50,12 @@ void Serialization::deserialize_message( const SerializedMessage * serialized_message, void * ros_message) const { - rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer."); + rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer."); rcpputils::check_true( serialized_message->buffer_capacity != 0 && serialized_message->buffer_length != 0 && - !serialized_message->buffer, "Serialized message is wrongly initialized."); - { - throw std::runtime_error("Failed to deserialize nullptr serialized message."); - } + serialized_message->buffer != nullptr, "Serialized message is wrongly initialized."); + rcpputils::check_true(ros_message != nullptr, "ROS message is a nullpointer."); const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message); if (ret != RMW_RET_OK) { diff --git a/rclcpp/test/test_serialized_message.cpp b/rclcpp/test/test_serialized_message.cpp index 273db67d59..f42d8b4fd6 100644 --- a/rclcpp/test/test_serialized_message.cpp +++ b/rclcpp/test/test_serialized_message.cpp @@ -15,9 +15,13 @@ #include #include +#include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/rclcpp.hpp" +#include "rcpputils/asserts.hpp" + +#include "test_msgs/message_fixtures.hpp" #include "test_msgs/msg/basic_types.hpp" TEST(TestSerializedMessage, empty_initialize) { @@ -86,3 +90,48 @@ TEST(TestSerializedMessage, various_constructors) { EXPECT_EQ(13u, from_rcl_msg.buffer_capacity); EXPECT_EQ(content_size, from_rcl_msg.buffer_length); } + +TEST(TestSerializedMessage, serialization) { + auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + rclcpp::Serialization serializer(*type_support); + + auto basic_type_ros_msgs = get_messages_basic_types(); + for (const auto & ros_msg : basic_type_ros_msgs) { + // convert ros msg to serialized msg + rclcpp::SerializedMessage serialized_msg; + serializer.serialize_message(ros_msg.get(), &serialized_msg); + + // convert serialized msg back to ros msg + test_msgs::msg::BasicTypes deserialized_ros_msg; + serializer.deserialize_message(&serialized_msg, &deserialized_ros_msg); + + EXPECT_EQ(*ros_msg, deserialized_ros_msg); + } +} + +TEST(TestSerializedMessage, serialization_into_nullptr) { + auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + rclcpp::Serialization serializer(*type_support); + + auto basic_type_ros_msgs = get_messages_basic_types(); + for (const auto & ros_msg : basic_type_ros_msgs) { + rclcpp::SerializedMessage serialized_msg; + test_msgs::msg::BasicTypes deserialized_ros_msg; + + EXPECT_THROW( + serializer.serialize_message(ros_msg.get(), nullptr), + rcpputils::IllegalStateException); + EXPECT_THROW( + serializer.serialize_message(nullptr, &serialized_msg), + rcpputils::IllegalStateException); + + EXPECT_THROW( + serializer.deserialize_message(&serialized_msg, nullptr), + rcpputils::IllegalStateException); + EXPECT_THROW( + serializer.deserialize_message(nullptr, &deserialized_ros_msg), + rcpputils::IllegalStateException); + } +} From 4dfcc5f7a76d5dc2b277ec082a39c25553d5d233 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Sat, 18 Apr 2020 20:33:12 -0700 Subject: [PATCH 8/9] wip make serilize to serialize communication work Signed-off-by: Karsten Knese --- .../subscription_intra_process.hpp | 25 ++-- rclcpp/include/rclcpp/publisher.hpp | 12 +- rclcpp/include/rclcpp/subscription.hpp | 1 - rclcpp/src/rclcpp/serialized_message.cpp | 5 +- .../test/test_intra_process_communication.cpp | 128 ++++++++++-------- 5 files changed, 94 insertions(+), 77 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index fed16f2474..8988eedea5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -157,11 +157,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // convert from ROS2 message to rcl_serialized_message_t (serilizatino needed) template typename std::enable_if< - std::is_same::value && - !std::is_same::value, - void>::type + !std::is_base_of::value && // tx not a SerializedMessage + std::is_base_of::value, // rx a SerializedMessage + void>::type execute_impl() { + fprintf(stderr, "tx ROS2, rx serialized\n"); if (nullptr == serializer_) { throw std::runtime_error("Subscription intra-process can't handle serialized messages"); } @@ -189,11 +190,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // forward from ROS2 message to ROS2 message (same type) template typename std::enable_if< - !std::is_same::value && - !std::is_same::value, + !std::is_base_of::value && // tx not a SerializedMessage + !std::is_base_of::value, // rx not a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx ROS2, rx ROS2\n"); rmw_message_info_t msg_info = {}; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -207,14 +209,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } - // forward from rcl_serialized_message_t to SerializationMessage (no conversion needed) + // forward from rcl_serialized_message_t to rcl_serialized_message_t (no conversion needed) template typename std::enable_if< - std::is_same::value && - std::is_same::value, + std::is_base_of::value && // tx a SerializedMessage + std::is_base_of::value, // rx a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx serialize, rx serialize\n"); rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; @@ -234,11 +237,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase // convert from rcl_serialized_message_t to ROS2 message (deserialization needed) template typename std::enable_if< - !std::is_same::value && - std::is_same::value, + std::is_base_of::value && // tx a SerializedMessage + !std::is_base_of::value, // rx not a SerializedMessage void>::type execute_impl() { + fprintf(stderr, "tx rcl, rx ROS2\n"); if (nullptr == serializer_) { throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); } @@ -258,6 +262,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(msg, msg_info); } else { + // construct unique might not initialize the ros message? CallbackMessageUniquePtr msg = construct_unique(); serializer_->deserialize_message( serialized_message.get(), diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index eb72d1b102..1bc099cad7 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -172,10 +172,10 @@ class Publisher : public PublisherBase virtual void publish(std::unique_ptr msg) { - if (std::is_same::value) { - this->template publish(std::move(msg)); - return; - } + //if (std::is_same::value) { + // this->template publish(std::move(msg)); + // return; + //} if (!intra_process_is_enabled_) { this->do_inter_process_publish(*msg); @@ -206,7 +206,7 @@ class Publisher : public PublisherBase } template - typename std::enable_if::value>::type + typename std::enable_if::value, void>::type publish(const rcl_serialized_message_t & serialized_msg) { this->do_publish_message(serialized_msg); @@ -361,7 +361,7 @@ class Publisher : public PublisherBase } void - do_serialized_publish(rcl_serialized_message_t serialized_msg) + do_serialized_publish(SerializedMessage serialized_msg) { bool inter_process_publish_needed = get_subscription_count() > get_intra_process_subscription_count(); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 0859fe68b3..1e6a46604d 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -25,7 +25,6 @@ #include #include - #include "rcl/error_handling.h" #include "rcl/subscription.h" diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index 724b76a9a2..17dd30bc4f 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -109,10 +109,7 @@ SerializedMessage & SerializedMessage::operator=(SerializedMessage && other) SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other) { - this->buffer = other.buffer; - this->buffer_capacity = other.buffer_capacity; - this->buffer_length = other.buffer_length; - this->allocator = other.allocator; + *this = other; // reset original to prevent double free other = rmw_get_zero_initialized_serialized_message(); diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 47e2e13b74..e888c879d5 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -37,6 +37,7 @@ int32_t & get_test_allocation_counter() void * custom_allocate(size_t size, void * state) { + fprintf(stderr, "calling custom allocate\n"); static auto m_allocator = rcutils_get_default_allocator(); ++get_test_allocation_counter(); @@ -46,6 +47,7 @@ void * custom_allocate(size_t size, void * state) void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) { + fprintf(stderr, "calling custom zero allocate\n"); static auto m_allocator = rcutils_get_default_allocator(); ++get_test_allocation_counter(); @@ -55,6 +57,7 @@ void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, v void * custom_reallocate(void * pointer, size_t size, void * state) { + fprintf(stderr, "calling custom reallocate\n"); static auto m_allocator = rcutils_get_default_allocator(); if (pointer == nullptr) { @@ -67,6 +70,7 @@ void * custom_reallocate(void * pointer, size_t size, void * state) void custom_deallocate(void * pointer, void * state) { + fprintf(stderr, "calling custom deallocate\n"); static auto m_allocator = rcutils_get_default_allocator(); --get_test_allocation_counter(); @@ -144,52 +148,52 @@ class TestPublisherSubscriptionSerialized : public ::testing::Test */ { { - rclcpp::NodeOptions().use_intra_process_comms(false), - rclcpp::NodeOptions().use_intra_process_comms(true) + rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false), + rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false) }, {1u, 2u}, 1, "two_subscriptions_intraprocess_comm" }, - /* - Testing publisher subscription count api and internal process subscription count. - Two subscriptions, one using intra-process comm and the other not using it. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().use_intra_process_comms(false) - }, - {1u, 1u}, - 1, - "two_subscriptions_one_intraprocess_one_not" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both using intra-process. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(true), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) - }, - {1u, 1u}, - 1, - "two_subscriptions_in_two_contexts_with_intraprocess_comm" - }, - /* - Testing publisher subscription count api and internal process subscription count. - Two contexts, both of them not using intra-process comm. - */ - { - { - rclcpp::NodeOptions().use_intra_process_comms(false), - rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) - }, - {0u, 0u}, - 1, - "two_subscriptions_in_two_contexts_without_intraprocess_comm" - } + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two subscriptions, one using intra-process comm and the other not using it. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().use_intra_process_comms(false) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_one_intraprocess_one_not" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both using intra-process. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(true), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + // }, + // {1u, 1u}, + // 1, + // "two_subscriptions_in_two_contexts_with_intraprocess_comm" + //}, + ///* + // Testing publisher subscription count api and internal process subscription count. + // Two contexts, both of them not using intra-process comm. + // */ + //{ + // { + // rclcpp::NodeOptions().use_intra_process_comms(false), + // rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + // }, + // {0u, 0u}, + // 1, + // "two_subscriptions_in_two_contexts_without_intraprocess_comm" + //} }; }; @@ -200,12 +204,23 @@ void OnMessageSerialized(const std::shared_ptr m EXPECT_NE(msg->buffer, nullptr); EXPECT_GT(msg->buffer_capacity, 0u); + static auto type_support = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + test_msgs::msg::Strings ros_msg; + rclcpp::Serialization serializer(*type_support); + EXPECT_NO_THROW( + serializer.deserialize_message( + static_cast(msg.get()), &ros_msg)); + EXPECT_EQ(ros_msg.string_value.front(), '0'); + EXPECT_EQ(ros_msg.string_value[6], '6'); + EXPECT_EQ(ros_msg.string_value.back(), '9'); ++counts[0]; } void OnMessageConst(std::shared_ptr msg) { - EXPECT_EQ(msg->string_value.back(), '9'); + EXPECT_EQ(msg->string_value[6], '6'); ++counts[1]; } @@ -219,7 +234,9 @@ void OnMessageUniquePtr(std::unique_ptr msg) void OnMessage(std::shared_ptr msg) { - EXPECT_EQ(msg->string_value.back(), '9'); + EXPECT_EQ(msg->string_value.front(), '0'); + EXPECT_EQ(msg->string_value[6], '6'); + EXPECT_EQ(msg->string_value.back(), '\0'); ++counts[1]; } @@ -235,15 +252,15 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) parameter.node_options[0]); auto publisher = node->create_publisher("/topic", 10); - auto sub_shared = node->create_subscription( - "/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription( - "/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription( - "/topic", 10, - &OnMessageConst); + //auto sub_shared = node->create_subscription( + // "/topic", 10, + // &OnMessage); + //auto sub_unique = node->create_subscription( + // "/topic", 10, + // &OnMessageUniquePtr); + //auto sub_const_shared = node->create_subscription( + // "/topic", 10, + // &OnMessageConst); auto sub_serialized = node->create_subscription( "/topic", 10, &OnMessageSerialized); @@ -251,7 +268,7 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) rclcpp::sleep_for(offset_); counts.fill(0); - auto string_msg = get_messages_strings()[3]; + std::shared_ptr string_msg = get_messages_strings()[3]; for (size_t i = 0; i < parameter.runs; i++) { auto msg0 = make_serialized_string_msg(string_msg); @@ -259,11 +276,10 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) auto unique_string_msg = std::make_unique(*string_msg); { - auto unique_serialized_msg = std::make_unique(msg0); - //auto unique_rcl_msg = std::make_unique(*unique_serialized_msg.release()); + auto unique_serialized_msg = std::make_unique(std::move(msg0)); //publisher->publish(std::move(unique_serialized_msg)); } - publisher->publish(*string_msg); + //publisher->publish(*string_msg); publisher->publish(std::move(unique_string_msg)); } for (uint32_t i = 0; i < 3; ++i) { From 82fa5bb5eda5d1dc9302a61c01c41509d3a70b5b Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Sat, 18 Apr 2020 21:03:14 -0700 Subject: [PATCH 9/9] wip with multiple intra process ids Signed-off-by: Karsten Knese --- .../experimental/intra_process_manager.hpp | 2 +- rclcpp/include/rclcpp/publisher.hpp | 23 ++++++++++--------- rclcpp/include/rclcpp/publisher_base.hpp | 2 ++ rclcpp/src/rclcpp/intra_process_manager.cpp | 5 ++-- rclcpp/src/rclcpp/publisher_base.cpp | 2 ++ rclcpp/src/rclcpp/serialized_message.cpp | 5 +++- .../test/test_intra_process_communication.cpp | 19 +++++++++++---- 7 files changed, 38 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 383e4ddb11..8fc9a67ff5 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -138,7 +138,7 @@ class IntraProcessManager */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher(rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized = false); /// Unregister a publisher using the publisher's unique id. /** diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 1bc099cad7..122e9e1c08 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -135,8 +135,9 @@ class Publisher : public PublisherBase throw std::invalid_argument( "intraprocess communication allowed only with volatile durability"); } - uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); - this->setup_intra_process(intra_process_publisher_id, ipm); + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), false); + uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(this->shared_from_this(), true); + this->setup_intra_process(intra_process_publisher_id, intra_process_publisher_id_serialized, ipm); } } @@ -222,11 +223,11 @@ class Publisher : public PublisherBase } /// Publish a serialized message. - template - void publish(std::unique_ptr serialized_msg) - { - this->do_serialized_publish(*serialized_msg); - } + //template + //void publish(std::unique_ptr serialized_msg) + //{ + // this->do_serialized_publish(*serialized_msg); + //} /// Publish a serialized message. template @@ -361,7 +362,7 @@ class Publisher : public PublisherBase } void - do_serialized_publish(SerializedMessage serialized_msg) + do_serialized_publish(const SerializedMessage & serialized_msg) { bool inter_process_publish_needed = get_subscription_count() > get_intra_process_subscription_count(); @@ -374,10 +375,10 @@ class Publisher : public PublisherBase } } - auto msg = std::make_unique( - std::move(serialized_msg)); - if (intra_process_is_enabled_) { + auto msg = std::make_unique(); + *msg = serialized_msg; + do_intra_process_publish(std::move(msg), message_allocator_serialized_); } } diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 6c84ebab85..a061dc60d3 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -192,6 +192,7 @@ class PublisherBase : public std::enable_shared_from_this void setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm); RCLCPP_PUBLIC @@ -227,6 +228,7 @@ class PublisherBase : public std::enable_shared_from_this bool intra_process_is_enabled_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; + uint64_t intra_process_publisher_id_serialized_; rmw_gid_t rmw_gid_; diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 305fe4e1b7..0493a13a71 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,7 +32,8 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized) { std::unique_lock lock(mutex_); @@ -41,7 +42,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) publishers_[id].publisher = publisher; publishers_[id].topic_name = publisher->get_topic_name(); publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile(); - publishers_[id].is_serialized = publisher->is_serialized(); + publishers_[id].is_serialized = is_serialized; // Initialize the subscriptions storage for this publisher. pub_to_subs_[id] = SplittedSubscriptions(); diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 2a90fcfcef..adb94d02e2 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -241,9 +241,11 @@ PublisherBase::operator==(const rmw_gid_t * gid) const void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm) { intra_process_publisher_id_ = intra_process_publisher_id; + intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized; weak_ipm_ = ipm; intra_process_is_enabled_ = true; } diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index 17dd30bc4f..99b20d4d5c 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -109,7 +109,10 @@ SerializedMessage & SerializedMessage::operator=(SerializedMessage && other) SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other) { - *this = other; + this->buffer = other.buffer; + this->buffer_capacity = other.buffer_length; + this->buffer_length = other.buffer_length; + this->allocator = other.allocator; // reset original to prevent double free other = rmw_get_zero_initialized_serialized_message(); diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index e888c879d5..65ab3f010f 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -146,6 +146,15 @@ class TestPublisherSubscriptionSerialized : public ::testing::Test Testing publisher subscription count api and internal process subscription count. Two subscriptions in the same topic, both using intraprocess comm. */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false), + rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, { { rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false), @@ -252,9 +261,9 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) parameter.node_options[0]); auto publisher = node->create_publisher("/topic", 10); - //auto sub_shared = node->create_subscription( - // "/topic", 10, - // &OnMessage); + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); //auto sub_unique = node->create_subscription( // "/topic", 10, // &OnMessageUniquePtr); @@ -277,10 +286,10 @@ TEST_F(TestPublisherSubscriptionSerialized, publish_serialized) { auto unique_serialized_msg = std::make_unique(std::move(msg0)); - //publisher->publish(std::move(unique_serialized_msg)); + publisher->publish(std::move(unique_serialized_msg)); } //publisher->publish(*string_msg); - publisher->publish(std::move(unique_string_msg)); + //publisher->publish(std::move(unique_string_msg)); } for (uint32_t i = 0; i < 3; ++i) { rclcpp::spin_some(node);