Skip to content

Commit

Permalink
MDEV-12179: Per-engine mysql.gtid_slave_pos table.
Browse files Browse the repository at this point in the history
Intermediate commit.

On server start, look for and read all tables mysql.gtid_slave_pos* to
restore the GTID position.

Simple test case that moves the data to a new
mysql.gtid_slave_pos_innodb table and verifies that the new table is
read at server start.
  • Loading branch information
knielsen committed Apr 21, 2017
1 parent ebce682 commit d3837c6
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 47 deletions.
38 changes: 38 additions & 0 deletions mysql-test/suite/rpl/r/rpl_mdev12179.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
include/rpl_init.inc [topology=1->2]
include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
a
1
SELECT * FROM t1 ORDER BY a;
a
1
include/stop_slave.inc
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
INSERT INTO mysql.gtid_slave_pos_innodb SELECT * FROM mysql.gtid_slave_pos;
TRUNCATE mysql.gtid_slave_pos;
SET sql_log_bin=1;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
SELECT * FROM t1 ORDER BY a;
a
1
2
3
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;
DROP TABLE t1;
include/rpl_end.inc
63 changes: 63 additions & 0 deletions mysql-test/suite/rpl/t/rpl_mdev12179.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--let $rpl_topology=1->2
--source include/rpl_init.inc
--source include/have_innodb.inc

--connection server_2
--source include/stop_slave.inc
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc

--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY);
INSERT INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
--save_master_pos

--connection server_2
--sync_with_master
SELECT * FROM t1 ORDER BY a;
--source include/stop_slave.inc
SET sql_log_bin=0;
CREATE TABLE mysql.gtid_slave_pos_innodb LIKE mysql.gtid_slave_pos;
ALTER TABLE mysql.gtid_slave_pos_innodb ENGINE=InnoDB;
INSERT INTO mysql.gtid_slave_pos_innodb SELECT * FROM mysql.gtid_slave_pos;
TRUNCATE mysql.gtid_slave_pos;
SET sql_log_bin=1;

# Restart the slave mysqld server, and verify that the GTID position is
# read correctly from the new mysql.gtid_slave_pos_innodb table.

--write_file $MYSQLTEST_VARDIR/tmp/mysqld.2.expect
wait
EOF
--shutdown_server 30
--source include/wait_until_disconnected.inc

--connection server_1
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
SELECT * FROM t1 ORDER BY a;
--source include/save_master_gtid.inc

# Let the slave mysqld server start again.
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.2.expect
restart: --skip-slave-start=0
EOF

--connection server_2
--enable_reconnect
--source include/wait_until_connected_again.inc

--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;


--connection server_2
SET sql_log_bin=0;
DROP TABLE mysql.gtid_slave_pos_innodb;
SET sql_log_bin=1;

--connection server_1
DROP TABLE t1;

--source include/rpl_end.inc
162 changes: 115 additions & 47 deletions sql/rpl_rli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "slave.h"
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
#include "lock.h"
#include "sql_table.h"

static int count_relay_log_space(Relay_log_info* rli);

Expand Down Expand Up @@ -1466,41 +1468,22 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)


#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };

static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
LEX_STRING *tablename)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
bool array_inited= false;
struct local_element { uint64 sub_id; rpl_gtid gtid; };
struct local_element tmp_entry, *entry;
HASH hash;
DYNAMIC_ARRAY array;
struct gtid_pos_element tmp_entry, *entry;
int err= 0;
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");

mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);

my_hash_init(&hash, &my_charset_bin, 32,
offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
goto end;
array_inited= true;

thd->reset_for_next_command();

tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_READ);
tlist.init_one_table(STRING_WITH_LEN("mysql"), tablename->str,
tablename->length, NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
Expand Down Expand Up @@ -1546,15 +1529,15 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}

if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
{
entry= (struct local_element *)rec;
entry= (struct gtid_pos_element *)rec;
if (entry->sub_id >= sub_id)
continue;
entry->sub_id= sub_id;
Expand All @@ -1564,8 +1547,8 @@ rpl_load_gtid_slave_state(THD *thd)
}
else
{
if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
MYF(MY_WME))))
if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
err= 1;
Expand All @@ -1575,14 +1558,112 @@ rpl_load_gtid_slave_state(THD *thd)
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
if ((err= my_hash_insert(&hash, (uchar *)entry)))
if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
}
err= 0; /* Clear HA_ERR_END_OF_FILE */

end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
{
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
return err;
}


/*
Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
table found into ARRAY. For each domain id, put the row with highest sub_id
into HASH.
*/
static int
scan_all_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array)
{
static LEX_STRING mysql_db_name= {C_STRING_WITH_LEN("mysql")};
char path[FN_REFLEN];
MY_DIR *dirp;

thd->reset_for_next_command();
if (lock_schema_name(thd, mysql_db_name.str))
return 1;

build_table_filename(path, sizeof(path) - 1, mysql_db_name.str, "", "", 0);
if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
{
my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
return 1;
}
else
{
size_t i;
Dynamic_array<LEX_STRING*> files(dirp->number_of_files);
Discovered_table_list tl(thd, &files);
int err;

err= ha_discover_table_names(thd, &mysql_db_name, dirp, &tl, false);
my_dirend(dirp);
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
if (err)
return err;

for (i = 0; i < files.elements(); ++i)
{
if (strncmp(files.at(i)->str,
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length) == 0)
{
if ((err= scan_one_gtid_slave_pos_table(thd, hash, array, files.at(i))))
return err;
}
}
}

return 0;
}


int
rpl_load_gtid_slave_state(THD *thd)
{
bool array_inited= false;
struct gtid_pos_element tmp_entry, *entry;
HASH hash;
DYNAMIC_ARRAY array;
int err= 0;
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");

mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);

my_hash_init(&hash, &my_charset_bin, 32,
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
goto end;
array_inited= true;

if ((err= scan_all_gtid_slave_pos_table(thd, &hash, &array)))
goto end;

mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (rpl_global_gtid_slave_state->loaded)
Expand All @@ -1608,7 +1689,7 @@ rpl_load_gtid_slave_state(THD *thd)

for (i= 0; i < hash.records; ++i)
{
entry= (struct local_element *)my_hash_element(&hash, i);
entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
if (opt_bin_log &&
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
Expand All @@ -1622,20 +1703,7 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);

err= 0; /* Clear HA_ERR_END_OF_FILE */

end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
{
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
if (array_inited)
delete_dynamic(&array);
my_hash_free(&hash);
Expand Down

0 comments on commit d3837c6

Please sign in to comment.