Skip to content

Commit 421d52e

Browse files
sjaakolaJan Lindström
authored andcommitted
MDEV-6860 Parallel async replication hangs (#1400)
Instrumenting parallel slave worker thread with wsrep replication hooks. Added mtr test for testing parallel slave support. The test is based on the test attached in MDEV-6860 jira tracker.
1 parent 899c843 commit 421d52e

File tree

4 files changed

+94
-0
lines changed

4 files changed

+94
-0
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
connection node_2;
2+
connection node_1;
3+
connection node_2;
4+
START SLAVE;
5+
connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3;
6+
connection node_3;
7+
CREATE TABLE t1 (f1 int, f2 int) ENGINE=InnoDB;
8+
connection node_2;
9+
connection node_1;
10+
connection node_3;
11+
DROP TABLE t1;
12+
connection node_2;
13+
STOP SLAVE;
14+
RESET SLAVE ALL;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
!include ../galera_2nodes_as_slave.cnf
2+
3+
[mysqld.2]
4+
slave-parallel-threads=2
5+
slave-parallel-mode=optimistic
6+
[mysqld.1]
7+
wsrep-slave-threads=10
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
--source include/have_innodb.inc
2+
3+
--source include/galera_cluster.inc
4+
5+
--connection node_2
6+
--disable_query_log
7+
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_3, MASTER_USE_GTID=slave_pos;
8+
--enable_query_log
9+
START SLAVE;
10+
11+
--connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3
12+
--connection node_3
13+
--let $inserts=1000
14+
CREATE TABLE t1 (f1 int, f2 int) ENGINE=InnoDB;
15+
16+
--let $count=0
17+
--disable_query_log
18+
while($count < $inserts)
19+
{
20+
--eval insert into t1 values ($count,1)
21+
--inc $count
22+
}
23+
--enable_query_log
24+
25+
--connection node_2
26+
27+
--let $wait_condition = SELECT COUNT(*) = $inserts FROM t1
28+
--source include/wait_condition.inc
29+
30+
--connection node_1
31+
--let $wait_condition = SELECT COUNT(*) = $inserts FROM t1
32+
--source include/wait_condition.inc
33+
34+
--connection node_3
35+
DROP TABLE t1;
36+
37+
--connection node_2
38+
--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'test'
39+
--source include/wait_condition.inc
40+
41+
STOP SLAVE;
42+
RESET SLAVE ALL;

sql/rpl_parallel.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
#include "rpl_mi.h"
55
#include "sql_parse.h"
66
#include "debug_sync.h"
7+
#include "wsrep_mysqld.h"
8+
#ifdef WITH_WSREP
9+
#include "wsrep_trans_observer.h"
10+
#endif
711

812
/*
913
Code for optional parallel execution of replicated events on the slave.
@@ -35,6 +39,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
3539

3640
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
3741
ev= qev->ev;
42+
#ifdef WITH_WSREP
43+
if (wsrep_before_statement(thd))
44+
{
45+
WSREP_WARN("Parallel slave failed at wsrep_before_statement() hook");
46+
return(1);
47+
}
48+
#endif /* WITH_WSREP */
3849

3950
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
4051
ev->thd= thd;
@@ -50,6 +61,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
5061
err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
5162

5263
thread_safe_increment64(&rli->executed_entries);
64+
#ifdef WITH_WSREP
65+
if (wsrep_after_statement(thd))
66+
{
67+
WSREP_WARN("Parallel slave failed at wsrep_after_statement() hook");
68+
err= 1;
69+
}
70+
#endif /* WITH_WSREP */
5371
/* ToDo: error handling. */
5472
return err;
5573
}
@@ -1066,6 +1084,14 @@ handle_rpl_parallel_thread(void *arg)
10661084
mysql_cond_signal(&rpt->COND_rpl_thread);
10671085

10681086
thd->set_command(COM_SLAVE_WORKER);
1087+
#ifdef WITH_WSREP
1088+
wsrep_open(thd);
1089+
if (wsrep_before_command(thd))
1090+
{
1091+
WSREP_WARN("Parallel slave failed at wsrep_before_command() hook");
1092+
rpt->stop = true;
1093+
}
1094+
#endif /* WITH_WSREP */
10691095
while (!rpt->stop)
10701096
{
10711097
uint wait_count= 0;
@@ -1436,6 +1462,11 @@ handle_rpl_parallel_thread(void *arg)
14361462
rpt->pool->release_thread(rpt);
14371463
}
14381464
}
1465+
#ifdef WITH_WSREP
1466+
wsrep_after_command_before_result(thd);
1467+
wsrep_after_command_after_result(thd);
1468+
wsrep_close(thd);
1469+
#endif /* WITH_WSREP */
14391470

14401471
rpt->thd= NULL;
14411472
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);

0 commit comments

Comments
 (0)