Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make events threadsafe (gazebo9) #3064

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions gazebo/common/Event.hh
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback();
}
}
Expand All @@ -288,7 +288,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p);
}
}
Expand All @@ -304,7 +304,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2);
}
}
Expand All @@ -321,7 +321,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3);
}
}
Expand All @@ -340,7 +340,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4);
}
}
Expand All @@ -361,7 +361,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5);
}
}
Expand All @@ -383,7 +383,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6);
}
}
Expand All @@ -406,7 +406,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7);
}
}
Expand All @@ -431,7 +431,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8);
}
Expand Down Expand Up @@ -460,7 +460,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(
_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9);
Expand Down Expand Up @@ -491,7 +491,7 @@ namespace gazebo
this->SetSignaled(true);
for (const auto &iter: this->connections)
{
if (iter.second->on)
if ((iter.second != NULL) && iter.second->on)
{
iter.second->callback(
_p1, _p2, _p3, _p4, _p5, _p6, _p7, _p8, _p9, _p10);
Expand Down Expand Up @@ -531,7 +531,7 @@ namespace gazebo
private: EvtConnectionMap connections;

/// \brief A thread lock.
private: std::mutex mutex;
private: mutable std::mutex mutex;

/// \brief List of connections to remove
private: std::list<typename EvtConnectionMap::const_iterator>
Expand All @@ -549,6 +549,7 @@ namespace gazebo
template<typename T>
EventT<T>::~EventT()
{
std::lock_guard<std::mutex> lock(this->mutex);
this->connections.clear();
}

Expand All @@ -557,6 +558,7 @@ namespace gazebo
template<typename T>
ConnectionPtr EventT<T>::Connect(const std::function<T> &_subscriber)
{
std::lock_guard<std::mutex> lock(this->mutex);
int index = 0;
if (!this->connections.empty())
{
Expand All @@ -572,6 +574,7 @@ namespace gazebo
template<typename T>
unsigned int EventT<T>::ConnectionCount() const
{
std::lock_guard<std::mutex> lock(this->mutex);
return this->connections.size();
}

Expand All @@ -580,6 +583,7 @@ namespace gazebo
template<typename T>
void EventT<T>::Disconnect(int _id)
{
std::lock_guard<std::mutex> lock(this->mutex);
// Find the connection
auto const &it = this->connections.find(_id);

Expand All @@ -591,6 +595,7 @@ namespace gazebo
}

/////////////////////////////////////////////
/// \brief Erases all connections from connectionsToRemove.
template<typename T>
void EventT<T>::Cleanup()
{
Expand Down
74 changes: 74 additions & 0 deletions gazebo/common/Event_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/

#include <functional>
#include <future>
#include <thread>
#include <gtest/gtest.h>
#include <gazebo/common/Time.hh>
#include <gazebo/common/Event.hh>
Expand Down Expand Up @@ -220,6 +222,78 @@ TEST_F(EventTest, ManyChanges)
EXPECT_EQ(g_callback1, 2);
}


/////////////////////////////////////////////////
// Race condition helper functions
void create_connections(event::EventT<void ()> & evt,
std::future<void> exit_obj_add,
std::list<event::ConnectionPtr> & connection_list)
{
std::cout << "Start create connections thread" << std::endl;
while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::timeout) {
event::ConnectionPtr conn = evt.Connect(std::bind(&callback));
connection_list.push_front(conn);
}
}

void access_connections(event::EventT<void ()> & evt,
std::future<void> exit_obj_add)
{
std::cout << "Start access thread" << std::endl;
while(exit_obj_add.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::timeout)
// just calling the method without any other intent
evt.ConnectionCount();
}


void remove_connections(event::EventT<void ()> & evt,
std::list<event::ConnectionPtr> & connection_list)
{
std::cout << "Start remove thread" << std::endl;
for (auto const& connection : connection_list) {
evt.Disconnect(connection->Id());
}
}

TEST_F(EventTest, RaceConditions)
{
// Create three threads: 1) add_t to insert connections, 2) access_t to call
// the count method 3) remove_t
std::list<event::ConnectionPtr> connection_list;
event::EventT<void ()> evt;
std::promise<void> exit_signal;
std::promise<void> exit_signal2;
std::future<void> exit_obj_add = exit_signal.get_future();
std::future<void> exit_obj_access = exit_signal2.get_future();
// Run threads 1 and 2
std::thread add_t(create_connections, std::ref(evt),
std::move(exit_obj_add),
std::ref(connection_list));
std::thread access_t(access_connections, std::ref(evt),
std::move(exit_obj_access));
// Give 1 second to adding and accessing threads and start the remove thread
// leave all them working for 2 seconds.
std::this_thread::sleep_for(std::chrono::seconds(1));
std::thread remove_t(remove_connections,
std::ref(evt),
std::ref(connection_list));
std::this_thread::sleep_for(std::chrono::seconds(2));
// Enable the event, call connection count and destroy it trying to make a
// possible race condition to appear.
evt();
evt.ConnectionCount();
evt.~EventT();

exit_signal.set_value();
add_t.join();
exit_signal2.set_value();
access_t.join();
remove_t.join();
}


/////////////////////////////////////////////////
int main(int argc, char **argv)
{
Expand Down