Skip to content

Commit

Permalink
MDEV-20645: Replication consistency is broken as workers miss the err…
Browse files Browse the repository at this point in the history
…or notification from an earlier failed group.

Analysis:
========
In general if there are three groups.
1 - Inserts 32 which fails due to local entry '32' on slave.
2 - Inserts 33
3 - Inserts 34

Each group considers itself as a waiter and it waits for prior group 'waitee'.
This is done in 'register_wait_for_prior_event_group_commit'. If there is no
other parallel group being scheduled then no waitee will be there.

Let us assume 3 groups are being scheduled in parallel.

3-> waits for 2-> waits for->1

'1' upon completion it checks is there any registered subsequent waiter. If
so it wakes up the subsequent waiter with its execution status. This execution
status is stored in wakeup_error.

If '1' failed then it sends corresponding wakeup_error to 2. Then '2' aborts
and it propagates error to '3'.  So all further commits are aborted.  This
mechanism works only when all transactions reach a stage where they are
waiting for their prior commit to complete.

In case of optimistic following scenario occurs.

1,2,3 are scheduled in parallel.

3 - Reaches group_commit_code waits for 2 to complete.
1 - errors out sets stop_on_error_sub_id=1.

When a group execution results in error its corresponding sub_id is set to
'stop_on_error_sub_id'. Any new groups queued for execution will check if
their sub_id is > stop_on_error_sub_id.  If it is true their execution will be
skipped as prior group execution failed.  'skip_event_group=1' will be set.
Since the execution of SQL thread is about to stop we just skip execution of
all the following event groups.  We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that everything
is stopped and cleaned up correctly.

Upon error '1' transaction checks for registered waiters. Since no one is
there it simply goes away.

2 - Starts the execution. It checks do I have a waitee.

Since wait_commit_sub_id == entry->last_committed_sub_id no waitee is set.

Secondly: 'entry->stop_on_error_sub_id' is set by '1'st execution.  Now
'handle_parallel_thread' code checks if the current group 'sub_id' is greater
than the 'sub_id' set within 'stop_on_error_sub_id'.

Since the above is true 'skip_event_group=true' is set.  Simply call
'wait_for_prior_commit' to wakeup all waiters.  Group '2' didn't had any
waitee and its execution is skipped.  Hence its wakeup_error=0.It sends a
positive wakeup signal to '3'. Which commits. This results in a missed
transaction. i.e 33 is missed and 34 is committed.

Fix:
===
When a worker learns that an earlier transaction execution has failed, and it
should not proceed for further execution, it should mark its own execution
status as failed so that it alerts its followers to abort as well.
  • Loading branch information
sujatha-s committed Sep 30, 2019
1 parent 677cc64 commit 9b80f93
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
include/master-slave.inc
[connection master]
connection server_2;
include/stop_slave.inc
connection server_2;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
include/start_slave.inc
connection server_2;
connection server_1;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
include/save_master_gtid.inc
connection server_1;
connection server_2;
include/sync_with_master_gtid.inc
connection server_2;
connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
BEGIN;
INSERT INTO t1 VALUES (32);
connection server_1;
INSERT INTO t1 VALUES (32);
connection server_2;
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
connection server_1;
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);
connection server_2;
SET debug_sync='now WAIT_FOR reached_pause';
connection server_1;
INSERT INTO t1 VALUES (34);
connection server_2;
connection con_temp2;
COMMIT;
connection server_2;
include/stop_slave.inc
connection server_2;
include/assert.inc [table t1 should have zero rows where a>32]
connection server_2;
SELECT * FROM t1 WHERE a>32;
a
DELETE FROM t1 WHERE a=32;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
include/start_slave.inc
connection server_2;
connection server_1;
DROP TABLE t1;
include/rpl_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--source suite/rpl/include/rpl_parallel_ignored_errors.inc
112 changes: 112 additions & 0 deletions mysql-test/suite/rpl/include/rpl_parallel_ignored_errors.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# ==== Purpose ====
#
# Test verifies that, in parallel replication, transaction failure notification
# is propagated to all the workers. Workers should abort the execution of
# transaction event groups, whose event positions are higher than the failing
# transaction group.
#
# ==== Implementation ====
#
# Steps:
# 0 - Create a table t1 on master which has a primary key. Enable parallel
# replication on slave with slave_parallel_mode='optimistic' and
# slave_parallel_threads=3.
# 1 - On slave start a transaction and execute a local INSERT statement
# which will insert value 32. This is done to block the INSERT coming
# from master.
# 2 - On master execute an INSERT statement with value 32, so that it is
# blocked on slave.
# 3 - On slave enable a debug sync point such that it holds the worker thread
# execution as soon as work is scheduled to it.
# 4 - INSERT value 33 on master. It will be held on slave by other worker
# thread due to debug simulation.
# 5 - INSERT value 34 on master.
# 6 - On slave, enusre that INSERT 34 has reached a state where it waits for
# its prior transactions to commit.
# 7 - Commit the local INSERT 32 on slave server so that first worker will
# error out.
# 8 - Now send a continue signal to second worker processing 33. It should
# wakeup and propagate the error to INSERT 34.
# 9 - Upon slave stop due to error, check that no rows are found after the
# failed INSERT 32.
#
# ==== References ====
#
# MDEV-20645: Replication consistency is broken as workers miss the error
# notification from an earlier failed group.
#

--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--source include/have_binlog_format_statement.inc
--source include/master-slave.inc

--enable_connect_log
--connection server_2
--source include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
--source include/start_slave.inc

--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
--source include/save_master_gtid.inc

--connection server_2
--source include/sync_with_master_gtid.inc

--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
BEGIN;
INSERT INTO t1 VALUES (32);

--connection server_1
INSERT INTO t1 VALUES (32);

--connection server_2
--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE info like "INSERT INTO t1 VALUES (32)"
--source include/wait_condition.inc
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";

--connection server_1
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);

--connection server_2
SET debug_sync='now WAIT_FOR reached_pause';

--connection server_1
INSERT INTO t1 VALUES (34);

--connection server_2
--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE state like "Waiting for prior transaction to commit"
--source include/wait_condition.inc
--connection con_temp2
COMMIT;

# Clean up.
--connection server_2
--source include/stop_slave.inc
--let $assert_cond= COUNT(*) = 0 FROM t1 WHERE a>32
--let $assert_text= table t1 should have zero rows where a>32
--source include/assert.inc
SELECT * FROM t1 WHERE a>32;
DELETE FROM t1 WHERE a=32;

SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
--source include/start_slave.inc

--connection server_1
DROP TABLE t1;
--disable_connect_log
--source include/rpl_end.inc
57 changes: 57 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel_ignored_errors.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
include/master-slave.inc
[connection master]
connection server_2;
include/stop_slave.inc
connection server_2;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL slave_parallel_mode='optimistic';
SET GLOBAL slave_parallel_threads= 3;
CHANGE MASTER TO master_use_gtid=slave_pos;
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
include/start_slave.inc
connection server_2;
connection server_1;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
include/save_master_gtid.inc
connection server_1;
connection server_2;
include/sync_with_master_gtid.inc
connection server_2;
connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
BEGIN;
INSERT INTO t1 VALUES (32);
connection server_1;
INSERT INTO t1 VALUES (32);
connection server_2;
SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
connection server_1;
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (33);
connection server_2;
SET debug_sync='now WAIT_FOR reached_pause';
connection server_1;
INSERT INTO t1 VALUES (34);
connection server_2;
connection con_temp2;
COMMIT;
connection server_2;
include/stop_slave.inc
connection server_2;
include/assert.inc [table t1 should have zero rows where a>32]
connection server_2;
SELECT * FROM t1 WHERE a>32;
a
DELETE FROM t1 WHERE a=32;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL debug_dbug=@old_debug;
SET DEBUG_SYNC= 'RESET';
include/start_slave.inc
connection server_2;
connection server_1;
DROP TABLE t1;
include/rpl_end.inc
1 change: 1 addition & 0 deletions mysql-test/suite/rpl/t/rpl_parallel_ignored_errors.test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--source include/rpl_parallel_ignored_errors.inc
16 changes: 16 additions & 0 deletions sql/rpl_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
DBUG_EXECUTE_IF("hold_worker_on_schedule", {
if (entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX)
{
debug_sync_set_action(thd, STRING_WITH_LEN("now SIGNAL continue_worker"));
}
});

if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
wait_for_pending_deadlock_kill(thd, rgi);
Expand Down Expand Up @@ -1096,6 +1102,13 @@ handle_rpl_parallel_thread(void *arg)
bool did_enter_cond= false;
PSI_stage_info old_stage;

DBUG_EXECUTE_IF("hold_worker_on_schedule", {
if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.seq_no == 100) {
debug_sync_set_action(thd,
STRING_WITH_LEN("now SIGNAL reached_pause WAIT_FOR continue_worker"));
}
});
DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.seq_no == 100) {
Expand Down Expand Up @@ -1137,7 +1150,10 @@ handle_rpl_parallel_thread(void *arg)
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);

if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
{
skip_event_group= true;
rgi->worker_error= 1;
}
if (likely(!skip_event_group))
do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);

Expand Down

0 comments on commit 9b80f93

Please sign in to comment.