diff --git a/mysql-test/suite/innodb/t/innodb-index-online.opt b/mysql-test/suite/innodb/t/innodb-index-online.opt index ff20edbe2f74c..1837463f07a09 100644 --- a/mysql-test/suite/innodb/t/innodb-index-online.opt +++ b/mysql-test/suite/innodb/t/innodb-index-online.opt @@ -1,6 +1,5 @@ --loose-innodb-sort-buffer-size=64k --loose-innodb-online-alter-log-max-size=128k --loose-innodb-buffer-pool-size=5M ---loose-innodb-log-buffer-size=256k --loose-innodb-sys-indexes --loose-innodb-sys-fields diff --git a/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt b/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt index 35527d0c5175e..5d0dd66114b4c 100644 --- a/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt +++ b/mysql-test/suite/innodb/t/innodb_defrag_concurrent.opt @@ -1,5 +1,5 @@ --loose-innodb-buffer-pool-stats --loose-innodb-buffer-page --loose-innodb-buffer-page-lru ---innodb-log-buffer-size=2m +--innodb-log-buffer-size=6m --innodb-defragment=1 \ No newline at end of file diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc index 03d90ade4f4a0..b664ad57e316f 100644 --- a/storage/innobase/btr/btr0bulk.cc +++ b/storage/innobase/btr/btr0bulk.cc @@ -163,83 +163,74 @@ PageBulk::init() template inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets) { - ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); - ut_ad((fmt != REDUNDANT) == m_is_comp); - ulint rec_size; + ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); + ut_ad((fmt != REDUNDANT) == m_is_comp); - ut_ad(m_heap != NULL); + ut_ad(m_heap); - rec_size = rec_offs_size(offsets); - ut_d(const bool is_leaf = page_rec_is_leaf(m_cur_rec)); + ulint rec_size= rec_offs_size(offsets); + ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec)); #ifdef UNIV_DEBUG - /* Check whether records are in order. */ - if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) { - rec_t* old_rec = m_cur_rec; - offset_t* old_offsets = rec_get_offsets( - old_rec, m_index, NULL, is_leaf, - ULINT_UNDEFINED, &m_heap); - - ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index) - > 0); - } + /* Check whether records are in order. */ + if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) + { + const rec_t *old_rec = m_cur_rec; + offset_t *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf, + ULINT_UNDEFINED, &m_heap); + ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index) > 0); + } - m_total_data += rec_size; + m_total_data+= rec_size; #endif /* UNIV_DEBUG */ - /* 1. Copy the record to page. */ - rec_t* insert_rec = rec_copy(m_heap_top, rec, offsets); - ut_ad(page_align(insert_rec) == m_page); - rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets); - - /* 2. Insert the record in the linked list. */ - if (fmt != REDUNDANT) { - rec_t* next_rec = m_page - + page_offset(m_cur_rec - + mach_read_from_2(m_cur_rec - - REC_NEXT)); - mach_write_to_2(insert_rec - REC_NEXT, - static_cast(next_rec - insert_rec)); - mach_write_to_2(m_cur_rec - REC_NEXT, - static_cast(insert_rec - m_cur_rec)); - rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - rec_set_bit_field_2(insert_rec, - PAGE_HEAP_NO_USER_LOW + m_rec_no, - REC_NEW_HEAP_NO, - REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); - } else { - mach_write_to_2(insert_rec - REC_NEXT, - mach_read_from_2(m_cur_rec - REC_NEXT)); - mach_write_to_2(m_cur_rec - REC_NEXT, page_offset(insert_rec)); - rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - rec_set_bit_field_2(insert_rec, - PAGE_HEAP_NO_USER_LOW + m_rec_no, - REC_OLD_HEAP_NO, - REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); - } + /* Copy the record payload. */ + rec_t *insert_rec= rec_copy(m_heap_top, rec, offsets); + ut_ad(page_align(insert_rec) == m_page); + rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets); - /* 4. Set member variables. */ - ulint slot_size; - slot_size = page_dir_calc_reserved_space(m_rec_no + 1) - - page_dir_calc_reserved_space(m_rec_no); + /* Insert the record in the linked list. */ + if (fmt != REDUNDANT) + { + rec_t *next_rec= m_page + + page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT)); + mach_write_to_2(insert_rec - REC_NEXT, + static_cast(next_rec - insert_rec)); + if (fmt != COMPRESSED) + m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, + static_cast(insert_rec - m_cur_rec)); + else + mach_write_to_2(m_cur_rec - REC_NEXT, + static_cast(insert_rec - m_cur_rec)); + rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK, + REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } + else + { + memcpy(insert_rec - REC_NEXT, m_cur_rec - REC_NEXT, 2); + m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec)); + rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, REC_N_OWNED_MASK, + REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, + REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } - ut_ad(m_free_space >= rec_size + slot_size); - ut_ad(m_heap_top + rec_size < m_page + srv_page_size); + if (fmt != COMPRESSED) + m_mtr.memcpy(*m_block, page_offset(m_heap_top), rec_offs_size(offsets)); - m_free_space -= rec_size + slot_size; - m_heap_top += rec_size; - m_rec_no += 1; + /* Update the member variables. */ + ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) - + page_dir_calc_reserved_space(m_rec_no); - if (fmt != COMPRESSED) { - /* For ROW_FORMAT=COMPRESSED, redo log may be written - in PageBulk::compress(). */ - page_cur_insert_rec_write_log(insert_rec, rec_size, - m_cur_rec, m_index, &m_mtr); - } + ut_ad(m_free_space >= rec_size + slot_size); + ut_ad(m_heap_top + rec_size < m_page + srv_page_size); - m_cur_rec = insert_rec; + m_free_space-= rec_size + slot_size; + m_heap_top+= rec_size; + m_rec_no++; + m_cur_rec= insert_rec; } /** Insert a record in the page. @@ -255,6 +246,14 @@ inline void PageBulk::insert(const rec_t *rec, offset_t *offsets) insertPage(rec, offsets); } +/** Set the number of owned records in the uncompressed page of +a ROW_FORMAT=COMPRESSED record without redo-logging. */ +static void rec_set_n_owned_zip(rec_t *rec, ulint n_owned) +{ + rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); +} + /** Mark end of insertion to the page. Scan all records to set page dirs, and set page header members. @tparam fmt page format */ @@ -287,9 +286,19 @@ inline void PageBulk::finishPage() if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) { slot-= PAGE_DIR_SLOT_SIZE; - mach_write_to_2(slot, offset); - rec_set_bit_field_1(m_page + offset, count, REC_NEW_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + + if (fmt != COMPRESSED) + { + m_mtr.write<2,mtr_t::OPT>(*m_block, slot, offset); + page_rec_set_n_owned(m_block, m_page + offset, count, true, + &m_mtr); + } + else + { + mach_write_to_2(slot, offset); + rec_set_n_owned_zip(m_page + offset, count); + } + count= 0; } @@ -299,6 +308,33 @@ inline void PageBulk::finishPage() offset= next; } while (offset != PAGE_NEW_SUPREMUM); + + if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <= + PAGE_DIR_SLOT_MAX_N_OWNED)) + { + /* Merge the last two slots, like page_cur_insert_rec_low() does. */ + count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; + + rec_t *rec= const_cast(page_dir_slot_get_rec(slot)); + if (fmt != COMPRESSED) + page_rec_set_n_owned(m_block, rec, 0, true, &m_mtr); + else + rec_set_n_owned_zip(rec, 0); + } + else + slot-= PAGE_DIR_SLOT_SIZE; + + if (fmt != COMPRESSED) + { + m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_NEW_SUPREMUM); + page_rec_set_n_owned(m_block, m_page + PAGE_NEW_SUPREMUM, + count + 1, true, &m_mtr); + } + else + { + mach_write_to_2(slot, PAGE_NEW_SUPREMUM); + rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1); + } } else { @@ -314,46 +350,30 @@ inline void PageBulk::finishPage() if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) { slot-= PAGE_DIR_SLOT_SIZE; - mach_write_to_2(slot, page_offset(insert_rec)); - rec_set_bit_field_1(insert_rec, count, REC_OLD_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + m_mtr.write<2,mtr_t::OPT>(*m_block, slot, page_offset(insert_rec)); + page_rec_set_n_owned(m_block, insert_rec, count, false, &m_mtr); count= 0; } insert_rec= m_page + mach_read_from_2(insert_rec - REC_NEXT); } while (insert_rec != m_page + PAGE_OLD_SUPREMUM); - } - - if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <= - PAGE_DIR_SLOT_MAX_N_OWNED)) { - /* We can merge the two last dir slots. This operation is here to - make this function imitate exactly the equivalent task made using - page_cur_insert_rec(), which we use in database recovery to - reproduce the task performed by this function. To be able to - check the correctness of recovery, it is good that it imitates exactly. */ - - count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; - rec_t *rec= const_cast(page_dir_slot_get_rec(slot)); - rec_set_bit_field_1(rec, 0, m_is_comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - slot+= PAGE_DIR_SLOT_SIZE; - } + if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <= + PAGE_DIR_SLOT_MAX_N_OWNED)) + { + /* Merge the last two slots, like page_cur_insert_rec_low() does. */ + count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2; - slot-= PAGE_DIR_SLOT_SIZE; + rec_t *rec= const_cast(page_dir_slot_get_rec(slot)); + page_rec_set_n_owned(m_block, rec, 0, false, &m_mtr); + } + else + slot-= PAGE_DIR_SLOT_SIZE; - if (m_is_comp) - { - mach_write_to_2(slot, PAGE_NEW_SUPREMUM); - rec_set_bit_field_1(m_page + PAGE_NEW_SUPREMUM, count + 1, REC_NEW_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - } - else - { - mach_write_to_2(slot, PAGE_OLD_SUPREMUM); - rec_set_bit_field_1(m_page + PAGE_OLD_SUPREMUM, count + 1, REC_OLD_N_OWNED, - REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_OLD_SUPREMUM); + page_rec_set_n_owned(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1, + false, &m_mtr); } ut_ad(!dict_index_is_spatial(m_index)); @@ -386,8 +406,7 @@ inline void PageBulk::finishPage() mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page, static_cast(m_heap_top - m_page)); mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page, - (PAGE_HEAP_NO_USER_LOW + m_rec_no) | - uint16_t{fmt != REDUNDANT} << 15); + (PAGE_HEAP_NO_USER_LOW + m_rec_no) | 1U << 15); mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); mach_write_to_2(PAGE_HEADER + PAGE_LAST_INSERT + m_page, static_cast(m_cur_rec - m_page)); @@ -563,7 +582,10 @@ PageBulk::copyOut( offsets = rec_get_offsets(rec, m_index, offsets, page_rec_is_leaf(split_rec), ULINT_UNDEFINED, &m_heap); - page_rec_set_next(rec, page_get_supremum_rec(m_page)); + mach_write_to_2(rec - REC_NEXT, m_is_comp + ? static_cast + (PAGE_NEW_SUPREMUM - page_offset(rec)) + : PAGE_OLD_SUPREMUM); /* Set related members */ m_cur_rec = rec; diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc index 6c204514ab6e3..186f75dd7923f 100644 --- a/storage/innobase/btr/btr0cur.cc +++ b/storage/innobase/btr/btr0cur.cc @@ -3878,31 +3878,6 @@ btr_cur_upd_lock_and_undo( cmpl_info, rec, offsets, roll_ptr)); } -/** Copy DB_TRX_ID,DB_ROLL_PTR to the redo log. -@param[in] index clustered index -@param[in] trx_id_t DB_TRX_ID -@param[in] roll_ptr DB_ROLL_PTR -@param[in,out] log_ptr redo log buffer -@return current end of the redo log buffer */ -static byte* -btr_cur_log_sys( - const dict_index_t* index, - trx_id_t trx_id, - roll_ptr_t roll_ptr, - byte* log_ptr) -{ - log_ptr += mach_write_compressed(log_ptr, index->db_trx_id()); - /* Yes, we are writing DB_ROLL_PTR,DB_TRX_ID in reverse order, - after emitting the position of DB_TRX_ID in the index. - This is how row_upd_write_sys_vals_to_log() - originally worked, and it is part of the redo log format. */ - trx_write_roll_ptr(log_ptr, roll_ptr); - log_ptr += DATA_ROLL_PTR_LEN; - log_ptr += mach_u64_write_compressed(log_ptr, trx_id); - - return log_ptr; -} - /** Write DB_TRX_ID,DB_ROLL_PTR to a clustered index entry. @param[in,out] entry clustered index entry @param[in] index clustered index @@ -3922,63 +3897,6 @@ static void btr_cur_write_sys( trx_write_roll_ptr(static_cast(r->data), roll_ptr); } -/***********************************************************//** -Writes a redo log record of updating a record in-place. */ -void -btr_cur_update_in_place_log( -/*========================*/ - ulint flags, /*!< in: flags */ - const rec_t* rec, /*!< in: record */ - dict_index_t* index, /*!< in: index of the record */ - const upd_t* update, /*!< in: update vector */ - trx_id_t trx_id, /*!< in: transaction id */ - roll_ptr_t roll_ptr, /*!< in: roll ptr */ - mtr_t* mtr) /*!< in: mtr */ -{ - byte* log_ptr; - const page_t* page = page_align(rec); - ut_ad(flags < 256); - ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table)); - - log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page) - ? MLOG_COMP_REC_UPDATE_IN_PLACE - : MLOG_REC_UPDATE_IN_PLACE, - 1 + DATA_ROLL_PTR_LEN + 14 + 2 - + MLOG_BUF_MARGIN); - - if (!log_ptr) { - /* Logging in mtr is switched off during crash recovery */ - return; - } - - /* For secondary indexes, we could skip writing the dummy system fields - to the redo log but we have to change redo log parsing of - MLOG_REC_UPDATE_IN_PLACE/MLOG_COMP_REC_UPDATE_IN_PLACE or we have to add - new redo log record. For now, just write dummy sys fields to the redo - log if we are updating a secondary index record. - */ - mach_write_to_1(log_ptr, flags); - log_ptr++; - - if (dict_index_is_clust(index)) { - log_ptr = btr_cur_log_sys(index, trx_id, roll_ptr, log_ptr); - } else { - /* Dummy system fields for a secondary index */ - /* TRX_ID Position */ - log_ptr += mach_write_compressed(log_ptr, 0); - /* ROLL_PTR */ - trx_write_roll_ptr(log_ptr, 0); - log_ptr += DATA_ROLL_PTR_LEN; - /* TRX_ID */ - log_ptr += mach_u64_write_compressed(log_ptr, 0); - } - - mach_write_to_2(log_ptr, page_offset(rec)); - log_ptr += 2; - - row_upd_index_write_log(update, log_ptr, mtr); -} - /** Update DB_TRX_ID, DB_ROLL_PTR in a clustered index record. @param[in,out] block clustered index leaf page @param[in,out] rec clustered index record @@ -4059,6 +3977,190 @@ row_upd_parse_sys_vals( return(const_cast(ptr)); } +/***********************************************************//** +Sets the value of the ith field SQL null bit of an old-style record. */ +static +void +rec_set_nth_field_null_bit( +/*=======================*/ + rec_t* rec, /*!< in: record */ + ulint i, /*!< in: ith field */ + ibool val) /*!< in: value to set */ +{ + ulint info; + + if (rec_get_1byte_offs_flag(rec)) { + + info = rec_1_get_field_end_info(rec, i); + + if (val) { + info = info | REC_1BYTE_SQL_NULL_MASK; + } else { + info = info & ~REC_1BYTE_SQL_NULL_MASK; + } + + rec_1_set_field_end_info(rec, i, info); + + return; + } + + info = rec_2_get_field_end_info(rec, i); + + if (val) { + info = info | REC_2BYTE_SQL_NULL_MASK; + } else { + info = info & ~REC_2BYTE_SQL_NULL_MASK; + } + + rec_2_set_field_end_info(rec, i, info); +} + +/***********************************************************//** +Sets an old-style record field to SQL null. +The physical size of the field is not changed. */ +static +void +rec_set_nth_field_sql_null( +/*=======================*/ + rec_t* rec, /*!< in: record */ + ulint n) /*!< in: index of the field */ +{ + ulint offset; + + offset = rec_get_field_start_offs(rec, n); + + data_write_sql_null(rec + offset, rec_get_nth_field_size(rec, n)); + + rec_set_nth_field_null_bit(rec, n, TRUE); +} + +/***********************************************************//** +This is used to modify the value of an already existing field in a record. +The previous value must have exactly the same size as the new value. If len +is UNIV_SQL_NULL then the field is treated as an SQL null. +For records in ROW_FORMAT=COMPACT (new-style records), len must not be +UNIV_SQL_NULL unless the field already is SQL null. */ +static +void +rec_set_nth_field( +/*==============*/ + rec_t* rec, /*!< in: record */ + const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ + ulint n, /*!< in: index number of the field */ + const void* data, /*!< in: pointer to the data + if not SQL null */ + ulint len) /*!< in: length of the data or UNIV_SQL_NULL */ +{ + byte* data2; + ulint len2; + + ut_ad(rec_offs_validate(rec, NULL, offsets)); + ut_ad(!rec_offs_nth_default(offsets, n)); + + if (len == UNIV_SQL_NULL) { + if (!rec_offs_nth_sql_null(offsets, n)) { + ut_a(!rec_offs_comp(offsets)); + rec_set_nth_field_sql_null(rec, n); + } + + return; + } + + data2 = (byte*)rec_get_nth_field(rec, offsets, n, &len2); + if (len2 == UNIV_SQL_NULL) { + ut_ad(!rec_offs_comp(offsets)); + rec_set_nth_field_null_bit(rec, n, FALSE); + ut_ad(len == rec_get_nth_field_size(rec, n)); + } else { + ut_ad(len2 == len); + } + + memcpy(data2, data, len); +} + +/***********************************************************//** +Replaces the new column values stored in the update vector to the +record given. No field size changes are allowed. This function is +usually invoked on a clustered index. The only use case for a +secondary index is row_ins_sec_index_entry_by_modify() or its +counterpart in ibuf_insert_to_index_page(). */ +static +void +row_upd_rec_in_place( +/*=================*/ + rec_t* rec, /*!< in/out: record where replaced */ + dict_index_t* index, /*!< in: the index the record belongs to */ + const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ + const upd_t* update, /*!< in: update vector */ + page_zip_des_t* page_zip,/*!< in: compressed page with enough space + available, or NULL */ + mtr_t* mtr) /*!< in/out: mini-transaction */ +{ + const upd_field_t* upd_field; + const dfield_t* new_val; + ulint n_fields; + ulint i; + + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(!index->table->skip_alter_undo); + + if (rec_offs_comp(offsets)) { +#ifdef UNIV_DEBUG + switch (rec_get_status(rec)) { + case REC_STATUS_ORDINARY: + break; + case REC_STATUS_INSTANT: + ut_ad(index->is_instant()); + break; + case REC_STATUS_NODE_PTR: + if (index->is_dummy + && fil_page_get_type(page_align(rec)) + == FIL_PAGE_RTREE) { + /* The function rtr_update_mbr_field_in_place() + is generating MLOG_COMP_REC_UPDATE_IN_PLACE + and MLOG_REC_UPDATE_IN_PLACE records for + node pointer pages. */ + break; + } + /* fall through */ + case REC_STATUS_INFIMUM: + case REC_STATUS_SUPREMUM: + ut_ad(!"wrong record status in update"); + } +#endif /* UNIV_DEBUG */ + + rec_set_bit_field_1(rec, update->info_bits, REC_NEW_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); + } else { + rec_set_bit_field_1(rec, update->info_bits, REC_OLD_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); + } + + n_fields = upd_get_n_fields(update); + + for (i = 0; i < n_fields; i++) { + upd_field = upd_get_nth_field(update, i); + + /* No need to update virtual columns for non-virtual index */ + if (upd_fld_is_virtual_col(upd_field) + && !dict_index_has_virtual(index)) { + continue; + } + + new_val = &(upd_field->new_val); + ut_ad(!dfield_is_ext(new_val) == + !rec_offs_nth_extern(offsets, upd_field->field_no)); + + rec_set_nth_field(rec, offsets, upd_field->field_no, + dfield_get_data(new_val), + dfield_get_len(new_val)); + } + + if (page_zip) { + page_zip_write_rec(page_zip, rec, index, offsets, 0, mtr); + } +} + /***********************************************************//** Parses a redo log record of updating a record in-place. @return end of log record or NULL */ @@ -4070,7 +4172,8 @@ btr_cur_parse_update_in_place( const byte* end_ptr,/*!< in: buffer end */ page_t* page, /*!< in/out: page or NULL */ page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - dict_index_t* index) /*!< in: index corresponding to page */ + dict_index_t* index, /*!< in: index corresponding to page */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { ulint flags; rec_t* rec; @@ -4146,7 +4249,7 @@ btr_cur_parse_update_in_place( trx_write_roll_ptr(field + DATA_TRX_ID_LEN, roll_ptr); } - row_upd_rec_in_place(rec, index, offsets, update, page_zip); + row_upd_rec_in_place(rec, index, offsets, update, page_zip, mtr); func_exit: mem_heap_free(heap); @@ -4241,6 +4344,114 @@ btr_cur_update_alloc_zip_func( return(false); } +/** Apply an update vector to a record. No field size changes are allowed. + +This is usually invoked on a clustered index. The only use case for a +secondary index is row_ins_sec_index_entry_by_modify() or its +counterpart in ibuf_insert_to_index_page(). +@param[in,out] rec index record +@param[in] index the index of the record +@param[in] offsets rec_get_offsets(rec, index) +@param[in] update update vector +@param[in,out] block index page +@param[in,out] mtr mini-transaction */ +void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index, + const offset_t *offsets, const upd_t *update, + buf_block_t *block, mtr_t *mtr) +{ + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(!index->table->skip_alter_undo); + ut_ad(!block->page.zip.data || index->table->not_redundant()); + +#ifdef UNIV_DEBUG + if (rec_offs_comp(offsets)) { + switch (rec_get_status(rec)) { + case REC_STATUS_ORDINARY: + break; + case REC_STATUS_INSTANT: + ut_ad(index->is_instant()); + break; + case REC_STATUS_NODE_PTR: + case REC_STATUS_INFIMUM: + case REC_STATUS_SUPREMUM: + ut_ad(!"wrong record status in update"); + } + } +#endif /* UNIV_DEBUG */ + + byte* info_bits = &rec[rec_offs_comp(offsets) + ? -REC_NEW_INFO_BITS + : -REC_OLD_INFO_BITS]; + compile_time_assert(REC_INFO_BITS_SHIFT == 0); + if ((*info_bits & REC_INFO_BITS_MASK) == update->info_bits) { + } else if (UNIV_LIKELY_NULL(block->page.zip.data)) { + *info_bits &= ~REC_INFO_BITS_MASK; + *info_bits |= update->info_bits; + } else { + mtr->write<1>(*block, info_bits, + (*info_bits & ~REC_INFO_BITS_MASK) + | update->info_bits); + } + + for (ulint i = 0; i < update->n_fields; i++) { + const upd_field_t* uf = upd_get_nth_field(update, i); + if (upd_fld_is_virtual_col(uf) && !index->has_virtual()) { + continue; + } + const ulint n = uf->field_no; + + ut_ad(!dfield_is_ext(&uf->new_val) + == !rec_offs_nth_extern(offsets, n)); + ut_ad(!rec_offs_nth_default(offsets, n)); + + if (UNIV_UNLIKELY(dfield_is_null(&uf->new_val))) { + ut_ad(!rec_offs_nth_sql_null(offsets, n)); + ut_ad(!index->table->not_redundant()); + mtr->memset(block, + page_offset(rec + rec_get_field_start_offs( + rec, n)), + rec_get_nth_field_size(rec, n), 0); + ulint l = rec_get_1byte_offs_flag(rec) + ? (n + 1) : (n + 1) * 2; + byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l]; + compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8 + == REC_2BYTE_SQL_NULL_MASK); + mtr->write<1>(*block, b, + byte(*b | REC_1BYTE_SQL_NULL_MASK)); + continue; + } + + ulint len; + byte* data = rec_get_nth_field(rec, offsets, n, &len); + if (UNIV_LIKELY_NULL(block->page.zip.data)) { + ut_ad(len == uf->new_val.len); + memcpy(data, uf->new_val.data, len); + continue; + } + + if (UNIV_UNLIKELY(len != uf->new_val.len)) { + ut_ad(len == UNIV_SQL_NULL); + ut_ad(!rec_offs_comp(offsets)); + len = uf->new_val.len; + ut_ad(len == rec_get_nth_field_size(rec, n)); + ulint l = rec_get_1byte_offs_flag(rec) + ? (n + 1) : (n + 1) * 2; + byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l]; + compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8 + == REC_2BYTE_SQL_NULL_MASK); + mtr->write<1>(*block, b, + byte(*b & ~REC_1BYTE_SQL_NULL_MASK)); + } + + mtr->memcpy(block, page_offset(data), uf->new_val.data, len); + } + + if (UNIV_LIKELY_NULL(block->page.zip.data)) { + page_zip_write_rec(&block->page.zip, rec, index, offsets, 0, + mtr); + } +} + /*************************************************************//** Updates a record when the update causes no size changes in its fields. We assume here that the ordering fields of the record do not change. @@ -4364,7 +4575,8 @@ btr_cur_update_in_place( assert_block_ahi_valid(block); #endif /* BTR_CUR_HASH_ADAPT */ - row_upd_rec_in_place(rec, index, offsets, update, page_zip); + btr_cur_upd_rec_in_place(rec, index, offsets, update, block, + mtr); #ifdef BTR_CUR_HASH_ADAPT if (ahi_latch) { @@ -4373,9 +4585,6 @@ btr_cur_update_in_place( } #endif /* BTR_CUR_HASH_ADAPT */ - btr_cur_update_in_place_log(flags, rec, index, update, - trx_id, roll_ptr, mtr); - if (was_delete_marked && !rec_get_deleted_flag( rec, page_is_comp(buf_block_get_frame(block)))) { diff --git a/storage/innobase/btr/btr0defragment.cc b/storage/innobase/btr/btr0defragment.cc index 9e1c0c6fad8dd..aa3c33893c695 100644 --- a/storage/innobase/btr/btr0defragment.cc +++ b/storage/innobase/btr/btr0defragment.cc @@ -770,6 +770,7 @@ static void btr_defragment_chunk(void*) return; } } + log_free_check(); mtr_start(&mtr); cursor = btr_pcur_get_btr_cur(pcur); index = btr_cur_get_index(cursor); diff --git a/storage/innobase/gis/gis0rtree.cc b/storage/innobase/gis/gis0rtree.cc index 6de460196ec9e..f46314d2767c6 100644 --- a/storage/innobase/gis/gis0rtree.cc +++ b/storage/innobase/gis/gis0rtree.cc @@ -183,82 +183,6 @@ rtr_index_build_node_ptr( return(tuple); } -/**************************************************************//** -In-place update the mbr field of a spatial index row. -@return true if update is successful */ -static -bool -rtr_update_mbr_field_in_place( -/*==========================*/ - dict_index_t* index, /*!< in: spatial index. */ - rec_t* rec, /*!< in/out: rec to be modified.*/ - offset_t* offsets, /*!< in/out: offsets on rec. */ - rtr_mbr_t* mbr, /*!< in: the new mbr. */ - mtr_t* mtr) /*!< in: mtr */ -{ - void* new_mbr_ptr; - double new_mbr[SPDIMS * 2]; - byte* log_ptr; - page_t* page = page_align(rec); - ulint len = DATA_MBR_LEN; - ulint flags = BTR_NO_UNDO_LOG_FLAG - | BTR_NO_LOCKING_FLAG - | BTR_KEEP_SYS_FLAG; - ulint rec_info; - - rtr_write_mbr(reinterpret_cast(&new_mbr), mbr); - new_mbr_ptr = static_cast(new_mbr); - /* Otherwise, set the mbr to the new_mbr. */ - rec_set_nth_field(rec, offsets, 0, new_mbr_ptr, len); - - rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets)); - - /* Write redo log. */ - /* For now, we use LOG_REC_UPDATE_IN_PLACE to log this enlarge. - In the future, we may need to add a new log type for this. */ - log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page) - ? MLOG_COMP_REC_UPDATE_IN_PLACE - : MLOG_REC_UPDATE_IN_PLACE, - 1 + DATA_ROLL_PTR_LEN + 14 + 2 - + MLOG_BUF_MARGIN); - - if (!log_ptr) { - /* Logging in mtr is switched off during - crash recovery */ - return(false); - } - - /* Flags */ - mach_write_to_1(log_ptr, flags); - log_ptr++; - /* TRX_ID Position */ - log_ptr += mach_write_compressed(log_ptr, 0); - /* ROLL_PTR */ - trx_write_roll_ptr(log_ptr, 0); - log_ptr += DATA_ROLL_PTR_LEN; - /* TRX_ID */ - log_ptr += mach_u64_write_compressed(log_ptr, 0); - - /* Offset */ - mach_write_to_2(log_ptr, page_offset(rec)); - log_ptr += 2; - /* Info bits */ - mach_write_to_1(log_ptr, rec_info); - log_ptr++; - /* N fields */ - log_ptr += mach_write_compressed(log_ptr, 1); - /* Field no, len */ - log_ptr += mach_write_compressed(log_ptr, 0); - log_ptr += mach_write_compressed(log_ptr, len); - /* Data */ - memcpy(log_ptr, new_mbr_ptr, len); - log_ptr += len; - - mlog_close(mtr, log_ptr); - - return(true); -} - /**************************************************************//** Update the mbr field of a spatial index row. @return true if update is successful */ @@ -280,7 +204,7 @@ rtr_update_mbr_field( mem_heap_t* heap; page_t* page; rec_t* rec; - ulint flags = BTR_NO_UNDO_LOG_FLAG + constexpr ulint flags = BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG; dberr_t err; @@ -291,7 +215,6 @@ rtr_update_mbr_field( ulint low_match = 0; ulint child; ulint rec_info; - page_zip_des_t* page_zip; bool ins_suc = true; ulint cur2_pos = 0; ulint del_page_no = 0; @@ -305,7 +228,6 @@ rtr_update_mbr_field( heap = mem_heap_create(100); block = btr_cur_get_block(cursor); ut_ad(page == buf_block_get_frame(block)); - page_zip = buf_block_get_page_zip(block); child = btr_node_ptr_get_child_page_no(rec, offsets); const bool is_leaf = page_is_leaf(block->frame); @@ -330,11 +252,16 @@ rtr_update_mbr_field( cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2)); } + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_base(offsets)[0 + 1] == DATA_MBR_LEN); + ut_ad(node_ptr->fields[0].len == DATA_MBR_LEN); + if (rec_info & REC_INFO_MIN_REC_FLAG) { /* When the rec is minimal rec in this level, we do - in-place update for avoiding it move to other place. */ + in-place update for avoiding it move to other place. */ + page_zip_des_t* page_zip = buf_block_get_page_zip(block); - if (page_zip) { + if (UNIV_LIKELY_NULL(page_zip)) { /* Check if there's enough space for in-place update the zip page. */ if (!btr_cur_update_alloc_zip( @@ -370,21 +297,18 @@ rtr_update_mbr_field( rec, rec_offs_comp(offsets)); ut_ad(rec_info & REC_INFO_MIN_REC_FLAG); #endif /* UNIV_DEBUG */ - } - - if (!rtr_update_mbr_field_in_place(index, rec, - offsets, mbr, mtr)) { - return(false); - } - - if (page_zip) { - page_zip_write_rec(page_zip, rec, index, offsets, 0); + memcpy(rec, node_ptr->fields[0].data, DATA_MBR_LEN); + page_zip_write_rec(page_zip, rec, index, offsets, 0, + mtr); + } else { + mtr->memcpy(block, page_offset(rec), + node_ptr->fields[0].data, DATA_MBR_LEN); } if (cursor2) { offset_t* offsets2; - if (page_zip) { + if (UNIV_LIKELY_NULL(page_zip)) { cursor2->page_cur.rec = page_rec_get_nth(page, cur2_pos); } diff --git a/storage/innobase/ibuf/ibuf0ibuf.cc b/storage/innobase/ibuf/ibuf0ibuf.cc index c0fe72149d574..284233d9b99ac 100644 --- a/storage/innobase/ibuf/ibuf0ibuf.cc +++ b/storage/innobase/ibuf/ibuf0ibuf.cc @@ -3833,15 +3833,8 @@ ibuf_insert_to_index_page( /* This is the easy case. Do something similar to btr_cur_update_in_place(). */ rec = page_cur_get_rec(&page_cur); - row_upd_rec_in_place(rec, index, offsets, - update, page_zip); - - /* Log the update in place operation. During recovery - MLOG_COMP_REC_UPDATE_IN_PLACE/MLOG_REC_UPDATE_IN_PLACE - expects trx_id, roll_ptr for secondary indexes. So we - just write dummy trx_id(0), roll_ptr(0) */ - btr_cur_update_in_place_log(BTR_KEEP_SYS_FLAG, rec, - index, update, 0, 0, mtr); + btr_cur_upd_rec_in_place(rec, index, offsets, + update, block, mtr); DBUG_EXECUTE_IF( "crash_after_log_ibuf_upd_inplace", diff --git a/storage/innobase/include/btr0cur.h b/storage/innobase/include/btr0cur.h index 0192aeaddef40..9e29a2450cb76 100644 --- a/storage/innobase/include/btr0cur.h +++ b/storage/innobase/include/btr0cur.h @@ -356,6 +356,22 @@ btr_cur_update_alloc_zip_func( # define btr_cur_update_alloc_zip(page_zip,cursor,index,offsets,len,cr,mtr) \ btr_cur_update_alloc_zip_func(page_zip,cursor,index,len,cr,mtr) #endif /* UNIV_DEBUG */ + +/** Apply an update vector to a record. No field size changes are allowed. + +This is usually invoked on a clustered index. The only use case for a +secondary index is row_ins_sec_index_entry_by_modify() or its +counterpart in ibuf_insert_to_index_page(). +@param[in,out] rec index record +@param[in] index the index of the record +@param[in] offsets rec_get_offsets(rec, index) +@param[in] update update vector +@param[in,out] block index page +@param[in,out] mtr mini-transaction */ +void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index, + const offset_t *offsets, const upd_t *update, + buf_block_t *block, mtr_t *mtr) + MY_ATTRIBUTE((nonnull)); /*************************************************************//** Updates a record when the update causes no size changes in its fields. @return locking or undo log related error code, or @@ -380,19 +396,6 @@ btr_cur_update_in_place( mtr_commit(mtr) before latching any further pages */ MY_ATTRIBUTE((warn_unused_result, nonnull)); -/***********************************************************//** -Writes a redo log record of updating a record in-place. */ -void -btr_cur_update_in_place_log( -/*========================*/ - ulint flags, /*!< in: flags */ - const rec_t* rec, /*!< in: record */ - dict_index_t* index, /*!< in: index of the record */ - const upd_t* update, /*!< in: update vector */ - trx_id_t trx_id, /*!< in: transaction id */ - roll_ptr_t roll_ptr, /*!< in: roll ptr */ - mtr_t* mtr) /*!< in: mtr */ - MY_ATTRIBUTE((nonnull)); /*************************************************************//** Tries to update a record on a page in an index tree. It is assumed that mtr holds an x-latch on the page. The operation does not succeed if there is too @@ -563,7 +566,8 @@ btr_cur_parse_update_in_place( const byte* end_ptr,/*!< in: buffer end */ page_t* page, /*!< in/out: page or NULL */ page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */ - dict_index_t* index); /*!< in: index corresponding to page */ + dict_index_t* index, /*!< in: index corresponding to page */ + mtr_t* mtr); /*!< in/out: mini-transaction */ /****************************************************************//** Parses the redo log record for delete marking or unmarking of a clustered index record. diff --git a/storage/innobase/include/dyn0types.h b/storage/innobase/include/dyn0types.h index 06d837081a19d..83d0b0d64c27f 100644 --- a/storage/innobase/include/dyn0types.h +++ b/storage/innobase/include/dyn0types.h @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 2013, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2020, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -29,8 +30,7 @@ Created 2013-03-16 Sunny Bains /** Value of dyn_block_t::magic_n */ #define DYN_BLOCK_MAGIC_N 375767 -/** This is the initial 'payload' size of a dynamic array; -this must be > MLOG_BUF_MARGIN + 30! */ +/** This is the initial 'payload' size of a dynamic array */ #define DYN_ARRAY_DATA_SIZE 512 /** Flag for dyn_block_t::used that indicates a full block */ diff --git a/storage/innobase/include/mtr0log.h b/storage/innobase/include/mtr0log.h index f049c094f9474..dfb1ff69fe9dc 100644 --- a/storage/innobase/include/mtr0log.h +++ b/storage/innobase/include/mtr0log.h @@ -239,10 +239,6 @@ mlog_parse_index( bool comp, /*!< in: TRUE=compact record format */ dict_index_t** index); /*!< out, own: dummy index */ -/** Insert, update, and maybe other functions may use this value to define an -extra mlog buffer size for variable size data */ -#define MLOG_BUF_MARGIN 256 - #include "mtr0log.ic" #endif /* mtr0log_h */ diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h index 3ed924847cf23..669321c999b2d 100644 --- a/storage/innobase/include/page0cur.h +++ b/storage/innobase/include/page0cur.h @@ -150,29 +150,8 @@ page_cur_tuple_insert( offset_t** offsets,/*!< out: offsets on *rec */ mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */ ulint n_ext, /*!< in: number of externally stored columns */ - mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */ - MY_ATTRIBUTE((nonnull(1,2,3,4,5), warn_unused_result)); -/***********************************************************//** -Inserts a record next to page cursor. Returns pointer to inserted record if -succeed, i.e., enough space available, NULL otherwise. The cursor stays at -the same logical position, but the physical position may change if it is -pointing to a compressed page that was reorganized. - -IMPORTANT: The caller will have to update IBUF_BITMAP_FREE -if this is a compressed leaf page in a secondary index. -This has to be done either within the same mini-transaction, -or by invoking ibuf_reset_free_bits() before mtr_commit(). - -@return pointer to record if succeed, NULL otherwise */ -UNIV_INLINE -rec_t* -page_cur_rec_insert( -/*================*/ - page_cur_t* cursor, /*!< in/out: a page cursor */ - const rec_t* rec, /*!< in: record to insert */ - dict_index_t* index, /*!< in: record descriptor */ - offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */ - mtr_t* mtr); /*!< in: mini-transaction handle, or NULL */ + mtr_t* mtr) /*!< in/out: mini-transaction */ + MY_ATTRIBUTE((nonnull, warn_unused_result)); /***********************************************************//** Inserts a record next to page cursor on an uncompressed page. Returns pointer to inserted record if succeed, i.e., enough @@ -306,23 +285,10 @@ page_cur_open_on_rnd_user_rec( /*==========================*/ buf_block_t* block, /*!< in: page */ page_cur_t* cursor);/*!< out: page cursor */ -/** Write a redo log record of inserting a record into an index page. -@param[in] insert_rec inserted record -@param[in] rec_size rec_get_size(insert_rec) -@param[in] cursor_rec predecessor of insert_rec -@param[in,out] index index tree -@param[in,out] mtr mini-transaction */ -void -page_cur_insert_rec_write_log( - const rec_t* insert_rec, - ulint rec_size, - const rec_t* cursor_rec, - dict_index_t* index, - mtr_t* mtr) - MY_ATTRIBUTE((nonnull)); /***********************************************************//** Parses a log record of a record insert on a page. @return end of log record or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_cur_parse_insert_rec( /*======================*/ diff --git a/storage/innobase/include/page0cur.ic b/storage/innobase/include/page0cur.ic index 3be677663744c..67d64ada6a898 100644 --- a/storage/innobase/include/page0cur.ic +++ b/storage/innobase/include/page0cur.ic @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1994, 2014, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2015, 2019, MariaDB Corporation. +Copyright (c) 2015, 2020, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -257,7 +257,7 @@ page_cur_tuple_insert( offset_t** offsets,/*!< out: offsets on *rec */ mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */ ulint n_ext, /*!< in: number of externally stored columns */ - mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { rec_t* rec; ulint size = rec_get_converted_size(index, tuple, n_ext); @@ -288,34 +288,3 @@ page_cur_tuple_insert( ut_ad(!rec || !cmp_dtuple_rec(tuple, rec, *offsets)); return(rec); } - -/***********************************************************//** -Inserts a record next to page cursor. Returns pointer to inserted record if -succeed, i.e., enough space available, NULL otherwise. The cursor stays at -the same logical position, but the physical position may change if it is -pointing to a compressed page that was reorganized. - -IMPORTANT: The caller will have to update IBUF_BITMAP_FREE -if this is a compressed leaf page in a secondary index. -This has to be done either within the same mini-transaction, -or by invoking ibuf_reset_free_bits() before mtr_commit(). - -@return pointer to record if succeed, NULL otherwise */ -UNIV_INLINE -rec_t* -page_cur_rec_insert( -/*================*/ - page_cur_t* cursor, /*!< in/out: a page cursor */ - const rec_t* rec, /*!< in: record to insert */ - dict_index_t* index, /*!< in: record descriptor */ - offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */ - mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */ -{ - if (is_buf_block_get_page_zip(cursor->block)) { - return(page_cur_insert_rec_zip( - cursor, index, rec, offsets, mtr)); - } else { - return(page_cur_insert_rec_low( - cursor, index, rec, offsets, mtr)); - } -} diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h index 0ed91cb8b1b64..6f57fd38848f7 100644 --- a/storage/innobase/include/page0page.h +++ b/storage/innobase/include/page0page.h @@ -506,17 +506,6 @@ inline uint16_t page_header_get_field(const page_t *page, ulint field) #ifndef UNIV_INNOCHECKSUM /*************************************************************//** -Sets the given header field. */ -UNIV_INLINE -void -page_header_set_field( -/*==================*/ - page_t* page, /*!< in/out: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint field, /*!< in: PAGE_N_DIR_SLOTS, ... */ - ulint val); /*!< in: value */ -/*************************************************************//** Returns the offset stored in the given header field. @return offset from the start of the page, or 0 */ UNIV_INLINE @@ -532,17 +521,6 @@ Returns the pointer stored in the given header field, or NULL. */ #define page_header_get_ptr(page, field) \ (page_header_get_offs(page, field) \ ? page + page_header_get_offs(page, field) : NULL) -/*************************************************************//** -Sets the pointer stored in the given header field. */ -UNIV_INLINE -void -page_header_set_ptr( -/*================*/ - page_t* page, /*!< in/out: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint field, /*!< in/out: PAGE_FREE, ... */ - const byte* ptr); /*!< in: pointer or NULL*/ /** Reset PAGE_LAST_INSERT. @@ -632,19 +610,6 @@ page_dir_get_n_heap( /*================*/ const page_t* page); /*!< in: index page */ /*************************************************************//** -Sets the number of records in the heap. */ -UNIV_INLINE -void -page_dir_set_n_heap( -/*================*/ - page_t* page, /*!< in/out: index page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL. - Note that the size of the dense page directory - in the compressed page trailer is - n_heap * PAGE_ZIP_DIR_SLOT_SIZE. */ - ulint n_heap);/*!< in: number of records */ -/*************************************************************//** Gets the number of dir slots in directory. @return number of slots */ UNIV_INLINE @@ -797,16 +762,6 @@ page_rec_get_next_non_del_marked( /*=============================*/ const rec_t* rec); /*!< in: pointer to record */ /************************************************************//** -Sets the pointer to the next record on the page. */ -UNIV_INLINE -void -page_rec_set_next( -/*==============*/ - rec_t* rec, /*!< in: pointer to record, - must not be page supremum */ - const rec_t* next); /*!< in: pointer to next record, - must not be page infimum */ -/************************************************************//** Gets the pointer to the previous record. @return pointer to previous record */ UNIV_INLINE @@ -941,21 +896,6 @@ uint16_t page_get_data_size( /*===============*/ const page_t* page); /*!< in: index page */ -/************************************************************//** -Allocates a block of memory from the head of the free list -of an index page. */ -UNIV_INLINE -void -page_mem_alloc_free( -/*================*/ - page_t* page, /*!< in/out: index page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page with enough - space available for inserting the record, - or NULL */ - rec_t* next_rec,/*!< in: pointer to the new head of the - free record list */ - ulint need); /*!< in: number of bytes allocated */ - /** Read the PAGE_DIRECTION field from a byte. @param[in] ptr pointer to PAGE_DIRECTION_B @return the value of the PAGE_DIRECTION field */ @@ -963,13 +903,6 @@ inline byte page_ptr_get_direction(const byte* ptr); -/** Set the PAGE_DIRECTION field. -@param[in] ptr pointer to PAGE_DIRECTION_B -@param[in] dir the value of the PAGE_DIRECTION field */ -inline -void -page_ptr_set_direction(byte* ptr, byte dir); - /** Read the PAGE_DIRECTION field. @param[in] page index page @return the value of the PAGE_DIRECTION field */ diff --git a/storage/innobase/include/page0page.ic b/storage/innobase/include/page0page.ic index 563de443f7627..b4fd9606caa90 100644 --- a/storage/innobase/include/page0page.ic +++ b/storage/innobase/include/page0page.ic @@ -105,33 +105,6 @@ page_set_ssn_id( #endif /* !UNIV_INNOCHECKSUM */ #ifndef UNIV_INNOCHECKSUM -/*************************************************************//** -Sets the given header field. */ -UNIV_INLINE -void -page_header_set_field( -/*==================*/ - page_t* page, /*!< in/out: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint field, /*!< in: PAGE_N_DIR_SLOTS, ... */ - ulint val) /*!< in: value */ -{ - ut_ad(page); - ut_ad(field <= PAGE_N_RECS); -#if 0 /* FIXME: MDEV-19344 hits this */ - ut_ad(field != PAGE_N_RECS || val); -#endif - ut_ad(field == PAGE_N_HEAP || val < srv_page_size); - ut_ad(field != PAGE_N_HEAP || (val & 0x7fff) < srv_page_size); - - mach_write_to_2(page + PAGE_HEADER + field, val); - if (page_zip) { - page_zip_write_header(page_zip, - page + PAGE_HEADER + field, 2, NULL); - } -} - /*************************************************************//** Returns the offset stored in the given header field. @return offset from the start of the page, or 0 */ @@ -153,35 +126,6 @@ page_header_get_offs( return(offs); } -/*************************************************************//** -Sets the pointer stored in the given header field. */ -UNIV_INLINE -void -page_header_set_ptr( -/*================*/ - page_t* page, /*!< in: page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL */ - ulint field, /*!< in: PAGE_FREE, ... */ - const byte* ptr) /*!< in: pointer or NULL*/ -{ - ulint offs; - - ut_ad(page); - ut_ad((field == PAGE_FREE) - || (field == PAGE_LAST_INSERT) - || (field == PAGE_HEAP_TOP)); - - if (ptr == NULL) { - offs = 0; - } else { - offs = ulint(ptr - page); - } - - ut_ad((field != PAGE_HEAP_TOP) || offs); - - page_header_set_field(page, page_zip, field, offs); -} /** Reset PAGE_LAST_INSERT. @@ -432,29 +376,6 @@ page_dir_get_n_heap( return(page_header_get_field(page, PAGE_N_HEAP) & 0x7fff); } -/*************************************************************//** -Sets the number of records in the heap. */ -UNIV_INLINE -void -page_dir_set_n_heap( -/*================*/ - page_t* page, /*!< in/out: index page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page whose - uncompressed part will be updated, or NULL. - Note that the size of the dense page directory - in the compressed page trailer is - n_heap * PAGE_ZIP_DIR_SLOT_SIZE. */ - ulint n_heap) /*!< in: number of records */ -{ - ut_ad(n_heap < 0x8000); - ut_ad(!page_zip || uint16_t(n_heap) - == (page_header_get_field(page, PAGE_N_HEAP) & 0x7fff) + 1); - - page_header_set_field(page, page_zip, PAGE_N_HEAP, n_heap - | (0x8000 - & page_header_get_field(page, PAGE_N_HEAP))); -} - /**************************************************************//** Used to check the consistency of a record on a page. @return TRUE if succeed */ @@ -595,35 +516,6 @@ page_rec_get_next_non_del_marked( return(r); } -/************************************************************//** -Sets the pointer to the next record on the page. */ -UNIV_INLINE -void -page_rec_set_next( -/*==============*/ - rec_t* rec, /*!< in: pointer to record, - must not be page supremum */ - const rec_t* next) /*!< in: pointer to next record, - must not be page infimum */ -{ - ulint offs; - - ut_ad(page_rec_check(rec)); - ut_ad(!page_rec_is_supremum(rec)); - ut_ad(rec != next); - - ut_ad(!next || !page_rec_is_infimum(next)); - ut_ad(!next || page_align(rec) == page_align(next)); - - offs = next != NULL ? page_offset(next) : 0; - - if (page_rec_is_comp(rec)) { - rec_set_next_offs_new(rec, offs); - } else { - rec_set_next_offs_old(rec, offs); - } -} - /************************************************************//** Gets the pointer to the previous record. @return pointer to previous record */ @@ -745,39 +637,6 @@ page_get_data_size( } #ifndef UNIV_INNOCHECKSUM -/************************************************************//** -Allocates a block of memory from the free list of an index page. */ -UNIV_INLINE -void -page_mem_alloc_free( -/*================*/ - page_t* page, /*!< in/out: index page */ - page_zip_des_t* page_zip,/*!< in/out: compressed page with enough - space available for inserting the record, - or NULL */ - rec_t* next_rec,/*!< in: pointer to the new head of the - free record list */ - ulint need) /*!< in: number of bytes allocated */ -{ - ulint garbage; - -#ifdef UNIV_DEBUG - const rec_t* old_rec = page_header_get_ptr(page, PAGE_FREE); - ulint next_offs; - - ut_ad(old_rec); - next_offs = rec_get_next_offs(old_rec, page_is_comp(page)); - ut_ad(next_rec == (next_offs ? page + next_offs : NULL)); -#endif - - page_header_set_ptr(page, page_zip, PAGE_FREE, next_rec); - - garbage = page_header_get_field(page, PAGE_GARBAGE); - ut_ad(garbage >= need); - - page_header_set_field(page, page_zip, PAGE_GARBAGE, garbage - need); -} - /*************************************************************//** Calculates free space if a page is emptied. @return free space */ @@ -884,19 +743,6 @@ page_ptr_get_direction(const byte* ptr) return *ptr & ((1U << 3) - 1); } -/** Set the PAGE_DIRECTION field. -@param[in] ptr pointer to PAGE_DIRECTION_B -@param[in] dir the value of the PAGE_DIRECTION field */ -inline -void -page_ptr_set_direction(byte* ptr, byte dir) -{ - ut_ad(page_offset(ptr) == PAGE_HEADER + PAGE_DIRECTION_B); - ut_ad(dir >= PAGE_LEFT); - ut_ad(dir <= PAGE_NO_DIRECTION); - *ptr = (*ptr & ~((1U << 3) - 1)) | dir; -} - /** Read the PAGE_INSTANT field. @param[in] page index page @return the value of the PAGE_INSTANT field */ diff --git a/storage/innobase/include/page0zip.h b/storage/innobase/include/page0zip.h index 4da4e74099457..ef3e0f2a56904 100644 --- a/storage/innobase/include/page0zip.h +++ b/storage/innobase/include/page0zip.h @@ -243,17 +243,17 @@ page_zip_write_header( mtr_t* mtr) /*!< in: mini-transaction, or NULL */ MY_ATTRIBUTE((nonnull(1,2))); -/**********************************************************************//** -Write an entire record on the compressed page. The data must already -have been written to the uncompressed page. */ -void -page_zip_write_rec( -/*===============*/ - page_zip_des_t* page_zip,/*!< in/out: compressed page */ - const byte* rec, /*!< in: record being written */ - dict_index_t* index, /*!< in: the index the record belongs to */ - const offset_t* offsets,/*!< in: rec_get_offsets(rec, index) */ - ulint create) /*!< in: nonzero=insert, zero=update */ +/** Write an entire record to the ROW_FORMAT=COMPRESSED page. +The data must already have been written to the uncompressed page. +@param[in,out] page_zip ROW_FORMAT=COMPRESSED page +@param[in] rec record in the uncompressed page +@param[in] index the index that the page belongs to +@param[in] offsets rec_get_offsets(rec, index) +@param[in] create nonzero=insert, zero=update +@param[in,out] mtr mini-transaction */ +void page_zip_write_rec(page_zip_des_t *page_zip, const byte *rec, + const dict_index_t *index, const offset_t *offsets, + ulint create, mtr_t *mtr) MY_ATTRIBUTE((nonnull)); /***********************************************************//** @@ -374,7 +374,9 @@ page_zip_dir_insert( page_cur_t* cursor, /*!< in/out: page cursor */ const byte* free_rec,/*!< in: record from which rec was allocated, or NULL */ - byte* rec); /*!< in: record to insert */ + byte* rec, /*!< in: record to insert */ + mtr_t* mtr) /*!< in/out: mini-transaction */ + MY_ATTRIBUTE((nonnull(1,3,4))); /**********************************************************************//** Shift the dense page directory and the array of BLOB pointers @@ -391,16 +393,6 @@ page_zip_dir_delete( mtr_t* mtr) /*!< in/out: mini-transaction */ MY_ATTRIBUTE((nonnull(1,2,3,4,6))); -/**********************************************************************//** -Add a slot to the dense page directory. */ -void -page_zip_dir_add_slot( -/*==================*/ - page_zip_des_t* page_zip, /*!< in/out: compressed page */ - ulint is_clustered) /*!< in: nonzero for clustered index, - zero for others */ - MY_ATTRIBUTE((nonnull)); - /***********************************************************//** Parses a log record of writing to the header of a page. @return end of log record or NULL */ diff --git a/storage/innobase/include/rem0rec.h b/storage/innobase/include/rem0rec.h index d8483fe2803c2..033f7c94a4b6a 100644 --- a/storage/innobase/include/rem0rec.h +++ b/storage/innobase/include/rem0rec.h @@ -250,6 +250,7 @@ rec_get_n_owned_new( /*================*/ const rec_t* rec) /*!< in: new-style physical record */ MY_ATTRIBUTE((warn_unused_result)); + /******************************************************//** The following function is used to retrieve the info bits of a record. @@ -261,24 +262,6 @@ rec_get_info_bits( const rec_t* rec, /*!< in: physical record */ ulint comp) /*!< in: nonzero=compact page format */ MY_ATTRIBUTE((warn_unused_result)); -/******************************************************//** -The following function is used to set the info bits of a record. */ -UNIV_INLINE -void -rec_set_info_bits_old( -/*==================*/ - rec_t* rec, /*!< in: old-style physical record */ - ulint bits) /*!< in: info bits */ - MY_ATTRIBUTE((nonnull)); -/******************************************************//** -The following function is used to set the info bits of a record. */ -UNIV_INLINE -void -rec_set_info_bits_new( -/*==================*/ - rec_t* rec, /*!< in/out: new-style physical record */ - ulint bits) /*!< in: info bits */ - MY_ATTRIBUTE((nonnull)); /** Determine the status bits of a non-REDUNDANT record. @param[in] rec ROW_FORMAT=COMPACT,DYNAMIC,COMPRESSED record @@ -859,26 +842,6 @@ rec_offs_n_extern( /*==============*/ const offset_t* offsets)/*!< in: array returned by rec_get_offsets() */ MY_ATTRIBUTE((warn_unused_result)); -/***********************************************************//** -This is used to modify the value of an already existing field in a record. -The previous value must have exactly the same size as the new value. If len -is UNIV_SQL_NULL then the field is treated as an SQL null. -For records in ROW_FORMAT=COMPACT (new-style records), len must not be -UNIV_SQL_NULL unless the field already is SQL null. */ -UNIV_INLINE -void -rec_set_nth_field( -/*==============*/ - rec_t* rec, /*!< in: record */ - const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ - ulint n, /*!< in: index number of the field */ - const void* data, /*!< in: pointer to the data if not SQL null */ - ulint len) /*!< in: length of the data or UNIV_SQL_NULL. - If not SQL null, must have the same - length as the previous value. - If SQL null, previous value must be - SQL null. */ - MY_ATTRIBUTE((nonnull(1,2))); /**********************************************************//** The following function returns the data size of an old-style physical record, that is the sum of field lengths. SQL null fields diff --git a/storage/innobase/include/rem0rec.ic b/storage/innobase/include/rem0rec.ic index 4405ad0abb7c7..7ea68ade6caf6 100644 --- a/storage/innobase/include/rem0rec.ic +++ b/storage/innobase/include/rem0rec.ic @@ -123,23 +123,6 @@ and the shift needed to obtain each bit-field of the record. */ # error "sum of new-style masks != 0xFFFFFFUL" #endif -/***********************************************************//** -Sets the value of the ith field SQL null bit of an old-style record. */ -void -rec_set_nth_field_null_bit( -/*=======================*/ - rec_t* rec, /*!< in: record */ - ulint i, /*!< in: ith field */ - ibool val); /*!< in: value to set */ -/***********************************************************//** -Sets an old-style record field to SQL null. -The physical size of the field is not changed. */ -void -rec_set_nth_field_sql_null( -/*=======================*/ - rec_t* rec, /*!< in: record */ - ulint n); /*!< in: index of the field */ - /******************************************************//** Gets a bit field from within 1 byte. */ UNIV_INLINE @@ -533,31 +516,6 @@ rec_get_info_bits( return(val); } -/******************************************************//** -The following function is used to set the info bits of a record. */ -UNIV_INLINE -void -rec_set_info_bits_old( -/*==================*/ - rec_t* rec, /*!< in: old-style physical record */ - ulint bits) /*!< in: info bits */ -{ - rec_set_bit_field_1(rec, bits, REC_OLD_INFO_BITS, - REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); -} -/******************************************************//** -The following function is used to set the info bits of a record. */ -UNIV_INLINE -void -rec_set_info_bits_new( -/*==================*/ - rec_t* rec, /*!< in/out: new-style physical record */ - ulint bits) /*!< in: info bits */ -{ - rec_set_bit_field_1(rec, bits, REC_NEW_INFO_BITS, - REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); -} - /******************************************************//** The following function is used to retrieve the info and status bits of a record. (Only compact records have status bits.) @@ -594,7 +552,9 @@ rec_set_info_and_status_bits( compile_time_assert(!((REC_NEW_STATUS_MASK >> REC_NEW_STATUS_SHIFT) & (REC_INFO_BITS_MASK >> REC_INFO_BITS_SHIFT))); rec_set_status(rec, bits & REC_NEW_STATUS_MASK); - rec_set_info_bits_new(rec, bits & ~REC_NEW_STATUS_MASK); + rec_set_bit_field_1(rec, bits & ~REC_NEW_STATUS_MASK, + REC_NEW_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); } /******************************************************//** @@ -1032,50 +992,6 @@ rec_get_nth_field_size( return(next_os - os); } -/***********************************************************//** -This is used to modify the value of an already existing field in a record. -The previous value must have exactly the same size as the new value. If len -is UNIV_SQL_NULL then the field is treated as an SQL null. -For records in ROW_FORMAT=COMPACT (new-style records), len must not be -UNIV_SQL_NULL unless the field already is SQL null. */ -UNIV_INLINE -void -rec_set_nth_field( -/*==============*/ - rec_t* rec, /*!< in: record */ - const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ - ulint n, /*!< in: index number of the field */ - const void* data, /*!< in: pointer to the data - if not SQL null */ - ulint len) /*!< in: length of the data or UNIV_SQL_NULL */ -{ - byte* data2; - ulint len2; - - ut_ad(rec_offs_validate(rec, NULL, offsets)); - ut_ad(!rec_offs_nth_default(offsets, n)); - - if (len == UNIV_SQL_NULL) { - if (!rec_offs_nth_sql_null(offsets, n)) { - ut_a(!rec_offs_comp(offsets)); - rec_set_nth_field_sql_null(rec, n); - } - - return; - } - - data2 = (byte*)rec_get_nth_field(rec, offsets, n, &len2); - if (len2 == UNIV_SQL_NULL) { - ut_ad(!rec_offs_comp(offsets)); - rec_set_nth_field_null_bit(rec, n, FALSE); - ut_ad(len == rec_get_nth_field_size(rec, n)); - } else { - ut_ad(len2 == len); - } - - memcpy(data2, data, len); -} - /**********************************************************//** The following function returns the data size of an old-style physical record, that is the sum of field lengths. SQL null fields diff --git a/storage/innobase/include/row0upd.h b/storage/innobase/include/row0upd.h index 7ee2f4e2f142f..86cb18231bcf3 100644 --- a/storage/innobase/include/row0upd.h +++ b/storage/innobase/include/row0upd.h @@ -108,17 +108,6 @@ upd_node_create( /*============*/ mem_heap_t* heap); /*!< in: mem heap where created */ /***********************************************************//** -Writes to the redo log the new values of the fields occurring in the index. */ -void -row_upd_index_write_log( -/*====================*/ - const upd_t* update, /*!< in: update vector */ - byte* log_ptr,/*!< in: pointer to mlog buffer: must - contain at least MLOG_BUF_MARGIN bytes - of free space; the buffer is closed - within this function */ - mtr_t* mtr); /*!< in: mtr into whose log to write */ -/***********************************************************//** Returns TRUE if row update changes size of some field in index or if some field to be updated is stored externally in rec or update. @return TRUE if the update changes the size of some field in index or @@ -137,21 +126,6 @@ row_upd_changes_disowned_external( /*==============================*/ const upd_t* update) /*!< in: update vector */ MY_ATTRIBUTE((nonnull, warn_unused_result)); -/***********************************************************//** -Replaces the new column values stored in the update vector to the -record given. No field size changes are allowed. This function is -usually invoked on a clustered index. The only use case for a -secondary index is row_ins_sec_index_entry_by_modify() or its -counterpart in ibuf_insert_to_index_page(). */ -void -row_upd_rec_in_place( -/*=================*/ - rec_t* rec, /*!< in/out: record where replaced */ - dict_index_t* index, /*!< in: the index the record belongs to */ - const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ - const upd_t* update, /*!< in: update vector */ - page_zip_des_t* page_zip);/*!< in: compressed page with enough space - available, or NULL */ /***************************************************************//** Builds an update vector from those fields which in a secondary index entry @@ -345,6 +319,7 @@ row_upd_step( /*********************************************************************//** Parses the log data written by row_upd_index_write_log. @return log data end or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ byte* row_upd_index_parse( /*================*/ diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index d671948fdd108..c983000f448eb 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -323,15 +323,20 @@ class mlog_init_t i.first, 0, RW_X_LATCH, NULL, BUF_GET_IF_IN_POOL, __FILE__, __LINE__, &mtr)) { - if (UNIV_LIKELY_NULL(block->page.zip.data) - && fil_page_type_is_index( - fil_page_get_type( - block->page.zip.data)) - && !page_zip_decompress(&block->page.zip, + if (UNIV_LIKELY_NULL(block->page.zip.data)) { + switch (fil_page_get_type( + block->page.zip.data)) { + case FIL_PAGE_INDEX: + case FIL_PAGE_RTREE: + if (page_zip_decompress( + &block->page.zip, block->frame, true)) { - ib::error() << "corrupted page " - << block->page.id; + break; + } + ib::error() << "corrupted " + << block->page.id; + } } if (recv_no_ibuf_operations) { mtr.commit(); @@ -1465,6 +1470,10 @@ recv_parse_or_apply_log_rec_body( switch (type) { case MLOG_1BYTE: case MLOG_2BYTES: case MLOG_4BYTES: case MLOG_8BYTES: + ut_ad(!page_zip + || fil_page_get_type(page_zip->data) + <= FIL_PAGE_TYPE_ZBLOB2); + /* fall through */ case MLOG_MEMSET: #ifdef UNIV_DEBUG if (page && page_type == FIL_PAGE_TYPE_ALLOCATED @@ -1641,7 +1650,8 @@ recv_parse_or_apply_log_rec_body( || (ibool)!!page_is_comp(page) == dict_table_is_comp(index->table)); ptr = btr_cur_parse_update_in_place(ptr, end_ptr, page, - page_zip, index); + page_zip, index, + mtr); } break; case MLOG_LIST_END_DELETE: case MLOG_COMP_LIST_END_DELETE: diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index ab3f6e11bb8e0..5fa6df85558a3 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -784,188 +784,6 @@ page_cur_open_on_rnd_user_rec( ut_rnd_interval(n_recs) + 1); } -/** Write a redo log record of inserting a record into an index page. -@param[in] insert_rec inserted record -@param[in] rec_size rec_get_size(insert_rec) -@param[in] cursor_rec predecessor of insert_rec -@param[in,out] index index tree -@param[in,out] mtr mini-transaction */ -void -page_cur_insert_rec_write_log( - const rec_t* insert_rec, - ulint rec_size, - const rec_t* cursor_rec, - dict_index_t* index, - mtr_t* mtr) -{ - ulint cur_rec_size; - ulint extra_size; - ulint cur_extra_size; - const byte* ins_ptr; - const byte* log_end; - ulint i; - - if (index->table->is_temporary()) { - mtr->set_modified(); - ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO); - return; - } - - ut_a(rec_size < srv_page_size); - ut_ad(mtr->is_named_space(index->table->space)); - ut_ad(page_align(insert_rec) == page_align(cursor_rec)); - ut_ad(!page_rec_is_comp(insert_rec) - == !dict_table_is_comp(index->table)); - - const bool is_leaf = page_rec_is_leaf(cursor_rec); - - { - mem_heap_t* heap = NULL; - offset_t cur_offs_[REC_OFFS_NORMAL_SIZE]; - offset_t ins_offs_[REC_OFFS_NORMAL_SIZE]; - - offset_t* cur_offs; - offset_t* ins_offs; - - rec_offs_init(cur_offs_); - rec_offs_init(ins_offs_); - - cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_, - is_leaf, ULINT_UNDEFINED, &heap); - ins_offs = rec_get_offsets(insert_rec, index, ins_offs_, - is_leaf, ULINT_UNDEFINED, &heap); - - extra_size = rec_offs_extra_size(ins_offs); - cur_extra_size = rec_offs_extra_size(cur_offs); - ut_ad(rec_size == rec_offs_size(ins_offs)); - cur_rec_size = rec_offs_size(cur_offs); - - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - } - - ins_ptr = insert_rec - extra_size; - - i = 0; - - if (cur_extra_size == extra_size) { - ulint min_rec_size = ut_min(cur_rec_size, rec_size); - - const byte* cur_ptr = cursor_rec - cur_extra_size; - - /* Find out the first byte in insert_rec which differs from - cursor_rec; skip the bytes in the record info */ - - do { - if (*ins_ptr == *cur_ptr) { - i++; - ins_ptr++; - cur_ptr++; - } else if ((i < extra_size) - && (i >= extra_size - - page_rec_get_base_extra_size - (insert_rec))) { - i = extra_size; - ins_ptr = insert_rec; - cur_ptr = cursor_rec; - } else { - break; - } - } while (i < min_rec_size); - } - - byte* log_ptr; - - if (page_rec_is_comp(insert_rec)) { - log_ptr = mlog_open_and_write_index( - mtr, insert_rec, index, MLOG_COMP_REC_INSERT, - 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN); - if (UNIV_UNLIKELY(!log_ptr)) { - /* Logging in mtr is switched off - during crash recovery: in that case - mlog_open returns NULL */ - return; - } - } else { - log_ptr = mlog_open(mtr, 11 - + 2 + 5 + 1 + 5 + 5 - + MLOG_BUF_MARGIN); - if (UNIV_UNLIKELY(!log_ptr)) { - /* Logging in mtr is switched off - during crash recovery: in that case - mlog_open returns NULL */ - return; - } - - log_ptr = mlog_write_initial_log_record_fast( - insert_rec, MLOG_REC_INSERT, log_ptr, mtr); - } - - log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN]; - /* Write the cursor rec offset as a 2-byte ulint */ - mach_write_to_2(log_ptr, page_offset(cursor_rec)); - log_ptr += 2; - - if (page_rec_is_comp(insert_rec)) { - if (UNIV_UNLIKELY - (rec_get_info_and_status_bits(insert_rec, TRUE) - != rec_get_info_and_status_bits(cursor_rec, TRUE))) { - - goto need_extra_info; - } - } else { - if (UNIV_UNLIKELY - (rec_get_info_and_status_bits(insert_rec, FALSE) - != rec_get_info_and_status_bits(cursor_rec, FALSE))) { - - goto need_extra_info; - } - } - - if (extra_size != cur_extra_size || rec_size != cur_rec_size) { -need_extra_info: - /* Write the record end segment length - and the extra info storage flag */ - log_ptr += mach_write_compressed(log_ptr, - 2 * (rec_size - i) + 1); - - /* Write the info bits */ - mach_write_to_1(log_ptr, - rec_get_info_and_status_bits( - insert_rec, - page_rec_is_comp(insert_rec))); - log_ptr++; - - /* Write the record origin offset */ - log_ptr += mach_write_compressed(log_ptr, extra_size); - - /* Write the mismatch index */ - log_ptr += mach_write_compressed(log_ptr, i); - - ut_a(i < srv_page_size); - ut_a(extra_size < srv_page_size); - } else { - /* Write the record end segment length - and the extra info storage flag */ - log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i)); - } - - /* Write to the log the inserted index record end segment which - differs from the cursor record */ - - rec_size -= i; - - if (log_ptr + rec_size <= log_end) { - memcpy(log_ptr, ins_ptr, rec_size); - mlog_close(mtr, log_ptr + rec_size); - } else { - mlog_close(mtr, log_ptr); - ut_a(rec_size < srv_page_size); - mlog_catenate_string(mtr, ins_ptr, rec_size); - } -} - static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact) { rec_set_bit_field_2(rec, heap_no, @@ -973,20 +791,10 @@ static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact) REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); } -static void rec_set_heap_no(const buf_block_t& block, rec_t *rec, - ulint heap_no, bool compact, mtr_t *mtr) -{ - rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO; - - // MDEV-12353 FIXME: try single-byte write if possible - mtr->write<2,mtr_t::OPT>(block, rec, - (mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) | - (heap_no << REC_HEAP_NO_SHIFT)); -} - /***********************************************************//** Parses a log record of a record insert on a page. @return end of log record or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_cur_parse_insert_rec( /*======================*/ @@ -1140,19 +948,26 @@ page_cur_parse_insert_rec( rec_set_info_and_status_bits(buf + origin_offset, info_and_status_bits); } else { - rec_set_info_bits_old(buf + origin_offset, - info_and_status_bits); + rec_set_bit_field_1(buf + origin_offset, info_and_status_bits, + REC_OLD_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); } page_cur_position(cursor_rec, block, &cursor); offsets = rec_get_offsets(buf + origin_offset, index, offsets, is_leaf, ULINT_UNDEFINED, &heap); - if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor, - buf + origin_offset, - index, offsets, mtr))) { - /* The redo log record should only have been written - after the write was successful. */ + /* The redo log record should only have been written + after the write was successful. */ + if (block->page.zip.data) { + if (!page_cur_insert_rec_zip(&cursor, index, + buf + origin_offset, + offsets, mtr)) { + ut_error; + } + } else if (!page_cur_insert_rec_low(&cursor, index, + buf + origin_offset, + offsets, mtr)) { ut_error; } @@ -1169,47 +984,57 @@ page_cur_parse_insert_rec( } /** Reset PAGE_DIRECTION and PAGE_N_DIRECTION. -@param[in,out] ptr the PAGE_DIRECTION_B field -@param[in,out] page index tree page frame -@param[in] page_zip compressed page descriptor, or NULL */ -static inline -void -page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip) +@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED +@param[in,out] block index page +@param[in,out] ptr the PAGE_DIRECTION_B field +@param[in,out] mtr mini-transaction */ +template +inline void page_direction_reset(buf_block_t *block, byte *ptr, mtr_t *mtr) { - ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page); - page_ptr_set_direction(ptr, PAGE_NO_DIRECTION); - if (page_zip) { - page_zip_write_header(page_zip, ptr, 1, NULL); - } - ptr = PAGE_HEADER + PAGE_N_DIRECTION + page; - *reinterpret_cast(ptr) = 0; - if (page_zip) { - page_zip_write_header(page_zip, ptr, 2, NULL); - } + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(!compressed || block->page.zip.data); + ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame); + static_assert(PAGE_DIRECTION_B + 1 == PAGE_N_DIRECTION, "adjacent fields"); + + if (compressed) + { + *ptr= PAGE_NO_DIRECTION; /* no instant ALTER bits */ + memset_aligned<2>(ptr + 1, 0, 2); + page_zip_write_header(&block->page.zip, ptr, 3, mtr); + } + else + { + mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1)) + | PAGE_NO_DIRECTION); + mtr->write<2,mtr_t::OPT>(*block, ptr + 1, 0U); + } } /** Increment PAGE_N_DIRECTION. -@param[in,out] ptr the PAGE_DIRECTION_B field -@param[in,out] page index tree page frame -@param[in] page_zip compressed page descriptor, or NULL -@param[in] dir PAGE_RIGHT or PAGE_LEFT */ -static inline -void -page_direction_increment( - byte* ptr, - page_t* page, - page_zip_des_t* page_zip, - uint dir) +@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED +@param[in,out] block index page +@param[in,out] ptr the PAGE_DIRECTION_B field +@param[in] dir PAGE_RIGHT or PAGE_LEFT +@param[in,out] mtr mini-transaction */ +template +inline void page_direction_increment(buf_block_t *block, byte *ptr, uint dir, + mtr_t *mtr) { - ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page); - ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT); - page_ptr_set_direction(ptr, dir); - if (page_zip) { - page_zip_write_header(page_zip, ptr, 1, NULL); - } - page_header_set_field( - page, page_zip, PAGE_N_DIRECTION, - 1U + page_header_get_field(page, PAGE_N_DIRECTION)); + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(!compressed || block->page.zip.data); + ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame); + ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT); + if (compressed) + { + *ptr= static_cast(dir); + mach_write_to_2(ptr + 1, 1 + mach_read_from_2(ptr + 1)); + page_zip_write_header(&block->page.zip, ptr, 3, mtr); + } + else + { + mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1)) | dir); + mtr->write<2>(*block, ptr + 1, 1U + mach_read_from_2(ptr + 1)); + } } /** @@ -1228,7 +1053,6 @@ static void page_dir_slot_set_n_owned(buf_block_t *block, page_rec_set_n_owned(block, rec, n, page_rec_is_comp(rec), mtr); } - /** Split a directory slot which owns too many records. @tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well @@ -1269,7 +1093,7 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) const ulint half_owned= n_owned / 2; - if (compressed && UNIV_LIKELY_NULL(block->page.zip.data)) + if (compressed) { /* Log changes to the compressed page header and the dense page directory. */ @@ -1392,24 +1216,47 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) } /** Allocate space for inserting an index record. -@param[in,out] page index page -@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL +@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well +@param[in,out] block index page @param[in] need number of bytes needed @param[out] heap_no record heap number +@param[in,out] mtr mini-transaction @return pointer to the start of the allocated buffer @retval NULL if allocation fails */ -static byte* page_mem_alloc_heap(page_t* page, page_zip_des_t* page_zip, - ulint need, ulint* heap_no) +template +static byte* page_mem_alloc_heap(buf_block_t *block, ulint need, + ulint *heap_no, mtr_t *mtr) { - if (need > page_get_max_insert_size(page, 1)) { - return NULL; - } + ut_ad(!compressed || block->page.zip.data); + + byte *heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER + + block->frame); + + const uint16_t top= mach_read_from_2(heap_top); + + if (need > page_get_max_insert_size(block->frame, 1)) + return NULL; + + byte *n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER + block->frame); + + const uint16_t h= mach_read_from_2(n_heap); + *heap_no= h & 0x7fff; + ut_ad(*heap_no < srv_page_size / REC_N_NEW_EXTRA_BYTES); + compile_time_assert(UNIV_PAGE_SIZE_MAX / REC_N_NEW_EXTRA_BYTES < 0x3fff); - byte* top = page_header_get_ptr(page, PAGE_HEAP_TOP); - page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP, top + need); - *heap_no = page_dir_get_n_heap(page); - page_dir_set_n_heap(page, page_zip, 1 + *heap_no); - return top; + mach_write_to_2(heap_top, top + need); + mach_write_to_2(n_heap, h + 1); + + if (compressed) + { + ut_ad(h & 0x8000); + page_zip_write_header(&block->page.zip, heap_top, 4, mtr); + } + else + mtr->memcpy(*block, PAGE_HEAP_TOP + PAGE_HEADER, 4); + + compile_time_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2); + return &block->frame[top]; } /***********************************************************//** @@ -1451,11 +1298,9 @@ page_cur_insert_rec_low( ut_ad(!page_rec_is_supremum(current_rec)); - const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE); - /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */ - ut_ad(log_mode == MTR_LOG_NONE - || log_mode == MTR_LOG_NO_REDO + ut_ad(mtr->get_log_mode() == MTR_LOG_NONE + || mtr->get_log_mode() == MTR_LOG_NO_REDO || !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE)); /* 1. Get the size of the physical record in the page */ @@ -1502,29 +1347,37 @@ page_cur_insert_rec_low( insert_buf = free_rec - rec_offs_extra_size(foffsets); + byte* page_free = my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER + + block->frame); + byte* page_garbage = my_assume_aligned<2>(PAGE_GARBAGE + + PAGE_HEADER + + block->frame); + ut_ad(mach_read_from_2(page_garbage) >= rec_size); + mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) + - rec_size); if (page_is_comp(block->frame)) { heap_no = rec_get_heap_no_new(free_rec); - page_mem_alloc_free(block->frame, NULL, - rec_get_next_ptr(free_rec, TRUE), - rec_size); + const rec_t* next = rec_get_next_ptr(free_rec, true); + mach_write_to_2(page_free, + next ? page_offset(next) : 0); } else { heap_no = rec_get_heap_no_old(free_rec); - page_mem_alloc_free(block->frame, NULL, - rec_get_next_ptr(free_rec, FALSE), - rec_size); + memcpy(page_free, free_rec - REC_NEXT, 2); } + compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2); + mtr->memcpy(*block, PAGE_FREE + PAGE_HEADER, 4); + if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); } } else { use_heap: free_rec = NULL; - insert_buf = page_mem_alloc_heap(block->frame, NULL, - rec_size, &heap_no); + insert_buf = page_mem_alloc_heap(block, rec_size, &heap_no, + mtr); if (UNIV_UNLIKELY(insert_buf == NULL)) { - mtr->set_log_mode(log_mode); return(NULL); } } @@ -1539,9 +1392,10 @@ page_cur_insert_rec_low( { /* next record after current before the insertion */ - rec_t* next_rec = page_rec_get_next(current_rec); -#ifdef UNIV_DEBUG if (page_is_comp(block->frame)) { + const rec_t* next_rec = page_rec_get_next_low( + current_rec, true); +#ifdef UNIV_DEBUG switch (rec_get_status(current_rec)) { case REC_STATUS_ORDINARY: case REC_STATUS_NODE_PTR: @@ -1561,24 +1415,42 @@ page_cur_insert_rec_low( ut_ad(!"wrong status on insert_rec"); } ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - } #endif - page_rec_set_next(insert_rec, next_rec); - page_rec_set_next(current_rec, insert_rec); + mach_write_to_2(insert_rec - REC_NEXT, + static_cast + (next_rec - insert_rec)); + mtr->write<2>(*block, current_rec - REC_NEXT, + static_cast + (insert_rec - current_rec)); + } else { + memcpy(insert_rec - REC_NEXT, current_rec - REC_NEXT, + 2); + mtr->write<2>(*block, current_rec - REC_NEXT, + page_offset(insert_rec)); + } } - page_header_set_field(block->frame, NULL, PAGE_N_RECS, - 1U + page_get_n_recs(block->frame)); + mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame, + 1U + page_get_n_recs(block->frame)); /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ - page_rec_set_n_owned(block, insert_rec, 0, - page_is_comp(block->frame), mtr); - rec_set_heap_no(*block, insert_rec, heap_no, - page_is_comp(block->frame), mtr); + if (page_is_comp(block->frame)) { + rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } else { + rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, heap_no, REC_OLD_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); + mtr->memcpy(*block, page_offset(insert_buf), rec_offs_size(offsets)); + /* 6. Update the last insertion info in page header */ last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT); @@ -1586,25 +1458,24 @@ page_cur_insert_rec_low( || rec_get_node_ptr_flag(last_insert) == rec_get_node_ptr_flag(insert_rec)); - if (!dict_index_is_spatial(index)) { + if (!index->is_spatial()) { byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame; if (UNIV_UNLIKELY(last_insert == NULL)) { no_direction: - page_direction_reset(ptr, block->frame, NULL); + page_direction_reset(block, ptr, mtr); } else if (last_insert == current_rec && page_ptr_get_direction(ptr) != PAGE_LEFT) { - page_direction_increment(ptr, block->frame, NULL, - PAGE_RIGHT); + page_direction_increment(block, ptr, PAGE_RIGHT, mtr); } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT && page_rec_get_next(insert_rec) == last_insert) { - page_direction_increment(ptr, block->frame, NULL, - PAGE_LEFT); + page_direction_increment(block, ptr, PAGE_LEFT, mtr); } else { goto no_direction; } } - page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec); + mtr->write<2>(*block, PAGE_LAST_INSERT + PAGE_HEADER + block->frame, + page_offset(insert_rec)); /* 7. It remains to update the owner record. */ { @@ -1612,14 +1483,12 @@ page_cur_insert_rec_low( ulint n_owned; if (page_is_comp(block->frame)) { n_owned = rec_get_n_owned_new(owner_rec); - rec_set_bit_field_1(owner_rec, n_owned + 1, - REC_NEW_N_OWNED, REC_N_OWNED_MASK, - REC_N_OWNED_SHIFT); + page_rec_set_n_owned(block, owner_rec, + n_owned + 1, true, mtr); } else { n_owned = rec_get_n_owned_old(owner_rec); - rec_set_bit_field_1(owner_rec, n_owned + 1, - REC_OLD_N_OWNED, REC_N_OWNED_MASK, - REC_N_OWNED_SHIFT); + page_rec_set_n_owned(block, owner_rec, + n_owned + 1, false, mtr); } /* 8. Now we have incremented the n_owned field of the owner @@ -1633,14 +1502,88 @@ page_cur_insert_rec_low( } } - /* 9. Write log record of the insert */ - mtr->set_log_mode(log_mode); - page_cur_insert_rec_write_log(insert_rec, rec_size, - current_rec, index, mtr); - return(insert_rec); } +/** Add a slot to the dense page directory. +@param[in,out] block ROW_FORMAT=COMPRESSED page +@param[in] index the index that the page belongs to +@param[in,out] mtr mini-transaction */ +static inline void page_zip_dir_add_slot(buf_block_t *block, + const dict_index_t *index, mtr_t *mtr) +{ + page_zip_des_t *page_zip= &block->page.zip; + + ut_ad(page_is_comp(page_zip->data)); + UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip)); + + /* Read the old n_dense (n_heap has already been incremented). */ + ulint n_dense= page_dir_get_n_heap(page_zip->data) - (PAGE_HEAP_NO_USER_LOW + + 1U); + + byte *dir= page_zip->data + page_zip_get_size(page_zip) - + PAGE_ZIP_DIR_SLOT_SIZE * n_dense; + byte *stored= dir; + + if (!page_is_leaf(page_zip->data)) + { + ut_ad(!page_zip->n_blobs); + stored-= n_dense * REC_NODE_PTR_SIZE; + } + else if (index->is_clust()) + { + /* Move the BLOB pointer array backwards to make space for the + columns DB_TRX_ID,DB_ROLL_PTR and the dense directory slot. */ + + stored-= n_dense * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + byte *externs= stored - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; + byte *dst= externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE; + ut_ad(!memcmp(dst, field_ref_zero, PAGE_ZIP_CLUST_LEAF_SLOT_SIZE)); + if (const ulint len = ulint(stored - externs)) + { + memmove(dst, externs, len); + /* TODO: write MEMMOVE record */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) + { + log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING, + block->page.id.space(), + block->page.id.page_no(), + log_ptr, mtr); + mach_write_to_2(log_ptr, dst - page_zip->data); + mach_write_to_2(log_ptr + 2, len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, dst, len); + } + } + } + else + { + stored-= page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; + ut_ad(!memcmp(stored - PAGE_ZIP_DIR_SLOT_SIZE, field_ref_zero, + PAGE_ZIP_DIR_SLOT_SIZE)); + } + + /* Move the uncompressed area backwards to make space + for one directory slot. */ + if (const ulint len = ulint(dir - stored)) + { + byte* dst = stored - PAGE_ZIP_DIR_SLOT_SIZE; + memmove(dst, stored, len); + /* TODO: write MEMMOVE record */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) + { + log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING, + block->page.id.space(), + block->page.id.page_no(), + log_ptr, mtr); + mach_write_to_2(log_ptr, dst - page_zip->data); + mach_write_to_2(log_ptr + 2, len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, dst, len); + } + } +} + /***********************************************************//** Inserts a record next to page cursor on a compressed and uncompressed page. Returns pointer to inserted record if succeed, i.e., @@ -1665,8 +1608,6 @@ page_cur_insert_rec_zip( byte* insert_buf; ulint rec_size; page_t* page; /*!< the relevant page */ - rec_t* last_insert; /*!< cursor position at previous - insert */ rec_t* free_rec; /*!< a free record that was reused, or NULL */ rec_t* insert_rec; /*!< inserted record */ @@ -1676,7 +1617,6 @@ page_cur_insert_rec_zip( page_zip = page_cur_get_page_zip(cursor); ut_ad(page_zip); - ut_ad(rec_offs_validate(rec, index, offsets)); page = page_cur_get_page(cursor); @@ -1727,6 +1667,7 @@ page_cur_insert_rec_zip( rec_t* cursor_rec = page_cur_get_rec(cursor); #endif /* UNIV_DEBUG */ +#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */ /* If we are not writing compressed page images, we must reorganize the page before attempting the insert. */ @@ -1735,6 +1676,7 @@ page_cur_insert_rec_zip( The page reorganization or creation that we would attempt outside crash recovery would have been covered by a previous redo log record. */ +#endif } else if (page_is_empty(page)) { ut_ad(page_cur_is_before_first(cursor)); @@ -1813,12 +1755,14 @@ page_cur_insert_rec_zip( be logged after a successful operation. */ ut_ad(!recv_recovery_is_on()); ut_ad(!index->is_dummy); +#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */ } else if (recv_recovery_is_on()) { /* This should be followed by MLOG_ZIP_PAGE_COMPRESS_NO_DATA, which should succeed. */ rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets); +#endif } else { ulint pos = page_rec_get_n_recs_before(insert_rec); ut_ad(pos > 0); @@ -1841,7 +1785,7 @@ page_cur_insert_rec_zip( rec_offs_make_valid( insert_rec, index, page_is_leaf(page), offsets); - return(insert_rec); + return insert_rec; } /* Theoretically, we could try one last resort @@ -1908,9 +1852,16 @@ page_cur_insert_rec_zip( } heap_no = rec_get_heap_no_new(free_rec); - page_mem_alloc_free(page, page_zip, - rec_get_next_ptr(free_rec, TRUE), - rec_size); + const rec_t* next = rec_get_next_ptr_const(free_rec, true); + mach_write_to_2(PAGE_FREE + PAGE_HEADER + page, + next ? page_offset(next) : 0); + byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page; + ut_ad(mach_read_from_2(garbage) >= rec_size); + mach_write_to_2(garbage, mach_read_from_2(garbage) - rec_size); + compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2); + page_zip_write_header(page_zip, + PAGE_HEADER + PAGE_FREE + page, 4, mtr); + /* TODO: group with PAGE_LAST_INSERT */ if (!page_is_leaf(page)) { /* Zero out the node pointer of free_rec, @@ -1961,14 +1912,14 @@ page_cur_insert_rec_zip( } else { use_heap: free_rec = NULL; - insert_buf = page_mem_alloc_heap(page, page_zip, - rec_size, &heap_no); + insert_buf = page_mem_alloc_heap(cursor->block, rec_size, + &heap_no, mtr); if (UNIV_UNLIKELY(insert_buf == NULL)) { return(NULL); } - page_zip_dir_add_slot(page_zip, dict_index_is_clust(index)); + page_zip_dir_add_slot(cursor->block, index, mtr); } /* 3. Create the record */ @@ -1978,21 +1929,19 @@ page_cur_insert_rec_zip( /* 4. Insert the record in the linked list of records */ ut_ad(cursor->rec != insert_rec); - { - /* next record after current before the insertion */ - const rec_t* next_rec = page_rec_get_next_low( - cursor->rec, TRUE); - ut_ad(rec_get_status(cursor->rec) - <= REC_STATUS_INFIMUM); - ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM); - ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - - page_rec_set_next(insert_rec, next_rec); - page_rec_set_next(cursor->rec, insert_rec); - } + /* next record after current before the insertion */ + const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE); + ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM); + ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM); + ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - page_header_set_field(page, page_zip, PAGE_N_RECS, - 1U + page_get_n_recs(page)); + mach_write_to_2(insert_rec - REC_NEXT, static_cast + (next_rec - insert_rec)); + mach_write_to_2(cursor->rec - REC_NEXT, static_cast + (insert_rec - cursor->rec)); + byte* n_recs = PAGE_N_RECS + PAGE_HEADER + page; + mach_write_to_2(n_recs, mach_read_from_2(n_recs) + 1); + page_zip_write_header(page_zip, n_recs, 2, mtr); /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ @@ -2004,35 +1953,37 @@ page_cur_insert_rec_zip( UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); - page_zip_dir_insert(cursor, free_rec, insert_rec); + page_zip_dir_insert(cursor, free_rec, insert_rec, mtr); /* 6. Update the last insertion info in page header */ - - last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT); - ut_ad(!last_insert - || rec_get_node_ptr_flag(last_insert) + byte* last_insert = PAGE_LAST_INSERT + PAGE_HEADER + page; + const uint16_t last_insert_rec = mach_read_from_2(last_insert); + ut_ad(!last_insert_rec + || rec_get_node_ptr_flag(page + last_insert_rec) == rec_get_node_ptr_flag(insert_rec)); + /* TODO: combine with PAGE_DIRECTION changes */ + mach_write_to_2(last_insert, page_offset(insert_rec)); + page_zip_write_header(page_zip, last_insert, 2, mtr); - if (!dict_index_is_spatial(index)) { + if (!index->is_spatial()) { byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page; - if (UNIV_UNLIKELY(last_insert == NULL)) { + if (UNIV_UNLIKELY(!last_insert_rec)) { no_direction: - page_direction_reset(ptr, page, page_zip); - } else if (last_insert == cursor->rec + page_direction_reset(cursor->block, ptr, mtr); + } else if (page + last_insert_rec == cursor->rec && page_ptr_get_direction(ptr) != PAGE_LEFT) { - page_direction_increment(ptr, page, page_zip, - PAGE_RIGHT); + page_direction_increment(cursor->block, ptr, + PAGE_RIGHT, mtr); } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT - && page_rec_get_next(insert_rec) == last_insert) { - page_direction_increment(ptr, page, page_zip, - PAGE_LEFT); + && page_rec_get_next(insert_rec) + == page + last_insert_rec) { + page_direction_increment(cursor->block, ptr, + PAGE_LEFT, mtr); } else { goto no_direction; } } - page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec); - /* 7. It remains to update the owner record. */ { rec_t* owner_rec = page_rec_find_owner_rec(insert_rec); @@ -2047,22 +1998,14 @@ page_cur_insert_rec_zip( we have to split the corresponding directory slot in two. */ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) { - const mtr_log_t log_mode = mtr->set_log_mode( - MTR_LOG_NONE); page_dir_split_slot( page_cur_get_block(cursor), page_dir_find_owner_slot(owner_rec), mtr); - mtr->set_log_mode(log_mode); } } - page_zip_write_rec(page_zip, insert_rec, index, offsets, 1); - - /* 9. Write log record of the insert */ - page_cur_insert_rec_write_log(insert_rec, rec_size, - cursor->rec, index, mtr); - - return(insert_rec); + page_zip_write_rec(page_zip, insert_rec, index, offsets, 1, mtr); + return insert_rec; } /**********************************************************//** @@ -2079,8 +2022,6 @@ page_parse_copy_rec_list_to_created_page( mtr_t* mtr) /*!< in: mtr or NULL */ { ulint log_data_len; - page_t* page; - page_zip_des_t* page_zip; ut_ad(index->is_dummy); @@ -2119,14 +2060,16 @@ page_parse_copy_rec_list_to_created_page( ut_a(ptr == rec_end); - page = buf_block_get_frame(block); - page_zip = buf_block_get_page_zip(block); - - page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL); + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + block->frame, 0, 2); + if (block->page.zip.data) { + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + + block->page.zip.data, 0, 2); + } - if (!dict_index_is_spatial(index)) { - page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page, - page, page_zip); + if (!index->is_spatial()) { + page_direction_reset(block, + PAGE_HEADER + PAGE_DIRECTION_B + + block->frame, mtr); } return(rec_end); @@ -2246,8 +2189,6 @@ page_copy_rec_list_end_to_created_page( heap_top += rec_size; rec_offs_make_valid(insert_rec, index, is_leaf, offsets); - page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec, - index, mtr); prev_rec = insert_rec; rec = page_rec_get_next(rec); } while (!page_rec_is_supremum(rec)); @@ -2293,14 +2234,18 @@ page_copy_rec_list_end_to_created_page( mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page, 2 + slot_index); - page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top); - page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs); - page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs); - - *reinterpret_cast(PAGE_HEADER + PAGE_LAST_INSERT + new_page) - = 0; - page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page, - new_page, NULL); + mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page, + page_offset(heap_top)); + mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + new_page, + PAGE_HEAP_NO_USER_LOW + n_recs); + mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + new_page, n_recs); + + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + new_page, 0, 2); + mach_write_to_1(PAGE_HEADER + PAGE_DIRECTION_B + new_page, + (mach_read_from_1(PAGE_HEADER + PAGE_DIRECTION_B + + new_page) & ~((1U << 3) - 1)) + | PAGE_NO_DIRECTION); + memset_aligned<2>(PAGE_HEADER + PAGE_N_DIRECTION + new_page, 0, 2); } /***********************************************************//** diff --git a/storage/innobase/page/page0zip.cc b/storage/innobase/page/page0zip.cc index df882e319e9b6..ea1c5cb209267 100644 --- a/storage/innobase/page/page0zip.cc +++ b/storage/innobase/page/page0zip.cc @@ -3518,13 +3518,14 @@ page_zip_write_rec_ext( page_zip_des_t* page_zip, /*!< in/out: compressed page */ const page_t* page, /*!< in: page containing rec */ const byte* rec, /*!< in: record being written */ - dict_index_t* index, /*!< in: record descriptor */ + const dict_index_t*index, /*!< in: record descriptor */ const offset_t* offsets, /*!< in: rec_get_offsets(rec, index) */ ulint create, /*!< in: nonzero=insert, zero=update */ ulint trx_id_col, /*!< in: position of DB_TRX_ID */ ulint heap_no, /*!< in: heap number of rec */ byte* storage, /*!< in: end of dense page directory */ - byte* data) /*!< in: end of modification log */ + byte* data, /*!< in: end of modification log */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { const byte* start = rec; ulint i; @@ -3544,25 +3545,36 @@ page_zip_write_rec_ext( the BLOB columns of rec if create==TRUE. */ ut_ad(data + rec_offs_data_size(offsets) - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) - - n_ext * BTR_EXTERN_FIELD_REF_SIZE - < externs - BTR_EXTERN_FIELD_REF_SIZE * page_zip->n_blobs); + - n_ext * FIELD_REF_SIZE + < externs - FIELD_REF_SIZE * page_zip->n_blobs); - { + if (n_ext) { ulint blob_no = page_zip_get_n_prev_extern( page_zip, rec, index); - byte* ext_end = externs - page_zip->n_blobs - * BTR_EXTERN_FIELD_REF_SIZE; + byte* ext_end = externs - page_zip->n_blobs * FIELD_REF_SIZE; ut_ad(blob_no <= page_zip->n_blobs); - externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE; + externs -= blob_no * FIELD_REF_SIZE; if (create) { page_zip->n_blobs += static_cast(n_ext); - ASSERT_ZERO_BLOB(ext_end - n_ext - * BTR_EXTERN_FIELD_REF_SIZE); - memmove(ext_end - n_ext - * BTR_EXTERN_FIELD_REF_SIZE, - ext_end, - ulint(externs - ext_end)); + ASSERT_ZERO_BLOB(ext_end - n_ext * FIELD_REF_SIZE); + if (ulint len = ulint(externs - ext_end)) { + byte* ext_start = ext_end + - n_ext * FIELD_REF_SIZE; + memmove(ext_start, ext_end, len); + /* TODO: write MEMMOVE record */ + if (byte* l = mlog_open(mtr, 11 + 2 + 2)) { + l = mlog_write_initial_log_record_fast( + page, MLOG_ZIP_WRITE_STRING, + l, mtr); + mach_write_to_2(l, ext_start + - page_zip->data); + mach_write_to_2(l + 2, len); + mlog_close(mtr, l + 4); + mlog_catenate_string(mtr, ext_start, + len); + } + } } ut_a(blob_no + n_ext <= page_zip->n_blobs); @@ -3594,28 +3606,49 @@ page_zip_write_rec_ext( + DATA_ROLL_PTR_LEN); /* Store trx_id and roll_ptr. */ - memcpy(storage - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) - * (heap_no - 1), - src, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + constexpr ulint sys_len = DATA_TRX_ID_LEN + + DATA_ROLL_PTR_LEN; + byte* sys = storage - sys_len * (heap_no - 1); + memcpy(sys, src, sys_len); i++; /* skip also roll_ptr */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + + sys_len)) { + log_ptr = mlog_write_initial_log_record_fast( + page, MLOG_ZIP_WRITE_STRING, log_ptr, + mtr); + mach_write_to_2(log_ptr, sys - page_zip->data); + mach_write_to_2(log_ptr + 2, sys_len); + memcpy(log_ptr + 4, sys, sys_len); + mlog_close(mtr, log_ptr + 4 + sys_len); + } } else if (rec_offs_nth_extern(offsets, i)) { src = rec_get_nth_field(rec, offsets, i, &len); ut_ad(dict_index_is_clust(index)); - ut_ad(len - >= BTR_EXTERN_FIELD_REF_SIZE); - src += len - BTR_EXTERN_FIELD_REF_SIZE; + ut_ad(len >= FIELD_REF_SIZE); + src += len - FIELD_REF_SIZE; ASSERT_ZERO(data, src - start); memcpy(data, start, ulint(src - start)); data += src - start; - start = src + BTR_EXTERN_FIELD_REF_SIZE; + start = src + FIELD_REF_SIZE; /* Store the BLOB pointer. */ - externs -= BTR_EXTERN_FIELD_REF_SIZE; + externs -= FIELD_REF_SIZE; ut_ad(data < externs); - memcpy(externs, src, BTR_EXTERN_FIELD_REF_SIZE); + memcpy(externs, src, FIELD_REF_SIZE); + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + + FIELD_REF_SIZE)) { + log_ptr = mlog_write_initial_log_record_fast( + page, MLOG_ZIP_WRITE_STRING, log_ptr, + mtr); + mach_write_to_2(log_ptr, externs + - page_zip->data); + mach_write_to_2(log_ptr + 2, FIELD_REF_SIZE); + memcpy(log_ptr + 4, externs, FIELD_REF_SIZE); + mlog_close(mtr, log_ptr + 4 + FIELD_REF_SIZE); + } } } @@ -3629,19 +3662,19 @@ page_zip_write_rec_ext( return(data); } -/**********************************************************************//** -Write an entire record on the compressed page. The data must already -have been written to the uncompressed page. */ -void -page_zip_write_rec( -/*===============*/ - page_zip_des_t* page_zip,/*!< in/out: compressed page */ - const byte* rec, /*!< in: record being written */ - dict_index_t* index, /*!< in: the index the record belongs to */ - const offset_t* offsets,/*!< in: rec_get_offsets(rec, index) */ - ulint create) /*!< in: nonzero=insert, zero=update */ +/** Write an entire record to the ROW_FORMAT=COMPRESSED page. +The data must already have been written to the uncompressed page. +@param[in,out] page_zip ROW_FORMAT=COMPRESSED page +@param[in] rec record in the uncompressed page +@param[in] index the index that the page belongs to +@param[in] offsets rec_get_offsets(rec, index) +@param[in] create nonzero=insert, zero=update +@param[in,out] mtr mini-transaction */ +void page_zip_write_rec(page_zip_des_t *page_zip, const byte *rec, + const dict_index_t *index, const offset_t *offsets, + ulint create, mtr_t *mtr) { - const page_t* page; + const page_t* page = page_align(rec); byte* data; byte* storage; ulint heap_no; @@ -3655,8 +3688,6 @@ page_zip_write_rec( ut_ad(page_zip->m_start >= PAGE_DATA); - page = page_align(rec); - ut_ad(page_zip_header_cmp(page_zip, page)); ut_ad(page_simple_validate_new((page_t*) page)); @@ -3721,8 +3752,6 @@ page_zip_write_rec( storage = page_zip_dir_start(page_zip); if (page_is_leaf(page)) { - ulint len; - if (dict_index_is_clust(index)) { /* Store separately trx_id, roll_ptr and the BTR_EXTERN_FIELD_REF of each BLOB column. */ @@ -3731,9 +3760,10 @@ page_zip_write_rec( page_zip, page, rec, index, offsets, create, index->db_trx_id(), heap_no, - storage, data); + storage, data, mtr); } else { /* Locate trx_id and roll_ptr. */ + ulint len; const byte* src = rec_get_nth_field(rec, offsets, index->db_trx_id(), @@ -3751,14 +3781,24 @@ page_zip_write_rec( data += src - rec; /* Store trx_id and roll_ptr. */ - memcpy(storage - - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) - * (heap_no - 1), - src, - DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); - - src += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN; - + constexpr ulint sys_len + = DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN; + byte* sys = storage - sys_len * (heap_no - 1); + memcpy(sys, src, sys_len); + + src += sys_len; + + if (byte* l = mlog_open(mtr, 11 + 2 + 2 + + sys_len)) { + l = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, l, + mtr); + mach_write_to_2(l, sys + - page_zip->data); + mach_write_to_2(l + 2, sys_len); + memcpy(l + 4, sys, sys_len); + mlog_close(mtr, l + 4 + sys_len); + } /* Log the last bytes of the record. */ len = rec_offs_data_size(offsets) - ulint(src - rec); @@ -3773,7 +3813,7 @@ page_zip_write_rec( ut_ad(!rec_offs_any_extern(offsets)); /* Log the entire record. */ - len = rec_offs_data_size(offsets); + ulint len = rec_offs_data_size(offsets); ASSERT_ZERO(data, len); memcpy(data, rec, len); @@ -3781,14 +3821,12 @@ page_zip_write_rec( } } else { /* This is a node pointer page. */ - ulint len; - /* Non-leaf nodes should not have any externally stored columns. */ ut_ad(!rec_offs_any_extern(offsets)); /* Copy the data bytes, except node_ptr. */ - len = rec_offs_data_size(offsets) - REC_NODE_PTR_SIZE; + ulint len = rec_offs_data_size(offsets) - REC_NODE_PTR_SIZE; ut_ad(data + len < storage - REC_NODE_PTR_SIZE * (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)); ASSERT_ZERO(data, len); @@ -3796,14 +3834,32 @@ page_zip_write_rec( data += len; /* Copy the node pointer to the uncompressed area. */ - memcpy(storage - REC_NODE_PTR_SIZE - * (heap_no - 1), - rec + len, - REC_NODE_PTR_SIZE); + byte* node_ptr = storage - REC_NODE_PTR_SIZE * (heap_no - 1); + memcpy(node_ptr, rec + len, REC_NODE_PTR_SIZE); + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + + REC_NODE_PTR_SIZE)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, node_ptr - page_zip->data); + mach_write_to_2(log_ptr + 2, REC_NODE_PTR_SIZE); + memcpy(log_ptr + 4, node_ptr, REC_NODE_PTR_SIZE); + mlog_close(mtr, log_ptr + 4 + REC_NODE_PTR_SIZE); + } } ut_a(!*data); ut_ad((ulint) (data - page_zip->data) < page_zip_get_size(page_zip)); + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, page_zip->m_end); + mach_write_to_2(log_ptr + 2, + data - page_zip->data - page_zip->m_end); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, page_zip->data + page_zip->m_end, + data - page_zip->data - page_zip->m_end); + } + page_zip->m_end = unsigned(data - page_zip->data); page_zip->m_nonempty = TRUE; @@ -4386,7 +4442,8 @@ page_zip_dir_insert( page_cur_t* cursor, /*!< in/out: page cursor */ const byte* free_rec,/*!< in: record from which rec was allocated, or NULL */ - byte* rec) /*!< in: record to insert */ + byte* rec, /*!< in: record to insert */ + mtr_t* mtr) /*!< in/out: mini-transaction */ { ut_ad(page_align(cursor->rec) == cursor->block->frame); ut_ad(page_align(rec) == cursor->block->frame); @@ -4449,13 +4506,26 @@ page_zip_dir_insert( - PAGE_ZIP_DIR_SLOT_SIZE * n_dense; } + const ulint slot_len = ulint(slot_rec - slot_free); /* Shift the dense directory to allocate place for rec. */ memmove_aligned<2>(slot_free - PAGE_ZIP_DIR_SLOT_SIZE, slot_free, - ulint(slot_rec - slot_free)); + slot_len); /* Write the entry for the inserted record. The "owned" and "deleted" flags must be zero. */ mach_write_to_2(slot_rec - PAGE_ZIP_DIR_SLOT_SIZE, page_offset(rec)); + /* TODO: issue MEMMOVE record to reduce log volume */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) { + log_ptr = mlog_write_initial_log_record_fast( + rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr); + mach_write_to_2(log_ptr, slot_free - PAGE_ZIP_DIR_SLOT_SIZE + - page_zip->data); + mach_write_to_2(log_ptr + 2, + PAGE_ZIP_DIR_SLOT_SIZE + slot_len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, slot_free - PAGE_ZIP_DIR_SLOT_SIZE, + PAGE_ZIP_DIR_SLOT_SIZE + slot_len); + } } /**********************************************************************//** @@ -4588,57 +4658,6 @@ page_zip_dir_delete( page_zip_clear_rec(page_zip, rec, index, offsets, mtr); } -/**********************************************************************//** -Add a slot to the dense page directory. */ -void -page_zip_dir_add_slot( -/*==================*/ - page_zip_des_t* page_zip, /*!< in/out: compressed page */ - ulint is_clustered) /*!< in: nonzero for clustered index, - zero for others */ -{ - ulint n_dense; - byte* dir; - byte* stored; - - ut_ad(page_is_comp(page_zip->data)); - UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip)); - - /* Read the old n_dense (n_heap has already been incremented). */ - n_dense = page_dir_get_n_heap(page_zip->data) - - (PAGE_HEAP_NO_USER_LOW + 1U); - - dir = page_zip->data + page_zip_get_size(page_zip) - - PAGE_ZIP_DIR_SLOT_SIZE * n_dense; - - if (!page_is_leaf(page_zip->data)) { - ut_ad(!page_zip->n_blobs); - stored = dir - n_dense * REC_NODE_PTR_SIZE; - } else if (is_clustered) { - /* Move the BLOB pointer array backwards to make space for the - roll_ptr and trx_id columns and the dense directory slot. */ - byte* externs; - - stored = dir - n_dense - * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); - externs = stored - - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; - ASSERT_ZERO(externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE, - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE); - memmove(externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE, - externs, ulint(stored - externs)); - } else { - stored = dir - - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; - ASSERT_ZERO(stored - PAGE_ZIP_DIR_SLOT_SIZE, - static_cast(PAGE_ZIP_DIR_SLOT_SIZE)); - } - - /* Move the uncompressed area backwards to make space - for one directory slot. */ - memmove(stored - PAGE_ZIP_DIR_SLOT_SIZE, stored, ulint(dir - stored)); -} - /***********************************************************//** Parses a log record of writing to the header of a page. @return end of log record or NULL */ diff --git a/storage/innobase/rem/rem0rec.cc b/storage/innobase/rem/rem0rec.cc index 69ee50a661c51..0a3656b2c10c9 100644 --- a/storage/innobase/rem/rem0rec.cc +++ b/storage/innobase/rem/rem0rec.cc @@ -1322,61 +1322,6 @@ rec_get_converted_size_comp( return(ULINT_UNDEFINED); } -/***********************************************************//** -Sets the value of the ith field SQL null bit of an old-style record. */ -void -rec_set_nth_field_null_bit( -/*=======================*/ - rec_t* rec, /*!< in: record */ - ulint i, /*!< in: ith field */ - ibool val) /*!< in: value to set */ -{ - ulint info; - - if (rec_get_1byte_offs_flag(rec)) { - - info = rec_1_get_field_end_info(rec, i); - - if (val) { - info = info | REC_1BYTE_SQL_NULL_MASK; - } else { - info = info & ~REC_1BYTE_SQL_NULL_MASK; - } - - rec_1_set_field_end_info(rec, i, info); - - return; - } - - info = rec_2_get_field_end_info(rec, i); - - if (val) { - info = info | REC_2BYTE_SQL_NULL_MASK; - } else { - info = info & ~REC_2BYTE_SQL_NULL_MASK; - } - - rec_2_set_field_end_info(rec, i, info); -} - -/***********************************************************//** -Sets an old-style record field to SQL null. -The physical size of the field is not changed. */ -void -rec_set_nth_field_sql_null( -/*=======================*/ - rec_t* rec, /*!< in: record */ - ulint n) /*!< in: index of the field */ -{ - ulint offset; - - offset = rec_get_field_start_offs(rec, n); - - data_write_sql_null(rec + offset, rec_get_nth_field_size(rec, n)); - - rec_set_nth_field_null_bit(rec, n, TRUE); -} - /*********************************************************//** Builds an old-style physical record out of a data tuple and stores it beginning from the start of the given buffer. @@ -1414,8 +1359,10 @@ rec_convert_dtuple_to_rec_old( rec_set_n_fields_old(rec, n_fields); /* Set the info bits of the record */ - rec_set_info_bits_old(rec, dtuple_get_info_bits(dtuple) - & REC_INFO_BITS_MASK); + rec_set_bit_field_1(rec, + dtuple_get_info_bits(dtuple) & REC_INFO_BITS_MASK, + REC_OLD_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); @@ -1738,7 +1685,9 @@ rec_convert_dtuple_to_rec_new( status, false); } - rec_set_info_bits_new(buf, dtuple->info_bits & ~REC_NEW_STATUS_MASK); + rec_set_bit_field_1(buf, dtuple->info_bits & ~REC_NEW_STATUS_MASK, + REC_NEW_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); return buf; } diff --git a/storage/innobase/row/row0upd.cc b/storage/innobase/row/row0upd.cc index 82910ace04f88..97094abc8a33f 100644 --- a/storage/innobase/row/row0upd.cc +++ b/storage/innobase/row/row0upd.cc @@ -585,163 +585,10 @@ row_upd_changes_disowned_external( return(false); } -/***********************************************************//** -Replaces the new column values stored in the update vector to the -record given. No field size changes are allowed. This function is -usually invoked on a clustered index. The only use case for a -secondary index is row_ins_sec_index_entry_by_modify() or its -counterpart in ibuf_insert_to_index_page(). */ -void -row_upd_rec_in_place( -/*=================*/ - rec_t* rec, /*!< in/out: record where replaced */ - dict_index_t* index, /*!< in: the index the record belongs to */ - const offset_t* offsets,/*!< in: array returned by rec_get_offsets() */ - const upd_t* update, /*!< in: update vector */ - page_zip_des_t* page_zip)/*!< in: compressed page with enough space - available, or NULL */ -{ - const upd_field_t* upd_field; - const dfield_t* new_val; - ulint n_fields; - ulint i; - - ut_ad(rec_offs_validate(rec, index, offsets)); - ut_ad(!index->table->skip_alter_undo); - - if (rec_offs_comp(offsets)) { -#ifdef UNIV_DEBUG - switch (rec_get_status(rec)) { - case REC_STATUS_ORDINARY: - break; - case REC_STATUS_INSTANT: - ut_ad(index->is_instant()); - break; - case REC_STATUS_NODE_PTR: - if (index->is_dummy - && fil_page_get_type(page_align(rec)) - == FIL_PAGE_RTREE) { - /* The function rtr_update_mbr_field_in_place() - is generating MLOG_COMP_REC_UPDATE_IN_PLACE - and MLOG_REC_UPDATE_IN_PLACE records for - node pointer pages. */ - break; - } - /* fall through */ - case REC_STATUS_INFIMUM: - case REC_STATUS_SUPREMUM: - ut_ad(!"wrong record status in update"); - } -#endif /* UNIV_DEBUG */ - - rec_set_info_bits_new(rec, update->info_bits); - } else { - rec_set_info_bits_old(rec, update->info_bits); - } - - n_fields = upd_get_n_fields(update); - - for (i = 0; i < n_fields; i++) { - upd_field = upd_get_nth_field(update, i); - - /* No need to update virtual columns for non-virtual index */ - if (upd_fld_is_virtual_col(upd_field) - && !dict_index_has_virtual(index)) { - continue; - } - - new_val = &(upd_field->new_val); - ut_ad(!dfield_is_ext(new_val) == - !rec_offs_nth_extern(offsets, upd_field->field_no)); - - rec_set_nth_field(rec, offsets, upd_field->field_no, - dfield_get_data(new_val), - dfield_get_len(new_val)); - } - - if (page_zip) { - page_zip_write_rec(page_zip, rec, index, offsets, 0); - } -} - -/***********************************************************//** -Writes to the redo log the new values of the fields occurring in the index. */ -void -row_upd_index_write_log( -/*====================*/ - const upd_t* update, /*!< in: update vector */ - byte* log_ptr,/*!< in: pointer to mlog buffer: must - contain at least MLOG_BUF_MARGIN bytes - of free space; the buffer is closed - within this function */ - mtr_t* mtr) /*!< in: mtr into whose log to write */ -{ - const upd_field_t* upd_field; - const dfield_t* new_val; - ulint len; - ulint n_fields; - byte* buf_end; - ulint i; - - n_fields = upd_get_n_fields(update); - - buf_end = log_ptr + MLOG_BUF_MARGIN; - - mach_write_to_1(log_ptr, update->info_bits); - log_ptr++; - log_ptr += mach_write_compressed(log_ptr, n_fields); - - for (i = 0; i < n_fields; i++) { - compile_time_assert(MLOG_BUF_MARGIN > 30); - - if (log_ptr + 30 > buf_end) { - mlog_close(mtr, log_ptr); - - log_ptr = mlog_open(mtr, MLOG_BUF_MARGIN); - buf_end = log_ptr + MLOG_BUF_MARGIN; - } - - upd_field = upd_get_nth_field(update, i); - - new_val = &(upd_field->new_val); - - len = dfield_get_len(new_val); - - /* If this is a virtual column, mark it using special - field_no */ - ulint field_no = upd_fld_is_virtual_col(upd_field) - ? REC_MAX_N_FIELDS + unsigned(upd_field->field_no) - : unsigned(upd_field->field_no); - - log_ptr += mach_write_compressed(log_ptr, field_no); - log_ptr += mach_write_compressed(log_ptr, len); - - if (len != UNIV_SQL_NULL) { - if (log_ptr + len < buf_end) { - memcpy(log_ptr, dfield_get_data(new_val), len); - - log_ptr += len; - } else { - mlog_close(mtr, log_ptr); - - mlog_catenate_string( - mtr, - static_cast( - dfield_get_data(new_val)), - len); - - log_ptr = mlog_open(mtr, MLOG_BUF_MARGIN); - buf_end = log_ptr + MLOG_BUF_MARGIN; - } - } - } - - mlog_close(mtr, log_ptr); -} - /*********************************************************************//** Parses the log data written by row_upd_index_write_log. @return log data end or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ byte* row_upd_index_parse( /*================*/ diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 714abb5081e6a..d215b07e3a32c 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -2438,7 +2438,48 @@ trx_undo_prev_version_build( *old_vers = rec_copy(buf, rec, offsets); rec_offs_make_valid(*old_vers, index, true, offsets); - row_upd_rec_in_place(*old_vers, index, offsets, update, NULL); + rec_set_bit_field_1(*old_vers, update->info_bits, + rec_offs_comp(offsets) + ? REC_NEW_INFO_BITS : REC_OLD_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); + for (ulint i = 0; i < update->n_fields; i++) { + const upd_field_t* uf = upd_get_nth_field(update, i); + if (upd_fld_is_virtual_col(uf)) { + /* There are no virtual columns in + a clustered index record. */ + continue; + } + const ulint n = uf->field_no; + ut_ad(!dfield_is_ext(&uf->new_val) + == !rec_offs_nth_extern(offsets, n)); + ut_ad(!rec_offs_nth_default(offsets, n)); + + if (UNIV_UNLIKELY(dfield_is_null(&uf->new_val))) { + ut_ad(!rec_offs_nth_sql_null(offsets, n)); + ut_ad(!index->table->not_redundant()); + ulint l = rec_get_1byte_offs_flag(*old_vers) + ? (n + 1) : (n + 1) * 2; + (*old_vers)[-REC_N_OLD_EXTRA_BYTES - l] + |= REC_1BYTE_SQL_NULL_MASK; + compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8 + == REC_2BYTE_SQL_NULL_MASK); + continue; + } + + ulint len; + memcpy(rec_get_nth_field(*old_vers, offsets, n, &len), + uf->new_val.data, uf->new_val.len); + if (UNIV_UNLIKELY(len != uf->new_val.len)) { + ut_ad(len == UNIV_SQL_NULL); + ut_ad(!rec_offs_comp(offsets)); + ut_ad(uf->new_val.len + == rec_get_nth_field_size(rec, n)); + ulint l = rec_get_1byte_offs_flag(*old_vers) + ? (n + 1) : (n + 1) * 2; + (*old_vers)[-REC_N_OLD_EXTRA_BYTES - l] + &= ~REC_1BYTE_SQL_NULL_MASK; + } + } } /* Set the old value (which is the after image of an update) in the