diff --git a/sql/handler.cc b/sql/handler.cc index d471121938a23..29c7f1528b7f1 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -7269,7 +7269,9 @@ int handler::binlog_log_row(TABLE *table, if (thd->variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr, + auto *cache= binlog_get_cache_data(cache_mngr, use_trans_cache(thd, is_trans)); + + bool error= (*log_func)(thd, table, &mysql_bin_log, cache, is_trans, before_record, after_record); DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); } diff --git a/sql/handler.h b/sql/handler.h index 6d115530a2a8b..b30f24e1c220a 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -656,8 +656,8 @@ given at all. */ typedef ulonglong alter_table_operations; class MYSQL_BIN_LOG; -class binlog_cache_mngr; -typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool, +class binlog_cache_data; +typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_data *, bool, const uchar*, const uchar*); /* diff --git a/sql/log.cc b/sql/log.cc index be5be4313c7fd..c432f13dec321 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -2055,29 +2055,31 @@ static int binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { DBUG_ENTER("binlog_truncate_trx_cache"); + + if(!WSREP_EMULATE_BINLOG_NNULL(thd) && !mysql_bin_log.is_open()) + DBUG_RETURN(0); + int error=0; - /* - This function handles transactional changes and as such this flag - equals to true. - */ - bool const is_transactional= TRUE; DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s", FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), all ? "all" : "stmt")); - thd->binlog_remove_pending_rows_event(TRUE, is_transactional); + auto &trx_cache= cache_mngr->trx_cache; + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &trx_cache); + thd->reset_binlog_for_next_statement(); + /* If rolling back an entire transaction or a single statement not inside a transaction, we reset the transaction cache. */ if (ending_trans(thd, all)) { - if (cache_mngr->trx_cache.has_incident()) + if (trx_cache.has_incident()) error= mysql_bin_log.write_incident(thd); - thd->reset_binlog_for_next_statement(); + DBUG_ASSERT(thd->binlog_table_maps == 0); cache_mngr->reset(false, true); } @@ -2086,9 +2088,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) transaction cache to remove the statement. */ else - cache_mngr->trx_cache.restore_prev_position(); + trx_cache.restore_prev_position(); - DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL); + DBUG_ASSERT(trx_cache.pending() == NULL); DBUG_RETURN(error); } @@ -2406,7 +2408,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } - if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) + if (!wsrep_emulate_bin_log && MYSQL_BIN_LOG::check_write_error(thd)) { /* "all == true" means that a "rollback statement" triggered the error and @@ -2458,12 +2460,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) void binlog_reset_cache(THD *thd) { - binlog_cache_mngr *const cache_mngr= opt_bin_log ? + binlog_cache_mngr *const cache_mngr= opt_bin_log ? thd->binlog_get_cache_mngr() : 0; DBUG_ENTER("binlog_reset_cache"); if (cache_mngr) { - thd->binlog_remove_pending_rows_event(TRUE, TRUE); + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache); + thd->reset_binlog_for_next_statement(); cache_mngr->reset(true, true); } DBUG_VOID_RETURN; @@ -6318,30 +6321,31 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr, bool use_trans_cache) { + DBUG_ASSERT(cache_mngr); Rows_log_event* rows= NULL; - /* - This is less than ideal, but here's the story: If there is no cache_mngr, - prepare_pending_rows_event() has never been called (since the cache_mngr - is set up there). In that case, we just return NULL. - */ if (cache_mngr) rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending(); return rows; } +binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr, + bool use_trans_cache) +{ + return cache_mngr->get_binlog_cache_data(use_trans_cache); +} + int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, bool is_transactional, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, - bool use_trans_cache) + binlog_cache_data *cache_data) { /* Mark the event as the last event of a statement if the stmt_end flag is set. */ int error= 0; - auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending(); + auto *pending= cache_data->pending(); if (pending) { if (stmt_end) @@ -6350,36 +6354,12 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, thd->reset_binlog_for_next_statement(); } - error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr, + error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_data, is_transactional); } return error; } -/** - This function stores a pending row event into a cache which is specified - through the parameter @c is_transactional. Respectively, when it is @c - true, the pending event is stored into the transactional cache. Otherwise - into the non-transactional cache. - - @param evt a pointer to the row event. - @param use_trans_cache @c true indicates a transactional cache, - otherwise @c false a non-transactional. -*/ -void -THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache) -{ - binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data(); - - DBUG_ASSERT(cache_mngr); - - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache); - - cache_data->set_pending(ev); -} - - /** This function removes the pending rows event, discarding any outstanding rows. If there is no pending rows event available, this is effectively a @@ -6390,17 +6370,10 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache) otherwise @c false a non-transactional. */ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); - - DBUG_ASSERT(cache_mngr); - - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); - if (Rows_log_event* pending= cache_data->pending()) { delete pending; @@ -6414,6 +6387,7 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) Moves the last bunch of rows from the pending Rows event to a cache (either transactional cache if is_transaction is @c true, or the non-transactional cache otherwise. Sets a new pending event. + In case of error during flushing, sets write_error=1 to itself. @param thd a pointer to the user thread. @param evt a pointer to the row event. @@ -6423,19 +6397,13 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) int MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, - binlog_cache_mngr *cache_mngr, + binlog_cache_data *cache_data, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: %p", event)); - DBUG_ASSERT(cache_mngr); - - bool should_use_trans_cache= use_trans_cache(thd, is_transactional); - binlog_cache_data *cache_data= - cache_mngr->get_binlog_cache_data(should_use_trans_cache); - DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending())); if (Rows_log_event* pending= cache_data->pending()) @@ -6463,11 +6431,89 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, delete pending; } - thd->binlog_set_pending_rows_event(event, should_use_trans_cache); + cache_data->set_pending(event); DBUG_RETURN(0); } +/* + Member function for ensuring that there is an rows log + event of the apropriate type before proceeding. + + POST CONDITION: + If a non-NULL pointer is returned, the pending event for thread 'thd' will + be an event created by callback hold by event_factory, and + will be either empty or have enough space to hold 'needed' bytes. + In addition, the columns bitmap will be correct for the row, meaning that + the pending event will be flushed if the columns in the event differ from + the columns suppled to the function. + + RETURNS + If no error, a non-NULL pending event (either one which already existed or + the newly created one). + If error, NULL. + */ + +Rows_log_event* +MYSQL_BIN_LOG::prepare_pending_rows_event(THD *thd, TABLE* table, + binlog_cache_data *cache_data, + uint32 serv_id, size_t needed, + bool is_transactional, + Rows_event_factory event_factory) +{ + DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event"); + /* Pre-conditions */ + DBUG_ASSERT(table->s->table_map_id != ~0UL); + + /* + There is no good place to set up the transactional data, so we + have to do it here. + */ + Rows_log_event* pending= cache_data->pending(); + + if (unlikely(pending && !pending->is_valid())) + DBUG_RETURN(NULL); + + /* + Check if the current event is non-NULL and a write-rows + event. Also check if the table provided is mapped: if it is not, + then we have switched to writing to a new table. + If there is no pending event, we need to create one. If there is a pending + event, but it's not about the same table id, or not of the same type + (between Write, Update and Delete), or not the same affected columns, or + going to be too big, flush this event to disk and create a new pending + event. + */ + if (!pending || + pending->server_id != serv_id || + pending->get_table_id() != table->s->table_map_id || + pending->get_general_type_code() != event_factory.type_code || + pending->get_data_size() + needed > opt_binlog_rows_event_max_size || + pending->read_write_bitmaps_cmp(table) == FALSE) + { + /* Create a new RowsEventT... */ + Rows_log_event* const + ev= event_factory.create(thd, table, table->s->table_map_id, + is_transactional); + if (unlikely(!ev)) + DBUG_RETURN(NULL); + ev->server_id= serv_id; // I don't like this, it's too easy to forget. + /* + flush the pending event and replace it with the newly created + event... + */ + if (unlikely(flush_and_set_pending_rows_event(thd, ev, cache_data, + is_transactional))) + { + delete ev; + DBUG_RETURN(NULL); + } + + DBUG_RETURN(ev); /* This is the new pending event */ + } + DBUG_RETURN(pending); /* This is the current pending event */ +} + /* Generate a new global transaction ID, and write it to the binlog */ @@ -12108,7 +12154,8 @@ void wsrep_thd_binlog_stmt_rollback(THD * thd) binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr(); if (cache_mngr) { - thd->binlog_remove_pending_rows_event(TRUE, TRUE); + MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache); + thd->reset_binlog_for_next_statement(); cache_mngr->stmt_cache.reset(); } DBUG_VOID_RETURN; diff --git a/sql/log.h b/sql/log.h index a007408a740cf..c8cf8bdb27989 100644 --- a/sql/log.h +++ b/sql/log.h @@ -350,6 +350,31 @@ class MYSQL_LOG enum cache_type io_cache_type_arg); }; +/** + @struct Rows_event_factory + + Holds an event type code and a callback function to create it. + Should be created by Rows_event_factory::get. +*/ +struct Rows_event_factory +{ + int type_code; + + Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional); + + template + static Rows_event_factory get() + { + return { RowsEventT::TYPE_CODE, + [](THD* thd, TABLE* table, ulong flags, bool is_transactional) + -> Rows_log_event* + { + return new RowsEventT(thd, table, flags, is_transactional); + } + }; + } +}; + /* Tell the io thread if we can delay the master info sync. */ #define SEMI_SYNC_SLAVE_DELAY_SYNC 1 /* Tell the io thread if the current event needs a ack. */ @@ -419,6 +444,7 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG #define BINLOG_COOKIE_IS_DUMMY(c) \ ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID ) + class binlog_cache_mngr; class binlog_cache_data; struct rpl_gtid; @@ -723,11 +749,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG Format_description_log_event *fdle, bool do_xa); int do_binlog_recovery(const char *opt_name, bool do_xa_recovery); #if !defined(MYSQL_CLIENT) + Rows_log_event* + prepare_pending_rows_event(THD *thd, TABLE* table, + binlog_cache_data *cache_data, + uint32 serv_id, size_t needed, + bool is_transactional, + Rows_event_factory event_factory); int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, - binlog_cache_mngr *cache_mngr, + binlog_cache_data *cache_data, bool is_transactional); - int remove_pending_rows_event(THD *thd, bool is_transactional); + + static int remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data); #endif /* !defined(MYSQL_CLIENT) */ void reset_bytes_written() @@ -824,7 +857,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG void write_binlog_checkpoint_event_already_locked(const char *name, uint len); int write_cache(THD *thd, IO_CACHE *cache); void set_write_error(THD *thd, bool is_transactional); - bool check_write_error(THD *thd); + static bool check_write_error(THD *thd); void start_union_events(THD *thd, query_id_t query_id_param); void stop_union_events(THD *thd); @@ -1179,10 +1212,11 @@ bool write_annotated_row(THD *thd); int binlog_flush_pending_rows_event(THD *thd, bool stmt_end, bool is_transactional, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, - bool use_trans_cache); + binlog_cache_data *cache_data); Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr, bool use_trans_cache); +binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr, + bool use_trans_cache); extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern handlerton *binlog_hton; diff --git a/sql/log_event.h b/sql/log_event.h index a19572e16d156..142f3c44d4175 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -4927,14 +4927,14 @@ class Write_rows_log_event : public Rows_log_event #if defined(MYSQL_SERVER) static bool binlog_row_logging_function(THD *thd, TABLE *table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, + binlog_cache_data *cache_data, bool is_transactional, const uchar *before_record __attribute__((unused)), const uchar *after_record) { DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); - return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional, + return thd->binlog_write_row(table, bin_log, cache_data, is_transactional, after_record); } #endif @@ -5013,13 +5013,13 @@ class Update_rows_log_event : public Rows_log_event #ifdef MYSQL_SERVER static bool binlog_row_logging_function(THD *thd, TABLE *table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, + binlog_cache_data *cache_data, bool is_transactional, const uchar *before_record, const uchar *after_record) { DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); - return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional, + return thd->binlog_update_row(table, bin_log, cache_data, is_transactional, before_record, after_record); } #endif @@ -5104,14 +5104,14 @@ class Delete_rows_log_event : public Rows_log_event #ifdef MYSQL_SERVER static bool binlog_row_logging_function(THD *thd, TABLE *table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, + binlog_cache_data *cache_data, bool is_transactional, const uchar *before_record, const uchar *after_record __attribute__((unused))) { DBUG_ASSERT(!table->versioned(VERS_TRX_ID)); - return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional, + return thd->binlog_delete_row(table, bin_log, cache_data, is_transactional, before_record); } #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a92e74c52f6d4..86c113ae047c0 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -6899,116 +6899,6 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db) binlog_filter->db_ok(db->str))); } -struct RowsEventFactory -{ - int type_code; - - Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional); -}; - -/** - Creates RowsEventFactory, responsible for creating Rows_log_event descendant. - @tparam RowsEventT is a type which will be constructed by - RowsEventFactory::create. - @return a RowsEventFactory object with type_code equal to RowsEventT::TYPE_CODE - and create containing pointer to a RowsEventT constructor callback. - */ -template -static RowsEventFactory binlog_get_rows_event_creator() -{ - return { RowsEventT::TYPE_CODE, - [](THD* thd, TABLE* table, ulong flags, bool is_transactional) - -> Rows_log_event* - { - return new RowsEventT(thd, table, flags, is_transactional); - } - }; -} - -/* - Template member function for ensuring that there is an rows log - event of the apropriate type before proceeding. - - PRE CONDITION: - - Events of type 'RowEventT' have the type code 'type_code'. - - POST CONDITION: - If a non-NULL pointer is returned, the pending event for thread 'thd' will - be an event of type 'RowEventT' (which have the type code 'type_code') - will either empty or have enough space to hold 'needed' bytes. In - addition, the columns bitmap will be correct for the row, meaning that - the pending event will be flushed if the columns in the event differ from - the columns suppled to the function. - - RETURNS - If no error, a non-NULL pending event (either one which already existed or - the newly created one). - If error, NULL. - */ - -Rows_log_event* -binlog_prepare_pending_rows_event(THD *thd, TABLE* table, - MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, - uint32 serv_id, size_t needed, - bool is_transactional, - RowsEventFactory event_factory) -{ - DBUG_ENTER("binlog_prepare_pending_rows_event"); - /* Pre-conditions */ - DBUG_ASSERT(table->s->table_map_id != ~0UL); - - /* - There is no good place to set up the transactional data, so we - have to do it here. - */ - Rows_log_event* pending= binlog_get_pending_rows_event(cache_mngr, - use_trans_cache(thd, - is_transactional)); - - if (unlikely(pending && !pending->is_valid())) - DBUG_RETURN(NULL); - - /* - Check if the current event is non-NULL and a write-rows - event. Also check if the table provided is mapped: if it is not, - then we have switched to writing to a new table. - If there is no pending event, we need to create one. If there is a pending - event, but it's not about the same table id, or not of the same type - (between Write, Update and Delete), or not the same affected columns, or - going to be too big, flush this event to disk and create a new pending - event. - */ - if (!pending || - pending->server_id != serv_id || - pending->get_table_id() != table->s->table_map_id || - pending->get_general_type_code() != event_factory.type_code || - pending->get_data_size() + needed > opt_binlog_rows_event_max_size || - pending->read_write_bitmaps_cmp(table) == FALSE) - { - /* Create a new RowsEventT... */ - Rows_log_event* const - ev= event_factory.create(thd, table, table->s->table_map_id, - is_transactional); - if (unlikely(!ev)) - DBUG_RETURN(NULL); - ev->server_id= serv_id; // I don't like this, it's too easy to forget. - /* - flush the pending event and replace it with the newly created - event... - */ - if (unlikely(bin_log->flush_and_set_pending_rows_event(thd, ev, cache_mngr, - is_transactional))) - { - delete ev; - DBUG_RETURN(NULL); - } - - DBUG_RETURN(ev); /* This is the new pending event */ - } - DBUG_RETURN(pending); /* This is the current pending event */ -} - /* Declare in unnamed namespace. */ CPP_UNNAMED_NS_START /** @@ -7134,7 +7024,7 @@ CPP_UNNAMED_NS_START CPP_UNNAMED_NS_END int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_trans, + binlog_cache_data *cache_data, bool is_trans, uchar const *record) { /* @@ -7151,13 +7041,12 @@ int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log, size_t const len= pack_row(table, table->rpl_write_set, row_data, record); auto creator= binlog_should_compress(len) ? - binlog_get_rows_event_creator() : - binlog_get_rows_event_creator(); + Rows_event_factory::get() : + Rows_event_factory::get(); - auto *ev= binlog_prepare_pending_rows_event(this, table, - &mysql_bin_log, cache_mngr, - variables.server_id, - len, is_trans, creator); + auto *ev= bin_log->prepare_pending_rows_event(this, table, cache_data, + variables.server_id, + len, is_trans, creator); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -7166,7 +7055,7 @@ int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log, } int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_trans, + binlog_cache_data *cache_data, bool is_trans, const uchar *before_record, const uchar *after_record) { @@ -7214,13 +7103,12 @@ int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log, #endif auto creator= binlog_should_compress(before_size + after_size) ? - binlog_get_rows_event_creator() : - binlog_get_rows_event_creator(); - auto *ev= binlog_prepare_pending_rows_event(this, table, - &mysql_bin_log, cache_mngr, - variables.server_id, - before_size + after_size, - is_trans, creator); + Rows_event_factory::get() : + Rows_event_factory::get(); + auto *ev= bin_log->prepare_pending_rows_event(this, table, cache_data, + variables.server_id, + before_size + after_size, + is_trans, creator); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -7236,7 +7124,7 @@ int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log, } int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_trans, + binlog_cache_data *cache_data, bool is_trans, uchar const *record) { /** @@ -7270,12 +7158,11 @@ int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log, size_t const len= pack_row(table, table->read_set, row_data, record); auto creator= binlog_should_compress(len) ? - binlog_get_rows_event_creator() : - binlog_get_rows_event_creator(); - auto *ev= binlog_prepare_pending_rows_event(this, table, - &mysql_bin_log, cache_mngr, - variables.server_id, - len, is_trans, creator); + Rows_event_factory::get() : + Rows_event_factory::get(); + auto *ev= mysql_bin_log.prepare_pending_rows_event(this, table, cache_data, + variables.server_id, + len, is_trans, creator); if (unlikely(ev == 0)) return HA_ERR_OUT_OF_MEM; @@ -7355,28 +7242,6 @@ void THD::binlog_prepare_row_images(TABLE *table) DBUG_VOID_RETURN; } - - -int THD::binlog_remove_pending_rows_event(bool reset_stmt, - bool is_transactional) -{ - DBUG_ENTER("THD::binlog_remove_pending_rows_event"); - - if(!WSREP_EMULATE_BINLOG_NNULL(this) && !mysql_bin_log.is_open()) - DBUG_RETURN(0); - - /* Ensure that all events in a GTID group are in the same cache */ - if (variables.option_bits & OPTION_GTID_BEGIN) - is_transactional= 1; - - mysql_bin_log.remove_pending_rows_event(this, is_transactional); - - if (reset_stmt) - reset_binlog_for_next_statement(); - DBUG_RETURN(0); -} - - int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); @@ -7395,11 +7260,12 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) auto *cache_mngr= binlog_get_cache_mngr(); if (!cache_mngr) DBUG_RETURN(0); + auto *cache= binlog_get_cache_data(cache_mngr, + use_trans_cache(this, is_transactional)); int error= ::binlog_flush_pending_rows_event(this, stmt_end, is_transactional, - &mysql_bin_log, cache_mngr, - use_trans_cache(this, is_transactional)); + &mysql_bin_log, cache); DBUG_RETURN(error); } diff --git a/sql/sql_class.h b/sql/sql_class.h index 882f28c040414..bb62fe86109cc 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2947,13 +2947,13 @@ class THD: public THD_count, /* this must be first */ void binlog_start_trans_and_stmt(); void binlog_set_stmt_begin(); int binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_transactional, + binlog_cache_data *cache_data, bool is_transactional, const uchar *buf); int binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_transactional, + binlog_cache_data *cache_data, bool is_transactional, const uchar *buf); int binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log, - binlog_cache_mngr *cache_mngr, bool is_transactional, + binlog_cache_data *cache_data, bool is_transactional, const uchar *old_data, const uchar *new_data); bool prepare_handlers_for_update(uint flag); bool binlog_write_annotated_row(Log_event_writer *writer); @@ -2968,14 +2968,12 @@ class THD: public THD_count, /* this must be first */ Member functions to handle pending event for row-level logging. */ binlog_cache_mngr *binlog_get_cache_mngr() const; - void binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache); inline int binlog_flush_pending_rows_event(bool stmt_end) { return (binlog_flush_pending_rows_event(stmt_end, FALSE) || binlog_flush_pending_rows_event(stmt_end, TRUE)); } int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional); - int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional); bool binlog_need_stmt_format(bool is_transactional) const {