Skip to content

Commit

Permalink
MDEV-6321: close_temporary_tables() in format description event not s…
Browse files Browse the repository at this point in the history
…erialised correctly

Follow-up patch, fixing a possible deadlock issue.

If the master crashes in the middle of an event group, there can be an active
transaction in a worker thread when we encounter the following master restart
format description event. In this case, we need to notify that worker thread
to abort and roll back the partial event group. Otherwise a deadlock occurs:
the worker thread waits for the commit that never arrives, and the SQL driver
thread waits for the worker thread to complete its event group, which it never
does.
  • Loading branch information
knielsen committed Aug 19, 2014
1 parent 4cb1e0e commit 453c29c
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 4 deletions.
37 changes: 37 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel_temptable.result
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ COUNT(*)
SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
*** Test that if master logged partial event group before crash, we finish that group correctly before executing format description event ***
include/stop_slave.inc
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table, and writes to any of them");
SET gtid_domain_id= 1;
DELETE FROM t1;
ALTER TABLE t1 ENGINE=InnoDB;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY);
INSERT INTO t2 VALUES (1);
INSERT INTO t2 VALUES (2);
SET gtid_domain_id= 2;
CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY);
INSERT INTO t3 VALUES (10);
INSERT INTO t3 VALUES (20);
INSERT INTO t1 SELECT a, 'server_1' FROM t2;
INSERT INTO t1 SELECT a, 'default' FROM t3;
INSERT INTO t1 SELECT a+2, '+server_1' FROM t2;
FLUSH TABLES;
SET SESSION debug_dbug="+d,crash_before_writing_xid";
INSERT INTO t1 SELECT a+4, '++server_1' FROM t2;
Got one of the listed errors
INSERT INTO t1 VALUES (0, 1);
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
0 1
1 server_1
2 server_1
3 +server_1
4 +server_1
10 default
20 default
SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
Expand Down
78 changes: 78 additions & 0 deletions mysql-test/suite/rpl/t/rpl_parallel_temptable.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--source include/have_innodb.inc
--source include/have_binlog_format_statement.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
Expand Down Expand Up @@ -132,6 +133,83 @@ SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256;
SHOW STATUS LIKE 'Slave_open_temp_tables';


--echo *** Test that if master logged partial event group before crash, we finish that group correctly before executing format description event ***

--source include/stop_slave.inc

--connection server_1
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table, and writes to any of them");
SET gtid_domain_id= 1;
DELETE FROM t1;
ALTER TABLE t1 ENGINE=InnoDB;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY);
INSERT INTO t2 VALUES (1);
INSERT INTO t2 VALUES (2);

--connection default
SET gtid_domain_id= 2;
CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY);
INSERT INTO t3 VALUES (10);
INSERT INTO t3 VALUES (20);

--connection server_1
INSERT INTO t1 SELECT a, 'server_1' FROM t2;

--connection default
INSERT INTO t1 SELECT a, 'default' FROM t3;

--connection server_1
INSERT INTO t1 SELECT a+2, '+server_1' FROM t2;

# Crash the master server in the middle of writing an event group.
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF

FLUSH TABLES;
SET SESSION debug_dbug="+d,crash_before_writing_xid";
--error 2006,2013
INSERT INTO t1 SELECT a+4, '++server_1' FROM t2;

--source include/wait_until_disconnected.inc
--connection default
--source include/wait_until_disconnected.inc

--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart
EOF

--connection default
--enable_reconnect
--source include/wait_until_connected_again.inc

--connection server_1
--enable_reconnect
--source include/wait_until_connected_again.inc

INSERT INTO t1 VALUES (0, 1);
#--save_master_pos
--source include/save_master_gtid.inc

--connection server_2
# Start the slave replicating the events.
# The main thing to test here is that the slave will know that it
# needs to abort the partially received event group, so that the
# execution of format_description event will not wait infinitely
# for a commit of the incomplete group that never happens.

--source include/start_slave.inc
#--sync_with_master
--source include/sync_with_master_gtid.inc

SELECT * FROM t1 ORDER BY a;
SHOW STATUS LIKE 'Slave_open_temp_tables';

--connection server_1
# This FLUSH can be removed once MDEV-6608 is fixed.
FLUSH LOGS;


--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
Expand Down
81 changes: 81 additions & 0 deletions sql/rpl_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,30 @@ handle_rpl_parallel_thread(void *arg)
events= next;
continue;
}
else if (events->typ ==
rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
{
if (in_event_group)
{
/*
Master restarted (crashed) in the middle of an event group.
So we need to roll back and discard that event group.
*/
group_rgi->cleanup_context(thd, 1);
in_event_group= false;
finish_event_group(thd, group_rgi->gtid_sub_id,
events->entry_for_queued, group_rgi);

group_rgi->next= rgis_to_free;
rgis_to_free= group_rgi;
thd->rgi_slave= group_rgi= NULL;
}

events->next= qevs_to_free;
qevs_to_free= events;
events= next;
continue;
}
DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);

thd->rgi_slave= group_rgi= rgi;
Expand Down Expand Up @@ -1617,6 +1641,54 @@ rpl_parallel::workers_idle()
}


int
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev)
{
uint32 idx;
rpl_parallel_thread *thr;
rpl_parallel_thread::queued_event *qev;
Relay_log_info *rli= rgi->rli;

/*
We only need to queue the server restart if we still have a thread working
on a (potentially partial) event group.
If the last thread we queued for has finished, then it cannot have any
partial event group that needs aborting.
Thus there is no need for the full complexity of choose_thread(). We only
need to check if we have a current worker thread, and queue for it if so.
*/
idx= rpl_thread_idx;
thr= rpl_threads[idx];
if (!thr)
return 0;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
if (thr->current_owner != &rpl_threads[idx])
{
/* No active worker thread, so no need to queue the master restart. */
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 0;
}

if (!(qev= thr->get_qev(fdev, 0, rli)))
{
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 1;
}

qev->rgi= rgi;
qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART;
qev->entry_for_queued= this;
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
thr->enqueue(qev);
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 0;
}


void
rpl_parallel::wait_for_workers_idle(THD *thd)
{
Expand Down Expand Up @@ -1727,7 +1799,16 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Thus we need to wait for all prior events to execute to completion,
in case they need access to any of the temporary tables.
We also need to notify the worker thread running the prior incomplete
event group (if any), as such event group signifies an incompletely
written group cut short by a master crash, and must be rolled back.
*/
if (current->queue_master_restart(serial_rgi, fdev))
{
delete ev;
return 1;
}
wait_for_workers_idle(rli->sql_driver_thd);
}
}
Expand Down
13 changes: 9 additions & 4 deletions sql/rpl_parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,15 @@ struct rpl_parallel_thread {
queued_event can hold either an event to be executed, or just a binlog
position to be updated without any associated event.
*/
enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ;
enum queued_event_t {
QUEUED_EVENT,
QUEUED_POS_UPDATE,
QUEUED_MASTER_RESTART
} typ;
union {
Log_event *ev; /* QUEUED_EVENT */
rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */
rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE and
QUEUED_MASTER_RESTART */
};
rpl_group_info *rgi;
inuse_relaylog *ir;
Expand Down Expand Up @@ -224,8 +229,8 @@ struct rpl_parallel_entry {

rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, bool reuse);
group_commit_orderer *get_gco();
void free_gco(group_commit_orderer *gco);
int queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev);
};
struct rpl_parallel {
HASH domain_hash;
Expand Down

0 comments on commit 453c29c

Please sign in to comment.