Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
no functional changes here
  • Loading branch information
vuvova committed Aug 15, 2023
1 parent a8a22b7 commit 0b67af5
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 100 deletions.
109 changes: 37 additions & 72 deletions sql/log.cc
Expand Up @@ -104,8 +104,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc);

int binlog_online_alter_commit(THD *thd, bool all);
void binlog_online_alter_rollback(THD *thd, bool all);
static int binlog_online_alter_commit(THD *thd, bool all);
static void binlog_online_alter_rollback(THD *thd, bool all);

static const LEX_CSTRING write_error_msg=
{ STRING_WITH_LEN("error writing to the binary log") };
Expand Down Expand Up @@ -2271,12 +2271,11 @@ binlog_online_alter_cleanup(ilist<binlog_cache_mngr> &list,
auto it= list.begin();
while (it != list.end())
{
auto &cache= *it;
it++;
list.remove(cache);
auto &cache= *it++;
cache.~binlog_cache_mngr();
my_free(&cache);
}
list.clear();
DBUG_ASSERT(list.empty());
}
}
Expand Down Expand Up @@ -2649,18 +2648,13 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
#ifdef WITH_WSREP
/* for streaming replication, we must replicate savepoint rollback so that
slaves can maintain SR transactions
For streaming replication, we must replicate savepoint rollback so that
slaves can maintain SR transactions
*/
if (unlikely(thd->wsrep_trx().is_streaming() ||
(trans_has_updated_non_trans_table(thd)) ||
(thd->variables.option_bits & OPTION_BINLOG_THIS_TRX)))
#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_BINLOG_THIS_TRX)))
#endif /* WITH_WSREP */
if (IF_WSREP(thd->wsrep_trx().is_streaming(),0) ||
trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_BINLOG_THIS_TRX))
{
char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin);
Expand Down Expand Up @@ -3801,40 +3795,20 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
}


bool Event_log::open(const char *log_name,
const char *new_name, ulong next_file_number,
enum cache_type io_cache_type_arg)
bool Event_log::open(enum cache_type io_cache_type_arg)
{
bool error= false;
if (log_name || new_name)
{
error= MYSQL_LOG::open(
#ifdef HAVE_PSI_INTERFACE
0,
#endif
log_name, LOG_NORMAL, new_name, next_file_number, io_cache_type_arg);
}
else
{
#ifdef HAVE_PSI_INTERFACE
/* Keep the key for reopen */
m_log_file_key= 0;
#endif
error= init_io_cache(&log_file, -1, LOG_BIN_IO_SIZE,
io_cache_type_arg, 0, 0,
MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL));
bool error= init_io_cache(&log_file, -1, LOG_BIN_IO_SIZE, io_cache_type_arg,
0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL));

log_state= LOG_OPENED;
inited= true;
}
log_state= LOG_OPENED;
inited= true;
if (error)
return error;

longlong bytes_written= write_description_event(
(enum_binlog_checksum_alg)binlog_checksum_options,
encrypt_binlog, false, false);
error= bytes_written < 0;
return error;
encrypt_binlog, true, false);
return bytes_written < 0;
}

longlong
Expand Down Expand Up @@ -6446,11 +6420,7 @@ Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache)
{
DBUG_ASSERT(cache_mngr);
Rows_log_event* rows= NULL;

if (cache_mngr)
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
return rows;
return cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
}

binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
Expand All @@ -6464,14 +6434,14 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
Event_log *bin_log,
binlog_cache_data *cache_data)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
auto *pending= cache_data->pending();
if (pending)
{
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
Expand Down Expand Up @@ -6519,10 +6489,9 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data
otherwise @c false a non-transactional.
*/
int
Event_log::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event,
binlog_cache_data *cache_data,
bool is_transactional)
Event_log::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
binlog_cache_data *cache_data,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || is_open());
Expand Down Expand Up @@ -6580,10 +6549,10 @@ Event_log::flush_and_set_pending_rows_event(THD *thd,

Rows_log_event*
Event_log::prepare_pending_rows_event(THD *thd, TABLE* table,
binlog_cache_data *cache_data,
uint32 serv_id, size_t needed,
bool is_transactional,
Rows_event_factory event_factory)
binlog_cache_data *cache_data,
uint32 serv_id, size_t needed,
bool is_transactional,
Rows_event_factory event_factory)
{
DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event");
/* Pre-conditions */
Expand Down Expand Up @@ -7685,7 +7654,8 @@ class CacheWriter: public Log_event_writer
bool first;
};

int cache_copy(IO_CACHE *to, IO_CACHE *from)
#ifdef HAVE_REPLICATION
static int cache_copy(IO_CACHE *to, IO_CACHE *from)
{
DBUG_ENTER("cache_copy");
if (reinit_io_cache(from, READ_CACHE, 0, 0, 0))
Expand All @@ -7704,10 +7674,11 @@ int cache_copy(IO_CACHE *to, IO_CACHE *from)

DBUG_RETURN(0);
}
#endif

int binlog_online_alter_commit(THD *thd, bool all)
static int binlog_online_alter_commit(THD *thd, bool all)
{
DBUG_ENTER("online_alter_commit");
DBUG_ENTER("binlog_online_alter_commit");
int error= 0;
#ifdef HAVE_REPLICATION

Expand All @@ -7721,14 +7692,8 @@ int binlog_online_alter_commit(THD *thd, bool all)
auto *binlog= cache_mngr.share->online_alter_binlog;
DBUG_ASSERT(binlog);

error= binlog_flush_pending_rows_event(thd,
/*
do not set STMT_END for last event
to leave table open in altering thd
*/
false,
true,
binlog,
// do not set STMT_END for last event to leave table open in altering thd
error= binlog_flush_pending_rows_event(thd, false, true, binlog,
is_ending_transaction
? &cache_mngr.trx_cache
: &cache_mngr.stmt_cache);
Expand Down Expand Up @@ -7766,7 +7731,7 @@ int binlog_online_alter_commit(THD *thd, bool all)
DBUG_RETURN(error);
}

void binlog_online_alter_rollback(THD *thd, bool all)
static void binlog_online_alter_rollback(THD *thd, bool all)
{
#ifdef HAVE_REPLICATION
bool is_ending_trans= ending_trans(thd, all);
Expand Down
9 changes: 3 additions & 6 deletions sql/log.h
Expand Up @@ -419,10 +419,7 @@ class Event_log: public MYSQL_LOG
MY_MUTEX_INIT_SLOW);
}

bool open(
const char *log_name,
const char *new_name, ulong next_file_number,
enum cache_type io_cache_type_arg);
bool open(enum cache_type io_cache_type_arg);
virtual IO_CACHE *get_log_file() { return &log_file; }

longlong write_description_event(enum_binlog_checksum_alg checksum_alg,
Expand All @@ -445,7 +442,7 @@ class Event_log: public MYSQL_LOG
TODO should be unnecessary after MDEV-24676 is done
*/
class Cache_flip_event_log: public Event_log {
class Cache_flip_event_log: public Event_log, public Sql_alloc {
IO_CACHE alt_buf;
IO_CACHE *current, *alt;
public:
Expand All @@ -456,7 +453,7 @@ class Cache_flip_event_log: public Event_log {
{
log_file.dir= mysql_tmpdir;
alt_buf.dir= log_file.dir;
bool res= Event_log::open(NULL, NULL, 0, io_cache_type_arg);
bool res= Event_log::open(io_cache_type_arg);
if (res)
return res;

Expand Down
4 changes: 2 additions & 2 deletions sql/rpl_rli.h
Expand Up @@ -950,8 +950,8 @@ struct rpl_group_info
if (ptr->table == table_arg)
{
auto *rpl_table_list= static_cast<RPL_TABLE_LIST*>(ptr);
if (rpl_table_list->m_tabledef_valid)
*tabledef_var= &rpl_table_list->m_tabledef;
DBUG_ASSERT(rpl_table_list->m_tabledef_valid);
*tabledef_var= &rpl_table_list->m_tabledef;
*conv_table_var= rpl_table_list->m_conv_table;
*copy= rpl_table_list->m_online_alter_copy_fields;
*copy_end= rpl_table_list->m_online_alter_copy_fields_end;
Expand Down
27 changes: 7 additions & 20 deletions sql/sql_table.cc
Expand Up @@ -2001,7 +2001,7 @@ bool log_drop_table(THD *thd, const LEX_CSTRING *db_name,
in the binary log. We log this for non temporary tables, as the slave
may use a filter to ignore queries for a specific database.
*/
error= thd->binlog_query(THD::STMT_QUERY_TYPE,
error= thd->binlog_query(THD::STMT_QUERY_TYPE,
query.ptr(), query.length(),
FALSE, FALSE, temporary_table, 0) > 0;
}
Expand Down Expand Up @@ -10025,8 +10025,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
has been already processed.
*/
table_list->required_type= TABLE_TYPE_NORMAL;



if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_SHARED
|| alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE
|| thd->locked_tables_mode == LTM_LOCK_TABLES
Expand Down Expand Up @@ -10132,8 +10131,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
a new one if needed.
*/
table->s->tdc->flushed= 1; // Force close of all instances
if (thd->mdl_context.upgrade_shared_lock(mdl_ticket,
MDL_EXCLUSIVE,
if (thd->mdl_context.upgrade_shared_lock(mdl_ticket, MDL_EXCLUSIVE,
thd->variables.lock_wait_timeout))
DBUG_RETURN(1);
quick_rm_table(thd, table->file->ht, &table_list->db,
Expand All @@ -10142,8 +10140,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
goto end_inplace;
}
if (!if_exists &&
(table->file->partition_ht()->flags &
HTON_TABLE_MAY_NOT_EXIST_ON_SLAVE))
(table->file->partition_ht()->flags & HTON_TABLE_MAY_NOT_EXIST_ON_SLAVE))
{
/*
Table is a shared table that may not exist on the slave.
Expand Down Expand Up @@ -11582,9 +11579,7 @@ static void online_alter_cleanup_binlog(THD *thd, TABLE_SHARE *s)
#ifdef HAVE_REPLICATION
if (!s->online_alter_binlog)
return;
// s->online_alter_binlog->reset_logs(thd, false, NULL, 0, 0);
s->online_alter_binlog->cleanup();
s->online_alter_binlog->~Cache_flip_event_log();
s->online_alter_binlog= NULL;
#endif
}
Expand Down Expand Up @@ -11631,18 +11626,14 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
#ifdef HAVE_REPLICATION
if (online)
{
void *buf= alloc_root(thd->mem_root, sizeof (Cache_flip_event_log));

from->s->online_alter_binlog= new (buf) Cache_flip_event_log();
from->s->online_alter_binlog= new (thd->mem_root) Cache_flip_event_log();
if (!from->s->online_alter_binlog)
DBUG_RETURN(1);

from->s->online_alter_binlog->init_pthread_objects();

error= from->s->online_alter_binlog->open(WRITE_CACHE);

DBUG_ASSERT(!error);

if (!error)
{
/*
Expand Down Expand Up @@ -11988,10 +11979,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

#ifdef HAVE_REPLICATION
if (likely(online && error < 0))
if (online && error < 0)
{
Ha_trx_info *trx_info_save= thd->transaction->all.ha_list;
thd->transaction->all.ha_list = NULL;
thd_progress_next_stage(thd);
Table_map_log_event table_event(thd, from, from->s->table_map_id,
from->file->has_transactions());
Expand Down Expand Up @@ -12028,10 +12017,8 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
thd_progress_next_stage(thd);
error= online_alter_read_from_binlog(thd, &rgi, binlog);
}

thd->transaction->all.ha_list = trx_info_save;
}
else if (unlikely(online)) // error was on copy stage
else if (online) // error was on copy stage
{
/*
We need to issue a barrier to clean up gracefully.
Expand Down

0 comments on commit 0b67af5

Please sign in to comment.