Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/rpc/RpcBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ void RpcBase::processDisconnected()
// Disable queues
m_requests_queue.setEnable(false);
m_results_queue.setEnable(false);

// Check if a pool has been configured
if (m_pool)
{
// Disable owner
m_rpc_owner->lock.lock();
m_rpc_owner->is_operational = false;
m_rpc_owner->lock.unlock();
}
}

/** @brief Process received data */
Expand Down
3 changes: 2 additions & 1 deletion src/tools/helpers/Queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.
#ifndef OPENOCPP_QUEUE_H
#define OPENOCPP_QUEUE_H

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
Expand Down Expand Up @@ -208,7 +209,7 @@ class Queue
/** @brief Queue to store data */
std::queue<ItemType> m_queue;
/** @brief Indicate that the queue is enabled */
bool m_enabled;
std::atomic<bool> m_enabled;
};

} // namespace helpers
Expand Down
5 changes: 3 additions & 2 deletions src/tools/helpers/TimerPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.

#include "ITimerPool.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
Expand Down Expand Up @@ -54,9 +55,9 @@ class TimerPool : public ITimerPool

private:
/** @brief Indicate that the timers must stop */
bool m_stop;
std::atomic<bool> m_stop;
/** @brief Indicate that the next wakeup time has changed */
bool m_update_wakeup_time;
std::atomic<bool> m_update_wakeup_time;
/** @brief Mutex for wakeup condition */
std::mutex m_wakeup_mutex;
/** @brief Wakeup condition */
Expand Down
9 changes: 5 additions & 4 deletions src/tools/helpers/WorkerThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.

#include "Queue.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
Expand Down Expand Up @@ -64,7 +65,7 @@ class JobBase : public IJob
/** @brief Condition variable for end of job synchronization */
std::condition_variable end_of_job_var;
/** @brief Indicate end of job */
bool end;
std::atomic<bool> end;
/** @brief Function to execute */
std::function<ReturnType()> function;
};
Expand Down Expand Up @@ -159,7 +160,7 @@ class Waiter
{
Job<ReturnType>* job = dynamic_cast<Job<ReturnType>*>(m_job.get());
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
}

private:
Expand All @@ -186,7 +187,7 @@ class Waiter<void>
{
Job<void>* job = dynamic_cast<Job<void>*>(m_job.get());
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
}

private:
Expand Down Expand Up @@ -229,7 +230,7 @@ class WorkerThreadPool

private:
/** @brief Indicate that the threads must stop */
bool m_stop;
std::atomic<bool> m_stop;
/** @brief Worker threads */
std::vector<std::thread*> m_threads;
/** @brief Job queue */
Expand Down