MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.

After-review changes.

For this patch in 10.0, we do not introduce a new public storage engine API,
we just fix the InnoDB/XtraDB issues. In 10.1, we will make a better public
API that can be used for all storage engines (MDEV-6429).

Eliminate the background thread that did deadlock kills asynchronously.
Instead, we ensure that the InnoDB/XtraDB code can handle doing the kill from
inside the deadlock detection code (when thd_report_wait_for() needs to kill a
later thread to resolve a deadlock).
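
The synchronous kill described above can be pictured with a minimal sketch. It assumes a hypothetical `ReplWorker` struct and a simplified `report_wait_for()` helper; neither is the server's real type or signature, and the real code works on THD objects and must respect InnoDB/XtraDB locking rules that are left out here. Only the `killed_for_retry` flag name is borrowed from the diff in sql/rpl_parallel.cc.

```cpp
#include <cstdint>

// Hypothetical, simplified stand-in for a parallel replication worker.
struct ReplWorker {
  std::uint64_t commit_order;  // lower value must commit first (assumption)
  bool killed_for_retry;       // flag name borrowed from sql/rpl_parallel.cc
};

// Called when `waiter` is about to block on a lock held by `holder`.
// If the holder is fixed to commit after the waiter, waiting would deadlock
// against the commit order, so the holder is marked for kill-and-retry right
// here, in the reporting thread, instead of handing the request to a
// background thread. Returns true if the holder was marked.
static bool report_wait_for(const ReplWorker &waiter, ReplWorker &holder) {
  if (waiter.commit_order < holder.commit_order) {
    holder.killed_for_retry = true;
    return true;
  }
  return false;  // ordinary lock wait, nothing to resolve
}
```

A wait in the other direction (a later transaction waiting for an earlier one) is left alone, since it resolves once the earlier transaction commits.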

(We preserve the part of the original patch that introduces dedicated mutex
and condition for the slave init thread, to remove the abuse of
LOCK_thread_count for start/stop synchronisation of the slave init thread).
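
As an illustration of the dedicated mutex and condition mentioned above, here is a minimal sketch of a start/stop handshake. It uses the C++ standard library instead of the server's `mysql_mutex_t` / `mysql_cond_t` wrappers, and the names `lock_slave_init`, `cond_slave_init`, `slave_init_running`, `slave_init_body` and `run_slave_init` are hypothetical stand-ins, not the code in this commit.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-ins for LOCK_slave_init / COND_slave_init.
static std::mutex lock_slave_init;
static std::condition_variable cond_slave_init;
static bool slave_init_running = false;  // protected by lock_slave_init
static int init_steps_done = 0;          // illustrative only

// Body of the init thread: do the work, then report completion.
static void slave_init_body() {
  ++init_steps_done;  // placeholder for the real initialisation work
  {
    std::lock_guard<std::mutex> guard(lock_slave_init);
    slave_init_running = false;
  }
  cond_slave_init.notify_all();
}

// Start the init thread and block until it reports that it has finished.
static void run_slave_init() {
  {
    std::lock_guard<std::mutex> guard(lock_slave_init);
    slave_init_running = true;  // set before the thread can clear it
  }
  std::thread worker(slave_init_body);
  {
    std::unique_lock<std::mutex> lk(lock_slave_init);
    // The predicate form re-checks the flag, so spurious wakeups are harmless.
    cond_slave_init.wait(lk, [] { return !slave_init_running; });
    assert(!slave_init_running);
  }
  worker.join();
}
```

Because the flag has its own mutex, the handshake does not contend with unrelated users of a global lock such as LOCK_thread_count.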
knielsen committed Jul 8, 2014
1 parent e5149fa commit 98fc5b3
Showing 25 changed files with 382 additions and 265 deletions.
75 changes: 1 addition & 74 deletions include/mysql/plugin.h
@@ -622,6 +622,7 @@ void **thd_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
void thd_storage_lock_wait(MYSQL_THD thd, long long value);
int thd_tx_isolation(const MYSQL_THD thd);
int thd_tx_is_read_only(const MYSQL_THD thd);
int thd_rpl_is_parallel(const MYSQL_THD thd);
/**
Create a temporary file.
@@ -729,80 +730,6 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);

/*
Used by a storage engine to report that one transaction THD is about to
go to wait for a transactional lock held by another transaction OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions on the slave can encounter lock conflicts on the slave that did
not exist on the master, this can cause deadlocks.
The storage engine can report such conflicting locks using this call. This
will allow parallel replication to detect such conflicts and resolve the
deadlock (by killing the second transaction to release the locks that the
first is waiting for, and then later re-try the second killed transaction).
The storage engine should not report false positives. That is, it should not
report any lock waits that do not actually require one transaction to wait
for the other. Nor should it report waits for locks that will be released
before the commit of the other transactions.
*/
void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);

/*
This function can optionally be called to check if thd_report_wait_for()
needs to be called for waits done by a given transaction.
If this function returns false for a given thd, there is no need to do any
calls to thd_report_wait_for() on that thd.
This call is optional; it is safe to call thd_report_wait_for() in any case.
This call can be used to save some redundant calls to thd_report_wait_for()
if desired. (This is unlikely to matter much unless there are _lots_ of
waits to report, as the overhead of thd_report_wait_for() is small).
*/
int thd_need_wait_for(const MYSQL_THD thd);

/*
This function can be called by storage engines to check if the commit order
of two transactions has already been decided by the upper layer. This
happens in parallel replication, where the commit order is forced to be the
same on the slave as it was originally on the master.
If this function returns false, it means that such commit order will be
enforced. This allows the storage engine to optionally omit gap lock waits
or similar measures that would otherwise be needed to ensure that
transactions would be serialised in a way that would cause a commit order
that is correct for binlogging for statement-based replication.
If this function returns true, normal locking should be done as required by
the binlogging and transaction isolation level in effect.
*/
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);

/*
If the storage engine detects a deadlock, and needs to choose a victim
transaction to roll back, it can call this function to ask the upper
server layer which of two possible transactions is preferred to be
aborted and rolled back.
In parallel replication, if two transactions are running in parallel and
one is fixed to commit before the other, then the one that commits later
will be preferred as the victim - choosing the early transaction as a victim
will not resolve the deadlock anyway, as the later transaction still needs
to wait for the earlier to commit.
Otherwise, a transaction that uses only transactional tables, and can thus
be safely rolled back, will be preferred as a deadlock victim over a
transaction that also modified non-transactional (e.g. MyISAM) tables.
The return value is -1 if the first transaction is preferred as a deadlock
victim, 1 if the second transaction is preferred, or 0 for no preference (in
which case the storage engine can make the choice as it prefers).
*/
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);

#ifdef __cplusplus
}
#endif
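
The return-value convention documented for thd_deadlock_victim_preference() in the comment removed above (-1, 1 or 0) can be illustrated with a small sketch. The `TxnInfo` struct, its fields and the `victim_preference()` function are hypothetical simplifications made up for this example; the server's function takes THD pointers and applies further conditions not modelled here.

```cpp
#include <cstdint>

// Hypothetical, simplified view of a transaction; not the server's THD.
struct TxnInfo {
  bool in_parallel_replication;   // applied by a parallel replication worker
  std::uint64_t commit_order;     // lower value must commit first (assumption)
  bool modified_non_trans_table;  // touched e.g. a MyISAM table
};

// Returns -1 if `a` is the preferred victim, 1 if `b` is, 0 for no preference.
static int victim_preference(const TxnInfo &a, const TxnInfo &b) {
  if (a.in_parallel_replication && b.in_parallel_replication &&
      a.commit_order != b.commit_order) {
    // Prefer the later committer: killing the earlier one would not resolve
    // the deadlock, because the later one still has to wait for it.
    return a.commit_order > b.commit_order ? -1 : 1;
  }
  if (a.modified_non_trans_table != b.modified_non_trans_table) {
    // Prefer the transaction that can be rolled back safely.
    return a.modified_non_trans_table ? 1 : -1;
  }
  return 0;  // the storage engine is free to choose
}
```

A storage engine would fall back to its own heuristics (for example, transaction weight) only when the result is 0.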
5 changes: 1 addition & 4 deletions include/mysql/plugin_audit.h.pp
@@ -303,6 +303,7 @@
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -313,10 +314,6 @@
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
struct mysql_event_general
{
unsigned int event_subclass;
5 changes: 1 addition & 4 deletions include/mysql/plugin_auth.h.pp
@@ -303,6 +303,7 @@
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -313,10 +314,6 @@
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{
5 changes: 1 addition & 4 deletions include/mysql/plugin_ftparser.h.pp
@@ -256,6 +256,7 @@
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -266,10 +267,6 @@
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,
11 changes: 0 additions & 11 deletions mysql-test/suite/perfschema/r/threads_mysql.result
@@ -44,16 +44,6 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%';
@@ -115,5 +105,4 @@ parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler
4 changes: 2 additions & 2 deletions sql/log.cc
@@ -4115,7 +4115,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
included= false;
break;
}
if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))
if (!included && !strcmp(ir->name, rli->group_relay_log_name))
break;
if (!next)
{
@@ -9369,7 +9369,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
file= -1;
}

if (0 == strcmp(linfo->log_file_name, last_log_name))
if (!strcmp(linfo->log_file_name, last_log_name))
break; // No more files to do
if ((file= open_binlog(&log, linfo->log_file_name, &errmsg)) < 0)
{
7 changes: 7 additions & 0 deletions sql/log_event.cc
@@ -7328,6 +7328,13 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;

/*
XID_EVENT works like a COMMIT statement. And it also updates the
mysql.gtid_slave_pos table with the GTID of the current transaction.

Therefore, it acts much like a normal SQL statement, so we need to do
mysql_reset_thd_for_next_command() as if starting a new statement.
*/
mysql_reset_thd_for_next_command(thd);
/*
Record any GTID in the same transaction, so slave state is transactionally
30 changes: 13 additions & 17 deletions sql/mysqld.cc
@@ -368,7 +368,7 @@ static I_List<THD> thread_cache;
static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
static mysql_cond_t COND_thread_cache, COND_flush_thread_cache;
mysql_cond_t COND_slave_background;
mysql_cond_t COND_slave_init;
static DYNAMIC_ARRAY all_options;

/* Global variables */
@@ -707,7 +707,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init;

mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
@@ -882,7 +882,7 @@ PSI_mutex_key key_LOCK_stats,
PSI_mutex_key key_LOCK_gtid_waiting;

PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background;
key_LOCK_slave_init;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;

static PSI_mutex_info all_server_mutexes[]=
@@ -945,7 +945,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
@@ -1000,7 +1000,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background;
key_COND_prepare_ordered, key_COND_slave_init;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;

static PSI_cond_info all_server_conds[]=
@@ -1049,15 +1049,15 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_slave_init, "COND_slave_init", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};

PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
key_thread_slave_init, key_rpl_parallel_thread;

static PSI_thread_info all_server_threads[]=
{
@@ -1083,7 +1083,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};

@@ -2177,8 +2177,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_background);
mysql_mutex_destroy(&LOCK_slave_init);
mysql_cond_destroy(&COND_slave_init);
DBUG_VOID_RETURN;
}

@@ -4393,9 +4393,9 @@ static int init_thread_environment()
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
mysql_cond_init(key_COND_slave_init, &COND_slave_init, NULL);

#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,
@@ -9477,8 +9477,6 @@ PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room i
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};

#ifdef HAVE_PSI_INTERFACE

@@ -9602,9 +9600,7 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
& stage_master_gtid_wait,
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request
& stage_gtid_wait_other_connection
};

PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
10 changes: 4 additions & 6 deletions sql/mysqld.h
@@ -309,8 +309,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;

extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
key_rpl_parallel_thread;

extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
@@ -451,8 +451,6 @@ extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;

#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
@@ -521,7 +519,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_background;
LOCK_slave_init;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@@ -532,7 +530,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_background;
extern mysql_cond_t COND_slave_init;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;
3 changes: 1 addition & 2 deletions sql/rpl_parallel.cc
@@ -240,7 +240,6 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
rgi->killed_for_retry)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed();
@@ -325,7 +324,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);

strcpy(log_name, ir->name);
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
4 changes: 3 additions & 1 deletion sql/rpl_rli.cc
@@ -1362,7 +1362,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
strcpy(ir->name, name);
strmake_buf(ir->name, name);

if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
@@ -1564,6 +1564,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
trans_retries= 0;
last_event_start_time= 0;
gtid_sub_id= 0;
commit_id= 0;
gtid_pending= false;
worker_error= 0;
row_stmt_start_timestamp= 0;
@@ -1608,6 +1609,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.seq_no= gev->seq_no;
rgi->commit_id= gev->commit_id;
rgi->gtid_pending= true;
return 0;
}
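
The strcpy() to strmake_buf() replacements in sql/rpl_parallel.cc and sql/rpl_rli.cc swap an unbounded copy for one limited by the size of the destination array. The sketch below shows that general idea with a hypothetical `copy_bounded()` helper; it is an assumption-laden illustration of bounded copying, not the definition of the server's strmake_buf macro.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical helper: copy at most N-1 bytes of `src` into the array `dst`
// and always NUL-terminate. Taking the destination by array reference lets
// the compiler supply the buffer size, so the caller cannot pass a wrong one.
template <std::size_t N>
static void copy_bounded(char (&dst)[N], const char *src) {
  static_assert(N > 0, "destination must have room for the terminator");
  std::size_t len = std::strlen(src);
  if (len > N - 1)
    len = N - 1;  // truncate instead of overflowing the buffer
  std::memcpy(dst, src, len);
  dst[len] = '\0';
}
```

With a plain strcpy(), a relay log name longer than the destination buffer would write past its end; with a bounded copy the name is truncated instead.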