Skip to content

Commit

Permalink
[rampup][clone] Persist synchronization gtid from P_S.log_status
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:

Differential Revision: https://phabricator.intern.facebook.com/D55614528
  • Loading branch information
sunxiayi committed May 16, 2024
1 parent c8e1f06 commit 3c6bc6d
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
Expand Down Expand Up @@ -29,9 +25,5 @@ col1 col2
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
Expand Down Expand Up @@ -30,9 +26,5 @@ col1 col2
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--gtid-mode=ON --enforce-gtid-consistency
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
# excluding gtid get from binlog_file/offset

--source ../include/clone_connection_begin.inc
--source include/have_clone_plugin.inc
--source include/have_gtid.inc

# Populate gtid
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;

CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
Expand Down Expand Up @@ -92,10 +89,6 @@ file_exists $CLONE_DATADIR/#clone/#synchronization_coordinates;
EOF

# Cleanup
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
--force-rmdir $CLONE_DATADIR
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--gtid-mode=ON --enforce-gtid-consistency
3 changes: 2 additions & 1 deletion plugin/clone/include/clone_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ class Client {
@param[in] buffer serialized data descriptor
@param[in] length length of serialized data
@return error code */
int set_synchronization_coordinate(const uchar *buffer, size_t length);
[[nodiscard]] int set_synchronization_coordinate(const uchar *buffer,
size_t length);

/** Extract and set error message from remote server
@param[in] buffer Remote error buffer
Expand Down
8 changes: 4 additions & 4 deletions plugin/clone/include/clone_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class Ha_clone_common_cbk : public Ha_clone_cbk {
@param[in] object_name the object name, used for debug printing
@param[in] json_wrapper json wrapper to be converted to be json object
@return converted json object */
[[nodiscard]] const Json_object *get_json_object(std::string_view object_name,
Json_wrapper &json_wrapper);
[[nodiscard]] static const Json_object &get_json_object(
std::string_view object_name, const Json_wrapper &json_wrapper);

/** Extract key and values from local_repl_info.
local_repl_info has 3 pieces of information: gtid, binlog_file, offset.
We will also get gtid corresponding to the binlog_file and offset and
local_repl_info has 3 pieces of information: gtid, binlog_file, offset.
We will also get gtid corresponding to the binlog_file and offset and
record that gtid as well. This is to sanity check that this gtid we get from
binlog_file and offset is same as what we have in local_repl_info.
@param[in] local_repl_info the "local" column we get from log_status table
Expand Down
15 changes: 10 additions & 5 deletions plugin/clone/include/clone_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,15 @@ class Status_pfs : public Table_pfs {
void write(bool write_error);

/** Write to synchronization gtid file.
@param[in] coordinate synchronization coordinate sent from server.
@param[in] remove remove the file if exists */
void write_synchronization_coordinate(const Key_Value &coordinate,
bool remove = false);
@param[in] coordinate synchronization coordinate sent from server. */
void write_synchronization_coordinate(const Key_Value &coordinate);

/** Delete synchronization coordinate file, if the file does not exist, will
* not report error. */
void delete_synchronization_coordinate_file();

/** Get full file name from the given file_name. */
const std::string get_full_file_name(std::string_view file_name);

/* @return true, if destination is current database. */
bool is_local() const {
Expand Down Expand Up @@ -255,7 +260,7 @@ class Status_pfs : public Table_pfs {
m_state = STATE_STARTED;
m_synchronization_coordinates.clear();
write(false);
write_synchronization_coordinate({}, true);
delete_synchronization_coordinate_file();
}

/** Update PFS table data while ending clone operation.
Expand Down
3 changes: 1 addition & 2 deletions plugin/clone/src/clone_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1672,8 +1672,7 @@ void Client::persist_synchronization_coordinate(
mysql_mutex_unlock(&s_table_mutex);
}

[[nodiscard]] int Client::set_synchronization_coordinate(const uchar *packet,
size_t length) {
int Client::set_synchronization_coordinate(const uchar *packet, size_t length) {
Key_Value synchronization_coordinate;
const auto err =
extract_key_value(packet, length, synchronization_coordinate);
Expand Down
32 changes: 16 additions & 16 deletions plugin/clone/src/clone_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
#include <filesystem>
#include <string>
#include <utility>
#include "sql/binlog.h"

#include "lex_string.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "plugin/clone/include/clone_hton.h"
#include "plugin/clone/include/clone_status.h"
#include "sql-common/json_dom.h"
#include "sql/binlog.h"
#include "sql/handler.h"
#include "sql/sql_plugin_ref.h"
#include "storage/perfschema/table_log_status.h"
Expand Down Expand Up @@ -64,8 +64,8 @@ int Ha_clone_common_cbk::precopy(THD *thd, uint task_id) {
return 0;
}

const Json_object *Ha_clone_common_cbk::get_json_object(
std::string_view object_name, Json_wrapper &json_wrapper) {
const Json_object &Ha_clone_common_cbk::get_json_object(
std::string_view object_name, const Json_wrapper &json_wrapper) {
assert(json_wrapper.is_dom());
const auto *const json_dom = json_wrapper.get_dom();
// get_dom above returns only a const pointer, correctly. Json_wrapper only
Expand All @@ -81,12 +81,13 @@ const Json_object *Ha_clone_common_cbk::get_json_object(
// Size reverse-engineered from ER_CLONE_SERVER_TRACE used by log_error. No
// way to track it automatically, but very unlikely it would silently shrink.
char msg_buf[512];
snprintf(msg_buf, sizeof(msg_buf), "%s: %.*s", object_name.data(),
snprintf(msg_buf, sizeof(msg_buf), "%.*s: %.*s",
static_cast<int>(object_name.size()), object_name.data(),
static_cast<int>(json_str_buf.length()), json_str_buf.ptr());
log_error(nullptr, false, 0, msg_buf);

assert(json_dom->json_type() == enum_json_type::J_OBJECT);
return static_cast<const Json_object *>(json_dom);
return *static_cast<const Json_object *>(json_dom);
}

int Ha_clone_common_cbk::populate_synchronization_coordinates(
Expand Down Expand Up @@ -132,6 +133,9 @@ int Ha_clone_common_cbk::populate_synchronization_coordinates(
{binary_log_position_key_str, std::to_string(binary_log_position_int)});

// get gtid from binlog file/pos
// based on https://bugs.mysql.com/bug.php?id=102175, gtid from binlog
// file/pos and log_status are not guaranteed to be in sync, so we get those
// two values and downstream can decide what to do with them
static const std::string gtid_from_binlog_file_offset_str =
"gtid_from_binlog_file_offset";
Sid_map sid_map(NULL);
Expand All @@ -148,10 +152,9 @@ int Ha_clone_common_cbk::populate_synchronization_coordinates(
"Reading gtid from binlog %s offset %s", full_file_name,
std::to_string(binary_log_position_int).c_str());
log_error(nullptr, false, 0, info_mesg);
MYSQL_BIN_LOG::enum_read_gtids_from_binlog_status ret =
mysql_bin_log.read_gtids_from_binlog(full_file_name, &gtid_executed, NULL,
NULL, &sid_map, false, false,
binary_log_position_int);
const auto ret = mysql_bin_log.read_gtids_from_binlog(
full_file_name, &gtid_executed, NULL, NULL, &sid_map, false, false,
binary_log_position_int);
if (ret == MYSQL_BIN_LOG::ERROR || ret == MYSQL_BIN_LOG::TRUNCATED) {
return ER_BINLOG_FILE_OPEN_FAILED;
} else {
Expand Down Expand Up @@ -180,22 +183,19 @@ int Ha_clone_common_cbk::synchronize_logs(
if (err != 0) return err;

auto &log_status_row = table->get_row();
const Json_object *storage_engines =
const Json_object &storage_engines =
get_json_object("w_storage_engines", log_status_row.w_storage_engines);
const Json_object *local_repl_info =
const Json_object &local_repl_info =
get_json_object("w_local", log_status_row.w_local);
if (local_repl_info == nullptr || storage_engines == nullptr) {
return ER_KEY_NOT_FOUND;
}
err = populate_synchronization_coordinates(*local_repl_info,
err = populate_synchronization_coordinates(local_repl_info,
synchronization_coordinates);
if (err != 0) {
return err;
}

DEBUG_SYNC_C("after_clone_se_sync");

for (const auto &json_se_pos : *storage_engines) {
for (const auto &json_se_pos : storage_engines) {
const auto &se_name = json_se_pos.first;
const LEX_CSTRING lex_c_se_name{.str = se_name.c_str(),
.length = se_name.length()};
Expand Down
2 changes: 1 addition & 1 deletion plugin/clone/src/clone_local.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ int Local_Callback::apply_cbk(Ha_clone_file to_file, bool apply_file,
return err;
}
auto client = get_clone_client();
for (auto &coordinate : synchronization_coordinates) {
for (const auto &coordinate : synchronization_coordinates) {
client->persist_synchronization_coordinate(coordinate);
}
return 0;
Expand Down
2 changes: 1 addition & 1 deletion plugin/clone/src/clone_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ int Server_Cbk::send_synchronization_coordinate(
String_Key key(synchronization_coordinate.first);
String_Key val(synchronization_coordinate.second);
auto err = server->send_key_value(COM_RES_GTID_V4, key, val);
return (err);
return err;
}

// Perform the cross-engine synchronization: execute a
Expand Down
44 changes: 22 additions & 22 deletions plugin/clone/src/clone_status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,17 +428,22 @@ int Status_pfs::read_column_value(PSI_field *field, uint32_t index) {
return (0);
}

void Status_pfs::Data::write(bool write_error) {
std::string file_name;
const std::string Status_pfs::Data::get_full_file_name(
    std::string_view file_name) {
  /* Cloning to a different destination: the file lives under that data
     directory, so prefix the destination path. */
  if (!is_local()) {
    std::string full_name(m_destination);
    full_name.append(FN_DIRSEP);
    full_name.append(file_name);
    return full_name;
  }
  /* Local clone: the file is addressed relative to the current datadir. */
  return std::string(file_name);
}

void Status_pfs::Data::write(bool write_error) {
const std::string file_name = get_full_file_name(CLONE_VIEW_STATUS_FILE);
std::ofstream status_file;
status_file.open(file_name, std::ofstream::out | std::ofstream::trunc);
if (!status_file.is_open()) {
Expand Down Expand Up @@ -471,37 +476,32 @@ void Status_pfs::Data::write(bool write_error) {
}

void Status_pfs::Data::write_synchronization_coordinate(
const Key_Value &coordinate, bool remove) {
std::string file_name;
/* Append data directory if cloning to different place. */
if (!is_local()) {
file_name.assign(m_destination);
file_name.append(FN_DIRSEP);
file_name.append(CLONE_SYNCHRONIZATION_COORDINATES_FILE);
} else {
file_name.assign(CLONE_SYNCHRONIZATION_COORDINATES_FILE);
}
if (remove) {
std::filesystem::remove(file_name);
return;
}
const Key_Value &coordinate) {
const std::string file_name =
get_full_file_name(CLONE_SYNCHRONIZATION_COORDINATES_FILE);
m_synchronization_coordinates.push_back(coordinate);
std::ofstream status_file;
status_file.open(file_name, std::ofstream::out | std::ofstream::trunc);
if (!status_file.is_open()) {
char msg_buf[512];
snprintf(msg_buf, sizeof(msg_buf), "Cannot open %s for read",
snprintf(msg_buf, sizeof(msg_buf), "Cannot open %s for write",
CLONE_SYNCHRONIZATION_COORDINATES_FILE);
log_error(thd_get_current_thd(), true, ER_CANT_OPEN_FILE, msg_buf);
return;
}
for (auto &coordinate : m_synchronization_coordinates) {
for (const auto &coordinate : m_synchronization_coordinates) {
status_file << coordinate.first << '\n';
status_file << coordinate.second << '\n';
}
status_file.close();
}

void Status_pfs::Data::delete_synchronization_coordinate_file() {
  // std::filesystem::remove simply returns false when the file does not
  // exist, so a missing coordinates file is silently tolerated here.
  std::filesystem::remove(
      get_full_file_name(CLONE_SYNCHRONIZATION_COORDINATES_FILE));
}

void Status_pfs::Data::read() {
std::string file_name;
file_name.assign(CLONE_VIEW_STATUS_FILE);
Expand Down

0 comments on commit 3c6bc6d

Please sign in to comment.