From 2a6675315f47244cc252394542802543f30db2d9 Mon Sep 17 00:00:00 2001 From: c-jimenez <18682655+c-jimenez@users.noreply.github.com> Date: Sat, 10 Feb 2024 10:02:23 +0100 Subject: [PATCH] [tools] Fix some multi threading issues --- src/rpc/RpcBase.cpp | 9 +++++++++ src/tools/helpers/Queue.h | 3 ++- src/tools/helpers/TimerPool.h | 5 +++-- src/tools/helpers/WorkerThreadPool.h | 9 +++++---- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/rpc/RpcBase.cpp b/src/rpc/RpcBase.cpp index 7ef526bf..44569edb 100644 --- a/src/rpc/RpcBase.cpp +++ b/src/rpc/RpcBase.cpp @@ -278,6 +278,15 @@ void RpcBase::processDisconnected() // Disable queues m_requests_queue.setEnable(false); m_results_queue.setEnable(false); + + // Check if a pool has been configured + if (m_pool) + { + // Disable owner + m_rpc_owner->lock.lock(); + m_rpc_owner->is_operational = false; + m_rpc_owner->lock.unlock(); + } } /** @brief Process received data */ diff --git a/src/tools/helpers/Queue.h b/src/tools/helpers/Queue.h index a6048aee..d939ec31 100644 --- a/src/tools/helpers/Queue.h +++ b/src/tools/helpers/Queue.h @@ -19,6 +19,7 @@ along with OpenOCPP. If not, see . #ifndef OPENOCPP_QUEUE_H #define OPENOCPP_QUEUE_H +#include #include #include #include @@ -208,7 +209,7 @@ class Queue /** @brief Queue to store data */ std::queue m_queue; /** @brief Indicate that the queue is enabled */ - bool m_enabled; + std::atomic m_enabled; }; } // namespace helpers diff --git a/src/tools/helpers/TimerPool.h b/src/tools/helpers/TimerPool.h index 411ab2d9..01a540fa 100644 --- a/src/tools/helpers/TimerPool.h +++ b/src/tools/helpers/TimerPool.h @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see . #include "ITimerPool.h" +#include #include #include #include @@ -54,9 +55,9 @@ class TimerPool : public ITimerPool private: /** @brief Indicate that the timers must stop */ - bool m_stop; + std::atomic m_stop; /** @brief Indicate that the next wakeup time has changed */ - bool m_update_wakeup_time; + std::atomic m_update_wakeup_time; /** @brief Mutex for wakeup condition */ std::mutex m_wakeup_mutex; /** @brief Wakeup condition */ diff --git a/src/tools/helpers/WorkerThreadPool.h b/src/tools/helpers/WorkerThreadPool.h index a3dee129..8580623c 100644 --- a/src/tools/helpers/WorkerThreadPool.h +++ b/src/tools/helpers/WorkerThreadPool.h @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see . #include "Queue.h" +#include #include #include #include @@ -64,7 +65,7 @@ class JobBase : public IJob /** @brief Condition variable for end of job synchronization */ std::condition_variable end_of_job_var; /** @brief Indicate end of job */ - bool end; + std::atomic end; /** @brief Function to execute */ std::function function; }; @@ -159,7 +160,7 @@ class Waiter { Job* job = dynamic_cast*>(m_job.get()); std::unique_lock lock(job->end_of_job_mutex); - return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; }); + return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); }); } private: @@ -186,7 +187,7 @@ class Waiter { Job* job = dynamic_cast*>(m_job.get()); std::unique_lock lock(job->end_of_job_mutex); - return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; }); + return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); }); } private: @@ -229,7 +230,7 @@ class WorkerThreadPool private: /** @brief Indicate that the threads must stop */ - bool m_stop; + std::atomic m_stop; /** @brief Worker threads */ std::vector m_threads; /** @brief Job queue */