Skip to content

Commit

Permalink
Refactor REPLACE
Browse files Browse the repository at this point in the history
  • Loading branch information
FooBarrior committed Mar 28, 2024
1 parent de001c0 commit ba44c4a
Show file tree
Hide file tree
Showing 10 changed files with 612 additions and 476 deletions.
23 changes: 16 additions & 7 deletions mysql-test/main/long_unique_bugs_replication.result
Expand Up @@ -9,26 +9,34 @@ insert into t1 values (2,2);
update t1 set a1 = 'd' limit 1;
update t1 set a1 = 'd2' where i1= 2;
connection slave;
connection slave;
select * from t1;
i1 a1
1 d
2 d2
connection master;
drop table t1;
connection slave;
connection master;
#
# MDEV-32093 long uniques break old->new replication
#
connection slave;
create table t1 (id int not null, b1 varchar(255) not null, b2 varchar(2550) not null, unique (id), unique key (b1,b2) using hash) default charset utf8mb3;
set global slave_exec_mode=idempotent;
binlog 'aRf2ZA8BAAAA/AAAAAABAAAAAAQAMTAuNS4xNS1NYXJpYURCLWxvZwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABpF/ZkEzgNAAgAEgAEBAQEEgAA5AAEGggAAAAICAgCAAAACgoKAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEEwQADQgICAoKCgFRmTlk';
binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw==';
binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw==';
binlog 'bBf2ZBMBAAAANAAAAHUkAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AaTGFIg==bBf2ZBgBAAAASAAAAL0kAAAAAHEAAAAAAAEABP//8I+kAAABAGIBAGWuv1VNCQAAAPBuWwAAAQBiAQBlrr9VTQkAAADxS9Lu';
connection slave;
select * from t1;
i1 a1
1 d
2 d2
id b1 b2
23406 b e
connection master;
set global slave_exec_mode=default;
drop table t1;
connection slave;
connection master;
#
# End of 10.4 tests
#
# Idempotent scenario, which triggers REPLACE code to be used in the
# event, i.e. duplicated record will be deleted and then re-inserted.
create table t1 (i1 int, a1 text, unique key i1 (a1)) engine=myisam;
Expand Down Expand Up @@ -57,7 +65,8 @@ i1 a1
set global slave_exec_mode = @save_slave_exec_mode;
connection master;
drop table t1;
set global slave_exec_mode=default;
connection slave;
connection master;
#
# End of 10.4 tests
#
Expand Down
8 changes: 6 additions & 2 deletions mysql-test/main/long_unique_bugs_replication.test
Expand Up @@ -31,7 +31,6 @@ connection master;

# this is technically a bug in replication, but it needs an old master
# so we'll run it as a non-replicated test with BINLOG command
sync_slave_with_master;
create table t1 (id int not null, b1 varchar(255) not null, b2 varchar(2550) not null, unique (id), unique key (b1,b2) using hash) default charset utf8mb3;
set global slave_exec_mode=idempotent;

Expand All @@ -45,8 +44,11 @@ binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==
### UPDATE t1 WHERE (42127, 'b', 'e', 39952170926) SET (23406, 'b', 'e', 39952170926)
binlog 'bBf2ZBMBAAAANAAAAHUkAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AaTGFIg==bBf2ZBgBAAAASAAAAL0kAAAAAHEAAAAAAAEABP//8I+kAAABAGIBAGWuv1VNCQAAAPBuWwAAAQBiAQBlrr9VTQkAAADxS9Lu';

drop table t1;
sync_slave_with_master;
select * from t1;
connection master;
set global slave_exec_mode=default;
drop table t1;

--echo #
--echo # End of 10.4 tests
Expand Down Expand Up @@ -77,6 +79,8 @@ select * from t1;
set global slave_exec_mode = @save_slave_exec_mode;
connection master;
drop table t1;
sync_slave_with_master;
connection master;

--echo #
--echo # End of 10.4 tests
Expand Down
6 changes: 5 additions & 1 deletion sql/log_event.h
Expand Up @@ -53,6 +53,7 @@
#include "rpl_record.h"
#include "rpl_reporting.h"
#include "sql_class.h" /* THD */
#include "sql_insert.h"
#endif

#include "rpl_gtid.h"
Expand Down Expand Up @@ -5120,6 +5121,7 @@ class Rows_log_event : public Log_event
*/
MY_BITMAP m_cols_ai;


ulong m_master_reclength; /* Length of record on master side */

/* Bit buffers in the same memory as the class */
Expand Down Expand Up @@ -5155,7 +5157,6 @@ class Rows_log_event : public Log_event

int find_key(); // Find a best key to use in find_row()
int find_row(rpl_group_info *);
int write_row(rpl_group_info *, const bool);
int update_sequence();

// Unpack the current row into m_table->record[0], but with
Expand Down Expand Up @@ -5309,6 +5310,9 @@ class Write_rows_log_event : public Rows_log_event
#endif

#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
COPY_INFO m_copy_info;
Write_record m_write_record;
int write_row(rpl_group_info *, bool);
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(rpl_group_info *);
Expand Down
98 changes: 23 additions & 75 deletions sql/log_event_server.cc
Expand Up @@ -7136,6 +7136,20 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
m_table->s->long_unique_table))
error= m_table->file->ha_rnd_init_with_error(0);

if (!error)
{
bzero(&m_copy_info, sizeof m_copy_info);
m_copy_info.handle_duplicates=
slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT ?
DUP_REPLACE : DUP_ERROR;
m_copy_info.table_list= m_table->pos_in_table_list;
new (&m_write_record) Write_record(thd, m_table, &m_copy_info,
m_vers_from_plain &&
m_table->versioned(VERS_TIMESTAMP),
m_table->triggers && do_invoke_trigger(),
NULL);
}

return error;
}

Expand Down Expand Up @@ -7270,22 +7284,20 @@ is_duplicate_key_error(int errcode)
*/

int
Rows_log_event::write_row(rpl_group_info *rgi,
const bool overwrite)
Write_rows_log_event::write_row(rpl_group_info *rgi,
const bool overwrite)
{
DBUG_ENTER("write_row");
DBUG_ASSERT(m_table != NULL && thd != NULL);

TABLE *table= m_table; // pointer to event's table
int error;
int UNINIT_VAR(keynum);
const bool invoke_triggers= (m_table->triggers && do_invoke_trigger());
auto_afree_ptr<char> key(NULL);

prepare_record(table, m_width, true);

/* unpack row into table->record[0] */
if (unlikely((error= unpack_current_row(rgi))))
int error= unpack_current_row(rgi);
if (unlikely(error))
{
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
Expand Down Expand Up @@ -7354,41 +7366,9 @@ Rows_log_event::write_row(rpl_group_info *rgi,

if (table->s->sequence)
error= update_sequence();
else while (unlikely(error= table->file->ha_write_row(table->record[0])))
else
{
if (error == HA_ERR_LOCK_DEADLOCK ||
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
(keynum= table->file->get_dup_key(error)) < 0 ||
!overwrite)
{
DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
/*
Deadlock, waiting for lock or just an error from the handler
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
Retrieval of the duplicate key number may fail
- either because the error was not "duplicate key" error
- or because the information which key is not available
*/
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}

error= locate_dup_record(thd, table, m_key, keynum);
if (unlikely(error))
DBUG_RETURN(error);

/*
Now, record[1] should contain the offending row. That
will enable us to update it or, alternatively, delete it (so
that we can insert the new row afterwards).
*/
if (table->s->long_unique_table)
{
/* same as for REPLACE/ODKU */
table->move_fields(table->field, table->record[1], table->record[0]);
table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
table->move_fields(table->field, table->record[0], table->record[1]);
}
int error= m_write_record.write_record();

/*
If row is incomplete we will use the record found to fill
Expand All @@ -7402,42 +7382,10 @@ Rows_log_event::write_row(rpl_group_info *rgi,
table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_WRITE);
}

DBUG_PRINT("debug",("preparing for update: before and after image"));
DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);

COPY_INFO info;
auto result= replace_row(table, keynum, &info, invoke_triggers,
m_vers_from_plain &&
m_table->versioned(VERS_TIMESTAMP));

if (result.updated)
{
switch (result.error)
{
case HA_ERR_RECORD_IS_THE_SAME:
DBUG_PRINT("info", ("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
" ha_update_row()"));
result.error= 0;
// fall through
case 0:
break;

default:
DBUG_PRINT("info", ("ha_update_row() returns error %d", result.error));
table->file->print_error(result.error, MYF(0));
}
DBUG_RETURN(result.error);
}
if (result.error)
{
table->file->print_error(result.error, MYF(0));
DBUG_RETURN(result.error);
}
if (result.before_trg_error || result.after_trg_error)
DBUG_RETURN(HA_ERR_GENERIC);
if (error)
table->file->print_error(error, MYF(0));

/* Will retry ha_write_row() with the offending row removed. */
DBUG_RETURN(error);
}

if (invoke_triggers &&
Expand Down
9 changes: 6 additions & 3 deletions sql/sql_class.h
Expand Up @@ -5942,20 +5942,23 @@ class select_dump :public select_to_file {
int send_data(List<Item> &items);
};

class Write_record; // see sql_insert.h


class select_insert :public select_result_interceptor {
public:
select_result *sel_result;
TABLE_LIST *table_list;
TABLE *table;
List<Item> *fields;
Write_record *write;
ulonglong autoinc_value_of_last_inserted_row; // autogenerated or not
COPY_INFO info;
bool insert_into_view;
select_insert(THD *thd_arg, TABLE_LIST *table_list_par, TABLE *table_par,
List<Item> *fields_par, List<Item> *update_fields,
List<Item> *update_values, enum_duplicates duplic,
bool ignore, select_result *sel_ret_list);
bool ignore, select_result *sel_ret_list, Write_record *write);
~select_insert();
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
virtual int prepare2(JOIN *join);
Expand Down Expand Up @@ -5989,9 +5992,9 @@ class select_create: public select_insert {
Table_specification_st *create_info_par,
Alter_info *alter_info_arg,
List<Item> &select_fields,enum_duplicates duplic, bool ignore,
TABLE_LIST *select_tables_arg):
TABLE_LIST *select_tables_arg, Write_record *write):
select_insert(thd_arg, table_arg, NULL, &select_fields, 0, 0, duplic,
ignore, NULL),
ignore, NULL, write),
create_table(table_arg),
create_info(create_info_par),
select_tables(select_tables_arg),
Expand Down

0 comments on commit ba44c4a

Please sign in to comment.