Skip to content

Commit

Permalink
MDEV-16329 [2/5] refactor binlog and cache_mngr
Browse files Browse the repository at this point in the history
pump up binlog and cache manager to level of binlog_log_row_internal
  • Loading branch information
FooBarrior authored and vuvova committed Aug 15, 2023
1 parent 0dfbb05 commit 429f635
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 136 deletions.
18 changes: 15 additions & 3 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7249,16 +7249,28 @@ int handler::binlog_log_row(TABLE *table,
const uchar *after_record,
Log_func *log_func)
{
bool error;
THD *thd= table->in_use;
DBUG_ENTER("binlog_log_row");

if (!thd->binlog_table_maps &&
thd->binlog_write_table_maps())
DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);

error= (*log_func)(thd, table, row_logging_has_trans,
before_record, after_record);
DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
DBUG_ASSERT((WSREP_NNULL(thd) && wsrep_emulate_bin_log)
|| mysql_bin_log.is_open());

auto *cache_mngr= thd->binlog_setup_trx_data();
if (cache_mngr == NULL)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);

bool is_trans= row_logging_has_trans;
/* Ensure that all events in a GTID group are in the same cache */
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;

bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
is_trans, before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
}

Expand Down
11 changes: 5 additions & 6 deletions sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,11 @@ given at all. */
#define HA_CREATE_PRINT_ALL_OPTIONS (1UL << 26)

typedef ulonglong alter_table_operations;
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);

class MYSQL_BIN_LOG;
class binlog_cache_mngr;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
const uchar*, const uchar*);

/*
These flags are set by the parser and describes the type of
Expand Down Expand Up @@ -5623,11 +5627,6 @@ inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRIN
return ((lower_case_table_names == 2 && info->alias.str) ? &info->alias : name);
}

typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);
int binlog_log_row(TABLE* table,
const uchar *before_record,
const uchar *after_record,
Log_func *log_func);

/**
@def MYSQL_TABLE_IO_WAIT
Expand Down
53 changes: 37 additions & 16 deletions sql/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
else
cache_mngr->trx_cache.restore_prev_position();

DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL);
DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
DBUG_RETURN(error);
}

Expand Down Expand Up @@ -6309,30 +6309,51 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
is @c true, the pending event is returned from the transactional cache.
Otherwise from the non-transactional cache.
@param is_transactional @c true indicates a transactional cache,
@param cache_mngr cache manager to return pending row from
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional.
@return
The row event if any.
*/
Rows_log_event*
THD::binlog_get_pending_rows_event(bool is_transactional) const
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
Rows_log_event* rows= NULL;
binlog_cache_mngr *const cache_mngr= binlog_get_cache_mngr();

/*
This is less than ideal, but here's the story: If there is no cache_mngr,
prepare_pending_rows_event() has never been called (since the cache_mngr
is set up there). In that case, we just return NULL.
*/
if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
}

int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
if (pending)
{
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
thd->reset_binlog_for_next_statement();
}

rows= cache_data->pending();
error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
is_transactional);
}
return (rows);
return error;
}

/**
Expand All @@ -6342,18 +6363,18 @@ THD::binlog_get_pending_rows_event(bool is_transactional) const
into the non-transactional cache.
@param evt a pointer to the row event.
@param is_transactional @c true indicates a transactional cache,
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional.
*/
void
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional)
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
{
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();

DBUG_ASSERT(cache_mngr);

binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
cache_mngr->get_binlog_cache_data(use_trans_cache);

cache_data->set_pending(ev);
}
Expand Down Expand Up @@ -6402,18 +6423,18 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: %p", event));

binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();

DBUG_ASSERT(cache_mngr);

bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));
cache_mngr->get_binlog_cache_data(should_use_trans_cache);

DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));

Expand Down Expand Up @@ -6442,7 +6463,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending;
}

thd->binlog_set_pending_rows_event(event, is_transactional);
thd->binlog_set_pending_rows_event(event, should_use_trans_cache);

DBUG_RETURN(0);
}
Expand Down
8 changes: 8 additions & 0 deletions sql/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
#if !defined(MYSQL_CLIENT)

int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
bool is_transactional);
int remove_pending_rows_event(THD *thd, bool is_transactional);

Expand Down Expand Up @@ -1175,6 +1176,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd);
bool write_annotated_row(THD *thd);
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);

extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
extern handlerton *binlog_hton;
Expand Down
13 changes: 10 additions & 3 deletions sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -4926,13 +4926,16 @@ class Write_rows_log_event : public Rows_log_event
#endif
#if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_write_row(table, is_transactional, after_record);
return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional,
after_record);
}
#endif

Expand Down Expand Up @@ -5009,12 +5012,14 @@ class Update_rows_log_event : public Rows_log_event

#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_update_row(table, is_transactional,
return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional,
before_record, after_record);
}
#endif
Expand Down Expand Up @@ -5098,13 +5103,15 @@ class Delete_rows_log_event : public Rows_log_event
#endif
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool is_transactional,
const uchar *before_record,
const uchar *after_record
__attribute__((unused)))
{
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
return thd->binlog_delete_row(table, is_transactional,
return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional,
before_record);
}
#endif
Expand Down
Loading

0 comments on commit 429f635

Please sign in to comment.