Skip to content

Commit

Permalink
MDEV-28457 Crash in page_dir_find_owner_slot()
Browse files Browse the repository at this point in the history
A prominent remaining source of crashes on corrupted index pages
is page directory corruption.

A frequent caller of page_dir_find_owner_slot() is page_rec_get_prev().
Some of those calls can be replaced with simpler logic that is less
prone to fail.

page_dir_find_owner_slot(),
page_rec_get_prev(), page_rec_get_prev_const(),
btr_pcur_move_to_prev(), btr_pcur_move_to_prev_on_page(),
btr_cur_upd_rec_sys(),
page_delete_rec_list_end(),
rtr_page_copy_rec_list_end_no_locks(),
rtr_page_copy_rec_list_start_no_locks(): Return an error code on failure.

fil_space_t::io(), buf_page_get_low(): Use DB_CORRUPTION for
out-of-bounds page reads.

PageBulk::getSplitRec(), PageBulk::copyOut(): Simplify the code.

btr_validate_level(): Prevent some more CHECK TABLE crashes on
corrupted pages.

btr_block_get(), btr_pcur_move_to_next_page(): Implement some checks that
were previously only part of IndexPurge::next().

IndexPurge::next(): Use btr_pcur_move_to_next_page().
  • Loading branch information
dr-m committed Jun 8, 2022
1 parent 892c426 commit 77b3959
Show file tree
Hide file tree
Showing 23 changed files with 391 additions and 338 deletions.
108 changes: 68 additions & 40 deletions storage/innobase/btr/btr0btr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ buf_block_t *btr_block_get(const dict_index_t &index,
{
if (!!page_is_comp(block->page.frame) != index.table->not_redundant() ||
btr_page_get_index_id(block->page.frame) != index.id ||
!fil_page_index_page_check(block->page.frame))
!fil_page_index_page_check(block->page.frame) ||
index.is_spatial() !=
(fil_page_get_type(block->page.frame) == FIL_PAGE_RTREE))
{
*err= DB_PAGE_CORRUPTED;
block= nullptr;
Expand Down Expand Up @@ -2716,11 +2718,9 @@ page_move_rec_list_end(

ut_ad(new_data_size >= old_data_size);

page_delete_rec_list_end(split_rec, block, index,
new_n_recs - old_n_recs,
new_data_size - old_data_size, mtr);

return DB_SUCCESS;
return page_delete_rec_list_end(split_rec, block, index,
new_n_recs - old_n_recs,
new_data_size - old_data_size, mtr);
}

/*************************************************************//**
Expand Down Expand Up @@ -2980,10 +2980,15 @@ btr_page_split_and_insert(

page_zip_copy_recs(new_block,
page_zip, page, cursor->index, mtr);
page_delete_rec_list_end(move_limit - page + new_page,
new_block, cursor->index,
ULINT_UNDEFINED,
ULINT_UNDEFINED, mtr);
*err = page_delete_rec_list_end(move_limit
- page + new_page,
new_block,
cursor->index,
ULINT_UNDEFINED,
ULINT_UNDEFINED, mtr);
if (*err != DB_SUCCESS) {
return nullptr;
}

/* Update the lock table and possible hash index. */
if (cursor->index->has_locking()) {
Expand Down Expand Up @@ -3045,10 +3050,13 @@ btr_page_split_and_insert(

/* Delete the records from the source page. */

page_delete_rec_list_end(move_limit, block,
cursor->index,
ULINT_UNDEFINED,
ULINT_UNDEFINED, mtr);
*err = page_delete_rec_list_end(move_limit, block,
cursor->index,
ULINT_UNDEFINED,
ULINT_UNDEFINED, mtr);
if (*err != DB_SUCCESS) {
return nullptr;
}
}

left_block = block;
Expand Down Expand Up @@ -4699,13 +4707,16 @@ btr_validate_level(
default:
err = e;
}
ut_a(index->table->space_id == block->page.id().space());
ut_a(block->page.id().space() == page_get_space_id(page));
ut_ad(index->table->space_id == block->page.id().space());
ut_ad(block->page.id().space() == page_get_space_id(page));
#ifdef UNIV_ZIP_DEBUG
page_zip = buf_block_get_page_zip(block);
ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
ut_a(!page_is_leaf(page));
if (page_is_leaf(page)) {
err = DB_CORRUPTION;
goto invalid_page;
}

page_cur_set_before_first(block, &cursor);
page_cur_move_to_next(&cursor);
Expand Down Expand Up @@ -4833,7 +4844,11 @@ btr_validate_level(
err = DB_CORRUPTION;
}

rec = page_rec_get_prev(page_get_supremum_rec(page));
if (!(rec = page_rec_get_prev(page_get_supremum_rec(page)))) {
btr_validate_report1(index, level, block);
fputs("InnoDB: broken record links\n", stderr);
goto invalid_page;
}
right_rec = page_rec_get_next(page_get_infimum_rec(
right_page));
offsets = rec_get_offsets(rec, index, offsets,
Expand All @@ -4857,10 +4872,12 @@ btr_validate_level(
fputs("InnoDB: records in wrong order"
" on adjacent pages\n", stderr);

fputs("InnoDB: record ", stderr);
rec = page_rec_get_prev(page_get_supremum_rec(page));
rec_print(stderr, rec, index);
putc('\n', stderr);
if (rec) {
fputs("InnoDB: record ", stderr);
rec_print(stderr, rec, index);
putc('\n', stderr);
}
fputs("InnoDB: record ", stderr);
rec = page_rec_get_next(
page_get_infimum_rec(right_page));
Expand Down Expand Up @@ -4905,15 +4922,17 @@ btr_validate_level(
rightmost_child = page_rec_is_supremum(
page_rec_get_next(node_ptr));

btr_cur_position(
index,
page_rec_get_prev(page_get_supremum_rec(page)),
block, &node_cur);
rec = page_rec_get_prev(page_get_supremum_rec(page));
if (rec) {
btr_cur_position(index, rec, block, &node_cur);

offsets = btr_page_get_father_node_ptr_for_validate(
offsets = btr_page_get_father_node_ptr_for_validate(
offsets, heap, &node_cur, &mtr);
} else {
offsets = nullptr;
}

if (node_ptr != btr_cur_get_rec(&node_cur)
if (!offsets || node_ptr != btr_cur_get_rec(&node_cur)
|| btr_node_ptr_get_child_page_no(node_ptr, offsets)
!= block->page.id().page_no()) {

Expand All @@ -4925,14 +4944,17 @@ btr_validate_level(
fputs("InnoDB: node ptr ", stderr);
rec_print(stderr, node_ptr, index);

rec = btr_cur_get_rec(&node_cur);
fprintf(stderr, "\n"
"InnoDB: node ptr child page n:o %u\n",
btr_node_ptr_get_child_page_no(rec, offsets));
if (offsets) {
rec = btr_cur_get_rec(&node_cur);
fprintf(stderr, "\n"
"InnoDB: node ptr child page n:o %u\n",
btr_node_ptr_get_child_page_no(
rec, offsets));
fputs("InnoDB: record on page ", stderr);
rec_print_new(stderr, rec, offsets);
putc('\n', stderr);
}

fputs("InnoDB: record on page ", stderr);
rec_print_new(stderr, rec, offsets);
putc('\n', stderr);
err = DB_CORRUPTION;
goto node_ptr_fails;
}
Expand Down Expand Up @@ -4963,15 +4985,21 @@ btr_validate_level(
}

if (left_page_no == FIL_NULL) {
ut_a(node_ptr == page_rec_get_next(
page_get_infimum_rec(father_page)));
ut_a(!page_has_prev(father_page));
if (page_has_prev(father_page)
|| node_ptr != page_rec_get_next(
page_get_infimum_rec(father_page))) {
err = DB_CORRUPTION;
goto node_ptr_fails;
}
}

if (right_page_no == FIL_NULL) {
ut_a(node_ptr == page_rec_get_prev(
page_get_supremum_rec(father_page)));
ut_a(!page_has_next(father_page));
if (page_has_next(father_page)
|| node_ptr != page_rec_get_prev(
page_get_supremum_rec(father_page))) {
err = DB_CORRUPTION;
goto node_ptr_fails;
}
} else {
const rec_t* right_node_ptr;

Expand Down
35 changes: 20 additions & 15 deletions storage/innobase/btr/btr0bulk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ PageBulk::getSplitRec()
< total_used_size / 2);

/* Keep at least one record on left page */
if (page_rec_is_infimum(page_rec_get_prev(rec))) {
if (page_rec_is_second(rec, m_page)) {
rec = page_rec_get_next(rec);
ut_ad(page_rec_is_user_rec(rec));
}
Expand Down Expand Up @@ -679,35 +679,40 @@ void
PageBulk::copyOut(
rec_t* split_rec)
{
rec_t* rec;
rec_t* last_rec;
ulint n;

/* Suppose before copyOut, we have 5 records on the page:
infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.
after copyOut, we have 2 records on the page:
infimum->r1->r2->supremum. slot ajustment is not done. */

rec = page_rec_get_next(page_get_infimum_rec(m_page));
last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
n = 0;
rec_t *rec = page_get_infimum_rec(m_page);
ulint n;

while (rec != split_rec) {
rec = page_rec_get_next(rec);
n++;
for (n = 0;; n++) {
rec_t *next = page_rec_get_next(rec);
if (next == split_rec) {
break;
}
rec = next;
}

ut_ad(n > 0);

const rec_t *last_rec = split_rec;
for (;;) {
const rec_t *next = page_rec_get_next_const(last_rec);
if (page_rec_is_supremum(next)) {
break;
}
last_rec = next;
}

/* Set last record's next in page */
rec_offs* offsets = NULL;
rec = page_rec_get_prev(split_rec);
const ulint n_core = page_rec_is_leaf(split_rec)
? m_index->n_core_fields : 0;

offsets = rec_get_offsets(rec, m_index, offsets, n_core,
ULINT_UNDEFINED, &m_heap);
rec_offs* offsets = rec_get_offsets(rec, m_index, nullptr, n_core,
ULINT_UNDEFINED, &m_heap);
mach_write_to_2(rec - REC_NEXT, m_is_comp
? static_cast<uint16_t>
(PAGE_NEW_SUPREMUM - page_offset(rec))
Expand Down
Loading

0 comments on commit 77b3959

Please sign in to comment.