Skip to content

Commit

Permalink
limbo: fix commit/rollback failures with triggers
Browse files Browse the repository at this point in the history
Currently some transactions on synchronous space fail to complete with
the `ER_CURSOR_NO_TRANSACTION` error, when on_rollback/on_commit triggers
are set.

This is caused due to the fact, that some rollback/commit triggers
require in_txn fiber variable to be set but it's not done when a
transaction is completed from the limbo. Callbacks, which are used to
work with iterators (`lbox_txn_pairs` and `lbox_txn_iterator_next`),
acquire tnx statements from the current transactions, but they cannot
do that, when this transaction is not assigned to the current fiber, so
`ER_CURSOR_NO_TRANSACTION` is thrown.

Let's assign in_txn variable when we complete transaction from the limbo.
Moreover, let's add assertions, which check whether in_txn() is correct,
in order to be sure, that `txn_complete_success/fail` always run with
in_txn set.

Closes tarantool#8505

NO_DOC=bugfix
  • Loading branch information
Serpentian committed Jun 6, 2023
1 parent 116df1d commit 6ff0b0c
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 3 deletions.
5 changes: 5 additions & 0 deletions changelogs/unreleased/gh-8505-synchro-triggers-fail.md
@@ -0,0 +1,5 @@
## bugfix/core

* Fixed a bug causing the `ER_CURSOR_NO_TRANSACTION` failure for transactions
on synchronous spaces when the `on_commit/on_rollback` triggers are set
(gh-8505).
2 changes: 2 additions & 0 deletions src/box/txn.c
Expand Up @@ -751,6 +751,7 @@ txn_complete_fail(struct txn *txn)
assert(!txn_has_flag(txn, TXN_IS_DONE));
assert(txn->signature < 0);
assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
assert(in_txn() == txn);
if (txn->limbo_entry != NULL) {
assert(txn_has_flag(txn, TXN_WAIT_SYNC));
txn_limbo_abort(&txn_limbo, txn->limbo_entry);
Expand Down Expand Up @@ -783,6 +784,7 @@ txn_complete_success(struct txn *txn)
assert(!txn_has_flag(txn, TXN_IS_DONE));
assert(!txn_has_flag(txn, TXN_WAIT_SYNC));
assert(txn->signature >= 0);
assert(in_txn() == txn);
txn->status = TXN_COMMITTED;
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
Expand Down
22 changes: 19 additions & 3 deletions src/box/txn_limbo.c
Expand Up @@ -72,6 +72,22 @@ txn_limbo_is_ro(struct txn_limbo *limbo)
(limbo->owner_id != instance_id || txn_limbo_is_frozen(limbo));
}

void
txn_limbo_complete(struct txn *txn, bool is_success)
{
/*
* Some rollback/commit triggers require the in_txn fiber
* variable to be set.
*/
assert(in_txn() == NULL);
fiber_set_txn(fiber(), txn);
if (is_success)
txn_complete_success(txn);
else
txn_complete_fail(txn);
fiber_set_txn(fiber(), NULL);
}

struct txn_limbo_entry *
txn_limbo_last_synchro_entry(struct txn_limbo *limbo)
{
Expand Down Expand Up @@ -287,7 +303,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
e->txn->limbo_entry = NULL;
txn_limbo_abort(limbo, e);
txn_clear_flags(e->txn, TXN_WAIT_SYNC | TXN_WAIT_ACK);
txn_complete_fail(e->txn);
txn_limbo_complete(e->txn, false);
if (e == entry)
break;
fiber_wakeup(e->txn->fiber);
Expand Down Expand Up @@ -461,7 +477,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
* after the affected transactions.
*/
assert(e->txn->signature >= 0);
txn_complete_success(e->txn);
txn_limbo_complete(e->txn, true);
}
/*
* Track CONFIRM lsn on replica in order to detect split-brain by
Expand Down Expand Up @@ -514,7 +530,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(e->txn->signature >= 0);
e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
e->txn->limbo_entry = NULL;
txn_complete_fail(e->txn);
txn_limbo_complete(e->txn, false);
if (e == last_rollback)
break;
}
Expand Down
67 changes: 67 additions & 0 deletions test/box-luatest/gh_8505_synchro_triggers_test.lua
@@ -0,0 +1,67 @@
local t = require('luatest')
local replica_set = require('luatest.replica_set')
local server = require('luatest.server')

local g = t.group('gh-8505-synchro-triggers')

g.before_all(function(g)
g.replica_set = replica_set:new({})
g.box_cfg = {
replication_timeout = 0.01,
replication = {
server.build_listen_uri('server1', g.replica_set.id),
server.build_listen_uri('server2', g.replica_set.id),
},
}

g.box_cfg.election_mode = 'voter'
g.server2 = g.replica_set:build_and_add_server({
alias = 'server2', box_cfg = g.box_cfg
})

g.box_cfg.election_mode = 'candidate'
g.server1 = g.replica_set:build_and_add_server({
alias = 'server1', box_cfg = g.box_cfg
})

g.replica_set:start()
g.server1:wait_for_election_leader()
g.server1:exec(function()
box.schema.create_space('test', {is_sync = true}):create_index('pk')
end)
g.server2:wait_for_vclock_of(g.server1)
end)

g.after_all(function(g)
g.replica_set:drop()
end)

g.test_on_commit_trigger = function(g)
g.server1:exec(function()
box.begin()
box.on_commit(function(iter) iter() end)
box.space.test:upsert({1}, {{'=', 1, 1}})
box.commit()
end)
end

g.test_on_rollback_trigger = function(g)
-- Force ACK gathering to fail and cause rollback
local cfg = g.box_cfg
cfg.replication_synchro_timeout = 1e-9
cfg.election_mode = 'candidate'

g.server1:update_box_cfg(cfg)
g.server1:wait_for_election_leader()

g.server1:exec(function()
box.begin()
box.on_rollback(function(iter) iter() end)
box.space.test:upsert({1}, {{'=', 1, 1}})
local _, err = pcall(box.commit)
t.assert_equals(err.code, box.error.SYNC_QUORUM_TIMEOUT)
end)

g.server1:update_box_cfg(g.box_cfg)
g.server1:wait_for_election_leader()
end

0 comments on commit 6ff0b0c

Please sign in to comment.