Skip to content

Commit

Permalink
MW-322 - CTAS fix merged to 5.5-v25 branch
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Lindström <jan.lindstrom@mariadb.com>
  • Loading branch information
sjaakola authored and Jan Lindström committed Jul 20, 2017
1 parent a481de3 commit a4bc8db
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 45 deletions.
5 changes: 5 additions & 0 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5426,6 +5426,11 @@ void ha_wsrep_fake_trx_id(THD *thd)
DBUG_VOID_RETURN;
}

if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
{
WSREP_DEBUG("fake trx id skipped: %lu", thd->wsrep_ws_handle.trx_id);
DBUG_VOID_RETURN;
}
handlerton *hton= installed_htons[DB_TYPE_INNODB];
if (hton && hton->wsrep_fake_trx_id)
{
Expand Down
13 changes: 12 additions & 1 deletion sql/sql_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9506,11 +9506,22 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type,
I_P_List_iterator<TABLE, TABLE_share> it(share->free_tables);
#ifndef DBUG_OFF
if (remove_type == TDC_RT_REMOVE_ALL)
#ifdef WITH_WSREP
{
DBUG_ASSERT(share->used_tables.is_empty());
/* following assert may cause false posivive fire for CTAS */
if (thd->lex->sql_command != SQLCOM_CREATE_TABLE)
{
#endif /* WITH_WSREP */
{
DBUG_ASSERT(share->used_tables.is_empty());
}
#ifdef WITH_WSREP
}
}
else if (remove_type == TDC_RT_REMOVE_NOT_OWN ||
remove_type == TDC_RT_REMOVE_NOT_OWN_AND_MARK_NOT_USABLE)
#endif /* WITH_WSREP */

{
I_P_List_iterator<TABLE, TABLE_share> it2(share->used_tables);
while ((table= it2++))
Expand Down
29 changes: 29 additions & 0 deletions sql/sql_insert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4313,9 +4313,38 @@ bool select_create::send_eof()
*/
if (!table->s->tmp_table)
{
#ifdef WITH_WSREP
/*
append table level exclusive key for CTAS
*/
wsrep_key_arr_t key_arr= {0, 0};
wsrep_prepare_keys_for_isolation(thd,
create_table->db,
create_table->table_name,
table_list,
&key_arr);
int rcode = wsrep->append_key(
wsrep,
&thd->wsrep_ws_handle,
key_arr.keys, //&wkey,
key_arr.keys_len,
WSREP_KEY_EXCLUSIVE,
false);
wsrep_keys_free(&key_arr);
if (rcode) {
DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void", rcode);
return true;
}
/* If commit fails, we should be able to reset the OK status. */
thd->stmt_da->can_overwrite_status= TRUE;
#endif /* WITH_WSREP */
trans_commit_stmt(thd);
trans_commit_implicit(thd);
#ifdef WITH_WSREP
thd->stmt_da->can_overwrite_status= FALSE;
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if (thd->wsrep_conflict_state != NO_CONFLICT)
{
Expand Down
24 changes: 7 additions & 17 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -944,17 +944,7 @@ bool wsrep_sync_wait (THD* thd, uint mask)
return false;
}

/*
* Helpers to deal with TOI key arrays
*/
typedef struct wsrep_key_arr
{
wsrep_key_t* keys;
size_t keys_len;
} wsrep_key_arr_t;


static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
{
Expand Down Expand Up @@ -1019,11 +1009,11 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
}

/* Prepare key list from db/table and table_list */
static bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
const TABLE_LIST* table_list,
wsrep_key_arr_t* ka)
bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
const TABLE_LIST* table_list,
wsrep_key_arr_t* ka)
{
ka->keys= 0;
ka->keys_len= 0;
Expand Down Expand Up @@ -1554,7 +1544,7 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
}
else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
{
WSREP_DEBUG("DROP caused BF abort");
WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
Expand Down
11 changes: 11 additions & 0 deletions sql/wsrep_mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,15 @@ const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*);
wsrep_seqno_t wsrep_xid_seqno(const xid_t*);
extern "C" int wsrep_is_wsrep_xid(const void* xid);

typedef struct wsrep_key_arr
{
wsrep_key_t* keys;
size_t keys_len;
} wsrep_key_arr_t;
bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
const TABLE_LIST* table_list,
wsrep_key_arr_t* ka);
void wsrep_keys_free(wsrep_key_arr_t* key_arr);
#endif /* WSREP_MYSQLD_H */
62 changes: 35 additions & 27 deletions storage/innobase/handler/ha_innodb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6000,10 +6000,14 @@ ha_innobase::write_row(
prebuilt->table->flags,
user_thd);
#ifdef WITH_WSREP
if (!error_result &&
wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
wsrep_on(user_thd) &&
!wsrep_consistency_check(user_thd))
if (!error_result
&& wsrep_on(user_thd)
&& wsrep_thd_exec_mode(user_thd) == LOCAL_STATE
&& !wsrep_consistency_check(user_thd))
&& (sql_command != SQLCOM_CREATE_TABLE)
&& (sql_command != SQLCOM_LOAD ||
thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {

{
if (wsrep_append_keys(user_thd, false, record, NULL))
{
Expand Down Expand Up @@ -12788,8 +12792,10 @@ wsrep_innobase_kill_one_trx(
wsrep_thd_thread_id(thd),
victim_trx->id);

WSREP_DEBUG("Aborting query: %s",
(thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
WSREP_DEBUG("Aborting query: %s conf %d trx: %lu",
(thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
wsrep_thd_conflict_state(thd),
wsrep_thd_ws_handle(thd)->trx_id);

wsrep_thd_LOCK(thd);

Expand Down Expand Up @@ -12841,9 +12847,8 @@ wsrep_innobase_kill_one_trx(
} else {
rcode = wsrep->abort_pre_commit(
wsrep, bf_seqno,
(wsrep_trx_id_t)victim_trx->id
(wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id
);

switch (rcode) {
case WSREP_WARNING:
WSREP_DEBUG("cancel commit warning: %llu",
Expand Down Expand Up @@ -12973,14 +12978,16 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
my_bool signal)
{
DBUG_ENTER("wsrep_innobase_abort_thd");
trx_t* victim_trx = thd_to_trx(victim_thd);
trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;

trx_t* victim_trx = thd_to_trx(victim_thd);
trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;

ut_ad(!mutex_own(&kernel_mutex));
WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d",
wsrep_thd_query(bf_thd),
wsrep_thd_query(victim_thd),
wsrep_thd_conflict_state(victim_thd));

WSREP_DEBUG("abort transaction: BF: %s victim: %s",
wsrep_thd_query(bf_thd),
wsrep_thd_query(victim_thd));
ut_ad(!mutex_own(&kernel_mutex));

if (victim_trx)
{
Expand All @@ -13001,24 +13008,24 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
{
DBUG_ASSERT(hton == innodb_hton_ptr);
if (wsrep_is_wsrep_xid(xid)) {
mtr_t mtr;
mtr_start(&mtr);
trx_sysf_t* sys_header = trx_sysf_get(&mtr);
trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
mtr_commit(&mtr);
innobase_flush_logs(hton);
return 0;
} else {
return 1;
}
if (wsrep_is_wsrep_xid(xid)) {
mtr_t mtr;
mtr_start(&mtr);
trx_sysf_t* sys_header = trx_sysf_get(&mtr);
trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
mtr_commit(&mtr);
innobase_flush_logs(hton);
return 0;
} else {
return 1;
}
}

static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
{
DBUG_ASSERT(hton == innodb_hton_ptr);
trx_sys_read_wsrep_checkpoint(xid);
return 0;
trx_sys_read_wsrep_checkpoint(xid);
return 0;
}

static void
Expand All @@ -13030,6 +13037,7 @@ wsrep_fake_trx_id(
mutex_enter(&kernel_mutex);
trx_id_t trx_id = trx_sys_get_new_trx_id();
mutex_exit(&kernel_mutex);
WSREP_DEBUG("innodb fake trx id: %lu thd: %s", trx_id, wsrep_thd_query(thd));

(void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
}
Expand Down

0 comments on commit a4bc8db

Please sign in to comment.