Skip to content
Permalink
Browse files
MDEV-12396 IMPORT TABLESPACE: Simplify validation
fil_iterate(): Validate the pages directly.

import_page_status_t, PageConverter::validate(): Remove.

AbstractCallback::filename(): New accessor.

AbstractCallback::is_interrupted(): Replaces periodic_check().

PageConverter::trigger_corruption(): Remove.
  • Loading branch information
dr-m committed Mar 20, 2018
1 parent 6247c64 commit eaa7bfb
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 318 deletions.
@@ -421,6 +421,8 @@ class AbstractCallback
return(m_page_size);
}

const char* filename() const { return m_filepath; }

/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED. For
@@ -437,6 +439,8 @@ class AbstractCallback
@return the space id of the tablespace */
virtual ulint get_space_id() const UNIV_NOTHROW = 0;

bool is_interrupted() const { return trx_is_interrupted(m_trx); }

protected:
/**
Get the data page depending on the table type, compressed or not.
@@ -451,18 +455,6 @@ class AbstractCallback
return(buf_block_get_frame(block));
}

/** Check for session interrupt. If required we could
even flush to disk here every N pages.
@retval DB_SUCCESS or error code */
dberr_t periodic_check() UNIV_NOTHROW
{
if (trx_is_interrupted(m_trx)) {
return(DB_INTERRUPTED);
}

return(DB_SUCCESS);
}

/**
Get the physical offset of the extent descriptor within the page.
@param page_no - page number of the extent descriptor
@@ -732,11 +724,7 @@ FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
dberr_t err;

if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_interrupted()) return DB_INTERRUPTED;

const page_t* page = get_frame(block);

@@ -749,9 +737,9 @@ FetchIndexRootPages::operator() (
block->page.offset,
(ulint) (offset / m_page_size));

err = DB_CORRUPTION;
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
err = set_current_xdes(block->page.offset, page);
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
&& is_root_page(page)) {
@@ -776,7 +764,7 @@ FetchIndexRootPages::operator() (
}
}

return(err);
return DB_SUCCESS;
}

/**
@@ -900,14 +888,6 @@ class PageConverter : public AbstractCallback {
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private:

/** Status returned by PageConverter::validate() */
enum import_page_status_t {
IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
};

/**
Update the page, set the space id, max trx id and index id.
@param block - block read from file
@@ -917,17 +897,6 @@ class PageConverter : public AbstractCallback {
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;

#if defined UNIV_DEBUG
/**
@return true error condition is enabled. */
bool trigger_corruption() UNIV_NOTHROW
{
return(false);
}
#else
#define trigger_corruption() (false)
#endif /* UNIV_DEBUG */

/**
Update the space, index id, trx id.
@param block - block to convert
@@ -940,15 +909,6 @@ class PageConverter : public AbstractCallback {
@retval DB_SUCCESS or error code */
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;

/**
Validate the page, check for corruption.
@param offset - physical offset within file.
@param page - page read from file.
@return 0 on success, 1 if all zero, 2 if corrupted */
import_page_status_t validate(
os_offset_t offset,
buf_block_t* page) UNIV_NOTHROW;

/**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
@@ -2075,97 +2035,24 @@ PageConverter::update_page(
return(DB_CORRUPTION);
}

/**
Validate the page
@param offset - physical offset within file.
@param page - page read from file.
@return status */
PageConverter::import_page_status_t
PageConverter::validate(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
buf_frame_t* page = get_frame(block);

/* Check that the page number corresponds to the offset in
the file. Flag as corrupt if it doesn't. Disable the check
for LSN in buf_page_is_corrupted() */

if (buf_page_is_corrupted(false, page, get_zip_size(), NULL)
|| (page_get_page_no(page) != offset / m_page_size
&& page_get_page_no(page) != 0)) {

return(IMPORT_PAGE_STATUS_CORRUPTED);

} else if (offset > 0 && page_get_page_no(page) == 0) {
ulint checksum;

checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
if (checksum != 0) {
/* Checksum check passed in buf_page_is_corrupted(). */
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu checksum " ULINTPF
" should be zero.",
m_filepath, (ulong) (offset / m_page_size),
checksum);
}

const byte* b = page + FIL_PAGE_OFFSET;
const byte* e = page + m_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM;

/* If the page number is zero and offset > 0 then
the entire page MUST consist of zeroes. If not then
we flag it as corrupt. */

while (b != e) {

if (*b++ && !trigger_corruption()) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
}
}

/* The page is all zero: do nothing. */
return(IMPORT_PAGE_STATUS_ALL_ZERO);
}

return(IMPORT_PAGE_STATUS_OK);
}

/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{
ulint page_type;
dberr_t err = DB_SUCCESS;

if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}

if (is_compressed_table()) {
m_page_zip_ptr = &block->page.zip;
} else {
ut_ad(m_page_zip_ptr == 0);
}

switch(validate(offset, block)) {
case IMPORT_PAGE_STATUS_OK:

/* We have to decompress the compressed pages before
we can work on them */

if ((err = update_page(block, page_type)) != DB_SUCCESS) {
break;
}

dberr_t err = update_page(block, page_type);
if (err == DB_SUCCESS) {
/* Note: For compressed pages this function will write to the
zip descriptor and for uncompressed pages it will write to
page (ie. the block->frame). Therefore the caller should write
@@ -2187,23 +2074,9 @@ PageConverter::operator() (
get_frame(block), get_zip_size(),
m_current_lsn);
}

break;

case IMPORT_PAGE_STATUS_ALL_ZERO:
/* The page is all zero: leave it as is. */
break;

case IMPORT_PAGE_STATUS_CORRUPTED:

ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset " UINT64PF " looks corrupted.",
m_filepath, (ulong) (offset / m_page_size), offset);

err = DB_CORRUPTION;
}

/* If we already had and old page with matching number
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
@@ -3519,8 +3392,6 @@ fil_iterate(
AbstractCallback& callback)
{
os_offset_t offset;
ulint page_no = 0;
ulint space_id = callback.get_space_id();
ulint n_bytes = iter.n_io_buffers * iter.page_size;

ut_ad(!srv_read_only_mode);
@@ -3531,6 +3402,9 @@ fil_iterate(
const bool row_compressed = callback.get_zip_size() > 0;

for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) {
return DB_INTERRUPTED;
}

byte* io_buffer = iter.io_buffer;

@@ -3572,30 +3446,45 @@ fil_iterate(
os_offset_t page_off = offset;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
bool decrypted = false;
const ulint size = iter.page_size;
block->page.offset = page_off / size;

for (ulint i = 0; i < n_pages_read; ++i) {
ulint size = iter.page_size;
for (ulint i = 0; i < n_pages_read;
++i, page_off += size, block->frame += size,
block->page.offset++) {
dberr_t err = DB_SUCCESS;
byte* src = readptr + (i * size);
byte* dst = io_buffer + (i * size);
bool frame_changed = false;

ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);

const bool page_compressed
= page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
do {
if (*b++) {
goto page_corrupted;
}
} while (b != e);

/* Proceed to the next page,
because this one is all zero. */
continue;
}

if (page_no != page_off / size) {
goto page_corrupted;
}

/* If tablespace is encrypted, we need to decrypt
the page. Note that tablespaces are not in
fil_system during import. */
if (encrypted) {
decrypted = fil_space_decrypt(
iter.crypt_data,
dst, //dst
iter.page_size,
src, // src
&err);
iter.crypt_data, dst,
iter.page_size, src, &err);

if (err != DB_SUCCESS) {
return err;
@@ -3619,10 +3508,20 @@ fil_iterate(
fil_decompress_page(NULL, dst, ulong(size),
NULL);
updated = true;
} else if (buf_page_is_corrupted(
false,
encrypted && !frame_changed
? dst : src,
callback.get_zip_size(), NULL)) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
}

buf_block_set_file_page(block, space_id, page_no++);

if ((err = callback(page_off, block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
@@ -3714,9 +3613,6 @@ fil_iterate(

updated = true;
}

page_off += iter.page_size;
block->frame += iter.page_size;
}

/* A page was updated in the set, write back to disk. */
@@ -3820,6 +3716,7 @@ fil_tablespace_iterate(

memset(&block, 0, sizeof block);
block.frame = page;
block.page.space = callback.get_space_id();
block.page.io_fix = BUF_IO_NONE;
block.page.buf_fix_count = 1;
block.page.state = BUF_BLOCK_FILE_PAGE;

0 comments on commit eaa7bfb

Please sign in to comment.