diff --git a/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt b/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt index 6426bac41a07c..35527d0c5175e 100644 --- a/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt +++ b/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt @@ -1,4 +1,5 @@ --loose-innodb-buffer-pool-stats --loose-innodb-buffer-page --loose-innodb-buffer-page-lru +--innodb-log-buffer-size=2m --innodb-defragment=1 \ No newline at end of file diff --git a/storage/innobase/btr/btr0btr.cc b/storage/innobase/btr/btr0btr.cc index 42b621516392c..48aeebb37b45f 100644 --- a/storage/innobase/btr/btr0btr.cc +++ b/storage/innobase/btr/btr0btr.cc @@ -4013,9 +4013,8 @@ btr_discard_only_page_on_level( DBUG_ASSERT(index->table->instant); DBUG_ASSERT(rec_is_alter_metadata(rec, *index)); btr_set_instant(block, *index, mtr); - rec = page_cur_insert_rec_low( - &cur, - index, rec, offsets, mtr); + rec = page_cur_insert_rec_low(&cur, index, rec, + offsets, mtr); ut_ad(rec); mem_heap_free(heap); } else if (index->is_instant()) { diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc index 5dd01eabd1ea1..03d90ade4f4a0 100644 --- a/storage/innobase/btr/btr0bulk.cc +++ b/storage/innobase/btr/btr0bulk.cc @@ -202,16 +202,22 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets) static_cast(next_rec - insert_rec)); mach_write_to_2(m_cur_rec - REC_NEXT, static_cast(insert_rec - m_cur_rec)); - rec_set_n_owned_new(insert_rec, NULL, 0); - rec_set_heap_no_new(insert_rec, - PAGE_HEAP_NO_USER_LOW + m_rec_no); + rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, + PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_NEW_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); } else { mach_write_to_2(insert_rec - REC_NEXT, mach_read_from_2(m_cur_rec - REC_NEXT)); mach_write_to_2(m_cur_rec - REC_NEXT, page_offset(insert_rec)); - rec_set_n_owned_old(insert_rec, 0); - rec_set_heap_no_old(insert_rec, - PAGE_HEAP_NO_USER_LOW + m_rec_no); + rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, + PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_OLD_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); } /* 4. Set member variables. */ @@ -282,8 +288,9 @@ inline void PageBulk::finishPage() { slot-= PAGE_DIR_SLOT_SIZE; mach_write_to_2(slot, offset); - rec_set_n_owned_new(m_page + offset, nullptr, count); - count = 0; + rec_set_bit_field_1(m_page + offset, count, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + count= 0; } uint16_t next= (mach_read_from_2(m_page + offset - REC_NEXT) + offset) & @@ -308,8 +315,8 @@ inline void PageBulk::finishPage() { slot-= PAGE_DIR_SLOT_SIZE; mach_write_to_2(slot, page_offset(insert_rec)); - rec_set_n_owned_old(insert_rec, count); - + rec_set_bit_field_1(insert_rec, count, REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); count= 0; } @@ -328,13 +335,26 @@ inline void PageBulk::finishPage() count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; - page_dir_slot_set_n_owned(slot, nullptr, 0); + rec_t *rec= const_cast(page_dir_slot_get_rec(slot)); + rec_set_bit_field_1(rec, 0, m_is_comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); slot+= PAGE_DIR_SLOT_SIZE; } slot-= PAGE_DIR_SLOT_SIZE; - page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page)); - page_dir_slot_set_n_owned(slot, nullptr, count + 1); + + if (m_is_comp) + { + mach_write_to_2(slot, PAGE_NEW_SUPREMUM); + rec_set_bit_field_1(m_page + PAGE_NEW_SUPREMUM, count + 1, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + } + else + { + mach_write_to_2(slot, PAGE_OLD_SUPREMUM); + rec_set_bit_field_1(m_page + PAGE_OLD_SUPREMUM, count + 1, REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + } ut_ad(!dict_index_is_spatial(m_index)); ut_ad(!page_get_instant(m_page)); diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 7107862248690..e879768432671 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -5535,13 +5535,27 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict) ut_ad(buf_pool->n_pend_reads > 0); buf_pool->n_pend_reads--; buf_pool->stat.n_pages_read++; + ut_ad(!uncompressed || !bpage->zip.data + || !recv_recovery_is_on() + || buf_page_can_relocate(bpage)); + mutex_exit(block_mutex); if (uncompressed) { +#if 1 /* MDEV-12353 FIXME: Remove this! */ + if (UNIV_LIKELY_NULL(bpage->zip.data) + && recv_recovery_is_on()) { + rw_lock_x_unlock_gen( + &reinterpret_cast(bpage) + ->lock, BUF_IO_READ); + if (!buf_LRU_free_page(bpage, false)) { + ut_ad(!"could not remove"); + } + goto func_exit; + } +#endif rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock, BUF_IO_READ); } - - mutex_exit(block_mutex); } else { /* Write means a flush operation: call the completion routine in the flush system */ @@ -5575,9 +5589,8 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict) DBUG_PRINT("ib_buf", ("%s page %u:%u", io_type == BUF_IO_READ ? "read" : "wrote", bpage->id.space(), bpage->id.page_no())); - +func_exit: mutex_exit(&buf_pool->mutex); - return DB_SUCCESS; } diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index e28eb66d13ce4..8f59d0c566835 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -1765,7 +1765,10 @@ buf_LRU_block_remove_hashed( case FIL_PAGE_INDEX: case FIL_PAGE_RTREE: #if defined UNIV_ZIP_DEBUG && defined BTR_CUR_HASH_ADAPT - ut_a(page_zip_validate( + /* During recovery, we only update the + compressed page, not the uncompressed one. */ + ut_a(recv_recovery_is_on() + || page_zip_validate( &bpage->zip, page, ((buf_block_t*) bpage)->index)); #endif /* UNIV_ZIP_DEBUG && BTR_CUR_HASH_ADAPT */ diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h index fd1f38538bbc3..3ed924847cf23 100644 --- a/storage/innobase/include/page0cur.h +++ b/storage/innobase/include/page0cur.h @@ -209,22 +209,6 @@ page_cur_insert_rec_zip( offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */ mtr_t* mtr) /*!< in/out: mini-transaction */ MY_ATTRIBUTE((nonnull, warn_unused_result)); -/*************************************************************//** -Copies records from page to a newly created page, from a given record onward, -including that record. Infimum and supremum records are not copied. - -IMPORTANT: The caller will have to update IBUF_BITMAP_FREE -if this is a compressed leaf page in a secondary index. -This has to be done either within the same mini-transaction, -or by invoking ibuf_reset_free_bits() before mtr_commit(). */ -ATTRIBUTE_COLD /* only used when crash-upgrading */ -void -page_copy_rec_list_end_to_created_page( -/*===================================*/ - page_t* new_page, /*!< in/out: index page to copy to */ - rec_t* rec, /*!< in: first record to copy */ - dict_index_t* index, /*!< in: record descriptor */ - mtr_t* mtr); /*!< in: mtr */ /***********************************************************//** Deletes a record at the page cursor. The cursor is moved to the next record after the deleted one. */ @@ -363,6 +347,7 @@ page_parse_copy_rec_list_to_created_page( /***********************************************************//** Parses log record of a record delete on a page. @return pointer to record end or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_cur_parse_delete_rec( /*======================*/ diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h index 7831195f2135d..0ed91cb8b1b64 100644 --- a/storage/innobase/include/page0page.h +++ b/storage/innobase/include/page0page.h @@ -406,6 +406,38 @@ inline trx_id_t page_get_max_trx_id(const page_t *page) return mach_read_from_8(p); } +/** +Set the number of owned records. +@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well +@param[in,out] block index page +@param[in,out] rec ROW_FORMAT=REDUNDANT record +@param[in] n_owned number of records skipped in the sparse page directory +@param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED +@param[in,out] mtr mini-transaction */ +template +inline void page_rec_set_n_owned(buf_block_t *block, rec_t *rec, ulint n_owned, + bool comp, mtr_t *mtr) +{ + ut_ad(block->frame == page_align(rec)); + ut_ad(comp == (page_is_comp(block->frame) != 0)); + + if (page_zip_des_t *page_zip= compressed + ? buf_block_get_page_zip(block) : nullptr) + { + ut_ad(comp); + rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + if (rec_get_status(rec) != REC_STATUS_SUPREMUM) + page_zip_rec_set_owned(block, rec, n_owned, mtr); + } + else + { + rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED; + mtr->write<1,mtr_t::OPT>(*block, rec, (*rec & ~REC_N_OWNED_MASK) | + (n_owned << REC_N_OWNED_SHIFT)); + } +} + /*************************************************************//** Sets the max trx id field value. */ void @@ -620,17 +652,6 @@ uint16_t page_dir_get_n_slots( /*=================*/ const page_t* page); /*!< in: index page */ -/*************************************************************//** -Sets the number of dir slots in directory. */ -UNIV_INLINE -void -page_dir_set_n_slots( -/*=================*/ - page_t* page, /*!< in/out: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint n_slots);/*!< in: number of slots */ - /** Gets the pointer to a directory slot. @param n sparse directory slot number @return pointer to the sparse directory slot */ @@ -664,14 +685,6 @@ inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) return page_dir_slot_get_rec(const_cast(slot)); } /***************************************************************//** -This is used to set the record offset in a directory slot. */ -UNIV_INLINE -void -page_dir_slot_set_rec( -/*==================*/ - page_dir_slot_t*slot, /*!< in: directory slot */ - const rec_t* rec); /*!< in: record on the page */ -/***************************************************************//** Gets the number of records owned by a directory slot. @return number of records */ UNIV_INLINE @@ -679,15 +692,6 @@ ulint page_dir_slot_get_n_owned( /*======================*/ const page_dir_slot_t* slot); /*!< in: page directory slot */ -/***************************************************************//** -This is used to set the owned records field of a directory slot. */ -UNIV_INLINE -void -page_dir_slot_set_n_owned( -/*======================*/ - page_dir_slot_t*slot, /*!< in/out: directory slot */ - page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - ulint n); /*!< in: number of records owned by the slot */ /************************************************************//** Calculates the space reserved for directory slots of a given number of records. The exact value is a fraction number @@ -1138,6 +1142,7 @@ page_move_rec_list_start( /**********************************************************//** Parses a log record of a record list end or start deletion. @return end of log record or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_parse_delete_rec_list( /*=======================*/ diff --git a/storage/innobase/include/page0page.ic b/storage/innobase/include/page0page.ic index eaf44600b6030..563de443f7627 100644 --- a/storage/innobase/include/page0page.ic +++ b/storage/innobase/include/page0page.ic @@ -419,19 +419,6 @@ page_dir_get_n_slots( { return(page_header_get_field(page, PAGE_N_DIR_SLOTS)); } -/*************************************************************//** -Sets the number of dir slots in directory. */ -UNIV_INLINE -void -page_dir_set_n_slots( -/*=================*/ - page_t* page, /*!< in/out: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint n_slots)/*!< in: number of slots */ -{ - page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots); -} /*************************************************************//** Gets the number of records in the heap. @@ -487,20 +474,6 @@ page_rec_check( return(TRUE); } -/***************************************************************//** -This is used to set the record offset in a directory slot. */ -UNIV_INLINE -void -page_dir_slot_set_rec( -/*==================*/ - page_dir_slot_t*slot, /*!< in: directory slot */ - const rec_t* rec) /*!< in: record on the page */ -{ - ut_ad(page_rec_check(rec)); - - mach_write_to_2(slot, page_offset(rec)); -} - /***************************************************************//** Gets the number of records owned by a directory slot. @return number of records */ @@ -518,25 +491,6 @@ page_dir_slot_get_n_owned( } } -/***************************************************************//** -This is used to set the owned records field of a directory slot. */ -UNIV_INLINE -void -page_dir_slot_set_n_owned( -/*======================*/ - page_dir_slot_t*slot, /*!< in/out: directory slot */ - page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - ulint n) /*!< in: number of records owned by the slot */ -{ - rec_t* rec = (rec_t*) page_dir_slot_get_rec(slot); - if (page_rec_is_comp(slot)) { - rec_set_n_owned_new(rec, page_zip, n); - } else { - ut_ad(!page_zip); - rec_set_n_owned_old(rec, n); - } -} - /************************************************************//** Calculates the space reserved for directory slots of a given number of records. The exact value is a fraction number n * PAGE_DIR_SLOT_SIZE / diff --git a/storage/innobase/include/page0types.h b/storage/innobase/include/page0types.h index 37e2eb2d36a34..6c5a681f3b560 100644 --- a/storage/innobase/include/page0types.h +++ b/storage/innobase/include/page0types.h @@ -40,6 +40,8 @@ typedef byte page_t; #ifndef UNIV_INNOCHECKSUM /** Index page cursor */ struct page_cur_t; +/** Buffer pool block */ +struct buf_block_t; /** Compressed index page */ typedef byte page_zip_t; @@ -150,9 +152,10 @@ must already have been written on the uncompressed page. */ void page_zip_rec_set_owned( /*===================*/ - page_zip_des_t* page_zip,/*!< in/out: compressed page */ + buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */ const byte* rec, /*!< in: record on the uncompressed page */ - ulint flag) /*!< in: the owned flag (nonzero=TRUE) */ + ulint flag, /*!< in: the owned flag (nonzero=TRUE) */ + mtr_t* mtr) /*!< in/out: mini-transaction */ MY_ATTRIBUTE((nonnull)); #endif /* !UNIV_INNOCHECKSUM */ #endif diff --git a/storage/innobase/include/page0zip.h b/storage/innobase/include/page0zip.h index 16dac663527c7..4da4e74099457 100644 --- a/storage/innobase/include/page0zip.h +++ b/storage/innobase/include/page0zip.h @@ -360,9 +360,10 @@ must already have been written on the uncompressed page. */ void page_zip_rec_set_owned( /*===================*/ - page_zip_des_t* page_zip,/*!< in/out: compressed page */ + buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */ const byte* rec, /*!< in: record on the uncompressed page */ - ulint flag) /*!< in: the owned flag (nonzero=TRUE) */ + ulint flag, /*!< in: the owned flag (nonzero=TRUE) */ + mtr_t* mtr) /*!< in/out: mini-transaction */ MY_ATTRIBUTE((nonnull)); /**********************************************************************//** @@ -385,9 +386,10 @@ page_zip_dir_delete( byte* rec, /*!< in: deleted record */ const dict_index_t* index, /*!< in: index of rec */ const offset_t* offsets, /*!< in: rec_get_offsets(rec) */ - const byte* free) /*!< in: previous start of + const byte* free, /*!< in: previous start of the free list */ - MY_ATTRIBUTE((nonnull(1,2,3,4))); + mtr_t* mtr) /*!< in/out: mini-transaction */ + MY_ATTRIBUTE((nonnull(1,2,3,4,6))); /**********************************************************************//** Add a slot to the dense page directory. */ diff --git a/storage/innobase/include/rem0rec.h b/storage/innobase/include/rem0rec.h index 75a830bef242d..d8483fe2803c2 100644 --- a/storage/innobase/include/rem0rec.h +++ b/storage/innobase/include/rem0rec.h @@ -241,15 +241,6 @@ rec_get_n_owned_old( const rec_t* rec) /*!< in: old-style physical record */ MY_ATTRIBUTE((warn_unused_result)); /******************************************************//** -The following function is used to set the number of owned records. */ -UNIV_INLINE -void -rec_set_n_owned_old( -/*================*/ - rec_t* rec, /*!< in: old-style physical record */ - ulint n_owned) /*!< in: the number of owned */ - MY_ATTRIBUTE((nonnull)); -/******************************************************//** The following function is used to get the number of records owned by the previous directory record. @return number of owned records */ @@ -260,16 +251,6 @@ rec_get_n_owned_new( const rec_t* rec) /*!< in: new-style physical record */ MY_ATTRIBUTE((warn_unused_result)); /******************************************************//** -The following function is used to set the number of owned records. */ -UNIV_INLINE -void -rec_set_n_owned_new( -/*================*/ - rec_t* rec, /*!< in/out: new-style physical record */ - page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - ulint n_owned)/*!< in: the number of owned */ - MY_ATTRIBUTE((nonnull(1))); -/******************************************************//** The following function is used to retrieve the info bits of a record. @return info bits */ @@ -418,16 +399,6 @@ rec_get_heap_no_old( const rec_t* rec) /*!< in: physical record */ MY_ATTRIBUTE((warn_unused_result)); /******************************************************//** -The following function is used to set the heap number -field in an old-style record. */ -UNIV_INLINE -void -rec_set_heap_no_old( -/*================*/ - rec_t* rec, /*!< in: physical record */ - ulint heap_no)/*!< in: the heap number */ - MY_ATTRIBUTE((nonnull)); -/******************************************************//** The following function is used to get the order number of a new-style record in the heap of the index page. @return heap order number */ @@ -438,16 +409,6 @@ rec_get_heap_no_new( const rec_t* rec) /*!< in: physical record */ MY_ATTRIBUTE((warn_unused_result)); /******************************************************//** -The following function is used to set the heap number -field in a new-style record. */ -UNIV_INLINE -void -rec_set_heap_no_new( -/*================*/ - rec_t* rec, /*!< in/out: physical record */ - ulint heap_no)/*!< in: the heap number */ - MY_ATTRIBUTE((nonnull)); -/******************************************************//** The following function is used to test whether the data offsets in the record are stored in one-byte or two-byte format. @return TRUE if 1-byte form */ diff --git a/storage/innobase/include/rem0rec.ic b/storage/innobase/include/rem0rec.ic index a7d75236c3235..4405ad0abb7c7 100644 --- a/storage/innobase/include/rem0rec.ic +++ b/storage/innobase/include/rem0rec.ic @@ -503,19 +503,6 @@ rec_get_n_owned_old( REC_N_OWNED_MASK, REC_N_OWNED_SHIFT)); } -/******************************************************//** -The following function is used to set the number of owned records. */ -UNIV_INLINE -void -rec_set_n_owned_old( -/*================*/ - rec_t* rec, /*!< in: old-style physical record */ - ulint n_owned) /*!< in: the number of owned */ -{ - rec_set_bit_field_1(rec, n_owned, REC_OLD_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); -} - /******************************************************//** The following function is used to get the number of records owned by the previous directory record. @@ -530,23 +517,6 @@ rec_get_n_owned_new( REC_N_OWNED_MASK, REC_N_OWNED_SHIFT)); } -/******************************************************//** -The following function is used to set the number of owned records. */ -UNIV_INLINE -void -rec_set_n_owned_new( -/*================*/ - rec_t* rec, /*!< in/out: new-style physical record */ - page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - ulint n_owned)/*!< in: the number of owned */ -{ - rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - if (page_zip && rec_get_status(rec) != REC_STATUS_SUPREMUM) { - page_zip_rec_set_owned(page_zip, rec, n_owned); - } -} - /******************************************************//** The following function is used to retrieve the info bits of a record. @return info bits */ @@ -674,20 +644,6 @@ rec_get_heap_no_old( REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT)); } -/******************************************************//** -The following function is used to set the heap number -field in an old-style record. */ -UNIV_INLINE -void -rec_set_heap_no_old( -/*================*/ - rec_t* rec, /*!< in: physical record */ - ulint heap_no)/*!< in: the heap number */ -{ - rec_set_bit_field_2(rec, heap_no, REC_OLD_HEAP_NO, - REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); -} - /******************************************************//** The following function is used to get the order number of a new-style record in the heap of the index page. @@ -702,20 +658,6 @@ rec_get_heap_no_new( REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT)); } -/******************************************************//** -The following function is used to set the heap number -field in a new-style record. */ -UNIV_INLINE -void -rec_set_heap_no_new( -/*================*/ - rec_t* rec, /*!< in/out: physical record */ - ulint heap_no)/*!< in: the heap number */ -{ - rec_set_bit_field_2(rec, heap_no, REC_NEW_HEAP_NO, - REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); -} - /******************************************************//** The following function is used to test whether the data offsets in the record are stored in one-byte or two-byte format. diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index 9fa1b669463a6..d671948fdd108 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -313,7 +313,6 @@ class mlog_init_t void mark_ibuf_exist(mtr_t& mtr) { ut_ad(mutex_own(&recv_sys.mutex)); - ut_ad(!recv_no_ibuf_operations); mtr.start(); for (const map::value_type& i : inits) { @@ -324,6 +323,21 @@ class mlog_init_t i.first, 0, RW_X_LATCH, NULL, BUF_GET_IF_IN_POOL, __FILE__, __LINE__, &mtr)) { + if (UNIV_LIKELY_NULL(block->page.zip.data) + && fil_page_type_is_index( + fil_page_get_type( + block->page.zip.data)) + && !page_zip_decompress(&block->page.zip, + block->frame, + true)) { + ib::error() << "corrupted page " + << block->page.id; + } + if (recv_no_ibuf_operations) { + mtr.commit(); + mtr.start(); + continue; + } mutex_exit(&recv_sys.mutex); block->page.ibuf_exist = ibuf_page_exists( block->page); @@ -1570,6 +1584,13 @@ recv_parse_or_apply_log_rec_body( } break; case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT: + if (!page_zip) { + } else if (!page_zip_decompress(page_zip, page, true)) { + ib::error() << "corrupted page " << block->page.id; + } else { + ut_d(page_type = fil_page_get_type(page)); + } + ut_ad(!page || fil_page_type_is_index(page_type)); if (NULL != (ptr = mlog_parse_index( @@ -1603,6 +1624,13 @@ recv_parse_or_apply_log_rec_body( page, page_zip, mtr); break; case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE: + if (!page_zip) { + } else if (!page_zip_decompress(page_zip, page, true)) { + ib::error() << "corrupted page " << block->page.id; + } else { + ut_d(page_type = fil_page_get_type(page)); + } + ut_ad(!page || fil_page_type_is_index(page_type)); if (NULL != (ptr = mlog_parse_index( @@ -2113,22 +2141,10 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, memcpy_aligned<8>(FIL_PAGE_LSN + page_zip->data, FIL_PAGE_LSN + page, 8); - if (fil_page_index_page_check(page) - && !page_zip_decompress(page_zip, page, - true)) { - ib::error() << "corrupted page " - << block->page.id; - } } } } -#ifdef UNIV_ZIP_DEBUG - ut_ad(!fil_page_index_page_check(page) - || !page_zip - || page_zip_validate_low(page_zip, page, NULL, FALSE)); -#endif /* UNIV_ZIP_DEBUG */ - if (start_lsn) { buf_block_modify_clock_inc(block); log_flush_order_mutex_enter(); @@ -2479,7 +2495,7 @@ void recv_apply_hashed_log_recs(bool last_batch) log_mutex_enter(); mutex_enter(&(recv_sys.mutex)); mlog_init.reset(); - } else if (!recv_no_ibuf_operations) { + } else { /* We skipped this in buf_page_create(). */ mlog_init.mark_ibuf_exist(mtr); } diff --git a/storage/innobase/mtr/mtr0log.cc b/storage/innobase/mtr/mtr0log.cc index 9c8de6dac14d4..b040442b30652 100644 --- a/storage/innobase/mtr/mtr0log.cc +++ b/storage/innobase/mtr/mtr0log.cc @@ -276,8 +276,6 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len) ut_ad(len); ut_ad(ofs <= ulint(srv_page_size)); ut_ad(ofs + len <= ulint(srv_page_size)); - ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data || - mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2); set_modified(); if (get_log_mode() != MTR_LOG_ALL) @@ -287,6 +285,9 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len) return; } + ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data || + mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2); + byte *l= get_log()->open(11 + 2 + 2); l= mlog_write_initial_log_record_low(MLOG_WRITE_STRING, b.page.id.space(), b.page.id.page_no(), l, this); diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 727ab63da3976..ab3f6e11bb8e0 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -966,6 +966,24 @@ page_cur_insert_rec_write_log( } } +static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact) +{ + rec_set_bit_field_2(rec, heap_no, + compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); +} + +static void rec_set_heap_no(const buf_block_t& block, rec_t *rec, + ulint heap_no, bool compact, mtr_t *mtr) +{ + rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO; + + // MDEV-12353 FIXME: try single-byte write if possible + mtr->write<2,mtr_t::OPT>(block, rec, + (mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) | + (heap_no << REC_HEAP_NO_SHIFT)); +} + /***********************************************************//** Parses a log record of a record insert on a page. @return end of log record or NULL */ @@ -1115,15 +1133,13 @@ page_cur_parse_insert_rec( memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index); memcpy(buf + mismatch_index, ptr, end_seg_len); + rec_set_heap_no(buf + origin_offset, PAGE_HEAP_NO_USER_LOW, + page_is_comp(page)); if (page_is_comp(page)) { - rec_set_heap_no_new(buf + origin_offset, - PAGE_HEAP_NO_USER_LOW); rec_set_info_and_status_bits(buf + origin_offset, info_and_status_bits); } else { - rec_set_heap_no_old(buf + origin_offset, - PAGE_HEAP_NO_USER_LOW); rec_set_info_bits_old(buf + origin_offset, info_and_status_bits); } @@ -1196,64 +1212,105 @@ page_direction_increment( 1U + page_header_get_field(page, PAGE_N_DIRECTION)); } -/** Split a directory slot which owns too many records. -@param[in,out] page index page -@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL -@param[in] s the slot that needs to be split */ -static void page_dir_split_slot(page_t* page, page_zip_des_t* page_zip, - ulint s) +/** +Set the owned records field of the record pointed to by a directory slot. +@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well +@param[in,out] block file page +@param[in] slot sparse directory slot +@param[in,out] n number of records owned by the directory slot +@param[in,out] mtr mini-transaction */ +template +static void page_dir_slot_set_n_owned(buf_block_t *block, + const page_dir_slot_t *slot, + ulint n, mtr_t *mtr) { - ut_ad(!page_zip || page_is_comp(page)); - ut_ad(s); - - page_dir_slot_t* slot = page_dir_get_nth_slot(page, s); - const ulint n_owned = PAGE_DIR_SLOT_MAX_N_OWNED + 1; - - ut_ad(page_dir_slot_get_n_owned(slot) == n_owned); - compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 - >= PAGE_DIR_SLOT_MIN_N_OWNED); - - /* 1. We loop to find a record approximately in the middle of the - records owned by the slot. */ - - const rec_t* rec = page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); - - for (ulint i = n_owned / 2; i--; ) { - rec = page_rec_get_next_const(rec); - } - - /* 2. Add a directory slot immediately below this one. */ - const ulint n_slots = page_dir_get_n_slots(page); - page_dir_set_n_slots(page, page_zip, n_slots + 1); - page_dir_slot_t* last_slot = page_dir_get_nth_slot(page, n_slots); - memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE, - slot - last_slot); - - /* 3. We store the appropriate values to the new slot. */ - - page_dir_slot_set_rec(slot, rec); - page_dir_slot_set_n_owned(slot, page_zip, n_owned / 2); + rec_t *rec= const_cast(page_dir_slot_get_rec(slot)); + page_rec_set_n_owned(block, rec, n, page_rec_is_comp(rec), mtr); +} - /* 4. Finally, we update the number of records field of the - original slot */ - page_dir_slot_set_n_owned(slot - PAGE_DIR_SLOT_SIZE, - page_zip, n_owned - (n_owned / 2)); +/** +Split a directory slot which owns too many records. +@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well +@param[in,out] block index page +@param[in] s the slot that needs to be split +@param[in,out] mtr mini-transaction */ +template +static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) +{ + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(!compressed || block->page.zip.data); + ut_ad(s); + + page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s); + const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1; + + ut_ad(page_dir_slot_get_n_owned(slot) == n_owned); + compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 + >= PAGE_DIR_SLOT_MIN_N_OWNED); + + /* 1. We loop to find a record approximately in the middle of the + records owned by the slot. */ + + const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE); + + for (ulint i= n_owned / 2; i--; ) + rec= page_rec_get_next_const(rec); + + /* Add a directory slot immediately below this one. */ + byte *n_slots_p= PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame; + const uint16_t n_slots= mach_read_from_2(n_slots_p); + + page_dir_slot_t *last_slot= static_cast + (block->frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) - + n_slots * PAGE_DIR_SLOT_SIZE); + memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE, + slot - last_slot); + + const ulint half_owned= n_owned / 2; + + if (compressed && UNIV_LIKELY_NULL(block->page.zip.data)) + { + /* Log changes to the compressed page header and the dense page + directory. */ + mach_write_to_2(n_slots_p, n_slots + 1); + page_zip_write_header(&block->page.zip, n_slots_p, 2, mtr); + mach_write_to_2(slot, page_offset(rec)); + page_rec_set_n_owned(block, page_dir_slot_get_rec(slot), half_owned, + true, mtr); + page_rec_set_n_owned(block, + page_dir_slot_get_rec(slot - + PAGE_DIR_SLOT_SIZE), + n_owned - half_owned, true, mtr); + } + else + { + mtr->write<2>(*block, n_slots_p, 1U + n_slots); + mtr->memcpy(*block, page_offset(last_slot), slot - last_slot); + mtr->write<2>(*block, slot, page_offset(rec)); + const bool comp= page_is_comp(block->frame) != 0; + page_rec_set_n_owned(block, page_dir_slot_get_rec(slot), half_owned, + comp, mtr); + page_rec_set_n_owned(block, + page_dir_slot_get_rec(slot - + PAGE_DIR_SLOT_SIZE), + n_owned - half_owned, comp, mtr); + } } -/** Try to balance an underfilled directory slot with an adjacent one, +/** +Try to balance an underfilled directory slot with an adjacent one, so that there are at least the minimum number of records owned by the slot; this may result in merging the two slots. -@param[in,out] page index page -@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL -@param[in] s the slot to be balanced */ -static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip, - ulint s) +@param[in,out] block index page +@param[in] s the slot to be balanced +@param[in,out] mtr mini-transaction */ +static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) { - ut_ad(!page_zip || page_is_comp(page)); + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); ut_ad(s > 0); - const ulint n_slots = page_dir_get_n_slots(page); + const ulint n_slots = page_dir_get_n_slots(block->frame); if (UNIV_UNLIKELY(s + 1 == n_slots)) { /* The last directory slot cannot be balanced. */ @@ -1262,9 +1319,10 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip, ut_ad(s < n_slots); - page_dir_slot_t* slot = page_dir_get_nth_slot(page, s); + page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s); page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE; const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot); + page_zip_des_t* page_zip = buf_block_get_page_zip(block); ut_ad(page_dir_slot_get_n_owned(slot) == PAGE_DIR_SLOT_MIN_N_OWNED - 1); @@ -1274,17 +1332,33 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip, <= PAGE_DIR_SLOT_MAX_N_OWNED); /* Merge the slots. */ ulint n_owned = page_dir_slot_get_n_owned(slot); - page_dir_slot_set_n_owned(slot, page_zip, 0); - page_dir_slot_set_n_owned(up_slot, page_zip, - n_owned - + page_dir_slot_get_n_owned(up_slot)); + page_dir_slot_set_n_owned(block, slot, 0, mtr); + page_dir_slot_set_n_owned(block, up_slot, n_owned + + page_dir_slot_get_n_owned( + up_slot), mtr); /* Shift the slots */ page_dir_slot_t* last_slot = page_dir_get_nth_slot( - page, n_slots - 1); + block->frame, n_slots - 1); memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot, slot - last_slot); - mach_write_to_2(last_slot, 0); - page_dir_set_n_slots(page, page_zip, n_slots - 1); + if (UNIV_LIKELY_NULL(page_zip)) { + memset_aligned<2>(last_slot, 0, 2); + mach_write_to_2(PAGE_N_DIR_SLOTS + PAGE_HEADER + + block->frame, n_slots - 1); + page_zip_write_header(page_zip, + PAGE_N_DIR_SLOTS + PAGE_HEADER + + block->frame, 2, mtr); + } else { + mtr->write<2>(*block, + PAGE_N_DIR_SLOTS + PAGE_HEADER + + block->frame, + n_slots - 1); + mtr->write<2>(*block, last_slot, 0U); + mtr->memcpy(*block, page_offset(last_slot) + + PAGE_DIR_SLOT_SIZE, + slot - last_slot); + } + return; } @@ -1292,21 +1366,29 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip, rec_t* old_rec = const_cast(page_dir_slot_get_rec(slot)); rec_t* new_rec; - if (page_is_comp(page)) { + if (page_is_comp(block->frame)) { new_rec = rec_get_next_ptr(old_rec, TRUE); - rec_set_n_owned_new(old_rec, page_zip, 0); - rec_set_n_owned_new(new_rec, page_zip, - PAGE_DIR_SLOT_MIN_N_OWNED); + page_rec_set_n_owned(block, old_rec, 0, true, mtr); + page_rec_set_n_owned(block, new_rec, + PAGE_DIR_SLOT_MIN_N_OWNED, + true, mtr); + if (UNIV_LIKELY_NULL(page_zip)) { + mach_write_to_2(slot, page_offset(new_rec)); + goto func_exit; + } } else { new_rec = rec_get_next_ptr(old_rec, FALSE); - rec_set_n_owned_old(old_rec, 0); - rec_set_n_owned_old(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED); + page_rec_set_n_owned(block, old_rec, 0, false, mtr); + page_rec_set_n_owned(block, new_rec, + PAGE_DIR_SLOT_MIN_N_OWNED, + false, mtr); } - page_dir_slot_set_rec(slot, new_rec); - page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned - 1); + mtr->write<2>(*block, slot, page_offset(new_rec)); +func_exit: + page_dir_slot_set_n_owned(block, up_slot, up_n_owned - 1, mtr); } /** Allocate space for inserting an index record. @@ -1346,7 +1428,6 @@ page_cur_insert_rec_low( { byte* insert_buf; ulint rec_size; - page_t* page; /*!< the relevant page */ rec_t* last_insert; /*!< cursor position at previous insert */ rec_t* free_rec; /*!< a free record that was reused, @@ -1356,19 +1437,27 @@ page_cur_insert_rec_low( record */ rec_t* current_rec = cur->rec; + buf_block_t* block = cur->block; ut_ad(rec_offs_validate(rec, index, offsets)); - page = page_align(current_rec); ut_ad(dict_table_is_comp(index->table) - == (ibool) !!page_is_comp(page)); - ut_ad(fil_page_index_page_check(page)); - ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id + == (ibool) !!page_is_comp(block->frame)); + ut_ad(fil_page_index_page_check(block->frame)); + ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame) + == index->id || index->is_dummy || mtr->is_inside_ibuf()); ut_ad(!page_rec_is_supremum(current_rec)); + const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE); + + /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */ + ut_ad(log_mode == MTR_LOG_NONE + || log_mode == MTR_LOG_NO_REDO + || !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE)); + /* 1. Get the size of the physical record in the page */ rec_size = rec_offs_size(offsets); @@ -1391,7 +1480,7 @@ page_cur_insert_rec_low( /* 2. Try to find suitable space from page memory management */ - free_rec = page_header_get_ptr(page, PAGE_FREE); + free_rec = page_header_get_ptr(block->frame, PAGE_FREE); if (UNIV_LIKELY_NULL(free_rec)) { /* Try to allocate from the head of the free list. */ offset_t foffsets_[REC_OFFS_NORMAL_SIZE]; @@ -1401,7 +1490,7 @@ page_cur_insert_rec_low( rec_offs_init(foffsets_); foffsets = rec_get_offsets( - free_rec, index, foffsets, page_is_leaf(page), + free_rec, index, foffsets, page_is_leaf(block->frame), ULINT_UNDEFINED, &heap); if (rec_offs_size(foffsets) < rec_size) { if (UNIV_LIKELY_NULL(heap)) { @@ -1413,16 +1502,16 @@ page_cur_insert_rec_low( insert_buf = free_rec - rec_offs_extra_size(foffsets); - if (page_is_comp(page)) { + if (page_is_comp(block->frame)) { heap_no = rec_get_heap_no_new(free_rec); - page_mem_alloc_free(page, NULL, - rec_get_next_ptr(free_rec, TRUE), - rec_size); + page_mem_alloc_free(block->frame, NULL, + rec_get_next_ptr(free_rec, TRUE), + rec_size); } else { heap_no = rec_get_heap_no_old(free_rec); - page_mem_alloc_free(page, NULL, - rec_get_next_ptr(free_rec, FALSE), - rec_size); + page_mem_alloc_free(block->frame, NULL, + rec_get_next_ptr(free_rec, FALSE), + rec_size); } if (UNIV_LIKELY_NULL(heap)) { @@ -1431,17 +1520,19 @@ page_cur_insert_rec_low( } else { use_heap: free_rec = NULL; - insert_buf = page_mem_alloc_heap(page, NULL, + insert_buf = page_mem_alloc_heap(block->frame, NULL, rec_size, &heap_no); if (UNIV_UNLIKELY(insert_buf == NULL)) { + mtr->set_log_mode(log_mode); return(NULL); } } /* 3. Create the record */ insert_rec = rec_copy(insert_buf, rec, offsets); - rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets); + rec_offs_make_valid(insert_rec, index, page_is_leaf(block->frame), + offsets); /* 4. Insert the record in the linked list of records */ ut_ad(current_rec != insert_rec); @@ -1450,7 +1541,7 @@ page_cur_insert_rec_low( /* next record after current before the insertion */ rec_t* next_rec = page_rec_get_next(current_rec); #ifdef UNIV_DEBUG - if (page_is_comp(page)) { + if (page_is_comp(block->frame)) { switch (rec_get_status(current_rec)) { case REC_STATUS_ORDINARY: case REC_STATUS_NODE_PTR: @@ -1476,56 +1567,59 @@ page_cur_insert_rec_low( page_rec_set_next(current_rec, insert_rec); } - page_header_set_field(page, NULL, PAGE_N_RECS, - 1U + page_get_n_recs(page)); + page_header_set_field(block->frame, NULL, PAGE_N_RECS, + 1U + page_get_n_recs(block->frame)); /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ - if (page_is_comp(page)) { - rec_set_n_owned_new(insert_rec, NULL, 0); - rec_set_heap_no_new(insert_rec, heap_no); - } else { - rec_set_n_owned_old(insert_rec, 0); - rec_set_heap_no_old(insert_rec, heap_no); - } + page_rec_set_n_owned(block, insert_rec, 0, + page_is_comp(block->frame), mtr); + rec_set_heap_no(*block, insert_rec, heap_no, + page_is_comp(block->frame), mtr); UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); /* 6. Update the last insertion info in page header */ - last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT); - ut_ad(!last_insert || !page_is_comp(page) + last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT); + ut_ad(!last_insert || !page_is_comp(block->frame) || rec_get_node_ptr_flag(last_insert) == rec_get_node_ptr_flag(insert_rec)); if (!dict_index_is_spatial(index)) { - byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page; + byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame; if (UNIV_UNLIKELY(last_insert == NULL)) { no_direction: - page_direction_reset(ptr, page, NULL); + page_direction_reset(ptr, block->frame, NULL); } else if (last_insert == current_rec && page_ptr_get_direction(ptr) != PAGE_LEFT) { - page_direction_increment(ptr, page, NULL, PAGE_RIGHT); + page_direction_increment(ptr, block->frame, NULL, + PAGE_RIGHT); } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT && page_rec_get_next(insert_rec) == last_insert) { - page_direction_increment(ptr, page, NULL, PAGE_LEFT); + page_direction_increment(ptr, block->frame, NULL, + PAGE_LEFT); } else { goto no_direction; } } - page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec); + page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec); /* 7. It remains to update the owner record. */ { rec_t* owner_rec = page_rec_find_owner_rec(insert_rec); ulint n_owned; - if (page_is_comp(page)) { + if (page_is_comp(block->frame)) { n_owned = rec_get_n_owned_new(owner_rec); - rec_set_n_owned_new(owner_rec, NULL, n_owned + 1); + rec_set_bit_field_1(owner_rec, n_owned + 1, + REC_NEW_N_OWNED, REC_N_OWNED_MASK, + REC_N_OWNED_SHIFT); } else { n_owned = rec_get_n_owned_old(owner_rec); - rec_set_n_owned_old(owner_rec, n_owned + 1); + rec_set_bit_field_1(owner_rec, n_owned + 1, + REC_OLD_N_OWNED, REC_N_OWNED_MASK, + REC_N_OWNED_SHIFT); } /* 8. Now we have incremented the n_owned field of the owner @@ -1533,17 +1627,16 @@ page_cur_insert_rec_low( we have to split the corresponding directory slot in two. */ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) { - page_dir_split_slot( - page, NULL, - page_dir_find_owner_slot(owner_rec)); + page_dir_split_slot( + block, + page_dir_find_owner_slot(owner_rec), mtr); } } /* 9. Write log record of the insert */ - if (UNIV_LIKELY(mtr != NULL)) { - page_cur_insert_rec_write_log(insert_rec, rec_size, - current_rec, index, mtr); - } + mtr->set_log_mode(log_mode); + page_cur_insert_rec_write_log(insert_rec, rec_size, + current_rec, index, mtr); return(insert_rec); } @@ -1903,8 +1996,10 @@ page_cur_insert_rec_zip( /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ - rec_set_n_owned_new(insert_rec, NULL, 0); - rec_set_heap_no_new(insert_rec, heap_no); + rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); @@ -1944,16 +2039,20 @@ page_cur_insert_rec_zip( ulint n_owned; n_owned = rec_get_n_owned_new(owner_rec); - rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1); + rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); /* 8. Now we have incremented the n_owned field of the owner record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED, we have to split the corresponding directory slot in two. */ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) { - page_dir_split_slot( - page, page_zip, - page_dir_find_owner_slot(owner_rec)); + const mtr_log_t log_mode = mtr->set_log_mode( + MTR_LOG_NONE); + page_dir_split_slot( + page_cur_get_block(cursor), + page_dir_find_owner_slot(owner_rec), mtr); + mtr->set_log_mode(log_mode); } } @@ -2045,12 +2144,13 @@ ATTRIBUTE_COLD /* only used when crash-upgrading */ void page_copy_rec_list_end_to_created_page( /*===================================*/ - page_t* new_page, /*!< in/out: index page to copy to */ + buf_block_t* block, /*!< in/out: index page to copy to */ rec_t* rec, /*!< in: first record to copy */ dict_index_t* index, /*!< in: record descriptor */ mtr_t* mtr) /*!< in: mtr */ { page_dir_slot_t* slot = 0; /* remove warning */ + page_t* new_page = block->frame; byte* heap_top; rec_t* insert_rec = 0; /* remove warning */ rec_t* prev_rec; @@ -2063,6 +2163,8 @@ page_copy_rec_list_end_to_created_page( offset_t* offsets = offsets_; rec_offs_init(offsets_); + /* The record was never emitted for ROW_FORMAT=COMPRESSED pages. */ + ut_ad(!block->page.zip.data); ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW); ut_ad(page_align(rec) != new_page); ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page)); @@ -2084,9 +2186,10 @@ page_copy_rec_list_end_to_created_page( #ifdef UNIV_DEBUG /* To pass the debug tests we have to set these dummy values in the debug version */ - page_dir_set_n_slots(new_page, NULL, srv_page_size / 2); - page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, - new_page + srv_page_size - 1); + mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page, + srv_page_size / 2); + mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page, + srv_page_size - 1); #endif prev_rec = page_get_infimum_rec(new_page); if (page_is_comp(new_page)) { @@ -2105,22 +2208,21 @@ page_copy_rec_list_end_to_created_page( ULINT_UNDEFINED, &heap); insert_rec = rec_copy(heap_top, rec, offsets); - if (page_is_comp(new_page)) { + const bool comp = page_is_comp(new_page) != 0; + + if (comp) { rec_set_next_offs_new(prev_rec, page_offset(insert_rec)); - - rec_set_n_owned_new(insert_rec, NULL, 0); - rec_set_heap_no_new(insert_rec, - PAGE_HEAP_NO_USER_LOW + n_recs); } else { rec_set_next_offs_old(prev_rec, page_offset(insert_rec)); - - rec_set_n_owned_old(insert_rec, 0); - rec_set_heap_no_old(insert_rec, - PAGE_HEAP_NO_USER_LOW + n_recs); } + page_rec_set_n_owned(block, insert_rec, 0, comp, mtr); + + rec_set_heap_no(insert_rec, PAGE_HEAP_NO_USER_LOW + n_recs, + page_is_comp(new_page)); + count++; n_recs++; @@ -2130,9 +2232,9 @@ page_copy_rec_list_end_to_created_page( slot_index++; slot = page_dir_get_nth_slot(new_page, slot_index); - - page_dir_slot_set_rec(slot, insert_rec); - page_dir_slot_set_n_owned(slot, NULL, count); + mach_write_to_2(slot, page_offset(insert_rec)); + page_dir_slot_set_n_owned(block, slot, count, + mtr); count = 0; } @@ -2164,7 +2266,7 @@ page_copy_rec_list_end_to_created_page( count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; - page_dir_slot_set_n_owned(slot, NULL, 0); + page_dir_slot_set_n_owned(block, slot, 0, mtr); slot_index--; } @@ -2173,18 +2275,24 @@ page_copy_rec_list_end_to_created_page( mem_heap_free(heap); } + slot = page_dir_get_nth_slot(new_page, 1 + slot_index); + if (page_is_comp(new_page)) { rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM); + mach_write_to_2(slot, PAGE_NEW_SUPREMUM); + rec_set_bit_field_1(new_page + PAGE_NEW_SUPREMUM, count + 1, + REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); } else { rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM); + mach_write_to_2(slot, PAGE_OLD_SUPREMUM); + rec_set_bit_field_1(new_page + PAGE_OLD_SUPREMUM, count + 1, + REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); } - slot = page_dir_get_nth_slot(new_page, 1 + slot_index); - - page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page)); - page_dir_slot_set_n_owned(slot, NULL, count + 1); - - page_dir_set_n_slots(new_page, NULL, 2 + slot_index); + mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page, + 2 + slot_index); page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top); page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs); page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs); @@ -2195,41 +2303,10 @@ page_copy_rec_list_end_to_created_page( new_page, NULL); } -/***********************************************************//** -Writes log record of a record delete on a page. */ -UNIV_INLINE -void -page_cur_delete_rec_write_log( -/*==========================*/ - rec_t* rec, /*!< in: record to be deleted */ - const dict_index_t* index, /*!< in: record descriptor */ - mtr_t* mtr) /*!< in: mini-transaction handle */ -{ - byte* log_ptr; - - ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table)); - ut_ad(mtr->is_named_space(index->table->space)); - - log_ptr = mlog_open_and_write_index(mtr, rec, index, - page_rec_is_comp(rec) - ? MLOG_COMP_REC_DELETE - : MLOG_REC_DELETE, 2); - - if (!log_ptr) { - /* Logging in mtr is switched off during crash recovery: - in that case mlog_open returns NULL */ - return; - } - - /* Write the cursor rec offset as a 2-byte ulint */ - mach_write_to_2(log_ptr, page_offset(rec)); - - mlog_close(mtr, log_ptr + 2); -} - /***********************************************************//** Parses log record of a record delete on a page. @return pointer to record end or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_cur_parse_delete_rec( /*======================*/ @@ -2283,33 +2360,40 @@ page_cur_parse_delete_rec( } /** Prepend a record to the PAGE_FREE list. -@param[in,out] page index page -@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL -@param[in,out] rec record being deleted -@param[in] index the index that the page belongs to -@param[in] offsets rec_get_offsets(rec, index) */ -static void page_mem_free(page_t* page, page_zip_des_t* page_zip, rec_t* rec, - const dict_index_t* index, const offset_t* offsets) +@param[in,out] block index page +@param[in,out] rec record being deleted +@param[in] index the index that the page belongs to +@param[in] offsets rec_get_offsets(rec, index) +@param[in,out] mtr mini-transaction */ +static void page_mem_free(buf_block_t *block, rec_t *rec, + const dict_index_t *index, const offset_t *offsets, + mtr_t *mtr) { - ut_ad(rec_offs_validate(rec, index, offsets)); - const rec_t* free = page_header_get_ptr(page, PAGE_FREE); - - if (srv_immediate_scrub_data_uncompressed) { - /* scrub record */ - memset(rec, 0, rec_offs_data_size(offsets)); - } - - page_rec_set_next(rec, free); - page_header_set_ptr(page, page_zip, PAGE_FREE, rec); - page_header_set_field(page, page_zip, PAGE_GARBAGE, - rec_offs_size(offsets) - + page_header_get_field(page, PAGE_GARBAGE)); - if (page_zip) { - page_zip_dir_delete(page_zip, rec, index, offsets, free); - } else { - page_header_set_field(page, page_zip, PAGE_N_RECS, - ulint(page_get_n_recs(page)) - 1); - } + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(page_align(rec) == block->frame); + const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE); + + if (UNIV_LIKELY_NULL(block->page.zip.data)) + page_zip_dir_delete(&block->page.zip, rec, index, offsets, free, mtr); + else + { + if (srv_immediate_scrub_data_uncompressed) + mtr->memset(block, page_offset(rec), rec_offs_data_size(offsets), 0); + + uint16_t next= free + ? (page_is_comp(block->frame) + ? static_cast(free - rec) + : static_cast(page_offset(free))) + : 0; + mtr->write<2>(*block, rec - REC_NEXT, next); + mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame, + page_offset(rec)); + mtr->write<2>(*block, PAGE_GARBAGE + PAGE_HEADER + block->frame, + rec_offs_size(offsets) + + page_header_get_field(block->frame, PAGE_GARBAGE)); + mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame, + ulint(page_get_n_recs(block->frame)) - 1); + } } /***********************************************************//** @@ -2356,12 +2440,15 @@ page_cur_delete_rec( ut_ad(page_rec_is_user_rec(current_rec)); if (page_get_n_recs(block->frame) == 1 +#if 1 /* MDEV-12353 TODO: skip this for the physical log format */ + /* Empty the page, unless we are applying the redo log + during crash recovery. During normal operation, the + page_create_empty() gets logged as one of MLOG_PAGE_CREATE, + MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */ && !recv_recovery_is_on() +#endif && !rec_is_alter_metadata(current_rec, *index)) { - /* Empty the page, unless we are applying the redo log - during crash recovery. During normal operation, the - page_create_empty() gets logged as one of MLOG_PAGE_CREATE, - MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */ + /* Empty the page. */ ut_ad(page_is_leaf(block->frame)); /* Usually, this should be the root page, and the whole index tree should become empty. @@ -2383,10 +2470,7 @@ page_cur_delete_rec( /* 1. Reset the last insert info in the page header and increment the modify clock for the frame */ - - page_zip_des_t* const page_zip = buf_block_get_page_zip(block); - - page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL); + page_header_reset_last_insert(block, mtr); /* The page gets invalid for btr_pcur_restore_pos(). We avoid invoking buf_block_modify_clock_inc(block) because its @@ -2394,8 +2478,6 @@ page_cur_delete_rec( used during IMPORT TABLESPACE. */ block->modify_clock++; - page_cur_delete_rec_write_log(current_rec, index, mtr); - /* 2. Find the next and the previous record. Note that the cursor is left at the next record. */ @@ -2416,34 +2498,72 @@ page_cur_delete_rec( next_rec = cursor->rec; /* 3. Remove the record from the linked list of records */ - - page_rec_set_next(prev_rec, next_rec); - /* 4. If the deleted record is pointed to by a dir slot, update the record pointer in slot. In the following if-clause we assume that prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED >= 2. */ + /* 5. Update the number of owned records of the slot */ compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2); ut_ad(cur_n_owned > 1); - if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) { - page_dir_slot_set_rec(cur_dir_slot, prev_rec); - } + rec_t* slot_rec = const_cast + (page_dir_slot_get_rec(cur_dir_slot)); + + if (UNIV_LIKELY_NULL(block->page.zip.data)) { + ut_ad(page_is_comp(block->frame)); + if (current_rec == slot_rec) { + page_zip_rec_set_owned(block, prev_rec, 1, mtr); + page_zip_rec_set_owned(block, slot_rec, 0, mtr); + slot_rec = prev_rec; + mach_write_to_2(cur_dir_slot, page_offset(slot_rec)); + } else if (cur_n_owned == 1 + && !page_rec_is_supremum(slot_rec)) { + page_zip_rec_set_owned(block, slot_rec, 0, mtr); + } - /* 5. Update the number of owned records of the slot */ + mach_write_to_2(prev_rec - REC_NEXT, static_cast + (next_rec - prev_rec)); + mach_write_to_1(slot_rec - REC_NEW_N_OWNED, + (slot_rec[-REC_NEW_N_OWNED] + & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) << REC_N_OWNED_SHIFT); + } else { + if (current_rec == slot_rec) { + slot_rec = prev_rec; + mtr->write<2>(*block, cur_dir_slot, + page_offset(slot_rec)); + } - page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1); + if (page_is_comp(block->frame)) { + mtr->write<2>(*block, prev_rec - REC_NEXT, + static_cast + (next_rec - prev_rec)); + mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED, + (slot_rec[-REC_NEW_N_OWNED] + & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) + << REC_N_OWNED_SHIFT); + } else { + mtr->write<2>(*block, prev_rec - REC_NEXT, + page_offset(next_rec)); + mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED, + (slot_rec[-REC_OLD_N_OWNED] + & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) + << REC_N_OWNED_SHIFT); + } + } /* 6. Free the memory occupied by the record */ - page_mem_free(block->frame, page_zip, current_rec, index, offsets); + page_mem_free(block, current_rec, index, offsets, mtr); /* 7. Now we have decremented the number of owned records of the slot. If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the slots. */ if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { - page_dir_balance_slot(block->frame, page_zip, cur_slot_no); + page_dir_balance_slot(block, cur_slot_no, mtr); } } diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc index 842f1960c7dfb..75514f173b13b 100644 --- a/storage/innobase/page/page0page.cc +++ b/storage/innobase/page/page0page.cc @@ -842,35 +842,10 @@ page_copy_rec_list_start( return(ret); } -/**********************************************************//** -Writes a log record of a record list end or start deletion. */ -UNIV_INLINE -void -page_delete_rec_list_write_log( -/*===========================*/ - rec_t* rec, /*!< in: record on page */ - dict_index_t* index, /*!< in: record descriptor */ - mlog_id_t type, /*!< in: operation type: - MLOG_LIST_END_DELETE, ... */ - mtr_t* mtr) /*!< in: mtr */ -{ - byte* log_ptr; - ut_ad(type == MLOG_LIST_END_DELETE - || type == MLOG_LIST_START_DELETE - || type == MLOG_COMP_LIST_END_DELETE - || type == MLOG_COMP_LIST_START_DELETE); - - log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2); - if (log_ptr) { - /* Write the parameter as a 2-byte ulint */ - mach_write_to_2(log_ptr, page_offset(rec)); - mlog_close(mtr, log_ptr + 2); - } -} - /**********************************************************//** Parses a log record of a record list end or start deletion. @return end of log record or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_parse_delete_rec_list( /*=======================*/ @@ -993,29 +968,20 @@ page_delete_rec_list_end( } } - /* Reset the last insert info in the page header and increment - the modify clock for the frame */ - - page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL); - /* The page gets invalid for optimistic searches: increment the frame modify clock */ buf_block_modify_clock_inc(block); - page_delete_rec_list_write_log(rec, index, page_is_comp(block->frame) - ? MLOG_COMP_LIST_END_DELETE - : MLOG_LIST_END_DELETE, mtr); - const bool is_leaf = page_is_leaf(block->frame); + byte* last_insert = my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + + block->frame); - if (page_zip) { - mtr_log_t log_mode; - + if (UNIV_LIKELY_NULL(page_zip)) { ut_ad(page_is_comp(block->frame)); - /* Individual deletes are not logged */ - log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE); + memset(last_insert, 0, 2); + page_zip_write_header(page_zip, last_insert, 2, mtr); do { page_cur_t cur; @@ -1034,12 +1000,11 @@ page_delete_rec_list_end( mem_heap_free(heap); } - /* Restore log mode */ - - mtr_set_log_mode(mtr, log_mode); return; } + mtr->write<2,mtr_t::OPT>(*block, last_insert, 0U); + prev_rec = page_rec_get_prev(rec); last_rec = page_rec_get_prev(page_get_supremum_rec(block->frame)); @@ -1100,6 +1065,20 @@ page_delete_rec_list_end( slot_index = page_dir_find_owner_slot(rec2); ut_ad(slot_index > 0); slot = page_dir_get_nth_slot(block->frame, slot_index); + mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM); + byte* owned = PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED + + block->frame; + byte new_owned = (*owned & ~REC_N_OWNED_MASK) + | static_cast(n_owned << REC_N_OWNED_SHIFT); + mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); + mtr->write<2>(*block, prev_rec - REC_NEXT, + static_cast + (PAGE_NEW_SUPREMUM - page_offset(prev_rec))); + uint16_t free = page_header_get_field(block->frame, PAGE_FREE); + mtr->write<2>(*block, last_rec - REC_NEXT, free + ? static_cast + (free - page_offset(last_rec)) + : 0U); } else { rec_t* rec2 = rec; ulint count = 0; @@ -1116,29 +1095,32 @@ page_delete_rec_list_end( slot_index = page_dir_find_owner_slot(rec2); ut_ad(slot_index > 0); slot = page_dir_get_nth_slot(block->frame, slot_index); + mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM); + byte* owned = PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED + + block->frame; + byte new_owned = (*owned & ~REC_N_OWNED_MASK) + | static_cast(n_owned << REC_N_OWNED_SHIFT); + mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); + mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM); + mtr->write<2>(*block, last_rec - REC_NEXT, + page_header_get_field(block->frame, PAGE_FREE)); } - page_dir_slot_set_rec(slot, page_get_supremum_rec(block->frame)); - page_dir_slot_set_n_owned(slot, NULL, n_owned); - - page_dir_set_n_slots(block->frame, NULL, slot_index + 1); - - /* Remove the record chain segment from the record chain */ - page_rec_set_next(prev_rec, page_get_supremum_rec(block->frame)); + mtr->write<2,mtr_t::OPT>(*block, PAGE_N_DIR_SLOTS + PAGE_HEADER + + block->frame, slot_index + 1); /* Catenate the deleted chain segment to the page free list */ - page_rec_set_next(last_rec, page_header_get_ptr(block->frame, - PAGE_FREE)); - page_header_set_ptr(block->frame, NULL, PAGE_FREE, rec); + mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame, + page_offset(rec)); + byte* garbage = my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER + + block->frame); + mtr->write<2>(*block, garbage, size + mach_read_from_2(garbage)); - page_header_set_field(block->frame, NULL, PAGE_GARBAGE, size - + page_header_get_field(block->frame, - PAGE_GARBAGE)); - - ut_ad(page_get_n_recs(block->frame) > n_recs); - page_header_set_field(block->frame, NULL, PAGE_N_RECS, - ulint{page_get_n_recs(block->frame) - n_recs}); + byte* page_n_recs = my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER + + block->frame); + mtr->write<2>(*block, page_n_recs, + ulint{mach_read_from_2(page_n_recs)} - n_recs); } /*************************************************************//** @@ -1187,22 +1169,9 @@ page_delete_rec_list_start( return; } - mlog_id_t type; - - if (page_rec_is_comp(rec)) { - type = MLOG_COMP_LIST_START_DELETE; - } else { - type = MLOG_LIST_START_DELETE; - } - - page_delete_rec_list_write_log(rec, index, type, mtr); - page_cur_set_before_first(block, &cur1); page_cur_move_to_next(&cur1); - /* Individual deletes are not logged */ - - mtr_log_t log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE); const bool is_leaf = page_rec_is_leaf(rec); while (page_cur_get_rec(&cur1) != rec) { @@ -1215,10 +1184,6 @@ page_delete_rec_list_start( if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); } - - /* Restore log mode */ - - mtr_set_log_mode(mtr, log_mode); } /*************************************************************//** diff --git a/storage/innobase/page/page0zip.cc b/storage/innobase/page/page0zip.cc index c8201a609662b..37f14ac7b5a81 100644 --- a/storage/innobase/page/page0zip.cc +++ b/storage/innobase/page/page0zip.cc @@ -4223,7 +4223,8 @@ page_zip_clear_rec( page_zip_des_t* page_zip, /*!< in/out: compressed page */ byte* rec, /*!< in: record to clear */ const dict_index_t* index, /*!< in: index of rec */ - const offset_t* offsets) /*!< in: rec_get_offsets(rec, index) */ + const offset_t* offsets, /*!< in: rec_get_offsets(rec, index) */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { ulint heap_no; page_t* page = page_align(rec); @@ -4256,11 +4257,20 @@ page_zip_clear_rec( rec_offs_n_fields(offsets) - 1, &len); ut_ad(len == REC_NODE_PTR_SIZE); - ut_ad(!rec_offs_any_extern(offsets)); memset(field, 0, REC_NODE_PTR_SIZE); - memset(storage - (heap_no - 1) * REC_NODE_PTR_SIZE, - 0, REC_NODE_PTR_SIZE); + storage -= (heap_no - 1) * REC_NODE_PTR_SIZE; +clear_page_zip: + /* TODO: write MEMSET record */ + memset(storage, 0, len); + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + len)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, storage - page_zip->data); + mach_write_to_2(log_ptr + 2, len); + memcpy(log_ptr + 4, storage, len); + mlog_close(mtr, log_ptr + 4 + len); + } } else if (dict_index_is_clust(index)) { /* Clear trx_id and roll_ptr. On the compressed page, there is an array of these fields immediately before the @@ -4269,14 +4279,9 @@ page_zip_clear_rec( = dict_col_get_clust_pos( dict_table_get_sys_col( index->table, DATA_TRX_ID), index); - storage = page_zip_dir_start(page_zip); field = rec_get_nth_field(rec, offsets, trx_id_pos, &len); ut_ad(len == DATA_TRX_ID_LEN); - memset(field, 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); - memset(storage - (heap_no - 1) - * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN), - 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); if (rec_offs_any_extern(offsets)) { ulint i; @@ -4295,6 +4300,12 @@ page_zip_clear_rec( } } } + + len = DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN; + storage = page_zip_dir_start(page_zip) + - (heap_no - 1) + * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + goto clear_page_zip; } else { ut_ad(!rec_offs_any_extern(offsets)); } @@ -4338,18 +4349,33 @@ must already have been written on the uncompressed page. */ void page_zip_rec_set_owned( /*===================*/ - page_zip_des_t* page_zip,/*!< in/out: compressed page */ + buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */ const byte* rec, /*!< in: record on the uncompressed page */ - ulint flag) /*!< in: the owned flag (nonzero=TRUE) */ + ulint flag, /*!< in: the owned flag (nonzero=TRUE) */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { + ut_ad(page_align(rec) == block->frame); + page_zip_des_t* const page_zip = &block->page.zip; byte* slot = page_zip_dir_find(page_zip, page_offset(rec)); ut_a(slot); UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip)); + const byte b = *slot; if (flag) { *slot |= (PAGE_ZIP_DIR_SLOT_OWNED >> 8); } else { *slot &= ~(PAGE_ZIP_DIR_SLOT_OWNED >> 8); } + if (b == *slot) { + } else if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + 1)) { + log_ptr = mlog_write_initial_log_record_low( + MLOG_ZIP_WRITE_STRING, + block->page.id.space(), block->page.id.page_no(), + log_ptr, mtr); + mach_write_to_2(log_ptr, slot - page_zip->data); + mach_write_to_2(log_ptr + 2, 1); + log_ptr[4] = *slot; + mlog_close(mtr, log_ptr + 5); + } } /**********************************************************************//** @@ -4442,12 +4468,12 @@ page_zip_dir_delete( byte* rec, /*!< in: deleted record */ const dict_index_t* index, /*!< in: index of rec */ const offset_t* offsets, /*!< in: rec_get_offsets(rec) */ - const byte* free) /*!< in: previous start of + const byte* free, /*!< in: previous start of the free list */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { byte* slot_rec; byte* slot_free; - ulint n_ext; page_t* page = page_align(rec); ut_ad(rec_offs_validate(rec, index, offsets)); @@ -4458,6 +4484,15 @@ page_zip_dir_delete( UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets), rec_offs_extra_size(offsets)); + mach_write_to_2(rec - REC_NEXT, free + ? static_cast(free - rec) : 0); + mach_write_to_2(PAGE_FREE + PAGE_HEADER + page, page_offset(rec)); + byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page; + mach_write_to_2(garbage, rec_offs_size(offsets) + + mach_read_from_2(garbage)); + compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2); + page_zip_write_header(page_zip, PAGE_FREE + PAGE_HEADER + page, + 4, mtr); slot_rec = page_zip_dir_find(page_zip, page_offset(rec)); ut_a(slot_rec); @@ -4465,8 +4500,9 @@ page_zip_dir_delete( ut_ad(n_recs); ut_ad(n_recs > 1 || page_get_page_no(page) == index->page); /* This could not be done before page_zip_dir_find(). */ - page_header_set_field(page, page_zip, PAGE_N_RECS, - n_recs - 1); + mach_write_to_2(PAGE_N_RECS + PAGE_HEADER + page, n_recs - 1); + page_zip_write_header(page_zip, PAGE_N_RECS + PAGE_HEADER + page, + 2, mtr); if (UNIV_UNLIKELY(!free)) { /* Make the last slot the start of the free list. */ @@ -4482,22 +4518,34 @@ page_zip_dir_delete( slot_free += PAGE_ZIP_DIR_SLOT_SIZE; } - if (UNIV_LIKELY(slot_rec > slot_free)) { + const ulint slot_len = slot_rec > slot_free + ? ulint(slot_rec - slot_free) + : 0; + if (slot_len) { memmove_aligned<2>(slot_free + PAGE_ZIP_DIR_SLOT_SIZE, - slot_free, ulint(slot_rec - slot_free)); + slot_free, slot_len); + /* TODO: issue MEMMOVE record to reduce log volume */ } /* Write the entry for the deleted record. The "owned" and "deleted" flags will be cleared. */ mach_write_to_2(slot_free, page_offset(rec)); - if (!page_is_leaf(page) || !dict_index_is_clust(index)) { - ut_ad(!rec_offs_any_extern(offsets)); - goto skip_blobs; + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, slot_free - page_zip->data); + mach_write_to_2(log_ptr + 2, slot_len + + PAGE_ZIP_DIR_SLOT_SIZE); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, slot_free, slot_len + + PAGE_ZIP_DIR_SLOT_SIZE); } - n_ext = rec_offs_n_extern(offsets); - if (UNIV_UNLIKELY(n_ext != 0)) { + if (const ulint n_ext = rec_offs_n_extern(offsets)) { + ut_ad(index->is_primary()); + ut_ad(page_is_leaf(page)); + /* Shift and zero fill the array of BLOB pointers. */ ulint blob_no; byte* externs; @@ -4510,24 +4558,34 @@ page_zip_dir_delete( - (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW) * PAGE_ZIP_CLUST_LEAF_SLOT_SIZE; - ext_end = externs - page_zip->n_blobs - * BTR_EXTERN_FIELD_REF_SIZE; - externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE; + ext_end = externs - page_zip->n_blobs * FIELD_REF_SIZE; - page_zip->n_blobs -= static_cast(n_ext); /* Shift and zero fill the array. */ - memmove(ext_end + n_ext * BTR_EXTERN_FIELD_REF_SIZE, ext_end, - ulint(page_zip->n_blobs - blob_no) + memmove(ext_end + n_ext * FIELD_REF_SIZE, ext_end, + ulint(page_zip->n_blobs - n_ext - blob_no) * BTR_EXTERN_FIELD_REF_SIZE); - memset(ext_end, 0, n_ext * BTR_EXTERN_FIELD_REF_SIZE); + memset(ext_end, 0, n_ext * FIELD_REF_SIZE); + /* TODO: use MEMMOVE and MEMSET records to reduce volume */ + const ulint ext_len = ulint(page_zip->n_blobs - blob_no) + * FIELD_REF_SIZE; + + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, ext_end - page_zip->data); + mach_write_to_2(log_ptr + 2, ext_len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, ext_end, ext_len); + } + + page_zip->n_blobs -= static_cast(n_ext); } -skip_blobs: /* The compression algorithm expects info_bits and n_owned to be 0 for deleted records. */ rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */ - page_zip_clear_rec(page_zip, rec, index, offsets); + page_zip_clear_rec(page_zip, rec, index, offsets, mtr); } /**********************************************************************//** diff --git a/storage/innobase/rem/rem0rec.cc b/storage/innobase/rem/rem0rec.cc index 613dd2d6a81d3..69ee50a661c51 100644 --- a/storage/innobase/rem/rem0rec.cc +++ b/storage/innobase/rem/rem0rec.cc @@ -1416,7 +1416,8 @@ rec_convert_dtuple_to_rec_old( /* Set the info bits of the record */ rec_set_info_bits_old(rec, dtuple_get_info_bits(dtuple) & REC_INFO_BITS_MASK); - rec_set_heap_no_old(rec, PAGE_HEAP_NO_USER_LOW); + rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, REC_OLD_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); /* Store the data and the offsets */ @@ -1529,7 +1530,9 @@ rec_convert_dtuple_to_rec_comp( ut_ad(n_fields == ulint(index->n_fields) + 1); rec_set_n_add_field(nulls, n_fields - 1 - index->n_core_fields); - rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW); + rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, + REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, + REC_HEAP_NO_SHIFT); rec_set_status(rec, REC_STATUS_INSTANT); n_node_ptr_field = ULINT_UNDEFINED; lens = nulls - UT_BITS_IN_BYTES(index->n_nullable); @@ -1545,8 +1548,9 @@ rec_convert_dtuple_to_rec_comp( case REC_STATUS_ORDINARY: ut_ad(n_fields <= dict_index_get_n_fields(index)); if (!temp) { - rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW); - + rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, + REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, + REC_HEAP_NO_SHIFT); rec_set_status( rec, n_fields == index->n_core_fields ? REC_STATUS_ORDINARY @@ -1569,7 +1573,9 @@ rec_convert_dtuple_to_rec_comp( break; case REC_STATUS_NODE_PTR: ut_ad(!temp); - rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW); + rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, + REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, + REC_HEAP_NO_SHIFT); rec_set_status(rec, status); ut_ad(n_fields == dict_index_get_n_unique_in_tree_nonleaf(index) + 1);