Skip to content

Commit

Permalink
Remove the flag vers_update_trt
Browse files Browse the repository at this point in the history
THD::vers_update_trt, trx_t::vers_update_trt, trx_savept_t::vers_update_trt:
Remove. Instead, determine from trx_t::mod_tables whether versioned
columns were affected by the transaction.

handlerton::prepare_commit_versioned: Replaces vers_get_trt_data.
Return the transaction start ID and also the commit ID, in case
the transaction modified any system-versioned columns (0 if not).

TR_table::store_data(): Remove (merge with update() below).

TR_table::update(): Add the parameters start_id, end_id.

ha_commit_trans(): Remove a condition on SQLCOM_ALTER_TABLE.
If we need something special for ALTER TABLE...ALGORITHM=INPLACE,
that can be done inside InnoDB by modifying trx_t::mod_tables.

innodb_prepare_commit_versioned(): Renamed from innodb_get_trt_data().
Check trx_t::mod_tables to see if any changes to versioned columns
are present.

trx_mod_table_time_t: A pair of logical timestamps, replacing the
undo_no_t in trx_mod_tables_t. Keep track of not only the first
modification to a persistent table in each transaction, but also
the first modification of a versioned column in a table.

dtype_t, dict_col_t: Add the accessor is_any_versioned(), to check
if the type refers to a system-versioned user or system column.

upd_t::affects_versioned(): Check if an update affects a versioned
column.

trx_undo_report_row_operation(): If a versioned column is affected
by the update, invoke trx_mod_table_time_t::set_versioned().

trx_rollback_to_savepoint_low(): If all changes to versioned columns
were rolled back, invoke trx_mod_table_time_t::rollback_versioned(),
so that trx_mod_table_time_t::is_versioned() will no longer hold.
  • Loading branch information
dr-m authored and midenok committed Nov 27, 2017
1 parent 03fbfee commit 0b89a42
Show file tree
Hide file tree
Showing 21 changed files with 185 additions and 138 deletions.
3 changes: 0 additions & 3 deletions mysql-test/suite/versioning/r/alter.result
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ add period for system_time(trx_start, trx_end),
add system versioning;
call verify_vtq;
No A B C D
1 1 1 1 1
show create table t;
Table Create Table
t CREATE TABLE `t` (
Expand All @@ -362,7 +361,6 @@ No A B C D
alter table t add system versioning, algorithm=copy;
call verify_vtq;
No A B C D
1 1 1 1 1
show create table t;
Table Create Table
t CREATE TABLE `t` (
Expand Down Expand Up @@ -415,7 +413,6 @@ No A B C D
alter table t add system versioning, algorithm=inplace;
call verify_vtq;
No A B C D
1 1 1 1 1
show create table t;
Table Create Table
t CREATE TABLE `t` (
Expand Down
24 changes: 20 additions & 4 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1414,15 +1414,31 @@ int ha_commit_trans(THD *thd, bool all)
goto err;
}

if (rw_trans || thd->lex->sql_command == SQLCOM_ALTER_TABLE)
if (rw_trans && use_transaction_registry)
{
if (use_transaction_registry && thd->vers_update_trt)
ulonglong trx_start_id= 0, trx_end_id= 0;
for (Ha_trx_info *ha_info= trans->ha_list; ha_info;
ha_info= ha_info->next())
{
if (ulonglong (*prepare)(THD*,ulonglong*)= ha_info->ht()->
prepare_commit_versioned)
{
trx_end_id= prepare(thd, &trx_start_id);
if (trx_end_id)
break; // FIXME: use a common ID for cross-engine transactions
}
}

if (trx_end_id)
{
DBUG_ASSERT(trx_start_id);
TR_table trt(thd, true);
if (trt.update())
if (trt.update(trx_start_id, trx_end_id))
goto err;
if (all)
#if 1 // FIXME: fix this properly, and remove TR_table::was_updated()
if (all) // avoid a crash in versioning.rpl_stmt
commit_one_phase_2(thd, false, &thd->transaction.stmt, false);
#endif
}
}

Expand Down
11 changes: 6 additions & 5 deletions sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,6 @@ struct handler_iterator {
class handler;
class group_by_handler;
struct Query;
class TR_table;
typedef class st_select_lex SELECT_LEX;
typedef struct st_order ORDER;

Expand Down Expand Up @@ -1390,10 +1389,12 @@ struct handlerton
/*
System Versioning
*/
/** Fill TRT record for update.
@param[out] trt TRT table which record[0] will be filled with
transaction data. */
void (*vers_get_trt_data)(TR_table &trt);
/** Determine if system-versioned data was modified by the transaction.
@param[in,out] thd current session
@param[out] trx_id transaction start ID
@return transaction commit ID
@retval 0 if no system-versioned data was affected by the transaction */
ulonglong (*prepare_commit_versioned)(THD *thd, ulonglong *trx_id);
};


Expand Down
7 changes: 0 additions & 7 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,6 @@ extern "C" void thd_kill_timeout(THD* thd)
mysql_mutex_unlock(&thd->LOCK_thd_data);
}

void thd_vers_update_trt(THD * thd, bool value)
{
thd->vers_update_trt= value;
}

THD::THD(my_thread_id id, bool is_wsrep_applier)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
Expand Down Expand Up @@ -1347,8 +1342,6 @@ void THD::init(void)
wsrep_skip_wsrep_GTID = false;
#endif /* WITH_WSREP */

vers_update_trt = false;

if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
else
Expand Down
4 changes: 0 additions & 4 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -4565,10 +4565,6 @@ class THD :public Statement,
/* Handling of timeouts for commands */
thr_timer_t query_timer;

// Storage engine may set this to true is we want to write a row to
// transaction_registry table on transaction commit.
bool vers_update_trt;

public:
void set_query_timer()
{
Expand Down
12 changes: 9 additions & 3 deletions sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7442,10 +7442,16 @@ static bool mysql_inplace_alter_table(THD *thd,

{
TR_table trt(thd, true);
if (thd->vers_update_trt && trt != *table_list)
if (trt == *table_list || !use_transaction_registry);
else if (ulonglong (*prepare)(THD*,ulonglong*)= table->file->ht->
prepare_commit_versioned)
{
if (use_transaction_registry && trt.update())
return true;
ulonglong trx_start_id, trx_end_id= prepare(thd, &trx_start_id);
if (trx_end_id && trt.update(trx_start_id, trx_end_id))
{
my_error(ER_UNKNOWN_ERROR, MYF(0));
goto rollback;
}
}

if (table->file->ha_commit_inplace_alter_table(altered_table,
Expand Down
36 changes: 9 additions & 27 deletions sql/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8525,48 +8525,30 @@ void TR_table::store(uint field_id, timeval ts)
table->field[field_id]->set_notnull();
}

void TR_table::store_data(ulonglong trx_id, ulonglong commit_id, timeval commit_ts)
{
timeval start_time= {static_cast<int>(thd->start_time),
static_cast<int>(thd->start_time_sec_part)};
store(FLD_TRX_ID, trx_id);
store(FLD_COMMIT_ID, commit_id);
store(FLD_BEGIN_TS, start_time);
if (thd->start_time_ge(commit_ts.tv_sec, commit_ts.tv_usec))
{
thd->start_time_inc();
commit_ts.tv_sec= thd->start_time;
commit_ts.tv_usec= thd->start_time_sec_part;
}
store(FLD_COMMIT_TS, commit_ts);
store_iso_level(thd->tx_isolation);
}

enum_tx_isolation TR_table::iso_level() const
{
enum_tx_isolation res= (enum_tx_isolation) ((*this)[FLD_ISO_LEVEL]->val_int() - 1);
DBUG_ASSERT(res <= ISO_SERIALIZABLE);
return res;
}

bool TR_table::update()
bool TR_table::update(ulonglong start_id, ulonglong end_id)
{
if (!table && open())
return true;

DBUG_ASSERT(table->s);
handlerton *hton= table->s->db_type();
DBUG_ASSERT(hton);
DBUG_ASSERT(hton->flags & HTON_NATIVE_SYS_VERSIONING);
DBUG_ASSERT(thd->vers_update_trt);
timeval start_time= {thd->start_time, long(thd->start_time_sec_part)};
thd->set_current_time();
timeval end_time= {thd->start_time, long(thd->start_time_sec_part)};
store(FLD_TRX_ID, start_id);
store(FLD_COMMIT_ID, end_id);
store(FLD_BEGIN_TS, start_time);
store(FLD_COMMIT_TS, end_time);
store_iso_level(thd->tx_isolation);

hton->vers_get_trt_data(*this);
int error= table->file->ha_write_row(table->record[0]);
if (error)
{
table->file->print_error(error, MYF(0));
}
thd->vers_update_trt= false;
return error;
}

Expand Down
27 changes: 8 additions & 19 deletions sql/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2983,26 +2983,15 @@ class TR_table: public TABLE_LIST
*/
void store(uint field_id, timeval ts);
/**
Stores value to internal transaction_registry TABLE object.
@param[in] current (InnoDB) transaction id
@param[in] InnoDB transaction counter at the time of transaction commit
@param[in] transaction commit timestamp
*/
void store_data(ulonglong trx_id, ulonglong commit_id, timeval commit_ts);
/**
Writes a row from internal TABLE object to transaction_registry table.
Update the transaction_registry right before commit.
@param start_id transaction identifier at start
@param end_id transaction identifier at commit
@retval true on error, false otherwise.
*/
bool update();
/**
Checks whether a row with specified transaction_id exists in a
transaction_registry table.
@param[in] transacton_id value
@retval true if exists, false it not exists or an error occured
*/
@retval false on success
@retval true on error (the transaction must be rolled back)
*/
bool update(ulonglong start_id, ulonglong end_id);
// return true if found; false if not found or error
bool query(ulonglong trx_id);
/**
Gets a row from transaction_registry with the closest commit_timestamp to
Expand Down
8 changes: 5 additions & 3 deletions sql/vtmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,13 @@ VTMD_table::update(THD *thd, const char* archive_name)
}

quit:
if (!result && use_transaction_registry)
if (result || !use_transaction_registry);
else if (ulonglong (*prepare)(THD*,ulonglong*)= vtmd.table->file->ht->
prepare_commit_versioned)
{
DBUG_ASSERT(thd->vers_update_trt);
TR_table trt(thd, true);
result= trt.update();
ulonglong trx_start_id, trx_end_id= prepare(thd, &trx_start_id);
result= trx_end_id && trt.update(trx_start_id, trx_end_id);
}

close_log_table(thd, &open_tables_backup);
Expand Down
62 changes: 32 additions & 30 deletions storage/innobase/handler/ha_innodb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3623,22 +3623,37 @@ static const char* ha_innobase_exts[] = {
NullS
};

void innodb_get_trt_data(TR_table& trt) {
THD* thd = trt.get_thd();
trx_t* trx = thd_to_trx(thd);
ut_a(trx);
ut_a(trx->vers_update_trt);
mutex_enter(&trx_sys->mutex);
trx_id_t commit_id = trx_sys_get_new_trx_id();
ulint sec = 0;
ulint usec = 0;
ut_usectime(&sec, &usec);
mutex_exit(&trx_sys->mutex);
/** Determine if system-versioned data was modified by the transaction.
@param[in,out] thd current session
@param[out] trx_id transaction start ID
@return transaction commit ID
@retval 0 if no system-versioned data was affected by the transaction */
static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
{
if (const trx_t* trx = thd_to_trx(thd)) {
*trx_id = trx->id;

for (trx_mod_tables_t::const_iterator t
= trx->mod_tables.begin();
t != trx->mod_tables.end(); t++) {
if (t->second.is_versioned()) {
DBUG_ASSERT(t->first->versioned());
DBUG_ASSERT(trx->undo_no);
DBUG_ASSERT(trx->rsegs.m_redo.rseg);

mutex_enter(&trx_sys->mutex);
trx_id_t commit_id = trx_sys_get_new_trx_id();
mutex_exit(&trx_sys->mutex);

return commit_id;
}
}

return 0;
}

// silent downgrade cast warning on win64
timeval commit_ts = {static_cast<int>(sec), static_cast<int>(usec)};
trt.store_data(trx->id, commit_id, commit_ts);
trx->vers_update_trt = false;
*trx_id = 0;
return 0;
}

/*********************************************************************//**
Expand Down Expand Up @@ -3709,7 +3724,8 @@ innobase_init(
innobase_hton->table_options = innodb_table_option_list;

/* System Versioning */
innobase_hton->vers_get_trt_data = innodb_get_trt_data;
innobase_hton->prepare_commit_versioned
= innodb_prepare_commit_versioned;

innodb_remember_check_sysvar_funcs();

Expand Down Expand Up @@ -4894,8 +4910,6 @@ innobase_rollback_to_savepoint(
dberr_t error = trx_rollback_to_savepoint_for_mysql(
trx, name, &mysql_binlog_cache_pos);

thd_vers_update_trt(thd, trx->vers_update_trt);

if (error == DB_SUCCESS && trx->fts_trx != NULL) {
fts_savepoint_rollback(trx, name);
}
Expand Down Expand Up @@ -8376,10 +8390,6 @@ ha_innobase::write_row(
/* Step-5: Execute insert graph that will result in actual insert. */
error = row_insert_for_mysql((byte*) record, m_prebuilt, vers_set_fields);

if (m_prebuilt->trx->vers_update_trt) {
thd_vers_update_trt(m_user_thd, true);
}

DEBUG_SYNC(m_user_thd, "ib_after_row_insert");

/* Step-6: Handling of errors related to auto-increment. */
Expand Down Expand Up @@ -9199,10 +9209,6 @@ ha_innobase::update_row(
}
}

if (m_prebuilt->trx->vers_update_trt) {
thd_vers_update_trt(m_user_thd, true);
}

if (error == DB_SUCCESS && autoinc) {
/* A value for an AUTO_INCREMENT column
was specified in the UPDATE statement. */
Expand Down Expand Up @@ -9321,10 +9327,6 @@ ha_innobase::delete_row(

error = row_update_for_mysql(m_prebuilt, vers_set_fields);

if (m_prebuilt->trx->vers_update_trt) {
thd_vers_update_trt(m_user_thd, true);
}

innobase_srv_conc_exit_innodb(m_prebuilt);

/* Tell the InnoDB server that there might be work for
Expand Down
4 changes: 0 additions & 4 deletions storage/innobase/handler/handler0alter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7090,10 +7090,6 @@ ha_innobase::inplace_alter_table(
ctx->m_stage, add_v, eval_table,
ha_alter_info->handler_flags & Alter_inplace_info::ALTER_DROP_HISTORICAL);

if (m_prebuilt->trx->vers_update_trt) {
thd_vers_update_trt(m_user_thd, true);
}

#ifndef DBUG_OFF
oom:
#endif /* !DBUG_OFF */
Expand Down
4 changes: 3 additions & 1 deletion storage/innobase/include/data0type.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ struct dtype_t{
mbminlen=DATA_MBMINLEN(mbminmaxlen);
mbmaxlen=DATA_MBMINLEN(mbminmaxlen) */

/** @return whether this is system versioned */
/** @return whether this is any system versioned field */
bool is_any_versioned() const { return prtype & DATA_VERSIONED; }
/** @return whether this is system versioned user field */
bool is_versioned() const { return !(~prtype & DATA_VERSIONED); }
/** @return whether this is the system version start */
bool is_version_start() const
Expand Down
2 changes: 2 additions & 0 deletions storage/innobase/include/dict0mem.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,8 @@ struct dict_col_t{
/** @return whether NULL is an allowed value for this column */
bool is_nullable() const { return !(prtype & DATA_NOT_NULL); }

/** @return whether this is any system versioned field */
bool is_any_versioned() const { return prtype & DATA_VERSIONED; }
/** @return whether this is system versioned */
bool is_versioned() const { return !(~prtype & DATA_VERSIONED); }
/** @return whether this is the system version start */
Expand Down
Loading

0 comments on commit 0b89a42

Please sign in to comment.