Skip to content

Commit

Permalink
MDEV-20621 FULLTEXT INDEX activity causes InnoDB hang
Browse files Browse the repository at this point in the history
- fts_optimize_thread() uses dict_table_t object instead of table id.
So that it doesn't acquire dict_sys->mutex. It leads to remove the
hang of dict_sys->mutex between fts_optimize_thread() and other threads.

- in_queue to indicate whether the table is in fts_optimize_queue. It
is protected by fts_optimize_wq->mutex to avoid any race condition.

- fts_optimize_init() adds the fts table to the fts_optimize_wq
  • Loading branch information
Thirunarayanan authored and dr-m committed Oct 25, 2019
1 parent bd22650 commit a41d429
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 262 deletions.
7 changes: 6 additions & 1 deletion storage/innobase/dict/dict0load.cc
Expand Up @@ -46,6 +46,7 @@ Created 4/24/1996 Heikki Tuuri
#include "dict0priv.h"
#include "ha_prototypes.h" /* innobase_casedn_str() */
#include "fts0priv.h"
#include "fts0opt.h"

/** Following are the InnoDB system tables. The positions in
this array are referenced by enum dict_system_table_id. */
Expand Down Expand Up @@ -2548,8 +2549,12 @@ dict_load_table(
FTS */
fts_optimize_remove_table(table);
fts_free(table);
} else {
} else if (fts_optimize_wq) {
fts_optimize_add_table(table);
} else {
/* fts_optimize_thread is not started yet.
So make the table as non-evictable from cache. */
dict_table_move_from_lru_to_non_lru(table);
}
}

Expand Down
172 changes: 79 additions & 93 deletions storage/innobase/fts/fts0opt.cc
Expand Up @@ -34,14 +34,15 @@ Completed 2011/7/10 Sunny and Jimmy Yang
#include "ut0wqueue.h"
#include "srv0start.h"
#include "zlib.h"
#include "fts0opt.h"

#ifndef UNIV_NONINL
#include "fts0types.ic"
#include "fts0vlc.ic"
#endif

/** The FTS optimize thread's work queue. */
static ib_wqueue_t* fts_optimize_wq;
ib_wqueue_t* fts_optimize_wq;

/** The FTS vector to store fts_slot_t */
static ib_vector_t* fts_slots;
Expand Down Expand Up @@ -169,8 +170,8 @@ struct fts_encode_t {
/** We use this information to determine when to start the optimize
cycle for a table. */
struct fts_slot_t {
/** table identifier, or 0 if the slot is empty */
table_id_t table_id;
/** table, or NULL if the slot is unused */
dict_table_t* table;

/** whether this slot is being processed */
bool running;
Expand Down Expand Up @@ -2456,14 +2457,7 @@ fts_optimize_table_bk(
return(DB_SUCCESS);
}

dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);

if (!table) {
slot->last_run = now;
return DB_SUCCESS;
}

dict_table_t* table = slot->table;
dberr_t error;

if (fil_table_accessible(table)
Expand All @@ -2483,8 +2477,6 @@ fts_optimize_table_bk(
error = DB_SUCCESS;
}

dict_table_close(table, FALSE, FALSE);

return(error);
}
/*********************************************************************//**
Expand Down Expand Up @@ -2627,11 +2619,13 @@ UNIV_INTERN void fts_optimize_add_table(dict_table_t* table)

msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);

ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
mutex_enter(&fts_optimize_wq->mutex);

ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);

mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);

mutex_exit(&fts_optimize_wq->mutex);
}

/**********************************************************************//**
Expand All @@ -2648,7 +2642,7 @@ fts_optimize_remove_table(
fts_msg_del_t* remove;

/* if the optimize system not yet initialized, return */
if (!fts_optimize_is_init()) {
if (!fts_optimize_wq) {
return;
}

Expand All @@ -2660,12 +2654,10 @@ fts_optimize_remove_table(
return;
}

fts_t* fts = table->fts;
mutex_enter(&fts->bg_threads_mutex);
bool is_in_optimize_queue = fts->in_queue;
mutex_exit(&fts->bg_threads_mutex);
mutex_enter(&fts_optimize_wq->mutex);

if (!is_in_optimize_queue) {
if (!table->fts->in_queue) {
mutex_exit(&fts_optimize_wq->mutex);
return;
}

Expand All @@ -2681,15 +2673,17 @@ fts_optimize_remove_table(
remove->event = event;
msg->ptr = remove;

ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);

mutex_exit(&fts_optimize_wq->mutex);

os_event_wait(event);

os_event_free(event);

mutex_enter(&fts->bg_threads_mutex);
fts->in_queue = false;
mutex_exit(&fts->bg_threads_mutex);
ut_d(mutex_enter(&fts_optimize_wq->mutex));
ut_ad(!table->fts->in_queue);
ut_d(mutex_exit(&fts_optimize_wq->mutex));
}

/** Send sync fts cache for the table.
Expand All @@ -2700,10 +2694,9 @@ fts_optimize_request_sync_table(
dict_table_t* table)
{
fts_msg_t* msg;
table_id_t* table_id;

/* if the optimize system not yet initialized, return */
if (!fts_optimize_is_init()) {
if (!fts_optimize_wq) {
return;
}

Expand All @@ -2715,39 +2708,36 @@ fts_optimize_request_sync_table(
return;
}

msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);

table_id = static_cast<table_id_t*>(
mem_heap_alloc(msg->heap, sizeof(table_id_t)));
*table_id = table->id;
msg->ptr = table_id;
mutex_enter(&fts_optimize_wq->mutex);

ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);

mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);

mutex_exit(&fts_optimize_wq->mutex);
}

/** Add a table to fts_slots if it doesn't already exist. */
static bool fts_optimize_new_table(dict_table_t* table)
{
ut_ad(table);

ulint i;
fts_slot_t* slot;
fts_slot_t* empty = NULL;
const table_id_t table_id = table->id;
ut_ad(table_id);

/* Search for duplicates, also find a free slot if one exists. */
for (i = 0; i < ib_vector_size(fts_slots); ++i) {

slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));

if (!slot->table_id) {
if (!slot->table) {
empty = slot;
} else if (slot->table_id == table_id) {
} else if (slot->table == table) {
/* Already exists in our optimize queue. */
return(FALSE);
return false;
}
}

Expand All @@ -2756,37 +2746,36 @@ static bool fts_optimize_new_table(dict_table_t* table)

memset(slot, 0x0, sizeof(*slot));

slot->table_id = table->id;
slot->running = false;

return(TRUE);
slot->table = table;
return true;
}

/** Remove a table from fts_slots if it exists.
@param[in,out] table table to be removed from fts_slots */
static bool fts_optimize_del_table(const dict_table_t* table)
{
const table_id_t table_id = table->id;
ut_ad(table_id);

for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
fts_slot_t* slot;

slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));

if (slot->table_id == table_id) {
if (slot->table == table) {
if (fts_enable_diag_print) {
ib_logf(IB_LOG_LEVEL_INFO,
"FTS Optimize Removing table %s",
table->name);
}

slot->table_id = 0;
return(TRUE);
mutex_enter(&fts_optimize_wq->mutex);
slot->table->fts->in_queue = false;
mutex_exit(&fts_optimize_wq->mutex);

slot->table = NULL;
return true;
}
}

return(FALSE);
return false;
}

/**********************************************************************//**
Expand All @@ -2800,7 +2789,7 @@ static ulint fts_optimize_how_many()
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));
if (slot->table_id == 0) {
if (!slot->table) {
continue;
}

Expand Down Expand Up @@ -2836,22 +2825,14 @@ static bool fts_is_sync_needed()
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));

if (slot->table_id == 0) {
continue;
}

dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
if (!table) {
if (!slot->table) {
continue;
}

if (table->fts && table->fts->cache) {
total_memory += table->fts->cache->total_size;
if (slot->table->fts && slot->table->fts->cache) {
total_memory += slot->table->fts->cache->total_size;
}

dict_table_close(table, FALSE, FALSE);

if (total_memory > fts_max_total_cache_size) {
return(true);
}
Expand All @@ -2861,22 +2842,16 @@ static bool fts_is_sync_needed()
}

/** Sync fts cache of a table
@param[in] table_id table id */
static void fts_optimize_sync_table(table_id_t table_id)
@param[in,out] table table to be synced */
static void fts_optimize_sync_table(dict_table_t* table)
{
if (dict_table_t* table = dict_table_open_on_id(
table_id, FALSE, DICT_TABLE_OP_NORMAL)) {
if (fil_table_accessible(table)
&& table->fts && table->fts->cache) {
fts_sync_table(table, true, false, false);
}
if (fil_table_accessible(table)
&& table->fts && table->fts->cache) {
fts_sync_table(table, true, false, false);
}

DBUG_EXECUTE_IF(
"ib_optimize_wq_hang",
DBUG_EXECUTE_IF("ib_optimize_wq_hang",
os_thread_sleep(6000000););

dict_table_close(table, FALSE, FALSE);
}
}

/**********************************************************************//**
Expand Down Expand Up @@ -2918,7 +2893,7 @@ fts_optimize_thread(
ib_vector_get(fts_slots, current));

/* Handle the case of empty slots. */
if (slot->table_id) {
if (slot->table) {
slot->running = true;
fts_optimize_table_bk(slot);
}
Expand Down Expand Up @@ -2978,7 +2953,7 @@ fts_optimize_thread(
os_thread_sleep(300000););

fts_optimize_sync_table(
*static_cast<table_id_t*>(msg->ptr));
static_cast<dict_table_t*>(msg->ptr));
break;

default:
Expand All @@ -2997,8 +2972,8 @@ fts_optimize_thread(
fts_slot_t* slot = static_cast<fts_slot_t*>(
ib_vector_get(fts_slots, i));

if (table_id_t table_id = slot->table_id) {
fts_optimize_sync_table(table_id);
if (slot->table) {
fts_optimize_sync_table(slot->table);
}
}
}
Expand Down Expand Up @@ -3028,24 +3003,35 @@ fts_optimize_init(void)
ut_ad(!srv_read_only_mode);

/* For now we only support one optimize thread. */
ut_a(!fts_optimize_is_init());
ut_a(!fts_optimize_wq);

fts_optimize_wq = ib_wqueue_create();
ut_a(fts_optimize_wq != NULL);
last_check_sync_time = time(NULL);

os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
}
/* Add fts tables to fts slots which could be skipped
during dict_load_table() because fts_optimize_thread
wasn't even started. */
mutex_enter(&dict_sys->mutex);

/**********************************************************************//**
Check whether the work queue is initialized.
@return TRUE if optimze queue is initialized. */
UNIV_INTERN
ibool
fts_optimize_is_init(void)
/*======================*/
{
return(fts_optimize_wq != NULL);
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
table != NULL;
table = UT_LIST_GET_NEXT(table_LRU, table)) {

if (!table->fts || !dict_table_has_fts_index(table)) {
continue;
}

/* fts_optimize_thread is not started yet. So there is no
need to acqquire fts_optimize_wq->mutex for adding the fts
table to the fts slots. */
ut_ad(!table->can_be_evicted);
fts_optimize_new_table(table);
table->fts->in_queue = true;
}

mutex_exit(&dict_sys->mutex);
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
}

/**********************************************************************//**
Expand Down

0 comments on commit a41d429

Please sign in to comment.