Skip to content
Permalink
Browse files
MDEV-23399: Remove recv_writer_thread
Recovery works just fine without a separate thread whose only
task is to tell the page cleaner thread to do its job.

recv_sys_t::apply(): Flush the buffer pool at the end of each batch.

Reviewed by: Vladislav Vaintroub
  • Loading branch information
dr-m committed Oct 15, 2020
1 parent fa70c14 commit b535a79
Show file tree
Hide file tree
Showing 11 changed files with 18 additions and 230 deletions.
@@ -2535,41 +2535,8 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
" page cleaner thread priority can be changed."
" See the man page of setpriority().";
}
/* Signal that setpriority() has been attempted. */
os_event_set(recv_sys.flush_end);
#endif /* UNIV_LINUX */

do {
/* treat flushing requests during recovery. */
ulint n_flushed_lru = 0;
ulint n_flushed_list = 0;

os_event_wait(recv_sys.flush_start);

if (!recv_writer_thread_active) {
break;
}

if (recv_sys.flush_lru) {
/* Flush pages from end of LRU if required */
pc_request(0, LSN_MAX);
while (pc_flush_slot() > 0) {}
pc_wait_finished(&n_flushed_lru, &n_flushed_list);
} else {
/* Flush all pages */
do {
pc_request(ULINT_MAX, LSN_MAX);
while (pc_flush_slot() > 0) {}
} while (!pc_wait_finished(&n_flushed_lru,
&n_flushed_list));
}

os_event_reset(recv_sys.flush_start);
os_event_set(recv_sys.flush_end);
} while (recv_writer_thread_active);

os_event_wait(buf_flush_event);

ulint ret_sleep = 0;
ulint n_evicted = 0;
ulint n_flushed_last = 0;
@@ -525,7 +525,6 @@ static PSI_mutex_info all_innodb_mutexes[] = {
PSI_KEY(page_zip_stat_per_index_mutex),
PSI_KEY(purge_sys_pq_mutex),
PSI_KEY(recv_sys_mutex),
PSI_KEY(recv_writer_mutex),
PSI_KEY(redo_rseg_mutex),
PSI_KEY(noredo_rseg_mutex),
# ifdef UNIV_DEBUG
@@ -579,7 +578,6 @@ performance schema instrumented if "UNIV_PFS_THREAD"
is defined */
static PSI_thread_info all_innodb_threads[] = {
PSI_KEY(page_cleaner_thread),
PSI_KEY(recv_writer_thread),
PSI_KEY(trx_rollback_clean_thread),
PSI_KEY(thread_pool_thread)
};
@@ -33,9 +33,6 @@ Created 9/20/1997 Heikki Tuuri

#include <deque>

/** Is recv_writer_thread active? */
extern bool recv_writer_thread_active;

/** @return whether recovery is currently running. */
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)

@@ -226,16 +223,6 @@ struct recv_sys_t
/** whether recv_recover_page(), invoked from buf_page_read_complete(),
should apply log records*/
bool apply_log_recs;

ib_mutex_t writer_mutex;/*!< mutex coordinating
flushing between recv_writer_thread and
the recovery thread. */
os_event_t flush_start;/*!< event to activate
page cleaner threads */
os_event_t flush_end;/*!< event to signal that the page
cleaner has finished the request */
/** whether to flush from buf_pool.LRU instead of buf_pool.flush_list */
bool flush_lru;
/** whether recv_apply_hashed_log_recs() is running */
bool apply_batch_on;
byte* buf; /*!< buffer for parsing log records */
@@ -519,7 +519,6 @@ extern ulong srv_buf_dump_status_frequency;

# ifdef UNIV_PFS_THREAD
extern mysql_pfs_key_t page_cleaner_thread_key;
extern mysql_pfs_key_t recv_writer_thread_key;
extern mysql_pfs_key_t trx_rollback_clean_thread_key;
extern mysql_pfs_key_t thread_pool_thread_key;

@@ -66,7 +66,6 @@ extern mysql_pfs_key_t recalc_pool_mutex_key;
extern mysql_pfs_key_t page_cleaner_mutex_key;
extern mysql_pfs_key_t purge_sys_pq_mutex_key;
extern mysql_pfs_key_t recv_sys_mutex_key;
extern mysql_pfs_key_t recv_writer_mutex_key;
extern mysql_pfs_key_t rtr_active_mutex_key;
extern mysql_pfs_key_t rtr_match_mutex_key;
extern mysql_pfs_key_t rtr_path_mutex_key;
@@ -254,8 +254,6 @@ enum latch_level_t {

SYNC_TRX_I_S_RWLOCK,

SYNC_RECV_WRITER,

/** Level is varying. Only used with buffer pool page locks, which
do not have a fixed level, but instead have their level set after
the page is locked; see e.g. ibuf_bitmap_get_map_page(). */
@@ -291,7 +289,6 @@ enum latch_id_t {
LATCH_ID_PURGE_SYS_PQ,
LATCH_ID_RECALC_POOL,
LATCH_ID_RECV_SYS,
LATCH_ID_RECV_WRITER,
LATCH_ID_REDO_RSEG,
LATCH_ID_NOREDO_RSEG,
LATCH_ID_RW_LOCK_DEBUG,
@@ -985,9 +982,6 @@ struct sync_checker : public sync_check_functor_t
{
if (some_allowed) {
switch (level) {
case SYNC_RECV_WRITER:
/* This only happens in
recv_apply_hashed_log_recs. */
case SYNC_DICT:
case SYNC_DICT_OPERATION:
case SYNC_FTS_CACHE:
@@ -1555,14 +1555,6 @@ void logs_empty_and_mark_files_at_shutdown()
ut_ad(log_sys.is_initialised() || !srv_was_started);
ut_ad(fil_system.is_initialised() || !srv_was_started);

if (!srv_read_only_mode) {
if (recv_sys.flush_start) {
/* This is in case recv_writer_thread was never
started, or buf_flush_page_cleaner
failed to notice its termination. */
os_event_set(recv_sys.flush_start);
}
}
#define COUNT_INTERVAL 600U
#define CHECK_INTERVAL 100000U
os_thread_sleep(CHECK_INTERVAL);
@@ -90,13 +90,6 @@ is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t recv_max_page_lsn;

#ifdef UNIV_PFS_THREAD
mysql_pfs_key_t recv_writer_thread_key;
#endif /* UNIV_PFS_THREAD */

/** Is recv_writer_thread active? */
bool recv_writer_thread_active;

/** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */
struct log_phys_t : public log_rec_t
{
@@ -915,7 +908,6 @@ fil_name_process(char* name, ulint len, ulint space_id, bool deleted)
void recv_sys_t::close()
{
ut_ad(this == &recv_sys);
ut_ad(!recv_writer_thread_active);

if (is_initialised())
{
@@ -924,17 +916,13 @@ void recv_sys_t::close()
clear();
ut_d(mutex_exit(&mutex));

os_event_destroy(flush_start);
os_event_destroy(flush_end);

if (buf)
{
ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
buf= nullptr;
}

last_stored_lsn= 0;
mutex_free(&writer_mutex);
mutex_free(&mutex);
}

@@ -944,80 +932,13 @@ void recv_sys_t::close()
close_files();
}

/******************************************************************//**
recv_writer thread tasked with flushing dirty pages from the buffer
pools.
@return a dummy parameter */
extern "C"
os_thread_ret_t
DECLARE_THREAD(recv_writer_thread)(
/*===============================*/
void* arg MY_ATTRIBUTE((unused)))
/*!< in: a dummy parameter required by
os_thread_create */
{
my_thread_init();
ut_ad(!srv_read_only_mode);

#ifdef UNIV_PFS_THREAD
pfs_register_thread(recv_writer_thread_key);
#endif /* UNIV_PFS_THREAD */

#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "recv_writer thread running, id "
<< os_thread_pf(os_thread_get_curr_id());
#endif /* UNIV_DEBUG_THREAD_CREATION */

while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {

/* Wait till we get a signal to clean the LRU list.
Bounded by max wait time of 100ms. */
int64_t sig_count = os_event_reset(buf_flush_event);
os_event_wait_time_low(buf_flush_event, 100000, sig_count);

mutex_enter(&recv_sys.writer_mutex);

if (!recv_recovery_is_on()) {
mutex_exit(&recv_sys.writer_mutex);
break;
}

/* Flush pages from end of LRU if required */
os_event_reset(recv_sys.flush_end);
recv_sys.flush_lru = true;
os_event_set(recv_sys.flush_start);
os_event_wait(recv_sys.flush_end);

mutex_exit(&recv_sys.writer_mutex);
}

recv_writer_thread_active = false;

my_thread_end();
/* We count the number of threads in os_thread_exit().
A created thread should always use that to exit and not
use return() to exit. */
os_thread_exit();

OS_THREAD_DUMMY_RETURN;
}

/** Initialize the redo log recovery subsystem. */
void recv_sys_t::create()
{
ut_ad(this == &recv_sys);
ut_ad(!is_initialised());
ut_ad(!flush_start);
ut_ad(!flush_end);
mutex_create(LATCH_ID_RECV_SYS, &mutex);
mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);

if (!srv_read_only_mode) {
flush_start = os_event_create(0);
flush_end = os_event_create(0);
}

flush_lru = true;
apply_log_recs = false;
apply_batch_on = false;

@@ -1066,22 +987,14 @@ void recv_sys_t::debug_free()
{
ut_ad(this == &recv_sys);
ut_ad(is_initialised());
ut_ad(!recv_recovery_is_on());
mutex_enter(&mutex);

pages.clear();
ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);

buf= nullptr;

/* wake page cleaner up to progress */
if (!srv_read_only_mode)
{
ut_ad(!recv_recovery_is_on());
ut_ad(!recv_writer_thread_active);
os_event_reset(buf_flush_event);
os_event_set(flush_start);
}

mutex_exit(&mutex);
}

@@ -2759,35 +2672,28 @@ void recv_sys_t::apply(bool last_batch)
}
}

if (!last_batch)
if (last_batch)
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
else
{
/* Flush all the file pages to disk and invalidate them in buf_pool */
mutex_exit(&mutex);
mlog_init.reset();
log_mutex_exit();
}

/* Stop the recv_writer thread from issuing any LRU flush batches. */
mutex_enter(&writer_mutex);

/* Wait for any currently run batch to end. */
buf_flush_wait_LRU_batch_end();
ut_ad(!log_mutex_own());
mutex_exit(&mutex);

os_event_reset(flush_end);
flush_lru= false;
os_event_set(flush_start);
os_event_wait(flush_end);
buf_flush_wait_LRU_batch_end();
buf_flush_sync();

if (!last_batch)
{
buf_pool_invalidate();

/* Allow batches from recv_writer thread. */
mutex_exit(&writer_mutex);

log_mutex_enter();
mutex_enter(&mutex);
mlog_init.reset();
}
else
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);

mutex_enter(&mutex);

ut_d(after_apply= true);
clear();
@@ -3120,7 +3026,6 @@ recv_group_scan_log_recs(
recv_sys.recovered_lsn = *contiguous_lsn;
recv_sys.scanned_checkpoint_no = 0;
ut_ad(recv_max_page_lsn == 0);
ut_ad(last_phase || !recv_writer_thread_active);
mutex_exit(&recv_sys.mutex);

lsn_t start_lsn;
@@ -3574,11 +3479,6 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)

ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);

/* Spawn the background thread to flush dirty pages
from the buffer pools. */
recv_writer_thread_active = true;
os_thread_create(recv_writer_thread);

if (rescan) {
contiguous_lsn = checkpoint_lsn;

@@ -3659,35 +3559,11 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
}

/** Complete recovery from a checkpoint. */
void
recv_recovery_from_checkpoint_finish(void)
void recv_recovery_from_checkpoint_finish()
{
/* Make sure that the recv_writer thread is done. This is
required because it grabs various mutexes and we want to
ensure that when we enable sync_order_checks there is no
mutex currently held by any thread. */
mutex_enter(&recv_sys.writer_mutex);

/* Free the resources of the recovery system */
recv_sys.recovery_on = false;

/* By acquring the mutex we ensure that the recv_writer thread
won't trigger any more LRU batches. Now wait for currently
in progress batches to finish. */
buf_flush_wait_LRU_batch_end();

mutex_exit(&recv_sys.writer_mutex);

ulint count = 0;
while (recv_writer_thread_active) {
++count;
os_thread_sleep(100000);
if (srv_print_verbose_log && count > 600) {
ib::info() << "Waiting for recv_writer to"
" finish flushing of buffer pool";
count = 0;
}
}

recv_sys.recovery_on= false;

recv_sys.debug_free();

0 comments on commit b535a79

Please sign in to comment.