Skip to content

Commit

Permalink
Ported all remaining storage/innobase changes from lp:codership-mysql…
Browse files Browse the repository at this point in the history
…/5.6, up tp revision #4021

This is same level as wsrep_25.1 milestone
Note: stotage/xtaradb is not upgraded yet
  • Loading branch information
Seppo Jaakola committed Nov 27, 2013
1 parent 9642344 commit 447b19a
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 178 deletions.
79 changes: 51 additions & 28 deletions storage/innobase/handler/ha_innodb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
size_t* key_len);

extern handlerton * wsrep_hton;
extern handlerton * binlog_hton;
extern TC_LOG* tc_log;
extern void wsrep_cleanup_transaction(THD *thd);
#endif /* WITH_WSREP */
/** to protect innobase_open_files */
Expand Down Expand Up @@ -1175,6 +1175,10 @@ innobase_srv_conc_enter_innodb(
/*===========================*/
trx_t* trx) /*!< in: transaction handle */
{
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_thd_is_brute_force(trx->mysql_thd)) return;
#endif /* WITH_WSREP */
if (srv_thread_concurrency) {
if (trx->n_tickets_to_enter_innodb > 0) {

Expand Down Expand Up @@ -1209,6 +1213,10 @@ innobase_srv_conc_exit_innodb(
#ifdef UNIV_SYNC_DEBUG
ut_ad(!sync_thread_levels_nonempty_trx(trx->has_search_latch));
#endif /* UNIV_SYNC_DEBUG */
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) &&
wsrep_thd_is_brute_force(trx->mysql_thd)) return;
#endif /* WITH_WSREP */

/* This is to avoid making an unnecessary function call. */
if (trx->declared_to_be_inside_innodb
Expand Down Expand Up @@ -5484,6 +5492,10 @@ wsrep_innobase_mysql_sort(

tmp_length = charset->coll->strnxfrm(charset, str, str_length,
tmp_str, str_length);
/* Note: in MySQL 5.6:
tmp_length = charset->coll->strnxfrm(charset, str, str_length,
str_length, tmp_str, tmp_length, 0);
*/
DBUG_ASSERT(tmp_length == str_length);

break;
Expand Down Expand Up @@ -9177,7 +9189,7 @@ wsrep_dict_foreign_find_index(
ulint check_null);


extern ulint
extern dberr_t
wsrep_append_foreign_key(
/*===========================*/
trx_t* trx, /*!< in: trx */
Expand Down Expand Up @@ -9292,7 +9304,7 @@ wsrep_append_foreign_key(
(index && index->table_name) ? index->table_name :
"void table",
wsrep_thd_query(thd));
return rcode;
return DB_ERROR;
}
strncpy(cache_key,
(wsrep_protocol_version > 1) ?
Expand Down Expand Up @@ -9400,11 +9412,14 @@ wsrep_append_key(
WSREP_WARN("Appending row key failed: %s, %d",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void", rcode);
DBUG_RETURN(rcode);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
DBUG_RETURN(0);
}

extern void compute_md5_hash(char *digest, const char *buf, int len);
#define MD5_HASH compute_md5_hash

int
ha_innobase::wsrep_append_keys(
/*==================*/
Expand All @@ -9413,16 +9428,17 @@ ha_innobase::wsrep_append_keys(
const uchar* record0, /* in: row in MySQL format */
const uchar* record1) /* in: row in MySQL format */
{
int rcode;
DBUG_ENTER("wsrep_append_keys");

bool key_appended = false;
bool key_appended = false;
trx_t *trx = thd_to_trx(thd);

if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
wsrep_thd_thread_id(thd),
table_share->tmp_table,
(wsrep_thd_query(thd)) ?
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(0);
}
Expand All @@ -9438,20 +9454,19 @@ ha_innobase::wsrep_append_keys(
table, 0, key, key_info->key_length, record0, &is_null);

if (!is_null) {
int rcode = wsrep_append_key(
thd, trx, table_share, table, keyval,
rcode = wsrep_append_key(
thd, trx, table_share, table, keyval,
len, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped (proto 0): %s",
WSREP_DEBUG("NULL key skipped (proto 0): %s",
wsrep_thd_query(thd));
}
} else {
ut_a(table->s->keys <= 256);
uint i;

for (i=0; i<table->s->keys; ++i) {
uint len;
char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
Expand Down Expand Up @@ -9481,27 +9496,27 @@ ha_innobase::wsrep_append_keys(
key_appended = true;

len = wsrep_store_key_val_for_row(
table, i, key0, key_info->key_length,
table, i, key0, key_info->key_length,
record0, &is_null);
if (!is_null) {
int rcode = wsrep_append_key(
thd, trx, table_share, table,
rcode = wsrep_append_key(
thd, trx, table_share, table,
keyval0, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped: %s",
WSREP_DEBUG("NULL key skipped: %s",
wsrep_thd_query(thd));
}
if (record1) {
len = wsrep_store_key_val_for_row(
table, i, key1, key_info->key_length,
table, i, key1, key_info->key_length,
record1, &is_null);
if (!is_null && memcmp(key0, key1, len)) {
int rcode = wsrep_append_key(
thd, trx, table_share,
table,
rcode = wsrep_append_key(
thd, trx, table_share,
table,
keyval1, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
Expand Down Expand Up @@ -16658,15 +16673,20 @@ wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
abort();
}
int
wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal)
wsrep_innobase_kill_one_trx(void * const bf_thd_ptr,
const trx_t * const bf_trx,
trx_t *victim_trx, ibool signal)
{
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(victim_trx));
ut_ad(bf_thd_ptr);
ut_ad(victim_trx);

DBUG_ENTER("wsrep_innobase_kill_one_trx");
THD *bf_thd = (THD *)((bf_trx) ? bf_trx->mysql_thd : NULL);
THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
THD *thd = (THD *) victim_trx->mysql_thd;
int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;

if (!bf_thd) bf_thd = (bf_trx) ? (THD *)bf_trx->mysql_thd : NULL;

if (!thd) {
DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
WSREP_WARN("no THD for trx: %lu", victim_trx->id);
Expand Down Expand Up @@ -16869,12 +16889,15 @@ wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,

if (victim_trx)
{
mutex_enter(&trx_sys->mutex);
int rcode = wsrep_innobase_kill_one_trx(bf_trx, victim_trx,
signal);
mutex_exit(&trx_sys->mutex);
lock_mutex_enter();
trx_mutex_enter(victim_trx);
int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
victim_trx, signal);
trx_mutex_exit(victim_trx);
lock_mutex_exit();
wsrep_srv_conc_cancel_wait(victim_trx);
DBUG_RETURN(rcode);

DBUG_RETURN(rcode);
} else {
WSREP_DEBUG("victim does not have transaction");
wsrep_thd_LOCK(victim_thd);
Expand Down
3 changes: 2 additions & 1 deletion storage/innobase/include/ha_prototypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ innobase_casedn_str(
#ifdef WITH_WSREP
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
wsrep_innobase_kill_one_trx(void *thd_ptr,
const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
int wsrep_trx_order_before(void *thd1, void *thd2);
void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
Expand Down
3 changes: 3 additions & 0 deletions storage/innobase/include/trx0sys.ic
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ trx_id_t
trx_sys_get_new_trx_id(void)
/*========================*/
{
#ifndef WITH_WSREP
/* wsrep_fake_trx_id violates this assert */
ut_ad(mutex_own(&trx_sys->mutex));
#endif /* WITH_WSREP */

/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the following if
Expand Down
49 changes: 29 additions & 20 deletions storage/innobase/lock/lock0lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1583,38 +1583,41 @@ lock_rec_other_has_expl_req(

#ifdef WITH_WSREP
static void
wsrep_kill_victim(trx_t *trx, lock_t *lock) {
wsrep_kill_victim(const trx_t * const trx, const lock_t *lock) {
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(lock->trx));
int bf_this = wsrep_thd_is_brute_force(trx->mysql_thd);
int bf_other =
wsrep_thd_is_brute_force(lock->trx->mysql_thd);
if ((bf_this && !bf_other) ||
(bf_this && bf_other && wsrep_trx_order_before(
trx->mysql_thd, lock->trx->mysql_thd))) {

// if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
if (wsrep_debug)
fprintf(stderr, "WSREP: BF victim waiting\n");
/* cannot release lock, until our lock
is in the queue*/
} else if (lock->trx != trx) {
if (wsrep_log_conflicts) {
mutex_enter(&trx_sys->mutex);
if (bf_this)
fputs("\n*** Priority TRANSACTION:\n",
stderr);
else
fputs("\n*** Victim TRANSACTION:\n",
stderr);
trx_print(stderr, trx, 3000);
trx_print_latched(stderr, trx, 3000);

if (bf_other)
fputs("\n*** Priority TRANSACTION:\n",
stderr);
else
fputs("\n*** Victim TRANSACTION:\n",
stderr);
trx_print(stderr, lock->trx, 3000);
trx_print_latched(stderr, lock->trx, 3000);

mutex_exit(&trx_sys->mutex);
fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
stderr);

Expand All @@ -1624,8 +1627,8 @@ wsrep_kill_victim(trx_t *trx, lock_t *lock) {
lock_table_print(stderr, lock);
}
}
wsrep_innobase_kill_one_trx(
trx, lock->trx, TRUE);
wsrep_innobase_kill_one_trx(trx->mysql_thd,
(const trx_t*) trx, lock->trx, TRUE);
}
}
}
Expand Down Expand Up @@ -1800,7 +1803,8 @@ lock_t*
lock_rec_create(
/*============*/
#ifdef WITH_WSREP
lock_t* c_lock, /* conflicting lock */
lock_t* const c_lock, /* conflicting lock */
que_thr_t* thr,
#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
Expand Down Expand Up @@ -2048,12 +2052,14 @@ lock_rec_enqueue_waiting(
to be granted, note that we already own
the trx mutex. */
#ifdef WITH_WSREP
if (wsrep_on(trx->mysql_thd) && c_lock->trx->lock.was_chosen_as_deadlock_victim) {
return(DB_DEADLOCK);
if (wsrep_on(trx->mysql_thd) &&
trx->lock.was_chosen_as_deadlock_victim) {
return(DB_DEADLOCK);
}
/* Enqueue the lock request that will wait to be granted */
lock = lock_rec_create(c_lock, type_mode | LOCK_WAIT,
block, heap_no, index, trx, TRUE);
lock = lock_rec_create(
c_lock, thr,
type_mode | LOCK_WAIT, block, heap_no,
index, trx, TRUE);
#else
lock = lock_rec_create(
type_mode | LOCK_WAIT, block, heap_no,
Expand Down Expand Up @@ -2229,9 +2235,9 @@ lock_rec_add_to_queue(

somebody_waits:
#ifdef WITH_WSREP
return(lock_rec_create(
NULL, type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
return(lock_rec_create(NULL, NULL,
type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
#else
return(lock_rec_create(
type_mode, block, heap_no, index, trx,
Expand Down Expand Up @@ -2303,10 +2309,10 @@ lock_rec_lock_fast(
if (!impl) {
/* Note that we don't own the trx mutex. */
#ifdef WITH_WSREP
lock = lock_rec_create(
NULL, mode, block, heap_no, index, trx, FALSE);
lock = lock_rec_create(NULL, thr,
mode, block, heap_no, index, trx, FALSE);
#else
lock = lock_rec_create(
lock = lock_rec_create(
mode, block, heap_no, index, trx, FALSE);
#endif

Expand Down Expand Up @@ -2361,10 +2367,10 @@ lock_rec_lock_slow(
dict_index_t* index, /*!< in: index of record */
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
#ifdef WITH_WSREP
lock_t *c_lock;
lock_t* c_lock(NULL);
#endif
trx_t* trx;
lock_t* lock;
dberr_t err = DB_SUCCESS;

Expand Down Expand Up @@ -2430,6 +2436,9 @@ lock_rec_lock_slow(
ut_ad(lock == NULL);
enqueue_waiting:
#ifdef WITH_WSREP
/* c_lock is NULL here if jump to enqueue_waiting happened
but it's ok because lock is not NULL in that case and c_lock
is not used. */
err = lock_rec_enqueue_waiting(
c_lock, mode, block, heap_no,
lock, index, thr);
Expand Down
Loading

0 comments on commit 447b19a

Please sign in to comment.