1
- /* Copyright (C) 2015-2017 Codership Oy <info@codership.com>
1
+ /* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
2
2
3
3
This program is free software; you can redistribute it and/or modify
4
4
it under the terms of the GNU General Public License as published by
@@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd)
584
584
585
585
thd->real_id =pthread_self (); // Keep purify happy
586
586
587
- WSREP_DEBUG (" Wsrep_thd_pool: creating system thread: %lld" ,
588
- (long long )thd->thread_id );
589
587
thd->prior_thr_create_utime = thd->start_utime = thd->thr_create_utime ;
590
588
(void ) mysql_mutex_unlock (&LOCK_thread_count);
591
589
@@ -1115,16 +1113,21 @@ int Wsrep_schema::remove_fragments(THD* thd,
1115
1113
DBUG_RETURN (ret);
1116
1114
}
1117
1115
1118
- int Wsrep_schema::replay_transaction (THD* thd ,
1116
+ int Wsrep_schema::replay_transaction (THD* orig_thd ,
1119
1117
Relay_log_info* rli,
1120
1118
const wsrep::ws_meta& ws_meta,
1121
1119
const std::vector<wsrep::seqno>& fragments)
1122
1120
{
1123
1121
DBUG_ENTER (" Wsrep_schema::replay_transaction" );
1124
1122
DBUG_ASSERT (!fragments.empty ());
1125
1123
1126
- Wsrep_schema_impl::wsrep_off wsrep_off (thd);
1127
- Wsrep_schema_impl::binlog_off binlog_off (thd);
1124
+ THD thd (next_thread_id (), true );
1125
+ thd.thread_stack = (orig_thd ? orig_thd->thread_stack :
1126
+ (char *) &thd);
1127
+
1128
+ Wsrep_schema_impl::wsrep_off wsrep_off (&thd);
1129
+ Wsrep_schema_impl::binlog_off binlog_off (&thd);
1130
+ Wsrep_schema_impl::thd_context_switch thd_context_switch (orig_thd, &thd);
1128
1131
1129
1132
int ret= 1 ;
1130
1133
int error;
@@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd,
1135
1138
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin ();
1136
1139
i != fragments.end (); ++i)
1137
1140
{
1138
- Wsrep_schema_impl::init_stmt (thd);
1139
- if ((error= Wsrep_schema_impl::open_for_read (thd, sr_table_str.c_str (), &frag_table)))
1141
+ Wsrep_schema_impl::init_stmt (& thd);
1142
+ if ((error= Wsrep_schema_impl::open_for_read (& thd, sr_table_str.c_str (), &frag_table)))
1140
1143
{
1141
1144
WSREP_WARN (" Could not open SR table for read: %d" , error);
1142
- Wsrep_schema_impl::finish_stmt (thd);
1145
+ Wsrep_schema_impl::finish_stmt (& thd);
1143
1146
DBUG_RETURN (1 );
1144
1147
}
1145
1148
@@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd,
1169
1172
String buf;
1170
1173
frag_table->field [4 ]->val_str (&buf);
1171
1174
1172
- Wsrep_schema_impl::end_index_scan (frag_table);
1173
- Wsrep_schema_impl::finish_stmt (thd);
1174
- ret= wsrep_apply_events (thd, rli, buf.c_ptr_safe (), buf.length ());
1175
- if (ret)
1176
1175
{
1177
- WSREP_WARN (" Wsrep_schema::replay_transaction: failed to apply fragments" );
1178
- break ;
1176
+ Wsrep_schema_impl::thd_context_switch thd_context_switch (&thd, orig_thd);
1177
+
1178
+ ret= wsrep_apply_events (orig_thd, rli, buf.c_ptr_quick (), buf.length ());
1179
+ if (ret)
1180
+ {
1181
+ WSREP_WARN (" Wsrep_schema::replay_transaction: failed to apply fragments" );
1182
+ break ;
1183
+ }
1179
1184
}
1180
- Wsrep_schema_impl::init_stmt (thd);
1181
1185
1182
- if ((error= Wsrep_schema_impl::open_for_write (thd, sr_table_str.c_str (), &frag_table)))
1186
+ Wsrep_schema_impl::end_index_scan (frag_table);
1187
+ Wsrep_schema_impl::finish_stmt (&thd);
1188
+
1189
+ Wsrep_schema_impl::init_stmt (&thd);
1190
+
1191
+ if ((error= Wsrep_schema_impl::open_for_write (&thd,
1192
+ sr_table_str.c_str (),
1193
+ &frag_table)))
1183
1194
{
1184
1195
WSREP_WARN (" Could not open SR table for write: %d" , error);
1185
- Wsrep_schema_impl::finish_stmt (thd);
1196
+ Wsrep_schema_impl::finish_stmt (& thd);
1186
1197
DBUG_RETURN (1 );
1187
1198
}
1188
1199
error= Wsrep_schema_impl::init_for_index_scan (frag_table,
@@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd,
1206
1217
break ;
1207
1218
}
1208
1219
Wsrep_schema_impl::end_index_scan (frag_table);
1209
- Wsrep_schema_impl::finish_stmt (thd);
1220
+ Wsrep_schema_impl::finish_stmt (& thd);
1210
1221
}
1211
1222
1212
1223
DBUG_RETURN (ret);
@@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd,
1215
1226
int Wsrep_schema::recover_sr_transactions (THD *orig_thd)
1216
1227
{
1217
1228
DBUG_ENTER (" Wsrep_schema::recover_sr_transactions" );
1218
- THD storage_thd (true , true );
1229
+ THD storage_thd (next_thread_id () , true );
1219
1230
storage_thd.thread_stack = (orig_thd ? orig_thd->thread_stack :
1220
1231
(char *) &storage_thd);
1221
1232
TABLE* frag_table= 0 ;
1222
1233
TABLE* cluster_table= 0 ;
1223
1234
Wsrep_storage_service storage_service (&storage_thd);
1224
1235
Wsrep_schema_impl::binlog_off binlog_off (&storage_thd);
1225
- Wsrep_schema_impl::wsrep_off binglog_off (&storage_thd);
1236
+ Wsrep_schema_impl::wsrep_off wsrep_off (&storage_thd);
1226
1237
Wsrep_schema_impl::thd_context_switch thd_context_switch (orig_thd,
1227
1238
&storage_thd);
1228
1239
Wsrep_server_state& server_state (Wsrep_server_state::instance ());
@@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
1233
1244
1234
1245
Wsrep_schema_impl::init_stmt (&storage_thd);
1235
1246
storage_thd.wsrep_skip_locking = FALSE ;
1236
- /*
1237
- Open the table for reading and writing so that fragments without
1238
- valid seqno can be deleted.
1239
- */
1240
- if (Wsrep_schema_impl::open_for_write (&storage_thd,
1241
- cluster_table_str.c_str (),
1242
- &cluster_table) ||
1247
+ if (Wsrep_schema_impl::open_for_read (&storage_thd,
1248
+ cluster_table_str.c_str (),
1249
+ &cluster_table) ||
1243
1250
Wsrep_schema_impl::init_for_scan (cluster_table))
1244
1251
{
1245
1252
Wsrep_schema_impl::finish_stmt (&storage_thd);
@@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
1273
1280
1274
1281
storage_thd.wsrep_skip_locking = TRUE ;
1275
1282
Wsrep_schema_impl::init_stmt (&storage_thd);
1276
- if (Wsrep_schema_impl::open_for_read (&storage_thd, sr_table_str.c_str (), &frag_table) ||
1283
+
1284
+ /*
1285
+ Open the table for reading and writing so that fragments without
1286
+ valid seqno can be deleted.
1287
+ */
1288
+ if (Wsrep_schema_impl::open_for_write (&storage_thd, sr_table_str.c_str (), &frag_table) ||
1277
1289
Wsrep_schema_impl::init_for_scan (frag_table))
1278
1290
{
1279
- WSREP_ERROR (" Failed to open SR table for read " );
1291
+ WSREP_ERROR (" Failed to open SR table for write " );
1280
1292
goto out;
1281
1293
}
1282
1294
@@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
1309
1321
String data_str;
1310
1322
1311
1323
(void )frag_table->field [4 ]->val_str (&data_str);
1312
- wsrep::const_buffer data (data_str.c_ptr (), data_str.length ());
1324
+ wsrep::const_buffer data (data_str.c_ptr_quick (), data_str.length ());
1313
1325
wsrep::ws_meta ws_meta (gtid,
1314
1326
wsrep::stid (server_id,
1315
1327
transaction_id,
@@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
1319
1331
1320
1332
wsrep::high_priority_service* applier;
1321
1333
if (!(applier= server_state.find_streaming_applier (server_id,
1322
- transaction_id)))
1334
+ transaction_id)))
1323
1335
{
1324
1336
DBUG_ASSERT (wsrep::starts_transaction (flags));
1325
- THD* thd= new THD (true , true );
1337
+ THD* thd= new THD (next_thread_id () , true );
1326
1338
thd->thread_stack = (char *)&storage_thd;
1327
1339
1328
1340
mysql_mutex_lock (&LOCK_thread_count);
1329
- thd->thread_id = next_thread_id ();
1330
1341
thd->real_id = pthread_self ();
1331
1342
mysql_mutex_unlock (&LOCK_thread_count);
1332
1343
0 commit comments