Skip to content

Commit

Permalink
Repurpose SLAVE_EVENT as METADATA_EVENT
Browse files Browse the repository at this point in the history
Summary:
Repurpse SLAVE_EVENT as METADATA_EVENT. This new METADATA_EVENT will follow
TLV encoding - this will ensure that it is easy to extend this event in future.

This new method has a 64 bit flags in the data-header part (header is not
encoded as TLV). The data-body currently consists of a 64 bit hlc timestamp.

This needs to be ported to 8.0 before using in prod since we use 8.0 mysqlbinlog/mysqlclient in prod

Reviewed By: abhinav04sharma

Differential Revision: D15733469

fbshipit-source-id: 0aa5053
  • Loading branch information
bhatvinay authored and facebook-github-bot committed Jun 22, 2019
1 parent de0bf94 commit 2b0611e
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 13 deletions.
17 changes: 16 additions & 1 deletion client/mysqlbinlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ static bool shall_skip_gtids(Log_event* ev, Gtid *cached_gtid)
outputted.
*/
case START_EVENT_V3: /* for completion */
case SLAVE_EVENT: /* for completion */
case METADATA_EVENT:
case STOP_EVENT:
case FORMAT_DESCRIPTION_EVENT:
case ROTATE_EVENT:
Expand Down Expand Up @@ -1920,6 +1920,21 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
"any case. If you want to exclude or include transactions, "
"you should use the options --exclude-gtids or "
"--include-gtids, respectively, instead.");
case METADATA_EVENT:
{
ev->print(result_file, print_event_info);
if (head->error == -1)
goto err;

/* Copy and flush head cache and body cache */
if (copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
result_file, stop_never) ||
copy_event_cache_to_file_and_reinit(&print_event_info->body_cache,
result_file, stop_never))
goto err;

goto end;
}

/* fall through */
default:
Expand Down
4 changes: 2 additions & 2 deletions mysql-test/r/show_rpl.result
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Rpl_count_format_description 0
Rpl_count_incident 0
Rpl_count_intvar 0
Rpl_count_load 0
Rpl_count_metadata 0
Rpl_count_new_load 0
Rpl_count_other 0
Rpl_count_pre_ga_delete_rows 0
Expand All @@ -19,7 +20,6 @@ Rpl_count_pre_ga_write_rows 0
Rpl_count_query 0
Rpl_count_rand 0
Rpl_count_rotate 0
Rpl_count_slave 0
Rpl_count_start_v3 0
Rpl_count_stop 0
Rpl_count_table_map 0
Expand All @@ -41,6 +41,7 @@ Rpl_seconds_format_description 0.000000
Rpl_seconds_incident 0.000000
Rpl_seconds_intvar 0.000000
Rpl_seconds_load 0.000000
Rpl_seconds_metadata 0.000000
Rpl_seconds_new_load 0.000000
Rpl_seconds_other 0.000000
Rpl_seconds_pre_ga_delete_rows 0.000000
Expand All @@ -49,7 +50,6 @@ Rpl_seconds_pre_ga_write_rows 0.000000
Rpl_seconds_query 0.000000
Rpl_seconds_rand 0.000000
Rpl_seconds_rotate 0.000000
Rpl_seconds_slave 0.000000
Rpl_seconds_start_v3 0.000000
Rpl_seconds_stop 0.000000
Rpl_seconds_table_map 0.000000
Expand Down
259 changes: 253 additions & 6 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ const char* Log_event::get_type_str(Log_event_type type)
case ROTATE_EVENT: return "Rotate";
case INTVAR_EVENT: return "Intvar";
case LOAD_EVENT: return "Load";
case METADATA_EVENT: return "Metadata";
case NEW_LOAD_EVENT: return "New_load";
case CREATE_FILE_EVENT: return "Create_file";
case APPEND_BLOCK_EVENT: return "Append_block";
Expand Down Expand Up @@ -1740,6 +1741,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case LOAD_EVENT:
ev = new Load_log_event(buf, event_len, description_event);
break;
case METADATA_EVENT:
ev = new Metadata_log_event(buf, event_len, description_event);
break;
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len, description_event);
break;
Expand Down Expand Up @@ -1871,10 +1875,8 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
(because constructor is "void") ; so instead we leave the pointer we
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
Same for Format_description_log_event, member 'post_header_len'.

SLAVE_EVENT is never used, so it should not be read ever.
*/
if (!ev || !ev->is_valid() || (event_type == SLAVE_EVENT))
if (!ev || !ev->is_valid())
{
DBUG_PRINT("error",("Found invalid event in binary log"));

Expand Down Expand Up @@ -6131,7 +6133,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
post_header_len[INTVAR_EVENT-1]= INTVAR_HEADER_LEN;
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
post_header_len[SLAVE_EVENT-1]= 0; /* Unused because the code for Slave log event was removed. (15th Oct. 2010) */
post_header_len[METADATA_EVENT-1]= METADATA_HEADER_LEN;
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
Expand Down Expand Up @@ -6221,7 +6223,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
post_header_len[INTVAR_EVENT-1]= 0;
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
post_header_len[SLAVE_EVENT-1]= 0; /* Unused because the code for Slave log event was removed. (15th Oct. 2010) */
post_header_len[METADATA_EVENT-1]= METADATA_HEADER_LEN;
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
Expand Down Expand Up @@ -6382,7 +6384,7 @@ Format_description_log_event(const char* buf,
static const uint8 perm[EVENT_TYPE_PERMUTATION_NUM]=
{
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
INTVAR_EVENT, LOAD_EVENT, METADATA_EVENT, CREATE_FILE_EVENT,
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
NEW_LOAD_EVENT,
RAND_EVENT, USER_VAR_EVENT,
Expand Down Expand Up @@ -15727,6 +15729,251 @@ int Previous_gtids_log_event::do_update_pos(Relay_log_info *rli)
}
#endif

#ifndef MYSQL_CLIENT
Metadata_log_event::Metadata_log_event(THD *thd_arg, bool using_trans)
: Log_event(thd_arg, 0,
using_trans ? Log_event::EVENT_TRANSACTIONAL_CACHE :
Log_event::EVENT_STMT_CACHE,
Log_event::EVENT_NORMAL_LOGGING) {}

Metadata_log_event::Metadata_log_event(
THD *thd_arg, bool using_trans, uint64_t hlc_time_ns)
: Metadata_log_event(thd_arg, using_trans)
{
set_hlc_time(hlc_time_ns);
}
#endif

Metadata_log_event::Metadata_log_event(
const char *buffer,
uint event_len,
const Format_description_log_event *descr_event)
: Log_event(buffer, descr_event)
{
uint8 const common_header_len=
descr_event->common_header_len;
uint read_len= common_header_len;

char const *ptr_buffer= buffer + common_header_len;

/* Read and intialize every type in the stream for this event */
while (read_len < event_len)
{
unsigned char type = *ptr_buffer;
ptr_buffer+= ENCODED_TYPE_SIZE;
read_len+= ENCODED_TYPE_SIZE;

DBUG_ASSERT(read_len < event_len);

uint length=
read_type(static_cast<Metadata_log_event_types>(type), ptr_buffer);

ptr_buffer+= length;
read_len+= length;
}

DBUG_ASSERT((read_len == event_len) &&
(read_len == get_total_size()));
}

void Metadata_log_event::set_hlc_time(uint64_t hlc_time_ns)
{
hlc_time_ns_= hlc_time_ns;
set_exist(Metadata_log_event_types::HLC_TYPE);
// Update the size of the event when it gets serialized into the stream.
size_= size_ + ENCODED_TYPE_SIZE + ENCODED_LENGTH_SIZE + ENCODED_HLC_SIZE;
}

void Metadata_log_event::set_exist(Metadata_log_event_types type)
{
std::size_t index= static_cast<std::size_t>(type);
DBUG_ASSERT(index < existing_types_.size());
existing_types_[index]= true;
}

bool Metadata_log_event::does_exist(Metadata_log_event_types type) const
{
std::size_t index= static_cast<std::size_t>(type);
DBUG_ASSERT(index < existing_types_.size());
return existing_types_[index];
}

uint64_t Metadata_log_event::get_hlc_time() const
{
return hlc_time_ns_;
}

uint Metadata_log_event::read_type(
Metadata_log_event_types type, char const* buffer)
{
DBUG_ENTER("Metadata_log_event::read_type");
using RLET= Metadata_log_event_types;

// Read the 'length' of the field's value
uint value_length= uint2korr(buffer);

switch (type)
{
case RLET::HLC_TYPE:
// HLC is a 8 byte numerical field
DBUG_ASSERT(value_length == 8);
set_hlc_time(uint8korr(buffer + ENCODED_LENGTH_SIZE));
break;
default:
// This is a event which we do not know about. Just skip this
// NO_LINT_DEBUG
sql_print_information(
"Unknown field type %u. Skipping past this field.", (uint)type);
break;
}

// return total length read (ENCODED_LENGTH_SIZE and the length of the
// 'VALUE' of the field)
DBUG_RETURN(value_length + ENCODED_LENGTH_SIZE);
}

#ifdef MYSQL_SERVER
bool Metadata_log_event::write_data_body(IO_CACHE *file)
{
DBUG_ENTER("Metadata_log_event::write_data_body");

if (write_hlc_time(file))
DBUG_RETURN(1);

DBUG_RETURN(0);
}

bool Metadata_log_event::write_hlc_time(IO_CACHE* file)
{
DBUG_ENTER("Metadata_log_event::write_hlc_time");

if (!does_exist(Metadata_log_event_types::HLC_TYPE))
DBUG_RETURN(0); /* No need to write HLC time */

if (write_type_and_length(
file,
Metadata_log_event_types::HLC_TYPE,
sizeof(hlc_time_ns_)))
{
DBUG_RETURN(1);
}

char buffer[ENCODED_HLC_SIZE];
char* ptr_buffer= buffer;

int8store(ptr_buffer, hlc_time_ns_);
ptr_buffer+= ENCODED_HLC_SIZE;

DBUG_ASSERT(ptr_buffer == (buffer + sizeof(buffer)));

bool ret= wrapper_my_b_safe_write(file, (uchar *) buffer, sizeof(buffer));
DBUG_RETURN(ret);
}

bool Metadata_log_event::write_type_and_length(
IO_CACHE* file, Metadata_log_event_types type, uint32_t length)
{
DBUG_ENTER("Metadata_log_event::write_type_and_length");

char buffer[ENCODED_TYPE_SIZE + ENCODED_LENGTH_SIZE];
char* ptr_buffer= buffer;

*(unsigned char *)ptr_buffer= static_cast<unsigned char>(type);
ptr_buffer+= ENCODED_TYPE_SIZE;

int2store(ptr_buffer, length);
ptr_buffer+= ENCODED_LENGTH_SIZE;

DBUG_ASSERT(ptr_buffer == (buffer + sizeof(buffer)));

bool ret= wrapper_my_b_safe_write(file, (uchar *) buffer, sizeof(buffer));
DBUG_RETURN(ret);
}

#endif // MYSQL_SERVER

// Get the type of thyis event
Log_event_type Metadata_log_event::get_type_code()
{
return METADATA_EVENT;
}

// The size of the data section of the event
int Metadata_log_event::get_data_size()
{
/* data header + data body size for this event when serialized into the
stream */
return size_;
}

// Total size of the event including common header
uint Metadata_log_event::get_total_size()
{
/* common header + data header + data body */
return LOG_EVENT_HEADER_LEN + get_data_size();
}

#ifndef MYSQL_CLIENT
int Metadata_log_event::pack_info(Protocol *protocol)
{
std::string buffer;

if (does_exist(Metadata_log_event_types::HLC_TYPE))
{
buffer.append("HLC time: ");
buffer.append(std::to_string(hlc_time_ns_));
}

if (buffer.length() > 0)
protocol->store(buffer.c_str(), buffer.length(), &my_charset_bin);

return 0;
}
#endif

#ifdef MYSQL_CLIENT
void
Metadata_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
{
IO_CACHE *const head= &print_event_info->head_cache;
IO_CACHE *const body= &print_event_info->body_cache;

if (!print_event_info->short_form)
{
std::string buffer;
buffer.append("\tMetadata");

if (does_exist(Metadata_log_event_types::HLC_TYPE))
buffer.append("\tHLC time: " + std::to_string(hlc_time_ns_));

print_header(head, print_event_info, FALSE);
my_b_printf(head, "%s\n", buffer.c_str());
print_base64(body, print_event_info, FALSE);
}

return;
}
#endif

#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
int Metadata_log_event::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Metadata_log_event::do_apply_event");
DBUG_ASSERT(rli->info_thd == thd);
int error= 0;

// Stash the HLC commit timestamp of this transaction in master
thd->hlc_time_ns_next= hlc_time_ns_;

DBUG_RETURN(error);
}

int Metadata_log_event::do_update_pos(Relay_log_info *rli)
{
rli->inc_event_relay_log_pos();
return 0;
}
#endif

#ifdef MYSQL_CLIENT
/**
Expand Down
Loading

0 comments on commit 2b0611e

Please sign in to comment.