From 8ca4586445b82de4d25a2d6e48caef19d71f5471 Mon Sep 17 00:00:00 2001 From: sonia Date: Fri, 13 Aug 2021 10:31:12 -0700 Subject: [PATCH] Make events threadsafe - Initial connection (insert) and cleanup (remove) locked with mutex - Signal (access) checks for object not null before attempting to access (not locking because would need recursive mutex) - Declare mutex as mutable so EventCount const method can block it - Include race condition test to play with connections inside an event (contribution by Sonia Jin from AWS, back port of #3042) Co-authored-by: Jose Luis Rivero --- gazebo/common/Event.hh | 29 +++++++++------ gazebo/common/Event_TEST.cc | 74 +++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 12 deletions(-) diff --git a/gazebo/common/Event.hh b/gazebo/common/Event.hh index 96f5989970..47d986e431 100644 --- a/gazebo/common/Event.hh +++ b/gazebo/common/Event.hh @@ -273,7 +273,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(); } } @@ -288,7 +288,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p); } } @@ -304,7 +304,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2); } } @@ -321,7 +321,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2, _p3); } } @@ -340,7 +340,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2, _p3, _p4); } } @@ -361,7 +361,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2, _p3, _p4, _p5); } } @@ -383,7 +383,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6); } } @@ -406,7 +406,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7); } } @@ -431,7 +431,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) { iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8); } @@ -460,7 +460,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) { iter.second->callback( _p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9); @@ -491,7 +491,7 @@ namespace gazebo this->SetSignaled(true); for (const auto &iter: this->connections) { - if (iter.second->on) + if ((iter.second != NULL) && iter.second->on) { iter.second->callback( _p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9, _p10); @@ -531,7 +531,7 @@ namespace gazebo private: EvtConnectionMap connections; /// \brief A thread lock. - private: std::mutex mutex; + private: mutable std::mutex mutex; /// \brief List of connections to remove private: std::list @@ -549,6 +549,7 @@ namespace gazebo template EventT::~EventT() { + std::lock_guard lock(this->mutex); this->connections.clear(); } @@ -557,6 +558,7 @@ namespace gazebo template ConnectionPtr EventT::Connect(const std::function &_subscriber) { + std::lock_guard lock(this->mutex); int index = 0; if (!this->connections.empty()) { @@ -572,6 +574,7 @@ namespace gazebo template unsigned int EventT::ConnectionCount() const { + std::lock_guard lock(this->mutex); return this->connections.size(); } @@ -580,6 +583,7 @@ namespace gazebo template void EventT::Disconnect(int _id) { + std::lock_guard lock(this->mutex); // Find the connection auto const &it = this->connections.find(_id); @@ -591,6 +595,7 @@ namespace gazebo } ///////////////////////////////////////////// + /// \brief Erases all connections from connectionsToRemove. template void EventT::Cleanup() { diff --git a/gazebo/common/Event_TEST.cc b/gazebo/common/Event_TEST.cc index ceb0d516a0..4a763fd27b 100644 --- a/gazebo/common/Event_TEST.cc +++ b/gazebo/common/Event_TEST.cc @@ -16,6 +16,8 @@ */ #include +#include +#include #include #include #include @@ -220,6 +222,78 @@ TEST_F(EventTest, ManyChanges) EXPECT_EQ(g_callback1, 2); } + +///////////////////////////////////////////////// +// Race condition helper functions +void create_connections(event::EventT & evt, + std::future exit_obj_add, + std::list & connection_list) +{ + std::cout << "Start create connections thread" << std::endl; + while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) == + std::future_status::timeout) { + event::ConnectionPtr conn = evt.Connect(std::bind(&callback)); + connection_list.push_front(conn); + } +} + +void access_connections(event::EventT & evt, + std::future exit_obj_add) +{ + std::cout << "Start access thread" << std::endl; + while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) == + std::future_status::timeout) + // just calling the method without any other intent + evt.ConnectionCount(); +} + + +void remove_connections(event::EventT & evt, + std::list & connection_list) +{ + std::cout << "Start remove thread" << std::endl; + for (auto const& connection : connection_list) { + evt.Disconnect(connection->Id()); + } +} + +TEST_F(EventTest, RaceConditions) +{ + // Create three threads: 1) add_t to insert connections, 2) access_t to call + // the count method 3) remove_t + std::list connection_list; + event::EventT evt; + std::promise exit_signal; + std::promise exit_signal2; + std::future exit_obj_add = exit_signal.get_future(); + std::future exit_obj_access = exit_signal2.get_future(); + // Run threads 1 and 2 + std::thread add_t(create_connections, std::ref(evt), + std::move(exit_obj_add), + std::ref(connection_list)); + std::thread access_t(access_connections, std::ref(evt), + std::move(exit_obj_access)); + // Give 1 second to adding and accessing threads and start the remove thread + // leave all them working for 2 seconds. + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::thread remove_t(remove_connections, + std::ref(evt), + std::ref(connection_list)); + std::this_thread::sleep_for(std::chrono::seconds(2)); + // Enable the event, call connection count and destroy it trying to make a + // possible race condition to appear. + evt(); + evt.ConnectionCount(); + evt.~EventT(); + + exit_signal.set_value(); + add_t.join(); + exit_signal2.set_value(); + access_t.join(); + remove_t.join(); +} + + ///////////////////////////////////////////////// int main(int argc, char **argv) {