diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 017f9a20c0..149d49fa20 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -248,6 +248,16 @@ if(BUILD_TESTING) ) target_link_libraries(test_publisher ${PROJECT_NAME}) endif() + ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp) + if(TARGET test_publisher_subscription_count_api) + target_include_directories(test_publisher_subscription_count_api PUBLIC + ${rcl_interfaces_INCLUDE_DIRS} + ${rmw_INCLUDE_DIRS} + ${rosidl_generator_cpp_INCLUDE_DIRS} + ${rosidl_typesupport_cpp_INCLUDE_DIRS} + ) + target_link_libraries(test_publisher_subscription_count_api ${PROJECT_NAME}) + endif() ament_add_gtest(test_rate test/test_rate.cpp) if(TARGET test_rate) target_include_directories(test_rate PUBLIC @@ -290,6 +300,16 @@ if(BUILD_TESTING) ) target_link_libraries(test_subscription ${PROJECT_NAME}) endif() + ament_add_gtest(test_subscription_publisher_count_api test/test_subscription_publisher_count_api.cpp) + if(TARGET test_subscription_publisher_count_api) + target_include_directories(test_subscription_publisher_count_api PUBLIC + ${rcl_interfaces_INCLUDE_DIRS} + ${rmw_INCLUDE_DIRS} + ${rosidl_generator_cpp_INCLUDE_DIRS} + ${rosidl_typesupport_cpp_INCLUDE_DIRS} + ) + target_link_libraries(test_subscription_publisher_count_api ${PROJECT_NAME}) + endif() find_package(test_msgs REQUIRED) ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp) if(TARGET test_subscription_traits) diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 356512c647..6c044a4da6 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -346,6 +346,11 @@ class IntraProcessManager bool matches_any_publishers(const rmw_gid_t * id) const; + /// Return the number of intraprocess subscriptions to a topic, given the publisher id. + RCLCPP_PUBLIC + size_t + get_subscription_count(uint64_t intra_process_publisher_id) const; + private: RCLCPP_PUBLIC static uint64_t diff --git a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp index ff8554e399..8b44b8f587 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp @@ -80,6 +80,9 @@ class IntraProcessManagerImplBase virtual bool matches_any_publishers(const rmw_gid_t * id) const = 0; + virtual size_t + get_subscription_count(uint64_t intra_process_publisher_id) const = 0; + private: RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase) }; @@ -248,6 +251,26 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase return false; } + size_t + get_subscription_count(uint64_t intra_process_publisher_id) const + { + auto publisher_it = publishers_.find(intra_process_publisher_id); + if (publisher_it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return 0; + } + auto publisher = publisher_it->second.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + auto sub_map_it = subscription_ids_by_topic_.find(publisher->get_topic_name()); + if (sub_map_it == subscription_ids_by_topic_.end()) { + // No intraprocess subscribers + return 0; + } + return sub_map_it->second.size(); + } + private: RCLCPP_DISABLE_COPY(IntraProcessManagerImpl) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 9fb6a872e0..4500fedab5 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -45,6 +45,15 @@ namespace node_interfaces class NodeTopicsInterface; } +namespace intra_process_manager +{ +/** + * NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp. + * SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class. + */ +class IntraProcessManager; +} + class PublisherBase { friend ::rclcpp::node_interfaces::NodeTopicsInterface; @@ -107,6 +116,18 @@ class PublisherBase const rcl_publisher_t * get_publisher_handle() const; + /// Get subscription count + /** \return The number of subscriptions. */ + RCLCPP_PUBLIC + size_t + get_subscription_count() const; + + /// Get intraprocess subscription count + /** \return The number of intraprocess subscriptions. */ + RCLCPP_PUBLIC + size_t + get_intra_process_subscription_count() const; + /// Compare this publisher to a gid. /** * Note that this function calls the next function. @@ -128,13 +149,16 @@ class PublisherBase operator==(const rmw_gid_t * gid) const; using StoreMessageCallbackT = std::function; + using IntraProcessManagerSharedPtr = + std::shared_ptr; /// Implementation utility function used to setup intra process publishing after creation. RCLCPP_PUBLIC void setup_intra_process( uint64_t intra_process_publisher_id, - StoreMessageCallbackT callback, + StoreMessageCallbackT store_callback, + IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options); protected: @@ -143,6 +167,10 @@ class PublisherBase rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); + using IntraProcessManagerWeakPtr = + std::weak_ptr; + bool use_intra_process_; + IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; StoreMessageCallbackT store_intra_process_message_; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 42abe7183e..2a2a3e4eef 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -46,6 +46,15 @@ namespace node_interfaces class NodeTopicsInterface; } // namespace node_interfaces +namespace intra_process_manager +{ +/** + * NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp. + * SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class. + */ +class IntraProcessManager; +} + /// Virtual base class for subscriptions. This pattern allows us to iterate over different template /// specializations of Subscription, among other things. class SubscriptionBase @@ -129,11 +138,23 @@ class SubscriptionBase bool is_serialized() const; + /// Get matching publisher count + /** \return The number of publishers on this topic. */ + RCLCPP_PUBLIC + size_t + get_publisher_count() const; + protected: std::shared_ptr intra_process_subscription_handle_; std::shared_ptr subscription_handle_; std::shared_ptr node_handle_; + using IntraProcessManagerWeakPtr = + std::weak_ptr; + bool use_intra_process_; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_subscription_id_; + private: RCLCPP_DISABLE_COPY(SubscriptionBase) @@ -272,10 +293,13 @@ class Subscription : public SubscriptionBase using MatchesAnyPublishersCallbackType = std::function; /// Implemenation detail. + // TODO(ivanpauno): This can be moved to the base class. No reason to be here. + // Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_. void setup_intra_process( uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, + IntraProcessManagerWeakPtr weak_ipm, const rcl_subscription_options_t & intra_process_options) { std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra"; @@ -302,6 +326,8 @@ class Subscription : public SubscriptionBase intra_process_subscription_id_ = intra_process_subscription_id; get_intra_process_message_callback_ = get_message_callback; matches_any_intra_process_publishers_ = matches_any_publisher_callback; + weak_ipm_ = weak_ipm; + use_intra_process_ = true; } /// Implemenation detail. @@ -323,7 +349,6 @@ class Subscription : public SubscriptionBase GetMessageCallbackType get_intra_process_message_callback_; MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; - uint64_t intra_process_subscription_id_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index b7d5224a0a..e2bdec7ef2 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -166,6 +166,7 @@ create_subscription_factory( intra_process_subscription_id, take_intra_process_message_func, matches_any_publisher_func, + weak_ipm, intra_process_options ); }; diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 723c43fd8d..19ac424b9d 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -56,6 +56,12 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const return impl_->matches_any_publishers(id); } +size_t +IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const +{ + return impl_->get_subscription_count(intra_process_publisher_id); +} + uint64_t IntraProcessManager::get_next_unique_id() { diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index b390addb68..b380e3c2ec 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -54,6 +54,7 @@ NodeTopics::create_publisher( publisher->setup_intra_process( intra_process_publisher_id, shared_publish_callback, + ipm, publisher_options); } diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp index 45097c5f53..ce6ebadaee 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -30,6 +30,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/exceptions.hpp" +#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" @@ -42,7 +43,8 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_publisher_id_(0), store_intra_process_message_(nullptr) + use_intra_process_(false), intra_process_publisher_id_(0), + store_intra_process_message_(nullptr) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -94,6 +96,20 @@ PublisherBase::~PublisherBase() rcl_get_error_string().str); rcl_reset_error(); } + + auto ipm = weak_ipm_.lock(); + + if (!use_intra_process_) { + return; + } + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a publisher."); + return; + } + ipm->remove_publisher(intra_process_publisher_id_); } const char * @@ -138,6 +154,49 @@ PublisherBase::get_publisher_handle() const return &publisher_handle_; } +size_t +PublisherBase::get_subscription_count() const +{ + size_t inter_process_subscription_count = 0; + + rmw_ret_t status = rcl_publisher_get_subscription_count( + &publisher_handle_, + &inter_process_subscription_count); + + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); /* next call will reset error message if not context */ + if (rcl_publisher_is_valid_except_context(&publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + /* publisher is invalid due to context being shutdown */ + return 0; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count"); + } + return inter_process_subscription_count; +} + +size_t +PublisherBase::get_intra_process_subscription_count() const +{ + auto ipm = weak_ipm_.lock(); + if (!use_intra_process_) { + return 0; + } + if (!ipm) { + // TODO(ivanpauno): should this just return silently? Or maybe return with a warning? + // Same as wjwwood comment in publisher_factory create_shared_publish_callback. + // If we don't raise an error here, use_intra_process_ flag is unnecessary. + throw std::runtime_error( + "intra process subscriber count called after " + "destruction of intra process manager"); + } + return ipm->get_subscription_count(intra_process_publisher_id_); +} + bool PublisherBase::operator==(const rmw_gid_t & gid) const { @@ -168,7 +227,8 @@ PublisherBase::operator==(const rmw_gid_t * gid) const void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, - StoreMessageCallbackT callback, + StoreMessageCallbackT store_callback, + IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options) { const char * topic_name = this->get_topic_name(); @@ -199,7 +259,10 @@ PublisherBase::setup_intra_process( } intra_process_publisher_id_ = intra_process_publisher_id; - store_intra_process_message_ = callback; + store_intra_process_message_ = store_callback; + weak_ipm_ = ipm; + use_intra_process_ = true; + // Life time of this object is tied to the publisher handle. rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle( &intra_process_publisher_handle_); diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription.cpp index be844dd678..ed26acaf17 100644 --- a/rclcpp/src/rclcpp/subscription.cpp +++ b/rclcpp/src/rclcpp/subscription.cpp @@ -20,6 +20,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/logging.hpp" #include "rmw/error_handling.h" @@ -34,6 +35,8 @@ SubscriptionBase::SubscriptionBase( const rcl_subscription_options_t & subscription_options, bool is_serialized) : node_handle_(node_handle), + use_intra_process_(false), + intra_process_subscription_id_(0), type_support_(type_support_handle), is_serialized_(is_serialized) { @@ -80,6 +83,18 @@ SubscriptionBase::SubscriptionBase( SubscriptionBase::~SubscriptionBase() { + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a subscription."); + return; + } + ipm->remove_subscription(intra_process_subscription_id_); } const char * @@ -117,3 +132,18 @@ SubscriptionBase::is_serialized() const { return is_serialized_; } + +size_t +SubscriptionBase::get_publisher_count() const +{ + size_t inter_process_publisher_count = 0; + + rmw_ret_t status = rcl_subscription_get_publisher_count( + subscription_handle_.get(), + &inter_process_publisher_count); + + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get publisher count"); + } + return inter_process_publisher_count; +} diff --git a/rclcpp/test/test_publisher_subscription_count_api.cpp b/rclcpp/test/test_publisher_subscription_count_api.cpp new file mode 100644 index 0000000000..2bc6bc48fd --- /dev/null +++ b/rclcpp/test/test_publisher_subscription_count_api.cpp @@ -0,0 +1,178 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +using rcl_interfaces::msg::IntraProcessMessage; + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expect intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionCount : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + +protected: + void SetUp() {} + + void TearDown() {} + + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionCount::offset = std::chrono::milliseconds(2000); + +void OnMessage(const rcl_interfaces::msg::IntraProcessMessage::SharedPtr msg) +{ + (void)msg; +} + +TEST_P(TestPublisherSubscriptionCount, increasing_and_decreasing_counts) +{ + TestParameters parameters = GetParam(); + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic"); + + EXPECT_EQ(publisher->get_subscription_count(), 0u); + EXPECT_EQ(publisher->get_intra_process_subscription_count(), 0u); + { + auto sub = node->create_subscription("/topic", &OnMessage); + rclcpp::sleep_for(offset); + EXPECT_EQ(publisher->get_subscription_count(), 1u); + EXPECT_EQ( + publisher->get_intra_process_subscription_count(), + parameters.intraprocess_count_results[0]); + { + rclcpp::Node::SharedPtr another_node = std::make_shared( + "another_node", + "/ns", + parameters.node_options[1]); + auto another_sub = + another_node->create_subscription("/topic", &OnMessage); + + rclcpp::sleep_for(offset); + EXPECT_EQ(publisher->get_subscription_count(), 2u); + EXPECT_EQ( + publisher->get_intra_process_subscription_count(), + parameters.intraprocess_count_results[1]); + } + rclcpp::sleep_for(offset); + EXPECT_EQ(publisher->get_subscription_count(), 1u); + EXPECT_EQ( + publisher->get_intra_process_subscription_count(), + parameters.intraprocess_count_results[0]); + } + /** + * Counts should be zero here, as all are subscriptions are out of scope. + * Subscriptions count checking is always preceeded with an sleep, as random failures had been + * detected without it. */ + rclcpp::sleep_for(offset); + EXPECT_EQ(publisher->get_subscription_count(), 0u); + EXPECT_EQ(publisher->get_intra_process_subscription_count(), 0u); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +TestParameters parameters[] = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionCount, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); diff --git a/rclcpp/test/test_subscription_publisher_count_api.cpp b/rclcpp/test/test_subscription_publisher_count_api.cpp new file mode 100644 index 0000000000..287075149a --- /dev/null +++ b/rclcpp/test/test_subscription_publisher_count_api.cpp @@ -0,0 +1,123 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +using rcl_interfaces::msg::IntraProcessMessage; + +struct TestParameters +{ + rclcpp::NodeOptions node_options; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestSubscriptionPublisherCount : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + +protected: + void SetUp() {} + void TearDown() {} + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestSubscriptionPublisherCount::offset = std::chrono::milliseconds(2000); + +void OnMessage(const rcl_interfaces::msg::IntraProcessMessage::SharedPtr msg) +{ + (void)msg; +} + +TEST_P(TestSubscriptionPublisherCount, increasing_and_decreasing_counts) +{ + rclcpp::NodeOptions node_options = GetParam().node_options; + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + node_options); + auto subscription = node->create_subscription("/topic", &OnMessage); + + EXPECT_EQ(subscription->get_publisher_count(), 0u); + { + auto pub = node->create_publisher("/topic"); + rclcpp::sleep_for(offset); + EXPECT_EQ(subscription->get_publisher_count(), 1u); + { + rclcpp::Node::SharedPtr another_node = std::make_shared( + "another_node", + "/ns", + node_options); + auto another_pub = + another_node->create_publisher("/topic"); + + rclcpp::sleep_for(offset); + EXPECT_EQ(subscription->get_publisher_count(), 2u); + } + rclcpp::sleep_for(offset); + EXPECT_EQ(subscription->get_publisher_count(), 1u); + } + rclcpp::sleep_for(offset); + EXPECT_EQ(subscription->get_publisher_count(), 0u); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +TestParameters parameters[] = { + /* + Testing subscription publisher count api. + One context. + */ + { + rclcpp::NodeOptions(), + "one_context_test" + }, + /* + Testing subscription publisher count api. + Two contexts. + */ + { + rclcpp::NodeOptions().context(get_new_context()), + "two_contexts_test" + } +}; + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, + TestSubscriptionPublisherCount, + testing::ValuesIn(parameters), + testing::PrintToStringParamName());