Skip to content
Permalink
Browse files
LOCK_thread_count and COND_thread_count removed from wsrep modules (#…
…1197)

Refactored wsrep patch to not use LOCK_thread_count and COND_thread_count anymore.
This has partially been replaced by using old LOCK_wsrep_slave_threads mutex.
For slave thread count change waiting, new COND_wsrep_slave_threads signal has been added

Added LOCK_wsrep_cluster_config mutex to control that cluster address change cannot happen in parallel

Protected wsrep_slave_threads variable changes with LOCK_cluster_config mutex
This is for avoiding concurrent slave thread count and cluster joining operations to happen

Fixes according to Teemu's review
  • Loading branch information
sjaakola authored and Sergey Vojtovich committed Feb 26, 2019
1 parent bb970dd commit 785092e
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 104 deletions.
@@ -1536,11 +1536,6 @@ static my_bool kill_all_threads(THD *thd, void *)
if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
return 0;

#ifdef WITH_WSREP
/* skip wsrep system threads as well */
if (WSREP(thd) && (wsrep_thd_is_applying(thd) || thd->wsrep_applier))
return 0;
#endif
thd->set_killed(KILL_SERVER_HARD);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
@@ -1593,34 +1588,6 @@ static my_bool kill_all_threads_once_again(THD *thd, void *)
}
#endif

#ifdef WITH_WSREP
/*
* WSREP_TODO:
* this code block may turn out redundant. wsrep->disconnect()
* should terminate slave threads gracefully, and we don't need
* to signal them here.
* The code here makes sure mysqld will not hang during shutdown
* even if wsrep provider has problems in shutting down.
*/
if (WSREP(thd) && wsrep_thd_is_applying(thd))
{
sql_print_information("closing wsrep system thread");
thd->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (thd->mysys_var)
{
thd->mysys_var->abort=1;
mysql_mutex_lock(&thd->mysys_var->mutex);
if (thd->mysys_var->current_cond)
{
mysql_mutex_lock(thd->mysys_var->current_mutex);
mysql_cond_broadcast(thd->mysys_var->current_cond);
mysql_mutex_unlock(thd->mysys_var->current_mutex);
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
}
#endif
return 0;
}

@@ -5347,13 +5347,13 @@ static Sys_var_charptr Sys_wsrep_cluster_name(
ON_CHECK(wsrep_cluster_name_check),
ON_UPDATE(wsrep_cluster_name_update));

static PolyLock_mutex PLock_wsrep_slave_threads(&LOCK_wsrep_slave_threads);
static PolyLock_mutex PLock_wsrep_cluster_config(&LOCK_wsrep_cluster_config);
static Sys_var_charptr Sys_wsrep_cluster_address (
"wsrep_cluster_address", "Address to initially connect to cluster",
PREALLOCATED GLOBAL_VAR(wsrep_cluster_address),
CMD_LINE(REQUIRED_ARG),
IN_SYSTEM_CHARSET, DEFAULT(""),
&PLock_wsrep_slave_threads, NOT_IN_BINLOG,
&PLock_wsrep_cluster_config, NOT_IN_BINLOG,
ON_CHECK(wsrep_cluster_address_check),
ON_UPDATE(wsrep_cluster_address_update));

@@ -5384,7 +5384,7 @@ static Sys_var_ulong Sys_wsrep_slave_threads(
"wsrep_slave_threads", "Number of slave appliers to launch",
GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1),
&PLock_wsrep_slave_threads, NOT_IN_BINLOG,
&PLock_wsrep_cluster_config, NOT_IN_BINLOG,
ON_CHECK(0),
ON_UPDATE(wsrep_slave_threads_update));

@@ -144,6 +144,8 @@ mysql_cond_t COND_wsrep_sst_init;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_cond_t COND_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_cluster_config;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
mysql_mutex_t LOCK_wsrep_SR_pool;
@@ -158,15 +160,15 @@ PSI_mutex_key
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
key_LOCK_wsrep_config_state,
key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config,
key_LOCK_wsrep_SR_pool,
key_LOCK_wsrep_SR_store,
key_LOCK_wsrep_thd_queue;

PSI_cond_key key_COND_wsrep_thd,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
key_COND_wsrep_thd_queue;
key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads;


PSI_file_key key_file_wsrep_gra_log;
@@ -180,6 +182,7 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
@@ -193,7 +196,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL}
};

static PSI_file_info wsrep_files[]=
@@ -758,6 +762,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL);
mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_SR_pool,
@@ -860,6 +866,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_replaying);
mysql_cond_destroy(&COND_wsrep_replaying);
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_cond_destroy(&COND_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_cluster_config);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
@@ -897,7 +905,7 @@ void wsrep_recover()

void wsrep_stop_replication(THD *thd)
{
WSREP_INFO("Stop replication");
WSREP_INFO("Stop replication by %llu", (thd) ? thd->thread_id : 0);
if (Wsrep_server_state::instance().state() !=
Wsrep_server_state::s_disconnected)
{
@@ -956,6 +964,7 @@ bool wsrep_start_replication()
if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
{
// if provider is non-trivial, but no address is specified, wait for address
WSREP_DEBUG("wsrep_start_replication exit due to empty address");
return true;
}

@@ -2280,23 +2289,15 @@ static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)

void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
/* Clear thread cache */
kill_cached_threads++;
flush_thread_cache();

/*
First signal all threads that it's time to die
*/

mysql_mutex_lock(&LOCK_thread_count); // For unlink from list

bool kill_cached_threads_saved= kill_cached_threads;
kill_cached_threads= true; // prevent future threads caching
mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die

server_threads.iterate(kill_all_threads, except_caller_thd);
mysql_mutex_unlock(&LOCK_thread_count);

if (thread_count)
sleep(2); // Give threads time to die

mysql_mutex_lock(&LOCK_thread_count);
/*
Force remaining threads to die by closing the connection to the client
*/
@@ -2309,14 +2310,10 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)

while (wait_to_end && server_threads.iterate(have_client_connections))
{
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
sleep(1);
DBUG_PRINT("quit",("One thread died (count=%u)", uint32_t(thread_count)));
}

kill_cached_threads= kill_cached_threads_saved;

mysql_mutex_unlock(&LOCK_thread_count);

/* All client connection threads have now been aborted */
}

@@ -2348,42 +2345,30 @@ void wsrep_close_threads(THD *thd)
void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
while (wsrep_running_threads > 2)
/*
2 is for rollbacker thread which needs to be killed explicitly.
This gotta be fixed in a more elegant manner if we gonna have arbitrary
number of non-applier wsrep threads.
*/
{
if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
{
mysql_mutex_unlock(&LOCK_thread_count);
my_sleep(100);
mysql_mutex_lock(&LOCK_thread_count);
}
else
mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
DBUG_PRINT("quit",("One applier died (count=%u)", uint32_t(thread_count)));
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
}
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
DBUG_PRINT("quit",("applier threads have died (count=%u)",
uint32_t(wsrep_running_threads)));

/* Now kill remaining wsrep threads: rollbacker */
wsrep_close_threads (thd);
/* and wait for them to die */
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
while (wsrep_running_threads > 0)
{
if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
{
mysql_mutex_unlock(&LOCK_thread_count);
my_sleep(100);
mysql_mutex_lock(&LOCK_thread_count);
}
else
mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
DBUG_PRINT("quit",("One thread died (count=%u)", uint32_t(thread_count)));
mysql_cond_wait(&COND_wsrep_slave_threads, &LOCK_wsrep_slave_threads);
}
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
DBUG_PRINT("quit",("all wsrep system threads have died"));

/* All wsrep applier threads have now been aborted. However, if this thread
is also applier, we are still running...
@@ -2698,10 +2683,10 @@ void* start_wsrep_THD(void *arg)
thd->proc_info= 0;
thd->set_command(COM_SLEEP);
thd->init_for_queries();
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_running_threads++;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);

WSREP_DEBUG("wsrep system thread %llu, %p starting",
thd->thread_id, thd);
@@ -2718,11 +2703,11 @@ void* start_wsrep_THD(void *arg)

delete thd_args;

mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_running_threads--;
WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_wsrep_slave_threads);
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
/*
Note: We can't call THD destructor without crashing
if plugins have not been initialized. However, in most of the
@@ -301,6 +301,8 @@ extern int wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_replaying;
extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_cond_t COND_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_cluster_config;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_SR_pool;
extern mysql_mutex_t LOCK_wsrep_SR_store;
@@ -327,6 +329,8 @@ extern PSI_cond_key key_COND_wsrep_sst_thread;
extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_cond_key key_COND_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_cluster_config;
extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_mutex_key key_LOCK_wsrep_SR_pool;
extern PSI_mutex_key key_LOCK_wsrep_SR_store;
@@ -580,12 +580,9 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->security_ctx->skip_grants();
thd->system_thread= SYSTEM_THREAD_GENERIC;

mysql_mutex_lock(&LOCK_thread_count);

thd->real_id=pthread_self(); // Keep purify happy

thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
(void) mysql_mutex_unlock(&LOCK_thread_count);

/* */
thd->variables.wsrep_on = 0;
@@ -1337,9 +1334,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd;

mysql_mutex_lock(&LOCK_thread_count);
thd->real_id= pthread_self();
mysql_mutex_unlock(&LOCK_thread_count);

applier= new Wsrep_applier_service(thd);
server_state.start_streaming_applier(server_id, transaction_id,

0 comments on commit 785092e

Please sign in to comment.