Skip to content

Commit c2c637c

Browse files
author
Jan Lindström
authored
Merge pull request #1180 from codership/10.4-load-data-splitting
10.4 make wsrep_load_data_splitting use streaming replication
2 parents 6476126 + f20dfee commit c2c637c

File tree

7 files changed

+44
-96
lines changed

7 files changed

+44
-96
lines changed

mysql-test/suite/galera_sr/r/galera_sr_load_data.result

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ SELECT COUNT(*) = 20000 FROM t1;
99
COUNT(*) = 20000
1010
1
1111
wsrep_last_committed_diff
12-
0
12+
1
1313
DROP TABLE t1;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1+
connection node_2;
2+
connection node_1;
13
SET SESSION wsrep_trx_fragment_size = 512;
24
SET GLOBAL wsrep_load_data_splitting = TRUE;
5+
Warnings:
6+
Warning 1287 '@@wsrep_load_data_splitting' is deprecated and will be removed in a future release
37
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
8+
connection node_2;
9+
connection node_1;
10+
connection node_2;
411
SELECT COUNT(*) = 95000 FROM t1;
512
COUNT(*) = 95000
613
1
714
wsrep_last_committed_diff
815
1
16+
connection node_1;
17+
Warnings:
18+
Warning 1287 '@@wsrep_load_data_splitting' is deprecated and will be removed in a future release
919
DROP TABLE t1;

mysql-test/suite/galera_sr/t/galera_sr_load_data.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
3333
SELECT COUNT(*) = 20000 FROM t1;
3434
# LOAD-ing 20K rows causes 3 commits to be registered
3535
--disable_query_log
36-
--eval SELECT $wsrep_last_committed_after - $wsrep_last_committed_before = 3 AS wsrep_last_committed_diff;
36+
--eval SELECT $wsrep_last_committed_after - $wsrep_last_committed_before = 3 AS wsrep_last_committed_diff
3737
--enable_query_log
3838

3939
DROP TABLE t1;

sql/sql_load.cc

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -100,41 +100,39 @@ class Term_string
100100
#define PUSH(A) *(stack_pos++)=(A)
101101

102102
#ifdef WITH_WSREP
103-
/** If requested by wsrep_load_data_splitting, commit and restart
104-
the transaction after every 10,000 inserted rows. */
105-
106-
static bool wsrep_load_data_split(THD *thd, const TABLE *table,
107-
const COPY_INFO &info)
103+
/** If requested by wsrep_load_data_splitting and streaming replication is
104+
not enabled, replicate a streaming fragment every 10,000 rows.*/
105+
class Wsrep_load_data_split
108106
{
109-
DBUG_ENTER("wsrep_load_data_split");
110-
111-
if (!wsrep_load_data_splitting || !WSREP(thd)
112-
|| !info.records || (info.records % 10000)
113-
|| !thd->transaction.stmt.ha_list
114-
|| thd->transaction.stmt.ha_list->ht() != binlog_hton
115-
|| !thd->transaction.stmt.ha_list->next()
116-
|| thd->transaction.stmt.ha_list->next()->next())
117-
DBUG_RETURN(false);
118-
119-
if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
107+
public:
108+
Wsrep_load_data_split(THD *thd)
109+
: m_thd(thd)
110+
, m_load_data_splitting(wsrep_load_data_splitting)
111+
, m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
112+
, m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
120113
{
121-
if (!(hton->flags & HTON_WSREP_REPLICATION))
122-
DBUG_RETURN(false);
123-
WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
124-
wsrep_tc_log_commit(thd);
125-
table->file->extra(HA_EXTRA_FAKE_START_STMT);
114+
if (WSREP(m_thd) && m_load_data_splitting)
115+
{
116+
/* Override streaming settings with backward compatible values for
117+
load data splitting */
118+
m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
119+
}
126120
}
127121

128-
DBUG_RETURN(false);
129-
}
130-
# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
131-
if (wsrep_load_data_split(thd,table,info)) \
132-
{ \
133-
table->auto_increment_field_not_null= FALSE; \
134-
DBUG_RETURN(1); \
122+
~Wsrep_load_data_split()
123+
{
124+
if (WSREP(m_thd) && m_load_data_splitting)
125+
{
126+
/* Restore original settings */
127+
m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
128+
}
135129
}
136-
#else /* WITH_WSREP */
137-
#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
130+
private:
131+
THD *m_thd;
132+
my_bool m_load_data_splitting;
133+
enum wsrep::streaming_context::fragment_unit m_fragment_unit;
134+
size_t m_fragment_size;
135+
};
138136
#endif /* WITH_WSREP */
139137

140138
class READ_INFO: public Load_data_param
@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
354352
bool transactional_table __attribute__((unused));
355353
DBUG_ENTER("mysql_load");
356354

355+
#ifdef WITH_WSREP
356+
Wsrep_load_data_split wsrep_load_data_split(thd);
357+
#endif /* WITH_WSREP */
357358
/*
358359
Bug #34283
359360
mysqlbinlog leaves tmpfile after termination if binlog contains
@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
10051006
DBUG_RETURN(-1);
10061007
}
10071008

1008-
WSREP_LOAD_DATA_SPLIT(thd, table, info);
10091009
err= write_record(thd, table, &info);
10101010
table->auto_increment_field_not_null= FALSE;
10111011
if (err)
@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
11481148
DBUG_RETURN(-1);
11491149
}
11501150

1151-
WSREP_LOAD_DATA_SPLIT(thd, table, info);
11521151
err= write_record(thd, table, &info);
11531152
table->auto_increment_field_not_null= FALSE;
11541153
if (err)
@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
12711270
DBUG_RETURN(-1);
12721271
}
12731272

1274-
WSREP_LOAD_DATA_SPLIT(thd, table, info);
12751273
err= write_record(thd, table, &info);
12761274
table->auto_increment_field_not_null= false;
12771275
if (err)

sql/wsrep_mysqld.cc

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
24992499
return 0;
25002500
}
25012501

2502-
wsrep_status_t wsrep_tc_log_commit(THD* thd)
2503-
{
2504-
int cookie;
2505-
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
2506-
2507-
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
2508-
if (wsrep_before_commit(thd, true))
2509-
{
2510-
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
2511-
thd->thread_id);
2512-
return WSREP_TRX_FAIL;
2513-
}
2514-
cookie= tc_log->log_and_order(thd, xid, 1, false, true);
2515-
if (wsrep_after_commit(thd, true))
2516-
{
2517-
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
2518-
thd->thread_id);
2519-
return WSREP_TRX_FAIL;
2520-
}
2521-
if (!cookie)
2522-
{
2523-
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
2524-
return WSREP_TRX_FAIL;
2525-
}
2526-
if (tc_log->unlog(cookie, xid))
2527-
{
2528-
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
2529-
return WSREP_TRX_FAIL;
2530-
}
2531-
2532-
if (wsrep_after_statement(thd))
2533-
{
2534-
return WSREP_TRX_FAIL;
2535-
}
2536-
/* Set wsrep transaction id if not set. */
2537-
if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
2538-
{
2539-
if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
2540-
{
2541-
thd->set_wsrep_next_trx_id(thd->query_id);
2542-
}
2543-
DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
2544-
}
2545-
if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
2546-
{
2547-
return WSREP_TRX_FAIL;
2548-
}
2549-
DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
2550-
return WSREP_OK;
2551-
}
2552-
25532502
int wsrep_thd_retry_counter(const THD *thd)
25542503
{
25552504
return thd->wsrep_retry_counter;

sql/wsrep_mysqld.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -444,15 +444,6 @@ bool wsrep_provider_is_SR_capable();
444444
*/
445445
int wsrep_ordered_commit_if_no_binlog(THD*, bool);
446446

447-
/**
448-
* Commit the current transaction with the
449-
* MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h).
450-
* Calling this function will generate and assign a new wsrep transaction id
451-
* for `thd`.
452-
* @return WSREP_OK on success or other WSREP_* error code on failure
453-
*/
454-
wsrep_status_t wsrep_tc_log_commit(THD* thd);
455-
456447
/**
457448
* Initialize WSREP server instance.
458449
*

0 commit comments

Comments
 (0)