From c8e1f06a2df18cb18f96c6357e6c45e99b88c712 Mon Sep 17 00:00:00 2001 From: sunxiayi Date: Mon, 1 Apr 2024 16:17:28 -0700 Subject: [PATCH 1/2] [rampup][clone] Persist synchronization gtid from P_S.log_status Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: Differential Revision: https://phabricator.intern.facebook.com/D55614528 --- ..._create_synchronization_coordinates.result | 37 +++++ ..._create_synchronization_coordinates.result | 38 +++++ ...al_create_synchronization_coordinates.test | 103 +++++++++++++ ...te_create_synchronization_coordinates.test | 9 ++ plugin/clone/include/clone.h | 9 +- plugin/clone/include/clone_client.h | 15 ++ plugin/clone/include/clone_common.h | 33 +++- plugin/clone/include/clone_local.h | 5 + plugin/clone/include/clone_server.h | 18 +++ plugin/clone/include/clone_status.h | 12 ++ plugin/clone/src/clone_client.cc | 29 ++++ plugin/clone/src/clone_common.cc | 141 ++++++++++++++---- plugin/clone/src/clone_local.cc | 13 ++ plugin/clone/src/clone_server.cc | 35 ++++- plugin/clone/src/clone_status.cc | 37 +++++ 15 files changed, 504 insertions(+), 30 deletions(-) create mode 100644 mysql-test/suite/clone/r/local_create_synchronization_coordinates.result create mode 100644 mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result create mode 100644 mysql-test/suite/clone/t/local_create_synchronization_coordinates.test create mode 100644 mysql-test/suite/clone/t/remote_create_synchronization_coordinates.test diff --git a/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result b/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result new file mode 100644 index 000000000000..3091b2fa7703 --- /dev/null +++ b/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result @@ -0,0 +1,37 @@ +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON; +CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); +INSERT INTO t1 VALUES(10, 'clone row 1'); +INSERT INTO t1 VALUES(20, 'clone row 2'); +INSERT INTO t1 VALUES(30, 'clone row 3'); +INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN'; +SET GLOBAL clone_autotune_concurrency = OFF; +SET GLOBAL clone_max_concurrency = 8; +CLONE LOCAL DATA DIRECTORY = 'CLONE_DATADIR'; +select ID, STATE, ERROR_NO from performance_schema.clone_status; +ID STATE ERROR_NO +1 Completed 0 +select ID, STAGE, STATE from performance_schema.clone_progress; +ID STAGE STATE +1 DROP DATA Completed +1 FILE COPY Completed +1 PAGE COPY Completed +1 SST COPY Completed +1 REDO COPY Completed +1 FILE SYNC Completed +1 RESTART Not Started +1 RECOVERY Not Started +SELECT * from t1 ORDER BY col1; +col1 col2 +10 clone row 1 +20 clone row 2 +30 clone row 3 +#synchronization_coordinates contents match content from log_status table. +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF; +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; +DROP TABLE t1; +UNINSTALL PLUGIN clone; diff --git a/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result b/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result new file mode 100644 index 000000000000..7bf6e3c570aa --- /dev/null +++ b/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result @@ -0,0 +1,38 @@ +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON; +CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); +INSERT INTO t1 VALUES(10, 'clone row 1'); +INSERT INTO t1 VALUES(20, 'clone row 2'); +INSERT INTO t1 VALUES(30, 'clone row 3'); +INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN'; +SET GLOBAL clone_autotune_concurrency = OFF; +SET GLOBAL clone_max_concurrency = 8; +SET GLOBAL clone_valid_donor_list = 'AnY'; +CLONE INSTANCE FROM USER@HOST:PORT IDENTIFIED BY '' DATA DIRECTORY = 'CLONE_DATADIR'; +select ID, STATE, ERROR_NO from performance_schema.clone_status; +ID STATE ERROR_NO +1 Completed 0 +select ID, STAGE, STATE from performance_schema.clone_progress; +ID STAGE STATE +1 DROP DATA Completed +1 FILE COPY Completed +1 PAGE COPY Completed +1 SST COPY Completed +1 REDO COPY Completed +1 FILE SYNC Completed +1 RESTART Not Started +1 RECOVERY Not Started +SELECT * from t1 ORDER BY col1; +col1 col2 +10 clone row 1 +20 clone row 2 +30 clone row 3 +#synchronization_coordinates contents match content from log_status table. +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF; +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; +DROP TABLE t1; +UNINSTALL PLUGIN clone; diff --git a/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test b/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test new file mode 100644 index 000000000000..ed85373837f1 --- /dev/null +++ b/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test @@ -0,0 +1,103 @@ +# Test that after a local clone command, synchronization_coordinates file is created +# containing 4 key/val pairs, we only examine 3 here +# excluding gtid get from binlog_file/offset + +--source ../include/clone_connection_begin.inc + +# Populate gtid +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=ON; + +CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); +INSERT INTO t1 VALUES(10, 'clone row 1'); +INSERT INTO t1 VALUES(20, 'clone row 2'); +INSERT INTO t1 VALUES(30, 'clone row 3'); + +# Install Clone Plugin +--replace_result $CLONE_PLUGIN CLONE_PLUGIN +--eval INSTALL PLUGIN clone SONAME '$CLONE_PLUGIN' + +# Clone data +--connection clone_conn_1 +--let $CLONE_DATADIR = $MYSQL_TMP_DIR/data_new +--source ../include/clone_command.inc +--connection default + +# Validate data +SELECT * from t1 ORDER BY col1; + +# Validate gtid value +--disable_query_log +output $MYSQLTEST_VARDIR/tmp/v_local.json; +SELECT local FROM performance_schema.log_status; +--enable_query_log +file_exists $CLONE_DATADIR/#clone/#synchronization_coordinates; + +--perl + use JSON; + my $json_text; + { + open(my $json_fh, "<:encoding(UTF-8)", "$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json") + or die("Can't open \$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json\": $!\n"); + my $line_count = 0; + while (my $line = <$json_fh>) { + $line_count++; + if ($line_count == 2) { + $json_text = $line; + last; # Exit the loop after reading the second line + } + } + close($json_fh); + } + + my $data = decode_json($json_text); + my $gtid = $data->{"gtid_executed"}; + my $binlog_file = $data->{"binary_log_file"}; + my $binlog_pos = $data->{"binary_log_position"}; + chomp($gtid); # Normalize + chomp($binlog_file); # Normalize + chomp($binlog_pos); # Normalize + open(my $fh, '<', "$ENV{MYSQL_TMP_DIR}/data_new/#clone/#synchronization_coordinates") or die "Cannot open #synchronization_coordinates file: $!"; + my $line_number = 1; + my ($gtid_created, $binlog_file_created, $binlog_pos_created); + while (my $line = <$fh>) { + chomp $line; # Remove newline character + if ($line_number == 2) { + $gtid_created = $line; + } + elsif ($line_number == 4) { + $binlog_file_created = $line; + } + elsif ($line_number == 6) { + $binlog_pos_created = $line; + } + last if $line_number >= 6; # Stop processing after line 6 + $line_number++; + } + close $fh; + + if ($gtid eq $gtid_created && $binlog_file == $binlog_file_created && $binlog_pos == $binlog_pos_created) { + print "#synchronization_coordinates contents match content from log_status table.\n"; + } else { + print "gtid from log_status: |$gtid|\n"; + print "gtid from file: |$gtid_created|\n"; + print "binlog_file from log_status: |$binlog_file|\n"; + print "binlog_file from file: |$binlog_file_created|\n"; + print "binlog_pos from log_status: |$binlog_pos|\n"; + print "binlog_pos from file: |$binlog_pos_created|\n"; + die "Mismatch between #synchronization_coordinates and log_status table."; + } +EOF + +# Cleanup +SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; +SET @@GLOBAL.GTID_MODE=OFF; +SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; +DROP TABLE t1; +UNINSTALL PLUGIN clone; +--force-rmdir $CLONE_DATADIR +--remove_file $MYSQLTEST_VARDIR/tmp/v_local.json +--source ../include/clone_connection_end.inc diff --git a/mysql-test/suite/clone/t/remote_create_synchronization_coordinates.test b/mysql-test/suite/clone/t/remote_create_synchronization_coordinates.test new file mode 100644 index 000000000000..4b6fea07c898 --- /dev/null +++ b/mysql-test/suite/clone/t/remote_create_synchronization_coordinates.test @@ -0,0 +1,9 @@ +# Test after remote clone command to replace data directory, synchronization_coordinates file is created + +--let $remote_clone = 1 +--let $any_donor_address = 1 +--let $HOST = 127.0.0.1 +--let $PORT =`select @@port` +--let $USER = root + +--source local_create_synchronization_coordinates.test diff --git a/plugin/clone/include/clone.h b/plugin/clone/include/clone.h index 56b21d53171a..b476d62baeab 100644 --- a/plugin/clone/include/clone.h +++ b/plugin/clone/include/clone.h @@ -135,8 +135,11 @@ const uint32_t CLONE_PROTOCOL_VERSION_V2 = 0x0101; /** Send more configurations required by recipient. */ const uint32_t CLONE_PROTOCOL_VERSION_V3 = 0x0102; +/** Send GTID executed set after engine synchronization. */ +const uint32_t CLONE_PROTOCOL_VERSION_V4 = 0x0103; + /** Clone protocol latest version */ -const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V3; +const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V4; /** Flag to indicate no backup lock for DDL. This is multiplexed with clone_ddl_timeout and sent to donor server. */ @@ -194,6 +197,10 @@ typedef enum Type_Command_Response : uchar { /** Additional configuration : introduced in version 0x0102 */ COM_RES_CONFIG_V3, + /** GTID executed set after engine synchronization: introduced in version + 0x0103 */ + COM_RES_GTID_V4, + /** End of response data */ COM_RES_COMPLETE = 99, diff --git a/plugin/clone/include/clone_client.h b/plugin/clone/include/clone_client.h index 075e62331b9b..755710580e3a 100644 --- a/plugin/clone/include/clone_client.h +++ b/plugin/clone/include/clone_client.h @@ -580,6 +580,11 @@ class Client { /** Destroy PFS mutex for table. */ static void uninit_pfs(); + /** Write synchronization coordinate to a new file. + @param[in] synchronization_coordinate synchronization coordinate */ + void persist_synchronization_coordinate( + const Key_Value &synchronization_coordinate); + private: /** Connect to remote server @param[in] is_restart restarting clone after network failure @@ -701,6 +706,13 @@ class Client { @return error code */ int set_descriptor(const uchar *buffer, size_t length); + /** Set synchronization coordinate by recording that in a new file, + after extracting key and value from the buffer. + @param[in] buffer serialized data descriptor + @param[in] length length of serialized data + @return error code */ + int set_synchronization_coordinate(const uchar *buffer, size_t length); + /** Extract and set error mesg from remote server @param[in] buffer Remote error buffer @param[in] length length of error buffer @@ -826,6 +838,9 @@ class Client_Cbk : public Ha_clone_common_cbk { @param[in] estimate_delta how many bytes to add to the estimate */ void add_to_data_size_estimate(uint64_t estimate_delta) override; + /** Synchronize engines callback: Not used for client. */ + [[nodiscard]] int synchronize_engines() override; + private: /** Apply data to local file or buffer. @param[in,out] to_file destination file diff --git a/plugin/clone/include/clone_common.h b/plugin/clone/include/clone_common.h index 7761016151a1..3262c49fad17 100644 --- a/plugin/clone/include/clone_common.h +++ b/plugin/clone/include/clone_common.h @@ -17,6 +17,7 @@ #ifndef CLONE_COMMON_H #define CLONE_COMMON_H +#include "clone.h" #include "sql/handler.h" namespace myclone { @@ -25,7 +26,37 @@ class Ha_clone_common_cbk : public Ha_clone_cbk { public: [[nodiscard]] int precopy(THD *thd, uint task_id) override; - [[nodiscard]] int synchronize_engines() override; + protected: + /** Perform the cross-engine synchronization for logs: execute a + performance_schema.log_status query, and call set_log_stop for each storage + engine with its part of STORAGE_ENGINES column JSON object from that query. + Also assign synchronization coordinates get from log_status table to the + parameter synchronization_coordinates. + @param[in] synchronization_coordinates synchronization coordinates + @return error code */ + [[nodiscard]] int synchronize_logs(Key_Values &synchronization_coordinates); + + private: + /** Get json object from the json wrapper after doing json validations + @param[in] object_name the object name, used for debug printing + @param[in] json_wrapper json wrapper to be converted to be json object + @return converted json object */ + [[nodiscard]] const Json_object *get_json_object(std::string_view object_name, + Json_wrapper &json_wrapper); + + /** Extract key and values from local_repl_info. + local_repl_info has 3 pieces of information: gtid, binlog_file, offset. + We will also get gtid corresponding to the binlog_file and offset and + record that gtid as well. This is to sanity check that this gtid we get from + binlog_file and offset is same as what we have in local_repl_info. + @param[in] local_repl_info the "local" column we get from log_status table + @param[in] synchronization_coordinates Key_Values of gtid(from log_status + table), binlog_file, binlog_offset and gtid(from binlog_file and + binlog_offset) + @return error code */ + [[nodiscard]] int populate_synchronization_coordinates( + const Json_object &local_repl_info, + Key_Values &synchronization_coordinates); }; } // namespace myclone diff --git a/plugin/clone/include/clone_local.h b/plugin/clone/include/clone_local.h index 04b87ae7a01c..89d7a2367aa3 100644 --- a/plugin/clone/include/clone_local.h +++ b/plugin/clone/include/clone_local.h @@ -134,6 +134,11 @@ class Local_Callback : public Ha_clone_common_cbk { @param[in] estimate_delta how many bytes to add to the estimate */ void add_to_data_size_estimate(std::uint64_t estimated_delta) override; + /** Synchronize engines callback: synchronize logs for every engine, then + persist coordinates to a file. + @return error code */ + [[nodiscard]] int synchronize_engines() override; + private: /** Apply data using storage engine apply interface. @return error code */ diff --git a/plugin/clone/include/clone_server.h b/plugin/clone/include/clone_server.h index f2a02586bc26..42492723f1d3 100644 --- a/plugin/clone/include/clone_server.h +++ b/plugin/clone/include/clone_server.h @@ -127,6 +127,12 @@ class Server { return m_protocol_version < CLONE_PROTOCOL_VERSION_V3; } + /** @return true iff sending synchronization coordinates(gtid & binlog info) + */ + [[nodiscard]] bool should_send_synchronization_coordinates() const { + return m_protocol_version >= CLONE_PROTOCOL_VERSION_V4; + } + private: /** Extract client ddl timeout and backup lock flag. @param[in] client_timeout timeout value received from client */ @@ -286,9 +292,21 @@ class Server_Cbk : public Ha_clone_common_cbk { @param[in] estimate_delta how many bytes to add to the estimate */ void add_to_data_size_estimate(std::uint64_t) override { assert(0); } + /** Synchronize engines callback: synchronize logs for every engine, then + send the synchronization coordinates to client. + @return error code */ + [[nodiscard]] int synchronize_engines() override; + private: /** Clone server object */ Server *m_clone_server; + + /** Send coordinate after engine synchronization + @param[in] server the server handle for the callback + @param[in] synchronization_coordinate synchronization coordinate + @return 0 if successful, non-zero if an error occurred. */ + int send_synchronization_coordinate( + Server *server, const Key_Value &synchronization_coordinate); }; } // namespace myclone diff --git a/plugin/clone/include/clone_status.h b/plugin/clone/include/clone_status.h index 7bfd13f40030..165d4e1fcca6 100644 --- a/plugin/clone/include/clone_status.h +++ b/plugin/clone/include/clone_status.h @@ -212,6 +212,12 @@ class Status_pfs : public Table_pfs { @param[in] write_error write error information. */ void write(bool write_error); + /** Write to synchronization gtid file. + @param[in] coordinate synchronization coordinate sent from server. + @param[in] remove remove the file if exists */ + void write_synchronization_coordinate(const Key_Value &coordinate, + bool remove = false); + /* @return true, if destination is current database. */ bool is_local() const { return (0 == strncmp(&m_destination[0], &g_local_string[0], @@ -247,7 +253,9 @@ class Status_pfs : public Table_pfs { m_start_time = my_micro_time(); m_end_time = 0; m_state = STATE_STARTED; + m_synchronization_coordinates.clear(); write(false); + write_synchronization_coordinate({}, true); } /** Update PFS table data while ending clone operation. @@ -316,6 +324,10 @@ class Status_pfs : public Table_pfs { /** Clone GTID set */ std::string m_gtid_string; + + /** Synchronization coordinates consisting of gtid, binlog file/offset from + * log_status. */ + Key_Values m_synchronization_coordinates; }; private: diff --git a/plugin/clone/src/clone_client.cc b/plugin/clone/src/clone_client.cc index 43cf2d51dc83..198fe418849c 100644 --- a/plugin/clone/src/clone_client.cc +++ b/plugin/clone/src/clone_client.cc @@ -26,6 +26,7 @@ Clone Plugin: Client implementation */ #include +#include #include "plugin/clone/include/clone_client.h" #include "plugin/clone/include/clone_os.h" @@ -1534,6 +1535,10 @@ int Client::handle_response(const uchar *packet, size_t length, int in_err, is_last = true; break; + case COM_RES_GTID_V4: + err = set_synchronization_coordinate(packet, length); + break; + case COM_RES_DATA: /* Allow data packet to skip */ if (in_err != 0) { @@ -1660,6 +1665,24 @@ int Client::set_locators(const uchar *buffer, size_t length) { return (err); } +void Client::persist_synchronization_coordinate( + const Key_Value &synchronization_coordinate) { + mysql_mutex_lock(&s_table_mutex); + s_status_data.write_synchronization_coordinate(synchronization_coordinate); + mysql_mutex_unlock(&s_table_mutex); +} + +[[nodiscard]] int Client::set_synchronization_coordinate(const uchar *packet, + size_t length) { + Key_Value synchronization_coordinate; + const auto err = + extract_key_value(packet, length, synchronization_coordinate); + if (err == 0) { + persist_synchronization_coordinate(synchronization_coordinate); + } + return err; +} + int Client::set_descriptor(const uchar *buffer, size_t length) { int err = 0; @@ -1962,4 +1985,10 @@ int Client_Cbk::apply_cbk(Ha_clone_file to_file, bool apply_file, return (err); } +[[nodiscard]] int Client_Cbk::synchronize_engines() { + my_error(ER_FEATURE_UNSUPPORTED, MYF(0), "Remote Clone Client"); + assert(false); + return ER_FEATURE_UNSUPPORTED; +} + } // namespace myclone diff --git a/plugin/clone/src/clone_common.cc b/plugin/clone/src/clone_common.cc index e909ad99abec..413eee99cfdd 100644 --- a/plugin/clone/src/clone_common.cc +++ b/plugin/clone/src/clone_common.cc @@ -16,8 +16,10 @@ #include "plugin/clone/include/clone_common.h" +#include #include #include +#include "sql/binlog.h" #include "lex_string.h" #include "my_dbug.h" @@ -62,29 +64,10 @@ int Ha_clone_common_cbk::precopy(THD *thd, uint task_id) { return 0; } -// Perform the cross-engine synchronization: execute a -// performance_schema.log_status query, and call set_log_stop for each storage -// engine with its part of STORAGE_ENGINES column JSON object from that query. -// If InnoDB is present, this is called between SST COPY and REDO COPY clone -// stages. When MyRocks is the sole storage engine, it should be called after -// creating the final checkpoint. -int Ha_clone_common_cbk::synchronize_engines() { - const auto &all_locators = get_all_locators(); - - std::unique_ptr table{static_cast( - table_log_status::create(&table_log_status::m_share))}; - - auto err = table->rnd_init(true); - assert(err == 0); - - err = table->rnd_next(); - if (err != 0) return err; - - auto &log_status_row = table->get_row(); - const auto &se_positions = log_status_row.w_storage_engines; - assert(se_positions.is_dom()); - - const auto *const json_dom = se_positions.get_dom(); +const Json_object *Ha_clone_common_cbk::get_json_object( + std::string_view object_name, Json_wrapper &json_wrapper) { + assert(json_wrapper.is_dom()); + const auto *const json_dom = json_wrapper.get_dom(); // get_dom above returns only a const pointer, correctly. Json_wrapper only // takes a non-const pointer. Let's trust it will not modify the passed value. const Json_wrapper json_for_str{const_cast(json_dom), true}; @@ -98,16 +81,121 @@ int Ha_clone_common_cbk::synchronize_engines() { // Size reverse-engineered from ER_CLONE_SERVER_TRACE used by log_error. No // way to track it automatically, but very unlikely it would silently shrink. char msg_buf[512]; - snprintf(msg_buf, sizeof(msg_buf), "engine positions: %.*s", + snprintf(msg_buf, sizeof(msg_buf), "%s: %.*s", object_name.data(), static_cast(json_str_buf.length()), json_str_buf.ptr()); log_error(nullptr, false, 0, msg_buf); assert(json_dom->json_type() == enum_json_type::J_OBJECT); + return static_cast(json_dom); +} + +int Ha_clone_common_cbk::populate_synchronization_coordinates( + const Json_object &local_repl_info, + Key_Values &synchronization_coordinates) { + // get synchronization coordinates from log status + static const std::string gtid_executed_key_str = "gtid_executed"; + const auto >id_executed_json = local_repl_info.get(gtid_executed_key_str); + assert(gtid_executed_json->json_type() == enum_json_type::J_STRING); + const auto >id_executed_str_json = + static_cast(*gtid_executed_json); + const auto >id_executed_str = gtid_executed_str_json.value(); + synchronization_coordinates = {{"gtid_from_log_status", gtid_executed_str}}; + + static const std::string binary_log_file_key_str = "binary_log_file"; + const auto &binary_log_file_json_ptr = + local_repl_info.get(binary_log_file_key_str); + if (binary_log_file_json_ptr == nullptr) { + // if there is no binlog file, don't populate + return 0; + } + const auto &binary_log_file_json = *binary_log_file_json_ptr; + assert(binary_log_file_json.json_type() == enum_json_type::J_STRING); + const auto &binary_log_file_str_json = + static_cast(binary_log_file_json); + const auto &binary_log_file_str = binary_log_file_str_json.value(); + synchronization_coordinates.push_back( + {binary_log_file_key_str, binary_log_file_str}); + + static const std::string binary_log_position_key_str = "binary_log_position"; + const auto &binary_log_position_json_ptr = + local_repl_info.get(binary_log_position_key_str); + if (binary_log_position_json_ptr == nullptr) { + // if there is no binlog offset, don't populate + return 0; + } + const auto &binary_log_position_json = *binary_log_position_json_ptr; + assert(binary_log_position_json.json_type() == enum_json_type::J_INT); + const auto &binary_log_position_int_json = + static_cast(binary_log_position_json); + const auto &binary_log_position_int = binary_log_position_int_json.value(); + synchronization_coordinates.push_back( + {binary_log_position_key_str, std::to_string(binary_log_position_int)}); + + // get gtid from binlog file/pos + static const std::string gtid_from_binlog_file_offset_str = + "gtid_from_binlog_file_offset"; + Sid_map sid_map(NULL); + Gtid_set gtid_executed(&sid_map); + char full_file_name[FN_REFLEN]; + mysql_bin_log.make_log_name(full_file_name, binary_log_file_str.c_str()); + // if the binlog has been purged, don't populate + std::filesystem::path file_path(full_file_name); + if (!std::filesystem::exists(file_path)) { + return 0; + } + char info_mesg[512]; + snprintf(info_mesg, sizeof(info_mesg), + "Reading gtid from binlog %s offset %s", full_file_name, + std::to_string(binary_log_position_int).c_str()); + log_error(nullptr, false, 0, info_mesg); + MYSQL_BIN_LOG::enum_read_gtids_from_binlog_status ret = + mysql_bin_log.read_gtids_from_binlog(full_file_name, >id_executed, NULL, + NULL, &sid_map, false, false, + binary_log_position_int); + if (ret == MYSQL_BIN_LOG::ERROR || ret == MYSQL_BIN_LOG::TRUNCATED) { + return ER_BINLOG_FILE_OPEN_FAILED; + } else { + char *gtid_from_binlog_file_offset; + gtid_executed.to_string(>id_from_binlog_file_offset); + synchronization_coordinates.push_back( + {gtid_from_binlog_file_offset_str, + std::string(gtid_from_binlog_file_offset)}); + my_free(gtid_from_binlog_file_offset); + } + + return 0; +} + +int Ha_clone_common_cbk::synchronize_logs( + Key_Values &synchronization_coordinates) { + const auto &all_locators = get_all_locators(); + + std::unique_ptr table{static_cast( + table_log_status::create(&table_log_status::m_share))}; + + auto err = table->rnd_init(true); + assert(err == 0); + + err = table->rnd_next(); + if (err != 0) return err; + + auto &log_status_row = table->get_row(); + const Json_object *storage_engines = + get_json_object("w_storage_engines", log_status_row.w_storage_engines); + const Json_object *local_repl_info = + get_json_object("w_local", log_status_row.w_local); + if (local_repl_info == nullptr || storage_engines == nullptr) { + return ER_KEY_NOT_FOUND; + } + err = populate_synchronization_coordinates(*local_repl_info, + synchronization_coordinates); + if (err != 0) { + return err; + } DEBUG_SYNC_C("after_clone_se_sync"); - const auto *const json_obj = static_cast(json_dom); - for (const auto &json_se_pos : *json_obj) { + for (const auto &json_se_pos : *storage_engines) { const auto &se_name = json_se_pos.first; const LEX_CSTRING lex_c_se_name{.str = se_name.c_str(), .length = se_name.length()}; @@ -133,5 +221,4 @@ int Ha_clone_common_cbk::synchronize_engines() { return 0; } - } // namespace myclone diff --git a/plugin/clone/src/clone_local.cc b/plugin/clone/src/clone_local.cc index 5f31e2c31542..835e726026af 100644 --- a/plugin/clone/src/clone_local.cc +++ b/plugin/clone/src/clone_local.cc @@ -392,4 +392,17 @@ int Local_Callback::apply_cbk(Ha_clone_file to_file, bool apply_file, return (error); } + +[[nodiscard]] int Local_Callback::synchronize_engines() { + Key_Values synchronization_coordinates; + auto err = synchronize_logs(synchronization_coordinates); + if (err != 0) { + return err; + } + auto client = get_clone_client(); + for (auto &coordinate : synchronization_coordinates) { + client->persist_synchronization_coordinate(coordinate); + } + return 0; +} } // namespace myclone diff --git a/plugin/clone/src/clone_server.cc b/plugin/clone/src/clone_server.cc index fa30cabfd129..3d7d16448c1e 100644 --- a/plugin/clone/src/clone_server.cc +++ b/plugin/clone/src/clone_server.cc @@ -409,7 +409,7 @@ int Server::send_key_value(Command_Response rcmd, String_Key &key_str, buf_len += 4; bool send_value = (rcmd == COM_RES_CONFIG || rcmd == COM_RES_PLUGIN_V2 || - rcmd == COM_RES_CONFIG_V3); + rcmd == COM_RES_CONFIG_V3 || rcmd == COM_RES_GTID_V4); /** Add length for value. */ if (send_value) { @@ -730,6 +730,39 @@ int Server_Cbk::buffer_cbk(uchar *from_buffer, uint buf_len) { return (err); } +int Server_Cbk::send_synchronization_coordinate( + Server *server, const Key_Value &synchronization_coordinate) { + String_Key key(synchronization_coordinate.first); + String_Key val(synchronization_coordinate.second); + auto err = server->send_key_value(COM_RES_GTID_V4, key, val); + return (err); +} + +// Perform the cross-engine synchronization: execute a +// performance_schema.log_status query, and call set_log_stop for each storage +// engine with its part of STORAGE_ENGINES column JSON object from that query. +// If InnoDB is present, this is called between SST COPY and REDO COPY clone +// stages. When MyRocks is the sole storage engine, it should be called after +// creating the final checkpoint. +// Also send the synchronization coordinates to client. +int Server_Cbk::synchronize_engines() { + Key_Values synchronization_coordinates; + auto err = synchronize_logs(synchronization_coordinates); + if (err != 0) { + return err; + } + auto server = get_clone_server(); + if (server->should_send_synchronization_coordinates()) { + for (const auto &coordinate : synchronization_coordinates) { + err = send_synchronization_coordinate(server, coordinate); + if (err != 0) { + return err; + } + } + } + return 0; +} + /* purecov: begin deadcode */ int Server_Cbk::apply_file_cbk(Ha_clone_file to_file [[maybe_unused]]) { assert(false); diff --git a/plugin/clone/src/clone_status.cc b/plugin/clone/src/clone_status.cc index 030763f6f769..bc63a90e20af 100644 --- a/plugin/clone/src/clone_status.cc +++ b/plugin/clone/src/clone_status.cc @@ -27,6 +27,7 @@ Clone Plugin: Clone status as performance schema plugin table */ #include "plugin/clone/include/clone_status.h" +#include #include #include #include @@ -53,6 +54,10 @@ SERVICE_TYPE_NO_CONST(pfs_plugin_column_text_v1) *mysql_pfscol_text = nullptr; const char CLONE_RECOVERY_FILE[] = CLONE_FILES_DIR FILE_PREFIX "status_recovery"; +/** Clone synchronization coordinate file. */ +const char CLONE_SYNCHRONIZATION_COORDINATES_FILE[] = + CLONE_FILES_DIR FILE_PREFIX "synchronization_coordinates"; + /** Clone PFS view clone_status persister file */ const char CLONE_VIEW_STATUS_FILE[] = CLONE_FILES_DIR FILE_PREFIX "view_status"; @@ -465,6 +470,38 @@ void Status_pfs::Data::write(bool write_error) { status_file.close(); } +void Status_pfs::Data::write_synchronization_coordinate( + const Key_Value &coordinate, bool remove) { + std::string file_name; + /* Append data directory if cloning to different place. */ + if (!is_local()) { + file_name.assign(m_destination); + file_name.append(FN_DIRSEP); + file_name.append(CLONE_SYNCHRONIZATION_COORDINATES_FILE); + } else { + file_name.assign(CLONE_SYNCHRONIZATION_COORDINATES_FILE); + } + if (remove) { + std::filesystem::remove(file_name); + return; + } + m_synchronization_coordinates.push_back(coordinate); + std::ofstream status_file; + status_file.open(file_name, std::ofstream::out | std::ofstream::trunc); + if (!status_file.is_open()) { + char msg_buf[512]; + snprintf(msg_buf, sizeof(msg_buf), "Cannot open %s for read", + CLONE_SYNCHRONIZATION_COORDINATES_FILE); + log_error(thd_get_current_thd(), true, ER_CANT_OPEN_FILE, msg_buf); + return; + } + for (auto &coordinate : m_synchronization_coordinates) { + status_file << coordinate.first << '\n'; + status_file << coordinate.second << '\n'; + } + status_file.close(); +} + void Status_pfs::Data::read() { std::string file_name; file_name.assign(CLONE_VIEW_STATUS_FILE); From 3c6bc6d5e4fb24cee44aecaccaecb51ecd53f57d Mon Sep 17 00:00:00 2001 From: sunxiayi Date: Mon, 1 Apr 2024 16:17:28 -0700 Subject: [PATCH 2/2] [rampup][clone] Persist synchronization gtid from P_S.log_status Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: Differential Revision: https://phabricator.intern.facebook.com/D55614528 --- ..._create_synchronization_coordinates.result | 8 ---- ..._create_synchronization_coordinates.result | 8 ---- ...ate_synchronization_coordinates-master.opt | 1 + ...al_create_synchronization_coordinates.test | 11 +---- ...ate_synchronization_coordinates-master.opt | 1 + plugin/clone/include/clone_client.h | 3 +- plugin/clone/include/clone_common.h | 8 ++-- plugin/clone/include/clone_status.h | 15 ++++--- plugin/clone/src/clone_client.cc | 3 +- plugin/clone/src/clone_common.cc | 32 +++++++------- plugin/clone/src/clone_local.cc | 2 +- plugin/clone/src/clone_server.cc | 2 +- plugin/clone/src/clone_status.cc | 44 +++++++++---------- 13 files changed, 61 insertions(+), 77 deletions(-) create mode 100644 mysql-test/suite/clone/t/local_create_synchronization_coordinates-master.opt create mode 100644 mysql-test/suite/clone/t/remote_create_synchronization_coordinates-master.opt diff --git a/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result b/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result index 3091b2fa7703..954915f4f750 100644 --- a/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result +++ b/mysql-test/suite/clone/r/local_create_synchronization_coordinates.result @@ -1,7 +1,3 @@ -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON; CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); INSERT INTO t1 VALUES(10, 'clone row 1'); INSERT INTO t1 VALUES(20, 'clone row 2'); @@ -29,9 +25,5 @@ col1 col2 20 clone row 2 30 clone row 3 #synchronization_coordinates contents match content from log_status table. -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF; -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; DROP TABLE t1; UNINSTALL PLUGIN clone; diff --git a/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result b/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result index 7bf6e3c570aa..3d23d853e487 100644 --- a/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result +++ b/mysql-test/suite/clone/r/remote_create_synchronization_coordinates.result @@ -1,7 +1,3 @@ -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON; CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); INSERT INTO t1 VALUES(10, 'clone row 1'); INSERT INTO t1 VALUES(20, 'clone row 2'); @@ -30,9 +26,5 @@ col1 col2 20 clone row 2 30 clone row 3 #synchronization_coordinates contents match content from log_status table. -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF; -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; DROP TABLE t1; UNINSTALL PLUGIN clone; diff --git a/mysql-test/suite/clone/t/local_create_synchronization_coordinates-master.opt b/mysql-test/suite/clone/t/local_create_synchronization_coordinates-master.opt new file mode 100644 index 000000000000..04c76f3dd20e --- /dev/null +++ b/mysql-test/suite/clone/t/local_create_synchronization_coordinates-master.opt @@ -0,0 +1 @@ +--gtid-mode=ON --enforce-gtid-consistency diff --git a/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test b/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test index ed85373837f1..29861d9afeb0 100644 --- a/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test +++ b/mysql-test/suite/clone/t/local_create_synchronization_coordinates.test @@ -3,13 +3,10 @@ # excluding gtid get from binlog_file/offset --source ../include/clone_connection_begin.inc +--source include/have_clone_plugin.inc +--source include/have_gtid.inc # Populate gtid -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=ON; - CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64)); INSERT INTO t1 VALUES(10, 'clone row 1'); INSERT INTO t1 VALUES(20, 'clone row 2'); @@ -92,10 +89,6 @@ file_exists $CLONE_DATADIR/#clone/#synchronization_coordinates; EOF # Cleanup -SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE; -SET @@GLOBAL.GTID_MODE=OFF; -SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF; DROP TABLE t1; UNINSTALL PLUGIN clone; --force-rmdir $CLONE_DATADIR diff --git a/mysql-test/suite/clone/t/remote_create_synchronization_coordinates-master.opt b/mysql-test/suite/clone/t/remote_create_synchronization_coordinates-master.opt new file mode 100644 index 000000000000..04c76f3dd20e --- /dev/null +++ b/mysql-test/suite/clone/t/remote_create_synchronization_coordinates-master.opt @@ -0,0 +1 @@ +--gtid-mode=ON --enforce-gtid-consistency diff --git a/plugin/clone/include/clone_client.h b/plugin/clone/include/clone_client.h index 755710580e3a..298691df4324 100644 --- a/plugin/clone/include/clone_client.h +++ b/plugin/clone/include/clone_client.h @@ -711,7 +711,8 @@ class Client { @param[in] buffer serialized data descriptor @param[in] length length of serialized data @return error code */ - int set_synchronization_coordinate(const uchar *buffer, size_t length); + [[nodiscard]] int set_synchronization_coordinate(const uchar *buffer, + size_t length); /** Extract and set error mesg from remote server @param[in] buffer Remote error buffer diff --git a/plugin/clone/include/clone_common.h b/plugin/clone/include/clone_common.h index 3262c49fad17..b33bb818e5fa 100644 --- a/plugin/clone/include/clone_common.h +++ b/plugin/clone/include/clone_common.h @@ -41,12 +41,12 @@ class Ha_clone_common_cbk : public Ha_clone_cbk { @param[in] object_name the object name, used for debug printing @param[in] json_wrapper json wrapper to be converted to be json object @return converted json object */ - [[nodiscard]] const Json_object *get_json_object(std::string_view object_name, - Json_wrapper &json_wrapper); + [[nodiscard]] static const Json_object &get_json_object( + std::string_view object_name, const Json_wrapper &json_wrapper); /** Extract key and values from local_repl_info. - local_repl_info has 3 pieces of information: gtid, binlog_file, offset. - We will also get gtid corresponding to the binlog_file and offset and + local_repl_info has 3 pieces of information: gtid, binlog_file, offset. + We will also get gtid corresponding to the binlog_file and offset and record that gtid as well. This is to sanity check that this gtid we get from binlog_file and offset is same as what we have in local_repl_info. @param[in] local_repl_info the "local" column we get from log_status table diff --git a/plugin/clone/include/clone_status.h b/plugin/clone/include/clone_status.h index 165d4e1fcca6..aa067588ccfa 100644 --- a/plugin/clone/include/clone_status.h +++ b/plugin/clone/include/clone_status.h @@ -213,10 +213,15 @@ class Status_pfs : public Table_pfs { void write(bool write_error); /** Write to synchronization gtid file. - @param[in] coordinate synchronization coordinate sent from server. - @param[in] remove remove the file if exists */ - void write_synchronization_coordinate(const Key_Value &coordinate, - bool remove = false); + @param[in] coordinate synchronization coordinate sent from server. */ + void write_synchronization_coordinate(const Key_Value &coordinate); + + /** Delete synchronization coordinate file, if the file does not exist, will + * not report error. */ + void delete_synchronization_coordinate_file(); + + /** Get full file name from the given file_name. */ + const std::string get_full_file_name(std::string_view file_name); /* @return true, if destination is current database. */ bool is_local() const { @@ -255,7 +260,7 @@ class Status_pfs : public Table_pfs { m_state = STATE_STARTED; m_synchronization_coordinates.clear(); write(false); - write_synchronization_coordinate({}, true); + delete_synchronization_coordinate_file(); } /** Update PFS table data while ending clone operation. diff --git a/plugin/clone/src/clone_client.cc b/plugin/clone/src/clone_client.cc index 198fe418849c..5a4a0c4228d1 100644 --- a/plugin/clone/src/clone_client.cc +++ b/plugin/clone/src/clone_client.cc @@ -1672,8 +1672,7 @@ void Client::persist_synchronization_coordinate( mysql_mutex_unlock(&s_table_mutex); } -[[nodiscard]] int Client::set_synchronization_coordinate(const uchar *packet, - size_t length) { +int Client::set_synchronization_coordinate(const uchar *packet, size_t length) { Key_Value synchronization_coordinate; const auto err = extract_key_value(packet, length, synchronization_coordinate); diff --git a/plugin/clone/src/clone_common.cc b/plugin/clone/src/clone_common.cc index 413eee99cfdd..ac08af6761b7 100644 --- a/plugin/clone/src/clone_common.cc +++ b/plugin/clone/src/clone_common.cc @@ -19,7 +19,6 @@ #include #include #include -#include "sql/binlog.h" #include "lex_string.h" #include "my_dbug.h" @@ -27,6 +26,7 @@ #include "plugin/clone/include/clone_hton.h" #include "plugin/clone/include/clone_status.h" #include "sql-common/json_dom.h" +#include "sql/binlog.h" #include "sql/handler.h" #include "sql/sql_plugin_ref.h" #include "storage/perfschema/table_log_status.h" @@ -64,8 +64,8 @@ int Ha_clone_common_cbk::precopy(THD *thd, uint task_id) { return 0; } -const Json_object *Ha_clone_common_cbk::get_json_object( - std::string_view object_name, Json_wrapper &json_wrapper) { +const Json_object &Ha_clone_common_cbk::get_json_object( + std::string_view object_name, const Json_wrapper &json_wrapper) { assert(json_wrapper.is_dom()); const auto *const json_dom = json_wrapper.get_dom(); // get_dom above returns only a const pointer, correctly. Json_wrapper only @@ -81,12 +81,13 @@ const Json_object *Ha_clone_common_cbk::get_json_object( // Size reverse-engineered from ER_CLONE_SERVER_TRACE used by log_error. No // way to track it automatically, but very unlikely it would silently shrink. char msg_buf[512]; - snprintf(msg_buf, sizeof(msg_buf), "%s: %.*s", object_name.data(), + snprintf(msg_buf, sizeof(msg_buf), "%.*s: %.*s", + static_cast(object_name.size()), object_name.data(), static_cast(json_str_buf.length()), json_str_buf.ptr()); log_error(nullptr, false, 0, msg_buf); assert(json_dom->json_type() == enum_json_type::J_OBJECT); - return static_cast(json_dom); + return *static_cast(json_dom); } int Ha_clone_common_cbk::populate_synchronization_coordinates( @@ -132,6 +133,9 @@ int Ha_clone_common_cbk::populate_synchronization_coordinates( {binary_log_position_key_str, std::to_string(binary_log_position_int)}); // get gtid from binlog file/pos + // based on https://bugs.mysql.com/bug.php?id=102175, gtid from binlog + // file/pos and log_status are not guaranteed to be in sync, so we get those + // two values and downstream can decide what to do with them static const std::string gtid_from_binlog_file_offset_str = "gtid_from_binlog_file_offset"; Sid_map sid_map(NULL); @@ -148,10 +152,9 @@ int Ha_clone_common_cbk::populate_synchronization_coordinates( "Reading gtid from binlog %s offset %s", full_file_name, std::to_string(binary_log_position_int).c_str()); log_error(nullptr, false, 0, info_mesg); - MYSQL_BIN_LOG::enum_read_gtids_from_binlog_status ret = - mysql_bin_log.read_gtids_from_binlog(full_file_name, >id_executed, NULL, - NULL, &sid_map, false, false, - binary_log_position_int); + const auto ret = mysql_bin_log.read_gtids_from_binlog( + full_file_name, >id_executed, NULL, NULL, &sid_map, false, false, + binary_log_position_int); if (ret == MYSQL_BIN_LOG::ERROR || ret == MYSQL_BIN_LOG::TRUNCATED) { return ER_BINLOG_FILE_OPEN_FAILED; } else { @@ -180,14 +183,11 @@ int Ha_clone_common_cbk::synchronize_logs( if (err != 0) return err; auto &log_status_row = table->get_row(); - const Json_object *storage_engines = + const Json_object &storage_engines = get_json_object("w_storage_engines", log_status_row.w_storage_engines); - const Json_object *local_repl_info = + const Json_object &local_repl_info = get_json_object("w_local", log_status_row.w_local); - if (local_repl_info == nullptr || storage_engines == nullptr) { - return ER_KEY_NOT_FOUND; - } - err = populate_synchronization_coordinates(*local_repl_info, + err = populate_synchronization_coordinates(local_repl_info, synchronization_coordinates); if (err != 0) { return err; @@ -195,7 +195,7 @@ int Ha_clone_common_cbk::synchronize_logs( DEBUG_SYNC_C("after_clone_se_sync"); - for (const auto &json_se_pos : *storage_engines) { + for (const auto &json_se_pos : storage_engines) { const auto &se_name = json_se_pos.first; const LEX_CSTRING lex_c_se_name{.str = se_name.c_str(), .length = se_name.length()}; diff --git a/plugin/clone/src/clone_local.cc b/plugin/clone/src/clone_local.cc index 835e726026af..e038e03e32aa 100644 --- a/plugin/clone/src/clone_local.cc +++ b/plugin/clone/src/clone_local.cc @@ -400,7 +400,7 @@ int Local_Callback::apply_cbk(Ha_clone_file to_file, bool apply_file, return err; } auto client = get_clone_client(); - for (auto &coordinate : synchronization_coordinates) { + for (const auto &coordinate : synchronization_coordinates) { client->persist_synchronization_coordinate(coordinate); } return 0; diff --git a/plugin/clone/src/clone_server.cc b/plugin/clone/src/clone_server.cc index 3d7d16448c1e..8b69ad429233 100644 --- a/plugin/clone/src/clone_server.cc +++ b/plugin/clone/src/clone_server.cc @@ -735,7 +735,7 @@ int Server_Cbk::send_synchronization_coordinate( String_Key key(synchronization_coordinate.first); String_Key val(synchronization_coordinate.second); auto err = server->send_key_value(COM_RES_GTID_V4, key, val); - return (err); + return err; } // Perform the cross-engine synchronization: execute a diff --git a/plugin/clone/src/clone_status.cc b/plugin/clone/src/clone_status.cc index bc63a90e20af..75fd6e2577d0 100644 --- a/plugin/clone/src/clone_status.cc +++ b/plugin/clone/src/clone_status.cc @@ -428,17 +428,22 @@ int Status_pfs::read_column_value(PSI_field *field, uint32_t index) { return (0); } -void Status_pfs::Data::write(bool write_error) { - std::string file_name; +const std::string Status_pfs::Data::get_full_file_name( + std::string_view file_name) { + std::string tmp_file_name; /* Append data directory if cloning to different place. */ if (!is_local()) { - file_name.assign(m_destination); - file_name.append(FN_DIRSEP); - file_name.append(CLONE_VIEW_STATUS_FILE); + tmp_file_name.assign(m_destination); + tmp_file_name.append(FN_DIRSEP); + tmp_file_name.append(file_name); } else { - file_name.assign(CLONE_VIEW_STATUS_FILE); + tmp_file_name.assign(file_name); } + return tmp_file_name; +} +void Status_pfs::Data::write(bool write_error) { + const std::string file_name = get_full_file_name(CLONE_VIEW_STATUS_FILE); std::ofstream status_file; status_file.open(file_name, std::ofstream::out | std::ofstream::trunc); if (!status_file.is_open()) { @@ -471,37 +476,32 @@ void Status_pfs::Data::write(bool write_error) { } void Status_pfs::Data::write_synchronization_coordinate( - const Key_Value &coordinate, bool remove) { - std::string file_name; - /* Append data directory if cloning to different place. */ - if (!is_local()) { - file_name.assign(m_destination); - file_name.append(FN_DIRSEP); - file_name.append(CLONE_SYNCHRONIZATION_COORDINATES_FILE); - } else { - file_name.assign(CLONE_SYNCHRONIZATION_COORDINATES_FILE); - } - if (remove) { - std::filesystem::remove(file_name); - return; - } + const Key_Value &coordinate) { + const std::string file_name = + get_full_file_name(CLONE_SYNCHRONIZATION_COORDINATES_FILE); m_synchronization_coordinates.push_back(coordinate); std::ofstream status_file; status_file.open(file_name, std::ofstream::out | std::ofstream::trunc); if (!status_file.is_open()) { char msg_buf[512]; - snprintf(msg_buf, sizeof(msg_buf), "Cannot open %s for read", + snprintf(msg_buf, sizeof(msg_buf), "Cannot open %s for write", CLONE_SYNCHRONIZATION_COORDINATES_FILE); log_error(thd_get_current_thd(), true, ER_CANT_OPEN_FILE, msg_buf); return; } - for (auto &coordinate : m_synchronization_coordinates) { + for (const auto &coordinate : m_synchronization_coordinates) { status_file << coordinate.first << '\n'; status_file << coordinate.second << '\n'; } status_file.close(); } +void Status_pfs::Data::delete_synchronization_coordinate_file() { + const std::string file_name = + get_full_file_name(CLONE_SYNCHRONIZATION_COORDINATES_FILE); + std::filesystem::remove(file_name); +} + void Status_pfs::Data::read() { std::string file_name; file_name.assign(CLONE_VIEW_STATUS_FILE);