MDEV-16264 - some improvements
- wait notification, tpool_wait_begin/tpool_wait_end - to notify the
threadpool that the current thread is going to wait

Use it to wait for I/Os to complete, and also when purge waits for workers.
vaintroub committed Dec 9, 2019
1 parent d3b2625 commit 66de4fe
Showing 7 changed files with 118 additions and 37 deletions.
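
Editor's note: the mechanism in a nutshell — a worker that is about to block announces the wait, so the scheduler stops counting it toward effective concurrency and may wake or create a replacement worker; after the wait the thread counts as active again. A minimal usage sketch of the new API (the task callback and the blocking call are hypothetical placeholders, not part of this commit):

```cpp
#include <tpool.h>

/* Hypothetical blocking operation, e.g. waiting for an I/O completion. */
static void blocking_operation() { /* ... */ }

/* Sketch of a task body running on a tpool worker thread. */
static void my_task_callback(void*)
{
  tpool::tpool_wait_begin();  /* pool may wake or create another worker */
  blocking_operation();       /* this thread blocks without stalling the pool */
  tpool::tpool_wait_end();    /* this thread counts as active again */
}
```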
16 changes: 14 additions & 2 deletions storage/innobase/os/os0file.cc
@@ -84,10 +84,12 @@ class io_slots
 private:
 	tpool::cache<tpool::aiocb> m_cache;
 	tpool::task_group m_group;
+	int m_max_aio;
 public:
 	io_slots(int max_submitted_io, int max_callback_concurrency) :
 		m_cache(max_submitted_io),
-		m_group(max_callback_concurrency)
+		m_group(max_callback_concurrency),
+		m_max_aio(max_submitted_io)
 	{
 	}
 	/* Get cached AIO control block */
@@ -112,6 +114,11 @@ class io_slots
 		m_cache.wait();
 	}
 
+	size_t pending_io_count()
+	{
+		return (size_t)m_max_aio - m_cache.size();
+	}
+
 	tpool::task_group* get_task_group()
 	{
 		return &m_group;
@@ -4058,7 +4065,12 @@ void os_aio_free()
 be other, synchronous, pending writes. */
 void os_aio_wait_until_no_pending_writes()
 {
-	write_slots->wait();
+	if (write_slots->pending_io_count())
+	{
+		tpool::tpool_wait_begin();
+		write_slots->wait();
+		tpool::tpool_wait_end();
+	}
 }
 
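Editor's note on the accounting behind pending_io_count(): the slot cache starts with max_submitted_io free control blocks and each in-flight I/O holds one, so pending = capacity − free. The guard matters because tpool_wait_begin() takes the pool mutex and may wake or spawn a thread; with nothing in flight, os_aio_wait_until_no_pending_writes() now returns without any of that. A self-contained model of the arithmetic (all names below are illustrative, not from the patch):

```cpp
#include <cassert>
#include <cstddef>

/* Toy model: capacity mirrors m_max_aio, free_blocks mirrors m_cache.size(). */
struct slots_model
{
  std::size_t capacity;
  std::size_t free_blocks;
  std::size_t pending_io_count() const { return capacity - free_blocks; }
};

int main()
{
  slots_model s{256, 256};
  s.free_blocks -= 3;                 /* three writes submitted */
  assert(s.pending_io_count() == 3);
  s.free_blocks += 3;                 /* completions returned their blocks */
  assert(s.pending_io_count() == 0);  /* nothing pending: the wait is skipped */
  return 0;
}
```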
8 changes: 7 additions & 1 deletion storage/innobase/trx/trx0purge.cc
@@ -1239,7 +1239,13 @@ extern tpool::waitable_task purge_worker_task;
 /** Wait for pending purge jobs to complete. */
 static void trx_purge_wait_for_workers_to_complete()
 {
-	purge_worker_task.wait();
+	if (purge_worker_task.get_ref_count())
+	{
+		tpool::tpool_wait_begin();
+		purge_worker_task.wait();
+		tpool::tpool_wait_end();
+	}
+
 	/* There should be no outstanding tasks as long
 	as the worker threads are active. */
 	ut_ad(srv_get_task_queue_length() == 0);
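Editor's note: waitable_task keeps a reference count that rises as instances are queued or executing and falls on completion, and wait() blocks until it reaches zero. The get_ref_count() guard therefore only skips the pool notification when wait() would return immediately anyway. A compact model of that contract (illustrative, not the tpool implementation):

```cpp
#include <condition_variable>
#include <mutex>

/* Toy analogue of tpool::waitable_task's counting contract. */
class waitable
{
  std::mutex m;
  std::condition_variable cv;
  int refs = 0;
public:
  void add_ref() { std::lock_guard<std::mutex> g(m); ++refs; }
  void release() { std::lock_guard<std::mutex> g(m); if (--refs == 0) cv.notify_all(); }
  int  get_ref_count() { std::lock_guard<std::mutex> g(m); return refs; }
  void wait()
  {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [this] { return refs == 0; });  /* blocks until count drains */
  }
};
```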
1 change: 1 addition & 0 deletions tpool/CMakeLists.txt
@@ -22,6 +22,7 @@ ADD_LIBRARY(tpool STATIC
   tpool_generic.cc
   task_group.cc
   task.cc
+  wait_notification.cc
   ${EXTRA_SOURCES}
 )

4 changes: 4 additions & 0 deletions tpool/tpool.h
@@ -214,13 +214,17 @@ class thread_pool
   int bind(native_file_handle &fd) { return m_aio->bind(fd); }
   void unbind(const native_file_handle &fd) { m_aio->unbind(fd); }
   int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
+  virtual void wait_begin() {};
+  virtual void wait_end() {};
   virtual ~thread_pool() {}
 };
 const int DEFAULT_MIN_POOL_THREADS= 1;
 const int DEFAULT_MAX_POOL_THREADS= 500;
 extern thread_pool *
 create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
                            int max_threads= DEFAULT_MAX_POOL_THREADS);
+extern "C" void tpool_wait_begin();
+extern "C" void tpool_wait_end();
 #ifdef _WIN32
 extern thread_pool *
 create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
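Editor's note: wait_begin()/wait_end() are ordinary virtuals with empty default bodies, so pool implementations that do not track concurrency (for instance a native Windows pool) can simply ignore the notifications. A sketch of an implementation overriding the hooks — the class and its counter are illustrative, and the remaining pure-virtual members of thread_pool are deliberately left unimplemented, so the sketch stays abstract:

```cpp
#include <atomic>
#include <tpool.h>

/* Illustrative subclass: counts workers that announced a wait. */
class counting_pool : public tpool::thread_pool
{
  std::atomic<int> m_blocked{0};
public:
  void wait_begin() override { ++m_blocked; }  /* worker about to block */
  void wait_end() override   { --m_blocked; }  /* worker active again */
};
```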
100 changes: 66 additions & 34 deletions tpool/tpool_generic.cc
@@ -70,8 +70,6 @@ namespace tpool
   and also ensures that idle timeout works well. LIFO wakeup order ensures
   that active threads stay active, and idle ones stay idle.
-  - to minimize spurious wakeups, some items are not put into the queue. Instead
-  submit() will pass the data directly to the thread it woke up.
 */
 
 /**
@@ -109,7 +107,8 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
   {
     NONE = 0,
     EXECUTING_TASK = 1,
-    LONG_TASK = 2
+    LONG_TASK = 2,
+    WAITING = 4
   };
 
   int m_state;
@@ -154,6 +153,9 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
   }
 };
 
+
+static thread_local worker_data* tls_worker_data;
+
 class thread_pool_generic : public thread_pool
 {
   /** Cache for per-worker structures */
@@ -186,6 +188,7 @@ class thread_pool_generic
 
   /** Overall number of enqueues*/
   unsigned long long m_tasks_enqueued;
+  unsigned long long m_group_enqueued;
   /** Overall number of dequeued tasks. */
   unsigned long long m_tasks_dequeued;
 
@@ -212,6 +215,8 @@
   adjusting concurrency */
   int m_long_tasks_count;
 
+  int m_waiting_task_count;
+
   /** Last time thread was created*/
   std::chrono::system_clock::time_point m_last_thread_creation;
 
@@ -237,7 +242,8 @@
   }
   bool add_thread();
   bool wake(worker_wake_reason reason, task *t = nullptr);
-  void wake_or_create_thread();
+  void maybe_wake_or_create_thread();
+  bool too_many_active_threads();
   bool get_task(worker_data *thread_var, task **t);
   bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
                       worker_data *thread_var);
@@ -250,6 +256,8 @@
 public:
   thread_pool_generic(int min_threads, int max_threads);
   ~thread_pool_generic();
+  void wait_begin() override;
+  void wait_end() override;
   void submit_task(task *task) override;
   virtual aio *create_native_aio(int max_io) override
   {
@@ -447,31 +455,24 @@ bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
 
   thread_var->m_state = worker_data::NONE;
 
-  if (m_task_queue.empty())
+  while (m_task_queue.empty())
   {
     if (m_in_shutdown)
       return false;
 
     if (!wait_for_tasks(lk, thread_var))
       return false;
-
-    /* Task was handed over directly by signaling thread.*/
-    if (thread_var->m_wake_reason == WAKE_REASON_TASK)
+    if (m_task_queue.empty())
     {
-      *t= thread_var->m_task;
-      goto end;
+      m_spurious_wakeups++;
+      continue;
     }
-
-    if (m_task_queue.empty())
-      return false;
   }
 
   /* Dequeue from the task queue.*/
   *t= m_task_queue.front();
   m_task_queue.pop();
   m_tasks_dequeued++;
-
-end:
   thread_var->m_state |= worker_data::EXECUTING_TASK;
   thread_var->m_task_start_time = m_timestamp;
   return true;
@@ -491,14 +492,18 @@ void thread_pool_generic::worker_end(worker_data* thread_data)
   }
 }
 
+extern "C" void set_tls_pool(tpool::thread_pool* pool);
+
 /* The worker get/execute task loop.*/
 void thread_pool_generic::worker_main(worker_data *thread_var)
 {
   task* task;
-
+  set_tls_pool(this);
   if(m_worker_init_callback)
     m_worker_init_callback();
 
+  tls_worker_data = thread_var;
+
   while (get_task(thread_var, &task) && task)
   {
     task->execute();
@@ -557,12 +562,10 @@ void thread_pool_generic::maintainence()
       m_long_tasks_count++;
     }
   }
 
+  maybe_wake_or_create_thread();
+
   size_t thread_cnt = (int)thread_count();
-  if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR)
-  {
-    wake_or_create_thread();
-    return;
-  }
   if (m_last_activity == m_tasks_dequeued + m_wakeups &&
       m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
 
@@ -638,7 +641,7 @@ bool thread_pool_generic::add_thread()
 }
 
 /** Wake a standby thread, and hand the given task over to this thread. */
-bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
+bool thread_pool_generic::wake(worker_wake_reason reason, task *)
 {
   assert(reason != WAKE_REASON_NONE);
 
@@ -650,10 +653,6 @@ bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
   assert(var->m_wake_reason == WAKE_REASON_NONE);
   var->m_wake_reason= reason;
   var->m_cv.notify_one();
-  if (t)
-  {
-    var->m_task= t;
-  }
   m_wakeups++;
   return true;
 }
@@ -673,10 +672,11 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
   m_tasks_dequeued(),
   m_wakeups(),
   m_spurious_wakeups(),
-  m_concurrency(std::thread::hardware_concurrency()),
+  m_concurrency(std::thread::hardware_concurrency()*2),
   m_in_shutdown(),
   m_timestamp(),
   m_long_tasks_count(),
+  m_waiting_task_count(),
   m_last_thread_creation(),
   m_min_threads(min_threads),
   m_max_threads(max_threads),
@@ -700,21 +700,27 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
 }
 
 
-void thread_pool_generic::wake_or_create_thread()
+void thread_pool_generic::maybe_wake_or_create_thread()
 {
-  assert(!m_task_queue.empty());
+  if (m_task_queue.empty())
+    return;
+  if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
+    return;
   if (!m_standby_threads.empty())
   {
-    auto t= m_task_queue.front();
-    m_task_queue.pop();
-    wake(WAKE_REASON_TASK, t);
+    wake(WAKE_REASON_TASK);
   }
   else
   {
     add_thread();
   }
 }
 
+bool thread_pool_generic::too_many_active_threads()
+{
+  return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
+         m_concurrency* OVERSUBSCRIBE_FACTOR;
+}
+
 /** Submit a new task*/
 void thread_pool_generic::submit_task(task* task)
@@ -725,9 +731,35 @@ void thread_pool_generic::submit_task(task* task)
   task->add_ref();
   m_tasks_enqueued++;
   m_task_queue.push(task);
-
-  if (m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR)
-    wake_or_create_thread();
+  maybe_wake_or_create_thread();
 }
 
 
+/* Notify thread pool that current thread is going to wait */
+void thread_pool_generic::wait_begin()
+{
+  if (!tls_worker_data || tls_worker_data->is_long_task())
+    return;
+  tls_worker_data->m_state |= worker_data::WAITING;
+  std::unique_lock<std::mutex> lk(m_mtx);
+  m_waiting_task_count++;
+
+  /* Maintain concurrency */
+  if (m_task_queue.empty())
+    return;
+  if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count < m_concurrency)
+    maybe_wake_or_create_thread();
+}
+
+
+void thread_pool_generic::wait_end()
+{
+  if (tls_worker_data && (tls_worker_data->m_state & worker_data::WAITING))
+  {
+    tls_worker_data->m_state &= ~worker_data::WAITING;
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_waiting_task_count--;
+  }
+}
 
 /**
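Editor's note on the scheduler arithmetic these two hooks feed: a thread counts toward effective concurrency only while it is active and neither long-running nor waiting, i.e. effective = active − long − waiting. maybe_wake_or_create_thread() declines to grow the pool once effective exceeds m_concurrency, while too_many_active_threads() reports oversubscription beyond m_concurrency × OVERSUBSCRIBE_FACTOR. A worked model with illustrative numbers (OVERSUBSCRIBE_FACTOR's value here is an assumption; the real constant is defined elsewhere in tpool_generic.cc):

```cpp
#include <cstdio>

int main()
{
  /* Illustrative snapshot of the pool's counters. */
  int active_threads = 16, long_tasks = 3, waiting_tasks = 5;
  int concurrency = 8;                 /* hardware_concurrency() * 2, per this patch */
  const int OVERSUBSCRIBE_FACTOR = 3;  /* assumed value */

  int effective = active_threads - long_tasks - waiting_tasks;    /* 8 */

  /* maybe_wake_or_create_thread() returns early when effective > concurrency */
  bool may_grow = effective <= concurrency;                       /* true */
  /* too_many_active_threads() throttles only well above the target */
  bool too_many = effective > concurrency * OVERSUBSCRIBE_FACTOR; /* false */

  std::printf("effective=%d may_grow=%d too_many=%d\n",
              effective, (int)may_grow, (int)too_many);
  return 0;
}
```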
5 changes: 5 additions & 0 deletions tpool/tpool_structs.h
@@ -105,6 +105,11 @@ template<typename T> class cache
     m_cv.wait(lk);
     m_waiters--;
   }
+
+  size_t size()
+  {
+    return m_cache.size();
+  }
 };


21 changes: 21 additions & 0 deletions tpool/wait_notification.cc
@@ -0,0 +1,21 @@
+#include <tpool.h>
+
+static thread_local tpool::thread_pool* tls_thread_pool;
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool)
+{
+  tls_thread_pool = pool;
+}
+
+extern "C" void tpool_wait_begin()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_begin();
+}
+
+
+extern "C" void tpool_wait_end()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_end();
+}
