MDEV-30046 Refactor and fix idempotent replication
Idempotent write_row works the same as REPLACE: if there is a duplicate
record in the table, it is deleted and the row is re-inserted, with the
same update optimization (the conflicting row is updated in place when
possible).

The code in Rows_log_event::write_row was essentially copy-pasted from
write_record.
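
A simplified sketch of that shared shape (an illustration, not the
literal code of either function; thd, table, key, info, invoke_triggers
and versioned stand in for the callers' actual state):

  int error;
  while ((error= table->file->ha_write_row(table->record[0])))
  {
    if (!is_duplicate_key_error(error))
      break;                              /* not a duplicate: give up */
    uint keynum= table->file->get_dup_key(error);
    if ((error= locate_dup_record(thd, table, key, keynum)))
      break;                              /* conflicting row not found */
    /* the conflicting row is now in record[1] */
    auto result= replace_row(table, keynum, &info, invoke_triggers,
                             versioned);
    if (result.updated || result.error)
    {
      error= result.error;                /* updated in place, or failed */
      break;
    }
    if (result.before_trg_error || result.after_trg_error)
    {
      error= HA_ERR_GENERIC;              /* a DELETE trigger failed */
      break;
    }
    /* the conflicting row was deleted: retry ha_write_row() */
  }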

What's done:
The REPLACE operation was unified across replication and SQL. It is now
represented as two methods: locate and replace.

* locate_dup_record: find the conflicting record and save it in
  record[1]. Locating through dup_ref is used whenever possible.
  Locating by key is now done through lookup_handler, if one exists,
  because the duplicate error can be on a normal key while an RND scan
  may already be initialized.
* TABLE::replace_row: delete or update the conflicting row. It does not
  re-insert, which lets the caller continue the general insertion retry
  loop. The outcome is described by the replace_execution_result
  structure, which records the execution path taken and the kind of
  error, so each case can be handled differently (see the sketch after
  this list).
* Unify the replace initialization step across implementations:
  add prepare_for_replace and finalize_replace.
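
For reference, a minimal sketch of the result structure, inferred from
how write_row consumes it in the diff below; the member names appear
there, while the types and exact definition are assumptions:

  /* Sketch only: the real definition may differ. */
  struct replace_execution_result
  {
    int  error;            /* engine error from the update/delete path */
    bool updated;          /* the conflicting row was updated in place */
    bool before_trg_error; /* a BEFORE DELETE trigger failed */
    bool after_trg_error;  /* an AFTER DELETE trigger failed */
  };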

Drawbacks:
alloca is removed in favor of plain allocation. Using alloca makes it
harder to catch stack overflows related to maximally long fields. A
simple stack allocation of MAX_KEY could be used, but it is better not
to abuse the stack.
There is now a memory leak of statement duration. It will be fixed a
bit later, in the next refactoring.
FooBarrior committed Mar 28, 2024
1 parent a057a08 commit ae40f27
Showing 9 changed files with 390 additions and 347 deletions.
35 changes: 35 additions & 0 deletions mysql-test/main/long_unique_bugs_replication.result
@@ -21,6 +21,41 @@ binlog 'aRf2ZA8BAAAA/AAAAAABAAAAAAQAMTAuNS4xNS1NYXJpYURCLWxvZwAAAAAAAAAAAAAAAAAA
binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw==';
binlog 'bBf2ZBMBAAAANAAAAJgHAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AeqMD4A==bBf2ZBcBAAAANAAAAMwHAAAAAHEAAAAAAAEABP/wj6QAAAEAYgEAZa6/VU0JAAAANteqUw==';
binlog 'bBf2ZBMBAAAANAAAAHUkAAAAAHEAAAAAAAEABHRlc3QAAnQxAAQDDw8IBP0C4h0AaTGFIg==bBf2ZBgBAAAASAAAAL0kAAAAAHEAAAAAAAEABP//8I+kAAABAGIBAGWuv1VNCQAAAPBuWwAAAQBiAQBlrr9VTQkAAADxS9Lu';
select * from t1;
i1 a1
1 d
2 d2
connection master;
drop table t1;
connection slave;
connection master;
# Idempotent scenario, which triggers REPLACE code to be used in the
# event, i.e. duplicated record will be deleted and then re-inserted.
create table t1 (i1 int, a1 text, unique key i1 (a1)) engine=myisam;
connection slave;
connection slave;
set @save_slave_exec_mode= @@slave_exec_mode;
set global slave_exec_mode = idempotent;
insert into t1 values (1,1);
connection master;
insert into t1 values (2,1);
connection slave;
connection slave;
select * from t1;
i1 a1
2 1
connection master;
insert into t1 values (3,3);
update t1 set a1 = 'd' limit 1;
update t1 set a1 = 'd2' where i1= 3;
connection slave;
connection slave;
select * from t1;
i1 a1
2 d
3 d2
set global slave_exec_mode = @save_slave_exec_mode;
connection master;
drop table t1;
set global slave_exec_mode=default;
#
38 changes: 37 additions & 1 deletion mysql-test/main/long_unique_bugs_replication.test
@@ -16,10 +16,15 @@ update t1 set a1 = 'd' limit 1;
update t1 set a1 = 'd2' where i1= 2;

sync_slave_with_master;

connection slave;
select * from t1;
connection master;
drop table t1;

sync_slave_with_master;
connection master;


--echo #
--echo # MDEV-32093 long uniques break old->new replication
--echo #
@@ -46,4 +51,35 @@ set global slave_exec_mode=default;
--echo #
--echo # End of 10.4 tests
--echo #

--echo # Idempotent scenario, which triggers REPLACE code to be used in the
--echo # event, i.e. duplicated record will be deleted and then re-inserted.
create table t1 (i1 int, a1 text, unique key i1 (a1)) engine=myisam;

sync_slave_with_master;
connection slave;
set @save_slave_exec_mode= @@slave_exec_mode;
set global slave_exec_mode = idempotent;
insert into t1 values (1,1);
connection master;
insert into t1 values (2,1);
sync_slave_with_master;
connection slave;
select * from t1;
connection master;
insert into t1 values (3,3);
update t1 set a1 = 'd' limit 1;
update t1 set a1 = 'd2' where i1= 3;
sync_slave_with_master;

connection slave;
select * from t1;
set global slave_exec_mode = @save_slave_exec_mode;
connection master;
drop table t1;

--echo #
--echo # End of 10.4 tests
--echo #

--source include/rpl_end.inc
6 changes: 0 additions & 6 deletions sql/handler.cc
@@ -4549,12 +4549,6 @@ uint handler::get_dup_key(int error)
DBUG_RETURN(errkey);
}

bool handler::has_dup_ref() const
{
DBUG_ASSERT(lookup_errkey != (uint)-1 || errkey != (uint)-1);
return ha_table_flags() & HA_DUPLICATE_POS || lookup_errkey != (uint)-1;
}


/**
Delete all files with extension from bas_ext().
7 changes: 3 additions & 4 deletions sql/handler.h
@@ -3068,12 +3068,12 @@ class handler :public Sql_alloc
Table_flags cached_table_flags; /* Set on init() and open() */

ha_rows estimation_rows_to_insert;
handler *lookup_handler;
public:
handlerton *ht; /* storage engine of this handler */
uchar *ref; /* Pointer to current row */
uchar *dup_ref; /* Pointer to duplicate row */
uchar *lookup_buffer;
handler *lookup_handler;

ha_statistics stats;

@@ -3268,8 +3268,8 @@ class handler :public Sql_alloc
handler(handlerton *ht_arg, TABLE_SHARE *share_arg)
:table_share(share_arg), table(0),
estimation_rows_to_insert(0),
lookup_handler(this),
ht(ht_arg), ref(0), lookup_buffer(NULL), end_range(NULL),
ht(ht_arg), ref(0), lookup_buffer(NULL), lookup_handler(this),
end_range(NULL),
implicit_emptied(0),
mark_trx_read_write_done(0),
check_table_binlog_row_based_done(0),
@@ -3464,7 +3464,6 @@ class handler :public Sql_alloc
virtual void print_error(int error, myf errflag);
virtual bool get_error_message(int error, String *buf);
uint get_dup_key(int error);
bool has_dup_ref() const;
/**
Retrieves the names of the table and the key for which there was a
duplicate entry in the case of HA_ERR_FOREIGN_DUPLICATE_KEY.
171 changes: 43 additions & 128 deletions sql/log_event_server.cc
@@ -5659,7 +5659,10 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
{
master_had_triggers= table->master_had_triggers;
bool transactional_table= table->file->has_transactions_and_rollback();
table->file->prepare_for_insert(get_general_type_code() != WRITE_ROWS_EVENT);
this->slave_exec_mode= slave_exec_mode_options; // fix the mode

table->file->prepare_for_insert(get_general_type_code() != WRITE_ROWS_EVENT
|| slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);

/*
table == NULL means that this table should not be replicated
@@ -5705,8 +5708,6 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
&m_cols_ai : &m_cols);
bitmap_intersect(table->write_set, after_image);

this->slave_exec_mode= slave_exec_mode_options; // fix the mode

// Do event specific preparations
error= do_before_row_operations(rli);

@@ -7130,6 +7131,11 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
m_table->mark_auto_increment_column(true);
}

if (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT &&
(m_table->file->ha_table_flags() & HA_DUPLICATE_POS ||
m_table->s->long_unique_table))
error= m_table->file->ha_rnd_init_with_error(0);

return error;
}

@@ -7171,7 +7177,15 @@ Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *
{
m_table->file->print_error(local_error, MYF(0));
}
return error? error : local_error;
int rnd_error= 0;
if (m_table->file->inited)
{
DBUG_ASSERT(slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
DBUG_ASSERT(m_table->file->ha_table_flags() & HA_DUPLICATE_POS ||
m_table->s->long_unique_table);
rnd_error= m_table->file->ha_rnd_end();
}
return error? error : local_error ? local_error : rnd_error;
}

bool Rows_log_event::process_triggers(trg_event_type event,
@@ -7194,17 +7208,6 @@ bool Rows_log_event::process_triggers(trg_event_type event,

DBUG_RETURN(result);
}
/*
Check if there are more UNIQUE keys after the given key.
*/
static int
last_uniq_key(TABLE *table, uint keyno)
{
while (++keyno < table->s->keys)
if (table->key_info[keyno].flags & HA_NOSAME)
return 0;
return 1;
}

/**
Check if an error is a duplicate key error.
@@ -7369,67 +7372,10 @@ Rows_log_event::write_row(rpl_group_info *rgi,
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
/*
We need to retrieve the old row into record[1] to be able to
either update or delete the offending record. We either:
- use rnd_pos() with a row-id (available as dupp_row) to the
offending row, if that is possible (MyISAM and Blackhole), or else
- use index_read_idx() with the key that is duplicated, to
retrieve the offending row.
*/
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
DBUG_PRINT("info",("Locating offending record using rnd_pos()"));

if ((error= table->file->ha_rnd_init_with_error(0)))
{
DBUG_RETURN(error);
}

error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
if (unlikely(error))
{
DBUG_PRINT("info",("rnd_pos() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
table->file->ha_rnd_end();
}
else
{
DBUG_PRINT("info",("Locating offending record using index_read_idx()"));

if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
{
DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
DBUG_RETURN(my_errno);
}

if (key.get() == NULL)
{
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
if (key.get() == NULL)
{
DBUG_PRINT("info",("Can't allocate key buffer"));
DBUG_RETURN(ENOMEM);
}
}

key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
0);
error= table->file->ha_index_read_idx_map(table->record[1], keynum,
(const uchar*)key.get(),
HA_WHOLE_KEY,
HA_READ_KEY_EXACT);
if (unlikely(error))
{
DBUG_PRINT("info",("index_read_idx() returns %s", HA_ERR(error)));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
}
error= locate_dup_record(thd, table, m_key, keynum);
if (unlikely(error))
DBUG_RETURN(error);

/*
Now, record[1] should contain the offending row. That
@@ -7460,69 +7406,38 @@ Rows_log_event::write_row(rpl_group_info *rgi,
DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);

/*
REPLACE is defined as either INSERT or DELETE + INSERT. If
possible, we can replace it with an UPDATE, but that will not
work on InnoDB if FOREIGN KEY checks are necessary.
I (Matz) am not sure of the reason for the last_uniq_key()
check as, but I'm guessing that it's something along the
following lines.
Suppose that we got the duplicate key to be a key that is not
the last unique key for the table and we perform an update:
then there might be another key for which the unique check will
fail, so we're better off just deleting the row and inserting
the correct row.
Additionally we don't use UPDATE if rbr triggers should be invoked -
when triggers are used we want a simple and predictable execution path.
*/
if (last_uniq_key(table, keynum) && !invoke_triggers &&
!table->file->referenced_by_foreign_key())
{
DBUG_PRINT("info",("Updating row using ha_update_row()"));
error= table->file->ha_update_row(table->record[1],
table->record[0]);
switch (error) {
COPY_INFO info;
auto result= replace_row(table, keynum, &info, invoke_triggers,
m_vers_from_plain &&
m_table->versioned(VERS_TIMESTAMP));

if (result.updated)
{
switch (result.error)
{
case HA_ERR_RECORD_IS_THE_SAME:
DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
" ha_update_row()"));
error= 0;

DBUG_PRINT("info", ("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
" ha_update_row()"));
result.error= 0;
// fall through
case 0:
break;

default:
DBUG_PRINT("info",("ha_update_row() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_PRINT("info", ("ha_update_row() returns error %d", result.error));
table->file->print_error(result.error, MYF(0));
}

DBUG_RETURN(error);
DBUG_RETURN(result.error);
}
else
if (result.error)
{
DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
if (invoke_triggers &&
unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE,
TRUE)))
error= HA_ERR_GENERIC; // in case if error is not set yet
else
{
if (unlikely((error= table->file->ha_delete_row(table->record[1]))))
{
DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
if (invoke_triggers &&
unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER,
TRUE)))
DBUG_RETURN(HA_ERR_GENERIC); // in case if error is not set yet
}
/* Will retry ha_write_row() with the offending row removed. */
table->file->print_error(result.error, MYF(0));
DBUG_RETURN(result.error);
}
if (result.before_trg_error || result.after_trg_error)
DBUG_RETURN(HA_ERR_GENERIC);

/* Will retry ha_write_row() with the offending row removed. */
}

if (invoke_triggers &&
