diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp index 3413cea4c..32370bc62 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp @@ -30,8 +30,7 @@ create_subscription( const char * topic_name, const rmw_qos_profile_t * qos_policies, const rmw_subscription_options_t * subscription_options, - bool keyed, - bool create_subscription_listener); + bool keyed); } // namespace rmw_fastrtps_cpp #endif // RMW_FASTRTPS_CPP__SUBSCRIPTION_HPP_ diff --git a/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp b/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp index 51138919e..7a942a196 100644 --- a/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp +++ b/rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo; static rmw_ret_t -init_context_impl(rmw_context_t * context) +init_context_impl( + rmw_context_t * context) { rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options(); rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options(); @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context) (context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0, context->options.enclave, common_context.get()), - [&](CustomParticipantInfo * participant_info) { + [&](CustomParticipantInfo * participant_info) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) { RCUTILS_SAFE_FWRITE_TO_STDERR( "Failed to destroy participant after function: '" @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context) "ros_discovery_info", &qos, &publisher_options, - false, // our fastrtps typesupport doesn't support keyed topics + false, // our fastrtps typesupport doesn't support keyed topics true), - [&](rmw_publisher_t * pub) { + [&](rmw_publisher_t * pub) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher( eprosima_fastrtps_identifier, participant_info.get(), @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context) "ros_discovery_info", &qos, &subscription_options, - false, // our fastrtps typesupport doesn't support keyed topics - true), - [&](rmw_subscription_t * sub) { + false), // our fastrtps typesupport doesn't support keyed topics + [&](rmw_subscription_t * sub) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription( eprosima_fastrtps_identifier, participant_info.get(), @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context) std::unique_ptr> graph_guard_condition( rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier), - [&](rmw_guard_condition_t * p) { + [&](rmw_guard_condition_t * p) + { rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p); if (ret != RMW_RET_OK) { RMW_SAFE_FWRITE_TO_STDERR( @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context) } common_context->graph_cache.set_on_change_callback( - [guard_condition = graph_guard_condition.get()]() { + [guard_condition = graph_guard_condition.get()]() + { rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition( eprosima_fastrtps_identifier, guard_condition); @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context) } rmw_ret_t -rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_t * context) +rmw_fastrtps_cpp::increment_context_impl_ref_count( + rmw_context_t * context) { assert(context); assert(context->impl); diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index efd27d4d4..d6cce9c40 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -269,17 +269,22 @@ rmw_fastrtps_cpp::create_publisher( return nullptr; } - // Creates DataWriter + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->data_writer_ = publisher->create_datawriter( topic.topic, writer_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->data_writer_) { RMW_SET_ERROR_MSG("create_publisher() could not create data writer"); return nullptr; } + // Set the StatusCondition to none to prevent triggering via WaitSets + info->data_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 875e99a3e..57fef8d01 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -324,13 +324,17 @@ rmw_create_client( info->response_reader_ = subscriber->create_datareader( response_topic_desc, reader_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!info->response_reader_) { RMW_SET_ERROR_MSG("create_client() failed to create response DataReader"); return nullptr; } + info->response_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( [subscriber, info]() { @@ -370,17 +374,22 @@ rmw_create_client( return nullptr; } - // Creates DataWriter + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->request_writer_ = publisher->create_datawriter( request_topic.topic, writer_qos, - info->pub_listener_); + info->pub_listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->request_writer_) { RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter"); return nullptr; } + // Set the StatusCondition to none to prevent triggering via WaitSets + info->request_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 6be16ca8e..88b32262b 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -323,13 +323,17 @@ rmw_create_service( info->request_reader_ = subscriber->create_datareader( request_topic_desc, reader_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!info->request_reader_) { RMW_SET_ERROR_MSG("create_service() failed to create request DataReader"); return nullptr; } + info->request_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( [subscriber, info]() { @@ -373,17 +377,22 @@ rmw_create_service( return nullptr; } - // Creates DataWriter + // Creates DataWriter with a mask enabling publication_matched calls for the listener info->response_writer_ = publisher->create_datawriter( response_topic.topic, writer_qos, - info->pub_listener_); + info->pub_listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->response_writer_) { RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter"); return nullptr; } + // Set the StatusCondition to none to prevent triggering via WaitSets + info->response_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 1917b6d8b..bfb0d907a 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -77,8 +77,7 @@ rmw_create_subscription( topic_name, qos_policies, subscription_options, - false, // use no keyed topic - true); // create subscription listener + false); // use no keyed topic if (!subscription) { return nullptr; } diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index e416c1095..737104fc7 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -65,8 +65,7 @@ create_subscription( const char * topic_name, const rmw_qos_profile_t * qos_policies, const rmw_subscription_options_t * subscription_options, - bool keyed, - bool create_subscription_listener) + bool keyed) { ///// // Check input parameters @@ -164,7 +163,8 @@ create_subscription( } auto cleanup_info = rcpputils::make_scope_exit( - [info, dds_participant]() { + [info, dds_participant]() + { delete info->listener_; if (info->type_support_) { dds_participant->unregister_type(info->type_support_.get_type_name()); @@ -208,12 +208,10 @@ create_subscription( ///// // Create Listener - if (create_subscription_listener) { - info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); - if (!info->listener_) { - RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener"); - return nullptr; - } + info->listener_ = new (std::nothrow) SubListener(info); + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener"); + return nullptr; } ///// @@ -297,9 +295,14 @@ create_subscription( return nullptr; } + // Initialize DataReader's StatusCondition to be notified when new data is available + info->data_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( - [subscriber, info]() { + [subscriber, info]() + { subscriber->delete_datareader(info->data_reader_); }); @@ -316,7 +319,8 @@ create_subscription( return nullptr; } auto cleanup_rmw_subscription = rcpputils::make_scope_exit( - [rmw_subscription]() { + [rmw_subscription]() + { rmw_free(const_cast(rmw_subscription->topic_name)); rmw_subscription_free(rmw_subscription); }); @@ -345,4 +349,5 @@ create_subscription( info->subscription_gid_.data); return rmw_subscription; } + } // namespace rmw_fastrtps_cpp diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/subscription.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/subscription.hpp index 0db879040..6401254c1 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/subscription.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/subscription.hpp @@ -30,8 +30,7 @@ create_subscription( const char * topic_name, const rmw_qos_profile_t * qos_policies, const rmw_subscription_options_t * subscription_options, - bool keyed, - bool create_subscription_listener); + bool keyed); } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp b/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp index aaa658fda..5866e05de 100644 --- a/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo; static rmw_ret_t -init_context_impl(rmw_context_t * context) +init_context_impl( + rmw_context_t * context) { rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options(); rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options(); @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context) (context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0, context->options.enclave, common_context.get()), - [&](CustomParticipantInfo * participant_info) { + [&](CustomParticipantInfo * participant_info) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) { RCUTILS_SAFE_FWRITE_TO_STDERR( "Failed to destroy participant after function: '" @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context) "ros_discovery_info", &qos, &publisher_options, - false, // our fastrtps typesupport doesn't support keyed topics + false, // our fastrtps typesupport doesn't support keyed topics true), - [&](rmw_publisher_t * pub) { + [&](rmw_publisher_t * pub) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher( eprosima_fastrtps_identifier, participant_info.get(), @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context) "ros_discovery_info", &qos, &subscription_options, - false, // our fastrtps typesupport doesn't support keyed topics - true), - [&](rmw_subscription_t * sub) { + false), // our fastrtps typesupport doesn't support keyed topics + [&](rmw_subscription_t * sub) + { if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription( eprosima_fastrtps_identifier, participant_info.get(), @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context) std::unique_ptr> graph_guard_condition( rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier), - [&](rmw_guard_condition_t * p) { + [&](rmw_guard_condition_t * p) + { rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p); if (ret != RMW_RET_OK) { RMW_SAFE_FWRITE_TO_STDERR( @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context) } common_context->graph_cache.set_on_change_callback( - [guard_condition = graph_guard_condition.get()]() { + [guard_condition = graph_guard_condition.get()]() + { rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition( eprosima_fastrtps_identifier, guard_condition); @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context) } rmw_ret_t -rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(rmw_context_t * context) +rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count( + rmw_context_t * context) { assert(context); assert(context->impl); diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index 6235cf776..83c656404 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -278,13 +278,17 @@ rmw_fastrtps_dynamic_cpp::create_publisher( info->data_writer_ = publisher->create_datawriter( topic.topic, writer_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->data_writer_) { RMW_SET_ERROR_MSG("create_publisher() could not create data writer"); return nullptr; } + info->data_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 81d7de351..2b67a26f1 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -355,13 +355,17 @@ rmw_create_client( info->response_reader_ = subscriber->create_datareader( response_topic_desc, reader_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!info->response_reader_) { RMW_SET_ERROR_MSG("create_client() failed to create response DataReader"); return nullptr; } + info->response_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( [subscriber, info]() { @@ -405,13 +409,17 @@ rmw_create_client( info->request_writer_ = publisher->create_datawriter( request_topic.topic, writer_qos, - info->pub_listener_); + info->pub_listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->request_writer_) { RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter"); return nullptr; } + info->request_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 269f3cee9..3f3a51f03 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -354,13 +354,17 @@ rmw_create_service( info->request_reader_ = subscriber->create_datareader( request_topic_desc, reader_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!info->request_reader_) { RMW_SET_ERROR_MSG("create_service() failed to create request DataReader"); return nullptr; } + info->request_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( [subscriber, info]() { @@ -408,13 +412,17 @@ rmw_create_service( info->response_writer_ = publisher->create_datawriter( response_topic.topic, writer_qos, - info->pub_listener_); + info->pub_listener_, + eprosima::fastdds::dds::StatusMask::publication_matched()); if (!info->response_writer_) { RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter"); return nullptr; } + info->response_writer_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::none()); + // lambda to delete datawriter auto cleanup_datawriter = rcpputils::make_scope_exit( [publisher, info]() { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp index 7da462d07..868aa7579 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp @@ -80,8 +80,7 @@ rmw_create_subscription( topic_name, qos_policies, subscription_options, - false, - true); + false); if (!subscription) { return nullptr; } diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 2637097fd..3a6426273 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -65,8 +65,7 @@ create_subscription( const char * topic_name, const rmw_qos_profile_t * qos_policies, const rmw_subscription_options_t * subscription_options, - bool keyed, - bool create_subscription_listener) + bool keyed) { RCUTILS_CAN_RETURN_WITH_ERROR_OF(nullptr); @@ -164,7 +163,8 @@ create_subscription( } auto cleanup_info = rcpputils::make_scope_exit( - [info, dds_participant]() { + [info, dds_participant]() + { delete info->listener_; if (info->type_support_) { dds_participant->unregister_type(info->type_support_.get_type_name()); @@ -181,7 +181,8 @@ create_subscription( return nullptr; } auto return_type_support = rcpputils::make_scope_exit( - [&type_registry, type_support]() { + [&type_registry, type_support]() + { type_registry.return_message_type_support(type_support); }); @@ -213,13 +214,11 @@ create_subscription( ///// // Create Listener - if (create_subscription_listener) { - info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); + info->listener_ = new (std::nothrow) SubListener(info); - if (!info->listener_) { - RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener"); - return nullptr; - } + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener"); + return nullptr; } ///// @@ -266,7 +265,7 @@ create_subscription( return nullptr; } - info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); + info->listener_ = new (std::nothrow) SubListener(info); if (!info->listener_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener"); return nullptr; @@ -297,7 +296,8 @@ create_subscription( info->data_reader_ = subscriber->create_datareader( des_topic, reader_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!info->data_reader_ && (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED == subscription_options->require_unique_network_flow_endpoints)) @@ -305,7 +305,8 @@ create_subscription( info->data_reader_ = subscriber->create_datareader( des_topic, original_qos, - info->listener_); + info->listener_, + eprosima::fastdds::dds::StatusMask::subscription_matched()); } if (!info->data_reader_) { @@ -313,9 +314,14 @@ create_subscription( return nullptr; } + // Initialize DataReader's StatusCondition to be notified when new data is available + info->data_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( - [subscriber, info]() { + [subscriber, info]() + { subscriber->delete_datareader(info->data_reader_); }); @@ -330,7 +336,8 @@ create_subscription( return nullptr; } auto cleanup_rmw_subscription = rcpputils::make_scope_exit( - [rmw_subscription]() { + [rmw_subscription]() + { rmw_free(const_cast(rmw_subscription->topic_name)); rmw_subscription_free(rmw_subscription); }); @@ -358,4 +365,5 @@ create_subscription( cleanup_info.cancel(); return rmw_subscription; } + } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 980817062..35de2cb24 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -15,9 +15,6 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_ -#include -#include -#include #include #include #include @@ -75,107 +72,34 @@ typedef struct CustomClientResponse { eprosima::fastrtps::rtps::SampleIdentity sample_identity_; std::unique_ptr buffer_; - eprosima::fastdds::dds::SampleInfo sample_info_ {}; } CustomClientResponse; class ClientListener : public eprosima::fastdds::dds::DataReaderListener { public: - explicit ClientListener(CustomClientInfo * info) - : info_(info), list_has_data_(false), - conditionMutex_(nullptr), conditionVariable_(nullptr) {} - - - void - on_data_available(eprosima::fastdds::dds::DataReader * reader) - { - assert(reader); - - CustomClientResponse response; - // Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19 - response.buffer_.reset(new eprosima::fastcdr::FastBuffer()); - - rmw_fastrtps_shared_cpp::SerializedData data; - data.is_cdr_buffer = true; - data.data = response.buffer_.get(); - data.impl = nullptr; // not used when is_cdr_buffer is true - if (reader->take_next_sample(&data, &response.sample_info_) == ReturnCode_t::RETCODE_OK) { - if (response.sample_info_.valid_data) { - response.sample_identity_ = response.sample_info_.related_sample_identity; - - if (response.sample_identity_.writer_guid() == info_->reader_guid_ || - response.sample_identity_.writer_guid() == info_->writer_guid_) - { - std::lock_guard lock(internalMutex_); - const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history(); - if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) { - while (list.size() >= static_cast(history.depth)) { - list.pop_front(); - } - } - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - list.emplace_back(std::move(response)); - // the change to list_has_data_ needs to be mutually exclusive with - // rmw_wait() which checks hasData() and decides if wait() needs to - // be called - list_has_data_.store(true); - clock.unlock(); - conditionVariable_->notify_one(); - } else { - list.emplace_back(std::move(response)); - list_has_data_.store(true); - } - - std::unique_lock lock_mutex(on_new_response_m_); - - if (on_new_response_cb_) { - on_new_response_cb_(user_data_, 1); - } else { - unread_count_++; - } - } - } - } - } - - bool - getResponse(CustomClientResponse & response) + explicit ClientListener( + CustomClientInfo * info) + : info_(info) { - std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - return popResponse(response); - } - return popResponse(response); } void - attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) + on_data_available( + eprosima::fastdds::dds::DataReader *) { - std::lock_guard lock(internalMutex_); - conditionMutex_ = conditionMutex; - conditionVariable_ = conditionVariable; - } + std::unique_lock lock_mutex(on_new_response_m_); - void - detachCondition() - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = nullptr; - conditionVariable_ = nullptr; - } + if (on_new_response_cb_) { + auto unread_responses = get_unread_responses(); - bool - hasData() - { - return list_has_data_.load(); + if (0 < unread_responses) { + on_new_response_cb_(user_data_, unread_responses); + } + } } void on_subscription_matched( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final { if (info_ == nullptr) { @@ -191,6 +115,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener info_->response_subscriber_matched_count_.store(publishers_.size()); } + size_t get_unread_responses() + { + return info_->response_reader_->get_unread_count(true); + } + // Provide handlers to perform an action when a // new event from this listener has ocurred void @@ -201,49 +130,45 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener std::unique_lock lock_mutex(on_new_response_m_); if (callback) { - // Push events arrived before setting the the executor callback - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; + auto unread_responses = get_unread_responses(); + + if (0 < unread_responses) { + callback(user_data, unread_responses); } + user_data_ = user_data; on_new_response_cb_ = callback; + + eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask(); + status_mask |= eprosima::fastdds::dds::StatusMask::data_available(); + info_->response_reader_->set_listener(this, status_mask); } else { + eprosima::fastdds::dds::StatusMask status_mask = info_->response_reader_->get_status_mask(); + status_mask &= ~eprosima::fastdds::dds::StatusMask::data_available(); + info_->response_reader_->set_listener(this, status_mask); + user_data_ = nullptr; on_new_response_cb_ = nullptr; } } private: - bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_) - { - if (!list.empty()) { - response = std::move(list.front()); - list.pop_front(); - list_has_data_.store(!list.empty()); - return true; - } - return false; - }; - CustomClientInfo * info_; - std::mutex internalMutex_; - std::list list RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::atomic_bool list_has_data_; - std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::set publishers_; rmw_event_callback_t on_new_response_cb_{nullptr}; + const void * user_data_{nullptr}; + std::mutex on_new_response_m_; - uint64_t unread_count_ = 0; }; class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener { public: - explicit ClientPubListener(CustomClientInfo * info) + explicit ClientPubListener( + CustomClientInfo * info) : info_(info) { } diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index d7df46611..74da14caa 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -15,14 +15,12 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ -#include -#include -#include -#include #include #include #include "fastcdr/FastBuffer.h" +#include "fastdds/dds/core/condition/StatusCondition.hpp" +#include "fastdds/dds/core/condition/GuardCondition.hpp" #include "rmw/event.h" #include "rmw/event_callback_type.h" @@ -32,77 +30,45 @@ class EventListenerInterface { -protected: - class ConditionalScopedLock; - public: - /// Connect a condition variable so a waiter can be notified of new data. - virtual void attachCondition( - std::mutex * conditionMutex, - std::condition_variable * conditionVariable) = 0; - - /// Unset the information from attachCondition. - virtual void detachCondition() = 0; - - /// Check if there is new data available for a specific event type. - /** - * \param event_type The event type to check on. - * \return `true` if new data is available. - */ - virtual bool hasEvent(rmw_event_type_t event_type) const = 0; + virtual eprosima::fastdds::dds::StatusCondition & get_statuscondition() const = 0; /// Take ready data for an event type. /** - * \param event_type The event type to get data for. - * \param event_info A preallocated event information (from rmw/types.h) to fill with data - * \return `true` if data was successfully taken. - * \return `false` if data was not available, in this case nothing was written to event_info. - */ - virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0; + * \param event_type The event type to get data for. + * \param event_info A preallocated event information (from rmw/types.h) to fill with data + * \return `true` if data was successfully taken. + * \return `false` if data was not available, in this case nothing was written to event_info. + */ + virtual bool take_event( + rmw_event_type_t event_type, + void * event_info) = 0; // Provide handlers to perform an action when a // new event from this listener has ocurred virtual void set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) = 0; - rmw_event_callback_t on_new_event_cb_{nullptr}; - const void * user_data_{nullptr}; - uint64_t unread_events_count_ = 0; - std::mutex on_new_event_m_; -}; - -class EventListenerInterface::ConditionalScopedLock -{ -public: - ConditionalScopedLock( - std::mutex * mutex, - std::condition_variable * condition_variable = nullptr) - : mutex_(mutex), cv_(condition_variable) + eprosima::fastdds::dds::GuardCondition & get_event_guard(rmw_event_type_t event_type) { - if (nullptr != mutex_) { - mutex_->lock(); - } + return event_guard[event_type]; } - ~ConditionalScopedLock() - { - if (nullptr != mutex_) { - mutex_->unlock(); - if (nullptr != cv_) { - cv_->notify_all(); - } - } - } +protected: + eprosima::fastdds::dds::GuardCondition event_guard[RMW_EVENT_INVALID]; + + rmw_event_callback_t on_new_event_cb_[RMW_EVENT_INVALID] = {nullptr}; -private: - std::mutex * mutex_; - std::condition_variable * cv_; + const void * user_data_[RMW_EVENT_INVALID] = {nullptr}; + + std::mutex on_new_event_m_; }; struct CustomEventInfo { - virtual EventListenerInterface * getListener() const = 0; + virtual EventListenerInterface * get_listener() const = 0; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 37db4ca2c..2c6687974 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -15,8 +15,6 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ -#include -#include #include #include @@ -52,20 +50,19 @@ typedef struct CustomPublisherInfo : public CustomEventInfo RMW_FASTRTPS_SHARED_CPP_PUBLIC EventListenerInterface * - getListener() const final; + get_listener() const final; } CustomPublisherInfo; class PubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataWriterListener { public: - explicit PubListener(CustomPublisherInfo * info) - : deadline_changes_(false), - liveliness_changes_(false), - incompatible_qos_changes_(false), - conditionMutex_(nullptr), - conditionVariable_(nullptr) + explicit PubListener( + CustomPublisherInfo * info) + : publisher_info_(info) + , deadline_changes_(false) + , liveliness_changes_(false) + , incompatible_qos_changes_(false) { - (void) info; } // DataWriterListener implementation @@ -75,7 +72,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataWriter * /* writer */, const eprosima::fastdds::dds::PublicationMatchedStatus & info) final { - std::lock_guard lock(internalMutex_); + std::lock_guard lock(discovery_m_); if (info.current_count_change == 1) { subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); } else if (info.current_count_change == -1) { @@ -101,63 +98,54 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataWriter *, const eprosima::fastdds::dds::OfferedIncompatibleQosStatus &) final; - // EventListenerInterface implementation - RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool - hasEvent(rmw_event_type_t event_type) const final; - - RMW_FASTRTPS_SHARED_CPP_PUBLIC - void set_on_new_event_callback( - const void * user_data, - rmw_event_callback_t callback) final; - - RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool - takeNextEvent(rmw_event_type_t event_type, void * event_info) final; - // PubListener API size_t subscriptionCount() { - std::lock_guard lock(internalMutex_); + std::lock_guard lock(discovery_m_); return subscriptions_.size(); } - void - attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = conditionMutex; - conditionVariable_ = conditionVariable; - } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + eprosima::fastdds::dds::StatusCondition & get_statuscondition() const final; - void - detachCondition() - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = nullptr; - conditionVariable_ = nullptr; - } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool take_event( + rmw_event_type_t event_type, + void * event_info) final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void set_on_new_event_callback( + rmw_event_type_t event_type, + const void * user_data, + rmw_event_callback_t callback) final; private: - mutable std::mutex internalMutex_; + void trigger_event(rmw_event_type_t event_type); + + CustomPublisherInfo * publisher_info_ = nullptr; + + mutable std::mutex discovery_m_; std::set subscriptions_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(discovery_m_); + + bool deadline_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool deadline_changes_; eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_missed_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + bool liveliness_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool liveliness_changes_; eprosima::fastdds::dds::LivelinessLostStatus liveliness_lost_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool incompatible_qos_changes_; - eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + bool incompatible_qos_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 0ddbfd04f..4e284d0d8 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -15,9 +15,7 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ -#include #include -#include #include #include #include @@ -50,7 +48,7 @@ class ServicePubListener; enum class client_present_t { - FAILURE, // an error occurred when checking + FAILURE, // an error occurred when checking MAYBE, // reader not matched, writer still present YES, // reader matched GONE // neither reader nor writer @@ -75,10 +73,11 @@ typedef struct CustomServiceRequest { eprosima::fastrtps::rtps::SampleIdentity sample_identity_; eprosima::fastcdr::FastBuffer * buffer_; - eprosima::fastdds::dds::SampleInfo sample_info_ {}; CustomServiceRequest() - : buffer_(nullptr) {} + : buffer_(nullptr) + { + } } CustomServiceRequest; class ServicePubListener : public eprosima::fastdds::dds::DataWriterListener @@ -92,7 +91,8 @@ class ServicePubListener : public eprosima::fastdds::dds::DataWriterListener rmw_fastrtps_shared_cpp::hash_fastrtps_guid>; public: - explicit ServicePubListener(CustomServiceInfo * info) + explicit ServicePubListener( + CustomServiceInfo * info) { (void) info; } @@ -154,7 +154,8 @@ class ServicePubListener : public eprosima::fastdds::dds::DataWriterListener return client_present_t::YES; } - void endpoint_erase_if_exists(const eprosima::fastrtps::rtps::GUID_t & endpointGuid) + void endpoint_erase_if_exists( + const eprosima::fastrtps::rtps::GUID_t & endpointGuid) { std::lock_guard lock(mutex_); auto endpoint = clients_endpoints_.find(endpointGuid); @@ -183,15 +184,15 @@ class ServicePubListener : public eprosima::fastdds::dds::DataWriterListener class ServiceListener : public eprosima::fastdds::dds::DataReaderListener { public: - explicit ServiceListener(CustomServiceInfo * info) - : info_(info), list_has_data_(false), - conditionMutex_(nullptr), conditionVariable_(nullptr) + explicit ServiceListener( + CustomServiceInfo * info) + : info_(info) { } void on_subscription_matched( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final { if (info.current_count_change == -1) { @@ -200,110 +201,22 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener } } - void - on_data_available(eprosima::fastdds::dds::DataReader * reader) final - { - assert(reader); - - CustomServiceRequest request; - request.buffer_ = new eprosima::fastcdr::FastBuffer(); - - rmw_fastrtps_shared_cpp::SerializedData data; - data.is_cdr_buffer = true; - data.data = request.buffer_; - data.impl = nullptr; // not used when is_cdr_buffer is true - if (reader->take_next_sample(&data, &request.sample_info_) == ReturnCode_t::RETCODE_OK) { - if (request.sample_info_.valid_data) { - request.sample_identity_ = request.sample_info_.sample_identity; - // Use response subscriber guid (on related_sample_identity) when present. - const eprosima::fastrtps::rtps::GUID_t & reader_guid = - request.sample_info_.related_sample_identity.writer_guid(); - if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) { - request.sample_identity_.writer_guid() = reader_guid; - } - - // Save both guids in the clients_endpoints map - const eprosima::fastrtps::rtps::GUID_t & writer_guid = - request.sample_info_.sample_identity.writer_guid(); - info_->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid); - - std::lock_guard lock(internalMutex_); - const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history(); - if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) { - while (list.size() >= static_cast(history.depth)) { - list.pop_front(); - } - } - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - list.push_back(request); - // the change to list_has_data_ needs to be mutually exclusive with - // rmw_wait() which checks hasData() and decides if wait() needs to - // be called - list_has_data_.store(true); - clock.unlock(); - conditionVariable_->notify_one(); - } else { - list.push_back(request); - list_has_data_.store(true); - } - - std::unique_lock lock_mutex(on_new_request_m_); - - if (on_new_request_cb_) { - on_new_request_cb_(user_data_, 1); - } else { - unread_count_++; - } - } - } - } - - CustomServiceRequest - getRequest() + size_t get_unread_resquests() { - std::lock_guard lock(internalMutex_); - CustomServiceRequest request; - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - if (!list.empty()) { - request = list.front(); - list.pop_front(); - list_has_data_.store(!list.empty()); - } - } else { - if (!list.empty()) { - request = list.front(); - list.pop_front(); - list_has_data_.store(!list.empty()); - } - } - - return request; + return info_->request_reader_->get_unread_count(true); } void - attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) + on_data_available( + eprosima::fastdds::dds::DataReader *) final { - std::lock_guard lock(internalMutex_); - conditionMutex_ = conditionMutex; - conditionVariable_ = conditionVariable; - } + std::unique_lock lock_mutex(on_new_request_m_); - void - detachCondition() - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = nullptr; - conditionVariable_ = nullptr; - } + auto unread_requests = get_unread_resquests(); - bool - hasData() - { - return list_has_data_.load(); + if (0u < unread_requests) { + on_new_request_cb_(user_data_, unread_requests); + } } // Provide handlers to perform an action when a @@ -316,14 +229,23 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener std::unique_lock lock_mutex(on_new_request_m_); if (callback) { - // Push events arrived before setting the the executor callback - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; + auto unread_requests = get_unread_resquests(); + + if (0 < unread_requests) { + callback(user_data, unread_requests); } + user_data_ = user_data; on_new_request_cb_ = callback; + + eprosima::fastdds::dds::StatusMask status_mask = info_->request_reader_->get_status_mask(); + status_mask |= eprosima::fastdds::dds::StatusMask::data_available(); + info_->request_reader_->set_listener(this, status_mask); } else { + eprosima::fastdds::dds::StatusMask status_mask = info_->request_reader_->get_status_mask(); + status_mask &= ~eprosima::fastdds::dds::StatusMask::data_available(); + info_->request_reader_->set_listener(this, status_mask); + user_data_ = nullptr; on_new_request_cb_ = nullptr; } @@ -331,16 +253,12 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener private: CustomServiceInfo * info_; - std::mutex internalMutex_; - std::list list RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::atomic_bool list_has_data_; - std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); rmw_event_callback_t on_new_request_cb_{nullptr}; + const void * user_data_{nullptr}; + std::mutex on_new_request_m_; - uint64_t unread_count_ = 0; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index a130f2cf6..fc77ce92b 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -15,9 +15,7 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ -#include #include -#include #include #include #include @@ -78,54 +76,51 @@ struct CustomSubscriberInfo : public CustomEventInfo RMW_FASTRTPS_SHARED_CPP_PUBLIC EventListenerInterface * - getListener() const final; + get_listener() const final; }; + class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener { public: - explicit SubListener(CustomSubscriberInfo * info, size_t qos_depth) - : data_(false), - deadline_changes_(false), - liveliness_changes_(false), - sample_lost_changes_(false), - incompatible_qos_changes_(false), - conditionMutex_(nullptr), - conditionVariable_(nullptr) + explicit SubListener( + CustomSubscriberInfo * info) + : subscriber_info_(info) + , deadline_changes_(false) + , liveliness_changes_(false) + , sample_lost_changes_(false) + , incompatible_qos_changes_(false) { - qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits::max(); - // Field is not used right now - (void)info; } // DataReaderListener implementation void on_subscription_matched( - eprosima::fastdds::dds::DataReader * reader, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final { { - std::lock_guard lock(internalMutex_); + std::lock_guard lock(discovery_m_); if (info.current_count_change == 1) { publishers_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); } else if (info.current_count_change == -1) { publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); } } - update_has_data(reader); } void - on_data_available(eprosima::fastdds::dds::DataReader * reader) final + on_data_available( + eprosima::fastdds::dds::DataReader *) final { - update_has_data(reader); - std::unique_lock lock_mutex(on_new_message_m_); if (on_new_message_cb_) { - on_new_message_cb_(user_data_, 1); - } else { - new_data_unread_count_++; + auto unread_messages = get_unread_messages(); + + if (0 < unread_messages) { + on_new_message_cb_(new_message_user_data_, unread_messages); + } } } @@ -153,59 +148,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::RequestedIncompatibleQosStatus &) final; - // EventListenerInterface implementation - RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool - hasEvent(rmw_event_type_t event_type) const final; - - RMW_FASTRTPS_SHARED_CPP_PUBLIC - void set_on_new_event_callback( - const void * user_data, - rmw_event_callback_t callback) final; - - RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool - takeNextEvent(rmw_event_type_t event_type, void * event_info) final; - - // SubListener API - void - attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = conditionMutex; - conditionVariable_ = conditionVariable; - } - - void - detachCondition() - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = nullptr; - conditionVariable_ = nullptr; - } - - bool - hasData() const - { - return data_.load(std::memory_order_relaxed); - } - - void - update_has_data(eprosima::fastdds::dds::DataReader * reader) - { - // Make sure to call into Fast DDS before taking the lock to avoid an - // ABBA deadlock between internalMutex_ and mutexes inside of Fast DDS. - auto unread_count = reader->get_unread_count(); - bool has_data = unread_count > 0; - - std::lock_guard lock(internalMutex_); - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); - data_.store(has_data, std::memory_order_relaxed); - } - size_t publisherCount() { - std::lock_guard lock(internalMutex_); + std::lock_guard lock(discovery_m_); return publishers_.size(); } @@ -214,55 +159,64 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds void set_on_new_message_callback( const void * user_data, - rmw_event_callback_t callback) - { - std::unique_lock lock_mutex(on_new_message_m_); + rmw_event_callback_t callback); - if (callback) { - // Push events arrived before setting the executor's callback - if (new_data_unread_count_) { - auto unread_count = std::min(new_data_unread_count_, qos_depth_); - callback(user_data, unread_count); - new_data_unread_count_ = 0; - } - user_data_ = user_data; - on_new_message_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_message_cb_ = nullptr; - } + size_t get_unread_messages() + { + return subscriber_info_->data_reader_->get_unread_count(true); } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + eprosima::fastdds::dds::StatusCondition & get_statuscondition() const final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool take_event( + rmw_event_type_t event_type, + void * event_info) final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void set_on_new_event_callback( + rmw_event_type_t event_type, + const void * user_data, + rmw_event_callback_t callback) final; + private: - mutable std::mutex internalMutex_; + CustomSubscriberInfo * subscriber_info_ = nullptr; - std::atomic_bool data_; + bool deadline_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool deadline_changes_; eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_missed_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + bool liveliness_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool liveliness_changes_; eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + bool sample_lost_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool sample_lost_changes_; eprosima::fastdds::dds::SampleLostStatus sample_lost_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::atomic_bool incompatible_qos_changes_; - eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ - RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + bool incompatible_qos_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); - std::set publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::set publishers_ RCPPUTILS_TSA_GUARDED_BY( + discovery_m_); rmw_event_callback_t on_new_message_cb_{nullptr}; + + const void * new_message_user_data_{nullptr}; + std::mutex on_new_message_m_; - size_t qos_depth_; - size_t new_data_unread_count_ = 0; + + std::mutex discovery_m_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 9fc6e4009..2cc784513 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -21,73 +21,178 @@ #include "types/event_types.hpp" EventListenerInterface * -CustomPublisherInfo::getListener() const +CustomPublisherInfo::get_listener() const { return listener_; } +eprosima::fastdds::dds::StatusCondition & PubListener::get_statuscondition() const +{ + return publisher_info_->data_writer_->get_statuscondition(); +} + +bool PubListener::take_event( + rmw_event_type_t event_type, + void * event_info) +{ + assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); + + std::unique_lock lock_mutex(on_new_event_m_); + + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + { + auto rmw_data = static_cast(event_info); + if (liveliness_changes_) { + rmw_data->total_count = liveliness_lost_status_.total_count; + rmw_data->total_count_change = liveliness_lost_status_.total_count_change; + liveliness_changes_ = false; + } else { + eprosima::fastdds::dds::LivelinessLostStatus liveliness_lost_status; + publisher_info_->data_writer_->get_liveliness_lost_status(liveliness_lost_status); + rmw_data->total_count = liveliness_lost_status.total_count; + rmw_data->total_count_change = liveliness_lost_status.total_count_change; + } + liveliness_lost_status_.total_count_change = 0; + } + break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + { + auto rmw_data = static_cast(event_info); + if (deadline_changes_) { + rmw_data->total_count = offered_deadline_missed_status_.total_count; + rmw_data->total_count_change = offered_deadline_missed_status_.total_count_change; + deadline_changes_ = false; + } else { + eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_missed_status; + publisher_info_->data_writer_->get_offered_deadline_missed_status( + offered_deadline_missed_status); + rmw_data->total_count = offered_deadline_missed_status.total_count; + rmw_data->total_count_change = offered_deadline_missed_status.total_count_change; + } + offered_deadline_missed_status_.total_count_change = 0; + } + break; + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + { + auto rmw_data = static_cast(event_info); + if (incompatible_qos_changes_) { + rmw_data->total_count = incompatible_qos_status_.total_count; + rmw_data->total_count_change = incompatible_qos_status_.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + incompatible_qos_status_.last_policy_id); + incompatible_qos_changes_ = false; + } else { + eprosima::fastdds::dds::OfferedIncompatibleQosStatus offered_incompatible_qos_status; + publisher_info_->data_writer_->get_offered_incompatible_qos_status( + offered_incompatible_qos_status); + rmw_data->total_count = offered_incompatible_qos_status.total_count; + rmw_data->total_count_change = offered_incompatible_qos_status.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + offered_incompatible_qos_status.last_policy_id); + } + incompatible_qos_status_.total_count_change = 0; + } + break; + default: + return false; + } + event_guard[event_type].set_trigger_value(false); + return true; +} + +void PubListener::set_on_new_event_callback( + rmw_event_type_t event_type, + const void * user_data, + rmw_event_callback_t callback) +{ + std::unique_lock lock_mutex(on_new_event_m_); + + if (callback) { + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + publisher_info_->data_writer_->get_liveliness_lost_status(liveliness_lost_status_); + callback(user_data, liveliness_lost_status_.total_count_change); + liveliness_lost_status_.total_count_change = 0; + break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + publisher_info_->data_writer_->get_offered_deadline_missed_status( + offered_deadline_missed_status_); + callback( + user_data, + offered_deadline_missed_status_.total_count_change); + offered_deadline_missed_status_.total_count_change = 0; + break; + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + publisher_info_->data_writer_->get_offered_incompatible_qos_status( + incompatible_qos_status_); + callback( + user_data, + incompatible_qos_status_.total_count_change); + incompatible_qos_status_.total_count_change = 0; + break; + default: + break; + } + + user_data_[event_type] = user_data; + on_new_event_cb_[event_type] = callback; + + eprosima::fastdds::dds::StatusMask status_mask = + publisher_info_->data_writer_->get_status_mask(); + status_mask |= rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type); + publisher_info_->data_writer_->set_listener(this, status_mask); + } else { + eprosima::fastdds::dds::StatusMask status_mask = + publisher_info_->data_writer_->get_status_mask(); + status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type); + publisher_info_->data_writer_->set_listener(this, status_mask); + + user_data_[event_type] = nullptr; + on_new_event_cb_[event_type] = nullptr; + } +} + void PubListener::on_offered_deadline_missed( eprosima::fastdds::dds::DataWriter * /* writer */, const eprosima::fastdds::dds::OfferedDeadlineMissedStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to liveliness_lost_count_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::unique_lock lock_mutex(on_new_event_m_); // Assign absolute values offered_deadline_missed_status_.total_count = status.total_count; // Accumulate deltas offered_deadline_missed_status_.total_count_change += status.total_count_change; - deadline_changes_.store(true, std::memory_order_relaxed); - - std::unique_lock lock_mutex(on_new_event_m_); + deadline_changes_ = true; - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + trigger_event(RMW_EVENT_OFFERED_DEADLINE_MISSED); } void PubListener::on_liveliness_lost( eprosima::fastdds::dds::DataWriter * /* writer */, const eprosima::fastdds::dds::LivelinessLostStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to liveliness_lost_count_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::unique_lock lock_mutex(on_new_event_m_); // Assign absolute values liveliness_lost_status_.total_count = status.total_count; // Accumulate deltas liveliness_lost_status_.total_count_change += status.total_count_change; - liveliness_changes_.store(true, std::memory_order_relaxed); + liveliness_changes_ = true; - std::unique_lock lock_mutex(on_new_event_m_); - - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + trigger_event(RMW_EVENT_LIVELINESS_LOST); } void PubListener::on_offered_incompatible_qos( eprosima::fastdds::dds::DataWriter * /* writer */, const eprosima::fastdds::dds::OfferedIncompatibleQosStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to incompatible_qos_status_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::unique_lock lock_mutex(on_new_event_m_); // Assign absolute values incompatible_qos_status_.last_policy_id = status.last_policy_id; @@ -95,82 +200,16 @@ void PubListener::on_offered_incompatible_qos( // Accumulate deltas incompatible_qos_status_.total_count_change += status.total_count_change; - incompatible_qos_changes_.store(true, std::memory_order_relaxed); -} + incompatible_qos_changes_ = true; -bool PubListener::hasEvent(rmw_event_type_t event_type) const -{ - assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); - switch (event_type) { - case RMW_EVENT_LIVELINESS_LOST: - return liveliness_changes_.load(std::memory_order_relaxed); - case RMW_EVENT_OFFERED_DEADLINE_MISSED: - return deadline_changes_.load(std::memory_order_relaxed); - case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: - return incompatible_qos_changes_.load(std::memory_order_relaxed); - default: - break; - } - return false; + trigger_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE); } -void PubListener::set_on_new_event_callback( - const void * user_data, - rmw_event_callback_t callback) +void PubListener::trigger_event(rmw_event_type_t event_type) { - std::unique_lock lock_mutex(on_new_event_m_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (unread_events_count_) { - callback(user_data, unread_events_count_); - unread_events_count_ = 0; - } - user_data_ = user_data; - on_new_event_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_event_cb_ = nullptr; + if (on_new_event_cb_[event_type]) { + on_new_event_cb_[event_type](user_data_[event_type], 1); } -} -bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) -{ - assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); - std::lock_guard lock(internalMutex_); - switch (event_type) { - case RMW_EVENT_LIVELINESS_LOST: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = liveliness_lost_status_.total_count; - rmw_data->total_count_change = liveliness_lost_status_.total_count_change; - liveliness_lost_status_.total_count_change = 0; - liveliness_changes_.store(false, std::memory_order_relaxed); - } - break; - case RMW_EVENT_OFFERED_DEADLINE_MISSED: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = offered_deadline_missed_status_.total_count; - rmw_data->total_count_change = offered_deadline_missed_status_.total_count_change; - offered_deadline_missed_status_.total_count_change = 0; - deadline_changes_.store(false, std::memory_order_relaxed); - } - break; - case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = incompatible_qos_status_.total_count; - rmw_data->total_count_change = incompatible_qos_status_.total_count_change; - rmw_data->last_policy_kind = - rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( - incompatible_qos_status_.last_policy_id); - incompatible_qos_status_.total_count_change = 0; - incompatible_qos_changes_.store(false, std::memory_order_relaxed); - } - break; - default: - return false; - } - return true; + event_guard[event_type].set_trigger_value(true); } diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 6c24c358f..fe8a8e8a0 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -21,47 +21,240 @@ #include "types/event_types.hpp" EventListenerInterface * -CustomSubscriberInfo::getListener() const +CustomSubscriberInfo::get_listener() const { return listener_; } +eprosima::fastdds::dds::StatusCondition & SubListener::get_statuscondition() const +{ + return subscriber_info_->data_reader_->get_statuscondition(); +} + +bool SubListener::take_event( + rmw_event_type_t event_type, + void * event_info) +{ + assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); + + std::unique_lock lock_mutex(on_new_event_m_); + + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + { + auto rmw_data = static_cast(event_info); + if (liveliness_changes_) { + rmw_data->alive_count = liveliness_changed_status_.alive_count; + rmw_data->not_alive_count = liveliness_changed_status_.not_alive_count; + rmw_data->alive_count_change = liveliness_changed_status_.alive_count_change; + rmw_data->not_alive_count_change = liveliness_changed_status_.not_alive_count_change; + liveliness_changes_ = false; + } else { + eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status; + subscriber_info_->data_reader_->get_liveliness_changed_status(liveliness_changed_status); + rmw_data->alive_count = liveliness_changed_status.alive_count; + rmw_data->not_alive_count = liveliness_changed_status.not_alive_count; + rmw_data->alive_count_change = liveliness_changed_status.alive_count_change; + rmw_data->not_alive_count_change = liveliness_changed_status.not_alive_count_change; + } + liveliness_changed_status_.alive_count_change = 0; + liveliness_changed_status_.not_alive_count_change = 0; + } + break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + { + auto rmw_data = static_cast(event_info); + if (deadline_changes_) { + rmw_data->total_count = requested_deadline_missed_status_.total_count; + rmw_data->total_count_change = requested_deadline_missed_status_.total_count_change; + deadline_changes_ = false; + } else { + eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_missed_status; + subscriber_info_->data_reader_->get_requested_deadline_missed_status( + requested_deadline_missed_status); + rmw_data->total_count = requested_deadline_missed_status.total_count; + rmw_data->total_count_change = requested_deadline_missed_status.total_count_change; + } + requested_deadline_missed_status_.total_count_change = 0; + } + break; + case RMW_EVENT_MESSAGE_LOST: + { + auto rmw_data = static_cast(event_info); + if (sample_lost_changes_) { + rmw_data->total_count = sample_lost_status_.total_count; + rmw_data->total_count_change = sample_lost_status_.total_count_change; + sample_lost_changes_ = false; + } else { + eprosima::fastdds::dds::SampleLostStatus sample_lost_status; + subscriber_info_->data_reader_->get_sample_lost_status(sample_lost_status); + rmw_data->total_count = sample_lost_status.total_count; + rmw_data->total_count_change = sample_lost_status.total_count_change; + } + sample_lost_status_.total_count_change = 0; + } + break; + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + { + auto rmw_data = static_cast(event_info); + if (incompatible_qos_changes_) { + rmw_data->total_count = incompatible_qos_status_.total_count; + rmw_data->total_count_change = incompatible_qos_status_.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + incompatible_qos_status_.last_policy_id); + incompatible_qos_changes_ = false; + } else { + eprosima::fastdds::dds::RequestedIncompatibleQosStatus + requested_qos_incompatible_qos_status; + subscriber_info_->data_reader_->get_requested_incompatible_qos_status( + requested_qos_incompatible_qos_status); + rmw_data->total_count = requested_qos_incompatible_qos_status.total_count; + rmw_data->total_count_change = requested_qos_incompatible_qos_status.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + requested_qos_incompatible_qos_status.last_policy_id); + } + incompatible_qos_status_.total_count_change = 0; + } + break; + default: + return false; + } + + event_guard[event_type].set_trigger_value(false); + return true; +} + +void SubListener::set_on_new_event_callback( + rmw_event_type_t event_type, + const void * user_data, + rmw_event_callback_t callback) +{ + std::unique_lock lock_mutex(on_new_event_m_); + + if (callback) { + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + { + subscriber_info_->data_reader_->get_liveliness_changed_status(liveliness_changed_status_); + callback( + user_data, liveliness_changed_status_.alive_count_change + + liveliness_changed_status_.not_alive_count_change); + liveliness_changed_status_.alive_count_change = 0; + liveliness_changed_status_.not_alive_count_change = 0; + } + break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + { + subscriber_info_->data_reader_->get_requested_deadline_missed_status( + requested_deadline_missed_status_); + callback( + user_data, + requested_deadline_missed_status_.total_count_change); + requested_deadline_missed_status_.total_count_change = 0; + } + break; + case RMW_EVENT_MESSAGE_LOST: + { + subscriber_info_->data_reader_->get_sample_lost_status(sample_lost_status_); + callback( + user_data, + sample_lost_status_.total_count_change); + sample_lost_status_.total_count_change = 0; + } + break; + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + { + subscriber_info_->data_reader_->get_requested_incompatible_qos_status( + incompatible_qos_status_); + callback( + user_data, + incompatible_qos_status_.total_count_change); + incompatible_qos_status_.total_count_change = 0; + } + break; + default: + break; + } + + user_data_[event_type] = user_data; + on_new_event_cb_[event_type] = callback; + + eprosima::fastdds::dds::StatusMask status_mask = + subscriber_info_->data_reader_->get_status_mask(); + status_mask |= rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type); + subscriber_info_->data_reader_->set_listener(this, status_mask); + } else { + eprosima::fastdds::dds::StatusMask status_mask = + subscriber_info_->data_reader_->get_status_mask(); + status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type); + subscriber_info_->data_reader_->set_listener(this, status_mask); + + user_data_[event_type] = nullptr; + on_new_event_cb_[event_type] = nullptr; + } +} + +void +SubListener::set_on_new_message_callback( + const void * user_data, + rmw_event_callback_t callback) +{ + std::unique_lock lock_mutex(on_new_message_m_); + + if (callback) { + auto unread_messages = get_unread_messages(); + + if (0 < unread_messages) { + callback(user_data, unread_messages); + } + + new_message_user_data_ = user_data; + on_new_message_cb_ = callback; + + eprosima::fastdds::dds::StatusMask status_mask = + subscriber_info_->data_reader_->get_status_mask(); + status_mask |= eprosima::fastdds::dds::StatusMask::data_available(); + subscriber_info_->data_reader_->set_listener(this, status_mask); + } else { + eprosima::fastdds::dds::StatusMask status_mask = + subscriber_info_->data_reader_->get_status_mask(); + status_mask &= ~eprosima::fastdds::dds::StatusMask::data_available(); + subscriber_info_->data_reader_->set_listener(this, status_mask); + + new_message_user_data_ = nullptr; + on_new_message_cb_ = nullptr; + } +} + void SubListener::on_requested_deadline_missed( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::RequestedDeadlineMissedStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to liveliness_lost_count_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::unique_lock lock_mutex(on_new_event_m_); // Assign absolute values requested_deadline_missed_status_.total_count = status.total_count; // Accumulate deltas requested_deadline_missed_status_.total_count_change += status.total_count_change; - deadline_changes_.store(true, std::memory_order_relaxed); - - std::unique_lock lock_mutex(on_new_event_m_); + deadline_changes_ = true; - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; + if (on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED]) { + on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED](user_data_[ + RMW_EVENT_REQUESTED_DEADLINE_MISSED], 1); } + + event_guard[RMW_EVENT_REQUESTED_DEADLINE_MISSED].set_trigger_value(true); } void SubListener::on_liveliness_changed( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::LivelinessChangedStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to liveliness_lost_count_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::unique_lock lock_mutex(on_new_event_m_); // Assign absolute values liveliness_changed_status_.alive_count = status.alive_count; @@ -70,44 +263,40 @@ void SubListener::on_liveliness_changed( liveliness_changed_status_.alive_count_change += status.alive_count_change; liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change; - liveliness_changes_.store(true, std::memory_order_relaxed); - - std::unique_lock lock_mutex(on_new_event_m_); + liveliness_changes_ = true; - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; + if (on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED]) { + on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED](user_data_[RMW_EVENT_LIVELINESS_CHANGED], 1); } + + event_guard[RMW_EVENT_LIVELINESS_CHANGED].set_trigger_value(true); } void SubListener::on_sample_lost( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SampleLostStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to sample_lost_status_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::lock_guard lock_mutex(on_new_event_m_); // Assign absolute values sample_lost_status_.total_count = status.total_count; // Accumulate deltas sample_lost_status_.total_count_change += status.total_count_change; - sample_lost_changes_.store(true, std::memory_order_relaxed); + sample_lost_changes_ = true; + + if (on_new_event_cb_[RMW_EVENT_MESSAGE_LOST]) { + on_new_event_cb_[RMW_EVENT_MESSAGE_LOST](user_data_[RMW_EVENT_MESSAGE_LOST], 1); + } + + event_guard[RMW_EVENT_MESSAGE_LOST].set_trigger_value(true); } void SubListener::on_requested_incompatible_qos( - eprosima::fastdds::dds::DataReader * /* reader */, + eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::RequestedIncompatibleQosStatus & status) { - std::lock_guard lock(internalMutex_); - - // the change to incompatible_qos_status_ needs to be mutually exclusive with - // rmw_wait() which checks hasEvent() and decides if wait() needs to be called - ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + std::lock_guard lock_mutex(on_new_event_m_); // Assign absolute values incompatible_qos_status_.last_policy_id = status.last_policy_id; @@ -115,97 +304,12 @@ void SubListener::on_requested_incompatible_qos( // Accumulate deltas incompatible_qos_status_.total_count_change += status.total_count_change; - incompatible_qos_changes_.store(true, std::memory_order_relaxed); -} + incompatible_qos_changes_ = true; -bool SubListener::hasEvent(rmw_event_type_t event_type) const -{ - assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); - switch (event_type) { - case RMW_EVENT_LIVELINESS_CHANGED: - return liveliness_changes_.load(std::memory_order_relaxed); - case RMW_EVENT_REQUESTED_DEADLINE_MISSED: - return deadline_changes_.load(std::memory_order_relaxed); - case RMW_EVENT_MESSAGE_LOST: - return sample_lost_changes_.load(std::memory_order_relaxed); - case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: - return incompatible_qos_changes_.load(std::memory_order_relaxed); - default: - break; + if (on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE]) { + on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE](user_data_[ + RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE], 1); } - return false; -} -void SubListener::set_on_new_event_callback( - const void * user_data, - rmw_event_callback_t callback) -{ - std::unique_lock lock_mutex(on_new_event_m_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (unread_events_count_) { - callback(user_data, unread_events_count_); - unread_events_count_ = 0; - } - user_data_ = user_data; - on_new_event_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_event_cb_ = nullptr; - return; - } -} - -bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) -{ - assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); - std::lock_guard lock(internalMutex_); - switch (event_type) { - case RMW_EVENT_LIVELINESS_CHANGED: - { - auto rmw_data = static_cast(event_info); - rmw_data->alive_count = liveliness_changed_status_.alive_count; - rmw_data->not_alive_count = liveliness_changed_status_.not_alive_count; - rmw_data->alive_count_change = liveliness_changed_status_.alive_count_change; - rmw_data->not_alive_count_change = liveliness_changed_status_.not_alive_count_change; - liveliness_changed_status_.alive_count_change = 0; - liveliness_changed_status_.not_alive_count_change = 0; - liveliness_changes_.store(false, std::memory_order_relaxed); - } - break; - case RMW_EVENT_REQUESTED_DEADLINE_MISSED: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = requested_deadline_missed_status_.total_count; - rmw_data->total_count_change = requested_deadline_missed_status_.total_count_change; - requested_deadline_missed_status_.total_count_change = 0; - deadline_changes_.store(false, std::memory_order_relaxed); - } - break; - case RMW_EVENT_MESSAGE_LOST: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = sample_lost_status_.total_count; - rmw_data->total_count_change = sample_lost_status_.total_count_change; - sample_lost_status_.total_count_change = 0; - sample_lost_changes_.store(false, std::memory_order_relaxed); - } - break; - case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: - { - auto rmw_data = static_cast(event_info); - rmw_data->total_count = incompatible_qos_status_.total_count; - rmw_data->total_count_change = incompatible_qos_status_.total_count_change; - rmw_data->last_policy_kind = - rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( - incompatible_qos_status_.last_policy_id); - incompatible_qos_status_.total_count_change = 0; - incompatible_qos_changes_.store(false, std::memory_order_relaxed); - } - break; - default: - return false; - } - return true; + event_guard[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE].set_trigger_value(true); } diff --git a/rmw_fastrtps_shared_cpp/src/init_rmw_context_impl.cpp b/rmw_fastrtps_shared_cpp/src/init_rmw_context_impl.cpp index fc3b0db48..917a100a4 100644 --- a/rmw_fastrtps_shared_cpp/src/init_rmw_context_impl.cpp +++ b/rmw_fastrtps_shared_cpp/src/init_rmw_context_impl.cpp @@ -31,7 +31,8 @@ #include "rmw_fastrtps_shared_cpp/listener_thread.hpp" rmw_ret_t -rmw_fastrtps_shared_cpp::decrement_context_impl_ref_count(rmw_context_t * context) +rmw_fastrtps_shared_cpp::decrement_context_impl_ref_count( + rmw_context_t * context) { std::lock_guard guard(context->impl->mutex); diff --git a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp index b659dd4b3..cde9cae51 100644 --- a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp +++ b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp @@ -39,10 +39,12 @@ using rmw_dds_common::operator<<; static void -node_listener(rmw_context_t * context); +node_listener( + rmw_context_t * context); rmw_ret_t -rmw_fastrtps_shared_cpp::run_listener_thread(rmw_context_t * context) +rmw_fastrtps_shared_cpp::run_listener_thread( + rmw_context_t * context) { RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); @@ -76,7 +78,8 @@ rmw_fastrtps_shared_cpp::run_listener_thread(rmw_context_t * context) } rmw_ret_t -rmw_fastrtps_shared_cpp::join_listener_thread(rmw_context_t * context) +rmw_fastrtps_shared_cpp::join_listener_thread( + rmw_context_t * context) { auto common_context = static_cast(context->impl->common); common_context->thread_is_running.exchange(false); @@ -102,22 +105,29 @@ rmw_fastrtps_shared_cpp::join_listener_thread(rmw_context_t * context) return RMW_RET_OK; } -#define TERMINATE_THREAD(msg) \ +#define LOG_THREAD_FATAL_ERROR(msg) \ { \ RCUTILS_SAFE_FWRITE_TO_STDERR( \ RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \ RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY(msg) \ ": ros discovery info listener thread will shutdown ...\n"); \ - break; \ } void -node_listener(rmw_context_t * context) +node_listener( + rmw_context_t * context) { assert(nullptr != context); assert(nullptr != context->impl); assert(nullptr != context->impl->common); auto common_context = static_cast(context->impl->common); + // number of conditions of a subscription is 2 + rmw_wait_set_t * wait_set = rmw_fastrtps_shared_cpp::__rmw_create_wait_set( + context->implementation_identifier, context, 2); + if (nullptr == wait_set) { + LOG_THREAD_FATAL_ERROR("failed to create waitset"); + return; + } while (common_context->thread_is_running.load()) { assert(nullptr != common_context->sub); assert(nullptr != common_context->sub->data); @@ -129,12 +139,6 @@ node_listener(rmw_context_t * context) subscriptions.subscribers = subscriptions_buffer; guard_conditions.guard_condition_count = 1; guard_conditions.guard_conditions = guard_conditions_buffer; - // number of conditions of a subscription is 2 - rmw_wait_set_t * wait_set = rmw_fastrtps_shared_cpp::__rmw_create_wait_set( - context->implementation_identifier, context, 2); - if (nullptr == wait_set) { - TERMINATE_THREAD("failed to create wait set"); - } if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_wait( context->implementation_identifier, &subscriptions, @@ -145,36 +149,41 @@ node_listener(rmw_context_t * context) wait_set, nullptr)) { - TERMINATE_THREAD("rmw_wait failed"); - } - if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_destroy_wait_set( - context->implementation_identifier, wait_set)) - { - TERMINATE_THREAD("failed to destroy wait set"); + LOG_THREAD_FATAL_ERROR("rmw_wait failed"); + break; } if (subscriptions_buffer[0]) { rmw_dds_common::msg::ParticipantEntitiesInfo msg; - bool taken; - if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_take( - context->implementation_identifier, - common_context->sub, - static_cast(&msg), - &taken, - nullptr)) - { - TERMINATE_THREAD("__rmw_take failed"); - } - if (taken) { - if (std::memcmp( - reinterpret_cast(common_context->gid.data), - reinterpret_cast(&msg.gid.data), - RMW_GID_STORAGE_SIZE) == 0) + bool taken = true; + + while (taken) { + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_take( + context->implementation_identifier, + common_context->sub, + static_cast(&msg), + &taken, + nullptr)) { - // ignore local messages - continue; + LOG_THREAD_FATAL_ERROR("__rmw_take failed"); + break; + } + if (taken) { + if (std::memcmp( + reinterpret_cast(common_context->gid.data), + reinterpret_cast(&msg.gid.data), + RMW_GID_STORAGE_SIZE) == 0) + { + // ignore local messages + continue; + } + common_context->graph_cache.update_participant_entities(msg); } - common_context->graph_cache.update_participant_entities(msg); } } } + if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_destroy_wait_set( + context->implementation_identifier, wait_set)) + { + LOG_THREAD_FATAL_ERROR("failed to destroy waitset"); + } } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index dfeb1747c..fe1b74975 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -67,7 +67,8 @@ __rmw_destroy_client( } auto show_previous_error = - [&final_ret]() { + [&final_ret]() + { if (RMW_RET_OK != final_ret) { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "'\n"); @@ -161,4 +162,5 @@ __rmw_client_set_on_new_response_callback( callback); return RMW_RET_OK; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index ccc53753c..b9fa3bb51 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -36,7 +36,41 @@ namespace rmw_fastrtps_shared_cpp namespace internal { -bool is_event_supported(rmw_event_type_t event_type) +eprosima::fastdds::dds::StatusMask rmw_event_to_dds_statusmask( + const rmw_event_type_t event_type) +{ + eprosima::fastdds::dds::StatusMask ret_statusmask = eprosima::fastdds::dds::StatusMask::none(); + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + ret_statusmask = eprosima::fastdds::dds::StatusMask::liveliness_changed(); + break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + ret_statusmask = eprosima::fastdds::dds::StatusMask::requested_deadline_missed(); + break; + case RMW_EVENT_LIVELINESS_LOST: + ret_statusmask = eprosima::fastdds::dds::StatusMask::liveliness_lost(); + break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + ret_statusmask = eprosima::fastdds::dds::StatusMask::offered_deadline_missed(); + break; + case RMW_EVENT_MESSAGE_LOST: + ret_statusmask = eprosima::fastdds::dds::StatusMask::sample_lost(); + break; + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + ret_statusmask = eprosima::fastdds::dds::StatusMask::offered_incompatible_qos(); + break; + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + ret_statusmask = eprosima::fastdds::dds::StatusMask::requested_incompatible_qos(); + break; + default: + break; + } + + return ret_statusmask; +} + +bool is_event_supported( + rmw_event_type_t event_type) { return g_rmw_event_type_set.count(event_type) == 1; } @@ -91,6 +125,11 @@ __rmw_init_event( rmw_event->implementation_identifier = topic_endpoint_impl_identifier; rmw_event->data = data; rmw_event->event_type = event_type; + CustomEventInfo * event = static_cast(rmw_event->data); + eprosima::fastdds::dds::StatusMask status_mask = + event->get_listener()->get_statuscondition().get_enabled_statuses(); + status_mask |= rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type); + event->get_listener()->get_statuscondition().set_enabled_statuses(status_mask); return RMW_RET_OK; } @@ -102,7 +141,8 @@ __rmw_event_set_callback( const void * user_data) { auto custom_event_info = static_cast(rmw_event->data); - custom_event_info->getListener()->set_on_new_event_callback( + custom_event_info->get_listener()->set_on_new_event_callback( + rmw_event->event_type, user_data, callback); return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp index cdcac523c..8aeedc2d0 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp @@ -17,7 +17,7 @@ #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "types/guard_condition.hpp" +#include "fastdds/dds/core/condition/GuardCondition.hpp" namespace rmw_fastrtps_shared_cpp { @@ -28,7 +28,7 @@ __rmw_create_guard_condition(const char * identifier) rmw_guard_condition_t * guard_condition_handle = new rmw_guard_condition_t; guard_condition_handle->implementation_identifier = identifier; - guard_condition_handle->data = new GuardCondition(); + guard_condition_handle->data = new eprosima::fastdds::dds::GuardCondition(); return guard_condition_handle; } @@ -38,7 +38,7 @@ __rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition) rmw_ret_t ret = RMW_RET_ERROR; if (guard_condition) { - delete static_cast(guard_condition->data); + delete static_cast(guard_condition->data); delete guard_condition; ret = RMW_RET_OK; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp index 73ce6c534..ef4e6b41b 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp @@ -18,6 +18,7 @@ #include "fastcdr/FastBuffer.h" #include "fastdds/rtps/common/WriteParams.h" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -91,31 +92,61 @@ __rmw_take_request( auto info = static_cast(service->data); assert(info); - CustomServiceRequest request = info->listener_->getRequest(); + CustomServiceRequest request; + + request.buffer_ = new eprosima::fastcdr::FastBuffer(); if (request.buffer_ != nullptr) { - auto raw_type_support = dynamic_cast( - info->response_type_support_.get()); - eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - if (raw_type_support->deserializeROSmessage( - deser, ros_request, info->request_type_support_impl_)) - { - // Get header - rmw_fastrtps_shared_cpp::copy_from_fastrtps_guid_to_byte_array( - request.sample_identity_.writer_guid(), - request_header->request_id.writer_guid); - request_header->request_id.sequence_number = - ((int64_t)request.sample_identity_.sequence_number().high) << - 32 | request.sample_identity_.sequence_number().low; - request_header->source_timestamp = request.sample_info_.source_timestamp.to_ns(); - request_header->received_timestamp = request.sample_info_.source_timestamp.to_ns(); - *taken = true; + rmw_fastrtps_shared_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = request.buffer_; + data.impl = nullptr; // not used when is_cdr_buffer is true + + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + if (ReturnCode_t::RETCODE_OK == info->request_reader_->take(data_values, info_seq, 1)) { + if (info_seq[0].valid_data) { + request.sample_identity_ = info_seq[0].sample_identity; + // Use response subscriber guid (on related_sample_identity) when present. + const eprosima::fastrtps::rtps::GUID_t & reader_guid = + info_seq[0].related_sample_identity.writer_guid(); + if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown()) { + request.sample_identity_.writer_guid() = reader_guid; + } + + // Save both guids in the clients_endpoints map + const eprosima::fastrtps::rtps::GUID_t & writer_guid = + info_seq[0].sample_identity.writer_guid(); + info->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid); + + auto raw_type_support = dynamic_cast( + info->response_type_support_.get()); + eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (raw_type_support->deserializeROSmessage( + deser, ros_request, info->request_type_support_impl_)) + { + // Get header + rmw_fastrtps_shared_cpp::copy_from_fastrtps_guid_to_byte_array( + request.sample_identity_.writer_guid(), + request_header->request_id.writer_guid); + request_header->request_id.sequence_number = + ((int64_t)request.sample_identity_.sequence_number().high) << + 32 | request.sample_identity_.sequence_number().low; + request_header->source_timestamp = info_seq[0].source_timestamp.to_ns(); + request_header->received_timestamp = info_seq[0].source_timestamp.to_ns(); + *taken = true; + } + } } delete request.buffer_; } + return RMW_RET_OK; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index 1766e4bbd..0166b7cb6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -17,6 +17,7 @@ #include "fastcdr/Cdr.h" #include "fastdds/rtps/common/WriteParams.h" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -54,23 +55,42 @@ __rmw_take_response( CustomClientResponse response; - if (info->listener_->getResponse(response)) { - auto raw_type_support = dynamic_cast( - info->response_type_support_.get()); - eprosima::fastcdr::Cdr deser( - *response.buffer_, - eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - if (raw_type_support->deserializeROSmessage( - deser, ros_response, info->response_type_support_impl_)) - { - request_header->source_timestamp = response.sample_info_.source_timestamp.to_ns(); - request_header->received_timestamp = response.sample_info_.reception_timestamp.to_ns(); - request_header->request_id.sequence_number = - ((int64_t)response.sample_identity_.sequence_number().high) << - 32 | response.sample_identity_.sequence_number().low; - - *taken = true; + // Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19 + response.buffer_.reset(new eprosima::fastcdr::FastBuffer()); + rmw_fastrtps_shared_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = response.buffer_.get(); + data.impl = nullptr; // not used when is_cdr_buffer is true + + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + if (ReturnCode_t::RETCODE_OK == info->response_reader_->take(data_values, info_seq, 1)) { + if (info_seq[0].valid_data) { + response.sample_identity_ = info_seq[0].related_sample_identity; + + if (response.sample_identity_.writer_guid() == info->reader_guid_ || + response.sample_identity_.writer_guid() == info->writer_guid_) + { + auto raw_type_support = dynamic_cast( + info->response_type_support_.get()); + eprosima::fastcdr::Cdr deser( + *response.buffer_, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + if (raw_type_support->deserializeROSmessage( + deser, ros_response, info->response_type_support_impl_)) + { + request_header->source_timestamp = info_seq[0].source_timestamp.to_ns(); + request_header->received_timestamp = info_seq[0].reception_timestamp.to_ns(); + request_header->request_id.sequence_number = + ((int64_t)response.sample_identity_.sequence_number().high) << + 32 | response.sample_identity_.sequence_number().low; + + *taken = true; + } + } } } @@ -143,4 +163,5 @@ __rmw_send_response( return returnedValue; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index 92bd790be..be6d2ad08 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -79,7 +79,8 @@ __rmw_destroy_service( } auto show_previous_error = - [&final_ret]() { + [&final_ret]() + { if (RMW_RET_OK != final_ret) { RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "'\n"); @@ -108,6 +109,7 @@ __rmw_destroy_service( // Delete DataReader listener if (nullptr != info->listener_) { delete info->listener_; + info->listener_ = nullptr; } // Delete DataWriter @@ -122,6 +124,7 @@ __rmw_destroy_service( // Delete DataWriter listener if (nullptr != info->pub_listener_) { delete info->pub_listener_; + info->pub_listener_ = nullptr; } // Delete topics and unregister types @@ -135,7 +138,7 @@ __rmw_destroy_service( rmw_free(const_cast(service->service_name)); rmw_service_free(service); - RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return final_ret; } @@ -173,4 +176,5 @@ __rmw_service_set_on_new_request_callback( callback); return RMW_RET_OK; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index a521b1a69..d6df6d9dc 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -189,9 +189,15 @@ __rmw_subscription_set_content_filter( RMW_SET_ERROR_MSG("create_datareader() could not create data reader"); return RMW_RET_ERROR; } + + // Initialize DataReader's StatusCondition to be notified when new data is available + info->data_reader_->get_statuscondition().set_enabled_statuses( + eprosima::fastdds::dds::StatusMask::data_available()); + // lambda to delete datareader auto cleanup_datareader = rcpputils::make_scope_exit( - [subscriber, info]() { + [subscriber, info]() + { subscriber->delete_datareader(info->data_reader_); }); @@ -272,4 +278,5 @@ __rmw_subscription_set_on_new_message_callback( callback); return RMW_RET_OK; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index e4e3dfb6c..237ca98f3 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -21,6 +21,7 @@ #include "rmw/rmw.h" #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "fastrtps/utils/collections/ResourceLimitedVector.hpp" @@ -36,6 +37,8 @@ #include "tracetools/tracetools.h" +#include "rcpputils/scope_exit.hpp" + namespace rmw_fastrtps_shared_cpp { @@ -83,36 +86,39 @@ _take( auto info = static_cast(subscription->data); RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR); - eprosima::fastdds::dds::SampleInfo sinfo; - rmw_fastrtps_shared_cpp::SerializedData data; - data.is_cdr_buffer = false; data.data = ros_message; data.impl = info->type_support_impl_; - while (0 < info->data_reader_->get_unread_count()) { - if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) { - // Update hasData from listener - info->listener_->update_has_data(info->data_reader_); - - if (subscription->options.ignore_local_publications) { - auto sample_writer_guid = - eprosima::fastrtps::rtps::iHandle2GUID(sinfo.publication_handle); - - if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) { - // This is a local publication. Ignore it - continue; - } + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) { + auto reset = rcpputils::make_scope_exit( + [&]() + { + data_values.length(0); + info_seq.length(0); + }); + + if (subscription->options.ignore_local_publications) { + auto sample_writer_guid = + eprosima::fastrtps::rtps::iHandle2GUID(info_seq[0].publication_handle); + + if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) { + // This is a local publication. Ignore it + continue; } + } - if (sinfo.valid_data) { - if (message_info) { - _assign_message_info(identifier, message_info, &sinfo); - } - *taken = true; - break; + if (info_seq[0].valid_data) { + if (message_info) { + _assign_message_info(identifier, message_info, &info_seq[0]); } + *taken = true; + break; } } @@ -147,14 +153,6 @@ _take_sequence( auto info = static_cast(subscription->data); RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR); - // Limit the upper bound of reads to the number unread at the beginning. - // This prevents any samples that are added after the beginning of the - // _take_sequence call from being read. - auto unread_count = info->data_reader_->get_unread_count(); - if (unread_count < count) { - count = unread_count; - } - for (size_t ii = 0; ii < count; ++ii) { taken_flag = false; ret = _take( @@ -196,7 +194,7 @@ __rmw_take_event( return RMW_RET_ERROR); auto event = static_cast(event_handle->data); - if (event->getListener()->takeNextEvent(event_handle->event_type, event_info)) { + if (event->get_listener()->take_event(event_handle->event_type, event_info)) { *taken = true; return RMW_RET_OK; } @@ -318,25 +316,34 @@ _take_serialized_message( data.data = &buffer; data.impl = nullptr; // not used when is_cdr_buffer is true - if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) { - // Update hasData from listener - info->listener_->update_has_data(info->data_reader_); + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) { + auto reset = rcpputils::make_scope_exit( + [&]() + { + data_values.length(0); + info_seq.length(0); + }); - if (sinfo.valid_data) { + if (info_seq[0].valid_data) { auto buffer_size = static_cast(buffer.getBufferSize()); if (serialized_message->buffer_capacity < buffer_size) { auto ret = rmw_serialized_message_resize(serialized_message, buffer_size); if (ret != RMW_RET_OK) { - return ret; // Error message already set + return ret; // Error message already set } } serialized_message->buffer_length = buffer_size; memcpy(serialized_message->buffer, buffer.getBuffer(), serialized_message->buffer_length); if (message_info) { - _assign_message_info(identifier, message_info, &sinfo); + _assign_message_info(identifier, message_info, &info_seq[0]); } *taken = true; + break; } } @@ -395,7 +402,8 @@ struct GenericSequence : public eprosima::fastdds::dds::LoanableCollection { GenericSequence() = default; - void resize(size_type /*new_length*/) override + void resize( + size_type /*new_length*/) override { // This kind of collection should only be used with loans throw std::bad_alloc(); @@ -410,18 +418,21 @@ struct LoanManager eprosima::fastdds::dds::SampleInfoSeq info_seq{}; }; - explicit LoanManager(const eprosima::fastrtps::ResourceLimitedContainerConfig & items_cfg) + explicit LoanManager( + const eprosima::fastrtps::ResourceLimitedContainerConfig & items_cfg) : items(items_cfg) { } - void add_item(std::unique_ptr item) + void add_item( + std::unique_ptr item) { std::lock_guard guard(mtx); items.push_back(std::move(item)); } - std::unique_ptr erase_item(void * loaned_message) + std::unique_ptr erase_item( + void * loaned_message) { std::unique_ptr ret{nullptr}; @@ -488,7 +499,6 @@ __rmw_take_loaned_message_internal( } *loaned_message = item->data_seq.buffer()[0]; *taken = true; - info->listener_->update_has_data(info->data_reader_); info->loan_manager_->add_item(std::move(item)); @@ -501,7 +511,6 @@ __rmw_take_loaned_message_internal( // No data available, return loan information. *taken = false; - info->listener_->update_has_data(info->data_reader_); return RMW_RET_OK; } @@ -536,4 +545,5 @@ __rmw_return_loaned_message_from_subscription( RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription"); return RMW_RET_ERROR; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_trigger_guard_condition.cpp b/rmw_fastrtps_shared_cpp/src/rmw_trigger_guard_condition.cpp index ac3c72013..0f3494ff8 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_trigger_guard_condition.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_trigger_guard_condition.cpp @@ -18,7 +18,8 @@ #include "rmw/rmw.h" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "types/guard_condition.hpp" + +#include "fastdds/dds/core/condition/GuardCondition.hpp" namespace rmw_fastrtps_shared_cpp { @@ -34,8 +35,9 @@ __rmw_trigger_guard_condition( return RMW_RET_ERROR; } - auto guard_condition = static_cast(guard_condition_handle->data); - guard_condition->trigger(); + auto guard_condition = + static_cast(guard_condition_handle->data); + guard_condition->set_trigger_value(true); return RMW_RET_OK; } } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp index 4275e4ffc..ee541700a 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp @@ -21,70 +21,11 @@ #include "rmw_fastrtps_shared_cpp/custom_service_info.hpp" #include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "types/custom_wait_set_info.hpp" -#include "types/guard_condition.hpp" - -// helper function for wait -bool -check_wait_set_for_data( - const rmw_subscriptions_t * subscriptions, - const rmw_guard_conditions_t * guard_conditions, - const rmw_services_t * services, - const rmw_clients_t * clients, - const rmw_events_t * events) -{ - if (subscriptions) { - for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { - void * data = subscriptions->subscribers[i]; - auto custom_subscriber_info = static_cast(data); - // Short circuiting out of this function is possible - if (custom_subscriber_info && custom_subscriber_info->listener_->hasData()) { - return true; - } - } - } - - if (clients) { - for (size_t i = 0; i < clients->client_count; ++i) { - void * data = clients->clients[i]; - auto custom_client_info = static_cast(data); - if (custom_client_info && custom_client_info->listener_->hasData()) { - return true; - } - } - } - - if (services) { - for (size_t i = 0; i < services->service_count; ++i) { - void * data = services->services[i]; - auto custom_service_info = static_cast(data); - if (custom_service_info && custom_service_info->listener_->hasData()) { - return true; - } - } - } +#include "types/event_types.hpp" - if (events) { - for (size_t i = 0; i < events->event_count; ++i) { - auto event = static_cast(events->events[i]); - auto custom_event_info = static_cast(event->data); - if (custom_event_info->getListener()->hasEvent(event->event_type)) { - return true; - } - } - } - - if (guard_conditions) { - for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { - void * data = guard_conditions->guard_conditions[i]; - auto guard_condition = static_cast(data); - if (guard_condition && guard_condition->hasTriggered()) { - return true; - } - } - } - return false; -} +#include "fastdds/dds/core/condition/WaitSet.hpp" +#include "fastdds/dds/core/condition/GuardCondition.hpp" +#include "fastdds/dds/subscriber/DataReader.hpp" namespace rmw_fastrtps_shared_cpp { @@ -114,15 +55,19 @@ __rmw_wait( // error. // - Heap is corrupt. // In all three cases, it's better if this crashes soon enough. - auto wait_set_info = static_cast(wait_set->data); - std::mutex * conditionMutex = &wait_set_info->condition_mutex; - std::condition_variable * conditionVariable = &wait_set_info->condition; + auto fastdds_wait_set = static_cast(wait_set->data); + bool skip_wait = false; if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { void * data = subscriptions->subscribers[i]; auto custom_subscriber_info = static_cast(data); - custom_subscriber_info->listener_->attachCondition(conditionMutex, conditionVariable); + eprosima::fastdds::dds::SampleInfo sample_info; + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)); + fastdds_wait_set->attach_condition( + custom_subscriber_info->data_reader_->get_statuscondition()); } } @@ -130,7 +75,12 @@ __rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { void * data = clients->clients[i]; auto custom_client_info = static_cast(data); - custom_client_info->listener_->attachCondition(conditionMutex, conditionVariable); + eprosima::fastdds::dds::SampleInfo sample_info; + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_client_info->response_reader_->get_first_untaken_info(&sample_info)); + fastdds_wait_set->attach_condition( + custom_client_info->response_reader_->get_statuscondition()); } } @@ -138,7 +88,12 @@ __rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { void * data = services->services[i]; auto custom_service_info = static_cast(data); - custom_service_info->listener_->attachCondition(conditionMutex, conditionVariable); + eprosima::fastdds::dds::SampleInfo sample_info; + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_service_info->request_reader_->get_first_untaken_info(&sample_info)); + fastdds_wait_set->attach_condition( + custom_service_info->request_reader_->get_statuscondition()); } } @@ -146,57 +101,45 @@ __rmw_wait( for (size_t i = 0; i < events->event_count; ++i) { auto event = static_cast(events->events[i]); auto custom_event_info = static_cast(event->data); - custom_event_info->getListener()->attachCondition(conditionMutex, conditionVariable); + fastdds_wait_set->attach_condition( + custom_event_info->get_listener()->get_statuscondition()); + fastdds_wait_set->attach_condition( + custom_event_info->get_listener()->get_event_guard(event->event_type)); } } if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { void * data = guard_conditions->guard_conditions[i]; - auto guard_condition = static_cast(data); - guard_condition->attachCondition(conditionMutex, conditionVariable); + auto guard_condition = static_cast(data); + fastdds_wait_set->attach_condition(*guard_condition); } } - // This mutex prevents any of the listeners - // to change the internal state and notify the condition - // between the call to hasData() / hasTriggered() and wait() - // otherwise the decision to wait might be incorrect - std::unique_lock lock(*conditionMutex); - - bool hasData = check_wait_set_for_data( - subscriptions, guard_conditions, services, clients, events); - auto predicate = [subscriptions, guard_conditions, services, clients, events]() { - return check_wait_set_for_data(subscriptions, guard_conditions, services, clients, events); - }; - - bool timeout = false; - if (!hasData) { - if (!wait_timeout) { - conditionVariable->wait(lock, predicate); - } else if (wait_timeout->sec > 0 || wait_timeout->nsec > 0) { - auto n = std::chrono::duration_cast( - std::chrono::seconds(wait_timeout->sec)); - n += std::chrono::nanoseconds(wait_timeout->nsec); - timeout = !conditionVariable->wait_for(lock, n, predicate); - } else { - timeout = true; - } + eprosima::fastdds::dds::ConditionSeq triggered_coditions; + Duration_t timeout{0, 0}; + if (!skip_wait) { + timeout = (wait_timeout) ? + Duration_t{static_cast(wait_timeout->sec), + static_cast(wait_timeout->nsec)} : eprosima::fastrtps::c_TimeInfinite; } - // Unlock the condition variable mutex to prevent deadlocks that can occur if - // a listener triggers while the condition variable is being detached. - // Listeners will no longer be prevented from changing their internal state, - // but that should not cause issues (if a listener has data / has triggered - // after we check, it will be caught on the next call to this function). - lock.unlock(); + ReturnCode_t ret_code = fastdds_wait_set->wait( + triggered_coditions, + timeout + ); if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { void * data = subscriptions->subscribers[i]; auto custom_subscriber_info = static_cast(data); - custom_subscriber_info->listener_->detachCondition(); - if (!custom_subscriber_info->listener_->hasData()) { + eprosima::fastdds::dds::StatusCondition & status_condition = + custom_subscriber_info->data_reader_->get_statuscondition(); + fastdds_wait_set->detach_condition(status_condition); + eprosima::fastdds::dds::SampleInfo sample_info; + if (ReturnCode_t::RETCODE_OK != + custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)) + { subscriptions->subscribers[i] = 0; } } @@ -206,8 +149,13 @@ __rmw_wait( for (size_t i = 0; i < clients->client_count; ++i) { void * data = clients->clients[i]; auto custom_client_info = static_cast(data); - custom_client_info->listener_->detachCondition(); - if (!custom_client_info->listener_->hasData()) { + eprosima::fastdds::dds::StatusCondition & status_condition = + custom_client_info->response_reader_->get_statuscondition(); + fastdds_wait_set->detach_condition(status_condition); + eprosima::fastdds::dds::SampleInfo sample_info; + if (ReturnCode_t::RETCODE_OK != + custom_client_info->response_reader_->get_first_untaken_info(&sample_info)) + { clients->clients[i] = 0; } } @@ -217,8 +165,13 @@ __rmw_wait( for (size_t i = 0; i < services->service_count; ++i) { void * data = services->services[i]; auto custom_service_info = static_cast(data); - custom_service_info->listener_->detachCondition(); - if (!custom_service_info->listener_->hasData()) { + eprosima::fastdds::dds::StatusCondition & status_condition = + custom_service_info->request_reader_->get_statuscondition(); + fastdds_wait_set->detach_condition(status_condition); + eprosima::fastdds::dds::SampleInfo sample_info; + if (ReturnCode_t::RETCODE_OK != + custom_service_info->request_reader_->get_first_untaken_info(&sample_info)) + { services->services[i] = 0; } } @@ -228,9 +181,33 @@ __rmw_wait( for (size_t i = 0; i < events->event_count; ++i) { auto event = static_cast(events->events[i]); auto custom_event_info = static_cast(event->data); - custom_event_info->getListener()->detachCondition(); - if (!custom_event_info->getListener()->hasEvent(event->event_type)) { - events->events[i] = nullptr; + fastdds_wait_set->detach_condition(custom_event_info->get_listener()->get_statuscondition()); + eprosima::fastdds::dds::StatusCondition & status_condition = + custom_event_info->get_listener()->get_statuscondition(); + fastdds_wait_set->detach_condition(status_condition); + eprosima::fastdds::dds::GuardCondition & guard_condition = + custom_event_info->get_listener()->get_event_guard(event->event_type); + bool active = false; + + if (ReturnCode_t::RETCODE_OK == ret_code) { + eprosima::fastdds::dds::Entity * entity = status_condition.get_entity(); + eprosima::fastdds::dds::StatusMask changed_statuses = entity->get_status_changes(); + if (changed_statuses.is_active( + rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask( + event->event_type))) + { + active = true; + } + + if (guard_condition.get_trigger_value()) { + active = true; + guard_condition.set_trigger_value(false); + } + } + + + if (!active) { + events->events[i] = 0; } } } @@ -238,14 +215,16 @@ __rmw_wait( if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { void * data = guard_conditions->guard_conditions[i]; - auto guard_condition = static_cast(data); - guard_condition->detachCondition(); - if (!guard_condition->getHasTriggered()) { + auto condition = static_cast(data); + fastdds_wait_set->detach_condition(*condition); + if (!condition->get_trigger_value()) { guard_conditions->guard_conditions[i] = 0; } + condition->set_trigger_value(false); } } - return timeout ? RMW_RET_TIMEOUT : RMW_RET_OK; + return (skip_wait || ReturnCode_t::RETCODE_OK == ret_code) ? RMW_RET_OK : RMW_RET_TIMEOUT; } + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait_set.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait_set.cpp index c29137fe2..b706a5006 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait_set.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait_set.cpp @@ -21,7 +21,7 @@ #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "types/custom_wait_set_info.hpp" +#include "fastdds/dds/core/condition/WaitSet.hpp" namespace rmw_fastrtps_shared_cpp { @@ -40,26 +40,26 @@ __rmw_create_wait_set(const char * identifier, rmw_context_t * context, size_t m (void)max_conditions; // From here onward, error results in unrolling in the goto fail block. - CustomWaitsetInfo * wait_set_info = nullptr; + eprosima::fastdds::dds::WaitSet * fastdds_wait_set = nullptr; rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); if (!wait_set) { RMW_SET_ERROR_MSG("failed to allocate wait set"); goto fail; } wait_set->implementation_identifier = identifier; - wait_set->data = rmw_allocate(sizeof(CustomWaitsetInfo)); + wait_set->data = rmw_allocate(sizeof(eprosima::fastdds::dds::WaitSet)); if (!wait_set->data) { RMW_SET_ERROR_MSG("failed to allocate wait set info"); goto fail; } // This should default-construct the fields of CustomWaitsetInfo RMW_TRY_PLACEMENT_NEW( - wait_set_info, + fastdds_wait_set, wait_set->data, goto fail, // cppcheck-suppress syntaxError - CustomWaitsetInfo, ); - (void) wait_set_info; + eprosima::fastdds::dds::WaitSet, ); + (void) fastdds_wait_set; return wait_set; @@ -89,12 +89,13 @@ __rmw_destroy_wait_set(const char * identifier, rmw_wait_set_t * wait_set) // error. // - Heap is corrupt. // In all three cases, it's better if this crashes soon enough. - auto wait_set_info = static_cast(wait_set->data); + auto fastdds_wait_set = static_cast(wait_set->data); if (wait_set->data) { - if (wait_set_info) { + if (fastdds_wait_set) { RMW_TRY_DESTRUCTOR( - wait_set_info->~CustomWaitsetInfo(), wait_set_info, result = RMW_RET_ERROR) + fastdds_wait_set->eprosima::fastdds::dds::WaitSet::~WaitSet(), fastdds_wait_set, + result = RMW_RET_ERROR) } rmw_free(wait_set->data); } diff --git a/rmw_fastrtps_shared_cpp/src/types/custom_wait_set_info.hpp b/rmw_fastrtps_shared_cpp/src/types/custom_wait_set_info.hpp deleted file mode 100644 index 1ebdd5716..000000000 --- a/rmw_fastrtps_shared_cpp/src/types/custom_wait_set_info.hpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef TYPES__CUSTOM_WAIT_SET_INFO_HPP_ -#define TYPES__CUSTOM_WAIT_SET_INFO_HPP_ - -#include -#include - -typedef struct CustomWaitsetInfo -{ - std::condition_variable condition; - std::mutex condition_mutex; -} CustomWaitsetInfo; - -#endif // TYPES__CUSTOM_WAIT_SET_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/types/event_types.hpp b/rmw_fastrtps_shared_cpp/src/types/event_types.hpp index 8eb9d68da..47e10f905 100644 --- a/rmw_fastrtps_shared_cpp/src/types/event_types.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/event_types.hpp @@ -17,6 +17,8 @@ #include "rmw/event.h" +#include "fastdds/dds/core/status/StatusMask.hpp" + namespace rmw_fastrtps_shared_cpp { namespace internal @@ -24,6 +26,9 @@ namespace internal bool is_event_supported(rmw_event_type_t event_type); +eprosima::fastdds::dds::StatusMask rmw_event_to_dds_statusmask( + const rmw_event_type_t event_type); + } // namespace internal } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp deleted file mode 100644 index ab5076522..000000000 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef TYPES__GUARD_CONDITION_HPP_ -#define TYPES__GUARD_CONDITION_HPP_ - -#include -#include -#include -#include -#include -#include - -#include "rcpputils/thread_safety_annotations.hpp" - -class GuardCondition -{ -public: - GuardCondition() - : hasTriggered_(false), - conditionMutex_(nullptr), conditionVariable_(nullptr) {} - - void - trigger() - { - std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - // the change to hasTriggered_ needs to be mutually exclusive with - // rmw_wait() which checks hasTriggered() and decides if wait() needs to - // be called - hasTriggered_ = true; - clock.unlock(); - conditionVariable_->notify_one(); - } else { - hasTriggered_ = true; - } - } - - void - attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = conditionMutex; - conditionVariable_ = conditionVariable; - } - - void - detachCondition() - { - std::lock_guard lock(internalMutex_); - conditionMutex_ = nullptr; - conditionVariable_ = nullptr; - } - - bool - hasTriggered() - { - return hasTriggered_; - } - - bool - getHasTriggered() - { - return hasTriggered_.exchange(false); - } - -private: - std::mutex internalMutex_; - std::atomic_bool hasTriggered_; - std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); -}; - -#endif // TYPES__GUARD_CONDITION_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/utils.cpp b/rmw_fastrtps_shared_cpp/src/utils.cpp index f652abed4..37662cf01 100644 --- a/rmw_fastrtps_shared_cpp/src/utils.cpp +++ b/rmw_fastrtps_shared_cpp/src/utils.cpp @@ -195,7 +195,8 @@ create_datareader( *data_reader = subscriber->create_datareader( des_topic, updated_qos, - listener); + listener, + eprosima::fastdds::dds::StatusMask::subscription_matched()); if (!data_reader && (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED == subscription_options->require_unique_network_flow_endpoints)) @@ -203,7 +204,8 @@ create_datareader( *data_reader = subscriber->create_datareader( des_topic, datareader_qos, - listener); + listener, + eprosima::fastdds::dds::StatusMask::subscription_matched()); } return true; }