@@ -60,7 +60,6 @@ this program; if not, write to the Free Software Foundation, Inc.,
60
60
61
61
#include <my_service_manager.h>
62
62
#include <key.h>
63
- #include <sql_manager.h>
64
63
65
64
/* Include necessary InnoDB headers */
66
65
#include "btr0btr.h"
@@ -19536,68 +19535,61 @@ wsrep_abort_slave_trx(
19536
19535
(long long)bf_seqno, (long long)victim_seqno);
19537
19536
abort();
19538
19537
}
19539
-
19540
- struct bg_wsrep_kill_trx_arg {
19541
- my_thread_id thd_id;
19542
- trx_id_t trx_id;
19543
- int64_t bf_seqno;
19544
- ibool signal;
19545
- };
19546
-
19547
- static void bg_wsrep_kill_trx(
19548
- void *void_arg )
19538
+ /*******************************************************************//**
19539
+ This function is used to kill one transaction in BF. */
19540
+ UNIV_INTERN
19541
+ void
19542
+ wsrep_innobase_kill_one_trx(
19543
+ /*========================*/
19544
+ MYSQL_THD const bf_thd,
19545
+ const trx_t * const bf_trx,
19546
+ trx_t *victim_trx,
19547
+ ibool signal )
19549
19548
{
19550
- bg_wsrep_kill_trx_arg *arg = (bg_wsrep_kill_trx_arg*)void_arg;
19551
- THD *thd = find_thread_by_id(arg->thd_id, false);
19552
- trx_t *victim_trx = NULL;
19553
- bool awake = false;
19554
- DBUG_ENTER("bg_wsrep_kill_trx");
19549
+ ut_ad(bf_thd);
19550
+ ut_ad(victim_trx);
19551
+ ut_ad(lock_mutex_own());
19552
+ ut_ad(trx_mutex_own(victim_trx));
19555
19553
19556
- if (thd) {
19557
- victim_trx= thd_to_trx(thd);
19558
- /* Victim trx might not exist e.g. on MDL-conflict. */
19559
- if (victim_trx) {
19560
- lock_mutex_enter();
19561
- trx_mutex_enter(victim_trx);
19562
- if (victim_trx->id != arg->trx_id ||
19563
- victim_trx->state == TRX_STATE_COMMITTED_IN_MEMORY)
19564
- {
19565
- /* Victim was meanwhile rolled back or
19566
- committed */
19567
- trx_mutex_exit(victim_trx);
19568
- lock_mutex_exit();
19569
- wsrep_thd_UNLOCK(thd);
19570
- victim_trx= NULL;
19571
- }
19572
- } else {
19573
- /* find_thread_by_id locked
19574
- THD::LOCK_thd_data */
19575
- wsrep_thd_UNLOCK(thd);
19576
- }
19577
- }
19554
+ DBUG_ENTER("wsrep_innobase_kill_one_trx");
19555
+ THD *thd = (THD *) victim_trx->mysql_thd;
19556
+ int64_t bf_seqno = wsrep_thd_trx_seqno(bf_thd);
19578
19557
19579
- if (!victim_trx) {
19580
- /* Victim trx might not exist (MDL-conflict) or victim
19581
- was meanwhile rolled back or committed because of
19582
- a KILL statement or a disconnect. */
19583
- goto ret;
19558
+ if (!thd) {
19559
+ DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
19560
+ WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id);
19561
+ DBUG_VOID_RETURN;
19584
19562
}
19585
19563
19564
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
19565
+
19586
19566
WSREP_DEBUG("BF kill (" ULINTPF ", seqno: " INT64PF
19587
19567
"), victim: (%lu) trx: " TRX_ID_FMT,
19588
- arg-> signal, arg-> bf_seqno,
19568
+ signal, bf_seqno,
19589
19569
thd_get_thread_id(thd),
19590
19570
victim_trx->id);
19591
19571
19592
19572
WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64,
19593
- (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
19573
+ (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
19594
19574
wsrep_thd_conflict_state(thd, FALSE),
19595
19575
wsrep_thd_ws_handle(thd)->trx_id);
19596
19576
19577
+ wsrep_thd_LOCK(thd);
19578
+ DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
19579
+ {
19580
+ const char act[]=
19581
+ "now "
19582
+ "wait_for signal.wsrep_after_BF_victim_lock";
19583
+ DBUG_ASSERT(!debug_sync_set_action(bf_thd,
19584
+ STRING_WITH_LEN(act)));
19585
+ };);
19586
+
19587
+
19597
19588
if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
19598
19589
WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT,
19599
19590
victim_trx->id);
19600
- goto ret_unlock;
19591
+ wsrep_thd_UNLOCK(thd);
19592
+ DBUG_VOID_RETURN;
19601
19593
}
19602
19594
19603
19595
if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
@@ -19613,13 +19605,18 @@ static void bg_wsrep_kill_trx(
19613
19605
case MUST_ABORT:
19614
19606
WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state",
19615
19607
victim_trx->id);
19616
- goto ret_awake;
19608
+ wsrep_thd_UNLOCK(thd);
19609
+ wsrep_thd_awake(thd, signal);
19610
+ DBUG_VOID_RETURN;
19611
+ break;
19617
19612
case ABORTED:
19618
19613
case ABORTING: // fall through
19619
19614
default:
19620
19615
WSREP_DEBUG("victim " TRX_ID_FMT " in state %d",
19621
19616
victim_trx->id, wsrep_thd_get_conflict_state(thd));
19622
- goto ret_unlock;
19617
+ wsrep_thd_UNLOCK(thd);
19618
+ DBUG_VOID_RETURN;
19619
+ break;
19623
19620
}
19624
19621
19625
19622
switch (wsrep_thd_query_state(thd)) {
@@ -19632,12 +19629,12 @@ static void bg_wsrep_kill_trx(
19632
19629
victim_trx->id);
19633
19630
19634
19631
if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
19635
- wsrep_abort_slave_trx(arg-> bf_seqno,
19632
+ wsrep_abort_slave_trx(bf_seqno,
19636
19633
wsrep_thd_trx_seqno(thd));
19637
19634
} else {
19638
19635
wsrep_t *wsrep= get_wsrep();
19639
19636
rcode = wsrep->abort_pre_commit(
19640
- wsrep, arg-> bf_seqno,
19637
+ wsrep, bf_seqno,
19641
19638
(wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id
19642
19639
);
19643
19640
@@ -19646,7 +19643,10 @@ static void bg_wsrep_kill_trx(
19646
19643
WSREP_DEBUG("cancel commit warning: "
19647
19644
TRX_ID_FMT,
19648
19645
victim_trx->id);
19649
- goto ret_awake;
19646
+ wsrep_thd_UNLOCK(thd);
19647
+ wsrep_thd_awake(thd, signal);
19648
+ DBUG_VOID_RETURN;
19649
+ break;
19650
19650
case WSREP_OK:
19651
19651
break;
19652
19652
default:
@@ -19659,9 +19659,12 @@ static void bg_wsrep_kill_trx(
19659
19659
* kill the lock holder first.
19660
19660
*/
19661
19661
abort();
19662
+ break;
19662
19663
}
19663
19664
}
19664
- goto ret_awake;
19665
+ wsrep_thd_UNLOCK(thd);
19666
+ wsrep_thd_awake(thd, signal);
19667
+ break;
19665
19668
case QUERY_EXEC:
19666
19669
/* it is possible that victim trx is itself waiting for some
19667
19670
* other lock. We need to cancel this waiting
@@ -19682,30 +19685,37 @@ static void bg_wsrep_kill_trx(
19682
19685
lock_cancel_waiting_and_release(wait_lock);
19683
19686
}
19684
19687
19688
+ wsrep_thd_UNLOCK(thd);
19689
+ wsrep_thd_awake(thd, signal);
19685
19690
} else {
19686
19691
/* abort currently executing query */
19687
19692
DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu",
19688
19693
thd_get_thread_id(thd)));
19689
19694
WSREP_DEBUG("kill query for: %ld",
19690
19695
thd_get_thread_id(thd));
19696
+ /* Note that innobase_kill_query will take lock_mutex
19697
+ and trx_mutex */
19698
+ wsrep_thd_UNLOCK(thd);
19699
+ wsrep_thd_awake(thd, signal);
19691
19700
19692
19701
/* for BF thd, we need to prevent him from committing */
19693
19702
if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
19694
- wsrep_abort_slave_trx(arg-> bf_seqno,
19703
+ wsrep_abort_slave_trx(bf_seqno,
19695
19704
wsrep_thd_trx_seqno(thd));
19696
19705
}
19697
19706
}
19698
- goto ret_awake ;
19707
+ break ;
19699
19708
case QUERY_IDLE:
19700
19709
{
19701
19710
WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id);
19702
19711
19703
19712
if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
19704
19713
WSREP_DEBUG("kill BF IDLE, seqno: %lld",
19705
19714
(long long)wsrep_thd_trx_seqno(thd));
19706
- wsrep_abort_slave_trx(arg->bf_seqno,
19715
+ wsrep_thd_UNLOCK(thd);
19716
+ wsrep_abort_slave_trx(bf_seqno,
19707
19717
wsrep_thd_trx_seqno(thd));
19708
- goto ret_unlock ;
19718
+ DBUG_VOID_RETURN ;
19709
19719
}
19710
19720
/* This will lock thd from proceeding after net_read() */
19711
19721
wsrep_thd_set_conflict_state(thd, ABORTING);
@@ -19726,67 +19736,17 @@ static void bg_wsrep_kill_trx(
19726
19736
DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
19727
19737
WSREP_DEBUG("signaling aborter");
19728
19738
wsrep_unlock_rollback();
19729
- goto ret_unlock;
19739
+ wsrep_thd_UNLOCK(thd);
19740
+
19741
+ break;
19730
19742
}
19731
19743
default:
19732
19744
WSREP_WARN("bad wsrep query state: %d",
19733
19745
wsrep_thd_query_state(thd));
19734
- goto ret_unlock;
19746
+ wsrep_thd_UNLOCK(thd);
19747
+ break;
19735
19748
}
19736
19749
19737
- ret_awake:
19738
- awake= true;
19739
-
19740
- ret_unlock:
19741
- trx_mutex_exit(victim_trx);
19742
- lock_mutex_exit();
19743
- if (awake)
19744
- wsrep_thd_awake(thd, arg->signal);
19745
- wsrep_thd_UNLOCK(thd);
19746
-
19747
- ret:
19748
- free(arg);
19749
- DBUG_VOID_RETURN;
19750
-
19751
- }
19752
-
19753
- /*******************************************************************//**
19754
- This function is used to kill one transaction in BF. */
19755
- UNIV_INTERN
19756
- void
19757
- wsrep_innobase_kill_one_trx(
19758
- /*========================*/
19759
- MYSQL_THD const bf_thd,
19760
- const trx_t * const bf_trx,
19761
- trx_t *victim_trx,
19762
- ibool signal)
19763
- {
19764
- ut_ad(bf_thd);
19765
- ut_ad(victim_trx);
19766
- ut_ad(lock_mutex_own());
19767
- ut_ad(trx_mutex_own(victim_trx));
19768
-
19769
- bg_wsrep_kill_trx_arg *arg = (bg_wsrep_kill_trx_arg*)malloc(sizeof(*arg));
19770
- arg->thd_id = thd_get_thread_id(victim_trx->mysql_thd);
19771
- arg->trx_id = victim_trx->id;
19772
- arg->bf_seqno = wsrep_thd_trx_seqno((THD*)bf_thd);
19773
- arg->signal = signal;
19774
-
19775
- DBUG_ENTER("wsrep_innobase_kill_one_trx");
19776
-
19777
- WSREP_LOG_CONFLICT(bf_thd, victim_trx->mysql_thd, TRUE);
19778
-
19779
- DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
19780
- {
19781
- const char act[]=
19782
- "now "
19783
- "wait_for signal.wsrep_after_BF_victim_lock";
19784
- DBUG_ASSERT(!debug_sync_set_action(bf_thd,
19785
- STRING_WITH_LEN(act)));
19786
- };);
19787
-
19788
-
19789
- mysql_manager_submit(bg_wsrep_kill_trx, arg);
19790
19750
DBUG_VOID_RETURN;
19791
19751
}
19792
19752
@@ -19821,8 +19781,8 @@ wsrep_abort_transaction(
19821
19781
WSREP_DEBUG("victim does not have transaction");
19822
19782
wsrep_thd_LOCK(victim_thd);
19823
19783
wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
19824
- wsrep_thd_awake(victim_thd, signal);
19825
19784
wsrep_thd_UNLOCK(victim_thd);
19785
+ wsrep_thd_awake(victim_thd, signal);
19826
19786
}
19827
19787
19828
19788
DBUG_VOID_RETURN;
0 commit comments