Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[clone] Persist synchronization gtid from P_S.log_status #1450

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');
INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN';
SET GLOBAL clone_autotune_concurrency = OFF;
SET GLOBAL clone_max_concurrency = 8;
CLONE LOCAL DATA DIRECTORY = 'CLONE_DATADIR';
select ID, STATE, ERROR_NO from performance_schema.clone_status;
ID STATE ERROR_NO
1 Completed 0
select ID, STAGE, STATE from performance_schema.clone_progress;
ID STAGE STATE
1 DROP DATA Completed
1 FILE COPY Completed
1 PAGE COPY Completed
1 SST COPY Completed
1 REDO COPY Completed
1 FILE SYNC Completed
1 RESTART Not Started
1 RECOVERY Not Started
SELECT * from t1 ORDER BY col1;
col1 col2
10 clone row 1
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
DROP TABLE t1;
UNINSTALL PLUGIN clone;
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');
INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN';
SET GLOBAL clone_autotune_concurrency = OFF;
SET GLOBAL clone_max_concurrency = 8;
SET GLOBAL clone_valid_donor_list = 'AnY';
CLONE INSTANCE FROM USER@HOST:PORT IDENTIFIED BY '' DATA DIRECTORY = 'CLONE_DATADIR';
select ID, STATE, ERROR_NO from performance_schema.clone_status;
ID STATE ERROR_NO
1 Completed 0
select ID, STAGE, STATE from performance_schema.clone_progress;
ID STAGE STATE
1 DROP DATA Completed
1 FILE COPY Completed
1 PAGE COPY Completed
1 SST COPY Completed
1 REDO COPY Completed
1 FILE SYNC Completed
1 RESTART Not Started
1 RECOVERY Not Started
SELECT * from t1 ORDER BY col1;
col1 col2
10 clone row 1
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
DROP TABLE t1;
UNINSTALL PLUGIN clone;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--gtid-mode=ON --enforce-gtid-consistency
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Test that after a local clone command, synchronization_coordinates file is created
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved
# containing 4 key/val pairs, we only examine 3 here
# excluding gtid get from binlog_file/offset

--source ../include/clone_connection_begin.inc
--source include/have_clone_plugin.inc
--source include/have_gtid.inc

# Populate gtid
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');

# Install Clone Plugin
--replace_result $CLONE_PLUGIN CLONE_PLUGIN
--eval INSTALL PLUGIN clone SONAME '$CLONE_PLUGIN'

# Clone data
--connection clone_conn_1
--let $CLONE_DATADIR = $MYSQL_TMP_DIR/data_new
--source ../include/clone_command.inc
--connection default

# Validate data
SELECT * from t1 ORDER BY col1;

# Validate gtid value
--disable_query_log
output $MYSQLTEST_VARDIR/tmp/v_local.json;
SELECT local FROM performance_schema.log_status;
--enable_query_log
file_exists $CLONE_DATADIR/#clone/#synchronization_coordinates;

--perl
use JSON;
my $json_text;
{
open(my $json_fh, "<:encoding(UTF-8)", "$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json")
or die("Can't open \$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json\": $!\n");
my $line_count = 0;
while (my $line = <$json_fh>) {
$line_count++;
if ($line_count == 2) {
$json_text = $line;
last; # Exit the loop after reading the second line
}
}
close($json_fh);
}

my $data = decode_json($json_text);
my $gtid = $data->{"gtid_executed"};
my $binlog_file = $data->{"binary_log_file"};
my $binlog_pos = $data->{"binary_log_position"};
chomp($gtid); # Normalize
chomp($binlog_file); # Normalize
chomp($binlog_pos); # Normalize
open(my $fh, '<', "$ENV{MYSQL_TMP_DIR}/data_new/#clone/#synchronization_coordinates") or die "Cannot open #synchronization_coordinates file: $!";
my $line_number = 1;
my ($gtid_created, $binlog_file_created, $binlog_pos_created);
while (my $line = <$fh>) {
chomp $line; # Remove newline character
if ($line_number == 2) {
$gtid_created = $line;
}
elsif ($line_number == 4) {
$binlog_file_created = $line;
}
elsif ($line_number == 6) {
$binlog_pos_created = $line;
}
last if $line_number >= 6; # Stop processing after line 6
$line_number++;
}
close $fh;

if ($gtid eq $gtid_created && $binlog_file == $binlog_file_created && $binlog_pos == $binlog_pos_created) {
print "#synchronization_coordinates contents match content from log_status table.\n";
} else {
print "gtid from log_status: |$gtid|\n";
print "gtid from file: |$gtid_created|\n";
print "binlog_file from log_status: |$binlog_file|\n";
print "binlog_file from file: |$binlog_file_created|\n";
print "binlog_pos from log_status: |$binlog_pos|\n";
print "binlog_pos from file: |$binlog_pos_created|\n";
die "Mismatch between #synchronization_coordinates and log_status table.";
}
EOF

# Cleanup
DROP TABLE t1;
UNINSTALL PLUGIN clone;
--force-rmdir $CLONE_DATADIR
--remove_file $MYSQLTEST_VARDIR/tmp/v_local.json
--source ../include/clone_connection_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--gtid-mode=ON --enforce-gtid-consistency
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Test after remote clone command to replace data directory, synchronization_coordinates file is created

--let $remote_clone = 1
--let $any_donor_address = 1
--let $HOST = 127.0.0.1
--let $PORT =`select @@port`
--let $USER = root

--source local_create_synchronization_coordinates.test
9 changes: 8 additions & 1 deletion plugin/clone/include/clone.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ const uint32_t CLONE_PROTOCOL_VERSION_V2 = 0x0101;
/** Send more configurations required by recipient. */
const uint32_t CLONE_PROTOCOL_VERSION_V3 = 0x0102;

/** Send GTID executed set after engine synchronization. */
const uint32_t CLONE_PROTOCOL_VERSION_V4 = 0x0103;
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved

/** Clone protocol latest version */
const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V3;
const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V4;
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved

/** Flag to indicate no backup lock for DDL. This is multiplexed with
clone_ddl_timeout and sent to donor server. */
Expand Down Expand Up @@ -194,6 +197,10 @@ typedef enum Type_Command_Response : uchar {
/** Additional configuration : introduced in version 0x0102 */
COM_RES_CONFIG_V3,

/** GTID executed set after engine synchronization: introduced in version
0x0103 */
COM_RES_GTID_V4,

/** End of response data */
COM_RES_COMPLETE = 99,

Expand Down
16 changes: 16 additions & 0 deletions plugin/clone/include/clone_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,11 @@ class Client {
/** Destroy PFS mutex for table. */
static void uninit_pfs();

/** Write synchronization coordinate to a new file.
@param[in] synchronization_coordinate synchronization coordinate */
void persist_synchronization_coordinate(
const Key_Value &synchronization_coordinate);

private:
/** Connect to remote server
@param[in] is_restart restarting clone after network failure
Expand Down Expand Up @@ -701,6 +706,14 @@ class Client {
@return error code */
int set_descriptor(const uchar *buffer, size_t length);

/** Set synchronization coordinate by recording that in a new file,
after extracting key and value from the buffer.
@param[in] buffer serialized data descriptor
@param[in] length length of serialized data
@return error code */
[[nodiscard]] int set_synchronization_coordinate(const uchar *buffer,
size_t length);

/** Extract and set error mesg from remote server
@param[in] buffer Remote error buffer
@param[in] length length of error buffer
Expand Down Expand Up @@ -826,6 +839,9 @@ class Client_Cbk : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(uint64_t estimate_delta) override;

/** Synchronize engines callback: Not used for client. */
[[nodiscard]] int synchronize_engines() override;

private:
/** Apply data to local file or buffer.
@param[in,out] to_file destination file
Expand Down
33 changes: 32 additions & 1 deletion plugin/clone/include/clone_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef CLONE_COMMON_H
#define CLONE_COMMON_H

#include "clone.h"
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved
#include "sql/handler.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#include <string_view>

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is it used for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::string_view for a get_json_object parameter

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird I can use string_view without declaring the header still?

Copy link
Contributor

@laurynas-biveinis laurynas-biveinis May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably one of the other headers pulls it in, but

  1. transitive includes are brittle as the other headers (including standard library ones) may stop including it at any time
  2. source files and headers should include their dependencies directly - this is also something that tooling like include-what-you-use and clang-tidy enforce


namespace myclone {
Expand All @@ -25,7 +26,37 @@ class Ha_clone_common_cbk : public Ha_clone_cbk {
public:
[[nodiscard]] int precopy(THD *thd, uint task_id) override;

[[nodiscard]] int synchronize_engines() override;
protected:
/** Perform the cross-engine synchronization for logs: execute a
performance_schema.log_status query, and call set_log_stop for each storage
engine with its part of STORAGE_ENGINES column JSON object from that query.
Also assign synchronization coordinates get from log_status table to the
parameter synchronization_coordinates.
@param[in] synchronization_coordinates synchronization coordinates
@return error code */
[[nodiscard]] int synchronize_logs(Key_Values &synchronization_coordinates);

private:
/** Get json object from the json wrapper after doing json validations
@param[in] object_name the object name, used for debug printing
@param[in] json_wrapper json wrapper to be converted to be json object
@return converted json object */
[[nodiscard]] static const Json_object &get_json_object(
std::string_view object_name, const Json_wrapper &json_wrapper);

/** Extract key and values from local_repl_info.
local_repl_info has 3 pieces of information: gtid, binlog_file, offset.
We will also get gtid corresponding to the binlog_file and offset and
record that gtid as well. This is to sanity check that this gtid we get from
sunxiayi marked this conversation as resolved.
Show resolved Hide resolved
binlog_file and offset is same as what we have in local_repl_info.
@param[in] local_repl_info the "local" column we get from log_status table
@param[in] synchronization_coordinates Key_Values of gtid(from log_status
table), binlog_file, binlog_offset and gtid(from binlog_file and
binlog_offset)
@return error code */
[[nodiscard]] int populate_synchronization_coordinates(
const Json_object &local_repl_info,
Key_Values &synchronization_coordinates);
};

} // namespace myclone
Expand Down
5 changes: 5 additions & 0 deletions plugin/clone/include/clone_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ class Local_Callback : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(std::uint64_t estimated_delta) override;

/** Synchronize engines callback: synchronize logs for every engine, then
persist coordinates to a file.
@return error code */
[[nodiscard]] int synchronize_engines() override;

private:
/** Apply data using storage engine apply interface.
@return error code */
Expand Down
18 changes: 18 additions & 0 deletions plugin/clone/include/clone_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class Server {
return m_protocol_version < CLONE_PROTOCOL_VERSION_V3;
}

/** @return true iff sending synchronization coordinates(gtid & binlog info)
*/
[[nodiscard]] bool should_send_synchronization_coordinates() const {
return m_protocol_version >= CLONE_PROTOCOL_VERSION_V4;
}

private:
/** Extract client ddl timeout and backup lock flag.
@param[in] client_timeout timeout value received from client */
Expand Down Expand Up @@ -286,9 +292,21 @@ class Server_Cbk : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(std::uint64_t) override { assert(0); }

/** Synchronize engines callback: synchronize logs for every engine, then
send the synchronization coordinates to client.
@return error code */
[[nodiscard]] int synchronize_engines() override;

private:
/** Clone server object */
Server *m_clone_server;

/** Send coordinate after engine synchronization
@param[in] server the server handle for the callback
@param[in] synchronization_coordinate synchronization coordinate
@return 0 if successful, non-zero if an error occurred. */
int send_synchronization_coordinate(
Server *server, const Key_Value &synchronization_coordinate);
};

} // namespace myclone
Expand Down
17 changes: 17 additions & 0 deletions plugin/clone/include/clone_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ class Status_pfs : public Table_pfs {
@param[in] write_error write error information. */
void write(bool write_error);

/** Write to synchronization gtid file.
@param[in] coordinate synchronization coordinate sent from server. */
void write_synchronization_coordinate(const Key_Value &coordinate);

/** Delete synchronization coordinate file, if the file does not exist, will
* not report error. */
void delete_synchronization_coordinate_file();

/** Get full file name from the given file_name. */
const std::string get_full_file_name(std::string_view file_name);

/* @return true, if destination is current database. */
bool is_local() const {
return (0 == strncmp(&m_destination[0], &g_local_string[0],
Expand Down Expand Up @@ -247,7 +258,9 @@ class Status_pfs : public Table_pfs {
m_start_time = my_micro_time();
m_end_time = 0;
m_state = STATE_STARTED;
m_synchronization_coordinates.clear();
write(false);
delete_synchronization_coordinate_file();
}

/** Update PFS table data while ending clone operation.
Expand Down Expand Up @@ -316,6 +329,10 @@ class Status_pfs : public Table_pfs {

/** Clone GTID set */
std::string m_gtid_string;

/** Synchronization coordinates consisting of gtid, binlog file/offset from
* log_status. */
Key_Values m_synchronization_coordinates;
};

private:
Expand Down
Loading