Skip to content

Commit

Permalink
lib: cleanup watchdog_timer and timer_thread
Browse files Browse the repository at this point in the history
- renamed functions and variables
- for state of the thread used a enum instead of a binary flag
  • Loading branch information
franku committed Oct 15, 2019
1 parent e92fd72 commit 3540c00
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 126 deletions.
176 changes: 84 additions & 92 deletions core/src/lib/timer_thread.cc
Expand Up @@ -18,13 +18,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
/*
* BAREOS thread watchdog routine. General routine that
* allows setting a watchdog timer with a callback that is
* called when the timer goes off.
*
* Kern Sibbald, January MMII
*/

#include "include/bareos.h"
#include "lib/timer_thread.h"
Expand All @@ -44,82 +37,86 @@ namespace TimerThread {

/* clang-format off */
static std::chrono::milliseconds idle_timeout_interval_milliseconds(60 * 1000);
static std::atomic<std::time_t> calendar_time_on_last_watchdog_run(time(nullptr));
static std::atomic<std::time_t> calendar_time_on_last_timer_run(time(nullptr));
/* clang-format on */

static std::mutex watchdog_sleep_mutex;
static std::condition_variable watchdog_sleep_condition;
static std::mutex timer_sleep_mutex;
static std::condition_variable timer_sleep_condition;
static bool wakeup_event_occured = false;

static void WatchdogThread(void);
static void TimerThread(void);

static std::atomic<bool> quit(false);
static std::atomic<bool> is_initialized(false);
enum class TimerThreadState
{
IS_NOT_INITIALZED,
IS_STARTING,
IS_RUNNING,
IS_SHUTTING_DOWN,
IS_SHUT_DOWN
};

static std::unique_ptr<std::thread> watchdog_thread;
static std::atomic<TimerThreadState> timer_thread_state(
TimerThreadState::IS_NOT_INITIALZED);
static std::atomic<bool> quit_timer_thread(false);

static std::unique_ptr<std::thread> timer_thread;
static std::mutex controlled_items_list_mutex;

static std::vector<TimerThread::TimerControlledItem*> controlled_items_list;

bool CurrentThreadIsTimerThread()
bool Start(void)
{
if (is_initialized.load() &&
(std::this_thread::get_id() == watchdog_thread->get_id())) {
return true;
} else {
if (timer_thread_state != TimerThreadState::IS_NOT_INITIALZED &&
timer_thread_state != TimerThreadState::IS_SHUT_DOWN) {
return false;
}
}

int StartTimerThread(void)
{
if (is_initialized.load()) { return 0; }

Dmsg0(800, "Starting watchdog thread\n");
Dmsg0(800, "Starting timer thread\n");

quit.store(false);
watchdog_thread = std::make_unique<std::thread>(WatchdogThread);
quit_timer_thread = false;
timer_thread = std::make_unique<std::thread>(TimerThread);

int timeout = 0;
while (!is_initialized.load() && ++timeout < 2000) {
while (timer_thread_state.load() != TimerThreadState::IS_RUNNING &&
++timeout < 2000) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

return 0;
return true;
}

static void WakeWatchdog()
static void WakeTimer()
{
std::lock_guard<std::mutex> l(watchdog_sleep_mutex);
std::lock_guard<std::mutex> l(timer_sleep_mutex);
wakeup_event_occured = true;
watchdog_sleep_condition.notify_one();
timer_sleep_condition.notify_one();
}

void StopTimerThread(void)
void Stop(void)
{
if (!is_initialized.load()) { return; }
if (timer_thread_state != TimerThreadState::IS_RUNNING) { return; }

quit.store(true);
WakeWatchdog();
quit_timer_thread = true;
WakeTimer();

watchdog_thread->join();
timer_thread->join();
}

TimerThread::TimerControlledItem* NewTimerControlledItem(void)
TimerThread::TimerControlledItem* NewControlledItem(void)
{
TimerThread::TimerControlledItem* t = new TimerThread::TimerControlledItem;

std::lock_guard<std::mutex> l(controlled_items_list_mutex);
controlled_items_list.push_back(t);

if (!is_initialized.load()) { StartTimerThread(); }
if (timer_thread_state != TimerThreadState::IS_RUNNING) { Start(); }

return t;
}

bool RegisterTimer(TimerThread::TimerControlledItem* t)
{
assert(t->callback != nullptr);
assert(t->user_callback != nullptr);

TimerThread::TimerControlledItem wd_copy;

Expand All @@ -137,45 +134,35 @@ bool RegisterTimer(TimerThread::TimerControlledItem* t)
wd_copy = *t;
}

Dmsg3(800, "Registered watchdog interval %d%s\n", wd_copy.interval,
wd_copy.one_shot ? " one shot" : "");
Dmsg3(800, "Registered timer interval %d%s\n", wd_copy.interval,
wd_copy.single_shot ? " one shot" : "");

WakeWatchdog();
WakeTimer();

return true;
}

bool UnregisterTimer(TimerThread::TimerControlledItem* t)
{
if (!is_initialized.load()) {
Jmsg0(nullptr, M_ABORT, 0,
_("UnregisterWatchdog_unlocked called before StartTimerThread\n"));
}

std::lock_guard<std::mutex> l(controlled_items_list_mutex);

auto pos =
std::find(controlled_items_list.begin(), controlled_items_list.end(), t);

if (pos != controlled_items_list.end()) {
if ((*pos)->destructor) { (*pos)->destructor((*pos)); }
if ((*pos)->user_destructor) { (*pos)->user_destructor((*pos)); }
delete (*pos);
controlled_items_list.erase(pos);
Dmsg1(800, "Unregistered watchdog %p\n", t);
Dmsg1(800, "Unregistered timer %p\n", t);
return true;
} else {
Dmsg1(800, "Failed to unregister watchdog %p\n", t);
Dmsg1(800, "Failed to unregister timer %p\n", t);
return false;
}
}

bool IsRegisteredTimer(const TimerThread::TimerControlledItem* t)
{
if (!is_initialized.load()) {
Jmsg0(nullptr, M_ABORT, 0,
_("UnregisterWatchdog_unlocked called before StartTimerThread\n"));
}

std::lock_guard<std::mutex> l(controlled_items_list_mutex);

auto pos =
Expand All @@ -189,105 +176,110 @@ static void Cleanup()
std::lock_guard<std::mutex> l(controlled_items_list_mutex);

for (auto p : controlled_items_list) {
if (p->destructor) { p->destructor(p); }
if (p->user_destructor) { p->user_destructor(p); }
delete p;
}
controlled_items_list.clear();
}

static void SleepUntil(std::chrono::steady_clock::time_point next_watchdog_run)
static void SleepUntil(std::chrono::steady_clock::time_point next_timer_run)
{
std::unique_lock<std::mutex> l(watchdog_sleep_mutex);
watchdog_sleep_condition.wait_until(l, next_watchdog_run,
[]() { return wakeup_event_occured; });
std::unique_lock<std::mutex> l(timer_sleep_mutex);
timer_sleep_condition.wait_until(l, next_timer_run,
[]() { return wakeup_event_occured; });
wakeup_event_occured = false;
}

static bool LogMessage(TimerThread::TimerControlledItem* p)
static void LogMessage(TimerThread::TimerControlledItem* p)
{
Dmsg2(3400, "Watchdog callback p=0x%p scheduled_run_timepoint=%d\n", p,
Dmsg2(3400, "Timer callback p=0x%p scheduled_run_timepoint=%d\n", p,
p->scheduled_run_timepoint);
}

static bool RunOneItem(TimerThread::TimerControlledItem* p,
std::chrono::steady_clock::time_point& next_watchdog_run)
std::chrono::steady_clock::time_point& next_timer_run)
{
std::chrono::time_point<std::chrono::steady_clock>
last_watchdog_run_timepoint = std::chrono::steady_clock::now();
std::chrono::time_point<std::chrono::steady_clock> last_timer_run_timepoint =
std::chrono::steady_clock::now();

bool remove_from_list = false;
if (p->is_active &&
last_watchdog_run_timepoint > p->scheduled_run_timepoint) {
if (p->is_active && last_timer_run_timepoint > p->scheduled_run_timepoint) {
LogMessage(p);
p->callback(p);
if (p->one_shot) {
if (p->destructor) { p->destructor(p); }
p->user_callback(p);
if (p->single_shot) {
if (p->user_destructor) { p->user_destructor(p); }
delete p;
remove_from_list = true;
} else {
p->scheduled_run_timepoint = last_watchdog_run_timepoint + p->interval;
p->scheduled_run_timepoint = last_timer_run_timepoint + p->interval;
}
}
next_watchdog_run = min(p->scheduled_run_timepoint, next_watchdog_run);
next_timer_run = min(p->scheduled_run_timepoint, next_timer_run);
return remove_from_list;
}

static void RunAllItemsAndRemoveOneShotItems(
std::chrono::steady_clock::time_point& next_watchdog_run)
std::chrono::steady_clock::time_point& next_timer_run)
{
std::unique_lock<std::mutex> l(controlled_items_list_mutex);

auto new_end_of_vector = std::remove_if( // one_shot items will be removed
controlled_items_list.begin(), controlled_items_list.end(),
[&next_watchdog_run](TimerThread::TimerControlledItem* p) {
calendar_time_on_last_watchdog_run.store(time(nullptr));
[&next_timer_run](TimerThread::TimerControlledItem* p) {
calendar_time_on_last_timer_run = time(nullptr);

return RunOneItem(p, next_watchdog_run);
return RunOneItem(p, next_timer_run);
});

controlled_items_list.erase(new_end_of_vector, controlled_items_list.end());
}

static void WatchdogThread(void)
static void TimerThread(void)
{
SetJcrInThreadSpecificData(INVALID_JCR);

Dmsg0(800, "Watchdog thread started\n");
is_initialized.store(true);
Dmsg0(800, "Timer thread started\n");
timer_thread_state = TimerThreadState::IS_RUNNING;

while (!quit.load()) {
std::chrono::steady_clock::time_point next_watchdog_run =
while (!quit_timer_thread) {
std::chrono::steady_clock::time_point next_timer_run =
std::chrono::steady_clock::now() + idle_timeout_interval_milliseconds;

RunAllItemsAndRemoveOneShotItems(next_watchdog_run);
RunAllItemsAndRemoveOneShotItems(next_timer_run);

SleepUntil(next_watchdog_run);
} // while (!quit)
SleepUntil(next_timer_run);
} // while (!quit_timer_thread)

Cleanup();
is_initialized.store(false);
timer_thread_state = TimerThreadState::IS_SHUT_DOWN;
}

class StopWatchdogThreadGuard {
class TimerThreadGuard {
public:
StopWatchdogThreadGuard() = default;
~StopWatchdogThreadGuard()
TimerThreadGuard() = default;
~TimerThreadGuard()
{
if (is_initialized.load()) { StopTimerThread(); }
if (timer_thread_state == TimerThreadState::IS_RUNNING) { Stop(); }
}
};

static StopWatchdogThreadGuard stop_watchdog_thread_guard;
static TimerThreadGuard timer_thread_guard;

void SetTimerIdleSleepTime(std::chrono::seconds time)
{
std::lock_guard<std::mutex> lg(controlled_items_list_mutex);
idle_timeout_interval_milliseconds = time;
}

bool CurrentThreadIsTimerThread()
{
return (timer_thread_state == TimerThreadState::IS_RUNNING &&
(std::this_thread::get_id() == timer_thread->get_id()));
}

std::time_t GetCalenderTimeOnLastTimerThreadRun()
{
return calendar_time_on_last_watchdog_run.load();
return calendar_time_on_last_timer_run;
}

} // namespace TimerThread
26 changes: 12 additions & 14 deletions core/src/lib/timer_thread.h
Expand Up @@ -19,13 +19,7 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
/*
* Kern Sibbald, December MMII
*/
/**
* @file
* Watchdog timer routines
*/

#ifndef BAREOS_LIB_TIMER_THREAD_H_
#define BAREOS_LIB_TIMER_THREAD_H_

Expand All @@ -36,24 +30,28 @@
namespace TimerThread {

struct TimerControlledItem {
bool one_shot = true;
bool single_shot = true;
bool is_active = false;
std::chrono::seconds interval;
void (*callback)(struct TimerControlledItem* t) = nullptr;
void (*destructor)(struct TimerControlledItem* t) = nullptr;
void* data = nullptr;
void (*user_callback)(TimerControlledItem* t) = nullptr;
void (*user_destructor)(TimerControlledItem* t) = nullptr;
void* user_data = nullptr;

std::chrono::steady_clock::time_point scheduled_run_timepoint;
};

int StartTimerThread(void);
void StopTimerThread(void);
TimerControlledItem* NewTimerControlledItem(void);
bool Start(void);
void Stop(void);

TimerControlledItem* NewControlledItem(void);

bool RegisterTimer(TimerControlledItem* t);
bool UnregisterTimer(TimerControlledItem* t);
bool IsRegisteredTimer(const TimerControlledItem* t);

bool CurrentThreadIsTimerThread();
void SetTimerIdleSleepTime(std::chrono::seconds time);

std::time_t GetCalenderTimeOnLastTimerThreadRun();

} // namespace TimerThread
Expand Down

0 comments on commit 3540c00

Please sign in to comment.