diff --git a/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result b/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result new file mode 100644 index 000000000000..ed2e982d243e --- /dev/null +++ b/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result @@ -0,0 +1,288 @@ +include/raft_3_node.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_connect.inc [creating server_4] +include/rpl_connect.inc [creating server_5] +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = SERVER_MYPORT_1, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. +Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. 
+START SLAVE; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = SERVER_MYPORT_2, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. +Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +START SLAVE; +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role LEADER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role +create table t1 (a int primary key) engine = innodb; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +insert into t1 values(1); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +select * from t1; +a +1 +select * from t1; +a +1 +select * from t1; +a +1 +include/raft_promote_to_leader.inc +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +select 
* from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +include/raft_promote_to_leader.inc +flush binary logs; +insert into t1 values(5); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +set @@global.debug='+d,dump_wait_before_find_next_log'; +insert into t1 values(6); +insert into t1 values(7); +flush binary logs; +insert into t1 values(8); +insert into t1 values(9); +flush binary logs; +insert into t1 values(10); +insert into t1 values(11); +set debug_sync= 'now wait_for signal.reached'; +purge raft logs to 'LOGNAME'; +set debug_sync= 'now signal signal.done'; +set @@global.debug='-d,dump_wait_before_find_next_log'; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +include/stop_slave.inc +include/rpl_restart_server.inc [server_number=2] +include/rpl_restart_server.inc [server_number=1] +include/raft_promote_to_leader.inc +START SLAVE IO_THREAD; +START SLAVE IO_THREAD; +insert into t1 values(12); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 
+12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +drop table t1; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +stop slave; +reset slave all; +stop slave; +reset slave all; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result b/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result new file mode 100644 index 000000000000..0a356c7fb991 --- /dev/null +++ b/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result @@ -0,0 +1,34 @@ +include/raft_3_node.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_connect.inc [creating server_4] +include/rpl_connect.inc [creating server_5] +create table t1 (a int) engine = innodb; +insert into t1 values(1); +insert into t1 values(2); +set @@global.rpl_raft_new_leader_uuid = 'uuid2'; +change master to master_host='localhost', master_port=port2, master_auto_position=1, master_user='root'; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. 
+Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +include/start_slave.inc +insert into t1 values(3); +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +set @@global.rpl_raft_new_leader_uuid = 'uuid1'; +drop table t1; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/stop_slave.inc +reset slave all; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test b/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test new file mode 100644 index 000000000000..b996a0ca4f74 --- /dev/null +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test @@ -0,0 +1,271 @@ +source ../include/raft_3_node.inc; +source include/have_debug_sync.inc; + +--disable_query_log +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Create connections to server 4 and 5 (these are not in the ring) +let $rpl_server_number= 4; +let $rpl_connection_name= server_4; +source include/rpl_connect.inc; +--disable_query_log +connection server_4; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +let $rpl_server_number= 5; +let $rpl_connection_name= server_5; +source include/rpl_connect.inc; +--disable_query_log +connection server_5; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Connect server 4 and 5 using COM_BINLOG_DUMP_GTID +# server4 will tail the leader +connection server_4; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1; 
+eval CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = $SERVER_MYPORT_1, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +START SLAVE; + +# server5 will tail a follower +connection server_5; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2; +eval CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = $SERVER_MYPORT_2, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +START SLAVE; + +# Check raft roles, 4 and 5 should be empty +connection server_1; +show status like 'rpl_raft_role'; +connection server_2; +show status like 'rpl_raft_role'; +connection server_3; +show status like 'rpl_raft_role'; +connection server_4; +show status like 'rpl_raft_role'; +connection server_5; +show status like 'rpl_raft_role'; + +# Create a schema and sync it across replicas +connection server_1; +create table t1 (a int primary key) engine = innodb; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +# Check if server 4 and 5 are tailing raft logs +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; +connection server_5; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; + +# Execute a regular DML (insert) +connection server_1; +insert into t1 values(1); +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let 
$sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Transfer leadership from server1 to server2 +let $rpl_raft_leader_number= 2; +source ../include/raft_promote_to_leader.inc; + +# Execute some more trxs +connection server_2; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +let $sync_slave_connection= server_1; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_1; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Check if server 4 and 5 are still tailing raft logs post election +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; +connection server_5; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; + +# Transfer leadership from server2 to server1 +let $rpl_raft_leader_number= 1; +source ../include/raft_promote_to_leader.inc; + +# Check if binlog flushing works as expected +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +flush binary logs; +insert into t1 values(5); +let $sync_slave_connection= server_2; 
+source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Check interaction of dump logs with purge +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +connection server_1; +set @@global.debug='+d,dump_wait_before_find_next_log'; +insert into t1 values(6); +insert into t1 values(7); +flush binary logs; +insert into t1 values(8); +insert into t1 values(9); +flush binary logs; +insert into t1 values(10); +insert into t1 values(11); +set debug_sync= 'now wait_for signal.reached'; +replace_result $logname LOGNAME; +eval purge raft logs to '$logname'; +set debug_sync= 'now signal signal.done'; +set @@global.debug='-d,dump_wait_before_find_next_log'; +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + + +# Check if clean restart affects dump threads +connection server_2; +source include/stop_slave.inc; +let $rpl_server_number= 2; +source include/rpl_restart_server.inc; + +connection server_1; +let $rpl_server_number= 1; +source include/rpl_restart_server.inc; + +let $rpl_raft_leader_number= 1; +source ../include/raft_promote_to_leader.inc; + 
+disable_warnings; +connection server_4; +START SLAVE IO_THREAD; +connection server_5; +START SLAVE IO_THREAD; +enable_warnings; + +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +insert into t1 values(12); +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +--disable_query_log +connection server_1; +call mtr.add_suppression("Slave has more GTIDs than the master has, using the master's SERVER_UUID"); +--enable_query_log + +# Cleanup +connection server_1; +drop table t1; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +stop slave; +reset slave all; +connection server_5; +stop slave; +reset slave all; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test b/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test new file mode 100644 index 000000000000..f14a22a86cbf --- /dev/null +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test @@ -0,0 +1,81 @@ +# Test auto position based tailing after election. After election we purge +# apply logs that make gtid_purged == gtid_executed even though all transactions +# exist in the raft log. 
+ +source ../include/raft_3_node.inc; +source include/have_debug_sync.inc; +let $use_gtids= 1; + +--disable_query_log +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Create connections to server 4 and 5 (these are not in the ring) +let $rpl_server_number= 4; +let $rpl_connection_name= server_4; +source include/rpl_connect.inc; +--disable_query_log +connection server_4; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +let $rpl_server_number= 5; +let $rpl_connection_name= server_5; +source include/rpl_connect.inc; +--disable_query_log +connection server_5; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Execute some transactions +connection server_1; +let $uuid1= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +create table t1 (a int) engine = innodb; +insert into t1 values(1); +insert into t1 values(2); + +# Transfer leadership to server_2 (this will clear out the apply logs and make +# gtid_purged == gtid_executed) +connection server_2; +let $uuid2= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +let $port2= `select @@global.port`; +connection server_1; +replace_result $uuid2 uuid2; +eval set @@global.rpl_raft_new_leader_uuid = '$uuid2'; + +# Make server_4 tail server_2 +connection server_4; +replace_result $port2 port2; +eval change master to master_host='localhost', master_port=$port2, master_auto_position=1, master_user='root'; +source include/start_slave.inc; +--let $uuid4= `SELECT @@GLOBAL.SERVER_UUID` +connection server_2; +# Note: This insert statement is required to move the last_acked to correct position. 
+insert into t1 values(3); +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +select * from t1; + +# Cleanup +connection server_2; +replace_result $uuid1 uuid1; +eval set @@global.rpl_raft_new_leader_uuid = '$uuid1'; +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +drop table t1; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +source include/stop_slave.inc; +reset slave all; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test b/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test index bebcaa08931c..cf9991c98d65 100644 --- a/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test @@ -1,6 +1,7 @@ # TODO(pgl) Excluding this test case till the time we have debug raft plugin in tools directory. 
source include/have_nodebug.inc; source include/have_debug.inc; + source ../include/raft_3_node.inc; --disable_query_log diff --git a/sql/binlog.cc b/sql/binlog.cc index 08afc6e3f16e..0b680a89c334 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -191,6 +191,7 @@ ulong rpl_read_size; bool rpl_semi_sync_source_enabled = false; MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); +Dump_log dump_log; static int binlog_init(void *p); static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event); @@ -3445,11 +3446,12 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *, THD *thd) { */ class Adjust_offset : public Do_THD_Impl { public: - Adjust_offset(my_off_t value) : m_purge_offset(value) {} + Adjust_offset(my_off_t value, bool is_relay_log) + : m_purge_offset(value), m_relay_log(is_relay_log) {} void operator()(THD *thd) override { - LOG_INFO *linfo; + LOG_INFO *linfo = thd->current_linfo; mysql_mutex_lock(&thd->LOCK_thd_data); - if ((linfo = thd->current_linfo)) { + if (linfo && (!enable_raft_plugin || linfo->is_relay_log == m_relay_log)) { /* Index file offset can be less that purge offset only if we just started reading the index file. 
In that case @@ -3465,6 +3467,25 @@ class Adjust_offset : public Do_THD_Impl { private: my_off_t m_purge_offset; + bool m_relay_log; +}; + +class Adjust_linfo_in_dump_thread : public Do_THD_Impl { + public: + explicit Adjust_linfo_in_dump_thread(bool is_relay_log) { + m_relay_log = is_relay_log; + } + virtual void operator()(THD *thd) override { + LOG_INFO *linfo = thd->current_linfo; + if (linfo && linfo->is_used_by_dump_thd) { + mysql_mutex_lock(&thd->LOCK_thd_data); + linfo->is_relay_log = m_relay_log; + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + } + + private: + bool m_relay_log; }; /* @@ -3472,7 +3493,7 @@ class Adjust_offset : public Do_THD_Impl { SYNOPSIS adjust_linfo_offsets() - purge_offset Number of bytes removed from start of log index file + purge_offset Number of bytes removed from start of log index file NOTES - This is called when doing a PURGE when we delete lines from the @@ -3487,11 +3508,17 @@ class Adjust_offset : public Do_THD_Impl { in the binary log file with flush_relay_log_info. Now they sync is done for next read. */ -static void adjust_linfo_offsets(my_off_t purge_offset) { - Adjust_offset adjust_offset(purge_offset); +static void adjust_linfo_offsets(my_off_t purge_offset, bool is_relay_log) { + Adjust_offset adjust_offset(purge_offset, is_relay_log); Global_THD_manager::get_instance()->do_for_all_thd(&adjust_offset); } +static void adjust_linfo_in_dump_threads(bool is_relay_log) { + Adjust_linfo_in_dump_thread adjust_linfo_in_dump_thread(is_relay_log); + Global_THD_manager::get_instance()->do_for_all_thd( + &adjust_linfo_in_dump_thread); +} + /** This class implements Call back function for do_for_all_thd(). 
It is called for each thd in thd list to count @@ -4120,7 +4147,7 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) { Protocol *protocol = thd->get_protocol(); List field_list; std::string errmsg; - LOG_INFO linfo; + LOG_INFO linfo(binary_log->is_relay_log); DBUG_TRACE; @@ -4864,7 +4891,8 @@ int MYSQL_BIN_LOG::init_index_file() { // NO_LINT_DEBUG sql_print_information( "Binlog apply index file exists. Recovering mysqld " - "based on binlog apply index file"); + "based on binlog apply index file: %s", + opt_applylog_index_name); index_file_name = opt_applylog_index_name; log_file_name = opt_apply_logname; is_apply_log = true; @@ -4872,7 +4900,8 @@ int MYSQL_BIN_LOG::init_index_file() { // NO_LINT_DEBUG sql_print_information( "Binlog apply index file does not exist. Recovering " - "mysqld based on binlog index file"); + "mysqld based on binlog index file: %s", + opt_binlog_index_name); index_file_name = opt_binlog_index_name; log_file_name = opt_bin_logname; } @@ -5594,7 +5623,7 @@ bool MYSQL_BIN_LOG::find_first_log_not_in_gtid_set(char *binlog_file_name, /* * This is a limited version of init_gtid_sets which is only * called from binlog_change_to_apply. - * Needs to be called under LOCK_log and LOCK_index held. + * Needs to be called with LOCK_index held. * The previous_gtid_set_map is cleared and reinitialized from * the index file contents. 
*/ @@ -5605,7 +5634,6 @@ bool MYSQL_BIN_LOG::init_prev_gtid_sets_map() { std::pair iterator; DBUG_ENTER("MYSQL_BIN_LOG::init_prev_gtid_sets_map"); - mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_index); // clear the map as it is being reset @@ -6033,7 +6061,8 @@ bool MYSQL_BIN_LOG::open_binlog( const char *log_name, const char *new_name, ulong max_size_arg, bool null_created_arg, bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event, - uint32 new_index_number, RaftRotateInfo *raft_rotate_info) { + uint32 new_index_number, RaftRotateInfo *raft_rotate_info, + bool need_end_log_pos_lock) { // lock_index must be acquired *before* sid_lock. assert(need_sid_lock || !need_lock_index); DBUG_TRACE; @@ -6313,7 +6342,7 @@ bool MYSQL_BIN_LOG::open_binlog( close_purge_index_file(); - update_binlog_end_pos(); + update_binlog_end_pos(need_end_log_pos_lock); my_free(previous_gtid_set_buffer); return false; @@ -6336,7 +6365,8 @@ bool MYSQL_BIN_LOG::open_binlog( } bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, - ulong max_size_arg) { + ulong max_size_arg, + bool need_end_log_pos_lock) { DBUG_ENTER("MYSQL_BIN_LOG::open_existing_binlog(const char *, ...)"); DBUG_PRINT("enter", ("name: %s", log_name)); @@ -6364,8 +6394,8 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, DBUG_RETURN(1); } - if (!(this->name = - my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { + my_free(name); + if (!(name = my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { // NO_LINT_DEBUG sql_print_error("Could not allocate name %s (error %d)", log_name, errno); DBUG_RETURN(1); @@ -6375,6 +6405,8 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, Binlog_ofile::open_existing(m_key_file_log, existing_file, MYF(MY_WME)); if (!binlog_file) goto err; + // release current point before assign + delete m_binlog_file; m_binlog_file = binlog_file.release(); file = 
mysql_file_open(m_key_file_log, existing_file, O_CREAT | O_WRONLY, @@ -6398,7 +6430,7 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, if (offset > 0) m_binlog_file->seek(offset); max_size = max_size_arg; - update_binlog_end_pos(); + update_binlog_end_pos(need_end_log_pos_lock); atomic_log_state = LOG_OPENED; DBUG_RETURN(0); // Success @@ -7351,7 +7383,7 @@ int MYSQL_BIN_LOG::remove_logs_from_index(LOG_INFO *log_info, // now update offsets in index file for running threads if (need_update_threads) - adjust_linfo_offsets(log_info->index_file_start_offset); + adjust_linfo_offsets(log_info->index_file_start_offset, is_relay_log); return 0; err: @@ -7565,6 +7597,10 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log, bool included, global_sid_lock->unlock(); } + if (enable_raft_plugin && is_relay_log) { + error = init_prev_gtid_sets_map(); + } + err: if (need_lock_index) mysql_mutex_unlock(&LOCK_index); @@ -10523,8 +10559,12 @@ void MYSQL_BIN_LOG::process_consensus_queue(THD *queue_head) { enable_raft_plugin && rpl_wait_for_semi_sync_ack) { const char *log_file = nullptr; my_off_t log_pos = 0; - queue_head->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + last_thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + last_thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + dump_log.signal_semi_sync_ack(log_file, log_pos); } } } @@ -10737,8 +10777,12 @@ void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) { rpl_wait_for_semi_sync_ack) { my_off_t log_pos; const char *log_file = nullptr; - last_thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + 
dump_log.signal_semi_sync_ack(log_file, log_pos); } } @@ -10962,8 +11006,12 @@ int MYSQL_BIN_LOG::finish_commit(THD *thd) { enable_raft_plugin && rpl_wait_for_semi_sync_ack) { const char *log_file = nullptr; my_off_t log_pos = 0; - thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + dump_log.signal_semi_sync_ack(log_file, log_pos); } thd->get_transaction()->m_flags.run_hooks = false; } @@ -11648,8 +11696,9 @@ int binlog_change_to_apply() { int error = 0; LOG_INFO linfo; mysql_mutex_lock(mysql_bin_log.get_log_lock()); + dump_log.lock(); mysql_bin_log.lock_index(); - + mysql_bin_log.lock_binlog_end_pos(); mysql_bin_log.close(LOG_CLOSE_INDEX, /*need_lock_log=*/false, /*need_lock_index=*/false); @@ -11675,10 +11724,14 @@ int binlog_change_to_apply() { /*null_created_arg=*/false, /*need_lock_index=*/false, /*need_sid_lock=*/true, - /*extra_description_event=*/NULL)) { + /*extra_description_event=*/NULL, + /*new_index_number =*/0, + /*raft_rotate_info =*/nullptr, + /*need_end_log_pos_lock =*/false)) { error = 1; goto err; } + dump_log.switch_log(/* relay_log= */ true, /* should_lock= */ false); // Purge all apply logs before the last log, because they // are from the previous epoch of being a FOLLOWER, and they @@ -11699,8 +11752,9 @@ int binlog_change_to_apply() { } err: - + mysql_bin_log.unlock_binlog_end_pos(); mysql_bin_log.unlock_index(); + dump_log.unlock(); mysql_mutex_unlock(mysql_bin_log.get_log_lock()); DBUG_RETURN(error); @@ -11720,8 +11774,21 @@ int binlog_change_to_binlog(THD *thd) { std::vector lognames; mysql_mutex_lock(mysql_bin_log.get_log_lock()); + dump_log.lock(); mysql_bin_log.lock_index(); + Master_info *active_mi; + if (!get_and_lock_master_info(&active_mi) || !active_mi || !active_mi->rli) { + error = 1; + // 
NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + mysql_bin_log.unlock_index(); + dump_log.unlock(); + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + return error; + } + active_mi->rli->relay_log.lock_binlog_end_pos(); + // Get the index file name std::string indexfn = mysql_bin_log.get_index_fname(); @@ -11813,13 +11880,18 @@ int binlog_change_to_binlog(THD *thd) { mysql_bin_log.is_apply_log = false; mysql_bin_log.apply_file_count.store(0); + dump_log.switch_log(/* relay_log= */ false, /* should_lock= */ false); + // Register new log to raft // Previous mysql_bin_log.close(LOG_CLOSE_INDEX) will also close binlog and // its IO_CACHE. mysql_bin_log.register_log_entities(thd, /*context=*/0, /*need_lock=*/false, /*is_relay_log=*/false); err: + active_mi->rli->relay_log.unlock_binlog_end_pos(); + unlock_master_info(active_mi); mysql_bin_log.unlock_index(); + dump_log.unlock(); mysql_mutex_unlock(mysql_bin_log.get_log_lock()); DBUG_RETURN(error); @@ -11855,6 +11927,39 @@ int MYSQL_BIN_LOG::get_lognames_from_index(bool need_lock, return error; } +Dump_log::Dump_log() { + if (enable_raft_plugin && mysql_bin_log.is_apply_log) { + Master_info *active_mi = nullptr; + if (!get_and_lock_master_info(&active_mi)) { + // NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + } + assert(active_mi && active_mi->rli); + log_ = &active_mi->rli->relay_log; + unlock_master_info(active_mi); + } else { + log_ = &mysql_bin_log; + } +} + +void Dump_log::switch_log(bool relay_log, bool should_lock) { + if (should_lock) lock(); + mysql_mutex_assert_owner(log_->get_binlog_end_pos_lock()); + log_->update_binlog_end_pos(/* need_lock= */ false); + Master_info *active_mi = nullptr; + if (!get_and_lock_master_info(&active_mi)) { + // NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + } + assert(active_mi && active_mi->rli); + log_ = relay_log ? 
&active_mi->rli->relay_log : &mysql_bin_log; + unlock_master_info(active_mi); + // Now let's update the dump thread's linfos + log_->reset_semi_sync_last_acked(); + adjust_linfo_in_dump_threads(relay_log); + if (should_lock) unlock(); +} + // Given a file name of the form 'binlog-file-name.index', it extracts the // <binlog-file-name> and <index> and returns it as a pair // Example: @@ -11875,10 +11980,11 @@ static std::pair extract_file_index( } void MYSQL_BIN_LOG::report_missing_purged_gtids( - const Gtid_set *slave_executed_gtid_set, std::string &errmsg) { + const Gtid_set *lost_gtid_set, const Gtid_set *slave_executed_gtid_set, + std::string &errmsg) { DBUG_TRACE; THD *thd = current_thd; - Gtid_set gtid_missing(gtid_state->get_lost_gtids()->get_sid_map()); + Gtid_set gtid_missing(lost_gtid_set->get_sid_map()); gtid_missing.add_gtid_set(gtid_state->get_lost_gtids()); gtid_missing.remove_gtid_set(slave_executed_gtid_set); @@ -12153,13 +12259,17 @@ void MYSQL_BIN_LOG::update_binlog_end_pos(bool need_lock) { if (need_lock) unlock_binlog_end_pos(); } -inline void MYSQL_BIN_LOG::update_binlog_end_pos(const char *file, - my_off_t pos) { - lock_binlog_end_pos(); - if (is_active(file) && (pos > atomic_binlog_end_pos)) +inline void MYSQL_BIN_LOG::update_binlog_end_pos(const char *file, my_off_t pos, + bool need_lock) { + if (need_lock) + lock_binlog_end_pos(); + else + mysql_mutex_assert_owner(&LOCK_binlog_end_pos); + if (is_active(file) && (pos > atomic_binlog_end_pos)) { atomic_binlog_end_pos = pos; + } signal_update(); - unlock_binlog_end_pos(); + if (need_lock) unlock_binlog_end_pos(); } my_off_t MYSQL_BIN_LOG::get_binlog_end_pos() const { @@ -12193,7 +12303,7 @@ my_off_t MYSQL_BIN_LOG::get_last_acked_pos(bool *wait_for_ack, /* Use by raft plugin */ void signal_semi_sync_ack(const std::string &file, uint pos) { - mysql_bin_log.signal_semi_sync_ack(file.c_str(), pos); + dump_log.signal_semi_sync_ack(file.c_str(), pos); } void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, @@
-12221,7 +12331,7 @@ void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, lock_binlog_end_pos(); if (acked > last_acked.load()) { last_acked = acked; - signal_update(); + update_binlog_end_pos(log_file, log_pos, false); } unlock_binlog_end_pos(); } @@ -12229,7 +12339,10 @@ void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, void MYSQL_BIN_LOG::reset_semi_sync_last_acked() { lock_binlog_end_pos(); /* binary log is rotated and all trxs in previous binlog are already committed - * to the storage engine */ + * to the storage engine. + * Note: when in raft mode we cannot init the coords without consulting the + * plugin, so we reset the coords + */ if (strlen(log_file_name)) { last_acked = {extract_file_index(log_file_name).second, 0}; } else { diff --git a/sql/binlog.h b/sql/binlog.h index 56e147a8f872..4531c080b0b5 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -157,13 +157,17 @@ struct LOG_INFO { bool fatal; // if the purge happens to give us a negative offset int entry_index; // used in purge_logs(), calculatd in find_log_pos(). int encrypted_header_size; - LOG_INFO() + bool is_relay_log; // is this info pointing to a relay log? + bool is_used_by_dump_thd; // is this info being used by a dump thread? 
+ LOG_INFO(bool relay_log = false, bool used_by_dump_thd = false) : index_file_offset(0), index_file_start_offset(0), pos(0), fatal(false), entry_index(0), - encrypted_header_size(0) { + encrypted_header_size(0), + is_relay_log(relay_log), + is_used_by_dump_thd(used_by_dump_thd) { memset(log_file_name, 0, FN_REFLEN); } }; @@ -788,6 +792,16 @@ class MYSQL_BIN_LOG : public TC_LOG { */ bool init_prev_gtid_sets_map(); + void get_lost_gtids(Gtid_set *gtids) { + gtids->clear(); + mysql_mutex_lock(&LOCK_index); + auto it = previous_gtid_set_map.begin(); + if (it != previous_gtid_set_map.end()) + gtids->add_gtid_encoding((const uchar *)it->second.c_str(), + it->second.length()); + mysql_mutex_unlock(&LOCK_index); + } + enum_read_gtids_from_binlog_status read_gtids_from_binlog( const char *filename, Gtid_set *all_gtids, Gtid_set *prev_gtids, Gtid *first_gtid, Sid_map *sid_map, bool verify_checksum, @@ -1123,7 +1137,8 @@ class MYSQL_BIN_LOG : public TC_LOG { void set_max_size(ulong max_size_arg); void update_binlog_end_pos(bool need_lock = true); - void update_binlog_end_pos(const char *file, my_off_t pos); + void update_binlog_end_pos(const char *file, my_off_t pos, + bool need_lock = true); /** Wait until we get a signal that the binary log has been updated. @@ -1193,23 +1208,29 @@ class MYSQL_BIN_LOG : public TC_LOG { after the RESET MASTER TO command is called. @param raft_rotate_info rotate related information passed in by listener callbacks + @param need_end_log_pos_lock If true, LOCK_binlog_end_pos is acquired; + otherwise LOCK_binlog_end_pos must be taken by the caller. 
*/ bool open_binlog(const char *log_name, const char *new_name, ulong max_size_arg, bool null_created_arg, bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event, uint32 new_index_number = 0, - RaftRotateInfo *raft_rotate_info = nullptr); + RaftRotateInfo *raft_rotate_info = nullptr, + bool need_end_log_pos_lock = true); /** Open an existing binlog/relaylog file @param log_name Name of binlog @param max_size The size at which this binlog will be rotated. + @param need_end_log_pos_lock If true, LOCK_binlog_end_pos is acquired; + otherwise LOCK_binlog_end_pos must be taken by the caller. @retval false on success, true on error */ - bool open_existing_binlog(const char *log_name, ulong max_size_arg); + bool open_existing_binlog(const char *log_name, ulong max_size_arg, + bool need_end_log_pos_lock = true); bool open_index_file(const char *index_file_name_arg, const char *log_name, bool need_lock_index); @@ -1516,10 +1537,12 @@ class MYSQL_BIN_LOG : public TC_LOG { This function will be called from mysql_binlog_send() function. + @param lost_gtid_set GTID set of missing gtids @param slave_executed_gtid_set GTID set executed by slave @param errmsg Pointer to the error message */ - void report_missing_purged_gtids(const Gtid_set *slave_executed_gtid_set, + void report_missing_purged_gtids(const Gtid_set *lost_gtid_set, + const Gtid_set *slave_executed_gtid_set, std::string &errmsg); /** @@ -1594,6 +1617,46 @@ struct LOAD_FILE_INFO { extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; +/** + * Encapsulation over binlog or relay log for dumping raft logs during + * COM_BINLOG_DUMP and COM_BINLOG_DUMP_GTID. 
+ */ +class Dump_log { + public: + Dump_log(); + + void init_pthread_objects(); + + void switch_log(bool relay_log, bool should_lock = true); + + MYSQL_BIN_LOG *get_log(bool should_lock = true) { + if (should_lock) lock(); + auto ret = log_; + if (should_lock) unlock(); + return ret; + } + + void signal_semi_sync_ack(const char *const log_file, + const my_off_t log_pos) { + std::lock_guard guard(log_mutex_); + log_->signal_semi_sync_ack(log_file, log_pos); + } + + void reset_semi_sync_last_acked() { + std::lock_guard guard(log_mutex_); + log_->reset_semi_sync_last_acked(); + } + + void lock() { log_mutex_.lock(); } + + void unlock() { log_mutex_.unlock(); } + + private: + MYSQL_BIN_LOG *log_; + std::mutex log_mutex_; +}; +extern MYSQL_PLUGIN_IMPORT Dump_log dump_log; + /** Check if the the transaction is empty. diff --git a/sql/log_event.cc b/sql/log_event.cc index fcb90ad696aa..cfa5838ccec9 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4665,9 +4665,29 @@ void Query_log_event::detach_temp_tables_worker(THD *thd_arg, Query_log_event::do_apply_event() */ int Query_log_event::do_apply_event(Relay_log_info const *rli) { + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might need + // to use this pos during application (e.g. during commit) + Relay_log_info *rli_ptr = const_cast(rli); + thd->set_trans_relay_log_pos(rli_ptr->get_event_relay_log_name(), + future_event_relay_log_pos); return do_apply_event(rli, query, q_len); } +int Query_log_event::do_apply_event_worker(Slave_worker *w) { + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might + // need to use this pos during application (e.g. 
during commit) + Slave_job_group *ptr_g = w->c_rli->gaq->get_job_group(mts_group_idx); + thd->set_trans_relay_log_pos(ptr_g && ptr_g->group_relay_log_name + ? ptr_g->group_relay_log_name + : w->get_group_relay_log_name(), + future_event_relay_log_pos); + return do_apply_event(w, query, q_len); +} + /* is_silent_error @@ -5912,11 +5932,19 @@ bool Rotate_log_event::write(Basic_ostream *ostream) { write_footer(ostream)); } -int Rotate_log_event::do_apply_event(Relay_log_info const *) { +int Rotate_log_event::do_apply_event(Relay_log_info const *rli) { if (!enable_raft_plugin) return 0; + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might need + // to use this pos during application (e.g. during commit) + Relay_log_info *rli_ptr = const_cast(rli); + thd->set_trans_relay_log_pos(rli_ptr->get_event_relay_log_name(), + future_event_relay_log_pos); int64_t term, index; thd->get_trans_marker(&term, &index); + return RUN_HOOK(raft_replication, after_commit, (thd)); } @@ -6469,6 +6497,9 @@ int Xid_apply_log_event::do_apply_event_worker(Slave_worker *w) { goto err; } + thd->set_trans_relay_log_pos(w->get_group_relay_log_name(), + w->get_group_relay_log_pos()); + DBUG_PRINT( "mts", ("do_apply group master %s %llu group relay %s %llu event %s %llu.", @@ -6608,6 +6639,8 @@ int Xid_apply_log_event::do_apply_event(Relay_log_info const *rli) { strmake(new_group_relay_log_name, rli_ptr->get_group_relay_log_name(), FN_REFLEN - 1); new_group_relay_log_pos = rli_ptr->get_group_relay_log_pos(); + thd->set_trans_relay_log_pos(rli_ptr->get_group_relay_log_name(), + rli_ptr->get_group_relay_log_pos()); /* Rollback positions in memory just before commit. Position values will be reset to their new values only on successful commit operation. 
@@ -7818,6 +7851,10 @@ int Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli) { return error; } +int Execute_load_query_log_event::do_apply_event_worker(Slave_worker *w) { + return do_apply_event(w); +} + /***************************************************************************** Load_query_generator is used to generate the LOAD DATA statement for binlog ******************************************************************************/ diff --git a/sql/log_event.h b/sql/log_event.h index 421198dc5531..16e77466b8d4 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1543,6 +1543,7 @@ class Query_log_event : public virtual binary_log::Query_event, #if defined(MYSQL_SERVER) enum_skip_reason do_shall_skip(Relay_log_info *rli) override; int do_apply_event(Relay_log_info const *rli) override; + int do_apply_event_worker(Slave_worker *w) override; int do_update_pos(Relay_log_info *rli) override; void prepare_dep(Relay_log_info *rli, std::shared_ptr &ev) override; @@ -2350,6 +2351,7 @@ class Execute_load_query_log_event private: #if defined(MYSQL_SERVER) int do_apply_event(Relay_log_info const *rli) override; + int do_apply_event_worker(Slave_worker *w) override; #endif }; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index b2d9fbde1158..3534b1ac956b 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5472,7 +5472,6 @@ int init_common_variables() { inited before MY_INIT(). So we do it here. 
*/ mysql_bin_log.init_pthread_objects(); - /* TODO: remove this when my_time_t is 64 bit compatible */ if (!is_time_t_valid_for_timestamp(server_start_time)) { LogErr(ERROR_LEVEL, ER_UNSUPPORTED_DATE); @@ -7768,7 +7767,7 @@ static int init_server_components() { rpl_source_io_monitor = new Source_IO_monitor(); udf_load_service.init(); - mysql_bin_log.reset_semi_sync_last_acked(); + dump_log.reset_semi_sync_last_acked(); /* Initialize the optimizer cost module */ init_optimizer_cost_module(true); diff --git a/sql/rpl_binlog_sender.cc b/sql/rpl_binlog_sender.cc index 829639f3c84c..d2d82b364baa 100644 --- a/sql/rpl_binlog_sender.cc +++ b/sql/rpl_binlog_sender.cc @@ -49,6 +49,7 @@ #include "mysql/psi/mysql_mutex.h" #include "mysql/psi/mysql_socket.h" #include "scope_guard.h" +#include "sql/binlog.h" #include "sql/binlog_reader.h" #include "sql/debug_sync.h" // debug_sync_set_action #include "sql/derror.h" // ER_THD @@ -353,12 +354,13 @@ bool Binlog_sender::set_dscp(void) { void Binlog_sender::init() { DBUG_TRACE; THD *thd = m_thd; - + auto raw_log = dump_log.get_log(false); thd->push_diagnostics_area(&m_diag_area); init_heartbeat_period(); m_last_event_sent_ts = now_in_nanosecs(); mysql_mutex_lock(&thd->LOCK_thd_data); + m_linfo = LOG_INFO(raw_log->is_relay_log, /* is_used_by_dump_thd = */ true); thd->current_linfo = &m_linfo; mysql_mutex_unlock(&thd->LOCK_thd_data); @@ -371,7 +373,7 @@ void Binlog_sender::init() { /* Save original query. 
Will be unset in cleanup */ m_orig_query = thd->query(); - if (!mysql_bin_log.is_open()) { + if (!raw_log->is_open()) { set_fatal_error("Binary log is not open"); return; } @@ -507,8 +509,10 @@ bool is_semi_sync_slave() { void Binlog_sender::run() { DBUG_TRACE; - init(); + dump_log.lock(); + init(); + dump_log.unlock(); m_is_semi_sync_slave = is_semi_sync_slave(); unsigned int max_event_size = @@ -555,28 +559,47 @@ void Binlog_sender::run() { "wait_for continue_dump_thread no_clear_event"; assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); };); - mysql_bin_log.lock_index(); - if (!mysql_bin_log.is_open()) { - if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(), - log_file, false)) { + + // Note: This part is tricky and should be touched if you really know + // what you're doing. We're locking dump log to get the raw log + // pointer, then we're locking lock_index before unlocking the dump + // log. We're taking the lock in the same sequence as when log is + // switched in binlog_change_to_binlog() and binlog_change_to_apply() + // to avoid deadlocks. This locking pattern ensures that we're working + // with the correct raw log and that there is no race between getting + // the raw log and log switching. Log switching will be blocked until + // we release the binlog end pos lock before waiting for signal in + // wait_for_update_bin_log(). 
+ dump_log.lock(); + MYSQL_BIN_LOG *raw_log = dump_log.get_log(false); + raw_log->lock_index(); + dump_log.unlock(); + if (!raw_log->is_open()) { + if (raw_log->open_index_file(raw_log->get_index_fname(), log_file, + false)) { set_fatal_error( "Binary log is not open and failed to open index file " "to retrieve next file."); - mysql_bin_log.unlock_index(); + raw_log->unlock_index(); break; } is_index_file_reopened_on_binlog_disable = true; } - int error = mysql_bin_log.find_next_log(&m_linfo, false); - mysql_bin_log.unlock_index(); + DBUG_EXECUTE_IF("dump_wait_before_find_next_log", { + const char act[] = "now signal signal.reached wait_for signal.done"; + assert(opt_debug_sync_timeout > 0); + assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); + };); + int error = raw_log->find_next_log(&m_linfo, false); + raw_log->unlock_index(); if (unlikely(error)) { DBUG_EXECUTE_IF("waiting_for_disable_binlog", { const char act[] = "now signal consumed_binlog"; assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); };); if (is_index_file_reopened_on_binlog_disable) - mysql_bin_log.close(LOG_CLOSE_INDEX, true /*need_lock_log=true*/, - true /*need_lock_index=true*/); + raw_log->close(LOG_CLOSE_INDEX, true /*need_lock_log=true*/, + true /*need_lock_index=true*/); set_fatal_error("could not find next log"); break; } @@ -688,16 +711,16 @@ std::pair Binlog_sender::get_binlog_end_pos( } bool wait_for_ack = !m_is_semi_sync_slave; + MYSQL_BIN_LOG *raw_log = dump_log.get_log(true); result.first = - mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name); + raw_log->get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name); DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu", m_linfo.log_file_name, read_pos, result.first)); - DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname())); + DBUG_PRINT("info", ("Active file is %s", raw_log->get_log_fname())); /* If this is a cold binlog file, we are done getting the end pos */ - if 
(unlikely(!wait_for_ack && - !mysql_bin_log.is_active(m_linfo.log_file_name))) { + if (unlikely(!wait_for_ack && !raw_log->is_active(m_linfo.log_file_name))) { return std::make_pair(0, 0); } if (read_pos < result.first) { @@ -932,55 +955,98 @@ int Binlog_sender::wait_new_events(my_off_t log_pos) { LOCK_binlog_end_pos if we reached the end of the hot log and are going to wait for updates on the binary log (Binlog_sender::wait_new_event()). */ - if (stop_waiting_for_update(log_pos)) { + if (stop_waiting_for_update(log_pos, dump_log.get_log(false))) { return 0; } /* Some data may be in net buffer, it should be flushed before waiting */ if (flush_net()) return 1; - mysql_bin_log.lock_binlog_end_pos(); - - m_thd->ENTER_COND(mysql_bin_log.get_log_cond(), - mysql_bin_log.get_binlog_end_pos_lock(), + // Note: This part is tricky and should be touched if you really know + // what you're doing. We're locking dump log to get the raw log + // pointer, then we're locking end log pos before unlocking the dump + // log. We're taking the lock in the same sequence as when log is + // switched in binlog_change_to_binlog() and binlog_change_to_apply() + // to avoid deadlocks. This locking pattern ensures that we're working + // with the correct raw log and that there is no race between getting + // the raw log and log switching. Log switching will be blocked until + // we release the binlog end pos lock before waiting for signal in + // wait_for_update_bin_log(). + dump_log.lock(); + MYSQL_BIN_LOG *raw_log = dump_log.get_log(false); + raw_log->lock_binlog_end_pos(); + dump_log.unlock(); + + m_thd->ENTER_COND(raw_log->get_log_cond(), raw_log->get_binlog_end_pos_lock(), &stage_source_has_sent_all_binlog_to_replica, &old_stage); + // Note: upstream code rewrote wait_*_heartbeat, but because + // log switches can occur, wait_*_heartbeat needs to release + // and check for log switches too. 
Otherwise, we may never + // return from wait_*_heartbeat because stop_waiting_for_update() + // never returns true. + // + // Hence, the raw_log may change. The conditional wait metrics + // may not be accurate as a result of this, but that is probably fine. if (m_heartbeat_period.count() > 0) - ret = wait_with_heartbeat(log_pos); + ret = wait_with_heartbeat(&raw_log, log_pos); else - ret = wait_without_heartbeat(log_pos); + ret = wait_without_heartbeat(&raw_log, log_pos); - mysql_bin_log.unlock_binlog_end_pos(); + raw_log->unlock_binlog_end_pos(); m_thd->EXIT_COND(&old_stage); return ret; } -bool Binlog_sender::stop_waiting_for_update(my_off_t log_pos) const { +bool Binlog_sender::stop_waiting_for_update(my_off_t log_pos, + MYSQL_BIN_LOG *raw_log) const { bool wait_for_ack = !m_is_semi_sync_slave; - if (mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name) > + if (raw_log->get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name) > log_pos || - (!wait_for_ack && !mysql_bin_log.is_active(m_linfo.log_file_name)) || + (!wait_for_ack && !raw_log->is_active(m_linfo.log_file_name)) || m_thd->killed) { return true; } return false; } -inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos) { +static void swap_and_relock_log(MYSQL_BIN_LOG **raw_log) { + // Switch log within wait_*_heartbeat, see wait_new_events() + (*raw_log)->unlock_binlog_end_pos(); + bool is_locked = dump_log.lock(); + *raw_log = dump_log.get_log(false); + (*raw_log)->lock_binlog_end_pos(); + dump_log.unlock(is_locked); +} + +inline int Binlog_sender::wait_with_heartbeat(MYSQL_BIN_LOG **raw_log, + my_off_t log_pos) { #ifndef NDEBUG ulong hb_info_counter = 0; #endif - while (!stop_waiting_for_update(log_pos)) { - // ignoring timeout on conditional variable - mysql_bin_log.wait_for_update(m_heartbeat_period); - - if (stop_waiting_for_update(log_pos)) { + while (!stop_waiting_for_update(log_pos, *raw_log)) { + // Note: upstream code ignores the timeout on conditional variable + // 
but due to our implementation of having async readers wait, + // breaking out of wait_for_update early causes heartbeats + // to be sent, so use the return code to determine if + // heartbeats are needed. + int ret = (*raw_log)->wait_for_update(m_heartbeat_period); + + // Upstream unlocks binlog_end_pos, but we'll keep this locked + // through the heartbeat code as in 8.0.28. Unlocking the + // binlog_end_pos is fine too as long as we call get_log() before + // relocking it. + swap_and_relock_log(raw_log); + + if (stop_waiting_for_update(log_pos, *raw_log)) { return 0; } - mysql_bin_log.unlock_binlog_end_pos(); - Scope_guard lock([]() { mysql_bin_log.lock_binlog_end_pos(); }); + + if (!is_timeout(ret)) { + continue; + } #ifndef NDEBUG if (hb_info_counter < 3) { LogErr(INFORMATION_LEVEL, ER_RPL_BINLOG_MASTER_SENDS_HEARTBEAT); @@ -996,10 +1062,14 @@ inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos) { return 0; } -inline int Binlog_sender::wait_without_heartbeat(my_off_t log_pos) { +inline int Binlog_sender::wait_without_heartbeat(MYSQL_BIN_LOG **raw_log, + my_off_t log_pos) { int res = 0; - while (!stop_waiting_for_update(log_pos)) { - res = mysql_bin_log.wait_for_update(); + while (!stop_waiting_for_update(log_pos, *raw_log)) { + res = (*raw_log)->wait_for_update(); + + // Check for log switches, see wait_new_events() + swap_and_relock_log(raw_log); } return res; } @@ -1023,9 +1093,9 @@ int Binlog_sender::check_start_file() { char index_entry_name[FN_REFLEN]; char *name_ptr = nullptr; std::string errmsg; - + auto raw_log = dump_log.get_log(false); if (m_start_file[0] != '\0') { - mysql_bin_log.make_log_name(index_entry_name, m_start_file); + raw_log->make_log_name(index_entry_name, m_start_file); name_ptr = index_entry_name; } else if (m_using_gtid_protocol) { /* @@ -1090,15 +1160,34 @@ int Binlog_sender::check_start_file() { will not find one and an error ER_MASTER_HAS_PURGED_REQUIRED_GTIDS is thrown from there. 
*/ - if (!gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid)) { - mysql_bin_log.report_missing_purged_gtids(m_exclude_gtid, errmsg); + Gtid_set *lost_gtids = const_cast<Gtid_set *>(gtid_state->get_lost_gtids()); + + /** In raft mode we calculate lost gtids from the binlog/relaylog index + * file instead of using the global state that is always based on the + * apply side binlogs. Apply logs are purged on election so global state + * is currently incorrect wrt raft logs. + * + * TODO: Remove this hack after global gtid state is fixed wrt to raft + * logs + */ + Sid_map gtids_lost_sid_map(nullptr); + Gtid_set gtids_lost(&gtids_lost_sid_map); + if (enable_raft_plugin) { global_sid_lock->unlock(); + raw_log->get_lost_gtids(&gtids_lost); + lost_gtids = &gtids_lost; + } + + if (!lost_gtids->is_subset(m_exclude_gtid)) { + mysql_bin_log.report_missing_purged_gtids(lost_gtids, m_exclude_gtid, + errmsg); + if (!enable_raft_plugin) global_sid_lock->unlock(); set_fatal_error(errmsg.c_str()); return 1; } - global_sid_lock->unlock(); + if (!enable_raft_plugin) global_sid_lock->unlock(); Gtid first_gtid = {0, 0}; - if (mysql_bin_log.find_first_log_not_in_gtid_set( + if (raw_log->find_first_log_not_in_gtid_set( index_entry_name, m_exclude_gtid, &first_gtid, errmsg)) { set_fatal_error(errmsg.c_str()); return 1; @@ -1126,7 +1215,7 @@ int Binlog_sender::check_start_file() { then starts from the first file in index file.
*/ - if (mysql_bin_log.find_log_pos(&m_linfo, name_ptr, true)) { + if (raw_log->find_log_pos(&m_linfo, name_ptr, true)) { set_fatal_error( "Could not find first log file name in binary log " "index file"); diff --git a/sql/rpl_binlog_sender.h b/sql/rpl_binlog_sender.h index ff2a638ff98f..6545af68d0ad 100644 --- a/sql/rpl_binlog_sender.h +++ b/sql/rpl_binlog_sender.h @@ -78,7 +78,7 @@ class Binlog_sender { Checks whether thread should continue awaiting new events @param log_pos Last processed (sent) event id */ - bool stop_waiting_for_update(my_off_t log_pos) const; + bool stop_waiting_for_update(my_off_t log_pos, MYSQL_BIN_LOG *raw_log) const; THD *m_thd; String &m_packet; @@ -421,8 +421,8 @@ class Binlog_sender { @return It returns 0 if succeeds, otherwise 1 is returned. */ int wait_new_events(my_off_t log_pos); - int wait_with_heartbeat(my_off_t log_pos); - int wait_without_heartbeat(my_off_t log_pos); + int wait_with_heartbeat(MYSQL_BIN_LOG **raw_log, my_off_t log_pos); + int wait_without_heartbeat(MYSQL_BIN_LOG **raw_log, my_off_t log_pos); #ifndef NDEBUG /* It is used to count the events that have been sent. */ diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 5d80002ef637..123046f03066 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -616,6 +616,13 @@ int Trans_delegate::before_commit(THD *thd, bool all, (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(&param.log_file, &param.log_pos); + else + thd->get_trans_fixed_pos(&param.log_file, &param.log_pos); + + DBUG_PRINT("enter", + ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos)); int ret = 0; /* After this debug point we mark the transaction as committing in THD.
*/ @@ -849,7 +856,6 @@ int Trans_delegate::after_commit(THD *thd, bool all) { bool is_real_trans = (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; - thd->get_trans_fixed_pos(&param.log_file, &param.log_pos); param.server_id = thd->server_id; param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type(); @@ -874,7 +880,10 @@ int Trans_delegate::after_rollback(THD *thd, bool all) { bool is_real_trans = (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; - thd->get_trans_fixed_pos(&param.log_file, &param.log_pos); + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(&param.log_file, &param.log_pos); + else + thd->get_trans_fixed_pos(&param.log_file, &param.log_pos); param.server_id = thd->server_id; param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type(); @@ -1408,6 +1417,13 @@ int Raft_replication_delegate::after_commit(THD *thd) { thd->get_trans_marker(&param.term, &param.index); + const char *file = nullptr; + my_off_t pos = 0; + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(&file, &pos); + else + thd->get_trans_fixed_pos(&file, &pos); + int ret = 0; FOREACH_OBSERVER(ret, after_commit, (&param)); DBUG_RETURN(ret); diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 398637b96016..eeb7af24e17f 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -1759,7 +1759,6 @@ int raft_reset_slave(THD *) { mi->inited = false; mysql_mutex_lock(&mi->rli->data_lock); mi->rli->inited = false; - mi->flush_info(true); /** Clear the retrieved gtid set for this channel. */ @@ -1769,7 +1768,7 @@ int raft_reset_slave(THD *) { mysql_mutex_unlock(&mi->rli->data_lock); mysql_mutex_unlock(&mi->data_lock); - + remove_info(mi); // no longer a slave.
will be set again during change master is_slave = false; channel_map.unlock(); diff --git a/sql/sql_class.h b/sql/sql_class.h index ebb7f667b0e5..b8b62602177c 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2012,6 +2012,11 @@ class THD : public MDL_context_owner, NET net; // client connection descriptor String packet; // dynamic buffer for network I/O + /** + * Relay log positions for the transaction + */ + std::pair m_trans_relay_log_pos; + /* The term and index that need to be communicated across different raft * plugin hooks. These fields are not protected by locks since they are * accessed by the same THD serially during different stages of ordered commit @@ -3058,6 +3063,16 @@ class THD : public MDL_context_owner, return; } + void get_trans_relay_log_pos(const char **file_var, my_off_t *pos_var) const { + if (file_var) *file_var = m_trans_relay_log_pos.first.c_str(); + if (pos_var) *pos_var = m_trans_relay_log_pos.second; + } + + void set_trans_relay_log_pos(const std::string &file, my_off_t pos) { + m_trans_relay_log_pos.first = file; + m_trans_relay_log_pos.second = pos; + } + /**@}*/ /* Get the trans marker i.e (term, index) tuple stashed in this THD */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 5cf5cc599c42..1dd6131e895d 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4479,9 +4479,10 @@ static Sys_var_int32 Sys_regexp_stack_limit( static bool update_rpl_wait_for_semi_sync_ack(sys_var *, THD *, enum_var_type) { if (!rpl_wait_for_semi_sync_ack) { - mysql_bin_log.lock_binlog_end_pos(); - mysql_bin_log.signal_update(); - mysql_bin_log.unlock_binlog_end_pos(); + auto raw_log = dump_log.get_log(/*should_lock*/ true); + raw_log->lock_binlog_end_pos(); + raw_log->signal_update(); + raw_log->unlock_binlog_end_pos(); } return false; }