Skip to content

Commit

Permalink
Refs #5060 Trying to make tests more stable by using condition variables
Browse files Browse the repository at this point in the history
  • Loading branch information
Raquel Alvarez committed Jun 19, 2019
1 parent 8196486 commit 64410ce
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 103 deletions.
24 changes: 12 additions & 12 deletions include/fastrtps/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,6 @@ class LivelinessManager
//! A method called if the timer expires
void timer_expired();

//! A boolean indicating whether we are managing writers with automatic liveliness
bool manage_automatic_;

//! A vector of liveliness data
ResourceLimitedVector<LivelinessData> writers_;

//! A timed callback expiring when a writer (the timer owner) loses its liveliness
TimedCallback timer_;

//! The timer owner, i.e. the writer which is next due to lose its liveliness
LivelinessData* timer_owner_;

//! A callback to inform outside classes that a writer lost its liveliness
std::function<void(
const GUID_t&,
Expand All @@ -175,8 +163,20 @@ class LivelinessManager
const LivelinessQosPolicyKind&,
const Duration_t&)> liveliness_recovered_callback_;

//! A boolean indicating whether we are managing writers with automatic liveliness
bool manage_automatic_;

//! A vector of liveliness data
ResourceLimitedVector<LivelinessData> writers_;

//! A mutex to protect the liveliness data
std::mutex mutex_;

//! The timer owner, i.e. the writer which is next due to lose its liveliness
LivelinessData* timer_owner_;

//! A timed callback expiring when a writer (the timer owner) loses its liveliness
TimedCallback timer_;
};

} /* namespace rtps */
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/rtps/writer/LivelinessManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ LivelinessManager::LivelinessManager(
asio::io_service& service,
const std::thread& event_thread,
bool manage_automatic)
: manage_automatic_(manage_automatic)
: liveliness_lost_callback_(liveliness_lost_callback)
, liveliness_recovered_callback_(liveliness_recovered_callback)
, manage_automatic_(manage_automatic)
, writers_()
, mutex_()
, timer_owner_(nullptr)
, timer_(
std::bind(&LivelinessManager::timer_expired, this),
0,
service,
event_thread)
, timer_owner_(nullptr)
, liveliness_lost_callback_(liveliness_lost_callback)
, liveliness_recovered_callback_(liveliness_recovered_callback)
{
}

Expand Down
84 changes: 31 additions & 53 deletions test/blackbox/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_Reliable)
uint32_t num_samples = 3;

// Liveliness lease duration and announcement period, in seconds
Duration_t liveliness_s(writer_sleep_ms * 5.0 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 5.0 * 1e-3 * 0.1);
Duration_t liveliness_s(writer_sleep_ms * 100.0 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 100.0 * 1e-3 * 0.1);

reader.reliability(RELIABLE_RELIABILITY_QOS)
.liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
Expand Down Expand Up @@ -264,22 +264,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_Reliable)
reader.block_for_at_least(count);
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);

for (count = 0; count < num_samples; count++)
{
writer.assert_liveliness();
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_recovered(), 2u);
reader.wait_liveliness_recovered();

// Liveliness shouldn't have been lost
EXPECT_EQ(writer.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);
}

//! Tests liveliness with the following paramters
Expand All @@ -296,8 +291,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_BestEffort)
uint32_t writer_samples = 3;

// Liveliness lease duration and announcement period, in seconds
Duration_t liveliness_s(writer_sleep_ms * 5.0 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 5.0 * 1e-3 * 0.1);
Duration_t liveliness_s(writer_sleep_ms * 100.0 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 100.0 * 1e-3 * 0.1);

reader.reliability(BEST_EFFORT_RELIABILITY_QOS)
.liveliness_kind(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
Expand Down Expand Up @@ -327,22 +322,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByParticipant_BestEffort)
reader.block_for_at_least(count);
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);

for (count = 0; count < writer_samples; count++)
{
writer.assert_liveliness();
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_recovered(), 2u);
reader.wait_liveliness_recovered();

// Liveliness shouldn't have been lost
EXPECT_EQ(writer.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);
}

//! Tests liveliness with the following paramters
Expand Down Expand Up @@ -481,8 +471,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_Reliable)
uint32_t num_samples = 3;

// Liveliness lease duration and announcement period, in seconds
Duration_t liveliness_s(writer_sleep_ms * 5 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 5 * 1e-3 * 0.1);
Duration_t liveliness_s(writer_sleep_ms * 100 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 100 * 1e-3 * 0.1);

reader.reliability(RELIABLE_RELIABILITY_QOS)
.liveliness_kind(MANUAL_BY_TOPIC_LIVELINESS_QOS)
Expand Down Expand Up @@ -513,22 +503,17 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_Reliable)
reader.block_for_at_least(count);
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);

for (count=0; count<num_samples; count++)
{
writer.assert_liveliness();
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_lost(), 2u);
EXPECT_EQ(reader.times_liveliness_recovered(), 2u);
reader.wait_liveliness_recovered();

// Liveliness shouldn't have been lost
EXPECT_EQ(writer.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);
}

//! Tests liveliness with the following paramters
Expand All @@ -545,8 +530,8 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_BestEffort)
uint32_t num_samples = 3;

// Liveliness lease duration and announcement period, in seconds
Duration_t liveliness_s(writer_sleep_ms * 5 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 5 * 1e-3 * 0.1);
Duration_t liveliness_s(writer_sleep_ms * 100 * 1e-3);
Duration_t announcement_period(writer_sleep_ms * 100 * 1e-3 * 0.1);

reader.reliability(BEST_EFFORT_RELIABILITY_QOS)
.liveliness_kind(MANUAL_BY_TOPIC_LIVELINESS_QOS)
Expand Down Expand Up @@ -577,23 +562,16 @@ TEST(LivelinessQos, LongLiveliness_ManualByTopic_BestEffort)
reader.block_for_at_least(count);
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_lost(), 1u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);

for(count = 0; count < num_samples; count++)
{
writer.assert_liveliness();
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms));
}
// Wait a bit longer
std::this_thread::sleep_for(std::chrono::milliseconds(writer_sleep_ms * 10));
EXPECT_EQ(writer.times_liveliness_lost(), 2u);
// Note that MANUAL_BY_TOPIC liveliness relies on sending heartbeats when using the assert method
// However best-effor writers do not send heartbeats, so the reader will never get notified
EXPECT_EQ(reader.times_liveliness_lost(), 1u);
reader.wait_liveliness_recovered();

// Liveliness shouldn't have been lost
EXPECT_EQ(writer.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_lost(), 0u);
EXPECT_EQ(reader.times_liveliness_recovered(), 1u);
}

Expand Down
67 changes: 43 additions & 24 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ class PubSubReader
Listener(PubSubReader &reader)
: reader_(reader)
, times_deadline_missed_(0)
, times_liveliness_lost_(0)
, times_liveliness_recovered_(0)
{}

~Listener(){}
Expand Down Expand Up @@ -166,13 +164,11 @@ class PubSubReader

if (status.alive_count_change == 1)
{
// Liveliness recovered
times_liveliness_recovered_++;
reader_.liveliness_recovered();
}
else if (status.not_alive_count_change == 1)
{
// Liveliness lost
times_liveliness_lost_++;
reader_.liveliness_lost();
}
}

Expand All @@ -181,16 +177,6 @@ class PubSubReader
return times_deadline_missed_;
}

unsigned int times_liveliness_lost() const
{
return times_liveliness_lost_;
}

unsigned int times_liveliness_recovered() const
{
return times_liveliness_recovered_;
}

private:

Listener& operator=(const Listener&) = delete;
Expand All @@ -199,10 +185,6 @@ class PubSubReader

//! Number of times deadline was missed
unsigned int times_deadline_missed_;
//! Number of times liveliness was lost
unsigned int times_liveliness_lost_;
//! Number of times liveliness was recovered
unsigned int times_liveliness_recovered_;

} listener_;

Expand All @@ -228,6 +210,10 @@ class PubSubReader
, authorized_(0)
, unauthorized_(0)
#endif
, liveliness_mutex_()
, liveliness_cv_()
, times_liveliness_lost_(0)
, times_liveliness_recovered_(0)
{
subscriber_attr_.topic.topicDataType = type_.getName();
// Generate topic name
Expand Down Expand Up @@ -387,6 +373,13 @@ class PubSubReader
std::cout << "Reader removal finished..." << std::endl;
}

void wait_liveliness_recovered()
{
std::unique_lock<std::mutex> lock(liveliness_mutex_);

liveliness_cv_.wait(lock, [&](){ return times_liveliness_recovered_ == 1; });
}

#if HAVE_SECURITY
void waitAuthorized()
{
Expand Down Expand Up @@ -745,14 +738,31 @@ class PubSubReader
return listener_.missed_deadlines();
}

unsigned int times_liveliness_lost() const
void liveliness_lost()
{
return listener_.times_liveliness_lost();
std::unique_lock<std::mutex> lock(liveliness_mutex_);
times_liveliness_lost_++;
}

unsigned int times_liveliness_recovered() const
void liveliness_recovered()
{
return listener_.times_liveliness_recovered();
std::unique_lock<std::mutex> lock(liveliness_mutex_);
times_liveliness_recovered_++;
liveliness_cv_.notify_one();
}

unsigned int times_liveliness_lost()
{
std::unique_lock<std::mutex> lock(liveliness_mutex_);

return times_liveliness_lost_;
}

unsigned int times_liveliness_recovered()
{
std::unique_lock<std::mutex> lock(liveliness_mutex_);

return times_liveliness_recovered_;
}

bool is_matched() const
Expand Down Expand Up @@ -874,6 +884,15 @@ class PubSubReader
unsigned int authorized_;
unsigned int unauthorized_;
#endif

//! A mutex for liveliness status
std::mutex liveliness_mutex_;
//! A condition variable to notify when liveliness was recovered
std::condition_variable liveliness_cv_;
//! Number of times liveliness was lost
unsigned int times_liveliness_lost_;
//! Number of times liveliness was recovered
unsigned int times_liveliness_recovered_;
};

#endif // _TEST_BLACKBOX_PUBSUBREADER_HPP_
Loading

0 comments on commit 64410ce

Please sign in to comment.