Skip to content

Commit

Permalink
fix(sql,tianmu):fix when binlog format is row, the load data statemen…
Browse files Browse the repository at this point in the history
…t cannot be recorded(stoneatom#1876)

1. Actually, tianmu uses its own code to handle load which lacks support of row format of binlog
2. When tianmu parsing rows,  write table map event first
3. Once tianmu constructs a row, just add it to the rows log event, when parsing is done,
the rows log event will also be ready, then write it to the binlog
  • Loading branch information
augety committed Jun 30, 2023
1 parent 798ca7a commit 56a9557
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 6 deletions.
57 changes: 57 additions & 0 deletions mysql-test/suite/tianmu/r/issue1876.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
[connection master]
create table t1 (b int not null default 1, c varchar(60) default '\\')engine=tianmu;
insert into t1 values(1, 'AAAAAAAA');
insert into t1 values(2, 'BBBBBBBB');
SELECT * from t1 INTO OUTFILE '1876_tmp_dat';
create table t2 like t1;
load data infile '1876_tmp_dat' into table t2;
CREATE TABLE `column_type_test1` (
`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
`c_int` int(11) DEFAULT NULL COMMENT 'int',
`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint',
`c_float` float DEFAULT NULL COMMENT 'float',
`c_double` double DEFAULT NULL COMMENT 'double',
`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal',
`c_date` date DEFAULT NULL COMMENT 'date',
`c_datetime` datetime DEFAULT NULL COMMENT 'datetime',
`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp',
`c_time` time DEFAULT NULL COMMENT 'time',
`c_char` char(10) DEFAULT NULL COMMENT 'char',
`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar',
`c_blob` blob COMMENT 'blob',
`c_text` text COMMENT 'text',
`c_longblob` longblob COMMENT 'longblob'
) engine=tianmu;
insert into column_type_test1 values(100, 100, 100, 100, 100, 5.2, 10.88, 100.08300, '2016-02-25', '2016-02-25 10:20:01', '2007-04-23 08:12:49', '10:20:01', 'stonedb', 'hello', null, 'bcdefghijklmn', null);
insert into column_type_test1 values(101, 101, 101, 101, 101, 5.2, 10.88, 101.08300, '2016-02-25', '2016-02-25 10:20:01', '1985-08-11 09:10:25', '10:20:01', 'stoneatom', 'hello', null, 'bcdefghijklmn', null);
SELECT * from column_type_test1 INTO OUTFILE '1876_tmp1_dat';
create table column_type_test2 like column_type_test1;
load data infile '1876_tmp1_dat' into table column_type_test2;
create table user_t1(id int, department varchar(10)) engine=tianmu;
SELECT * from user_t1 INTO OUTFILE '1876_tmp2_dat';
create table user_t2 like user_t1;
load data infile '1876_tmp2_dat' into table user_t2;
SHOW STATUS LIKE 'Slave_running';
Variable_name Value
Slave_running ON
select * from t2;
b c
1 AAAAAAAA
2 BBBBBBBB
select * from column_type_test2;
c_tinyint c_smallint c_mediumint c_int c_bigint c_float c_double c_decimal c_date c_datetime c_timestamp c_time c_char c_varchar c_blob c_text c_longblob
100 100 100 100 100 5.2 10.88 100.08300 2016-02-25 2016-02-25 10:20:01 2007-04-23 08:12:49 10:20:01 stonedb hello NULL bcdefghijklmn NULL
101 101 101 101 101 5.2 10.88 101.08300 2016-02-25 2016-02-25 10:20:01 1985-08-11 09:10:25 10:20:01 stoneatom hello NULL bcdefghijklmn NULL
checksum table user_t2;
Table Checksum
test.user_t2 536836232
drop table t1, t2;
drop table column_type_test1, column_type_test2;
drop table user_t1, user_t2;
include/rpl_end.inc
5 changes: 5 additions & 0 deletions mysql-test/suite/tianmu/t/issue1876-master.opt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
--testcase-timeout=40
--secure-file-priv=""
--tianmu_insert_delayed=off
--log-bin=bin
--binlog_format=row
65 changes: 65 additions & 0 deletions mysql-test/suite/tianmu/t/issue1876.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
--source include/master-slave.inc

connection master;
create table t1 (b int not null default 1, c varchar(60) default '\\')engine=tianmu;
insert into t1 values(1, 'AAAAAAAA');
insert into t1 values(2, 'BBBBBBBB');
SELECT * from t1 INTO OUTFILE '1876_tmp_dat';
create table t2 like t1;
load data infile '1876_tmp_dat' into table t2;

CREATE TABLE `column_type_test1` (
`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
`c_int` int(11) DEFAULT NULL COMMENT 'int',
`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint',
`c_float` float DEFAULT NULL COMMENT 'float',
`c_double` double DEFAULT NULL COMMENT 'double',
`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal',
`c_date` date DEFAULT NULL COMMENT 'date',
`c_datetime` datetime DEFAULT NULL COMMENT 'datetime',
`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp',
`c_time` time DEFAULT NULL COMMENT 'time',
`c_char` char(10) DEFAULT NULL COMMENT 'char',
`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar',
`c_blob` blob COMMENT 'blob',
`c_text` text COMMENT 'text',
`c_longblob` longblob COMMENT 'longblob'
) engine=tianmu;
insert into column_type_test1 values(100, 100, 100, 100, 100, 5.2, 10.88, 100.08300, '2016-02-25', '2016-02-25 10:20:01', '2007-04-23 08:12:49', '10:20:01', 'stonedb', 'hello', null, 'bcdefghijklmn', null);
insert into column_type_test1 values(101, 101, 101, 101, 101, 5.2, 10.88, 101.08300, '2016-02-25', '2016-02-25 10:20:01', '1985-08-11 09:10:25', '10:20:01', 'stoneatom', 'hello', null, 'bcdefghijklmn', null);
SELECT * from column_type_test1 INTO OUTFILE '1876_tmp1_dat';
create table column_type_test2 like column_type_test1;
load data infile '1876_tmp1_dat' into table column_type_test2;

create table user_t1(id int, department varchar(10)) engine=tianmu;
--disable_query_log
let $i = 0;
while($i < 70000)
{
eval insert into user_t1 values($i, 'stonedb');
inc $i;
}
--enable_query_log
SELECT * from user_t1 INTO OUTFILE '1876_tmp2_dat';
create table user_t2 like user_t1;
load data infile '1876_tmp2_dat' into table user_t2;

--sync_slave_with_master

connection slave;
# check the rpl is running normally
SHOW STATUS LIKE 'Slave_running';

# the data in table t2 in slave is the same to that's in master, means the binlog is written correctly
select * from t2;
select * from column_type_test2;
checksum table user_t2;

connection master;
drop table t1, t2;
drop table column_type_test1, column_type_test2;
drop table user_t1, user_t2;
--sync_slave_with_master
--source include/rpl_end.inc
6 changes: 4 additions & 2 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8802,12 +8802,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
DBUG_RETURN(RESULT_ABORTED);
}
}
else if (real_trans && xid && trn_ctx->rw_ha_count(trx_scope) > 1 &&
!trn_ctx->no_2pc(trx_scope))
else if (thd->tianmu_need_xid || (real_trans && xid && trn_ctx->rw_ha_count(trx_scope) > 1 &&
!trn_ctx->no_2pc(trx_scope)))
{
Xid_log_event end_evt(thd, xid);
if (cache_mngr->trx_cache.finalize(thd, &end_evt))
DBUG_RETURN(RESULT_ABORTED);
// used for tianmu only
thd->tianmu_need_xid= false;
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,8 @@ class THD :public MDL_context_owner,
{ assert(0); return Query_arena::is_conventional(); }

public:
/* Used only for tianmu to wirte xid log event */
bool tianmu_need_xid;
MDL_context mdl_context;

/*
Expand Down
42 changes: 38 additions & 4 deletions storage/tianmu/core/tianmu_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1132,11 +1132,20 @@ uint64_t TianmuTable::ProceedNormal(system::IOParameters &iop) {

auto no_loaded_rows = parser.GetNoRow();

if (no_loaded_rows > 0 && mysql_bin_log.is_open())
if (binlog_load_query_log_event(iop) != 0) {
TIANMU_LOG(LogCtl_Level::ERROR, "Write load binlog fail!");
throw common::FormatException("Write load binlog fail!");
if (no_loaded_rows > 0 && mysql_bin_log.is_open()) {
LOAD_FILE_INFO *lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo();
THD *thd = lf_info->thd;
if (thd->is_current_stmt_binlog_format_row()) { // if binlog format is row
if (binlog_flush_pending_rows_event(iop, true, iop.GetTable()->file->has_transactions()) != 0) {
TIANMU_LOG(LogCtl_Level::ERROR, "Write row binlog fail!");
throw common::FormatException("Write row binlog fail!");
}
} else if (binlog_load_query_log_event(iop) != 0) {
TIANMU_LOG(LogCtl_Level::ERROR, "Write statement binlog fail!");
throw common::FormatException("Write statement binlog fail!");
}
}

timer.Print(__PRETTY_FUNCTION__);

no_rejected_rows = parser.GetNumOfRejectedRows();
Expand All @@ -1151,6 +1160,31 @@ uint64_t TianmuTable::ProceedNormal(system::IOParameters &iop) {
return no_loaded_rows;
}

int TianmuTable::binlog_flush_pending_rows_event(system::IOParameters &iop, bool stmt_end, bool is_transactional) {
DBUG_ENTER(__PRETTY_FUNCTION__);
/*
We shall flush the pending event even if we are not in row-based
mode: it might be the case that we left row-based mode before
flushing anything (e.g., if we have explicitly locked tables).
*/
if (!mysql_bin_log.is_open())
DBUG_RETURN(0);

LOAD_FILE_INFO *lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo();
THD *thd = lf_info->thd;
thd->tianmu_need_xid = true;
int error = 0;

if (Rows_log_event *pending = thd->binlog_get_pending_rows_event(is_transactional)) {
if (stmt_end) {
pending->set_flags(Rows_log_event::STMT_END_F);
thd->clear_binlog_table_maps();
}
error = mysql_bin_log.flush_and_set_pending_rows_event(thd, 0, is_transactional);
}
DBUG_RETURN(error);
}

int TianmuTable::binlog_load_query_log_event(system::IOParameters &iop) {
char *load_data_query, *end, *fname_start, *fname_end, *p = nullptr;
size_t pl = 0;
Expand Down
1 change: 1 addition & 0 deletions storage/tianmu/core/tianmu_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class TianmuTable final : public JustATable {
uint64_t ProceedNormal(system::IOParameters &iop);
uint64_t ProcessDelayed(system::IOParameters &iop);
void Field2VC(Field *f, loader::ValueCache &vc, size_t col);
int binlog_flush_pending_rows_event(system::IOParameters &iop, bool stmt_end, bool is_transactional);
int binlog_load_query_log_event(system::IOParameters &iop);
int binlog_insert2load_log_event(system::IOParameters &iop);
int binlog_insert2load_block(std::vector<loader::ValueCache> &vcs, uint load_obj, system::IOParameters &iop);
Expand Down
15 changes: 15 additions & 0 deletions storage/tianmu/loader/load_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,25 @@ uint LoadParser::GetPackrow(uint no_of_rows, std::vector<ValueCache> &value_buff
value_buffers.emplace_back(pack_size_, init_capacity);
}

THD *thd = io_param_.GetTHD();
TABLE *table = io_param_.GetTable();
bool is_transactional = table->file->has_transactions();
bool need_rows_binlog = false;
if (mysql_bin_log.is_open() && thd->is_current_stmt_binlog_format_row()) {
need_rows_binlog = true;
const bool has_trans = thd->lex->sql_command == SQLCOM_CREATE_TABLE || is_transactional;
bool need_binlog_rows_query = thd->variables.binlog_rows_query_log_events;
/* write table map event */
[[maybe_unused]] int err = thd->binlog_write_table_map(table, has_trans, need_binlog_rows_query);
}

uint no_of_rows_returned;
for (no_of_rows_returned = 0; no_of_rows_returned < no_of_rows; no_of_rows_returned++) {
if (!MakeRow(value_buffers))
break;
/* write row after one row is ready */
if (need_rows_binlog)
[[maybe_unused]] int err = thd->binlog_write_row(table, is_transactional, table->record[0], NULL);
}

last_pack_size_.clear();
Expand Down

0 comments on commit 56a9557

Please sign in to comment.