From bdb12d149992910e54051c8eb800ff2610712987 Mon Sep 17 00:00:00 2001 From: Aleksey Midenkov Date: Mon, 26 Sep 2016 08:37:53 +0000 Subject: [PATCH] IB: 0.2 part II * moved vers_notify_vtq() to commit phase; * low_level insert (load test passed); * rest of SYS_VTQ columns filled: COMMIT_TS, CONCURR_TRX; * savepoints support; * I_S.INNODB_SYS_VTQ adjustments: - limit to I_S_SYS_VTQ_LIMIT(10000) of most recent records; - CONCURR_TRX limit to I_S_MAX_CONCURR_TRX(100) with '...' truncation marker; - TIMESTAMP fields show fractions of seconds. --- CMakeLists.txt | 2 +- sql/sql_time.cc | 13 ++ sql/sql_time.h | 10 ++ storage/innobase/btr/btr0pcur.cc | 62 +++++++- storage/innobase/dict/dict0load.cc | 91 +++++++---- storage/innobase/handler/ha_innodb.cc | 5 + storage/innobase/handler/i_s.cc | 21 +-- storage/innobase/include/btr0pcur.h | 38 +++++ storage/innobase/include/btr0pcur.ic | 36 +++++ storage/innobase/include/dict0load.h | 6 +- storage/innobase/include/row0ins.h | 8 +- storage/innobase/include/trx0trx.h | 2 +- storage/innobase/include/trx0types.h | 1 + storage/innobase/row/row0ins.cc | 208 ++++++++++++++++++-------- storage/innobase/row/row0mysql.cc | 17 +-- storage/innobase/trx/trx0roll.cc | 2 + storage/innobase/trx/trx0trx.cc | 5 +- 17 files changed, 407 insertions(+), 120 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index eec96eea542f5..869e65baf8d81 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -365,7 +365,7 @@ ENDIF() IF(WITH_INNOBASE_STORAGE_ENGINE) SET(PLUGIN_INNOBASE STATIC CACHE STRING "" FORCE) - SET(PLUGIN_XTRADB DYNAMIC CACHE STRING "" FORCE) + SET(PLUGIN_XTRADB NO CACHE STRING "" FORCE) ELSE() SET(PLUGIN_INNOBASE DYNAMIC CACHE STRING "" FORCE) SET(PLUGIN_XTRADB STATIC CACHE STRING "" FORCE) diff --git a/sql/sql_time.cc b/sql/sql_time.cc index cad4bae03e8be..6f37f97feef05 100644 --- a/sql/sql_time.cc +++ b/sql/sql_time.cc @@ -475,6 +475,19 @@ void localtime_to_TIME(MYSQL_TIME *to, struct tm *from) to->second= (int) from->tm_sec; } + 
+/* + Convert seconds since Epoch to TIME +*/ + +void unix_time_to_TIME(MYSQL_TIME *to, time_t secs, suseconds_t usecs) +{ + struct tm tm_time; + localtime_r(&secs, &tm_time); + localtime_to_TIME(to, &tm_time); + to->second_part = usecs; +} + void calc_time_from_sec(MYSQL_TIME *to, long seconds, long microseconds) { long t_seconds; diff --git a/sql/sql_time.h b/sql/sql_time.h index e0cab5cfa66e8..fd0c0273dcf5e 100644 --- a/sql/sql_time.h +++ b/sql/sql_time.h @@ -171,6 +171,16 @@ bool calc_time_diff(const MYSQL_TIME *l_time1, const MYSQL_TIME *l_time2, int lsign, MYSQL_TIME *l_time3, ulonglong fuzzydate); int my_time_compare(const MYSQL_TIME *a, const MYSQL_TIME *b); void localtime_to_TIME(MYSQL_TIME *to, struct tm *from); +void unix_time_to_TIME(MYSQL_TIME *to, time_t secs, suseconds_t usecs); + +inline +longlong unix_time_to_packed(time_t secs, suseconds_t usecs) +{ + MYSQL_TIME mysql_time; + unix_time_to_TIME(&mysql_time, secs, usecs); + return pack_time(&mysql_time); +} + void calc_time_from_sec(MYSQL_TIME *to, long seconds, long microseconds); uint calc_week(MYSQL_TIME *l_time, uint week_behaviour, uint *year); diff --git a/storage/innobase/btr/btr0pcur.cc b/storage/innobase/btr/btr0pcur.cc index 445aa3504b7b3..9160265aae989 100644 --- a/storage/innobase/btr/btr0pcur.cc +++ b/storage/innobase/btr/btr0pcur.cc @@ -452,6 +452,66 @@ btr_pcur_move_to_next_page( ut_d(page_check_dir(next_page)); } +/*********************************************************//** +Moves the persistent cursor to the last record on the previous page. Releases the +latch on the current page, and bufferunfixes it. Note that there must not be +modifications on the current page, as then the x-latch can be released only in +mtr_commit. 
*/ +UNIV_INTERN +void +btr_pcur_move_to_prev_page( +/*=======================*/ + btr_pcur_t* cursor, /*!< in: persistent cursor; must be before the + first record of the current page */ + mtr_t* mtr) /*!< in: mtr */ +{ + ulint prev_page_no; + page_t* page; + buf_block_t* prev_block; + page_t* prev_page; + ulint mode; + + ut_ad(cursor->pos_state == BTR_PCUR_IS_POSITIONED); + ut_ad(cursor->latch_mode != BTR_NO_LATCHES); + ut_ad(btr_pcur_is_before_first_on_page(cursor)); + + cursor->old_stored = false; + + page = btr_pcur_get_page(cursor); + prev_page_no = btr_page_get_prev(page, mtr); + + ut_ad(prev_page_no != FIL_NULL); + + mode = cursor->latch_mode; + switch (mode) { + case BTR_SEARCH_TREE: + mode = BTR_SEARCH_LEAF; + break; + case BTR_MODIFY_TREE: + mode = BTR_MODIFY_LEAF; + } + + buf_block_t* block = btr_pcur_get_block(cursor); + + prev_block = btr_block_get( + page_id_t(block->page.id.space(), prev_page_no), + block->page.size, mode, + btr_pcur_get_btr_cur(cursor)->index, mtr); + + prev_page = buf_block_get_frame(prev_block); +#ifdef UNIV_BTR_DEBUG + ut_a(page_is_comp(prev_page) == page_is_comp(page)); + ut_a(btr_page_get_next(prev_page, mtr) + == btr_pcur_get_block(cursor)->page.id.page_no()); +#endif /* UNIV_BTR_DEBUG */ + + btr_leaf_page_release(btr_pcur_get_block(cursor), mode, mtr); + + page_cur_set_after_last(prev_block, btr_pcur_get_page_cur(cursor)); + + page_check_dir(prev_page); +} + /*********************************************************//** Moves the persistent cursor backward if it is on the first record of the page. Commits mtr. Note that to prevent a possible deadlock, the operation @@ -461,7 +521,7 @@ alphabetical position of the cursor is guaranteed to be sensible on return, but it may happen that the cursor is not positioned on the last record of any page, because the structure of the tree may have changed during the time when the cursor had no latches. 
*/ -static +UNIV_INTERN void btr_pcur_move_backward_from_page( /*=============================*/ diff --git a/storage/innobase/dict/dict0load.cc b/storage/innobase/dict/dict0load.cc index b646123048b8d..0fe6981f675cb 100644 --- a/storage/innobase/dict/dict0load.cc +++ b/storage/innobase/dict/dict0load.cc @@ -319,7 +319,10 @@ dict_getnext_system_low( rec_t* rec = NULL; while (!rec || rec_get_deleted_flag(rec, 0)) { - btr_pcur_move_to_next_user_rec(pcur, mtr); + if (pcur->search_mode == PAGE_CUR_L) + btr_pcur_move_to_prev_user_rec(pcur, mtr); + else + btr_pcur_move_to_next_user_rec(pcur, mtr); rec = btr_pcur_get_rec(pcur); @@ -346,7 +349,8 @@ dict_startscan_system( btr_pcur_t* pcur, /*!< out: persistent cursor to the record */ mtr_t* mtr, /*!< in: the mini-transaction */ - dict_system_id_t system_id) /*!< in: which system table to open */ + dict_system_id_t system_id, /*!< in: which system table to open */ + bool from_left) { dict_table_t* system_table; dict_index_t* clust_index; @@ -358,7 +362,7 @@ dict_startscan_system( clust_index = UT_LIST_GET_FIRST(system_table->indexes); - btr_pcur_open_at_index_side(true, clust_index, BTR_SEARCH_LEAF, pcur, + btr_pcur_open_at_index_side(from_left, clust_index, BTR_SEARCH_LEAF, pcur, true, 0, mtr); rec = dict_getnext_system_low(pcur, mtr); @@ -822,6 +826,15 @@ dict_process_sys_datafiles( return(NULL); } + +inline +const char* dict_print_error(mem_heap_t* heap, ulint col, ulint len, ulint expected) +{ + return mem_heap_printf(heap, + "incorrect column %lu length in SYS_VTQ; got: %lu, expected: %lu", + col, len, expected); +} + /********************************************************************//** This function parses a SYS_VTQ record, extracts necessary information from the record and returns it to the caller. 
@@ -832,13 +845,15 @@ dict_process_sys_vtq( /*=======================*/ mem_heap_t* heap, /*!< in/out: heap memory */ const rec_t* rec, /*!< in: current rec */ -ullong* col_trx_id, /*!< out: field values */ +trx_id_t* col_trx_id, /*!< out: field values */ ullong* col_begin_ts, ullong* col_commit_ts, -ullong* col_concurr_trx) +char** col_concurr_trx) { - ulint len; - const byte* field; + ulint len, col, concurr_n; + const byte *field, *ptr; + char *out; + trx_id_t trx_id; if (rec_get_deleted_flag(rec, 0)) { return("delete-marked record in SYS_VTQ"); @@ -847,35 +862,57 @@ ullong* col_concurr_trx) if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VTQ) { return("wrong number of columns in SYS_VTQ record"); } - + /* TRX_ID */ field = rec_get_nth_field_old( - rec, DICT_FLD__SYS_VTQ__TRX_ID, &len); - if (len != sizeof(col_trx_id)) { - err_len: - return("incorrect column length in SYS_VTQ"); - } - *col_trx_id = mach_read_from_8(field); + rec, (col = DICT_FLD__SYS_VTQ__TRX_ID), &len); + + if (len != sizeof(trx_id_t)) + return dict_print_error(heap, col, len, sizeof(trx_id_t)); + *col_trx_id = mach_read_from_8(field); + /* BEGIN_TS */ field = rec_get_nth_field_old( - rec, DICT_FLD__SYS_VTQ__BEGIN_TS, &len); - if (len != sizeof(col_begin_ts)) { - goto err_len; - } + rec, (col = DICT_FLD__SYS_VTQ__BEGIN_TS), &len); + + if (len != sizeof(ullong)) + return dict_print_error(heap, col, len, sizeof(ullong)); + *col_begin_ts = mach_read_from_8(field); + /* COMMIT_TS */ + field = rec_get_nth_field_old( + rec, (col = DICT_FLD__SYS_VTQ__COMMIT_TS), &len); + if (len != sizeof(ullong)) + return dict_print_error(heap, col, len, sizeof(ullong)); + + *col_commit_ts = mach_read_from_8(field); + /* CONCURR_TRX */ field = rec_get_nth_field_old( - rec, DICT_FLD__SYS_VTQ__COMMIT_TS, &len); - if (len != sizeof(col_commit_ts)) { - goto err_len; + rec, (col = DICT_FLD__SYS_VTQ__CONCURR_TRX), &len); + concurr_n = len / sizeof(trx_id_t); + if (len != concurr_n * sizeof(trx_id_t)) + return 
dict_print_error(heap, col, len, concurr_n * sizeof(trx_id_t)); + + bool truncated = false; + if (concurr_n > I_S_MAX_CONCURR_TRX) { + concurr_n = I_S_MAX_CONCURR_TRX; + truncated = true; } - *col_commit_ts = mach_read_from_8(field); - field = rec_get_nth_field_old( - rec, DICT_FLD__SYS_VTQ__CONCURR_TRX, &len); - if (len != sizeof(col_concurr_trx)) { - goto err_len; + if (concurr_n == 0) { + *col_concurr_trx = NULL; + return(NULL); + } + *col_concurr_trx = static_cast(mem_heap_alloc(heap, concurr_n * (TRX_ID_MAX_LEN + 1) + 3 + 1)); + ptr = field, out = *col_concurr_trx; + for (ulint i = 0; i < concurr_n; + ++i, ptr += sizeof(trx_id_t)) + { + trx_id = mach_read_from_8(ptr); + out += ut_snprintf(out, TRX_ID_MAX_LEN + 1, TRX_ID_FMT " ", trx_id); } - *col_concurr_trx = mach_read_from_8(field); + if (truncated) + strcpy(out, "..."); return(NULL); } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 7e90511e9e146..acba2461f55ed 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -4816,6 +4816,11 @@ innobase_commit( if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { + /* Notify VTQ on System Versioned tables update */ + if (trx->vtq_notify_on_commit) { + vers_notify_vtq(trx); + trx->vtq_notify_on_commit = false; + } DBUG_EXECUTE_IF("crash_innodb_before_commit", DBUG_SUICIDE();); diff --git a/storage/innobase/handler/i_s.cc b/storage/innobase/handler/i_s.cc index d53f4fe9df298..7125e2645aac4 100644 --- a/storage/innobase/handler/i_s.cc +++ b/storage/innobase/handler/i_s.cc @@ -9690,10 +9690,11 @@ static ST_FIELD_INFO innodb_vtq_fields_info[] = #define SYS_VTQ_CONCURR_TRX 3 { STRUCT_FLD(field_name, "concurr_trx"), - STRUCT_FLD(field_length, 120), - STRUCT_FLD(field_type, MYSQL_TYPE_MEDIUM_BLOB), + // 3 for "..." 
if list is truncated + STRUCT_FLD(field_length, I_S_MAX_CONCURR_TRX * (TRX_ID_MAX_LEN + 1) + 3), + STRUCT_FLD(field_type, MYSQL_TYPE_STRING), STRUCT_FLD(value, 0), - STRUCT_FLD(field_flags, 0), + STRUCT_FLD(field_flags, MY_I_S_MAYBE_NULL), STRUCT_FLD(old_name, ""), STRUCT_FLD(open_method, SKIP_OPEN_TABLE) }, @@ -9712,7 +9713,7 @@ i_s_dict_fill_vtq( ullong col_trx_id, /*!< in: table fields */ ullong col_begin_ts, ullong col_commit_ts, - ullong col_concurr_trx, + char* col_concurr_trx, TABLE* table_to_fill) /*!< in/out: fill this table */ { Field** fields; @@ -9723,7 +9724,7 @@ i_s_dict_fill_vtq( OK(field_store_ullong(fields[SYS_VTQ_TRX_ID], col_trx_id)); OK(field_store_packed_ts(fields[SYS_VTQ_BEGIN_TS], col_begin_ts)); OK(field_store_packed_ts(fields[SYS_VTQ_COMMIT_TS], col_commit_ts)); - OK(field_store_ullong(fields[SYS_VTQ_CONCURR_TRX], col_concurr_trx)); + OK(field_store_string(fields[SYS_VTQ_CONCURR_TRX], col_concurr_trx)); OK(schema_table_store_record(thd, table_to_fill)); @@ -9736,7 +9737,7 @@ Loop through each record in SYS_VTQ, and extract the column information and fill the INFORMATION_SCHEMA.INNODB_SYS_VTQ table. 
@return 0 on success */ -static const int I_S_SYS_VTQ_LIMIT = 1000; // maximum number of records in I_S.INNODB_SYS_VTQ +static const int I_S_SYS_VTQ_LIMIT = 10000; // maximum number of records in I_S.INNODB_SYS_VTQ static int @@ -9764,16 +9765,16 @@ i_s_sys_vtq_fill_table( mutex_enter(&dict_sys->mutex); mtr_start(&mtr); - rec = dict_startscan_system(&pcur, &mtr, SYS_VTQ); + rec = dict_startscan_system(&pcur, &mtr, SYS_VTQ, false); for (int i = 0; rec && i < I_S_SYS_VTQ_LIMIT; ++i) { const char* err_msg; - ullong col_trx_id; + trx_id_t col_trx_id; ullong col_begin_ts; ullong col_commit_ts; - ullong col_concurr_trx; + char* col_concurr_trx; - /* Extract necessary information from a SYS_VTQ row */ + /* Extract necessary information from SYS_VTQ row */ err_msg = dict_process_sys_vtq( heap, rec, diff --git a/storage/innobase/include/btr0pcur.h b/storage/innobase/include/btr0pcur.h index e00af1304795b..6e85d01c19f8d 100644 --- a/storage/innobase/include/btr0pcur.h +++ b/storage/innobase/include/btr0pcur.h @@ -337,6 +337,17 @@ btr_pcur_move_to_next_user_rec( function may release the page latch */ mtr_t* mtr); /*!< in: mtr */ /*********************************************************//** +Moves the persistent cursor to the previous user record in the tree. If no user +records are left, the cursor ends up 'before first in tree'. +@return TRUE if the cursor moved backward, ending on a user record */ +UNIV_INLINE +ibool +btr_pcur_move_to_prev_user_rec( +/*===========================*/ + btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the + function may release the page latch */ + mtr_t* mtr); /*!< in: mtr */ +/*********************************************************//** Moves the persistent cursor to the first record on the next page. Releases the latch on the current page, and bufferunfixes it. 
Note that there must not be modifications on the current page, @@ -347,6 +358,33 @@ btr_pcur_move_to_next_page( btr_pcur_t* cursor, /*!< in: persistent cursor; must be on the last record of the current page */ mtr_t* mtr); /*!< in: mtr */ +/*********************************************************//** +Moves the persistent cursor to the last record on the previous page. +Releases the latch on the current page, and bufferunfixes it. +Note that there must not be modifications on the current page, +as then the x-latch can be released only in mtr_commit. */ +void +btr_pcur_move_to_prev_page( +/*=======================*/ + btr_pcur_t* cursor, /*!< in: persistent cursor; must be before the + first record of the current page */ + mtr_t* mtr); /*!< in: mtr */ +/*********************************************************//** +Moves the persistent cursor backward if it is on the first record +of the page. Releases the latch on the current page, and bufferunfixes +it. Note that to prevent a possible deadlock, the operation first +stores the position of the cursor, releases the leaf latch, acquires +necessary latches and restores the cursor position again before returning. +The alphabetical position of the cursor is guaranteed to be sensible +on return, but it may happen that the cursor is not positioned on the +last record of any page, because the structure of the tree may have +changed while the cursor had no latches. */ +void +btr_pcur_move_backward_from_page( +/*=============================*/ + btr_pcur_t* cursor, /*!< in: persistent cursor, must be on the + first record of the current page */ + mtr_t* mtr); /*!< in: mtr */ #ifdef UNIV_DEBUG /*********************************************************//** Returns the btr cursor component of a persistent cursor. 
diff --git a/storage/innobase/include/btr0pcur.ic b/storage/innobase/include/btr0pcur.ic index 425593631d39c..51b6adf842089 100644 --- a/storage/innobase/include/btr0pcur.ic +++ b/storage/innobase/include/btr0pcur.ic @@ -334,6 +334,42 @@ loop: goto loop; } +/*********************************************************//** +Moves the persistent cursor to the previous user record in the tree. If no user +records are left, the cursor ends up 'before first in tree'. +@return TRUE if the cursor moved backward, ending on a user record */ +UNIV_INLINE +ibool +btr_pcur_move_to_prev_user_rec( +/*===========================*/ + btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the + function may release the page latch */ + mtr_t* mtr) /*!< in: mtr */ +{ + ut_ad(cursor->pos_state == BTR_PCUR_IS_POSITIONED); + ut_ad(cursor->latch_mode != BTR_NO_LATCHES); + cursor->old_stored = false; +loop: + if (btr_pcur_is_before_first_on_page(cursor)) { + + if (btr_pcur_is_before_first_in_tree(cursor, mtr)) { + + return(FALSE); + } + + btr_pcur_move_to_prev(cursor, mtr); + } else { + btr_pcur_move_to_prev_on_page(cursor); + } + + if (btr_pcur_is_on_user_rec(cursor)) { + + return(TRUE); + } + + goto loop; +} + /*********************************************************//** Moves the persistent cursor to the next record in the tree. If no records are left, the cursor stays 'after last in tree'. 
diff --git a/storage/innobase/include/dict0load.h b/storage/innobase/include/dict0load.h index 197a61b4ebe9a..6c0c1ddc267ee 100644 --- a/storage/innobase/include/dict0load.h +++ b/storage/innobase/include/dict0load.h @@ -182,7 +182,8 @@ dict_startscan_system( btr_pcur_t* pcur, /*!< out: persistent cursor to the record */ mtr_t* mtr, /*!< in: the mini-transaction */ - dict_system_id_t system_id); /*!< in: which system table to open */ + dict_system_id_t system_id, /*!< in: which system table to open */ + bool from_left = true); /********************************************************************//** This function get the next system table record as we scan the table. @return the record if found, NULL if end of scan. */ @@ -319,6 +320,7 @@ dict_process_sys_datafiles( This function parses a SYS_VTQ record, extracts necessary information from the record and returns it to the caller. @return error message, or NULL on success */ +#define I_S_MAX_CONCURR_TRX 100 UNIV_INTERN const char* dict_process_sys_vtq( @@ -328,7 +330,7 @@ const rec_t* rec, /*!< in: current rec */ ullong* col_trx_id, /*!< out: field values */ ullong* col_begin_ts, ullong* col_commit_ts, -ullong* col_concurr_trx); +char** col_concurr_trx); /** Update the record for space_id in SYS_TABLESPACES to this filepath. @param[in] space_id Tablespace ID diff --git a/storage/innobase/include/row0ins.h b/storage/innobase/include/row0ins.h index 541a3b4e70ee6..dc6ad2d34adb8 100644 --- a/storage/innobase/include/row0ins.h +++ b/storage/innobase/include/row0ins.h @@ -95,9 +95,10 @@ row_ins_clust_index_entry_low( dtuple_t* entry, /*!< in/out: index entry to insert */ ulint n_ext, /*!< in: number of externally stored columns */ que_thr_t* thr, /*!< in: query thread or NULL */ - bool dup_chk_only) + bool dup_chk_only, /*!< in: if true, just do duplicate check and return. don't execute actual insert. 
*/ + trx_t* trx = 0) MY_ATTRIBUTE((warn_unused_result)); /***************************************************************//** @@ -123,9 +124,10 @@ row_ins_sec_index_entry_low( trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during row_log_table_apply(), or 0 */ que_thr_t* thr, /*!< in: query thread */ - bool dup_chk_only) + bool dup_chk_only, /*!< in: if true, just do duplicate check and return. don't execute actual insert. */ + trx_t* trx = 0) MY_ATTRIBUTE((warn_unused_result)); /** Sets the values of the dtuple fields in entry from the values of appropriate columns in row. @@ -183,7 +185,7 @@ row_ins_step( /***********************************************************//** Inserts a row to SYS_VTQ table. @return error state */ -dberr_t vers_notify_vtq(que_thr_t * thr, mem_heap_t * heap); +void vers_notify_vtq(trx_t* trx); /* Insert node structure */ diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 7cc95b28c0da3..a1de49cae1f83 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -1267,7 +1267,7 @@ struct trx_t { os_event_t wsrep_event; /* event waited for in srv_conc_slot */ #endif /* WITH_WSREP */ - bool vtq_notified; + bool vtq_notify_on_commit; /*!< Notify VTQ for System Versioned update */ ulint magic_n; /** @return whether any persistent undo log has been generated */ diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h index 8092246c7fa43..45c3356d8ec75 100644 --- a/storage/innobase/include/trx0types.h +++ b/storage/innobase/include/trx0types.h @@ -146,6 +146,7 @@ typedef ib_id_t undo_no_t; /** Transaction savepoint */ struct trx_savept_t{ undo_no_t least_undo_no; /*!< least undo number to undo */ + bool vtq_notify_on_commit; /*!< Notify VTQ for System Versioned update */ }; /** File objects */ diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc index 4fa07aca80851..d071036e94787 100644 --- a/storage/innobase/row/row0ins.cc 
+++ b/storage/innobase/row/row0ins.cc @@ -50,6 +50,7 @@ Created 4/20/1996 Heikki Tuuri #include "fts0types.h" #include "m_string.h" #include "gis0geo.h" +#include "sql_time.h" /************************************************************************* IMPORTANT NOTE: Any operation that generates redo MUST check that there @@ -2042,7 +2043,8 @@ row_ins_scan_sec_index_for_duplicate( que_thr_t* thr, /*!< in: query thread */ bool s_latch,/*!< in: whether index->lock is being held */ mtr_t* mtr, /*!< in/out: mini-transaction */ - mem_heap_t* offsets_heap) + mem_heap_t* offsets_heap, + trx_t* trx = 0) /*!< in/out: memory heap that can be emptied */ { ulint n_unique; @@ -2055,6 +2057,10 @@ row_ins_scan_sec_index_for_duplicate( DBUG_ENTER("row_ins_scan_sec_index_for_duplicate"); + ut_ad(thr || (trx && flags & BTR_NO_LOCKING_FLAG)); + if (!trx) + trx = thr_get_trx(thr); + ut_ad(s_latch == rw_lock_own_flagged( &index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX)); @@ -2086,7 +2092,7 @@ row_ins_scan_sec_index_for_duplicate( : BTR_SEARCH_LEAF, &pcur, mtr); - allow_duplicates = thr_get_trx(thr)->duplicates; + allow_duplicates = trx->duplicates; /* Scan index records and check if there is a duplicate */ @@ -2142,7 +2148,7 @@ row_ins_scan_sec_index_for_duplicate( index, offsets)) { err = DB_DUPLICATE_KEY; - thr_get_trx(thr)->error_info = index; + trx->error_info = index; /* If the duplicate is on hidden FTS_DOC_ID, state so in the error log */ @@ -2528,9 +2534,10 @@ row_ins_clust_index_entry_low( dtuple_t* entry, /*!< in/out: index entry to insert */ ulint n_ext, /*!< in: number of externally stored columns */ que_thr_t* thr, /*!< in: query thread */ - bool dup_chk_only) + bool dup_chk_only, /*!< in: if true, just do duplicate check and return. don't execute actual insert. 
*/ + trx_t* trx) { btr_pcur_t pcur; btr_cur_t* cursor; @@ -2545,11 +2552,15 @@ row_ins_clust_index_entry_low( DBUG_ENTER("row_ins_clust_index_entry_low"); + ut_ad(thr || trx); + if (!trx) + trx = thr_get_trx(thr); + ut_ad(dict_index_is_clust(index)); ut_ad(!dict_index_is_unique(index) || n_uniq == dict_index_get_n_unique(index)); ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index)); - ut_ad(!thr_get_trx(thr)->in_rollback); + ut_ad(!trx->in_rollback); mtr_start(&mtr); @@ -2612,7 +2623,10 @@ row_ins_clust_index_entry_low( if (flags == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG - | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) { + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG) || !thr) { + // thr == 0 for SYS_VTQ table + ut_ad(thr || flags & + (BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)); /* Set no locks when applying log in online table rebuild. Only check for duplicates. */ err = row_ins_duplicate_error_in_clust_online( @@ -2627,7 +2641,7 @@ row_ins_clust_index_entry_low( /* fall through */ case DB_SUCCESS_LOCKED_REC: case DB_DUPLICATE_KEY: - thr_get_trx(thr)->error_info = cursor->index; + trx->error_info = cursor->index; } } else { /* Note that the following may return also @@ -2716,7 +2730,7 @@ row_ins_clust_index_entry_low( LSN_MAX, TRUE);); err = row_ins_index_entry_big_rec( entry, big_rec, offsets, &offsets_heap, index, - thr_get_trx(thr)->mysql_thd); + trx->mysql_thd); dtuple_convert_back_big_rec(index, entry, big_rec); } else { if (err == DB_SUCCESS @@ -2810,9 +2824,10 @@ row_ins_sec_index_entry_low( trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during row_log_table_apply(), or 0 */ que_thr_t* thr, /*!< in: query thread */ - bool dup_chk_only) + bool dup_chk_only, /*!< in: if true, just do duplicate check and return. don't execute actual insert. 
*/ + trx_t* trx) { DBUG_ENTER("row_ins_sec_index_entry_low"); @@ -2826,6 +2841,10 @@ row_ins_sec_index_entry_low( rec_offs_init(offsets_); rtr_info_t rtr_info; + ut_ad(thr || trx); + if (!trx) + trx = thr_get_trx(thr); + ut_ad(!dict_index_is_clust(index)); ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE); @@ -2867,7 +2886,7 @@ row_ins_sec_index_entry_low( } if (row_log_online_op_try( - index, entry, thr_get_trx(thr)->id)) { + index, entry, trx->id)) { goto func_exit; } } @@ -2876,7 +2895,7 @@ row_ins_sec_index_entry_low( the function will return in both low_match and up_match of the cursor sensible values */ - if (!thr_get_trx(thr)->check_unique_secondary) { + if (!trx->check_unique_secondary) { search_mode |= BTR_IGNORE_SEC_UNIQUE; } @@ -2919,8 +2938,6 @@ row_ins_sec_index_entry_low( } if (err != DB_SUCCESS) { - trx_t* trx = thr_get_trx(thr); - if (err == DB_DECRYPTION_FAILED) { ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED, @@ -2964,7 +2981,7 @@ row_ins_sec_index_entry_low( } err = row_ins_scan_sec_index_for_duplicate( - flags, index, entry, thr, check, &mtr, offsets_heap); + flags, index, entry, thr, check, &mtr, offsets_heap, trx); mtr_commit(&mtr); @@ -2973,7 +2990,7 @@ row_ins_sec_index_entry_low( break; case DB_DUPLICATE_KEY: if (!index->is_committed()) { - ut_ad(!thr_get_trx(thr) + ut_ad(!trx ->dict_operation_lock_mode); mutex_enter(&dict_sys->mutex); dict_set_corrupted_index_cache_only(index); @@ -3016,8 +3033,8 @@ row_ins_sec_index_entry_low( if (!(flags & BTR_NO_LOCKING_FLAG) && dict_index_is_unique(index) - && thr_get_trx(thr)->duplicates - && thr_get_trx(thr)->isolation_level >= TRX_ISO_REPEATABLE_READ) { + && trx->duplicates + && trx->isolation_level >= TRX_ISO_REPEATABLE_READ) { /* When using the REPLACE statement or ON DUPLICATE clause, a gap lock is taken on the position of the to-be-inserted record, @@ -3040,7 +3057,7 @@ row_ins_sec_index_entry_low( switch (err) { case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC: - if 
(thr_get_trx(thr)->error_state != DB_DUPLICATE_KEY) { + if (trx->error_state != DB_DUPLICATE_KEY) { break; } /* Fall through (skip actual insert) after we have @@ -3050,7 +3067,7 @@ row_ins_sec_index_entry_low( } } - ut_ad(thr_get_trx(thr)->error_state == DB_SUCCESS); + ut_ad(trx->error_state == DB_SUCCESS); if (dup_chk_only) { goto func_exit; @@ -3802,63 +3819,134 @@ row_ins_step( inline void set_row_field_8(dtuple_t* row, int field_num, ib_uint64_t data, mem_heap_t* heap) { + static const ulint fsize = 8; dfield_t* dfield = dtuple_get_nth_field(row, field_num); - byte* buf = static_cast(mem_heap_alloc(heap, 8)); + ut_ad(dfield->type.len == fsize); + byte* buf = static_cast(mem_heap_alloc(heap, fsize)); mach_write_to_8(buf, data); - dfield_set_data(dfield, buf, 8); + dfield_set_data(dfield, buf, fsize); } -#include "my_time.h" -#include "sql_time.h" - /***********************************************************//** -Inserts a row to SYS_VTQ table. -@return error state */ -UNIV_INTERN +Inserts a row to SYS_VTQ, low level. 
+@return DB_SUCCESS if operation successfully completed, else error +code */ +static __attribute__((nonnull, warn_unused_result)) dberr_t -vers_notify_vtq(que_thr_t* thr, mem_heap_t* heap) +vers_row_ins_vtq_low(trx_t* trx, mem_heap_t* heap, dtuple_t* row) { dberr_t err; - trx_t* trx = thr_get_trx(thr); - dict_table_t* sys_vtq = dict_sys->sys_vtq; - ins_node_t* node = ins_node_create(INS_DIRECT, sys_vtq, heap); + dtuple_t* entry; + ulint n_index = 0; + dict_index_t* index = dict_table_get_first_index(dict_sys->sys_vtq); + static const ulint flags + = (BTR_KEEP_SYS_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG); - node->select = NULL; - node->values_list = NULL; // for INS_VALUES + entry = row_build_index_entry(row, NULL, index, heap); - dtuple_t* row = dtuple_create(heap, dict_table_get_n_cols(sys_vtq)); - dict_table_copy_types(row, sys_vtq); + dfield_t* dfield = dtuple_get_nth_field(entry, DATA_TRX_ID); + ut_ad(dfield->type.len == DATA_TRX_ID_LEN); + dfield_set_data(dfield, mem_heap_alloc(heap, DATA_TRX_ID_LEN), DATA_TRX_ID_LEN); + row_upd_index_entry_sys_field(entry, index, DATA_TRX_ID, trx->id); - struct tm unix_time; - MYSQL_TIME mysql_time; - localtime_r(&trx->start_time, &unix_time); - localtime_to_TIME(&mysql_time, &unix_time); - mysql_time.second_part = trx->start_time_micro; - ullong start_time = pack_time(&mysql_time); + err = row_ins_clust_index_entry_low( + flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, NULL, false, trx); - set_row_field_8(row, DICT_FLD__SYS_VTQ__TRX_ID, trx->id, heap); - set_row_field_8(row, DICT_FLD__SYS_VTQ__BEGIN_TS - 2, start_time, heap); - set_row_field_8(row, DICT_FLD__SYS_VTQ__COMMIT_TS - 2, start_time, heap); - set_row_field_8(row, DICT_FLD__SYS_VTQ__CONCURR_TRX - 2, 3, heap); + switch (err) { + case DB_SUCCESS: + break; + case DB_SUCCESS_LOCKED_REC: + /* The row had already been copied to the table. 
*/ + fprintf(stderr, "InnoDB: duplicate VTQ record!\n"); + return DB_SUCCESS; + default: + return err; + } - ins_node_set_new_row(node, row); + mem_heap_t* offsets_heap = mem_heap_create(1024); - trx_write_trx_id(node->trx_id_buf, trx->id); - err = lock_table(0, node->table, LOCK_IX, thr); - DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait", - err = DB_LOCK_WAIT;); + do { + n_index++; - if (err != DB_SUCCESS) { - goto end_func; - } + if (!(index = dict_table_get_next_index(index))) { + break; + } - node->trx_id = trx->id; - node->state = INS_NODE_ALLOC_ROW_ID; - err = row_ins(node, thr); + if (index->type & DICT_FTS) { + continue; + } -end_func: - trx->error_state = err; - if (err != DB_SUCCESS) - fprintf(stderr, "InnoDB: failed to insert VTQ record (see SQL error message)\n"); + entry = row_build_index_entry(row, NULL, index, heap); + err = row_ins_sec_index_entry_low( + flags, BTR_MODIFY_TREE, + index, offsets_heap, heap, entry, trx->id, NULL, false, trx); + + ///* Report correct index name for duplicate key error. */ + // No need to report on commit phase? + //if (err == DB_DUPLICATE_KEY) { + // trx->error_key_num = n_index; + //} + } while (err == DB_SUCCESS); + + mem_heap_free(offsets_heap); return err; } + +/***********************************************************//** +Inserts a row to SYS_VTQ table. 
+@return error state */ +void vers_notify_vtq(trx_t* trx) +{ + dberr_t err; + mem_heap_t* heap = mem_heap_create(1024); + dtuple_t* row = dtuple_create(heap, dict_table_get_n_cols(dict_sys->sys_vtq)); + + ulint now_secs, now_usecs; + ut_usectime(&now_secs, &now_usecs); + ullong begin_ts = unix_time_to_packed(trx->start_time, trx->start_time_micro); + ullong commit_ts = unix_time_to_packed(now_secs, now_usecs); + + dict_table_copy_types(row, dict_sys->sys_vtq); + set_row_field_8(row, DICT_FLD__SYS_VTQ__TRX_ID, trx->id, heap); + set_row_field_8(row, DICT_FLD__SYS_VTQ__BEGIN_TS - 2, begin_ts, heap); + set_row_field_8(row, DICT_FLD__SYS_VTQ__COMMIT_TS - 2, commit_ts, heap); + + dfield_t* dfield = dtuple_get_nth_field(row, DICT_FLD__SYS_VTQ__CONCURR_TRX - 2); + mutex_enter(&trx_sys->mutex); + trx_ut_list_t &rw_list = trx_sys->rw_trx_list; + if (rw_list.count > 1) { + byte* buf = static_cast(mem_heap_alloc(heap, rw_list.count * 8)); + byte* ptr = buf; + ulint count = 0; + + for (trx_t* ctrx = UT_LIST_GET_FIRST(rw_list); + ctrx != NULL; + ctrx = UT_LIST_GET_NEXT(trx_list, ctrx)) + { + if (ctrx == trx || ctrx->state == TRX_STATE_NOT_STARTED) + continue; + + mach_write_to_8(ptr, ctrx->id); + ++count; + ptr += 8; + } + + if (count) + dfield_set_data(dfield, buf, count * 8); + else + dfield_set_data(dfield, NULL, 0); + } else { + // there must be at least current transaction + ut_ad(rw_list.count == 1 && UT_LIST_GET_FIRST(rw_list) == trx); + dfield_set_data(dfield, NULL, 0); + } + mutex_exit(&trx_sys->mutex); + + err = vers_row_ins_vtq_low(trx, heap, row); + if (DB_SUCCESS != err) + fprintf(stderr, "InnoDB: failed to insert VTQ record (error %d)\n", err); + + mem_heap_free(heap); +} diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index 68cc4372bef64..c08b608e006da 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -1537,12 +1537,8 @@ row_insert_for_mysql( node->duplicate = NULL; - if (!trx->vtq_notified 
&& DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) { - trx->vtq_notified = true; - err = vers_notify_vtq(thr, node->table->heap); - if (err != DB_SUCCESS) { - goto error_exit; - } + if (!trx->vtq_notify_on_commit && DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) { + trx->vtq_notify_on_commit = true; } if (dict_table_has_fts_index(table)) { @@ -1989,7 +1985,6 @@ row_update_for_mysql_using_upd_graph( err = trx->error_state; if (err != DB_SUCCESS) { - error_exit: que_thr_stop_for_mysql(thr); if (err == DB_RECORD_NOT_FOUND) { @@ -2137,12 +2132,8 @@ row_update_for_mysql_using_upd_graph( } } - if (!trx->vtq_notified && DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) { - trx->vtq_notified = true; - err = vers_notify_vtq(thr, node->table->heap); - if (err != DB_SUCCESS) { - goto error; - } + if (!trx->vtq_notify_on_commit && DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) { + trx->vtq_notify_on_commit = true; } trx->op_info = ""; diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index ec723375fe968..b695d2710511b 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -128,6 +128,7 @@ trx_rollback_to_savepoint_low( } else { trx->lock.que_state = TRX_QUE_RUNNING; MONITOR_INC(MONITOR_TRX_ROLLBACK_SAVEPOINT); + trx->vtq_notify_on_commit = savept->vtq_notify_on_commit; } ut_a(trx->error_state == DB_SUCCESS); @@ -614,6 +615,7 @@ trx_savept_take( trx_savept_t savept; savept.least_undo_no = trx->undo_no; + savept.vtq_notify_on_commit = trx->vtq_notify_on_commit; return(savept); } diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index e934d1501e9a8..9b6159fc81e64 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -470,6 +470,8 @@ trx_create_low() trx_free(). 
*/ ut_a(trx->mod_tables.size() == 0); + trx->vtq_notify_on_commit = false; + #ifdef WITH_WSREP trx->wsrep_event = NULL; #endif /* WITH_WSREP */ @@ -1363,8 +1365,7 @@ trx_start_low( } } - trx->vtq_notified = false; - + trx->vtq_notify_on_commit = false; ut_a(trx->error_state == DB_SUCCESS); MONITOR_INC(MONITOR_TRX_ACTIVE);