@@ -6196,23 +6196,75 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
6196
6196
}
6197
6197
}
6198
6198
6199
- if (unlikely (mi->gtid_reconnect_event_skip_count ))
6200
- {
6201
- goto default_action;
6202
- }
6203
-
6204
6199
/*
6205
- We have successfully queued to relay log everything before this GTID, so
6200
+ Unless the previous group is malformed,
6201
+ we have successfully queued to relay log everything before this GTID, so
6206
6202
in case of reconnect we can start from after any previous GTID.
6207
- (Normally we would have updated gtid_current_pos earlier at the end of
6208
- the previous event group, but better leave an extra check here for
6209
- safety).
6203
+ (We must have updated gtid_current_pos earlier at the end of
6204
+ the previous event group. Unless ...)
6210
6205
*/
6211
- if (mi->events_queued_since_last_gtid )
6206
+ if (unlikely (mi->events_queued_since_last_gtid >
6207
+ mi->gtid_reconnect_event_skip_count ))
6212
6208
{
6213
- mi->gtid_current_pos .update (&mi->last_queued_gtid );
6214
- mi->events_queued_since_last_gtid = 0 ;
6209
+ /*
6210
+ ...unless the last group has not been completed. An assert below
6211
+ can be satisfied only with the strict mode that ensures
6212
+ against "genuine" gtid duplicates.
6213
+ */
6214
+ rpl_gtid *gtid_in_slave_state=
6215
+ mi->gtid_current_pos .find (mi->last_queued_gtid .domain_id );
6216
+
6217
+ // Slave gtid state must not have updated yet to the last received gtid.
6218
+ DBUG_ASSERT ((mi->using_gtid == Master_info::USE_GTID_NO ||
6219
+ !opt_gtid_strict_mode) ||
6220
+ (!gtid_in_slave_state ||
6221
+ !(*gtid_in_slave_state == mi->last_queued_gtid )));
6222
+
6223
+ DBUG_EXECUTE_IF (" slave_discard_xid_for_gtid_0_x_1000" ,
6224
+ {
6225
+ /* Inject an event group that is missing its XID commit event. */
6226
+ if (mi->last_queued_gtid .domain_id == 0 &&
6227
+ mi->last_queued_gtid .seq_no == 1000 )
6228
+ {
6229
+ sql_print_warning (
6230
+ " Unexpected break of being relay-logged GTID %u-%u-%llu "
6231
+ " event group by the current GTID event %u-%u-%llu" ,
6232
+ PARAM_GTID (mi->last_queued_gtid ),PARAM_GTID (event_gtid));
6233
+ DBUG_SET (" -d,slave_discard_xid_for_gtid_0_x_1000" );
6234
+ goto dbug_gtid_accept;
6235
+ }
6236
+ });
6237
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6238
+ sql_print_error (" Unexpected break of being relay-logged GTID %u-%u-%llu "
6239
+ " event group by the current GTID event %u-%u-%llu" ,
6240
+ PARAM_GTID (mi->last_queued_gtid ),PARAM_GTID (event_gtid));
6241
+ goto err;
6242
+ }
6243
+ else if (unlikely (mi->gtid_reconnect_event_skip_count > 0 ))
6244
+ {
6245
+ if (mi->gtid_reconnect_event_skip_count ==
6246
+ mi->events_queued_since_last_gtid )
6247
+ {
6248
+ DBUG_ASSERT (event_gtid == mi->last_queued_gtid );
6249
+
6250
+ goto default_action;
6251
+ }
6252
+
6253
+ DBUG_ASSERT (0 );
6215
6254
}
6255
+ // else_likely{...
6256
+ #ifndef DBUG_OFF
6257
+ dbug_gtid_accept:
6258
+ DBUG_EXECUTE_IF (" slave_discard_gtid_0_x_1002" ,
6259
+ {
6260
+ if (mi->last_queued_gtid .server_id == 27697 &&
6261
+ mi->last_queued_gtid .seq_no == 1002 )
6262
+ {
6263
+ DBUG_SET (" -d,slave_discard_gtid_0_x_1002" );
6264
+ goto skip_relay_logging;
6265
+ }
6266
+ });
6267
+ #endif
6216
6268
mi->last_queued_gtid = event_gtid;
6217
6269
mi->last_queued_gtid_standalone =
6218
6270
(gtid_flag & Gtid_log_event::FL_STANDALONE) != 0 ;
@@ -6222,6 +6274,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
6222
6274
6223
6275
++mi->events_queued_since_last_gtid ;
6224
6276
inc_pos= event_len;
6277
+ // ...} eof else_likely
6225
6278
}
6226
6279
break ;
6227
6280
/*
@@ -6274,6 +6327,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
6274
6327
case XID_EVENT:
6275
6328
DBUG_EXECUTE_IF (" slave_discard_xid_for_gtid_0_x_1000" ,
6276
6329
{
6330
+ if (mi->last_queued_gtid .server_id == 27697 &&
6331
+ mi->last_queued_gtid .seq_no == 1000 )
6332
+ {
6333
+ DBUG_SET (" -d,slave_discard_xid_for_gtid_0_x_1000" );
6334
+ goto skip_relay_logging;
6335
+ }
6277
6336
/* Inject an event group that is missing its XID commit event. */
6278
6337
if (mi->last_queued_gtid .domain_id == 0 &&
6279
6338
mi->last_queued_gtid .seq_no == 1000 )
@@ -6319,15 +6378,48 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
6319
6378
}
6320
6379
};);
6321
6380
6322
- if (mi->using_gtid != Master_info::USE_GTID_NO && mi-> gtid_event_seen )
6381
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
6323
6382
{
6324
- if (unlikely (mi->gtid_reconnect_event_skip_count ))
6383
+ if (likely (mi->gtid_event_seen ))
6325
6384
{
6326
- --mi->gtid_reconnect_event_skip_count ;
6327
- gtid_skip_enqueue= true ;
6385
+ if (unlikely (mi->gtid_reconnect_event_skip_count ))
6386
+ {
6387
+ if (!got_gtid_event &&
6388
+ mi->gtid_reconnect_event_skip_count ==
6389
+ mi->events_queued_since_last_gtid )
6390
+ goto gtid_not_start; // the 1st re-sent must be gtid
6391
+
6392
+ --mi->gtid_reconnect_event_skip_count ;
6393
+ gtid_skip_enqueue= true ;
6394
+ }
6395
+ else if (likely (mi->events_queued_since_last_gtid ))
6396
+ {
6397
+ DBUG_ASSERT (!got_gtid_event);
6398
+
6399
+ ++mi->events_queued_since_last_gtid ;
6400
+ }
6401
+ else if (Log_event::is_group_event ((Log_event_type) (uchar)
6402
+ buf[EVENT_TYPE_OFFSET]))
6403
+ {
6404
+ goto gtid_not_start; // no first gtid event in this group
6405
+ }
6406
+ }
6407
+ else if (Log_event::is_group_event ((Log_event_type) (uchar)
6408
+ buf[EVENT_TYPE_OFFSET]))
6409
+ {
6410
+ gtid_not_start:
6411
+
6412
+ DBUG_ASSERT (!got_gtid_event);
6413
+
6414
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6415
+ sql_print_error (" The current group of events starts with "
6416
+ " a non-GTID %s event; "
6417
+ " the last seen GTID is %u-%u-%llu" ,
6418
+ Log_event::get_type_str ((Log_event_type) (uchar)
6419
+ buf[EVENT_TYPE_OFFSET]),
6420
+ mi->last_queued_gtid );
6421
+ goto err;
6328
6422
}
6329
- else if (mi->events_queued_since_last_gtid )
6330
- ++mi->events_queued_since_last_gtid ;
6331
6423
}
6332
6424
6333
6425
if (!is_compress_event)
@@ -6500,15 +6592,35 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
6500
6592
Query_log_event::peek_is_commit_rollback (buf, event_len,
6501
6593
checksum_alg))))))
6502
6594
{
6503
- /*
6504
- The whole of the current event group is queued. So in case of
6505
- reconnect we can start from after the current GTID.
6506
- */
6507
- mi->gtid_current_pos .update (&mi->last_queued_gtid );
6508
- mi->events_queued_since_last_gtid = 0 ;
6595
+ DBUG_ASSERT (mi->events_queued_since_last_gtid > 1 );
6509
6596
6510
- /* Reset the domain_id_filter flag. */
6511
- mi->domain_id_filter .reset_filter ();
6597
+ if (unlikely (gtid_skip_enqueue))
6598
+ {
6599
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
6600
+ sql_print_error (" Recieved a group closing %s event "
6601
+ " at %llu position in the group while there are "
6602
+ " still %llu events to skip upon reconnecting; "
6603
+ " the last seen GTID is %u-%u-%llu" ,
6604
+ Log_event::get_type_str ((Log_event_type) (uchar)
6605
+ buf[EVENT_TYPE_OFFSET]),
6606
+ (mi->events_queued_since_last_gtid -
6607
+ mi->gtid_reconnect_event_skip_count ),
6608
+ mi->events_queued_since_last_gtid ,
6609
+ mi->last_queued_gtid );
6610
+ goto err;
6611
+ }
6612
+ else
6613
+ {
6614
+ /*
6615
+ The whole of the current event group is queued. So in case of
6616
+ reconnect we can start from after the current GTID.
6617
+ */
6618
+ mi->gtid_current_pos .update (&mi->last_queued_gtid );
6619
+ mi->events_queued_since_last_gtid = 0 ;
6620
+
6621
+ /* Reset the domain_id_filter flag. */
6622
+ mi->domain_id_filter .reset_filter ();
6623
+ }
6512
6624
}
6513
6625
6514
6626
skip_relay_logging:
0 commit comments