@@ -2055,29 +2055,31 @@ static int
2055
2055
binlog_truncate_trx_cache (THD *thd, binlog_cache_mngr *cache_mngr, bool all)
2056
2056
{
2057
2057
DBUG_ENTER (" binlog_truncate_trx_cache" );
2058
+
2059
+ if (!WSREP_EMULATE_BINLOG_NNULL (thd) && !mysql_bin_log.is_open ())
2060
+ DBUG_RETURN (0 );
2061
+
2058
2062
int error=0 ;
2059
- /*
2060
- This function handles transactional changes and as such this flag
2061
- equals to true.
2062
- */
2063
- bool const is_transactional= TRUE ;
2064
2063
2065
2064
DBUG_PRINT (" info" , (" thd->options={ %s %s}, transaction: %s" ,
2066
2065
FLAGSTR (thd->variables .option_bits , OPTION_NOT_AUTOCOMMIT),
2067
2066
FLAGSTR (thd->variables .option_bits , OPTION_BEGIN),
2068
2067
all ? " all" : " stmt" ));
2069
2068
2070
- thd->binlog_remove_pending_rows_event (TRUE , is_transactional);
2069
+ auto &trx_cache= cache_mngr->trx_cache ;
2070
+ MYSQL_BIN_LOG::remove_pending_rows_event (thd, &trx_cache);
2071
+ thd->reset_binlog_for_next_statement ();
2072
+
2071
2073
/*
2072
2074
If rolling back an entire transaction or a single statement not
2073
2075
inside a transaction, we reset the transaction cache.
2074
2076
*/
2075
2077
if (ending_trans (thd, all))
2076
2078
{
2077
- if (cache_mngr-> trx_cache .has_incident ())
2079
+ if (trx_cache.has_incident ())
2078
2080
error= mysql_bin_log.write_incident (thd);
2079
2081
2080
- thd->reset_binlog_for_next_statement ( );
2082
+ DBUG_ASSERT ( thd->binlog_table_maps == 0 );
2081
2083
2082
2084
cache_mngr->reset (false , true );
2083
2085
}
@@ -2086,9 +2088,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
2086
2088
transaction cache to remove the statement.
2087
2089
*/
2088
2090
else
2089
- cache_mngr-> trx_cache .restore_prev_position ();
2091
+ trx_cache.restore_prev_position ();
2090
2092
2091
- DBUG_ASSERT (cache_mngr-> trx_cache .pending () == NULL );
2093
+ DBUG_ASSERT (trx_cache.pending () == NULL );
2092
2094
DBUG_RETURN (error);
2093
2095
}
2094
2096
@@ -2406,7 +2408,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
2406
2408
thd->reset_binlog_for_next_statement ();
2407
2409
DBUG_RETURN (error);
2408
2410
}
2409
- if (!wsrep_emulate_bin_log && mysql_bin_log. check_write_error (thd))
2411
+ if (!wsrep_emulate_bin_log && MYSQL_BIN_LOG:: check_write_error (thd))
2410
2412
{
2411
2413
/*
2412
2414
"all == true" means that a "rollback statement" triggered the error and
@@ -2458,12 +2460,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
2458
2460
2459
2461
void binlog_reset_cache (THD *thd)
2460
2462
{
2461
- binlog_cache_mngr *const cache_mngr= opt_bin_log ?
2463
+ binlog_cache_mngr *const cache_mngr= opt_bin_log ?
2462
2464
thd->binlog_get_cache_mngr () : 0 ;
2463
2465
DBUG_ENTER (" binlog_reset_cache" );
2464
2466
if (cache_mngr)
2465
2467
{
2466
- thd->binlog_remove_pending_rows_event (TRUE , TRUE );
2468
+ MYSQL_BIN_LOG::remove_pending_rows_event (thd, &cache_mngr->trx_cache );
2469
+ thd->reset_binlog_for_next_statement ();
2467
2470
cache_mngr->reset (true , true );
2468
2471
}
2469
2472
DBUG_VOID_RETURN;
@@ -6318,30 +6321,31 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
6318
6321
Rows_log_event* binlog_get_pending_rows_event (binlog_cache_mngr *cache_mngr,
6319
6322
bool use_trans_cache)
6320
6323
{
6324
+ DBUG_ASSERT (cache_mngr);
6321
6325
Rows_log_event* rows= NULL ;
6322
6326
6323
- /*
6324
- This is less than ideal, but here's the story: If there is no cache_mngr,
6325
- prepare_pending_rows_event() has never been called (since the cache_mngr
6326
- is set up there). In that case, we just return NULL.
6327
- */
6328
6327
if (cache_mngr)
6329
6328
rows= cache_mngr->get_binlog_cache_data (use_trans_cache)->pending ();
6330
6329
return rows;
6331
6330
}
6332
6331
6332
+ binlog_cache_data* binlog_get_cache_data (binlog_cache_mngr *cache_mngr,
6333
+ bool use_trans_cache)
6334
+ {
6335
+ return cache_mngr->get_binlog_cache_data (use_trans_cache);
6336
+ }
6337
+
6333
6338
int binlog_flush_pending_rows_event (THD *thd, bool stmt_end,
6334
6339
bool is_transactional,
6335
6340
MYSQL_BIN_LOG *bin_log,
6336
- binlog_cache_mngr *cache_mngr,
6337
- bool use_trans_cache)
6341
+ binlog_cache_data *cache_data)
6338
6342
{
6339
6343
/*
6340
6344
Mark the event as the last event of a statement if the stmt_end
6341
6345
flag is set.
6342
6346
*/
6343
6347
int error= 0 ;
6344
- auto *pending= cache_mngr-> get_binlog_cache_data (use_trans_cache) ->pending ();
6348
+ auto *pending= cache_data ->pending ();
6345
6349
if (pending)
6346
6350
{
6347
6351
if (stmt_end)
@@ -6350,36 +6354,12 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
6350
6354
thd->reset_binlog_for_next_statement ();
6351
6355
}
6352
6356
6353
- error= bin_log->flush_and_set_pending_rows_event (thd, 0 , cache_mngr ,
6357
+ error= bin_log->flush_and_set_pending_rows_event (thd, 0 , cache_data ,
6354
6358
is_transactional);
6355
6359
}
6356
6360
return error;
6357
6361
}
6358
6362
6359
- /* *
6360
- This function stores a pending row event into a cache which is specified
6361
- through the parameter @c is_transactional. Respectively, when it is @c
6362
- true, the pending event is stored into the transactional cache. Otherwise
6363
- into the non-transactional cache.
6364
-
6365
- @param evt a pointer to the row event.
6366
- @param use_trans_cache @c true indicates a transactional cache,
6367
- otherwise @c false a non-transactional.
6368
- */
6369
- void
6370
- THD::binlog_set_pending_rows_event (Rows_log_event* ev, bool use_trans_cache)
6371
- {
6372
- binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data ();
6373
-
6374
- DBUG_ASSERT (cache_mngr);
6375
-
6376
- binlog_cache_data *cache_data=
6377
- cache_mngr->get_binlog_cache_data (use_trans_cache);
6378
-
6379
- cache_data->set_pending (ev);
6380
- }
6381
-
6382
-
6383
6363
/* *
6384
6364
This function removes the pending rows event, discarding any outstanding
6385
6365
rows. If there is no pending rows event available, this is effectively a
@@ -6390,17 +6370,10 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
6390
6370
otherwise @c false a non-transactional.
6391
6371
*/
6392
6372
int
6393
- MYSQL_BIN_LOG::remove_pending_rows_event (THD *thd, bool is_transactional )
6373
+ MYSQL_BIN_LOG::remove_pending_rows_event (THD *thd, binlog_cache_data *cache_data )
6394
6374
{
6395
6375
DBUG_ENTER (" MYSQL_BIN_LOG::remove_pending_rows_event" );
6396
6376
6397
- binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr ();
6398
-
6399
- DBUG_ASSERT (cache_mngr);
6400
-
6401
- binlog_cache_data *cache_data=
6402
- cache_mngr->get_binlog_cache_data (use_trans_cache (thd, is_transactional));
6403
-
6404
6377
if (Rows_log_event* pending= cache_data->pending ())
6405
6378
{
6406
6379
delete pending;
@@ -6414,6 +6387,7 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
6414
6387
Moves the last bunch of rows from the pending Rows event to a cache (either
6415
6388
transactional cache if is_transaction is @c true, or the non-transactional
6416
6389
cache otherwise. Sets a new pending event.
6390
+ In case of error during flushing, sets write_error=1 to itself.
6417
6391
6418
6392
@param thd a pointer to the user thread.
6419
6393
@param evt a pointer to the row event.
@@ -6423,19 +6397,13 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
6423
6397
int
6424
6398
MYSQL_BIN_LOG::flush_and_set_pending_rows_event (THD *thd,
6425
6399
Rows_log_event* event,
6426
- binlog_cache_mngr *cache_mngr ,
6400
+ binlog_cache_data *cache_data ,
6427
6401
bool is_transactional)
6428
6402
{
6429
6403
DBUG_ENTER (" MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)" );
6430
6404
DBUG_ASSERT (WSREP_EMULATE_BINLOG (thd) || mysql_bin_log.is_open ());
6431
6405
DBUG_PRINT (" enter" , (" event: %p" , event));
6432
6406
6433
- DBUG_ASSERT (cache_mngr);
6434
-
6435
- bool should_use_trans_cache= use_trans_cache (thd, is_transactional);
6436
- binlog_cache_data *cache_data=
6437
- cache_mngr->get_binlog_cache_data (should_use_trans_cache);
6438
-
6439
6407
DBUG_PRINT (" info" , (" cache_mngr->pending(): %p" , cache_data->pending ()));
6440
6408
6441
6409
if (Rows_log_event* pending= cache_data->pending ())
@@ -6463,11 +6431,89 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
6463
6431
delete pending;
6464
6432
}
6465
6433
6466
- thd-> binlog_set_pending_rows_event (event, should_use_trans_cache );
6434
+ cache_data-> set_pending (event);
6467
6435
6468
6436
DBUG_RETURN (0 );
6469
6437
}
6470
6438
6439
+ /*
6440
+ Member function for ensuring that there is an rows log
6441
+ event of the apropriate type before proceeding.
6442
+
6443
+ POST CONDITION:
6444
+ If a non-NULL pointer is returned, the pending event for thread 'thd' will
6445
+ be an event created by callback hold by event_factory, and
6446
+ will be either empty or have enough space to hold 'needed' bytes.
6447
+ In addition, the columns bitmap will be correct for the row, meaning that
6448
+ the pending event will be flushed if the columns in the event differ from
6449
+ the columns suppled to the function.
6450
+
6451
+ RETURNS
6452
+ If no error, a non-NULL pending event (either one which already existed or
6453
+ the newly created one).
6454
+ If error, NULL.
6455
+ */
6456
+
6457
+ Rows_log_event*
6458
+ MYSQL_BIN_LOG::prepare_pending_rows_event (THD *thd, TABLE* table,
6459
+ binlog_cache_data *cache_data,
6460
+ uint32 serv_id, size_t needed,
6461
+ bool is_transactional,
6462
+ Rows_event_factory event_factory)
6463
+ {
6464
+ DBUG_ENTER (" MYSQL_BIN_LOG::prepare_pending_rows_event" );
6465
+ /* Pre-conditions */
6466
+ DBUG_ASSERT (table->s ->table_map_id != ~0UL );
6467
+
6468
+ /*
6469
+ There is no good place to set up the transactional data, so we
6470
+ have to do it here.
6471
+ */
6472
+ Rows_log_event* pending= cache_data->pending ();
6473
+
6474
+ if (unlikely (pending && !pending->is_valid ()))
6475
+ DBUG_RETURN (NULL );
6476
+
6477
+ /*
6478
+ Check if the current event is non-NULL and a write-rows
6479
+ event. Also check if the table provided is mapped: if it is not,
6480
+ then we have switched to writing to a new table.
6481
+ If there is no pending event, we need to create one. If there is a pending
6482
+ event, but it's not about the same table id, or not of the same type
6483
+ (between Write, Update and Delete), or not the same affected columns, or
6484
+ going to be too big, flush this event to disk and create a new pending
6485
+ event.
6486
+ */
6487
+ if (!pending ||
6488
+ pending->server_id != serv_id ||
6489
+ pending->get_table_id () != table->s ->table_map_id ||
6490
+ pending->get_general_type_code () != event_factory.type_code ||
6491
+ pending->get_data_size () + needed > opt_binlog_rows_event_max_size ||
6492
+ pending->read_write_bitmaps_cmp (table) == FALSE )
6493
+ {
6494
+ /* Create a new RowsEventT... */
6495
+ Rows_log_event* const
6496
+ ev= event_factory.create (thd, table, table->s ->table_map_id ,
6497
+ is_transactional);
6498
+ if (unlikely (!ev))
6499
+ DBUG_RETURN (NULL );
6500
+ ev->server_id = serv_id; // I don't like this, it's too easy to forget.
6501
+ /*
6502
+ flush the pending event and replace it with the newly created
6503
+ event...
6504
+ */
6505
+ if (unlikely (flush_and_set_pending_rows_event (thd, ev, cache_data,
6506
+ is_transactional)))
6507
+ {
6508
+ delete ev;
6509
+ DBUG_RETURN (NULL );
6510
+ }
6511
+
6512
+ DBUG_RETURN (ev); /* This is the new pending event */
6513
+ }
6514
+ DBUG_RETURN (pending); /* This is the current pending event */
6515
+ }
6516
+
6471
6517
6472
6518
/* Generate a new global transaction ID, and write it to the binlog */
6473
6519
@@ -12108,7 +12154,8 @@ void wsrep_thd_binlog_stmt_rollback(THD * thd)
12108
12154
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr ();
12109
12155
if (cache_mngr)
12110
12156
{
12111
- thd->binlog_remove_pending_rows_event (TRUE , TRUE );
12157
+ MYSQL_BIN_LOG::remove_pending_rows_event (thd, &cache_mngr->trx_cache );
12158
+ thd->reset_binlog_for_next_statement ();
12112
12159
cache_mngr->stmt_cache .reset ();
12113
12160
}
12114
12161
DBUG_VOID_RETURN;
0 commit comments