Skip to content

Commit

Permalink
MDEV-4487 Allow replication from MySQL 5.6+ when GTID is enabled on t…
Browse files Browse the repository at this point in the history
…he master

MDEV-8685 MariaDB fails to decode Anonymous_GTID entries
MDEV-5705 Replication testing: 5.6->10.0

- Ignoring GTID events from MySQL 5.6+ (Allows replication from MySQL 5.6+ with GTID enabled)
- Added ignorable events from MySQL 5.6
- mysqlbinlog now writes information about GTID and ignorable events.
- Added more information in error message when replication stops because of wrong information in binary log.
- Fixed wrong test when write_on_release() should flush cache.
  • Loading branch information
montywi committed Oct 8, 2015
1 parent 7c1e2fe commit a69a6dd
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 11 deletions.
127 changes: 116 additions & 11 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class Write_on_release_cache
~Write_on_release_cache()
{
copy_event_cache_to_file_and_reinit(m_cache, m_file);
if (m_flags | FLUSH_F)
if (m_flags & FLUSH_F)
fflush(m_file);
}

Expand Down Expand Up @@ -813,6 +813,15 @@ const char* Log_event::get_type_str(Log_event_type type)
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
case GTID_EVENT: return "Gtid";
case GTID_LIST_EVENT: return "Gtid_list";

/* The following is only for mysqlbinlog */
case IGNORABLE_LOG_EVENT: return "Ignorable log event";
case ROWS_QUERY_LOG_EVENT: return "MySQL Rows_query";
case GTID_LOG_EVENT: return "MySQL Gtid";
case ANONYMOUS_GTID_LOG_EVENT: return "MySQL Anonymous_Gtid";
case PREVIOUS_GTIDS_LOG_EVENT: return "MySQL Previous_gtids";
case HEARTBEAT_LOG_EVENT: return "Heartbeat";

default: return "Unknown"; /* impossible */
}
}
Expand Down Expand Up @@ -1416,6 +1425,8 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
DBUG_ENTER("Log_event::read_log_event");
DBUG_ASSERT(description_event != 0);
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
my_off_t position= my_b_tell(file);

/*
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
check the event for sanity and to know its length; no need to really parse
Expand All @@ -1427,7 +1438,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
LOG_EVENT_MINIMAL_HEADER_LEN);

LOCK_MUTEX;
DBUG_PRINT("info", ("my_b_tell: %lu", (ulong) my_b_tell(file)));
DBUG_PRINT("info", ("my_b_tell: %llu", (ulonglong) position));
if (my_b_read(file, (uchar *) head, header_size))
{
DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \
Expand Down Expand Up @@ -1484,8 +1495,9 @@ failed my_b_read"));
{
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %lu, event_type: %d",
error,data_len,(uchar)(head[EVENT_TYPE_OFFSET]));
"'%s' at offset: %llu data_len: %lu event_type: %d",
error, position, data_len,
(uchar)(head[EVENT_TYPE_OFFSET]));
my_free(buf);
/*
The SQL slave thread will check if file->error<0 to know
Expand Down Expand Up @@ -1518,10 +1530,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
DBUG_DUMP("data", (unsigned char*) buf, event_len);

/* Check the integrity */
/*
Check the integrity; This is needed because handle_slave_io() doesn't
check if packet is of proper length.
*/
if (event_len < EVENT_LEN_OFFSET ||
(uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
Expand Down Expand Up @@ -1703,6 +1717,15 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case DELETE_ROWS_EVENT:
ev = new Delete_rows_log_event(buf, event_len, description_event);
break;

/* MySQL GTID events are ignored */
case GTID_LOG_EVENT:
case ANONYMOUS_GTID_LOG_EVENT:
case PREVIOUS_GTIDS_LOG_EVENT:
ev= new Ignorable_log_event(buf, description_event,
get_type_str((Log_event_type) event_type));
break;

case TABLE_MAP_EVENT:
ev = new Table_map_log_event(buf, event_len, description_event);
break;
Expand All @@ -1720,10 +1743,22 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
ev = new Annotate_rows_log_event(buf, event_len, description_event);
break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
/*
Create an object of Ignorable_log_event for unrecognized sub-class.
So that SLAVE SQL THREAD will only update the position and continue.
*/
if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)
{
ev= new Ignorable_log_event(buf, description_event,
get_type_str((Log_event_type) event_type));
}
else
{
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
}
}
}

Expand Down Expand Up @@ -4891,6 +4926,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
post_header_len[IGNORABLE_LOG_EVENT-1]= 0;
post_header_len[ROWS_QUERY_LOG_EVENT-1]= 0;
post_header_len[GTID_LOG_EVENT-1]= 0;
post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]= 0;
post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= 0;
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
Expand Down Expand Up @@ -12639,6 +12677,52 @@ Incident_log_event::write_data_body(IO_CACHE *file)
}


Ignorable_log_event::Ignorable_log_event(const char *buf,
const Format_description_log_event
*descr_event,
const char *event_name)
:Log_event(buf, descr_event), number((int) (uchar) buf[EVENT_TYPE_OFFSET]),
description(event_name)
{
DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
DBUG_VOID_RETURN;
}

Ignorable_log_event::~Ignorable_log_event()
{
}

#ifndef MYSQL_CLIENT
/* Pack info for its unrecognized ignorable event */
void Ignorable_log_event::pack_info(THD *thd, Protocol *protocol)
{
char buf[256];
size_t bytes;
bytes= my_snprintf(buf, sizeof(buf), "# Ignorable event type %d (%s)",
number, description);
protocol->store(buf, bytes, &my_charset_bin);
}
#endif

#ifdef MYSQL_CLIENT
/* Print for its unrecognized ignorable event */
void
Ignorable_log_event::print(FILE *file,
PRINT_EVENT_INFO *print_event_info)
{
if (print_event_info->short_form)
return;

print_header(&print_event_info->head_cache, print_event_info, FALSE);
my_b_printf(&print_event_info->head_cache, "\tIgnorable\n");
my_b_printf(&print_event_info->head_cache,
"# Ignorable event type %d (%s)\n", number, description);
copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
file);
}
#endif


#ifdef MYSQL_CLIENT
/**
The default values for these variables should be values that are
Expand Down Expand Up @@ -12720,4 +12804,25 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
return TRUE;
#endif
}

/**
Check if we should write event to the relay log
This is used to skip events that is only supported by MySQL
Return:
0 ok
1 Don't write event
*/

bool event_that_should_be_ignored(const char *buf)
{
uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
if (event_type == GTID_LOG_EVENT ||
event_type == ANONYMOUS_GTID_LOG_EVENT ||
event_type == PREVIOUS_GTIDS_LOG_EVENT ||
(uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F))
return 1;
return 0;
}
#endif
72 changes: 72 additions & 0 deletions sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
#define HEARTBEAT_HEADER_LEN 0
#define IGNORABLE_HEADER_LEN 0
#define ROWS_HEADER_LEN_V2 10
#define ANNOTATE_ROWS_HEADER_LEN 0
#define BINLOG_CHECKPOINT_HEADER_LEN 4
Expand Down Expand Up @@ -519,6 +520,17 @@ struct sql_ex_info
*/
#define LOG_EVENT_RELAY_LOG_F 0x40

/**
@def LOG_EVENT_IGNORABLE_F
For an event, 'e', carrying a type code, that a slave,
's', does not recognize, 's' will check 'e' for
LOG_EVENT_IGNORABLE_F, and if the flag is set, then 'e'
is ignored. Otherwise, 's' acknowledges that it has
found an unknown event in the relay log.
*/
#define LOG_EVENT_IGNORABLE_F 0x80

/**
@def LOG_EVENT_SKIP_REPLICATION_F
Expand Down Expand Up @@ -697,6 +709,11 @@ enum Log_event_type
UPDATE_ROWS_EVENT = 31,
DELETE_ROWS_EVENT = 32,

/* MySQL 5.6 GTID events, ignored by MariaDB */
GTID_LOG_EVENT= 33,
ANONYMOUS_GTID_LOG_EVENT= 34,
PREVIOUS_GTIDS_LOG_EVENT= 35,

/*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
Expand Down Expand Up @@ -4740,6 +4757,60 @@ class Incident_log_event : public Log_event {
LEX_STRING m_message;
};

/**
@class Ignorable_log_event
Base class for ignorable log events. Events deriving from
this class can be safely ignored by slaves that cannot
recognize them. Newer slaves, will be able to read and
handle them. This has been designed to be an open-ended
architecture, so adding new derived events shall not harm
the old slaves that support ignorable log event mechanism
(they will just ignore unrecognized ignorable events).
@note The only thing that makes an event ignorable is that it has
the LOG_EVENT_IGNORABLE_F flag set. It is not strictly necessary
that ignorable event types derive from Ignorable_log_event; they may
just as well derive from Log_event and pass LOG_EVENT_IGNORABLE_F as
argument to the Log_event constructor.
**/

class Ignorable_log_event : public Log_event {
public:
int number;
const char *description;

#ifndef MYSQL_CLIENT
Ignorable_log_event(THD *thd_arg)
:Log_event(thd_arg, LOG_EVENT_IGNORABLE_F, FALSE),
number(0), description("internal")
{
DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
DBUG_VOID_RETURN;
}
#endif

Ignorable_log_event(const char *buf,
const Format_description_log_event *descr_event,
const char *event_name);
virtual ~Ignorable_log_event();

#ifndef MYSQL_CLIENT
void pack_info(THD *, Protocol*);
#endif

#ifdef MYSQL_CLIENT
virtual void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
#endif

virtual Log_event_type get_type_code() { return IGNORABLE_LOG_EVENT; }

virtual bool is_valid() const { return 1; }

virtual int get_data_size() { return IGNORABLE_HEADER_LEN; }
};


static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache,
FILE *file)
{
Expand Down Expand Up @@ -4797,6 +4868,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
ulonglong *relay_log_pos);

bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
bool event_that_should_be_ignored(const char *buf);
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;

Expand Down
6 changes: 6 additions & 0 deletions sql/slave.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5615,6 +5615,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(llbuf, strlen(llbuf));
goto err;
}

/*
Heartbeat events doesn't count in the binlog size, so we don't have to
increment mi->master_log_pos
*/
goto skip_relay_logging;
}
break;
Expand Down Expand Up @@ -5844,6 +5849,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
else
if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
event_that_should_be_ignored(buf) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of
Expand Down

0 comments on commit a69a6dd

Please sign in to comment.