Skip to content

Commit b39ea86

Browse files
plampiovuvova
authored andcommitted
MDEV-36077: Galera feature: Retry applying writesets at slaves
A new Galera feature that allows retrying of applying of writesets at slave nodes (codership/mysql-wsrep-bugs/#1619). Currently replication applying stops for first non ignored failure occurring in event applying, and node will do emergency abort (or start inconsistency voting). Some failures, however, can be concurrency related, and applying may succeed if the operation is tried at later time. This feature introduces a new dynamic global option variable "wsrep_applier_retry_count" that controls the retry-applying feature: a zero value disables retrying and a positive value sets the maximum number of retry attempts. The default value for this option is zero, which means that this feature is disabled by default. Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
1 parent dae5a99 commit b39ea86

12 files changed

+154
-31
lines changed

mysql-test/suite/galera/r/galera_defaults.result

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ AND VARIABLE_NAME NOT IN (
2121
ORDER BY VARIABLE_NAME;
2222
VARIABLE_NAME VARIABLE_VALUE
2323
WSREP_ALLOWLIST
24+
WSREP_APPLIER_RETRY_COUNT 0
2425
WSREP_AUTO_INCREMENT_CONTROL ON
2526
WSREP_CERTIFICATION_RULES strict
2627
WSREP_CERTIFY_NONPK ON

mysql-test/suite/sys_vars/r/sysvars_wsrep.result

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,21 @@ ENUM_VALUE_LIST NULL
1616
READ_ONLY YES
1717
COMMAND_LINE_ARGUMENT REQUIRED
1818
GLOBAL_VALUE_PATH NULL
19+
VARIABLE_NAME WSREP_APPLIER_RETRY_COUNT
20+
SESSION_VALUE NULL
21+
GLOBAL_VALUE 0
22+
GLOBAL_VALUE_ORIGIN COMPILE-TIME
23+
DEFAULT_VALUE 0
24+
VARIABLE_SCOPE GLOBAL
25+
VARIABLE_TYPE INT UNSIGNED
26+
VARIABLE_COMMENT Maximum number of applier retry attempts
27+
NUMERIC_MIN_VALUE 0
28+
NUMERIC_MAX_VALUE 4294967295
29+
NUMERIC_BLOCK_SIZE 1
30+
ENUM_VALUE_LIST NULL
31+
READ_ONLY NO
32+
COMMAND_LINE_ARGUMENT OPTIONAL
33+
GLOBAL_VALUE_PATH NULL
1934
VARIABLE_NAME WSREP_AUTO_INCREMENT_CONTROL
2035
SESSION_VALUE NULL
2136
GLOBAL_VALUE ON

sql/log_event_server.cc

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4993,7 +4993,33 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
49934993
/* remove trigger's tables */
49944994
goto err;
49954995
}
4996-
4996+
#ifdef WITH_WSREP
4997+
DBUG_EXECUTE_IF("apply_event_fail_once", {
4998+
if (WSREP(thd)) {
4999+
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
5000+
error= HA_ERR_LOCK_WAIT_TIMEOUT;
5001+
slave_rows_error_report(
5002+
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
5003+
get_type_str(), RPL_LOG_NAME, log_pos);
5004+
my_error(error, MYF(0));
5005+
thd->is_slave_error= 1;
5006+
DBUG_SET("-d,apply_event_fail_once");
5007+
goto err;
5008+
}
5009+
};);
5010+
DBUG_EXECUTE_IF("apply_event_fail_always", {
5011+
if (WSREP(thd)) {
5012+
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
5013+
error= HA_ERR_LOCK_WAIT_TIMEOUT;
5014+
slave_rows_error_report(
5015+
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
5016+
get_type_str(), RPL_LOG_NAME, log_pos);
5017+
my_error(error, MYF(0));
5018+
thd->is_slave_error= 1;
5019+
goto err;
5020+
}
5021+
};);
5022+
#endif /* WITH_WSREP */
49975023
/*
49985024
When the open and locking succeeded, we check all tables to
49995025
ensure that they still have the correct type.

sql/sys_vars.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6700,6 +6700,12 @@ static Sys_var_charptr Sys_wsrep_allowlist(
67006700
READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG),
67016701
DEFAULT(""));
67026702

6703+
static Sys_var_uint Sys_wsrep_applier_retry_count (
6704+
"wsrep_applier_retry_count", "Maximum number of applier retry attempts",
6705+
GLOBAL_VAR(wsrep_applier_retry_count), CMD_LINE(OPT_ARG),
6706+
VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1),
6707+
NO_MUTEX_GUARD, NOT_IN_BINLOG);
6708+
67036709
#endif /* WITH_WSREP */
67046710

67056711
static bool fix_host_cache_size(sys_var *, THD *, enum_var_type)

sql/transaction.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,3 +784,13 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
784784

785785
DBUG_RETURN(MY_TEST(res));
786786
}
787+
788+
#ifdef WITH_WSREP
789+
/* check if a named savepoint exists for the current transaction */
790+
bool trans_savepoint_exists(THD *thd, LEX_CSTRING name)
791+
{
792+
SAVEPOINT **sv = find_savepoint(thd, Lex_ident_savepoint(name));
793+
794+
return (*sv != NULL);
795+
}
796+
#endif /* WITH_WSREP */

sql/transaction.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ bool trans_rollback_stmt(THD *thd);
3434
bool trans_savepoint(THD *thd, LEX_CSTRING name);
3535
bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name);
3636
bool trans_release_savepoint(THD *thd, LEX_CSTRING name);
37+
#ifdef WITH_WSREP
38+
bool trans_savepoint_exists(THD *thd, LEX_CSTRING name);
39+
#endif /* WITH_WSREP */
3740

3841
void trans_reset_one_shot_chistics(THD *thd);
3942

sql/wsrep_applier.cc

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,17 +128,19 @@ void wsrep_store_error(const THD* const thd,
128128
dst.size(), dst.size() ? dst.data() : "(null)");
129129
}
130130

131-
int wsrep_apply_events(THD* thd,
132-
Relay_log_info* rli,
133-
const void* events_buf,
134-
size_t buf_len)
131+
static int apply_events(THD* thd,
132+
Relay_log_info* rli,
133+
const void* events_buf,
134+
size_t buf_len,
135+
const LEX_STRING &savepoint,
136+
bool set_savepoint)
135137
{
136138
char *buf= (char *)events_buf;
137139
int rcode= 0;
138140
int event= 1;
139141
Log_event_type typ;
140142

141-
DBUG_ENTER("wsrep_apply_events");
143+
DBUG_ENTER("apply_events");
142144
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
143145
(long long) wsrep_thd_trx_seqno(thd));
144146

@@ -148,6 +150,15 @@ int wsrep_apply_events(THD* thd,
148150
else
149151
thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
150152

153+
bool in_trans = thd->in_active_multi_stmt_transaction();
154+
if (in_trans && set_savepoint) {
155+
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() &&
156+
trans_savepoint(thd, savepoint)) {
157+
rcode = 1;
158+
goto error;
159+
}
160+
}
161+
151162
while (buf_len)
152163
{
153164
int exec_res;
@@ -254,6 +265,19 @@ int wsrep_apply_events(THD* thd,
254265
delete ev;
255266
goto error;
256267
}
268+
269+
/* Transaction was started by the event, set the savepoint to rollback to
270+
* in case of failure. */
271+
if (!in_trans && thd->in_active_multi_stmt_transaction()) {
272+
in_trans = true;
273+
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming()
274+
&& set_savepoint && trans_savepoint(thd, savepoint)) {
275+
delete ev;
276+
rcode = 1;
277+
goto error;
278+
}
279+
}
280+
257281
event++;
258282

259283
delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
@@ -267,3 +291,53 @@ int wsrep_apply_events(THD* thd,
267291

268292
DBUG_RETURN(rcode);
269293
}
294+
295+
int wsrep_apply_events(THD* thd,
296+
Relay_log_info* rli,
297+
const wsrep::const_buffer& data,
298+
wsrep::mutable_buffer& err,
299+
bool const include_msg)
300+
{
301+
static char savepoint_name[20] = "wsrep_retry";
302+
static size_t savepoint_name_len = strlen(savepoint_name);
303+
static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len };
304+
uint n_retries = 0;
305+
bool savepoint_exists = false;
306+
307+
int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true);
308+
309+
while (ret && n_retries < wsrep_applier_retry_count &&
310+
(savepoint_exists = trans_savepoint_exists(thd, savepoint))) {
311+
/* applying failed, retry applying events */
312+
313+
/* rollback to savepoint without telling Wsrep-lib */
314+
thd->variables.wsrep_on = false;
315+
if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) {
316+
thd->variables.wsrep_on = true;
317+
break;
318+
}
319+
thd->variables.wsrep_on = true;
320+
321+
/* reset THD object for retry */
322+
thd->clear_error();
323+
thd->reset_for_next_command();
324+
325+
/* retry applying events */
326+
ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false);
327+
n_retries++;
328+
}
329+
330+
if (savepoint_exists) {
331+
trans_release_savepoint(thd, savepoint);
332+
}
333+
334+
if (ret || wsrep_thd_has_ignored_error(thd))
335+
{
336+
if (ret) {
337+
wsrep_store_error(thd, err, include_msg);
338+
}
339+
wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
340+
}
341+
342+
return ret;
343+
}

sql/wsrep_applier.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
#include "rpl_rli.h" // Relay_log_info
2121
#include "log_event.h" // Format_description_log_event
2222

23-
int wsrep_apply_events(THD* thd,
24-
Relay_log_info* rli,
25-
const void* events_buf,
26-
size_t buf_len);
23+
int wsrep_apply_events(THD* thd,
24+
Relay_log_info* rli,
25+
const wsrep::const_buffer& data,
26+
wsrep::mutable_buffer& err,
27+
bool const include_msg);
2728

2829
/* Applier error codes, when nothing better is available. */
2930
#define WSREP_RET_SUCCESS 0 // Success

sql/wsrep_high_priority_service.cc

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -123,23 +123,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd)
123123
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
124124
}
125125

126-
static int apply_events(THD* thd,
127-
Relay_log_info* rli,
128-
const wsrep::const_buffer& data,
129-
wsrep::mutable_buffer& err,
130-
bool const include_msg)
131-
{
132-
int const ret= wsrep_apply_events(thd, rli, data.data(), data.size());
133-
if (ret || wsrep_thd_has_ignored_error(thd))
134-
{
135-
if (ret)
136-
{
137-
wsrep_store_error(thd, err, include_msg);
138-
}
139-
wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
140-
}
141-
return ret;
142-
}
143126

144127
/****************************************************************************
145128
High priority service
@@ -442,7 +425,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
442425
#endif
443426

444427
thd->set_time();
445-
int ret= apply_events(thd, m_rli, data, err, false);
428+
int ret= wsrep_apply_events(thd, m_rli, data, err, false);
446429
wsrep_thd_set_ignored_error(thd, false);
447430
trans_commit(thd);
448431

@@ -610,7 +593,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
610593
#endif /* ENABLED_DEBUG_SYNC */
611594

612595
wsrep_setup_uk_and_fk_checks(thd);
613-
int ret= apply_events(thd, m_rli, data, err, true);
596+
int ret= wsrep_apply_events(thd, m_rli, data, err, true);
614597

615598
thd->close_temporary_tables();
616599
if (!ret && !wsrep::commits_transaction(ws_meta.flags()))
@@ -779,7 +762,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
779762
ws_meta,
780763
thd->wsrep_sr().fragments());
781764
}
782-
ret= ret || apply_events(thd, m_rli, data, err, true);
765+
ret= ret || wsrep_apply_events(thd, m_rli, data, err, true);
783766
thd->close_temporary_tables();
784767
if (!ret && !wsrep::commits_transaction(ws_meta.flags()))
785768
{

sql/wsrep_mysqld.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES;
127127
// unit for fragment size
128128
ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE;
129129
uint wsrep_ignore_apply_errors= 0;
130+
uint wsrep_applier_retry_count= 0;
130131

131132
std::atomic <bool> wsrep_thread_create_failed;
132133

0 commit comments

Comments
 (0)