From a69a6ddac8175a611d97711d167e26ad5d1f9dc8 Mon Sep 17 00:00:00 2001 From: Monty Date: Thu, 8 Oct 2015 10:45:09 +0300 Subject: [PATCH] MDEV-4487 Allow replication from MySQL 5.6+ when GTID is enabled on the master MDEV-8685 MariaDB fails to decode Anonymous_GTID entries MDEV-5705 Replication testing: 5.6->10.0 - Ignoring GTID events from MySQL 5.6+ (Allows replication from MySQL 5.6+ with GTID enabled) - Added ignorable events from MySQL 5.6 - mysqlbinlog now writes information about GTID and ignorable events. - Added more information in error message when replication stops because of wrong information in binary log. - Fixed wrong test when write_on_release() should flush cache. --- sql/log_event.cc | 127 +++++++++++++++++++++++++++++++++++++++++++---- sql/log_event.h | 72 +++++++++++++++++++++++++++ sql/slave.cc | 6 +++ 3 files changed, 194 insertions(+), 11 deletions(-) diff --git a/sql/log_event.cc b/sql/log_event.cc index fef0e70b16443..411229d319a86 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -305,7 +305,7 @@ class Write_on_release_cache ~Write_on_release_cache() { copy_event_cache_to_file_and_reinit(m_cache, m_file); - if (m_flags | FLUSH_F) + if (m_flags & FLUSH_F) fflush(m_file); } @@ -813,6 +813,15 @@ const char* Log_event::get_type_str(Log_event_type type) case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint"; case GTID_EVENT: return "Gtid"; case GTID_LIST_EVENT: return "Gtid_list"; + + /* The following is only for mysqlbinlog */ + case IGNORABLE_LOG_EVENT: return "Ignorable log event"; + case ROWS_QUERY_LOG_EVENT: return "MySQL Rows_query"; + case GTID_LOG_EVENT: return "MySQL Gtid"; + case ANONYMOUS_GTID_LOG_EVENT: return "MySQL Anonymous_Gtid"; + case PREVIOUS_GTIDS_LOG_EVENT: return "MySQL Previous_gtids"; + case HEARTBEAT_LOG_EVENT: return "Heartbeat"; + default: return "Unknown"; /* impossible */ } } @@ -1416,6 +1425,8 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, DBUG_ENTER("Log_event::read_log_event"); DBUG_ASSERT(description_event != 0); char head[LOG_EVENT_MINIMAL_HEADER_LEN]; + my_off_t position= my_b_tell(file); + /* First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to check the event for sanity and to know its length; no need to really parse @@ -1427,7 +1438,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, LOG_EVENT_MINIMAL_HEADER_LEN); LOCK_MUTEX; - DBUG_PRINT("info", ("my_b_tell: %lu", (ulong) my_b_tell(file))); + DBUG_PRINT("info", ("my_b_tell: %llu", (ulonglong) position)); if (my_b_read(file, (uchar *) head, header_size)) { DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \ @@ -1484,8 +1495,9 @@ failed my_b_read")); { DBUG_ASSERT(error != 0); sql_print_error("Error in Log_event::read_log_event(): " - "'%s', data_len: %lu, event_type: %d", - error,data_len,(uchar)(head[EVENT_TYPE_OFFSET])); + "'%s' at offset: %llu data_len: %lu event_type: %d", + error, position, data_len, + (uchar)(head[EVENT_TYPE_OFFSET])); my_free(buf); /* The SQL slave thread will check if file->error<0 to know @@ -1518,10 +1530,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); DBUG_DUMP("data", (unsigned char*) buf, event_len); - /* Check the integrity */ + /* + Check the integrity; This is needed because handle_slave_io() doesn't + check if packet is of proper length. + */ if (event_len < EVENT_LEN_OFFSET || - (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || - (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) + event_len != uint4korr(buf+EVENT_LEN_OFFSET)) { *error="Sanity check failed"; // Needed to free buffer DBUG_RETURN(NULL); // general sanity check - will fail on a partial read @@ -1703,6 +1717,15 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case DELETE_ROWS_EVENT: ev = new Delete_rows_log_event(buf, event_len, description_event); break; + + /* MySQL GTID events are ignored */ + case GTID_LOG_EVENT: + case ANONYMOUS_GTID_LOG_EVENT: + case PREVIOUS_GTIDS_LOG_EVENT: + ev= new Ignorable_log_event(buf, description_event, + get_type_str((Log_event_type) event_type)); + break; + case TABLE_MAP_EVENT: ev = new Table_map_log_event(buf, event_len, description_event); break; @@ -1720,10 +1743,22 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ev = new Annotate_rows_log_event(buf, event_len, description_event); break; default: - DBUG_PRINT("error",("Unknown event code: %d", - (int) buf[EVENT_TYPE_OFFSET])); - ev= NULL; - break; + /* + Create an object of Ignorable_log_event for unrecognized sub-class. + So that SLAVE SQL THREAD will only update the position and continue. + */ + if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F) + { + ev= new Ignorable_log_event(buf, description_event, + get_type_str((Log_event_type) event_type)); + } + else + { + DBUG_PRINT("error",("Unknown event code: %d", + (int) buf[EVENT_TYPE_OFFSET])); + ev= NULL; + break; + } } } @@ -4891,6 +4926,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[HEARTBEAT_LOG_EVENT-1]= 0; post_header_len[IGNORABLE_LOG_EVENT-1]= 0; post_header_len[ROWS_QUERY_LOG_EVENT-1]= 0; + post_header_len[GTID_LOG_EVENT-1]= 0; + post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]= 0; + post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= 0; post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; @@ -12639,6 +12677,52 @@ Incident_log_event::write_data_body(IO_CACHE *file) } +Ignorable_log_event::Ignorable_log_event(const char *buf, + const Format_description_log_event + *descr_event, + const char *event_name) + :Log_event(buf, descr_event), number((int) (uchar) buf[EVENT_TYPE_OFFSET]), + description(event_name) +{ + DBUG_ENTER("Ignorable_log_event::Ignorable_log_event"); + DBUG_VOID_RETURN; +} + +Ignorable_log_event::~Ignorable_log_event() +{ +} + +#ifndef MYSQL_CLIENT +/* Pack info for its unrecognized ignorable event */ +void Ignorable_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf[256]; + size_t bytes; + bytes= my_snprintf(buf, sizeof(buf), "# Ignorable event type %d (%s)", + number, description); + protocol->store(buf, bytes, &my_charset_bin); +} +#endif + +#ifdef MYSQL_CLIENT +/* Print for its unrecognized ignorable event */ +void +Ignorable_log_event::print(FILE *file, + PRINT_EVENT_INFO *print_event_info) +{ + if (print_event_info->short_form) + return; + + print_header(&print_event_info->head_cache, print_event_info, FALSE); + my_b_printf(&print_event_info->head_cache, "\tIgnorable\n"); + my_b_printf(&print_event_info->head_cache, + "# Ignorable event type %d (%s)\n", number, description); + copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, + file); +} +#endif + + #ifdef MYSQL_CLIENT /** The default values for these variables should be values that are @@ -12720,4 +12804,25 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, return TRUE; #endif } + +/** + Check if we should write event to the relay log + + This is used to skip events that is only supported by MySQL + + Return: + 0 ok + 1 Don't write event +*/ + +bool event_that_should_be_ignored(const char *buf) +{ + uint event_type= (uchar)buf[EVENT_TYPE_OFFSET]; + if (event_type == GTID_LOG_EVENT || + event_type == ANONYMOUS_GTID_LOG_EVENT || + event_type == PREVIOUS_GTIDS_LOG_EVENT || + (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)) + return 1; + return 0; +} #endif diff --git a/sql/log_event.h b/sql/log_event.h index 1349ec6187a56..8661e6e49e503 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -260,6 +260,7 @@ struct sql_ex_info #define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN) #define INCIDENT_HEADER_LEN 2 #define HEARTBEAT_HEADER_LEN 0 +#define IGNORABLE_HEADER_LEN 0 #define ROWS_HEADER_LEN_V2 10 #define ANNOTATE_ROWS_HEADER_LEN 0 #define BINLOG_CHECKPOINT_HEADER_LEN 4 @@ -519,6 +520,17 @@ struct sql_ex_info */ #define LOG_EVENT_RELAY_LOG_F 0x40 +/** + @def LOG_EVENT_IGNORABLE_F + + For an event, 'e', carrying a type code, that a slave, + 's', does not recognize, 's' will check 'e' for + LOG_EVENT_IGNORABLE_F, and if the flag is set, then 'e' + is ignored. Otherwise, 's' acknowledges that it has + found an unknown event in the relay log. +*/ +#define LOG_EVENT_IGNORABLE_F 0x80 + /** @def LOG_EVENT_SKIP_REPLICATION_F @@ -697,6 +709,11 @@ enum Log_event_type UPDATE_ROWS_EVENT = 31, DELETE_ROWS_EVENT = 32, + /* MySQL 5.6 GTID events, ignored by MariaDB */ + GTID_LOG_EVENT= 33, + ANONYMOUS_GTID_LOG_EVENT= 34, + PREVIOUS_GTIDS_LOG_EVENT= 35, + /* Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers @@ -4740,6 +4757,60 @@ class Incident_log_event : public Log_event { LEX_STRING m_message; }; +/** + @class Ignorable_log_event + + Base class for ignorable log events. Events deriving from + this class can be safely ignored by slaves that cannot + recognize them. Newer slaves, will be able to read and + handle them. This has been designed to be an open-ended + architecture, so adding new derived events shall not harm + the old slaves that support ignorable log event mechanism + (they will just ignore unrecognized ignorable events). + + @note The only thing that makes an event ignorable is that it has + the LOG_EVENT_IGNORABLE_F flag set. It is not strictly necessary + that ignorable event types derive from Ignorable_log_event; they may + just as well derive from Log_event and pass LOG_EVENT_IGNORABLE_F as + argument to the Log_event constructor. +**/ + +class Ignorable_log_event : public Log_event { +public: + int number; + const char *description; + +#ifndef MYSQL_CLIENT + Ignorable_log_event(THD *thd_arg) + :Log_event(thd_arg, LOG_EVENT_IGNORABLE_F, FALSE), + number(0), description("internal") + { + DBUG_ENTER("Ignorable_log_event::Ignorable_log_event"); + DBUG_VOID_RETURN; + } +#endif + + Ignorable_log_event(const char *buf, + const Format_description_log_event *descr_event, + const char *event_name); + virtual ~Ignorable_log_event(); + +#ifndef MYSQL_CLIENT + void pack_info(THD *, Protocol*); +#endif + +#ifdef MYSQL_CLIENT + virtual void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + + virtual Log_event_type get_type_code() { return IGNORABLE_LOG_EVENT; } + + virtual bool is_valid() const { return 1; } + + virtual int get_data_size() { return IGNORABLE_HEADER_LEN; } +}; + + static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { @@ -4797,6 +4868,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, ulonglong *relay_log_pos); bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg); +bool event_that_should_be_ignored(const char *buf); uint8 get_checksum_alg(const char* buf, ulong len); extern TYPELIB binlog_checksum_typelib; diff --git a/sql/slave.cc b/sql/slave.cc index 0243272c443df..5af48b6a79361 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5615,6 +5615,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error_msg.append(llbuf, strlen(llbuf)); goto err; } + + /* + Heartbeat events doesn't count in the binlog size, so we don't have to + increment mi->master_log_pos + */ goto skip_relay_logging; } break; @@ -5844,6 +5849,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) else if ((s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || + event_that_should_be_ignored(buf) || /* the following conjunction deals with IGNORE_SERVER_IDS, if set If the master is on the ignore list, execution of