Skip to content

Commit

Permalink
Merge MDEV-7936 into 10.0.
Browse files Browse the repository at this point in the history
Conflicts:
	sql/sql_base.cc
  • Loading branch information
knielsen committed Apr 13, 2015
2 parents 50d98e9 + 60d094a commit 17aff4b
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 22 deletions.
20 changes: 20 additions & 0 deletions mysql-test/suite/rpl/r/rpl_parallel_temptable.result
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/save_master_pos.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
Expand Down
23 changes: 23 additions & 0 deletions mysql-test/suite/rpl/t/rpl_parallel_temptable.test
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS;


--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***

--connection server_1
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_pos.inc

--connection server_2
--source include/sync_with_master_gtid.inc

SELECT * FROM t1 WHERE a >= 30 ORDER BY a;


# Clean up.

--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
Expand Down
87 changes: 66 additions & 21 deletions sql/sql_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
}


static bool
use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
{
*out_table= table;
if (!table)
return false;
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return true;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
return false;
}

bool
find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
out_table);
}


bool
find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
}


/**
Find a temporary table specified by a key in the THD::temporary_tables list.
Expand All @@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
Expand Down Expand Up @@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}

if (!(table= find_temporary_table(thd, tl)))
if (find_and_use_temporary_table(thd, tl, &table))
DBUG_RETURN(TRUE);
if (!table)
{
if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
Expand Down
4 changes: 4 additions & 0 deletions sql/sql_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name,
const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
bool find_and_use_temporary_table(THD *thd, const char *db,
const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length);
void close_thread_tables(THD *thd);
Expand Down
4 changes: 3 additions & 1 deletion sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table())
{
TABLE *tmp_table;
if ((tmp_table= find_temporary_table(thd, db, table_name)))
if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
goto err;
if (tmp_table)
{
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (create_info->options & HA_LEX_CREATE_REPLACE)
Expand Down

0 comments on commit 17aff4b

Please sign in to comment.