Skip to content

Commit

Permalink
MDEV-16329 [3/5] use binlog_cache_data directly in most places
Browse files Browse the repository at this point in the history
* Eliminate most usages of THD::use_trans_table. Only 3 left, and they are
  at quite high levels, and really essential.
* Eliminate is_transactional argument when possible. Lots of places are
  left though, because of some WSREP error handling in
  MYSQL_BIN_LOG::set_write_error.
* Remove junk binlog functions from THD
* binlog_prepare_pending_rows_event is moved to log.cc inside MYSQL_BIN_LOG
  and is not anymore template. Instead it accepls event factory with a type
  code, and a callback to a constructing function in it.
  • Loading branch information
FooBarrior authored and vuvova committed Aug 15, 2023
1 parent 429f635 commit 6427e34
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 238 deletions.
4 changes: 3 additions & 1 deletion sql/handler.cc
Expand Up @@ -7269,7 +7269,9 @@ int handler::binlog_log_row(TABLE *table,
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;

bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
auto *cache= binlog_get_cache_data(cache_mngr, use_trans_cache(thd, is_trans));

bool error= (*log_func)(thd, table, &mysql_bin_log, cache,
is_trans, before_record, after_record);
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
}
Expand Down
4 changes: 2 additions & 2 deletions sql/handler.h
Expand Up @@ -656,8 +656,8 @@ given at all. */
typedef ulonglong alter_table_operations;

class MYSQL_BIN_LOG;
class binlog_cache_mngr;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
class binlog_cache_data;
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_data *, bool,
const uchar*, const uchar*);

/*
Expand Down
173 changes: 110 additions & 63 deletions sql/log.cc
Expand Up @@ -2055,29 +2055,31 @@ static int
binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
{
DBUG_ENTER("binlog_truncate_trx_cache");

if(!WSREP_EMULATE_BINLOG_NNULL(thd) && !mysql_bin_log.is_open())
DBUG_RETURN(0);

int error=0;
/*
This function handles transactional changes and as such this flag
equals to true.
*/
bool const is_transactional= TRUE;

DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s",
FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
all ? "all" : "stmt"));

thd->binlog_remove_pending_rows_event(TRUE, is_transactional);
auto &trx_cache= cache_mngr->trx_cache;
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &trx_cache);
thd->reset_binlog_for_next_statement();

/*
If rolling back an entire transaction or a single statement not
inside a transaction, we reset the transaction cache.
*/
if (ending_trans(thd, all))
{
if (cache_mngr->trx_cache.has_incident())
if (trx_cache.has_incident())
error= mysql_bin_log.write_incident(thd);

thd->reset_binlog_for_next_statement();
DBUG_ASSERT(thd->binlog_table_maps == 0);

cache_mngr->reset(false, true);
}
Expand All @@ -2086,9 +2088,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
transaction cache to remove the statement.
*/
else
cache_mngr->trx_cache.restore_prev_position();
trx_cache.restore_prev_position();

DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
DBUG_ASSERT(trx_cache.pending() == NULL);
DBUG_RETURN(error);
}

Expand Down Expand Up @@ -2406,7 +2408,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
thd->reset_binlog_for_next_statement();
DBUG_RETURN(error);
}
if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
if (!wsrep_emulate_bin_log && MYSQL_BIN_LOG::check_write_error(thd))
{
/*
"all == true" means that a "rollback statement" triggered the error and
Expand Down Expand Up @@ -2458,12 +2460,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)

void binlog_reset_cache(THD *thd)
{
binlog_cache_mngr *const cache_mngr= opt_bin_log ?
binlog_cache_mngr *const cache_mngr= opt_bin_log ?
thd->binlog_get_cache_mngr() : 0;
DBUG_ENTER("binlog_reset_cache");
if (cache_mngr)
{
thd->binlog_remove_pending_rows_event(TRUE, TRUE);
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache);
thd->reset_binlog_for_next_statement();
cache_mngr->reset(true, true);
}
DBUG_VOID_RETURN;
Expand Down Expand Up @@ -6318,30 +6321,31 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
DBUG_ASSERT(cache_mngr);
Rows_log_event* rows= NULL;

/*
This is less than ideal, but here's the story: If there is no cache_mngr,
prepare_pending_rows_event() has never been called (since the cache_mngr
is set up there). In that case, we just return NULL.
*/
if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
}

binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
return cache_mngr->get_binlog_cache_data(use_trans_cache);
}

int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
bool is_transactional,
MYSQL_BIN_LOG *bin_log,
binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
binlog_cache_data *cache_data)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
auto *pending= cache_data->pending();
if (pending)
{
if (stmt_end)
Expand All @@ -6350,36 +6354,12 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
thd->reset_binlog_for_next_statement();
}

error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_data,
is_transactional);
}
return error;
}

/**
This function stores a pending row event into a cache which is specified
through the parameter @c is_transactional. Respectively, when it is @c
true, the pending event is stored into the transactional cache. Otherwise
into the non-transactional cache.
@param evt a pointer to the row event.
@param use_trans_cache @c true indicates a transactional cache,
otherwise @c false a non-transactional.
*/
void
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
{
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();

DBUG_ASSERT(cache_mngr);

binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache);

cache_data->set_pending(ev);
}


/**
This function removes the pending rows event, discarding any outstanding
rows. If there is no pending rows event available, this is effectively a
Expand All @@ -6390,17 +6370,10 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
otherwise @c false a non-transactional.
*/
int
MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data)
{
DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event");

binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();

DBUG_ASSERT(cache_mngr);

binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));

if (Rows_log_event* pending= cache_data->pending())
{
delete pending;
Expand All @@ -6414,6 +6387,7 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
Moves the last bunch of rows from the pending Rows event to a cache (either
transactional cache if is_transaction is @c true, or the non-transactional
cache otherwise. Sets a new pending event.
In case of error during flushing, sets write_error=1 to itself.
@param thd a pointer to the user thread.
@param evt a pointer to the row event.
Expand All @@ -6423,19 +6397,13 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event,
binlog_cache_mngr *cache_mngr,
binlog_cache_data *cache_data,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: %p", event));

DBUG_ASSERT(cache_mngr);

bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(should_use_trans_cache);

DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));

if (Rows_log_event* pending= cache_data->pending())
Expand Down Expand Up @@ -6463,11 +6431,89 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending;
}

thd->binlog_set_pending_rows_event(event, should_use_trans_cache);
cache_data->set_pending(event);

DBUG_RETURN(0);
}

/*
Member function for ensuring that there is an rows log
event of the apropriate type before proceeding.
POST CONDITION:
If a non-NULL pointer is returned, the pending event for thread 'thd' will
be an event created by callback hold by event_factory, and
will be either empty or have enough space to hold 'needed' bytes.
In addition, the columns bitmap will be correct for the row, meaning that
the pending event will be flushed if the columns in the event differ from
the columns suppled to the function.
RETURNS
If no error, a non-NULL pending event (either one which already existed or
the newly created one).
If error, NULL.
*/

Rows_log_event*
MYSQL_BIN_LOG::prepare_pending_rows_event(THD *thd, TABLE* table,
binlog_cache_data *cache_data,
uint32 serv_id, size_t needed,
bool is_transactional,
Rows_event_factory event_factory)
{
DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event");
/* Pre-conditions */
DBUG_ASSERT(table->s->table_map_id != ~0UL);

/*
There is no good place to set up the transactional data, so we
have to do it here.
*/
Rows_log_event* pending= cache_data->pending();

if (unlikely(pending && !pending->is_valid()))
DBUG_RETURN(NULL);

/*
Check if the current event is non-NULL and a write-rows
event. Also check if the table provided is mapped: if it is not,
then we have switched to writing to a new table.
If there is no pending event, we need to create one. If there is a pending
event, but it's not about the same table id, or not of the same type
(between Write, Update and Delete), or not the same affected columns, or
going to be too big, flush this event to disk and create a new pending
event.
*/
if (!pending ||
pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id ||
pending->get_general_type_code() != event_factory.type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->read_write_bitmaps_cmp(table) == FALSE)
{
/* Create a new RowsEventT... */
Rows_log_event* const
ev= event_factory.create(thd, table, table->s->table_map_id,
is_transactional);
if (unlikely(!ev))
DBUG_RETURN(NULL);
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
/*
flush the pending event and replace it with the newly created
event...
*/
if (unlikely(flush_and_set_pending_rows_event(thd, ev, cache_data,
is_transactional)))
{
delete ev;
DBUG_RETURN(NULL);
}

DBUG_RETURN(ev); /* This is the new pending event */
}
DBUG_RETURN(pending); /* This is the current pending event */
}


/* Generate a new global transaction ID, and write it to the binlog */

Expand Down Expand Up @@ -12108,7 +12154,8 @@ void wsrep_thd_binlog_stmt_rollback(THD * thd)
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
if (cache_mngr)
{
thd->binlog_remove_pending_rows_event(TRUE, TRUE);
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache);
thd->reset_binlog_for_next_statement();
cache_mngr->stmt_cache.reset();
}
DBUG_VOID_RETURN;
Expand Down

0 comments on commit 6427e34

Please sign in to comment.