Skip to content

Commit

Permalink
optimize the memory allocation for compressed binlog event
Browse files Browse the repository at this point in the history
  • Loading branch information
vinchen authored and knielsen committed Oct 19, 2016
1 parent 640051e commit d4b2c9b
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 53 deletions.
116 changes: 75 additions & 41 deletions sql/log_event.cc
Expand Up @@ -789,7 +789,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
*/

int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen)
const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen)
{
ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src;
Expand All @@ -810,37 +811,54 @@ int query_event_uncompress(const Format_description_log_event *description_event
*newlen = (tmp - src) + un_len;
if(contain_checksum)
*newlen += BINLOG_CHECKSUM_LEN;

*dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE));
if (!*dst)
{
return 1;
}

uint32 alloc_size = ALIGN_SIZE(*newlen);
char *new_dst = NULL;

/* copy the head*/
memcpy(*dst, src , tmp - src);
if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len))
*is_malloc = false;
if (alloc_size <= buf_size)
{
new_dst = buf;
}
else
{
new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
if (!new_dst)
return 1;

*is_malloc = true;
}

/* copy the head*/
memcpy(new_dst, src , tmp - src);
if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{
my_free(*dst);
if (*is_malloc)
my_free(new_dst);

*is_malloc = false;

return 1;
}

(*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT;
int4store(*dst + EVENT_LEN_OFFSET, *newlen);
new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len));
int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
}
*dst = new_dst;
return 0;
}

int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen)
int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen)
{
Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src;
char *buf = NULL;
char *new_dst = NULL;

DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));

Expand Down Expand Up @@ -884,28 +902,39 @@ int Row_log_event_uncompress(const Format_description_log_event *description_eve
*newlen += BINLOG_CHECKSUM_LEN;

uint32 alloc_size = ALIGN_SIZE(*newlen);
buf = (char *)my_malloc(alloc_size , MYF(MY_FAE));
if (!buf)
{
return 1;
}

*is_malloc = false;
if (alloc_size <= buf_size)
{
new_dst = buf;
}
else
{
new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
if (!new_dst)
return 1;

*is_malloc = true;
}

/* copy the head*/
memcpy(buf, src , tmp - src);
memcpy(new_dst, src , tmp - src);
/* uncompress the body */
if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len))
if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{
my_free(buf);
if (*is_malloc)
my_free(new_dst);

return 1;
}

buf[EVENT_TYPE_OFFSET] = type;
int4store(buf + EVENT_LEN_OFFSET, *newlen);
new_dst[EVENT_TYPE_OFFSET] = type;
int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len));
int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
}
*dst = buf;
*dst = new_dst;
return 0;
}

Expand Down Expand Up @@ -3504,14 +3533,15 @@ bool Query_compressed_log_event::write()
{
const char *query_tmp = query;
uint32 q_len_tmp = q_len;
uint32 alloc_size;
bool ret = true;
q_len = binlog_get_compress_len(q_len);
query = (char *)my_malloc(q_len, MYF(MY_FAE));
q_len = alloc_size = binlog_get_compress_len(q_len);
query = (char *)my_safe_alloca(alloc_size);
if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len))
{
ret = Query_log_event::write();
}
my_free((void *)query);
my_safe_afree((void *)query, alloc_size);
query = query_tmp;
q_len = q_len_tmp;
return ret;
Expand Down Expand Up @@ -10786,14 +10816,15 @@ bool Rows_log_event::write_compressed()
uchar *m_rows_buf_tmp = m_rows_buf;
uchar *m_rows_cur_tmp = m_rows_cur;
bool ret = true;
uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE));
uint32 comlen, alloc_size;
comlen = alloc_size = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
m_rows_buf = (uchar *)my_safe_alloca(alloc_size);
if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
{
m_rows_cur = comlen + m_rows_buf;
ret = Log_event::write();
}
my_free(m_rows_buf);
my_safe_afree(m_rows_buf, alloc_size);
m_rows_buf = m_rows_buf_tmp;
m_rows_cur = m_rows_cur_tmp;
return ret;
Expand Down Expand Up @@ -12242,8 +12273,9 @@ void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_
{
char *new_buf;
ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
temp_buf, &new_buf, &len))
bool is_malloc = false;
if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
Expand Down Expand Up @@ -12911,8 +12943,9 @@ void Delete_rows_compressed_log_event::print(FILE *file,
{
char *new_buf;
ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
temp_buf, &new_buf, &len))
bool is_malloc = false;
if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
Expand Down Expand Up @@ -13167,8 +13200,9 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print
{
char *new_buf;
ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
temp_buf, &new_buf, &len))
bool is_malloc = false;
if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
Expand Down
10 changes: 6 additions & 4 deletions sql/log_event.h
Expand Up @@ -5074,11 +5074,13 @@ int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen
uint32 binlog_get_compress_len(uint32 len);
uint32 binlog_get_uncompress_len(const char *buf);

int query_event_uncompress(const Format_description_log_event *description_event,
bool contain_checksum, const char *src, char **dst, ulong *newlen);
int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen);

int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen);
int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen);


#endif /* _log_event_h */
23 changes: 15 additions & 8 deletions sql/slave.cc
Expand Up @@ -5664,7 +5664,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
bool compressed_event = FALSE;
bool is_compress_event = false;
char* new_buf = NULL;
char new_buf_arr[4096];
bool is_malloc = false;

/*
FD_q must have been prepared for the first R_a event
Expand Down Expand Up @@ -6150,7 +6153,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
case QUERY_COMPRESSED_EVENT:
inc_pos= event_len;
if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
Expand All @@ -6159,7 +6163,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(llbuf, strlen(llbuf));
goto err;
}
compressed_event = true;
buf = new_buf;
is_compress_event = true;
goto default_action;

case WRITE_ROWS_COMPRESSED_EVENT:
Expand All @@ -6170,7 +6175,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case DELETE_ROWS_COMPRESSED_EVENT_V1:
inc_pos = event_len;
{
if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
Expand All @@ -6180,7 +6186,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
}
compressed_event = true;
buf = new_buf;
is_compress_event = true;
goto default_action;

#ifndef DBUG_OFF
Expand Down Expand Up @@ -6233,7 +6240,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}

if (!compressed_event)
if (!is_compress_event)
inc_pos= event_len;

break;
Expand Down Expand Up @@ -6429,8 +6436,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());

if(compressed_event)
my_free((void *)buf);
if(is_malloc)
my_free((void *)new_buf);

DBUG_RETURN(error);
}
Expand Down

0 comments on commit d4b2c9b

Please sign in to comment.