diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index b02a0b4eeaef..81fb28a51444 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -1403,6 +1403,17 @@ memtx_tx_story_full_unlink_on_space_delete(struct memtx_story *story) } } +/** + * Find a top story in chain of @a story by index @a ind. + */ +static struct memtx_story * +memtx_tx_story_find_top(struct memtx_story *story, uint32_t ind) +{ + while (story->link[ind].newer_story != NULL) + story = story->link[ind].newer_story; + return story; +} + /** * Unlink @a story from all chains and remove corresponding tuple from * indexes if necessary: used in garbage collection step and preserves the top @@ -2166,25 +2177,6 @@ memtx_tx_history_remove_story_del_stmts(struct memtx_story *story) } } -/* - * Push story down the history chain to the level of prepared stories in each - * index. - */ -static void -memtx_tx_history_sink_story(struct memtx_story *story) -{ - for (uint32_t i = 0; i < story->index_count; ) { - struct memtx_story *old_story = story->link[i].older_story; - if (old_story == NULL || old_story->add_psn != 0 || - old_story->add_stmt == NULL) { - /* Old story is absent or prepared or committed. */ - i++; /* Go to the next index. */ - continue; - } - memtx_tx_story_reorder(story, old_story, i); - } -} - /* * Rollback addition of story by statement. */ @@ -2280,6 +2272,41 @@ memtx_tx_history_remove_stmt(struct txn_stmt *stmt) stmt->engine_savepoint = NULL; } +/** + * Abort or send to read view readers of @a story, except the transaction + * @a writer that is actually deletes the story. + */ +static void +memtx_tx_handle_conflict_story_readers(struct memtx_story *story, + struct txn *writer) +{ + struct tx_read_tracker *tracker, *tmp; + rlist_foreach_entry_safe(tracker, &story->reader_list, + in_reader_list, tmp) { + if (tracker->reader == writer) + continue; + memtx_tx_handle_conflict(writer, tracker->reader); + } +} + +/** + * Abort or send to read view readers of @a story, except the transaction + * @a writer that is actually deletes the story. + */ +static void +memtx_tx_handle_conflict_gap_readers(struct memtx_story *top_story, + uint32_t ind, struct txn *writer) +{ + assert(top_story->link[ind].newer_story == NULL); + struct gap_item_base *item, *tmp; + rlist_foreach_entry_safe(item, &top_story->link[ind].read_gaps, + in_read_gaps, tmp) { + if (item->txn == writer || item->type != GAP_INPLACE) + continue; + memtx_tx_handle_conflict(writer, item->txn); + } +} + /** * Helper of memtx_tx_history_prepare_stmt. Do the job in case when * stmt->add_story != NULL, that is REPLACE, INSERT, UPDATE etc. @@ -2287,7 +2314,8 @@ memtx_tx_history_remove_stmt(struct txn_stmt *stmt) static void memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt) { - assert(stmt->add_story != NULL); + struct memtx_story *story = stmt->add_story; + assert(story != NULL); /** * History of a key in an index can consist of several stories. * The list of stories is started with a dirty tuple that is in index. @@ -2306,154 +2334,163 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt) * If a statement becomes prepared, the story it adds must be 'sunk' to * the level of prepared stories. */ - struct memtx_story *story = stmt->add_story; - uint32_t index_count = story->index_count; - memtx_tx_history_sink_story(story); + for (uint32_t i = 0; i < story->index_count; ) { + struct memtx_story *old_story = story->link[i].older_story; + if (old_story == NULL || old_story->add_psn != 0 || + old_story->add_stmt == NULL) { + /* Old story is absent or prepared or committed. */ + i++; /* Go to the next index. */ + continue; + } + memtx_tx_story_reorder(story, old_story, i); + } + /* Consistency asserts. */ + { + assert(story->del_stmt == NULL || + story->del_stmt->next_in_del_list == NULL); + struct memtx_story *old_story = story->link[0].older_story; + if (stmt->del_story == NULL) + assert(old_story == NULL || old_story->del_psn != 0); + else + assert(old_story == stmt->del_story); + (void)old_story; + } + + /* + * Set newer (in-progress) statements in the primary chain to delete + * this story. + */ if (stmt->del_story == NULL) { /* * This statement replaced nothing. That means that before * this preparation there was no visible tuple in index, and * now there is. - * There also can be some in-progress transactions that think - * that the replaced nothing. Some of them must be aborted - * (for example inserts, that must replace nothing), other - * must be told that they replace this tuple now. + * There could be some in-progress transactions that think + * that the replaced nothing. They must be told that they + * replace this tuple now. */ - struct memtx_story_link *link = &story->link[0]; - while (link->newer_story != NULL) { - struct memtx_story *test = link->newer_story; - link = &test->link[0]; - struct txn_stmt *test_stmt = test->add_stmt; - if (test_stmt->txn == stmt->txn) - continue; - if (test_stmt->is_own_change && - test_stmt->del_story == NULL) - continue; - if (test_stmt->del_story != NULL) { - assert(test_stmt->del_story->add_stmt->txn - == test_stmt->txn); + struct memtx_story *test_story; + for (test_story = story->link[0].newer_story; + test_story != NULL; + test_story = test_story->link[0].newer_story) { + struct txn_stmt *test_stmt = test_story->add_stmt; + if (test_stmt->is_own_change) continue; - } - + assert(test_stmt->txn != stmt->txn); + assert(test_stmt->del_story == NULL); + assert(test_stmt->txn->psn == 0); memtx_tx_story_link_deleted_by(story, test_stmt); } - /* Now link is in top of chain, where gap records are stored. */ - struct gap_item_base *item_base, *tmp; - rlist_foreach_entry_safe(item_base, &link->read_gaps, - in_read_gaps, tmp) { - if (item_base->txn == stmt->txn || - item_base->type != GAP_INPLACE) - continue; - memtx_tx_handle_conflict(stmt->txn, item_base->txn); - } - } - - struct memtx_story *old_story = story->link[0].older_story; - if (stmt->del_story == NULL) - assert(old_story == NULL || old_story->del_psn != 0); - else - assert(old_story != NULL && stmt->del_story == old_story); - if (old_story != NULL && old_story->del_psn != 0) { - assert(stmt->del_story == NULL); - old_story = NULL; - } - if (old_story != NULL) { + } else { /* - * There can be some transactions that want to delete old_story. - * It can be this transaction. - * All other transactions must be aborted or relinked to delete - * this tuple. + * This statement replaced older story. That means that before + * this preparation there was another visible tuple in this + * place of index. + * There could be some in-progress transactions that think + * they deleted or replaced that another tuple. They must be + * told that they replace this tuple now. */ - struct txn_stmt **from = &old_story->del_stmt; - struct txn_stmt **to = &story->del_stmt; - while (*to != NULL) - to = &((*to)->next_in_del_list); + struct txn_stmt **from = &stmt->del_story->del_stmt; while (*from != NULL) { struct txn_stmt *test_stmt = *from; - assert(test_stmt->del_story == old_story); - if (test_stmt->txn == stmt->txn) { - assert(test_stmt == stmt || - test_stmt->add_story == NULL); - /* - * Statement from the same transaction. Go to - * the next statement. - */ + assert(test_stmt->del_story == stmt->del_story); + if (test_stmt == stmt) { + /* Leave this statement, go to the next. */ from = &test_stmt->next_in_del_list; continue; } + assert(test_stmt->txn != stmt->txn); assert(test_stmt->txn->psn == 0); - /* Unlink from old list in any case. */ + + /* Unlink from old story list. */ *from = test_stmt->next_in_del_list; test_stmt->next_in_del_list = NULL; test_stmt->del_story = NULL; /* Link to story's list. */ - test_stmt->del_story = story; - *to = test_stmt; - to = &test_stmt->next_in_del_list; + memtx_tx_story_link_deleted_by(story, test_stmt); } } + /* Handle main conflicts. */ + if (stmt->del_story != NULL) { + /* + * The story stmt->del_story ends by now. Every TX that + * depend on it must go to read view or be aborted. + */ + memtx_tx_handle_conflict_story_readers(stmt->del_story, + stmt->txn); + } else { + /* + * A tuple is inserted. Every TX that depends on absence of + * a tuple (in any index) must go to read view or be aborted. + * We check only primary index here, we will check all other + * indexes below. + */ + struct memtx_story *top_story = + memtx_tx_story_find_top(story, 0); + memtx_tx_handle_conflict_gap_readers(top_story, 0, stmt->txn); + } + + /* Handle conflicts in the secondary indexes. */ for (uint32_t i = 1; i < story->index_count; i++) { - struct memtx_story_link *link = &story->link[i]; - while (link->newer_story != NULL) { - struct memtx_story *test = link->newer_story; - link = &test->link[i]; - struct txn_stmt *test_stmt = test->add_stmt; + /* + * Handle secondary cross-write conflict. This case is too + * complicated and deserves an explanation with example. + * Imagine a space with primary index (pk) by the first field + * and secondary index (sk) by the second field. + * Imagine then three in-progress transactions that executes + * replaces {1, 1, 1}, {2, 1, 2} and {1, 1, 3} correspondingly. + * What must happen when the first transaction commits? + * Both other transactions intersect the current in the sk. + * But the second transaction with {2, 1, 2} must be aborted + * (or sent to read view) because of conflict: it now introduces + * duplicate insertion to the sk. + * On the other hand the third transactions with {1, 1, 3} has + * a right to live since it tends to overwrite {1, 1, 1} in + * both pk and sk. + * To handle those conflicts in general we must scan chains + * towards the top and check insert statements. + */ + struct memtx_story *newer_story = story; + while (newer_story->link[i].newer_story != NULL) { + newer_story = newer_story->link[i].newer_story; + struct txn_stmt *test_stmt = newer_story->add_stmt; + /* Don't conflict own changes. */ if (test_stmt->txn == stmt->txn) continue; + /* + * Ignore case when other TX executes insert after + * precedence delete. + */ if (test_stmt->is_own_change && test_stmt->del_story == NULL) continue; - if (test_stmt->del_story == story) - continue; - memtx_tx_handle_conflict(stmt->txn, test_stmt->txn); /* - * Note that it's a secondary index and there's no - * need to call memtx_tx_story_link_deleted_by since - * all is done by the primary story chain. + * Ignore the case when other TX overwrites in both + * primary and secondary index. */ - } - /* Now link is in top of chain, where gap records are stored. */ - struct gap_item_base *item_base, *tmp; - rlist_foreach_entry_safe(item_base, &link->read_gaps, - in_read_gaps, tmp) { - if (item_base->txn == stmt->txn || - item_base->type != GAP_INPLACE) - continue; - memtx_tx_handle_conflict(stmt->txn, item_base->txn); - } - } - - for (uint32_t i = 0; i < index_count; i++) { - old_story = story->link[i].older_story; - if (old_story == NULL) - continue; - - struct tx_read_tracker *tracker; - rlist_foreach_entry(tracker, &old_story->reader_list, - in_reader_list) { - if (tracker->reader == stmt->txn) + if (test_stmt->del_story == story) continue; - memtx_tx_handle_conflict(stmt->txn, tracker->reader); + memtx_tx_handle_conflict(stmt->txn, test_stmt->txn); } + /* + * We have already checked gap readers before for the case + * of insertion to the primary index. + * In any (replace or insert) case we must handle gap readers + * in the secondary indexes as well since in all kinds of + * statements can insert new value to secondary index. + * Note that newer_story is in top of chain due to previous + * manipulations. + */ + memtx_tx_handle_conflict_gap_readers(newer_story, i, stmt->txn); } - for (uint32_t i = 0; i < index_count; i++) { - for (struct memtx_story *read_story = story; - read_story != NULL; - read_story = read_story->link[i].newer_story) { - struct tx_read_tracker *tracker; - rlist_foreach_entry(tracker, &read_story->reader_list, - in_reader_list) { - if (tracker->reader == stmt->txn) - continue; - memtx_tx_handle_conflict(stmt->txn, - tracker->reader); - } - } - } + /* Finally set PSNs in stories to mark them add/delete as prepared. */ + stmt->add_story->add_psn = stmt->txn->psn; + if (stmt->del_story != NULL) + stmt->del_story->del_psn = stmt->txn->psn; } /** @@ -2466,55 +2503,59 @@ memtx_tx_history_prepare_delete_stmt(struct txn_stmt *stmt) assert(stmt->add_story == NULL); assert(stmt->del_story != NULL); - struct memtx_story *story = stmt->del_story; /* - * There can be some transactions that want to delete old_story. - * All other transactions must be aborted. + * There can be other transactions that want to delete old_story. + * Since the story ends, all of them must be unlinked from the story. */ - struct txn_stmt **itr = &story->del_stmt; - while (*itr != NULL) { - struct txn_stmt *test_stmt = *itr; - assert(test_stmt->del_story == story); - if (test_stmt->txn == stmt->txn) { - assert(test_stmt == stmt || - test_stmt->add_story == NULL); - /* - * Statement from the same transaction. Go to the next - * statement. - */ - itr = &test_stmt->next_in_del_list; + struct txn_stmt **from = &stmt->del_story->del_stmt; + while (*from != NULL) { + struct txn_stmt *test_stmt = *from; + assert(test_stmt->del_story == stmt->del_story); + if (test_stmt == stmt) { + /* Leave this statement, go to the next. */ + from = &test_stmt->next_in_del_list; continue; } + assert(test_stmt->txn != stmt->txn); + assert(test_stmt->del_story == stmt->del_story); assert(test_stmt->txn->psn == 0); - /* Unlink from old list in any case. */ - *itr = test_stmt->next_in_del_list; + + /* Unlink from old story list. */ + *from = test_stmt->next_in_del_list; test_stmt->next_in_del_list = NULL; test_stmt->del_story = NULL; } - struct tx_read_tracker *tracker; - rlist_foreach_entry(tracker, &story->reader_list, in_reader_list) { - if (tracker->reader == stmt->txn) - continue; - memtx_tx_handle_conflict(stmt->txn, tracker->reader); - } + /* + * The story stmt->del_story ends by now. Every TX that + * depend on it must go to read view or be aborted. + */ + memtx_tx_handle_conflict_story_readers(stmt->del_story, stmt->txn); + + /* Finally set PSN in story to mark its deletion as prepared. */ + stmt->del_story->del_psn = stmt->txn->psn; } void memtx_tx_history_prepare_stmt(struct txn_stmt *stmt) { assert(stmt->txn->psn != 0); + assert(stmt->space != NULL); + if (stmt->space->def->opts.is_ephemeral) + assert(stmt->add_story == NULL && stmt->del_story == NULL); - if (stmt->add_story != NULL) { + /* + * Note that both add_story and del_story can be NULL in cases: + * * The space is is_ephemeral. + * * It's an initial recovery. + * * It's a deletion from space by key that was not found in the space. + * In all these cases nothing must be done in MVCC engine. + */ + if (stmt->add_story != NULL) memtx_tx_history_prepare_insert_stmt(stmt); - } else if (stmt->del_story != NULL) { + else if (stmt->del_story != NULL) memtx_tx_history_prepare_delete_stmt(stmt); - } - if (stmt->add_story != NULL) - stmt->add_story->add_psn = stmt->txn->psn; - if (stmt->del_story != NULL) - stmt->del_story->del_psn = stmt->txn->psn; memtx_tx_story_gc(); }