Skip to content

Commit

Permalink
memtx: fix story delete statement list
Browse files Browse the repository at this point in the history
Current implementation of tracking statements that delete a story has a
flaw, consider the following example:

tx1('box.space.s:replace{0, 0}') -- statement 1

tx2('box.space.s:replace{0, 1}') -- statement 2
tx2('box.space.s:delete{0}') -- statement 3
tx2('box.space.s:replace{0, 2}') -- statement 4

When statement 1 is prepared, both statements 2 and 4 will be linked to the
delete statement list of {0, 0}'s story, though, apparently, statement 4
does not delete {0, 0}.

Let us notice the following: statement 4 is "pure" in the sense that, in
the transaction's scope, it is guaranteed not to replace any tuple — we
can retrieve this information when we check where the insert statement
violates replacement rules, use it to determine "pure" insert statements,
and skip them later on when, during preparation of insert statements, we
handle other insert statements which assume they do not replace anything
(i.e., have no visible old tuple).

We also need to fix relinking of delete statements from the older story
(in terms of the history chain) to the new one during preparation of insert
statements: a statement needs to be relinked iff it comes from a different
transaction (to be precise, there must, actually, be no more than one
delete statement from the same transaction).

Additionally, added assertions to verify the invariant that the story's
add (delete) psn is equal to the psn of the add (delete) statement's
transaction psn and to verify that no self-conflicting of transactions
occurs.

Closes tarantool#7214
Closes tarantool#7217

NO_DOC=bugfix
  • Loading branch information
CuriousGeorgiy committed May 30, 2022
1 parent 0eac13b commit f333b72
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 20 deletions.
4 changes: 4 additions & 0 deletions changelogs/unreleased/gh-7214-fix-story-del-stmt-list.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## bugfix/memtx

* Fixed repeatable replace with memtx transaction manager enabled which
triggered assertion (gh-7214).
76 changes: 56 additions & 20 deletions src/box/memtx_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ memtx_tx_abort_all_for_ddl(struct txn *ddl_owner)
int
memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
{
assert(breaker != victim);
struct tx_conflict_tracker *tracker = NULL;
struct rlist *r1 = breaker->conflict_list.next;
struct rlist *r2 = victim->conflicted_by_list.next;
Expand Down Expand Up @@ -620,7 +621,9 @@ memtx_tx_adjust_position_in_read_view_list(struct txn *txn)
void
memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
{
assert(breaker != victim);
assert(breaker->psn != 0);
assert(victim->psn == 0);
if (victim->status != TXN_INPROGRESS &&
victim->status != TXN_IN_READ_VIEW) {
/* Was conflicted by somebody else. */
Expand Down Expand Up @@ -857,7 +860,12 @@ memtx_tx_story_get(struct tuple *tuple)

mh_int_t pos = mh_history_find(txm.history, tuple, 0);
assert(pos != mh_end(txm.history));
return *mh_history_node(txm.history, pos);
struct memtx_story *story = *mh_history_node(txm.history, pos);
if (story->add_stmt != 0)
assert(story->add_psn == story->add_stmt->txn->psn);
if (story->del_stmt != 0)
assert(story->del_psn == story->del_stmt->txn->psn);
return story;
}

/**
Expand Down Expand Up @@ -1405,6 +1413,7 @@ static int
memtx_tx_save_conflict(struct txn *breaker, struct txn *victim,
struct memtx_tx_conflict **conflicts_head)
{
assert(breaker != victim);
struct memtx_tx_conflict *next_conflict;
next_conflict = memtx_tx_region_alloc_object(breaker,
MEMTX_TX_OBJECT_CONFLICT);
Expand All @@ -1424,17 +1433,20 @@ memtx_tx_save_conflict(struct txn *breaker, struct txn *victim,
* Scan a history starting with @a story in @a index for a @a visible_tuple.
* If @a is_prepared_ok is true that prepared statements are visible for
* that lookup, and not visible otherwise.
*
* `is_own_change` is set to true iff `old_tuple` was modified (either
* added or deleted) by `stmt`'s transaction.
*/
static void
memtx_tx_story_find_visible_tuple(struct memtx_story *story, struct txn *tnx,
uint32_t index, bool is_prepared_ok,
struct tuple **visible_tuple)
struct tuple **visible_tuple,
bool *is_own_change)
{
for (; story != NULL; story = story->link[index].older_story) {
assert(index < story->index_count);
bool unused;
if (memtx_tx_story_is_visible(story, tnx, is_prepared_ok,
visible_tuple, &unused))
visible_tuple, is_own_change))
return;
}
*visible_tuple = NULL;
Expand Down Expand Up @@ -1499,7 +1511,8 @@ check_hole(struct space *space, uint32_t index,

struct point_hole_item *item = list;
do {
if (memtx_tx_save_conflict(inserter, item->txn,
if (inserter != item->txn &&
memtx_tx_save_conflict(inserter, item->txn,
collected_conflicts) != 0)
return -1;
item = rlist_entry(item->ring.next,
Expand All @@ -1515,12 +1528,16 @@ check_hole(struct space *space, uint32_t index,
* (!) Version for the case when replaced tuple in the primary index is
* either NULL or clean.
* @return 0 on success or -1 on fail.
*
* `is_own_change` is set to true iff `old_tuple` was modified (either
* added or deleted) by `stmt`'s transaction.
*/
static int
check_dup_clean(struct txn_stmt *stmt, struct tuple *new_tuple,
struct tuple **replaced, struct tuple **old_tuple,
enum dup_replace_mode mode,
struct memtx_tx_conflict **collected_conflicts)
struct memtx_tx_conflict **collected_conflicts,
bool *is_own_change)
{
assert(replaced[0] == NULL || !replaced[0]->is_dirty);
struct space *space = stmt->space;
Expand Down Expand Up @@ -1567,7 +1584,8 @@ check_dup_clean(struct txn_stmt *stmt, struct tuple *new_tuple,
struct memtx_story *second_story = memtx_tx_story_get(replaced[i]);
struct tuple *check_visible;
memtx_tx_story_find_visible_tuple(second_story, txn, i,
true, &check_visible);
true, &check_visible,
is_own_change);

if (memtx_tx_check_dup(new_tuple, replaced[0], check_visible,
DUP_INSERT, space->index[i], space) != 0) {
Expand All @@ -1589,12 +1607,16 @@ check_dup_clean(struct txn_stmt *stmt, struct tuple *new_tuple,
* replace rules. See memtx_space_replace_all_keys comment.
* (!) Version for the case when replaced tuple is dirty.
* @return 0 on success or -1 on fail.
*
* `is_own_change` is set to true iff `old_tuple` was modified (either
* added or deleted) by `stmt`'s transaction.
*/
static int
check_dup_dirty(struct txn_stmt *stmt, struct tuple *new_tuple,
struct tuple **replaced, struct tuple **old_tuple,
enum dup_replace_mode mode,
struct memtx_tx_conflict **collected_conflicts)
struct memtx_tx_conflict **collected_conflicts,
bool *is_own_change)
{
assert(replaced[0] != NULL && replaced[0]->is_dirty);
struct space *space = stmt->space;
Expand All @@ -1603,7 +1625,7 @@ check_dup_dirty(struct txn_stmt *stmt, struct tuple *new_tuple,
struct memtx_story *old_story = memtx_tx_story_get(replaced[0]);
struct tuple *visible_replaced;
memtx_tx_story_find_visible_tuple(old_story, txn, 0, true,
&visible_replaced);
&visible_replaced, is_own_change);

if (memtx_tx_check_dup(new_tuple, *old_tuple, visible_replaced,
mode, space->index[0], space) != 0) {
Expand Down Expand Up @@ -1641,9 +1663,9 @@ check_dup_dirty(struct txn_stmt *stmt, struct tuple *new_tuple,

struct memtx_story *second_story = memtx_tx_story_get(replaced[i]);
struct tuple *check_visible;

bool unused;
memtx_tx_story_find_visible_tuple(second_story, txn, i, true,
&check_visible);
&check_visible, &unused);

if (memtx_tx_check_dup(new_tuple, visible_replaced,
check_visible, DUP_INSERT,
Expand All @@ -1666,22 +1688,26 @@ check_dup_dirty(struct txn_stmt *stmt, struct tuple *new_tuple,
* replace rules. See memtx_space_replace_all_keys comment.
* Call check_dup_clean or check_dup_dirty depending on situation.
* @return 0 on success or -1 on fail.
*
* `is_own_change` is set to true iff `old_tuple` was modified (either
* added or deleted) by `stmt`'s transaction.
*/
static int
check_dup_common(struct txn_stmt *stmt, struct tuple *new_tuple,
struct tuple **directly_replaced, struct tuple **old_tuple,
enum dup_replace_mode mode,
struct memtx_tx_conflict **collected_conflicts)
struct memtx_tx_conflict **collected_conflicts,
bool *is_own_change)
{
struct tuple *replaced = directly_replaced[0];
if (replaced == NULL || !replaced->is_dirty)
return check_dup_clean(stmt, new_tuple, directly_replaced,
old_tuple, mode,
collected_conflicts);
collected_conflicts, is_own_change);
else
return check_dup_dirty(stmt, new_tuple, directly_replaced,
old_tuple, mode,
collected_conflicts);
collected_conflicts, is_own_change);
}

static struct gap_item *
Expand Down Expand Up @@ -1829,9 +1855,10 @@ memtx_tx_history_add_insert_stmt(struct txn_stmt *stmt,
struct tuple *replaced = directly_replaced[0];

/* Check overwritten tuple */
bool is_own_change;
int rc = check_dup_common(stmt, new_tuple, directly_replaced,
&old_tuple, mode,
&collected_conflicts);
&collected_conflicts, &is_own_change);
if (rc != 0)
goto fail;

Expand Down Expand Up @@ -1901,7 +1928,8 @@ memtx_tx_history_add_insert_stmt(struct txn_stmt *stmt,
else
del_story = memtx_tx_story_get(old_tuple);
memtx_tx_story_link_deleted_by(del_story, stmt);
}
} else if (is_own_change)
stmt->is_pure_insert = true;

if (new_tuple != NULL) {
/*
Expand Down Expand Up @@ -2119,7 +2147,9 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
struct txn_stmt *test_stmt = test->add_stmt;
if (test_stmt->txn == stmt->txn)
continue;
if (test_stmt->del_story != stmt->del_story) {
if (test_stmt->is_pure_insert)
continue;
if (test_stmt->del_story != NULL) {
assert(test_stmt->del_story->add_stmt->txn
== test_stmt->txn);
continue;
Expand All @@ -2146,11 +2176,12 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
while (*from != NULL) {
struct txn_stmt *test_stmt = *from;
assert(test_stmt->del_story == old_story);
if (test_stmt == stmt || test_stmt->txn->psn != 0) {
if (test_stmt->txn == stmt->txn) {
/* This or prepared. Go to the next stmt. */
from = &test_stmt->next_in_del_list;
continue;
}
assert(test_stmt->txn->psn == 0);
/* Unlink from old list in any case. */
*from = test_stmt->next_in_del_list;
test_stmt->next_in_del_list = NULL;
Expand Down Expand Up @@ -2246,11 +2277,12 @@ memtx_tx_history_prepare_delete_stmt(struct txn_stmt *stmt)
while (*itr != NULL) {
struct txn_stmt *test_stmt = *itr;
assert(test_stmt->del_story == story);
if (test_stmt == stmt && test_stmt->txn->psn != 0) {
if (test_stmt->txn == stmt->txn) {
/* This statement. Go to the next stmt. */
itr = &test_stmt->next_in_del_list;
continue;
}
assert(test_stmt->txn->psn == 0);
/* Unlink from old list in any case. */
*itr = test_stmt->next_in_del_list;
test_stmt->next_in_del_list = NULL;
Expand Down Expand Up @@ -2320,6 +2352,7 @@ memtx_tx_tuple_clarify_impl(struct txn *txn, struct space *space,
break;
if (story->add_psn != 0 && story->add_stmt != NULL &&
txn != NULL) {
assert(story->add_psn == story->add_stmt->txn->psn);
/*
* If we skip prepared story then the transaction
* must be before prepared in serialization order.
Expand All @@ -2332,6 +2365,7 @@ memtx_tx_tuple_clarify_impl(struct txn *txn, struct space *space,
story = story->link[index->dense_id].older_story;
}
if (story->del_psn != 0 && story->del_stmt != NULL && txn != NULL) {
assert(story->del_psn == story->del_stmt->txn->psn);
/*
* If we see a tuple that is deleted by prepared transaction
* then the transaction must be before prepared in serialization
Expand Down Expand Up @@ -2416,8 +2450,10 @@ memtx_tx_index_invisible_count_slow(struct txn *txn,

struct tuple *visible = NULL;
bool is_prepared_ok = detect_whether_prepared_ok(txn);
bool unused;
memtx_tx_story_find_visible_tuple(story, txn, index->dense_id,
is_prepared_ok, &visible);
is_prepared_ok, &visible,
&unused);
if (visible == NULL)
res++;
}
Expand Down
1 change: 1 addition & 0 deletions src/box/txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ txn_stmt_new(struct txn *txn)
stmt->row = NULL;
stmt->has_triggers = false;
stmt->does_require_old_tuple = false;
stmt->is_pure_insert = false;
return stmt;
}

Expand Down
8 changes: 8 additions & 0 deletions src/box/txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ struct txn_stmt {
* old_tuple to be NULL.
*/
bool does_require_old_tuple;
/*
* `insert` statement is guaranteed not to delete anything
* from the transaction's point of view (i.e., there was a preceding
* `delete` in the scope of the same transaction): no linking to the
* list of `delete` statements is required during preparation of insert
* statements that add preceding stories.
*/
bool is_pure_insert;
/**
* Request type - IPROTO type code
*/
Expand Down
86 changes: 86 additions & 0 deletions test/box-luatest/gh_7214_fix_story_del_stmt_list_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
local server = require('test.luatest_helpers.server')
local t = require('luatest')

local g = t.group()

g.before_all = function()
g.server = server:new{
alias = 'dflt',
box_cfg = {memtx_use_mvcc_engine = true}
}
g.server:start()
g.server:exec(function()
local s = box.schema.create_space('s')
s:create_index('pk')
end)
end

g.after_all = function()
g.server:drop()
end

g.test_repeatable_replace = function()
g.server:exec(function()
local t = require('luatest')
local txn_proxy = require('test.box.lua.txn_proxy')

local tx1 = txn_proxy:new()
tx1('box.begin()')

local tx2 = txn_proxy:new()
tx2('box.begin()')

tx1('box.space.s:replace{0, 0}')

tx2('box.space.s:replace{0, 1}')
tx2('box.space.s:delete{0}')
tx2('box.space.s:replace{0, 2}')

tx1('box.commit()')
t.assert_equals(tx2:commit(), "")
end)
end

g.test_repeatable_insert = function()
g.server:exec(function()
local t = require('luatest')
local txn_proxy = require('test.box.lua.txn_proxy')

local tx1 = txn_proxy:new()
tx1('box.begin()')

local tx2 = txn_proxy:new()
tx2('box.begin()')

tx1('box.space.s:replace{0, 0}')

tx2('box.space.s:replace{0, 1}')
tx2('box.space.s:delete{0}')
tx2('box.space.s:insert{0, 2}')

tx1('box.commit()')
t.assert_equals(tx2:commit(), "")
end)
end

g.test_repeatable_upsert = function()
g.server:exec(function()
local t = require('luatest')
local txn_proxy = require('test.box.lua.txn_proxy')

local tx1 = txn_proxy:new()
tx1('box.begin()')

local tx2 = txn_proxy:new()
tx2('box.begin()')

tx1('box.space.s:replace{0, 0}')

tx2('box.space.s:replace{0, 1}')
tx2('box.space.s:delete{0}')
tx2('box.space.s:upsert({0, 2}, {{"=", 2, 2}})')

tx1('box.commit()')
t.assert_equals(tx2:commit(), "")
end)
end

0 comments on commit f333b72

Please sign in to comment.