Skip to content
Permalink
Browse files
Minor clean-up of purge code
purge_sys_t::n_submitted: Document that it is only accessed by
srv_purge_coordinator_thread.

purge_sys_t::n_completed: Exclusively use my_atomic access.

srv_task_execute(): Simplify the code.

srv_purge_coordinator_thread(): Test the cheaper condition first.

trx_purge(): Atomically access purge_sys.n_completed.
Remove some code duplication.

trx_purge_wait_for_workers_to_complete(): Atomically access
purge_sys.n_completed. Remove an unnecessary local variable.

trx_purge_stop(): Remove a redundant assignment.
  • Loading branch information
dr-m committed Apr 8, 2018
1 parent 0f6186c commit df44e75
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 67 deletions.
@@ -424,9 +424,12 @@ class purge_sys_t
MY_ALIGNED(CACHE_LINE_SIZE)
ReadView view; /*!< The purge will not remove undo logs
which are >= this view (purge view) */
ulint n_submitted; /*!< Count of total tasks submitted
to the task queue */
ulint n_completed; /*!< Count of total tasks completed */
/** Total number of tasks submitted by srv_purge_coordinator_thread.
Not accessed by other threads. */
ulint n_submitted;
/** Number of completed tasks. Accessed by srv_purge_coordinator
and srv_worker_thread by my_atomic. */
ulint n_completed;

/** Iterator to the undo log records of committed transactions */
struct iterator
@@ -2474,38 +2474,25 @@ srv_purge_should_exit(ulint n_purged)
/*********************************************************************//**
Fetch and execute a task from the work queue.
@return true if a task was executed */
static
bool
srv_task_execute(void)
/*==================*/
static bool srv_task_execute()
{
que_thr_t* thr = NULL;

ut_ad(!srv_read_only_mode);
ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);

mutex_enter(&srv_sys.tasks_mutex);

if (UT_LIST_GET_LEN(srv_sys.tasks) > 0) {

thr = UT_LIST_GET_FIRST(srv_sys.tasks);

if (que_thr_t* thr = UT_LIST_GET_FIRST(srv_sys.tasks)) {
ut_a(que_node_get_type(thr->child) == QUE_NODE_PURGE);

UT_LIST_REMOVE(srv_sys.tasks, thr);
}

mutex_exit(&srv_sys.tasks_mutex);

if (thr != NULL) {

mutex_exit(&srv_sys.tasks_mutex);
que_run_threads(thr);

my_atomic_addlint(
&purge_sys.n_completed, 1);
my_atomic_addlint(&purge_sys.n_completed, 1);
return true;
}

return(thr != NULL);
ut_ad(UT_LIST_GET_LEN(srv_sys.tasks) == 0);
mutex_exit(&srv_sys.tasks_mutex);
return false;
}

/*********************************************************************//**
@@ -2781,8 +2768,8 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(

if (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_undo_sources
&& (purge_sys.state == PURGE_STATE_STOP
|| n_total_purged == 0)) {
&& (n_total_purged == 0
|| purge_sys.state == PURGE_STATE_STOP)) {

srv_purge_coordinator_suspend(slot, rseg_history_len);
}
@@ -1510,10 +1510,9 @@ static
void
trx_purge_wait_for_workers_to_complete()
{
ulint n_submitted = purge_sys.n_submitted;

/* Ensure that the work queue empties out. */
while ((ulint) my_atomic_loadlint(&purge_sys.n_completed) != n_submitted) {
while (my_atomic_loadlint(&purge_sys.n_completed)
!= purge_sys.n_submitted) {

if (srv_get_task_queue_length() > 0) {
srv_release_threads(SRV_WORKER, 1);
@@ -1522,9 +1521,6 @@ trx_purge_wait_for_workers_to_complete()
os_thread_yield();
}

/* None of the worker threads should be doing any work. */
ut_a(purge_sys.n_submitted == purge_sys.n_completed);

/* There should be no outstanding tasks as long
as the worker threads are active. */
ut_a(srv_get_task_queue_length() == 0);
@@ -1548,7 +1544,8 @@ trx_purge(
srv_dml_needed_delay = trx_purge_dml_delay();

/* The number of tasks submitted should be completed. */
ut_a(purge_sys.n_submitted == purge_sys.n_completed);
ut_a(purge_sys.n_submitted
== my_atomic_loadlint(&purge_sys.n_completed));

rw_lock_x_lock(&purge_sys.latch);
trx_sys.clone_oldest_view();
@@ -1562,46 +1559,27 @@ trx_purge(

/* Fetch the UNDO recs that need to be purged. */
n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
purge_sys.n_submitted += n_purge_threads;

/* Do we do an asynchronous purge or not ? */
if (n_purge_threads > 1) {
ulint i = 0;

/* Submit the tasks to the work queue. */
for (i = 0; i < n_purge_threads - 1; ++i) {
thr = que_fork_scheduler_round_robin(
purge_sys.query, thr);

ut_a(thr != NULL);

srv_que_task_enqueue_low(thr);
}

/* Submit tasks to workers queue if using multi-threaded purge. */
for (ulint i = n_purge_threads; --i; ) {
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
ut_a(thr != NULL);

purge_sys.n_submitted += n_purge_threads - 1;

goto run_synchronously;

/* Do it synchronously. */
} else {
thr = que_fork_scheduler_round_robin(purge_sys.query, NULL);
ut_ad(thr);
ut_a(thr);
srv_que_task_enqueue_low(thr);
}

run_synchronously:
++purge_sys.n_submitted;
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);

que_run_threads(thr);
que_run_threads(thr);

my_atomic_addlint(&purge_sys.n_completed, 1);
my_atomic_addlint(&purge_sys.n_completed, 1);

if (n_purge_threads > 1) {
trx_purge_wait_for_workers_to_complete();
}
if (n_purge_threads > 1) {
trx_purge_wait_for_workers_to_complete();
}

ut_a(purge_sys.n_submitted == purge_sys.n_completed);
ut_a(purge_sys.n_submitted
== my_atomic_loadlint(&purge_sys.n_completed));

if (truncate) {
trx_purge_truncate_history();
@@ -1653,7 +1631,6 @@ trx_purge_stop(void)
case PURGE_STATE_STOP:
ut_ad(srv_n_purge_threads > 0);
++purge_sys.n_stop;
purge_sys.state = PURGE_STATE_STOP;
if (!purge_sys.running) {
goto unlock;
}

0 comments on commit df44e75

Please sign in to comment.