Skip to content

Commit

Permalink
MDEV-7936: Assertion `!table || table->in_use == _current_thd()' fail…
Browse files Browse the repository at this point in the history
…ed on parallel replication in optimistic mode

Make sure that in parallel replication, we execute wait_for_prior_commit()
before setting table->in_use for a temporary table. Otherwise we can end up
with two parallel replication worker threads competing with each other for
use of a temporary table.

Re-factor the use of find_temporary_table() to be able to handle errors
in the caller (as wait_for_prior_commit() can return error in case of
deadlock kill).
  • Loading branch information
knielsen committed Apr 13, 2015
1 parent c47fe0e commit 60d094a
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 41 deletions.
20 changes: 20 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel_temptable.result
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/save_master_pos.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
Expand Down
23 changes: 23 additions & 0 deletions mysql-test/suite/rpl/t/rpl_parallel_temptable.test
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS;


--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***

--connection server_1
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_pos.inc

--connection server_2
--source include/sync_with_master_gtid.inc

SELECT * FROM t1 WHERE a >= 30 ORDER BY a;


# Clean up.

--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
Expand Down
106 changes: 66 additions & 40 deletions sql/sql_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
}


static bool
use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
{
*out_table= table;
if (!table)
return false;
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return true;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
return false;
}

bool
find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
out_table);
}


bool
find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
}


/**
Find a temporary table specified by a key in the THD::temporary_tables list.
Expand All @@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
Expand Down Expand Up @@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}

if (!(table= find_temporary_table(thd, tl)))
if (find_and_use_temporary_table(thd, tl, &table))
DBUG_RETURN(TRUE);
if (!table)
{
if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
Expand All @@ -5833,25 +5878,6 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}

/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
DBUG_RETURN(true);

#ifdef WITH_PARTITION_STORAGE_ENGINE
if (tl->partition_names)
{
Expand Down
4 changes: 4 additions & 0 deletions sql/sql_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name,
const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
bool find_and_use_temporary_table(THD *thd, const char *db,
const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length);
void close_thread_tables(THD *thd);
Expand Down
4 changes: 3 additions & 1 deletion sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table())
{
TABLE *tmp_table;
if ((tmp_table= find_temporary_table(thd, db, table_name)))
if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
goto err;
if (tmp_table)
{
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (create_info->options & HA_LEX_CREATE_REPLACE)
Expand Down

0 comments on commit 60d094a

Please sign in to comment.