Skip to content

Commit

Permalink
Fix accumulation of old rows in mysql.gtid_slave_pos
Browse files Browse the repository at this point in the history
This would happen especially in optimistic parallel replication, where there
is a good chance that a transaction will be rolled back (due to conflicts)
after it has executed record_gtid(). If the transaction did any deletions of
old rows as part of record_gtid(), those deletions will be undone as well.
And the code did not properly ensure that the deletions would be re-tried.

This patch makes record_gtid() remember the list of deletions done as part
of a transaction. Then in rpl_slave_state::update() when the changes have
been committed, we discard the list. However, in case of error and rollback,
in cleanup_context() we will instead put the list back into
rpl_global_gtid_slave_state so that the deletions will be re-tried later.

Probably fixes part of the cause of MDEV-12147 as well.

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
  • Loading branch information
knielsen committed Oct 7, 2018
1 parent 1fc5a6f commit 2f4a0c5
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 24 deletions.
6 changes: 6 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel_optimistic.result
Expand Up @@ -571,4 +571,10 @@ SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1, t2, t3;
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
Check that no more than the expected last two GTIDs are in mysql.gtid_slave_pos
select count(*) from mysql.gtid_slave_pos order by domain_id, sub_id;
count(*)
2
include/rpl_end.inc
17 changes: 17 additions & 0 deletions mysql-test/suite/rpl/t/rpl_parallel_optimistic.test
Expand Up @@ -549,5 +549,22 @@ SET GLOBAL slave_parallel_threads=@old_parallel_threads;

--connection server_1
DROP TABLE t1, t2, t3;
--source include/save_master_gtid.inc

--connection server_2
--source include/sync_with_master_gtid.inc
# Check for left-over rows in table mysql.gtid_slave_pos (MDEV-12147).
#
# There was a bug when a transaction got a conflict and was rolled back. It
# might have also handled deletion of some old rows, and these deletions would
# then also be rolled back. And since the deletes were never re-tried, old no
# longer needed rows would accumulate in the table without limit.
#
# The earlier part of this test file have plenty of transactions being rolled
# back. But the last DROP TABLE statement runs on its own and should never
# conflict, thus at this point the mysql.gtid_slave_pos table should be clean.
--echo Check that no more than the expected last two GTIDs are in mysql.gtid_slave_pos
select count(*) from mysql.gtid_slave_pos order by domain_id, sub_id;

--connection server_1
--source include/rpl_end.inc
6 changes: 3 additions & 3 deletions sql/log_event.cc
Expand Up @@ -4429,7 +4429,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,

gtid= rgi->current_gtid;
if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
true, false))
rgi, false))
{
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
Expand Down Expand Up @@ -7132,7 +7132,7 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
sub_id_list[i],
false, false)))
NULL, false)))
return ret;
rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
NULL);
Expand Down Expand Up @@ -7639,7 +7639,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
rgi->gtid_pending= false;

gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, rgi,
false);
if (err)
{
Expand Down
64 changes: 44 additions & 20 deletions sql/rpl_gtid.cc
Expand Up @@ -77,7 +77,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, rgi);
}
Expand Down Expand Up @@ -328,6 +328,8 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
}
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;

rgi->pending_gtid_deletes_clear();
}

if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
Expand Down Expand Up @@ -377,15 +379,24 @@ int
rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
{
element *e;
int err= 0;

mysql_mutex_lock(&LOCK_slave_state);
if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
return 1;
{
err= 1;
goto end;
}
while (list)
{
list_element *next= list->next;
e->add(list);
list= next;
}
return 0;

end:
mysql_mutex_unlock(&LOCK_slave_state);
return err;
}


Expand Down Expand Up @@ -468,26 +479,26 @@ gtid_check_rpl_slave_state_table(TABLE *table)
/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
transaction if !in_transaction (eg. MyISAM or DDL).
gtid The global transaction id for this event group.
sub_id Value allocated within the sub_id when the event group was
read (sub_id must be consistent with commit order in master binlog).
rgi rpl_group_info context, if we are recording the gtid transactionally
as part of replicating a transactional event. NULL if called from
outside of a replicated transaction.
Note that caller must later ensure that the new gtid and sub_id is inserted
into the appropriate HASH element with rpl_slave_state.add(), so that it can
be deleted later. But this must only be done after COMMIT if in transaction.
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement)
rpl_group_info *rgi, bool in_statement)
{
TABLE_LIST tlist;
int err= 0;
bool table_opened= false;
TABLE *table;
list_element *elist= 0, *next;
list_element *elist= 0, *cur, *next;
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
Expand Down Expand Up @@ -558,7 +569,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
thd->wsrep_ignore_table= true;
#endif

if (!in_transaction)
if (!rgi)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
Expand Down Expand Up @@ -601,9 +612,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if ((elist= elem->grab_list()) != NULL)
{
/* Delete any old stuff, but keep around the most recent one. */
list_element *cur= elist;
uint64 best_sub_id= cur->sub_id;
uint64 best_sub_id= elist->sub_id;
list_element **best_ptr_ptr= &elist;
cur= elist;
while ((next= cur->next))
{
if (next->sub_id > best_sub_id)
Expand Down Expand Up @@ -636,7 +647,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
while (elist)
cur = elist;
while (cur)
{
uchar key_buffer[4+8];

Expand All @@ -646,9 +658,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
/* `break' does not work inside DBUG_EXECUTE_IF */
goto dbug_break; });

next= elist->next;
next= cur->next;

table->field[1]->store(elist->sub_id, true);
table->field[1]->store(cur->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */
key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
if (table->file->ha_index_read_map(table->record[1], key_buffer,
Expand All @@ -662,8 +674,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
not want to endlessly error on the same element in case of table
corruption or such.
*/
my_free(elist);
elist= next;
cur= next;
if (err)
break;
}
Expand All @@ -686,18 +697,31 @@ IF_DBUG(dbug_break:, )
*/
if (elist)
{
mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
mysql_mutex_unlock(&LOCK_slave_state);
elist = 0;
}

ha_rollback_trans(thd, FALSE);
}
close_thread_tables(thd);
if (in_transaction)
if (rgi)
{
thd->mdl_context.release_statement_locks();
/*
Save the list of old gtid entries we deleted. If this transaction
fails later for some reason and is rolled back, the deletion of those
entries will be rolled back as well, and we will need to put them back
on the to-be-deleted list so we can re-do the deletion. Otherwise
redundant rows in mysql.gtid_slave_pos may accumulate if transactions
are rolled back and retried after record_gtid().
*/
rgi->pending_gtid_deletes_save(gtid->domain_id, elist);
}
else
{
thd->mdl_context.release_transactional_locks();
rpl_group_info::pending_gtid_deletes_free(elist);
}
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
Expand Down Expand Up @@ -1080,7 +1104,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,

if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
record_gtid(thd, &gtid, sub_id, false, in_statement) ||
record_gtid(thd, &gtid, sub_id, NULL, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
return 1;
if (state_from_master == end)
Expand Down
2 changes: 1 addition & 1 deletion sql/rpl_gtid.h
Expand Up @@ -182,7 +182,7 @@ struct rpl_slave_state
uint64 seq_no, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
rpl_group_info *rgi, bool in_statement);
uint64 next_sub_id(uint32 domain_id);
int iterate(int (*cb)(rpl_gtid *, void *), void *data,
rpl_gtid *extra_gtids, uint32 num_extra,
Expand Down
79 changes: 79 additions & 0 deletions sql/rpl_rli.cc
Expand Up @@ -1680,6 +1680,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
long_find_row_note_printed= false;
did_mark_start_commit= false;
gtid_ev_flags2= 0;
pending_gtid_delete_list= NULL;
last_master_timestamp = 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
Expand Down Expand Up @@ -1804,6 +1805,12 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
erroneously update the GTID position.
*/
gtid_pending= false;

/*
Rollback will have undone any deletions of old rows we might have made
in mysql.gtid_slave_pos. Put those rows back on the list to be deleted.
*/
pending_gtid_deletes_put_back();
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);
Expand Down Expand Up @@ -2027,6 +2034,78 @@ rpl_group_info::unmark_start_commit()
}


/*
When record_gtid() has deleted any old rows from the table
mysql.gtid_slave_pos as part of a replicated transaction, save the list of
rows deleted here.
If later the transaction fails (eg. optimistic parallel replication), the
deletes will be undone when the transaction is rolled back. Then we can
put back the list of rows into the rpl_global_gtid_slave_state, so that
we can re-do the deletes and avoid accumulating old rows in the table.
*/
void
rpl_group_info::pending_gtid_deletes_save(uint32 domain_id,
rpl_slave_state::list_element *list)
{
/*
We should never get to a state where we try to save a new pending list of
gtid deletes while we still have an old one. But make sure we handle it
anyway just in case, so we avoid leaving stray entries in the
mysql.gtid_slave_pos table.
*/
DBUG_ASSERT(!pending_gtid_delete_list);
if (unlikely(pending_gtid_delete_list))
pending_gtid_deletes_put_back();

pending_gtid_delete_list= list;
pending_gtid_delete_list_domain= domain_id;
}


/*
Take the list recorded by pending_gtid_deletes_save() and put it back into
rpl_global_gtid_slave_state. This is needed if deletion of the rows was
rolled back due to transaction failure.
*/
void
rpl_group_info::pending_gtid_deletes_put_back()
{
if (pending_gtid_delete_list)
{
rpl_global_gtid_slave_state->put_back_list(pending_gtid_delete_list_domain,
pending_gtid_delete_list);
pending_gtid_delete_list= NULL;
}
}


/*
Free the list recorded by pending_gtid_deletes_save(). Done when the deletes
in the list have been permanently committed.
*/
void
rpl_group_info::pending_gtid_deletes_clear()
{
pending_gtid_deletes_free(pending_gtid_delete_list);
pending_gtid_delete_list= NULL;
}


void
rpl_group_info::pending_gtid_deletes_free(rpl_slave_state::list_element *list)
{
rpl_slave_state::list_element *next;

while (list)
{
next= list->next;
my_free(list);
list= next;
}
}


rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{
Expand Down
11 changes: 11 additions & 0 deletions sql/rpl_rli.h
Expand Up @@ -676,6 +676,11 @@ struct rpl_group_info
/* Needs room for "Gtid D-S-N\x00". */
char gtid_info_buf[5+10+1+10+1+20+1];

/* List of not yet committed deletions in mysql.gtid_slave_pos. */
rpl_slave_state::list_element *pending_gtid_delete_list;
/* Domain associated with pending_gtid_delete_list. */
uint32 pending_gtid_delete_list_domain;

/*
The timestamp, from the master, of the commit event.
Used to do delayed update of rli->last_master_timestamp, for getting
Expand Down Expand Up @@ -817,6 +822,12 @@ struct rpl_group_info
char *gtid_info();
void unmark_start_commit();

static void pending_gtid_deletes_free(rpl_slave_state::list_element *list);
void pending_gtid_deletes_save(uint32 domain_id,
rpl_slave_state::list_element *list);
void pending_gtid_deletes_put_back();
void pending_gtid_deletes_clear();

time_t get_row_stmt_start_timestamp()
{
return row_stmt_start_timestamp;
Expand Down

0 comments on commit 2f4a0c5

Please sign in to comment.