Skip to content

Commit

Permalink
Added count matching api and intra-process subscriber count (ros2#628)
Browse files Browse the repository at this point in the history
* Added count matching api to publishers. Also, internal method to count intra-process subscriptions

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Corrected error checking in publisher interprocess subscription count api. Minimal modifications in test

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Moved intraprocess subscription count api to public. Started removing publishers and subscribers from ipm.

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Added publisher count api in subscription class

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Solved Wreorder

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno authored and christopherho-ApexAI committed Jun 3, 2019
1 parent c089354 commit 3d45e56
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 5 deletions.
20 changes: 20 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_publisher ${PROJECT_NAME})
endif()
ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp)
if(TARGET test_publisher_subscription_count_api)
target_include_directories(test_publisher_subscription_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_publisher_subscription_count_api ${PROJECT_NAME})
endif()
ament_add_gtest(test_rate test/test_rate.cpp)
if(TARGET test_rate)
target_include_directories(test_rate PUBLIC
Expand Down Expand Up @@ -290,6 +300,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_subscription ${PROJECT_NAME})
endif()
ament_add_gtest(test_subscription_publisher_count_api test/test_subscription_publisher_count_api.cpp)
if(TARGET test_subscription_publisher_count_api)
target_include_directories(test_subscription_publisher_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_subscription_publisher_count_api ${PROJECT_NAME})
endif()
find_package(test_msgs REQUIRED)
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
if(TARGET test_subscription_traits)
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ class IntraProcessManager
bool
matches_any_publishers(const rmw_gid_t * id) const;

/// Return the number of intraprocess subscriptions to a topic, given the publisher id.
RCLCPP_PUBLIC
size_t
get_subscription_count(uint64_t intra_process_publisher_id) const;

private:
RCLCPP_PUBLIC
static uint64_t
Expand Down
23 changes: 23 additions & 0 deletions rclcpp/include/rclcpp/intra_process_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class IntraProcessManagerImplBase
virtual bool
matches_any_publishers(const rmw_gid_t * id) const = 0;

virtual size_t
get_subscription_count(uint64_t intra_process_publisher_id) const = 0;

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase)
};
Expand Down Expand Up @@ -248,6 +251,26 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase
return false;
}

size_t
get_subscription_count(uint64_t intra_process_publisher_id) const
{
auto publisher_it = publishers_.find(intra_process_publisher_id);
if (publisher_it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return 0;
}
auto publisher = publisher_it->second.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
auto sub_map_it = subscription_ids_by_topic_.find(publisher->get_topic_name());
if (sub_map_it == subscription_ids_by_topic_.end()) {
// No intraprocess subscribers
return 0;
}
return sub_map_it->second.size();
}

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl)

Expand Down
30 changes: 29 additions & 1 deletion rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ namespace node_interfaces
class NodeTopicsInterface;
}

namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}

class PublisherBase
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
Expand Down Expand Up @@ -107,6 +116,18 @@ class PublisherBase
const rcl_publisher_t *
get_publisher_handle() const;

/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
size_t
get_subscription_count() const;

/// Get intraprocess subscription count
/** \return The number of intraprocess subscriptions. */
RCLCPP_PUBLIC
size_t
get_intra_process_subscription_count() const;

/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
Expand All @@ -128,13 +149,16 @@ class PublisherBase
operator==(const rmw_gid_t * gid) const;

using StoreMessageCallbackT = std::function<uint64_t(uint64_t, void *, const std::type_info &)>;
using IntraProcessManagerSharedPtr =
std::shared_ptr<rclcpp::intra_process_manager::IntraProcessManager>;

/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options);

protected:
Expand All @@ -143,6 +167,10 @@ class PublisherBase
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
StoreMessageCallbackT store_intra_process_message_;

Expand Down
27 changes: 26 additions & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ namespace node_interfaces
class NodeTopicsInterface;
} // namespace node_interfaces

namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}

/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
Expand Down Expand Up @@ -129,11 +138,23 @@ class SubscriptionBase
bool
is_serialized() const;

/// Get matching publisher count
/** \return The number of publishers on this topic. */
RCLCPP_PUBLIC
size_t
get_publisher_count() const;

protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_;

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;

private:
RCLCPP_DISABLE_COPY(SubscriptionBase)

Expand Down Expand Up @@ -272,10 +293,13 @@ class Subscription : public SubscriptionBase
using MatchesAnyPublishersCallbackType = std::function<bool (const rmw_gid_t *)>;

/// Implemenation detail.
// TODO(ivanpauno): This can be moved to the base class. No reason to be here.
// Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_.
void setup_intra_process(
uint64_t intra_process_subscription_id,
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)
{
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
Expand All @@ -302,6 +326,8 @@ class Subscription : public SubscriptionBase
intra_process_subscription_id_ = intra_process_subscription_id;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}

/// Implemenation detail.
Expand All @@ -323,7 +349,6 @@ class Subscription : public SubscriptionBase

GetMessageCallbackType get_intra_process_message_callback_;
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
uint64_t intra_process_subscription_id_;
};

} // namespace rclcpp
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/subscription_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ create_subscription_factory(
intra_process_subscription_id,
take_intra_process_message_func,
matches_any_publisher_func,
weak_ipm,
intra_process_options
);
};
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
return impl_->matches_any_publishers(id);
}

size_t
IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const
{
return impl_->get_subscription_count(intra_process_publisher_id);
}

uint64_t
IntraProcessManager::get_next_unique_id()
{
Expand Down
1 change: 1 addition & 0 deletions rclcpp/src/rclcpp/node_interfaces/node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ NodeTopics::create_publisher(
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
ipm,
publisher_options);
}

Expand Down
69 changes: 66 additions & 3 deletions rclcpp/src/rclcpp/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
Expand All @@ -42,7 +43,8 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
use_intra_process_(false), intra_process_publisher_id_(0),
store_intra_process_message_(nullptr)
{
rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
Expand Down Expand Up @@ -94,6 +96,20 @@ PublisherBase::~PublisherBase()
rcl_get_error_string().str);
rcl_reset_error();
}

auto ipm = weak_ipm_.lock();

if (!use_intra_process_) {
return;
}
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a publisher.");
return;
}
ipm->remove_publisher(intra_process_publisher_id_);
}

const char *
Expand Down Expand Up @@ -138,6 +154,49 @@ PublisherBase::get_publisher_handle() const
return &publisher_handle_;
}

size_t
PublisherBase::get_subscription_count() const
{
size_t inter_process_subscription_count = 0;

rmw_ret_t status = rcl_publisher_get_subscription_count(
&publisher_handle_,
&inter_process_subscription_count);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
/* publisher is invalid due to context being shutdown */
return 0;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
}
return inter_process_subscription_count;
}

size_t
PublisherBase::get_intra_process_subscription_count() const
{
auto ipm = weak_ipm_.lock();
if (!use_intra_process_) {
return 0;
}
if (!ipm) {
// TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
// Same as wjwwood comment in publisher_factory create_shared_publish_callback.
// If we don't raise an error here, use_intra_process_ flag is unnecessary.
throw std::runtime_error(
"intra process subscriber count called after "
"destruction of intra process manager");
}
return ipm->get_subscription_count(intra_process_publisher_id_);
}

bool
PublisherBase::operator==(const rmw_gid_t & gid) const
{
Expand Down Expand Up @@ -168,7 +227,8 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options)
{
const char * topic_name = this->get_topic_name();
Expand Down Expand Up @@ -199,7 +259,10 @@ PublisherBase::setup_intra_process(
}

intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
store_intra_process_message_ = store_callback;
weak_ipm_ = ipm;
use_intra_process_ = true;

// Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(
&intra_process_publisher_handle_);
Expand Down
Loading

0 comments on commit 3d45e56

Please sign in to comment.