diff --git a/include/fastrtps/rtps/writer/LivelinessManager.h b/include/fastrtps/rtps/writer/LivelinessManager.h index 3ece943e43b..0eb19dbd5e7 100644 --- a/include/fastrtps/rtps/writer/LivelinessManager.h +++ b/include/fastrtps/rtps/writer/LivelinessManager.h @@ -151,18 +151,6 @@ class LivelinessManager //! A method called if the timer expires void timer_expired(); - //! A boolean indicating whether we are managing writers with automatic liveliness - bool manage_automatic_; - - //! A vector of liveliness data - ResourceLimitedVector writers_; - - //! A timed callback expiring when a writer (the timer owner) loses its liveliness - TimedCallback timer_; - - //! The timer owner, i.e. the writer which is next due to lose its liveliness - LivelinessData* timer_owner_; - //! A callback to inform outside classes that a writer lost its liveliness std::function liveliness_recovered_callback_; + //! A boolean indicating whether we are managing writers with automatic liveliness + bool manage_automatic_; + + //! A vector of liveliness data + ResourceLimitedVector writers_; + //! A mutex to protect the liveliness data std::mutex mutex_; + + //! The timer owner, i.e. the writer which is next due to lose its liveliness + LivelinessData* timer_owner_; + + //! A timed callback expiring when a writer (the timer owner) loses its liveliness + TimedCallback timer_; }; } /* namespace rtps */ diff --git a/src/cpp/rtps/writer/LivelinessManager.cpp b/src/cpp/rtps/writer/LivelinessManager.cpp index b8a30e433d2..b24efc90660 100644 --- a/src/cpp/rtps/writer/LivelinessManager.cpp +++ b/src/cpp/rtps/writer/LivelinessManager.cpp @@ -21,15 +21,17 @@ LivelinessManager::LivelinessManager( asio::io_service& service, const std::thread& event_thread, bool manage_automatic) - : manage_automatic_(manage_automatic) + : liveliness_lost_callback_(liveliness_lost_callback) + , liveliness_recovered_callback_(liveliness_recovered_callback) + , manage_automatic_(manage_automatic) + , writers_() + , mutex_() + , timer_owner_(nullptr) , timer_( std::bind(&LivelinessManager::timer_expired, this), 0, service, event_thread) - , timer_owner_(nullptr) - , liveliness_lost_callback_(liveliness_lost_callback) - , liveliness_recovered_callback_(liveliness_recovered_callback) { } diff --git a/test/blackbox/BlackboxTestsLivelinessQos.cpp b/test/blackbox/BlackboxTestsLivelinessQos.cpp index d850c5d4efb..9cfbe41931c 100644 --- a/test/blackbox/BlackboxTestsLivelinessQos.cpp +++ b/test/blackbox/BlackboxTestsLivelinessQos.cpp @@ -233,8 +233,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_Reliable) uint32_t num_samples = 3; // Liveliness lease duration and announcement period, in seconds - Duration_t liveliness_s(writer_sleep_ms * 5.0 * 1e-3); - Duration_t announcement_period(writer_sleep_ms * 5.0 * 1e-3 * 0.1); + Duration_t liveliness_s(writer_sleep_ms * 100.0 * 1e-3); + Duration_t announcement_period(writer_sleep_ms * 100.0 * 1e-3 * 0.1); reader.reliability(RELIABLE_RELIABILITY_QOS) .liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) @@ -264,22 +264,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_Reliable) reader.block_for_at_least(count); std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); } - // Wait a bit longer - std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10)); - EXPECT_EQ(writer.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_recovered(), 1u); - for (count = 0; count < num_samples; count++) { writer.assert_liveliness(); std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); } - // Wait a bit longer - std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10)); - EXPECT_EQ(writer.times_liveliness_lost(), 2u); - EXPECT_EQ(reader.times_liveliness_lost(), 2u); - EXPECT_EQ(reader.times_liveliness_recovered(), 2u); + reader.wait_liveliness_recovered(); + + // Liveliness shouldn't have been lost + EXPECT_EQ(writer.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_recovered(), 1u); } //! Tests liveliness with the following paramters @@ -296,8 +291,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_BestEffort) uint32_t writer_samples = 3; // Liveliness lease duration and announcement period, in seconds - Duration_t liveliness_s(writer_sleep_ms * 5.0 * 1e-3); - Duration_t announcement_period(writer_sleep_ms * 5.0 * 1e-3 * 0.1); + Duration_t liveliness_s(writer_sleep_ms * 100.0 * 1e-3); + Duration_t announcement_period(writer_sleep_ms * 100.0 * 1e-3 * 0.1); reader.reliability(BEST_EFFORT_RELIABILITY_QOS) .liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS) @@ -327,22 +322,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_BestEffort) reader.block_for_at_least(count); std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); } - // Wait a bit longer - std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10)); - EXPECT_EQ(writer.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_recovered(), 1u); - for (count = 0; count < writer_samples; count++) { writer.assert_liveliness(); std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); } - // Wait a bit longer - std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10)); - EXPECT_EQ(writer.times_liveliness_lost(), 2u); - EXPECT_EQ(reader.times_liveliness_lost(), 2u); - EXPECT_EQ(reader.times_liveliness_recovered(), 2u); + reader.wait_liveliness_recovered(); + + // Liveliness shouldn't have been lost + EXPECT_EQ(writer.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_lost(), 0u); + EXPECT_EQ(reader.times_liveliness_recovered(), 1u); } //! Tests liveliness with the following paramters @@ -481,8 +471,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_Reliable) uint32_t num_samples = 3; // Liveliness lease duration and announcement period, in seconds - Duration_t liveliness_s(writer_sleep_ms * 5 * 1e-3); - Duration_t announcement_period(writer_sleep_ms * 5 * 1e-3 * 0.1); + Duration_t liveliness_s(writer_sleep_ms * 100 * 1e-3); + Duration_t announcement_period(writer_sleep_ms * 100 * 1e-3 * 0.1); reader.reliability(RELIABLE_RELIABILITY_QOS) .liveliness_kind(MANUAL_BY_TOPIC_LIVELINESS_QOS) @@ -513,22 +503,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_Reliable) reader.block_for_at_least(count); std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms)); } - // Wait a bit longer - std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10)); - EXPECT_EQ(writer.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_lost(), 1u); - EXPECT_EQ(reader.times_liveliness_recovered(), 1u); - for (count=0; count lock(liveliness_mutex_); + + liveliness_cv_.wait(lock, [&](){ return times_liveliness_recovered_ == 1; }); + } + #if HAVE_SECURITY void waitAuthorized() { @@ -745,14 +738,31 @@ class PubSubReader return listener_.missed_deadlines(); } - unsigned int times_liveliness_lost() const + void liveliness_lost() { - return listener_.times_liveliness_lost(); + std::unique_lock lock(liveliness_mutex_); + times_liveliness_lost_++; } - unsigned int times_liveliness_recovered() const + void liveliness_recovered() { - return listener_.times_liveliness_recovered(); + std::unique_lock lock(liveliness_mutex_); + times_liveliness_recovered_++; + liveliness_cv_.notify_one(); + } + + unsigned int times_liveliness_lost() + { + std::unique_lock lock(liveliness_mutex_); + + return times_liveliness_lost_; + } + + unsigned int times_liveliness_recovered() + { + std::unique_lock lock(liveliness_mutex_); + + return times_liveliness_recovered_; } bool is_matched() const @@ -874,6 +884,15 @@ class PubSubReader unsigned int authorized_; unsigned int unauthorized_; #endif + + //! A mutex for liveliness status + std::mutex liveliness_mutex_; + //! A condition variable to notify when liveliness was recovered + std::condition_variable liveliness_cv_; + //! Number of times liveliness was lost + unsigned int times_liveliness_lost_; + //! Number of times liveliness was recovered + unsigned int times_liveliness_recovered_; }; #endif // _TEST_BLACKBOX_PUBSUBREADER_HPP_ diff --git a/test/unittest/rtps/writer/LivelinessManagerTests.cpp b/test/unittest/rtps/writer/LivelinessManagerTests.cpp index 74e7c58165e..a6e519572df 100644 --- a/test/unittest/rtps/writer/LivelinessManagerTests.cpp +++ b/test/unittest/rtps/writer/LivelinessManagerTests.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include class TimedEventEnvironment : public ::testing::Environment @@ -57,20 +58,41 @@ class TimedEventEnvironment : public ::testing::Environment void liveliness_lost(eprosima::fastrtps::rtps::GUID_t guid) { + std::unique_lock lock(liveliness_lost_mutex_); writer_losing_liveliness = guid; num_writers_lost++; + liveliness_lost_cv_.notify_one(); } void liveliness_recovered(eprosima::fastrtps::rtps::GUID_t guid) { + std::unique_lock lock(liveliness_recovered_mutex_); writer_recovering_liveliness = guid; num_writers_recovered++; + liveliness_recovered_cv_.notify_one(); + } + + void wait_liveliness_lost(unsigned int num_lost) + { + std::unique_lock lock(liveliness_lost_mutex_); + liveliness_lost_cv_.wait(lock, [&](){ return num_writers_lost == num_lost; }); + } + + void wait_liveliness_recovered(unsigned int num_recovered) + { + std::unique_lock lock(liveliness_recovered_mutex_); + liveliness_recovered_cv_.wait(lock, [&](){ return num_writers_recovered == num_recovered;}); } eprosima::fastrtps::rtps::GUID_t writer_losing_liveliness; eprosima::fastrtps::rtps::GUID_t writer_recovering_liveliness; unsigned int num_writers_lost; unsigned int num_writers_recovered; + + std::mutex liveliness_lost_mutex_; + std::condition_variable liveliness_lost_cv_; + std::mutex liveliness_recovered_mutex_; + std::condition_variable liveliness_recovered_cv_; }; TimedEventEnvironment* const env = dynamic_cast(testing::AddGlobalTestEnvironment(new TimedEventEnvironment)); @@ -294,15 +316,16 @@ TEST(LivelinessManagerTests, TimerExpired_Automatic) env->num_writers_recovered = 0u; // Wait so that first writer loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 1)); // Wait a bit longer so that second writer loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + env->wait_liveliness_lost(2u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 2)); // Assert first writer liveliness_manager.assert_liveliness(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.1)); + env->wait_liveliness_recovered(2u); EXPECT_EQ(env->num_writers_recovered, 2u); } @@ -327,17 +350,18 @@ TEST(LivelinessManagerTests, TimerExpired_ManualByParticipant) env->num_writers_recovered = 0u; // Wait so that first writer loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 1)); EXPECT_EQ(env->num_writers_lost, 1u); // Wait a bit longer so that second writer loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + env->wait_liveliness_lost(2u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 2)); EXPECT_EQ(env->num_writers_lost, 2u); // Assert first writer liveliness_manager.assert_liveliness(GUID_t(guidP, 1), MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(0.1)); + env->wait_liveliness_recovered(2u); EXPECT_EQ(env->num_writers_recovered, 2u); } @@ -359,9 +383,10 @@ TEST(LivelinessManagerTests, TimerExpired_ManualByTopic) // Assert first writer liveliness_manager.assert_liveliness(GUID_t(guidP, 1), MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(0.1)); + env->wait_liveliness_recovered(1u); // Wait so that first writer loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 1)); EXPECT_EQ(env->num_writers_lost, 1u); @@ -372,10 +397,11 @@ TEST(LivelinessManagerTests, TimerExpired_ManualByTopic) // Assert second writer liveliness_manager.assert_liveliness(GUID_t(guidP, 2), MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(0.2)); + env->wait_liveliness_recovered(2u); env->num_writers_lost = 0u; // Wait so that it loses liveliness - std::this_thread::sleep_for(std::chrono::milliseconds(400)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 2)); EXPECT_EQ(env->num_writers_lost, 1u); } @@ -399,15 +425,15 @@ TEST(LivelinessManagerTests, TimerOwnerCalculation) liveliness_manager.assert_liveliness(AUTOMATIC_LIVELINESS_QOS); - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 1)); EXPECT_EQ(env->num_writers_lost, 1u); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + env->wait_liveliness_lost(2u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 3)); EXPECT_EQ(env->num_writers_lost, 2u); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + env->wait_liveliness_lost(3u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 2)); EXPECT_EQ(env->num_writers_lost, 3u); } @@ -431,7 +457,7 @@ TEST(LivelinessManagerTests, TimerOwnerRemoved) liveliness_manager.assert_liveliness(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5)); liveliness_manager.remove_writer(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5)); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + env->wait_liveliness_lost(1u); EXPECT_EQ(env->writer_losing_liveliness, GUID_t(guidP, 2)); EXPECT_EQ(env->num_writers_lost, 1u); }