Skip to content

Commit

Permalink
MDEV-6551: Some replication errors are ignored if slave_parallel_thre…
Browse files Browse the repository at this point in the history
…ads > 0

The problem occured when using parallel replication, and an error occured that
caused the SQL thread to stop when the IO thread had already reached a
following binlog file from the master (or otherwise performed a relay log
rotation).

In this case, the Rotate Event at the end of the relay log file could still be
executed, even though an earlier event in that relay log file had gotten an
error. This would cause the position to be incorrectly updated, so that upon
restart of the SQL thread, the event that had failed would be silently skipped
and ignored, causing replication corruption.

Fixed by checking before executing Rotate Event, whether an earlier event
has failed. If so, the Rotate Event is not executed, just dequeued, same as
for other normal events following a failing event.
  • Loading branch information
knielsen committed Aug 15, 2014
1 parent 65ac881 commit cfa1ce8
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 8 deletions.
41 changes: 41 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel.result
Expand Up @@ -882,6 +882,47 @@ a
SELECT * FROM t6 ORDER BY a;
a
4
*** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***
INSERT INTO t2 VALUES (31);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads= 0;
include/start_slave.inc
SET sql_log_bin= 0;
INSERT INTO t2 VALUES (32);
SET sql_log_bin= 1;
INSERT INTO t2 VALUES (32);
FLUSH LOGS;
INSERT INTO t2 VALUES (33);
INSERT INTO t2 VALUES (34);
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
33
34
include/save_master_gtid.inc
include/wait_for_slave_sql_error.inc [errno=1062]
include/stop_slave_io.inc
SET GLOBAL slave_parallel_threads=10;
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1062]
START SLAVE SQL_THREAD;
include/wait_for_slave_sql_error.inc [errno=1062]
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
SET sql_slave_skip_counter= 1;
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
33
34
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
Expand Down
58 changes: 58 additions & 0 deletions mysql-test/suite/rpl/t/rpl_parallel.test
Expand Up @@ -1408,6 +1408,64 @@ SELECT * FROM t6 ORDER BY a;
SELECT * FROM t6 ORDER BY a;


--echo *** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***

--connection server_1
INSERT INTO t2 VALUES (31);
--source include/save_master_gtid.inc

--connection server_2
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads= 0;
--source include/start_slave.inc

# Force a duplicate key error on the slave.
SET sql_log_bin= 0;
INSERT INTO t2 VALUES (32);
SET sql_log_bin= 1;

--connection server_1
INSERT INTO t2 VALUES (32);
# Rotate the binlog; the bug is triggered when the master binlog file changes
# after the event group that causes the duplicate key error.
FLUSH LOGS;
INSERT INTO t2 VALUES (33);
INSERT INTO t2 VALUES (34);
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
--source include/save_master_gtid.inc

--connection server_2
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc

--connection server_2
--source include/stop_slave_io.inc
SET GLOBAL slave_parallel_threads=10;
START SLAVE;

--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc

# Note: IO thread is still running at this point.
# The bug seems to have been that restarting the SQL thread after an error with
# the IO thread still running, somehow picks up a later relay log position and
# thus ends up skipping the failing event, rather than re-executing.

START SLAVE SQL_THREAD;
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc

SELECT * FROM t2 WHERE a >= 30 ORDER BY a;

# Skip the duplicate error, so we can proceed.
SET sql_slave_skip_counter= 1;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc

SELECT * FROM t2 WHERE a >= 30 ORDER BY a;


--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
Expand Down
30 changes: 23 additions & 7 deletions sql/rpl_parallel.cc
Expand Up @@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
Log_event *ev;

DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
ev= qev->ev;

thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
ev->thd= thd;

/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
mysql_mutex_lock(&rli->data_lock);
/* Mutex will be released in apply_event_and_update_pos(). */
err= apply_event_and_update_pos(ev, thd, rgi, rpt);

thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
Expand All @@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
rpl_parallel_entry *e;

/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
Expand All @@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;

/* Do not update position if an earlier event group caused an error abort. */
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
e= qev->entry_for_queued;
if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
return;

rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
Expand Down Expand Up @@ -566,14 +579,15 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending;

total_event_size+= events->event_size;
if (!events->ev)
if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
qevs_to_free= events;
events= next;
continue;
}
DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);

thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
Expand Down Expand Up @@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
}
qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
Expand Down Expand Up @@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
/*
Queue an empty event, so that the position will be updated in a
Queue a position update, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
Expand All @@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
qev->ev= NULL;
qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
qev->entry_for_queued= e;
}
else
{
Expand Down
10 changes: 9 additions & 1 deletion sql/rpl_parallel.h
Expand Up @@ -72,7 +72,15 @@ struct rpl_parallel_thread {
rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
/*
queued_event can hold either an event to be executed, or just a binlog
position to be updated without any associated event.
*/
enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ;
union {
Log_event *ev; /* QUEUED_EVENT */
rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */
};
rpl_group_info *rgi;
inuse_relaylog *ir;
ulonglong future_event_relay_log_pos;
Expand Down

0 comments on commit cfa1ce8

Please sign in to comment.