Skip to content

Commit

Permalink
[MDEV-6877] Add binlog writing according to columns
Browse files Browse the repository at this point in the history
The logging function now writes to the binlog according
to the bitmap columns implied by the binlog_row_image variable.
  • Loading branch information
cvicentiu committed Jun 30, 2015
1 parent edff3f3 commit 8bd5301
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 38 deletions.
6 changes: 2 additions & 4 deletions sql/handler.cc
Expand Up @@ -5658,8 +5658,7 @@ static int write_locked_table_maps(THD *thd)
}


typedef bool Log_func(THD*, TABLE*, bool, MY_BITMAP*,
uint, const uchar*, const uchar*);
typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*);

static int binlog_log_row(TABLE* table,
const uchar *before_record,
Expand Down Expand Up @@ -5705,8 +5704,7 @@ static int binlog_log_row(TABLE* table,
*/
bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE ||
table->file->has_transactions();
error= (*log_func)(thd, table, has_trans, &cols, table->s->fields,
before_record, after_record);
error= (*log_func)(thd, table, has_trans, before_record, after_record);
}
if (!use_bitbuf)
my_bitmap_free(&cols);
Expand Down
13 changes: 3 additions & 10 deletions sql/log_event.h
Expand Up @@ -4528,14 +4528,11 @@ class Write_rows_log_event : public Rows_log_event
#if defined(MYSQL_SERVER)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
return thd->binlog_write_row(table, is_transactional,
cols, fields, after_record);
return thd->binlog_write_row(table, is_transactional, after_record);
}
#endif

Expand Down Expand Up @@ -4596,13 +4593,11 @@ class Update_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record,
const uchar *after_record)
{
return thd->binlog_update_row(table, is_transactional,
cols, fields, before_record, after_record);
before_record, after_record);
}
#endif

Expand Down Expand Up @@ -4668,14 +4663,12 @@ class Delete_rows_log_event : public Rows_log_event
#ifdef MYSQL_SERVER
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record,
const uchar *after_record
__attribute__((unused)))
{
return thd->binlog_delete_row(table, is_transactional,
cols, fields, before_record);
before_record);
}
#endif

Expand Down
10 changes: 3 additions & 7 deletions sql/log_event_old.h
Expand Up @@ -369,14 +369,11 @@ class Write_rows_log_event_old : public Old_rows_log_event
#if !defined(MYSQL_CLIENT)
static bool binlog_row_logging_function(THD *thd, TABLE *table,
bool is_transactional,
MY_BITMAP *cols,
uint fields,
const uchar *before_record
__attribute__((unused)),
const uchar *after_record)
{
return thd->binlog_write_row(table, is_transactional,
cols, fields, after_record);
return thd->binlog_write_row(table, is_transactional, after_record);
}
#endif

Expand Down Expand Up @@ -452,7 +449,7 @@ class Update_rows_log_event_old : public Old_rows_log_event
const uchar *after_record)
{
return thd->binlog_update_row(table, is_transactional,
cols, fields, before_record, after_record);
before_record, after_record);
}
#endif

Expand Down Expand Up @@ -526,8 +523,7 @@ class Delete_rows_log_event_old : public Old_rows_log_event
const uchar *after_record
__attribute__((unused)))
{
return thd->binlog_delete_row(table, is_transactional,
cols, fields, before_record);
return thd->binlog_delete_row(table, is_transactional, before_record);
}
#endif

Expand Down
123 changes: 109 additions & 14 deletions sql/sql_class.cc
Expand Up @@ -6112,9 +6112,8 @@ CPP_UNNAMED_NS_START

CPP_UNNAMED_NS_END

int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
int THD::binlog_write_row(TABLE* table, bool is_trans,
uchar const *record)
{

DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
Expand All @@ -6129,7 +6128,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,

uchar *row_data= memory.slot(0);

size_t const len= pack_row(table, cols, row_data, record);
size_t const len= pack_row(table, table->write_set, row_data, record);

/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
Expand All @@ -6147,13 +6146,19 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
}

int THD::binlog_update_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
const uchar *before_record,
const uchar *after_record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));

/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;

size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);

Expand All @@ -6164,9 +6169,9 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
uchar *before_row= row_data.slot(0);
uchar *after_row= row_data.slot(1);

size_t const before_size= pack_row(table, cols, before_row,
size_t const before_size= pack_row(table, table->read_set, before_row,
before_record);
size_t const after_size= pack_row(table, cols, after_row,
size_t const after_size= pack_row(table, table->write_set, after_row,
after_record);

/* Ensure that all events in a GTID group are in the same cache */
Expand All @@ -6192,19 +6197,37 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;

return
ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);
int error= ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);

/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);

return error;

}

int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));
/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;

/*
/**
This will remove spurious fields required during execution but
not needed for binlogging. This is done according to the:
binlog-row-image option.
*/
binlog_prepare_row_images(table);

/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
Expand All @@ -6214,7 +6237,8 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,

uchar *row_data= memory.slot(0);

size_t const len= pack_row(table, cols, row_data, record);
DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
size_t const len= pack_row(table, table->read_set, row_data, record);

/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
Expand All @@ -6228,10 +6252,81 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;

return ev->add_row_data(row_data, len);

int error= ev->add_row_data(row_data, len);

/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);

return error;
}


void THD::binlog_prepare_row_images(TABLE *table)
{
DBUG_ENTER("THD::binlog_prepare_row_images");
/**
Remove from read_set spurious columns. The write_set has been
handled before in table->mark_columns_needed_for_update.
*/

DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set);
THD *thd= table->in_use;

/**
if there is a primary key in the table (ie, user declared PK or a
non-null unique index) and we dont want to ship the entire image,
and the handler involved supports this.
*/
if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
{
/**
Just to be sure that tmp_set is currently not in use as
the read_set already.
*/
DBUG_ASSERT(table->read_set != &table->tmp_set);

bitmap_clear_all(&table->tmp_set);

switch(thd->variables.binlog_row_image)
{
case BINLOG_ROW_IMAGE_MINIMAL:
/* MINIMAL: Mark only PK */
table->mark_columns_used_by_index_no_reset(table->s->primary_key,
&table->tmp_set);
break;
case BINLOG_ROW_IMAGE_NOBLOB:
/**
NOBLOB: Remove unnecessary BLOB fields from read_set
(the ones that are not part of PK).
*/
bitmap_union(&table->tmp_set, table->read_set);
for (Field **ptr=table->field ; *ptr ; ptr++)
{
Field *field= (*ptr);
if ((field->type() == MYSQL_TYPE_BLOB) &&
!(field->flags & PRI_KEY_FLAG))
bitmap_clear_bit(&table->tmp_set, field->field_index);
}
break;
default:
DBUG_ASSERT(0); // impossible.
}

/* set the temporary read_set */
table->column_bitmaps_set_no_signal(&table->tmp_set,
table->write_set);
}

DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set);
DBUG_VOID_RETURN;
}



int THD::binlog_remove_pending_rows_event(bool clear_maps,
bool is_transactional)
{
Expand Down
4 changes: 1 addition & 3 deletions sql/sql_class.h
Expand Up @@ -2127,14 +2127,12 @@ class THD :public Statement,
int binlog_write_table_map(TABLE *table, bool is_transactional,
my_bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
int binlog_delete_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
int binlog_update_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *old_data, const uchar *new_data);
void binlog_prepare_row_images(TABLE* table);

void set_server_id(uint32 sid) { variables.server_id = sid; }

Expand Down

0 comments on commit 8bd5301

Please sign in to comment.