Skip to content

Commit

Permalink
MDEV-33840 tpool : switch off maintenance timer when not needed.
Browse files Browse the repository at this point in the history
Before patch, maintenance timer will tick every 0.4 seconds.
After this patch, timer will tick every 0.4 seconds when necessary(
there are delayed thread creation), switching off completely after 20
seconds of being idle.
  • Loading branch information
vaintroub committed Apr 9, 2024
1 parent b7b58a2 commit 09bae92
Showing 1 changed file with 36 additions and 34 deletions.
70 changes: 36 additions & 34 deletions tpool/tpool_generic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
OFF, ON
};
timer_state_t m_timer_state= timer_state_t::OFF;
void switch_timer(timer_state_t state);
void switch_timer(timer_state_t state,std::unique_lock<std::mutex> &lk);

/* Updates idle_since, and maybe switches the timer off */
void check_idle(std::chrono::system_clock::time_point now);
void check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk);

/** time point when timer last ran, used as a coarse clock. */
std::chrono::system_clock::time_point m_timestamp;
Expand Down Expand Up @@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
{
((thread_pool_generic *)arg)->maintenance();
}
bool add_thread();
bool add_thread(std::unique_lock<std::mutex> &lk);
bool wake(worker_wake_reason reason, task *t = nullptr);
void maybe_wake_or_create_thread();
void maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk);
bool too_many_active_threads();
bool get_task(worker_data *thread_var, task **t);
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
Expand Down Expand Up @@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
*/

static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
constexpr auto max_idle_time= std::chrono::minutes(1);
constexpr auto max_idle_time= std::chrono::seconds(20);

/* Time since maintenance timer had nothing to do */
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk)
{
DBUG_ASSERT(m_task_queue.empty());

Expand All @@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
if (now - idle_since > max_idle_time)
{
idle_since= invalid_timestamp;
switch_timer(timer_state_t::OFF);
switch_timer(timer_state_t::OFF,lk);
}
}

Expand Down Expand Up @@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()

if (m_task_queue.empty())
{
check_idle(m_timestamp);
check_idle(m_timestamp, lk);
m_last_activity = m_tasks_dequeued + m_wakeups;
return;
}
Expand All @@ -701,15 +701,15 @@ void thread_pool_generic::maintenance()
}
}

maybe_wake_or_create_thread();
maybe_wake_or_create_thread(lk);

size_t thread_cnt = (int)thread_count();
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
{
// no progress made since last iteration. create new
// thread
add_thread();
add_thread(lk);
}
m_last_activity = m_tasks_dequeued + m_wakeups;
m_last_thread_count= thread_cnt;
Expand All @@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
}

/* Create a new worker.*/
bool thread_pool_generic::add_thread()
bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
{
size_t n_threads = thread_count();

if (n_threads >= m_max_threads)
return false;

if (n_threads >= m_min_threads)
if (n_threads >= m_min_threads && m_min_threads != m_max_threads)
{
auto now = std::chrono::system_clock::now();
if (now - m_last_thread_creation <
Expand All @@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread()
Throttle thread creation and wakeup deadlock detection timer,
if is it off.
*/
switch_timer(timer_state_t::ON);
switch_timer(timer_state_t::ON, lk);

return false;
}
Expand Down Expand Up @@ -835,12 +835,10 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
if (!m_concurrency)
m_concurrency = 1;

// start the timer
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
}


void thread_pool_generic::maybe_wake_or_create_thread()
void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk)
{
if (m_task_queue.empty())
return;
Expand All @@ -853,7 +851,7 @@ void thread_pool_generic::maybe_wake_or_create_thread()
}
else
{
add_thread();
add_thread(lk);
}
}

Expand All @@ -872,7 +870,7 @@ void thread_pool_generic::submit_task(task* task)
task->add_ref();
m_tasks_enqueued++;
m_task_queue.push(task);
maybe_wake_or_create_thread();
maybe_wake_or_create_thread(lk);
}


Expand All @@ -895,7 +893,7 @@ void thread_pool_generic::wait_begin()
m_waiting_task_count++;

/* Maintain concurrency */
maybe_wake_or_create_thread();
maybe_wake_or_create_thread(lk);
}


Expand All @@ -910,26 +908,30 @@ void thread_pool_generic::wait_end()
}


void thread_pool_generic::switch_timer(timer_state_t state)
void thread_pool_generic::switch_timer(timer_state_t state, std::unique_lock<std::mutex> &lk)
{
if (m_timer_state == state)
return;
/*
We can't use timer::set_time, because mysys timers are deadlock
prone.
Instead, to switch off we increase the timer period
and decrease period to switch on.
/* No maintenance timer for fixed threadpool size.*/
DBUG_ASSERT(m_min_threads != m_max_threads);
DBUG_ASSERT(lk.owns_lock());

This might introduce delays in thread creation when needed,
as period will only be changed when timer fires next time.
For this reason, we can't use very long periods for the "off" state.
*/
m_timer_state= state;
long long period= (state == timer_state_t::OFF) ?
m_timer_interval.count()*10: m_timer_interval.count();

m_maintenance_timer.set_period((int)period);
if(state == timer_state_t::OFF)
{
m_maintenance_timer.set_period(0);
}
else
{
/*
It is necessary to unlock the thread_pool::m_mtx
to avoid the deadlock with thr_timer's LOCK_timer.
Otherwise, lock order would be violated.
*/
lk.unlock();
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
lk.lock();
}
}


Expand Down

0 comments on commit 09bae92

Please sign in to comment.