Skip to content

Commit

Permalink
Fixing an illegal mem ref bug caused by large trxs
Browse files Browse the repository at this point in the history
Summary:
Dependency replication uses table map events to find dependencies. The
coordinator assumes that the table map event is always available because in
steady state the worker doesn't destroy events of the transaction till it's
committed. This assumption does not always hold, however: the worker destroys
the buffered events of a transaction once their count hits a max threshold (the
max worker queue length). To fix this, we track the number of
events per trx in the coordinator and make it a sync trx when it reaches the
threshold. A sync trx is executed in isolation and hence doesn't do any
dependency calculation.

Reviewed By: anirbanr-fb

Differential Revision: D16568788

fbshipit-source-id: 2520fda
  • Loading branch information
abhinav04sharma authored and facebook-github-bot committed Aug 7, 2019
1 parent 22e4515 commit 0464fa5
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 0 deletions.
23 changes: 23 additions & 0 deletions mysql-test/suite/rpl_mts/r/rpl_dep_exceed_worker_queue_size.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
[connection master]
create table t1(a int) engine = innodb;
include/sync_slave_sql_with_master.inc
include/stop_slave.inc
insert into t1 values(1), (2), (3), (4), (5);
set @@global.debug = '+d,dep_wait_for_last_row_prepare';
set @@global.debug = '+d,slave_worker_queue_size';
set @@global.debug = '+d,after_clear_current_group_events';
include/start_slave.inc
set debug_sync = "now wait_for clear.reached";
set debug_sync = "now wait_for prepare.reached";
set @@global.debug = '-d,dep_wait_for_last_row_prepare';
set @@global.debug = '-d,after_clear_current_group_events';
set @@global.debug = '-d,slave_worker_queue_size';
set debug_sync= 'now signal prepare.done';
set debug_sync= 'now signal clear.done';
drop table t1;
include/sync_slave_sql_with_master.inc
include/rpl_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--gtid_mode=ON --enforce_gtid_consistency --log_slave_updates
--binlog_rows_event_max_rows=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--gtid_mode=ON --enforce_gtid_consistency --log_slave_updates
--slave_parallel_workers=8
44 changes: 44 additions & 0 deletions mysql-test/suite/rpl_mts/t/rpl_dep_exceed_worker_queue_size.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Regression test for dependency replication with a transaction that has more
# events than the worker queue can hold. In that case the worker destroys the
# events it has buffered for the transaction while the coordinator is still
# scheduling it, so the coordinator must not look up earlier events (e.g. the
# table map event) of that transaction to calculate dependencies.
source include/master-slave.inc;
source include/have_mts_dependency_replication.inc;
source include/have_debug.inc;

connection master;
create table t1(a int) engine = innodb;
source include/sync_slave_sql_with_master.inc;

connection slave;
# Stop the slave so that the debug keywords below are in place before the
# transaction under test is applied
source include/stop_slave.inc;

connection master;
# Execute a multi-insert trx, this will generate 5 row events tied to the same
# table map event (because binlog_rows_event_max_rows=1 in master.opt)
insert into t1 values(1), (2), (3), (4), (5);

connection slave;
# Makes the coordinator thread signal 'prepare.reached' and then wait for
# 'prepare.done' just before it prepares the last row event (the one carrying
# STMT_END_F)
set @@global.debug = '+d,dep_wait_for_last_row_prepare';
# This will initialize the max queue size to 5, which we're guaranteed to exceed
set @@global.debug = '+d,slave_worker_queue_size';
# Makes the worker thread signal 'clear.reached' and then wait for 'clear.done'
# right after it has cleared the events of the current group
set @@global.debug = '+d,after_clear_current_group_events';

source include/start_slave.inc;

# Wait till the worker thread has cleared the queue
set debug_sync = "now wait_for clear.reached";
# Wait till we're just about to prepare the last row event in the coordinator
set debug_sync = "now wait_for prepare.reached";

# At this point the worker has already destroyed the buffered events and the
# coordinator is about to prepare a row event of the same trx.
# Remove the debug keywords first, then signal both waiting threads to continue
set @@global.debug = '-d,dep_wait_for_last_row_prepare';
set @@global.debug = '-d,after_clear_current_group_events';
set @@global.debug = '-d,slave_worker_queue_size';
set debug_sync= 'now signal prepare.done';
set debug_sync= 'now signal clear.done';

# Clean up and sync the slave with the master
connection master;
drop table t1;
source include/sync_slave_sql_with_master.inc;

source include/rpl_end.inc;
35 changes: 35 additions & 0 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,17 @@ void Log_event::schedule_dep(Relay_log_info *rli)
(rli->dbs_accessed_by_group.size() > 1);
}

// when the number of events in a group is greater than max worker queue
// length the worker threads will clear the queue (i.e. destroy all buffered
// events, see @clear_current_group_events()), at that point we should treat
// the group as a sync group because we've lost its lineage and we don't want
// to accidentally reference any previous events to calculate dependencies
if (unlikely(
rli->num_events_in_current_group >= rli->mts_slave_worker_queue_len_max))
{
rli->dep_sync_group= true;
}

if (unlikely(rli->dep_sync_group))
{
wait_for_dep_workers_to_finish(rli, rli->trx_queued);
Expand Down Expand Up @@ -3346,11 +3357,26 @@ void Log_event::schedule_dep(Relay_log_info *rli)
rli->dep_sync_group= false;
}

#ifndef DBUG_OFF
// assert: if we're done with an end event (i.e. done with scheduling the
// current trx) we should have reset the sync variable by now, otherwise the
// next trx will also be synced
if (ev->is_end_event)
DBUG_ASSERT(!rli->dep_sync_group);
#endif

if (likely(!ev->is_end_event))
++rli->num_events_in_current_group;
else
rli->num_events_in_current_group= 0;

DBUG_VOID_RETURN;
}

/**
Encapsulation for things to be done for terminal begin and end events
TODO (abhinavsharma): Refactor this, maybe split between start, end and
partition info events
*/
void
Log_event::handle_terminal_dep_event(Relay_log_info *rli,
Expand All @@ -3362,6 +3388,7 @@ Log_event::handle_terminal_dep_event(Relay_log_info *rli,
DBUG_ASSERT(rli->table_map_events.empty());
DBUG_ASSERT(rli->keys_accessed_by_group.empty());
DBUG_ASSERT(!rli->trx_queued);
DBUG_ASSERT(!rli->num_events_in_current_group);

// update rli state
rli->current_begin_event= ev;
Expand Down Expand Up @@ -12175,6 +12202,14 @@ void Rows_log_event::prepare_dep(Relay_log_info *rli,
{
DBUG_ENTER("Rows_log_event::prepare_dep");

DBUG_EXECUTE_IF("dep_wait_for_last_row_prepare", {
if (get_flags(STMT_END_F))
{
const char act[]= "now signal prepare.reached wait_for prepare.done";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act)));
};});

DBUG_ASSERT(rli->prev_event != NULL);
DBUG_ASSERT(rli->table_map_events.count(get_table_id()));

Expand Down
2 changes: 2 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ class Relay_log_info : public Rpl_info

mysql_cond_t dep_trx_all_done_cond;
ulonglong num_in_flight_trx= 0;
ulonglong num_events_in_current_group= 0;

// Statistics
std::atomic<ulonglong> begin_event_waits{0};
Expand Down Expand Up @@ -1232,6 +1233,7 @@ class Relay_log_info : public Rpl_info
mysql_mutex_unlock(&dep_key_lookup_mutex);

trx_queued= false;
num_events_in_current_group= 0;

if (need_dep_lock)
mysql_mutex_unlock(&dep_lock);
Expand Down
8 changes: 8 additions & 0 deletions sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
DBUG_EXECUTE_IF("slave_worker_queue_size",
{
c_rli->mts_slave_worker_queue_len_max=
jobs.entry = jobs.size = 5;
}
);
Expand Down Expand Up @@ -2375,6 +2376,13 @@ void clear_current_group_events(Slave_worker *worker,
worker->trans_retries = ULONG_MAX;
else
worker->trans_retries = 0;

DBUG_EXECUTE_IF("after_clear_current_group_events", {
const char act[]= "now signal clear.reached wait_for clear.done";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(worker->info_thd,
STRING_WITH_LEN(act)));
};);
}

/**
Expand Down

0 comments on commit 0464fa5

Please sign in to comment.