Skip to content

Commit

Permalink
Store the subscriber, client, service and timer
Browse files Browse the repository at this point in the history
objects as shared pointers (ros2#349)

To prevent an object from being deleted while the rcl_wait_set is
using raw pointers to internal members, we store them as shared
pointers.

The subscriptions are stored as a pair because a single
subscription handle can have both an intra and non-intra
process handle being used by the wait set.  In order to
remove objects based on null wait set handles, we need both.
  • Loading branch information
deng02 committed Jan 16, 2018
1 parent b81f55e commit 87d147c
Showing 1 changed file with 130 additions and 130 deletions.
260 changes: 130 additions & 130 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_

#include <memory>
#include <utility>
#include <vector>

#include "rcl/allocator.h"
Expand Down Expand Up @@ -86,53 +87,56 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

void clear_handles()
{
subscription_handles_.clear();
service_handles_.clear();
client_handles_.clear();
timer_handles_.clear();
sub_pair_ptrs_.clear();
service_ptrs_.clear();
client_ptrs_.clear();
timer_ptrs_.clear();
}

virtual void remove_null_handles(rcl_wait_set_t * wait_set)
{
for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
if (!wait_set->subscriptions[i]) {
subscription_handles_[i] = nullptr;
sub_pair_ptrs_[i].first = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_services; ++i) {
if (!wait_set->services[i]) {
service_handles_[i] = nullptr;
service_ptrs_[i] = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_clients; ++i) {
if (!wait_set->clients[i]) {
client_handles_[i] = nullptr;
client_ptrs_[i] = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
if (!wait_set->timers[i]) {
timer_handles_[i] = nullptr;
timer_ptrs_[i] = nullptr;
}
}

subscription_handles_.erase(
std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
subscription_handles_.end()
);
for (auto it = sub_pair_ptrs_.begin(); it != sub_pair_ptrs_.end();) {
if (it->first == nullptr) {
it = sub_pair_ptrs_.erase(it);
} else {
++it;
}
}

service_handles_.erase(
std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
service_handles_.end()
service_ptrs_.erase(
std::remove(service_ptrs_.begin(), service_ptrs_.end(), nullptr),
service_ptrs_.end()
);

client_handles_.erase(
std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
client_handles_.end()
client_ptrs_.erase(
std::remove(client_ptrs_.begin(), client_ptrs_.end(), nullptr),
client_ptrs_.end()
);

timer_handles_.erase(
std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
timer_handles_.end()
timer_ptrs_.erase(
std::remove(timer_ptrs_.begin(), timer_ptrs_.end(), nullptr),
timer_ptrs_.end()
);
}

Expand All @@ -153,29 +157,32 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
subscription_handles_.push_back(subscription->get_subscription_handle());
sub_pair_ptrs_.push_back(std::make_pair(
subscription->get_subscription_handle(),
subscription));
if (subscription->get_intra_process_subscription_handle()) {
subscription_handles_.push_back(
subscription->get_intra_process_subscription_handle());
sub_pair_ptrs_.push_back(std::make_pair(
subscription->get_intra_process_subscription_handle(),
subscription));
}
}
}
for (auto & weak_service : group->get_service_ptrs()) {
auto service = weak_service.lock();
if (service) {
service_handles_.push_back(service->get_service_handle());
service_ptrs_.push_back(service);
}
}
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client) {
client_handles_.push_back(client->get_client_handle());
client_ptrs_.push_back(client);
}
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto timer = weak_timer.lock();
if (timer) {
timer_handles_.push_back(timer->get_timer_handle());
timer_ptrs_.push_back(timer);
}
}
}
Expand All @@ -185,29 +192,29 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

bool add_handles_to_wait_set(rcl_wait_set_t * wait_set)
{
for (auto subscription : subscription_handles_) {
if (rcl_wait_set_add_subscription(wait_set, subscription) != RCL_RET_OK) {
for (auto subscription : sub_pair_ptrs_) {
if (rcl_wait_set_add_subscription(wait_set, subscription.first) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add subscription to wait set: %s\n", rcl_get_error_string_safe());
return false;
}
}

for (auto client : client_handles_) {
if (rcl_wait_set_add_client(wait_set, client) != RCL_RET_OK) {
for (auto client : client_ptrs_) {
if (rcl_wait_set_add_client(wait_set, client->get_client_handle()) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add client to wait set: %s\n", rcl_get_error_string_safe());
return false;
}
}

for (auto service : service_handles_) {
if (rcl_wait_set_add_service(wait_set, service) != RCL_RET_OK) {
for (auto service : service_ptrs_) {
if (rcl_wait_set_add_service(wait_set, service->get_service_handle()) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add service to wait set: %s\n", rcl_get_error_string_safe());
return false;
}
}

for (auto timer : timer_handles_) {
if (rcl_wait_set_add_timer(wait_set, timer) != RCL_RET_OK) {
for (auto timer : timer_ptrs_) {
if (rcl_wait_set_add_timer(wait_set, timer->get_timer_handle()) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add timer to wait set: %s\n", rcl_get_error_string_safe());
return false;
}
Expand Down Expand Up @@ -235,42 +242,41 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_nodes);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->get_intra_process_subscription_handle()) {
is_intra_process = subscription->get_intra_process_subscription_handle() == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_nodes);
if (!group) {
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
subscription_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
subscription_handles_.erase(it);
return;
auto it = sub_pair_ptrs_.begin();
while (it != sub_pair_ptrs_.end()) {
auto subscription_handle = it->first;
auto subscription = it->second;

// Figure out if this is for intra-process or not.
bool is_intra_process = false;
auto intra_process_handle = subscription->get_intra_process_subscription_handle();
if (intra_process_handle) {
is_intra_process = intra_process_handle == subscription_handle;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_nodes);
if (!group) {
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
it = sub_pair_ptrs_.erase(it);
continue;
}
// Else, the subscription is no longer valid, remove it and continue
subscription_handles_.erase(it);
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
sub_pair_ptrs_.erase(it);
return;
}
}

Expand All @@ -279,66 +285,60 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = service_handles_.begin();
while (it != service_handles_.end()) {
auto service = get_service_by_handle(*it, weak_nodes);
if (service) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
service_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
service_handles_.erase(it);
return;
auto it = service_ptrs_.begin();
while (it != service_ptrs_.end()) {
auto service = *it;

// Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
it = service_ptrs_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Else, the service is no longer valid, remove it and continue
service_handles_.erase(it);
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
service_ptrs_.erase(it);
return;
}
}

virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
auto client = get_client_by_handle(*it, weak_nodes);
if (client) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_client(client, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
client_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
client_handles_.erase(it);
return;
auto it = client_ptrs_.begin();
while (it != client_ptrs_.end()) {
auto client = *it;

// Find the group for this handle and see if it can be serviced
auto group = get_group_by_client(client, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
it = client_ptrs_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Else, the service is no longer valid, remove it and continue
client_handles_.erase(it);
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
client_ptrs_.erase(it);
return;
}
}

Expand All @@ -349,17 +349,17 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

size_t number_of_ready_subscriptions() const
{
return subscription_handles_.size();
return sub_pair_ptrs_.size();
}

size_t number_of_ready_services() const
{
return service_handles_.size();
return service_ptrs_.size();
}

size_t number_of_ready_clients() const
{
return client_handles_.size();
return client_ptrs_.size();
}

size_t number_of_guard_conditions() const
Expand All @@ -369,7 +369,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

size_t number_of_ready_timers() const
{
return timer_handles_.size();
return timer_ptrs_.size();
}

private:
Expand All @@ -379,10 +379,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

VectorRebind<const rcl_guard_condition_t *> guard_conditions_;

VectorRebind<const rcl_subscription_t *> subscription_handles_;
VectorRebind<const rcl_service_t *> service_handles_;
VectorRebind<const rcl_client_t *> client_handles_;
VectorRebind<const rcl_timer_t *> timer_handles_;
VectorRebind<std::pair<const rcl_subscription_t *, SubscriptionBase::SharedPtr>> sub_pair_ptrs_;
VectorRebind<rclcpp::ServiceBase::SharedPtr> service_ptrs_;
VectorRebind<rclcpp::ClientBase::SharedPtr> client_ptrs_;
VectorRebind<rclcpp::TimerBase::SharedPtr> timer_ptrs_;

std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;
Expand Down

0 comments on commit 87d147c

Please sign in to comment.