Skip to content

Commit

Permalink
memtx: replace conflict trackers with read trackers
Browse files Browse the repository at this point in the history
Conflict trackers are used to store information that if some
transaction is committed then some another transaction must be
aborted. This happens when the first transaction writes some
key while the other reads the same key. On the other hand there
are another trackers - read trackers - that are designed to
handle exactly the same situation. That's why conflict trackers
can be simply replaced with read trackers.

That would allow to remove conflict trackers as not needed
anymore.

Part of tarantool#8648
Part of tarantool#8654

NO_DOC=refactoring
NO_TEST=refactoring
NO_CHANGELOG=refactoring
  • Loading branch information
alyapunov committed May 16, 2023
1 parent f5539a4 commit 2b1c028
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 131 deletions.
124 changes: 11 additions & 113 deletions src/box/memtx_tx.c
Expand Up @@ -157,21 +157,6 @@ memtx_tx_story_key_hash(const struct tuple *a)
#define MH_SOURCE
#include "salad/mhash.h"

/**
* Record that links two transactions, breaker and victim.
* See memtx_tx_cause_conflict for details.
*/
struct tx_conflict_tracker {
/** TX that aborts victim on commit. */
struct txn *breaker;
/** TX that will be aborted on breaker's commit. */
struct txn *victim;
/** Link in breaker->conflict_list. */
struct rlist in_conflict_list;
/** Link in victim->conflicted_by_list. */
struct rlist in_conflicted_by_list;
};

/**
* Record that links transaction and a story that the transaction have read.
*/
Expand Down Expand Up @@ -438,10 +423,6 @@ memtx_tx_region_alloc_object(struct txn *txn,
enum memtx_tx_alloc_type alloc_type =
memtx_tx_region_object_to_type(alloc_obj);
switch (alloc_obj) {
case MEMTX_TX_OBJECT_CONFLICT_TRACKER:
alloc = region_alloc_object(&txn->region,
struct tx_conflict_tracker, &size);
break;
case MEMTX_TX_OBJECT_READ_TRACKER:
alloc = region_alloc_object(&txn->region,
struct tx_read_tracker, &size);
Expand Down Expand Up @@ -649,67 +630,6 @@ memtx_tx_abort_all_for_ddl(struct txn *ddl_owner)
}
}

/**
* Notify TX manager that if transaction @a breaker is committed then the
* transaction @a victim must be aborted due to conflict. It is achieved
* by adding corresponding entry (of tx_conflict_tracker type) to @a breaker
* conflict list. In case there's already such entry, then move it to the head
* of the list in order to optimize next invocations of this function.
* For example: there's two rw transaction in progress, one have read
* some value while the second is about to overwrite it. If the second
* is committed first, the first must be aborted.
*
* NB: can trigger story garbage collection.
*
* @return 0 on success, -1 on memory error.
*/
static int
memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
{
assert(breaker != victim);
struct tx_conflict_tracker *tracker = NULL;
struct rlist *r1 = breaker->conflict_list.next;
struct rlist *r2 = victim->conflicted_by_list.next;
while (r1 != &breaker->conflict_list &&
r2 != &victim->conflicted_by_list) {
tracker = rlist_entry(r1, struct tx_conflict_tracker,
in_conflict_list);
assert(tracker->breaker == breaker);
if (tracker->victim == victim)
break;
tracker = rlist_entry(r2, struct tx_conflict_tracker,
in_conflicted_by_list);
assert(tracker->victim == victim);
if (tracker->breaker == breaker)
break;
tracker = NULL;
r1 = r1->next;
r2 = r2->next;
}
if (tracker != NULL) {
/*
* Move to the beginning of a list
* for a case of subsequent lookups.
*/
rlist_del(&tracker->in_conflict_list);
rlist_del(&tracker->in_conflicted_by_list);
} else {
tracker =
memtx_tx_region_alloc_object(
victim, MEMTX_TX_OBJECT_CONFLICT_TRACKER);
if (tracker == NULL) {
diag_set(OutOfMemory, sizeof(*tracker), "tx region",
"conflict_tracker");
return -1;
}
tracker->breaker = breaker;
tracker->victim = victim;
}
rlist_add(&breaker->conflict_list, &tracker->in_conflict_list);
rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
return 0;
}

/**
* Fix position of @a txn in global read view list to preserve the list to
* be ordered by rv_psn. Can only move txn to the beginning of the list.
Expand Down Expand Up @@ -1740,25 +1660,30 @@ point_hole_storage_find(struct index *index, struct tuple *tuple)
return *mh_point_holes_node(txm.point_holes, pos);
}

static int
memtx_tx_track_read_story(struct txn *txn, struct space *space,
struct memtx_story *story, uint64_t index_mask);

/**
* Check for possible conflict relations during insertion of @a new tuple,
* and given that it was a real insertion, not the replacement of existing
* tuple. It's the moment where we can search for stored point hole trackers
* and find conflict causes.
*/
static int
check_hole(struct space *space, uint32_t index,
struct tuple *new_tuple, struct txn *inserter)
check_hole(struct space *space, struct memtx_story *story,
struct tuple *new_tuple, uint32_t ind)
{
struct point_hole_item *list =
point_hole_storage_find(space->index[index], new_tuple);
point_hole_storage_find(space->index[ind], new_tuple);
if (list == NULL)
return 0;

struct point_hole_item *item = list;
uint64_t index_mask = 1ull << (ind & 63);
do {
if (inserter != item->txn &&
memtx_tx_cause_conflict(inserter, item->txn) != 0)
if (memtx_tx_track_read_story(item->txn, space, story,
index_mask) != 0)
return -1;
item = rlist_entry(item->ring.next,
struct point_hole_item, ring);
Expand Down Expand Up @@ -1852,10 +1777,6 @@ static struct gap_item *
memtx_tx_gap_item_new(struct txn *txn, enum iterator_type type,
const char *key, uint32_t part_count);

static int
memtx_tx_track_read_story(struct txn *txn, struct space *space,
struct memtx_story *story, uint64_t index_mask);

/**
* Handle insertion to a new place in index. There can be readers which
* have read from this gap and thus must be sent to read view or conflicted.
Expand Down Expand Up @@ -2031,7 +1952,7 @@ memtx_tx_history_add_insert_stmt(struct txn_stmt *stmt,

/* Collect point phantom read conflicts. */
for (uint32_t i = 0; i < space->index_count; i++) {
if (check_hole(space, i, new_tuple, stmt->txn) != 0)
if (check_hole(space, add_story, new_tuple, i) != 0)
goto fail;
}

Expand Down Expand Up @@ -2560,15 +2481,6 @@ memtx_tx_clear_txn_read_lists(struct txn *txn);
void
memtx_tx_prepare_finalize(struct txn *txn)
{
struct tx_conflict_tracker *entry, *next;
/* Handle conflicts. */
rlist_foreach_entry_safe(entry, &txn->conflict_list,
in_conflict_list, next) {
assert(entry->breaker == txn);
memtx_tx_handle_conflict(txn, entry->victim);
rlist_del(&entry->in_conflict_list);
rlist_del(&entry->in_conflicted_by_list);
}
/* Just free all other lists - we don't need 'em anymore. */
memtx_tx_clear_txn_read_lists(txn);
}
Expand Down Expand Up @@ -3204,20 +3116,6 @@ memtx_tx_clear_txn_read_lists(struct txn *txn)
}
assert(rlist_empty(&txn->read_set));

struct tx_conflict_tracker *entry, *next;
rlist_foreach_entry_safe(entry, &txn->conflict_list,
in_conflict_list, next) {
rlist_del(&entry->in_conflict_list);
rlist_del(&entry->in_conflicted_by_list);
}
rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
in_conflicted_by_list, next) {
rlist_del(&entry->in_conflict_list);
rlist_del(&entry->in_conflicted_by_list);
}
assert(rlist_empty(&txn->conflict_list));
assert(rlist_empty(&txn->conflicted_by_list));

rlist_del(&txn->in_read_view_txs);
}

Expand Down
3 changes: 2 additions & 1 deletion src/box/memtx_tx.h
Expand Up @@ -66,7 +66,8 @@ enum memtx_tx_alloc_object {
*/
MEMTX_TX_OBJECT_CONFLICT = 0,
/**
* Object of type struct tx_conflict_tracker.
* Deprecated object of type struct tx_conflict_tracker.
* TODO: remove it considering monitoring module.
*/
MEMTX_TX_OBJECT_CONFLICT_TRACKER = 1,
/**
Expand Down
4 changes: 0 additions & 4 deletions src/box/txn.c
Expand Up @@ -413,8 +413,6 @@ txn_new(void)
rlist_create(&txn->point_holes_list);
rlist_create(&txn->gap_list);
rlist_create(&txn->full_scan_list);
rlist_create(&txn->conflict_list);
rlist_create(&txn->conflicted_by_list);
rlist_create(&txn->in_read_view_txs);
rlist_create(&txn->in_all_txs);
txn->space_on_replace_triggers_depth = 0;
Expand Down Expand Up @@ -476,8 +474,6 @@ txn_begin(void)
struct txn *txn = txn_new();
if (txn == NULL)
return NULL;
assert(rlist_empty(&txn->conflict_list));
assert(rlist_empty(&txn->conflicted_by_list));

/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
Expand Down
12 changes: 0 additions & 12 deletions src/box/txn.h
Expand Up @@ -503,18 +503,6 @@ struct txn {
struct rlist on_commit, on_rollback, on_wal_write;
/** List of savepoints to find savepoint by name. */
struct rlist savepoints;
/**
* List of tx_conflict_tracker records where .breaker is the current
* transaction and .victim is the transactions that must be aborted
* if the current transaction is committed.
*/
struct rlist conflict_list;
/**
* List of tx_conflict_tracker records where .victim is the current
* transaction and .breaker is the transactions that, if committed,
* will abort the current transaction.
*/
struct rlist conflicted_by_list;
/**
* Link in tx_manager::read_view_txs.
*/
Expand Down
Expand Up @@ -365,7 +365,7 @@ g.test_conflict = function()
g.server:eval('tx1("s:get(1)")')
g.server:eval('tx2("s:replace{1, 2}")')
g.server:eval("box.internal.memtx_tx_gc(10)")
local trackers_used = SIZE_OF_CONFLICT_TRACKER + SIZE_OF_POINT_TRACKER
local trackers_used = SIZE_OF_READ_TRACKER + SIZE_OF_POINT_TRACKER
local diff = {
["txn"] = {
["statements"] = {
Expand Down

0 comments on commit 2b1c028

Please sign in to comment.