Skip to content

Commit

Permalink
[rampup][clone] Persist synchronization gtid from P_S.log_status
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:

Differential Revision: https://phabricator.intern.facebook.com/D55614528
  • Loading branch information
sunxiayi committed Apr 30, 2024
1 parent f010842 commit 2ddce33
Show file tree
Hide file tree
Showing 15 changed files with 499 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');
INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN';
SET GLOBAL clone_autotune_concurrency = OFF;
SET GLOBAL clone_max_concurrency = 8;
CLONE LOCAL DATA DIRECTORY = 'CLONE_DATADIR';
select ID, STATE, ERROR_NO from performance_schema.clone_status;
ID STATE ERROR_NO
1 Completed 0
select ID, STAGE, STATE from performance_schema.clone_progress;
ID STAGE STATE
1 DROP DATA Completed
1 FILE COPY Completed
1 PAGE COPY Completed
1 SST COPY Completed
1 REDO COPY Completed
1 FILE SYNC Completed
1 RESTART Not Started
1 RECOVERY Not Started
SELECT * from t1 ORDER BY col1;
col1 col2
10 clone row 1
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;
CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');
INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN';
SET GLOBAL clone_autotune_concurrency = OFF;
SET GLOBAL clone_max_concurrency = 8;
SET GLOBAL clone_valid_donor_list = 'AnY';
CLONE INSTANCE FROM USER@HOST:PORT IDENTIFIED BY '' DATA DIRECTORY = 'CLONE_DATADIR';
select ID, STATE, ERROR_NO from performance_schema.clone_status;
ID STATE ERROR_NO
1 Completed 0
select ID, STAGE, STATE from performance_schema.clone_progress;
ID STAGE STATE
1 DROP DATA Completed
1 FILE COPY Completed
1 PAGE COPY Completed
1 SST COPY Completed
1 REDO COPY Completed
1 FILE SYNC Completed
1 RESTART Not Started
1 RECOVERY Not Started
SELECT * from t1 ORDER BY col1;
col1 col2
10 clone row 1
20 clone row 2
30 clone row 3
#synchronization_coordinates contents match content from log_status table.
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
104 changes: 104 additions & 0 deletions mysql-test/suite/clone/t/local_create_synchronization_coordinates.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Test after local clone command, synchronization_coordinates file is created
# synchronization_coordinates file contains 4 key/val pairs, only examine 3 Here
# excluding gtid get from binlog_file/offset

--source include/have_example_plugin.inc
--source ../include/clone_connection_begin.inc

# Populate gtid
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=ON;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=ON;

CREATE TABLE t1(col1 INT PRIMARY KEY, col2 char(64));
INSERT INTO t1 VALUES(10, 'clone row 1');
INSERT INTO t1 VALUES(20, 'clone row 2');
INSERT INTO t1 VALUES(30, 'clone row 3');

# Install Clone Plugin
--replace_result $CLONE_PLUGIN CLONE_PLUGIN
--eval INSTALL PLUGIN clone SONAME '$CLONE_PLUGIN'

# Clone data
--connection clone_conn_1
--let $CLONE_DATADIR = $MYSQL_TMP_DIR/data_new
--source ../include/clone_command.inc
--connection default

# Validate data
SELECT * from t1 ORDER BY col1;

# Validate gtid value
--disable_query_log
output $MYSQLTEST_VARDIR/tmp/v_local.json;
SELECT local FROM performance_schema.log_status;
--enable_query_log
file_exists $CLONE_DATADIR/#clone/#synchronization_coordinates;

--perl
use JSON;
my $json_text;
{
open(my $json_fh, "<:encoding(UTF-8)", "$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json")
or die("Can't open \$ENV{MYSQLTEST_VARDIR}/tmp/v_local.json\": $!\n");
my $line_count = 0;
while (my $line = <$json_fh>) {
$line_count++;
if ($line_count == 2) {
$json_text = $line;
last; # Exit the loop after reading the second line
}
}
close($json_fh);
}

my $data = decode_json($json_text);
my $gtid = $data->{"gtid_executed"};
my $binlog_file = $data->{"binary_log_file"};
my $binlog_pos = $data->{"binary_log_position"};
chomp($gtid); # Normalize
chomp($binlog_file); # Normalize
chomp($binlog_pos); # Normalize
open(my $fh, '<', "$ENV{MYSQL_TMP_DIR}/data_new/#clone/#synchronization_coordinates") or die "Cannot open #synchronization_coordinates file: $!";
my $line_number = 1;
my ($gtid_created, $binlog_file_created, $binlog_pos_created);
while (my $line = <$fh>) {
chomp $line; # Remove newline character
if ($line_number == 2) {
$gtid_created = $line;
}
elsif ($line_number == 4) {
$binlog_file_created = $line;
}
elsif ($line_number == 6) {
$binlog_pos_created = $line;
}
last if $line_number >= 6; # Stop processing after line 6
$line_number++;
}
close $fh;

if ($gtid eq $gtid_created && $binlog_file == $binlog_file_created && $binlog_pos == $binlog_pos_created) {
print "#synchronization_coordinates contents match content from log_status table.\n";
} else {
print "gtid from log_status: |$gtid|\n";
print "gtid from file: |$gtid_created|\n";
print "binlog_file from log_status: |$binlog_file|\n";
print "binlog_file from file: |$binlog_file_created|\n";
print "binlog_pos from log_status: |$binlog_pos|\n";
print "binlog_pos from file: |$binlog_pos_created|\n";
die "Mismatch between #synchronization_coordinates and log_status table.";
}
EOF

# Cleanup
SET @@GLOBAL.GTID_MODE=ON_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE=OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY=OFF;
DROP TABLE t1;
UNINSTALL PLUGIN clone;
--force-rmdir $CLONE_DATADIR
--exec rm -f $MYSQLTEST_VARDIR/tmp/v_local.json;
--source ../include/clone_connection_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Test after remote clone command to replace data directory, synchronization_coordinates file is created

--let $remote_clone = 1
--let $any_donor_address = 1
--let $HOST = 127.0.0.1
--let $PORT =`select @@port`
--let $USER = root

--source local_create_synchronization_coordinates.test
9 changes: 8 additions & 1 deletion plugin/clone/include/clone.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ const uint32_t CLONE_PROTOCOL_VERSION_V2 = 0x0101;
/** Send more configurations required by recipient. */
const uint32_t CLONE_PROTOCOL_VERSION_V3 = 0x0102;

/** Send GTID executed set after engine syncronization. */
const uint32_t CLONE_PROTOCOL_VERSION_V4 = 0x0103;

/** Clone protocol latest version */
const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V3;
const uint32_t CLONE_PROTOCOL_VERSION = CLONE_PROTOCOL_VERSION_V4;

/** Flag to indicate no backup lock for DDL. This is multiplexed with
clone_ddl_timeout and sent to donor server. */
Expand Down Expand Up @@ -194,6 +197,10 @@ typedef enum Type_Command_Response : uchar {
/** Additional configuration : introduced in version 0x0102 */
COM_RES_CONFIG_V3,

/** GTID executed set after engine synchronization: introduced in version
0x0103 */
COM_RES_GTID_V4,

/** End of response data */
COM_RES_COMPLETE = 99,

Expand Down
16 changes: 16 additions & 0 deletions plugin/clone/include/clone_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Clone Plugin: Client Interface
#ifndef CLONE_CLIENT_H
#define CLONE_CLIENT_H

#include "clone.h"
#include "plugin/clone/include/clone.h"
#include "plugin/clone/include/clone_common.h"
#include "plugin/clone/include/clone_hton.h"
Expand Down Expand Up @@ -580,6 +581,11 @@ class Client {
/** Destroy PFS mutex for table. */
static void uninit_pfs();

/** Write syncronization coordinate to a new file.
@param[in] syncronization_coordinate syncronization coordinate */
void persist_syncronization_coordinate(
const Key_Value &syncronization_coordinate);

private:
/** Connect to remote server
@param[in] is_restart restarting clone after network failure
Expand Down Expand Up @@ -701,6 +707,13 @@ class Client {
@return error code */
int set_descriptor(const uchar *buffer, size_t length);

/** Set syncronization coordinate by recording that in a new file,
after extracting key and value from the buffer.
@param[in] buffer serialized data descriptor
@param[in] length length of serialized data
@return error code */
int set_syncronization_coordinate(const uchar *buffer, size_t length);

/** Extract and set error mesg from remote server
@param[in] buffer Remote error buffer
@param[in] length length of error buffer
Expand Down Expand Up @@ -826,6 +839,9 @@ class Client_Cbk : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(uint64_t estimate_delta) override;

/** Synchronize engines callback: Not used for client. */
[[nodiscard]] int synchronize_engines() override;

private:
/** Apply data to local file or buffer.
@param[in,out] to_file destination file
Expand Down
33 changes: 32 additions & 1 deletion plugin/clone/include/clone_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef CLONE_COMMON_H
#define CLONE_COMMON_H

#include "clone.h"
#include "sql/handler.h"

namespace myclone {
Expand All @@ -25,7 +26,37 @@ class Ha_clone_common_cbk : public Ha_clone_cbk {
public:
[[nodiscard]] int precopy(THD *thd, uint task_id) override;

[[nodiscard]] int synchronize_engines() override;
protected:
/** Perform the cross-engine synchronization for logs: execute a
performance_schema.log_status query, and call set_log_stop for each storage
engine with its part of STORAGE_ENGINES column JSON object from that query.
Also assign synchronization coordinates get from log_status table to the
parameter synchronization_coordinates.
@param[in] synchronization_coordinates syncronization coordinates
@return error code */
int synchronize_logs(Key_Values &synchronization_coordinates);

private:
/** Get json object from the json wrapper after doing json validations
@param[in] object_name the object name, used for debug printing
@param[in] json_wrapper json wrapper to be converted to be json object
@return converted json object */
const Json_object *get_json_object(const char *object_name,
Json_wrapper &json_wrapper);

/** Extract key and values from local_repl_info.
local_repl_info has 3 pieces of information: gtid, binlog_file, offset.
We will also get gtid corresponding to the binlog_file and offset and
record that gtid as well. This is to sanity check that this gtid we get from
binlog_file and offset is same as what we have in local_repl_info.
@param[in] local_repl_info the "local" column we get from log_status table
@param[in] synchronization_coordinates Key_Values of gtid(from log_status
table), binlog_file, binlog_offset and gtid(from binlog_file and
binlog_offset)
@return error code */
int populate_synchronization_coordinates(
const Json_object *local_repl_info,
Key_Values &synchronization_coordinates);
};

} // namespace myclone
Expand Down
5 changes: 5 additions & 0 deletions plugin/clone/include/clone_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ class Local_Callback : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(std::uint64_t estimated_delta) override;

/** Synchronize engines callback: synchronize logs for every engine, then
persist coordinates to a file.
@return error code */
[[nodiscard]] int synchronize_engines() override;

private:
/** Apply data using storage engine apply interface.
@return error code */
Expand Down
18 changes: 18 additions & 0 deletions plugin/clone/include/clone_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class Server {
return m_protocol_version < CLONE_PROTOCOL_VERSION_V3;
}

/** @return true iff sending synchronization coordinates(gtid & binlog info)
*/
bool should_send_synchronization_coordinates() const {
return m_protocol_version >= CLONE_PROTOCOL_VERSION_V4;
}

private:
/** Extract client ddl timeout and backup lock flag.
@param[in] client_timeout timeout value received from client */
Expand Down Expand Up @@ -286,9 +292,21 @@ class Server_Cbk : public Ha_clone_common_cbk {
@param[in] estimate_delta how many bytes to add to the estimate */
void add_to_data_size_estimate(std::uint64_t) override { assert(0); }

/** Synchronize engines callback: synchronize logs for every engine, then
send the synchronization coordinates to client.
@return error code */
[[nodiscard]] int synchronize_engines() override;

private:
/** Clone server object */
Server *m_clone_server;

/** Send coordinate after engine synchronization
@param[in] server the server handle for the callback
@param[in] syncronization_coordinate synchronization coordinate
@return 0 if successful, non-zero if an error occurred. */
int send_synchronization_coordinate(
Server *server, const Key_Value &syncronization_coordinate);
};

} // namespace myclone
Expand Down
13 changes: 13 additions & 0 deletions plugin/clone/include/clone_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Clone Plugin: Client Status Interface

#include <mysql/components/services/pfs_plugin_table_service.h>
#include <array>
#include <random>
#include "clone.h"
#include "my_systime.h"
#include "plugin/clone/include/clone.h"

Expand Down Expand Up @@ -212,6 +214,12 @@ class Status_pfs : public Table_pfs {
@param[in] write_error write error information. */
void write(bool write_error);

/** Write to synchronization gtid file.
@param[in] coordinate synchronization coordinate sent from server.
@param[in] remove remove the file if exists */
void write_synchronization_coordinate(const Key_Value &coordinate,
bool remove = false);

/* @return true, if destination is current database. */
bool is_local() const {
return (0 == strncmp(&m_destination[0], &g_local_string[0],
Expand Down Expand Up @@ -247,7 +255,9 @@ class Status_pfs : public Table_pfs {
m_start_time = my_micro_time();
m_end_time = 0;
m_state = STATE_STARTED;
m_synchronization_coordinates.clear();
write(false);
write_synchronization_coordinate({}, true);
}

/** Update PFS table data while ending clone operation.
Expand Down Expand Up @@ -316,6 +326,9 @@ class Status_pfs : public Table_pfs {

/** Clone GTID set */
std::string m_gtid_string;

/** Synchronization coordinates. */
Key_Values m_synchronization_coordinates;
};

private:
Expand Down
Loading

0 comments on commit 2ddce33

Please sign in to comment.