Skip to content

Commit

Permalink
Make events threadsafe
Browse files Browse the repository at this point in the history
- Initial connection (insert) and cleanup (remove) locked with mutex
- Signal (access) checks for object not null before attempting to access (not locking because would need recursive mutex)
- Declare mutex as mutable so EventCount const method can block it
- Include race condition test to play with connections inside an event

(contribution by Sonia Jin from AWS, back port of #3042)
Co-authored-by: Jose Luis Rivero <jrivero@osrfoundation.org>
  • Loading branch information
lihui815 authored and j-rivero committed Aug 13, 2021
1 parent f8b4d09 commit 8ca4586
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 12 deletions.
29 changes: 17 additions & 12 deletions gazebo/common/Event.hh
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback();
}
}
Expand All @@ -288,7 +288,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p);
}
}
Expand All @@ -304,7 +304,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2);
}
}
Expand All @@ -321,7 +321,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3);
}
}
Expand All @@ -340,7 +340,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4);
}
}
Expand All @@ -361,7 +361,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5);
}
}
Expand All @@ -383,7 +383,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6);
}
}
Expand All @@ -406,7 +406,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7);
}
}
Expand All @@ -431,7 +431,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8);
}
Expand Down Expand Up @@ -460,7 +460,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(
_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9);
Expand Down Expand Up @@ -491,7 +491,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(
_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9, _p10);
Expand Down Expand Up @@ -531,7 +531,7 @@ namespace gazebo
private: EvtConnectionMap connections;

/// \brief A thread lock.
private: std::mutex mutex;
private: mutable std::mutex mutex;

/// \brief List of connections to remove
private: std::list<typename EvtConnectionMap::const_iterator>
Expand All @@ -549,6 +549,7 @@ namespace gazebo
template<typename T>
EventT<T>::~EventT()
{
std::lock_guard<std::mutex> lock(this->mutex);
this->connections.clear();
}

Expand All @@ -557,6 +558,7 @@ namespace gazebo
template<typename T>
ConnectionPtr EventT<T>::Connect(const std::function<T> &_subscriber)
{
std::lock_guard<std::mutex> lock(this->mutex);
int index = 0;
if (!this->connections.empty())
{
Expand All @@ -572,6 +574,7 @@ namespace gazebo
template<typename T>
unsigned int EventT<T>::ConnectionCount() const
{
std::lock_guard<std::mutex> lock(this->mutex);
return this->connections.size();
}

Expand All @@ -580,6 +583,7 @@ namespace gazebo
template<typename T>
void EventT<T>::Disconnect(int _id)
{
std::lock_guard<std::mutex> lock(this->mutex);
// Find the connection
auto const &it = this->connections.find(_id);

Expand All @@ -591,6 +595,7 @@ namespace gazebo
}

/////////////////////////////////////////////
/// \brief Erases all connections from connectionsToRemove.
template<typename T>
void EventT<T>::Cleanup()
{
Expand Down
74 changes: 74 additions & 0 deletions gazebo/common/Event_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/

#include <functional>
#include <future>
#include <thread>
#include <gtest/gtest.h>
#include <gazebo/common/Time.hh>
#include <gazebo/common/Event.hh>
Expand Down Expand Up @@ -220,6 +222,78 @@ TEST_F(EventTest, ManyChanges)
EXPECT_EQ(g_callback1, 2);
}


/////////////////////////////////////////////////
// Race condition helper functions
void create_connections(event::EventT<void ()> & evt,
std::future<void> exit_obj_add,
std::list<event::ConnectionPtr> & connection_list)
{
std::cout << "Start create connections thread" << std::endl;
while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::timeout) {
event::ConnectionPtr conn = evt.Connect(std::bind(&callback));
connection_list.push_front(conn);
}
}

void access_connections(event::EventT<void ()> & evt,
std::future<void> exit_obj_add)
{
std::cout << "Start access thread" << std::endl;
while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::timeout)
// just calling the method without any other intent
evt.ConnectionCount();
}


void remove_connections(event::EventT<void ()> & evt,
std::list<event::ConnectionPtr> & connection_list)
{
std::cout << "Start remove thread" << std::endl;
for (auto const& connection : connection_list) {
evt.Disconnect(connection->Id());
}
}

TEST_F(EventTest, RaceConditions)
{
// Create three threads: 1) add_t to insert connections, 2) access_t to call
// the count method 3) remove_t
std::list<event::ConnectionPtr> connection_list;
event::EventT<void ()> evt;
std::promise<void> exit_signal;
std::promise<void> exit_signal2;
std::future<void> exit_obj_add = exit_signal.get_future();
std::future<void> exit_obj_access = exit_signal2.get_future();
// Run threads 1 and 2
std::thread add_t(create_connections, std::ref(evt),
std::move(exit_obj_add),
std::ref(connection_list));
std::thread access_t(access_connections, std::ref(evt),
std::move(exit_obj_access));
// Give 1 second to adding and accessing threads and start the remove thread
// leave all them working for 2 seconds.
std::this_thread::sleep_for(std::chrono::seconds(1));
std::thread remove_t(remove_connections,
std::ref(evt),
std::ref(connection_list));
std::this_thread::sleep_for(std::chrono::seconds(2));
// Enable the event, call connection count and destroy it trying to make a
// possible race condition to appear.
evt();
evt.ConnectionCount();
evt.~EventT();

exit_signal.set_value();
add_t.join();
exit_signal2.set_value();
access_t.join();
remove_t.join();
}


/////////////////////////////////////////////////
int main(int argc, char **argv)
{
Expand Down

0 comments on commit 8ca4586

Please sign in to comment.