Skip to content

Commit

Permalink
Revert "MDEV-33840 tpool : switch off maintenance timer when not need…
Browse files Browse the repository at this point in the history
…ed."

This reverts commit 09bae92.
  • Loading branch information
vaintroub committed Apr 17, 2024
1 parent 3a3fe30 commit 2ba79ab
Showing 1 changed file with 34 additions and 36 deletions.
70 changes: 34 additions & 36 deletions tpool/tpool_generic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
OFF, ON
};
timer_state_t m_timer_state= timer_state_t::OFF;
void switch_timer(timer_state_t state,std::unique_lock<std::mutex> &lk);
void switch_timer(timer_state_t state);

/* Updates idle_since, and maybe switches the timer off */
void check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk);
void check_idle(std::chrono::system_clock::time_point now);

/** time point when timer last ran, used as a coarse clock. */
std::chrono::system_clock::time_point m_timestamp;
Expand Down Expand Up @@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
{
((thread_pool_generic *)arg)->maintenance();
}
bool add_thread(std::unique_lock<std::mutex> &lk);
bool add_thread();
bool wake(worker_wake_reason reason, task *t = nullptr);
void maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk);
void maybe_wake_or_create_thread();
bool too_many_active_threads();
bool get_task(worker_data *thread_var, task **t);
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
Expand Down Expand Up @@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
*/

static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
constexpr auto max_idle_time= std::chrono::seconds(20);
constexpr auto max_idle_time= std::chrono::minutes(1);

/* Time since maintenance timer had nothing to do */
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk)
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
{
DBUG_ASSERT(m_task_queue.empty());

Expand All @@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now,
if (now - idle_since > max_idle_time)
{
idle_since= invalid_timestamp;
switch_timer(timer_state_t::OFF,lk);
switch_timer(timer_state_t::OFF);
}
}

Expand Down Expand Up @@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()

if (m_task_queue.empty())
{
check_idle(m_timestamp, lk);
check_idle(m_timestamp);
m_last_activity = m_tasks_dequeued + m_wakeups;
return;
}
Expand All @@ -701,15 +701,15 @@ void thread_pool_generic::maintenance()
}
}

maybe_wake_or_create_thread(lk);
maybe_wake_or_create_thread();

size_t thread_cnt = (int)thread_count();
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
{
// no progress made since last iteration. create new
// thread
add_thread(lk);
add_thread();
}
m_last_activity = m_tasks_dequeued + m_wakeups;
m_last_thread_count= thread_cnt;
Expand All @@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
}

/* Create a new worker.*/
bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
bool thread_pool_generic::add_thread()
{
size_t n_threads = thread_count();

if (n_threads >= m_max_threads)
return false;

if (n_threads >= m_min_threads && m_min_threads != m_max_threads)
if (n_threads >= m_min_threads)
{
auto now = std::chrono::system_clock::now();
if (now - m_last_thread_creation <
Expand All @@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
Throttle thread creation and wakeup deadlock detection timer,
if is it off.
*/
switch_timer(timer_state_t::ON, lk);
switch_timer(timer_state_t::ON);

return false;
}
Expand Down Expand Up @@ -835,10 +835,12 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
if (!m_concurrency)
m_concurrency = 1;

// start the timer
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
}


void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk)
void thread_pool_generic::maybe_wake_or_create_thread()
{
if (m_task_queue.empty())
return;
Expand All @@ -851,7 +853,7 @@ void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mute
}
else
{
add_thread(lk);
add_thread();
}
}

Expand All @@ -870,7 +872,7 @@ void thread_pool_generic::submit_task(task* task)
task->add_ref();
m_tasks_enqueued++;
m_task_queue.push(task);
maybe_wake_or_create_thread(lk);
maybe_wake_or_create_thread();
}


Expand All @@ -893,7 +895,7 @@ void thread_pool_generic::wait_begin()
m_waiting_task_count++;

/* Maintain concurrency */
maybe_wake_or_create_thread(lk);
maybe_wake_or_create_thread();
}


Expand All @@ -908,30 +910,26 @@ void thread_pool_generic::wait_end()
}


void thread_pool_generic::switch_timer(timer_state_t state, std::unique_lock<std::mutex> &lk)
void thread_pool_generic::switch_timer(timer_state_t state)
{
if (m_timer_state == state)
return;
/* No maintenance timer for fixed threadpool size.*/
DBUG_ASSERT(m_min_threads != m_max_threads);
DBUG_ASSERT(lk.owns_lock());
/*
We can't use timer::set_time, because mysys timers are deadlock
prone.
Instead, to switch off we increase the timer period
and decrease period to switch on.
This might introduce delays in thread creation when needed,
as period will only be changed when timer fires next time.
For this reason, we can't use very long periods for the "off" state.
*/
m_timer_state= state;
if(state == timer_state_t::OFF)
{
m_maintenance_timer.set_period(0);
}
else
{
/*
It is necessary to unlock the thread_pool::m_mtx
to avoid the deadlock with thr_timer's LOCK_timer.
Otherwise, lock order would be violated.
*/
lk.unlock();
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
lk.lock();
}
long long period= (state == timer_state_t::OFF) ?
m_timer_interval.count()*10: m_timer_interval.count();

m_maintenance_timer.set_period((int)period);
}


Expand Down

0 comments on commit 2ba79ab

Please sign in to comment.