Skip to content

Commit

Permalink
Fixed issue on atomic writes on startup, removed incorrect assert.
Browse files Browse the repository at this point in the history
Fixed issue on file space extend when posix_fallocate is used.

Merged second iteration of multi-threaded flush code.
  • Loading branch information
Jan Lindström committed Feb 4, 2014
1 parent 8c5d5bc commit 55fab3d
Show file tree
Hide file tree
Showing 11 changed files with 513 additions and 1,143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ buffer_pool_bytes_dirty disabled
buffer_pool_pages_free disabled
buffer_pages_created disabled
buffer_pages_written disabled
buffer_index_pages_written disabled
buffer_pages_read disabled
buffer_data_reads disabled
buffer_data_written disabled
Expand Down Expand Up @@ -160,6 +161,13 @@ compress_pages_compressed disabled
compress_pages_decompressed disabled
compression_pad_increments disabled
compression_pad_decrements disabled
compress_saved disabled
compress_trim_sect512 disabled
compress_trim_sect4096 disabled
compress_pages_page_compressed disabled
compress_page_compressed_trim_op disabled
compress_page_compressed_trim_op_saved disabled
compress_pages_page_decompressed disabled
index_splits disabled
index_merges disabled
adaptive_hash_searches disabled
Expand Down
122 changes: 39 additions & 83 deletions storage/innobase/buf/buf0flu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Created 11/11/1995 Heikki Tuuri
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "os0file.h"
#include "os0sync.h"
#include "trx0sys.h"
#include "srv0mon.h"
#include "mysql/plugin.h"
Expand Down Expand Up @@ -1934,11 +1935,16 @@ buf_flush_LRU(
/* JAN: TODO: */
/*******************************************************************//**/
extern int is_pgcomp_wrk_init_done(void);
extern int pgcomp_flush_work_items(int buf_pool_inst, int *pages_flushed,
int flush_type, int min_n, unsigned long long lsn_limit);
extern int pgcomp_flush_work_items(
int buf_pool_inst,
int *pages_flushed,
enum buf_flush flush_type,
int min_n,
lsn_t lsn_limit);

#define MT_COMP_WATER_MARK 50

#ifdef UNIV_DEBUG
#include <time.h>
int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time)
{
Expand All @@ -1959,8 +1965,15 @@ int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_t

return 0;
}
#endif

static os_fast_mutex_t pgcomp_mtx;

void pgcomp_init(void)
{
os_fast_mutex_init(PFS_NOT_INSTRUMENTED, &pgcomp_mtx);
}

static pthread_mutex_t pgcomp_mtx = PTHREAD_MUTEX_INITIALIZER;
/*******************************************************************//**
Multi-threaded version of buf_flush_list
*/
Expand All @@ -1983,7 +1996,10 @@ pgcomp_buf_flush_list(
{
ulint i;
bool success = true;
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time;
#endif
int cnt_flush[MTFLUSH_MAX_WORKER];

if (n_processed) {
*n_processed = 0;
Expand All @@ -2001,96 +2017,34 @@ pgcomp_buf_flush_list(
#ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0);
#endif
if(is_pgcomp_wrk_init_done() && (min_n > MT_COMP_WATER_MARK)) {
int cnt_flush[32];

//stack_trace();
pthread_mutex_lock(&pgcomp_mtx);
//gettimeofday(&p_start_time, 0x0);
//fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n);
pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit);

for (i = 0; i < srv_buf_pool_instances; i++) {
if (n_processed) {
*n_processed += cnt_flush[i];
}
if (cnt_flush[i]) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
cnt_flush[i]);

}
}

pthread_mutex_unlock(&pgcomp_mtx);
os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LIST,
min_n, lsn_limit);
os_fast_mutex_unlock(&pgcomp_mtx);

#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(success);
}
/* Flush to lsn_limit in all buffer pool instances */
for (i = 0; i < srv_buf_pool_instances; i++) {
buf_pool_t* buf_pool;
ulint page_count = 0;

buf_pool = buf_pool_from_array(i);

if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) {
/* We have two choices here. If lsn_limit was
specified then skipping an instance of buffer
pool means we cannot guarantee that all pages
up to lsn_limit has been flushed. We can
return right now with failure or we can try
to flush remaining buffer pools up to the
lsn_limit. We attempt to flush other buffer
pools based on the assumption that it will
help in the retry which will follow the
failure. */
success = false;

continue;
}

page_count = buf_flush_batch(
buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit);

buf_flush_end(buf_pool, BUF_FLUSH_LIST);

buf_flush_common(BUF_FLUSH_LIST, page_count);

if (n_processed) {
*n_processed += page_count;
*n_processed += cnt_flush[i];
}

if (page_count) {
if (cnt_flush[i]) {
MONITOR_INC_VALUE_CUMULATIVE(
MONITOR_FLUSH_BATCH_TOTAL_PAGE,
MONITOR_FLUSH_BATCH_COUNT,
MONITOR_FLUSH_BATCH_PAGES,
page_count);
cnt_flush[i]);
}
}

#if UNIV_DEBUG
#ifdef UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);

fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", (
min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
fprintf(stderr, "%s: [1] [*n_processed: (min:%lu)%lu %llu usec]\n",
__FUNCTION__, (min_n * srv_buf_pool_instances), *n_processed,
(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
#endif
return(success);
}
#endif

/* JAN: TODO: END: */

/*******************************************************************//**
Expand Down Expand Up @@ -2292,18 +2246,21 @@ ulint
pgcomp_buf_flush_LRU_tail(void)
/*====================*/
{
#ifdef UNIV_DEBUG
struct timeval p_start_time, p_end_time, d_time;
#endif
ulint total_flushed=0, i=0;
int cnt_flush[32];

#if UNIV_DEBUG
#ifdef UNIV_DEBUG
gettimeofday(&p_start_time, 0x0);
#endif
assert(is_pgcomp_wrk_init_done());
ut_ad(is_pgcomp_wrk_init_done());

pthread_mutex_lock(&pgcomp_mtx);
os_fast_mutex_lock(&pgcomp_mtx);
pgcomp_flush_work_items(srv_buf_pool_instances,
cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
os_fast_mutex_unlock(&pgcomp_mtx);

for (i = 0; i < srv_buf_pool_instances; i++) {
if (cnt_flush[i]) {
Expand All @@ -2317,8 +2274,6 @@ pgcomp_buf_flush_LRU_tail(void)
}
}

pthread_mutex_unlock(&pgcomp_mtx);

#if UNIV_DEBUG
gettimeofday(&p_end_time, 0x0);
timediff(&p_end_time, &p_start_time, &d_time);
Expand Down Expand Up @@ -2894,6 +2849,7 @@ buf_flush_validate(
}

#endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */
#endif /* !UNIV_HOTBACKUP */


#ifdef UNIV_DEBUG
Expand Down
1 change: 0 additions & 1 deletion storage/innobase/fil/fil0fil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,6 @@ fil_space_create(
DBUG_EXECUTE_IF("fil_space_create_failure", return(false););

ut_a(fil_system);
ut_a(fsp_flags_is_valid(flags));

/* Look for a matching tablespace and if found free it. */
do {
Expand Down
4 changes: 4 additions & 0 deletions storage/innobase/include/dict0dict.ic
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,10 @@ dict_tf_set(
if (awrites != ATOMIC_WRITES_DEFAULT) {
*flags |= (atomic_writes << DICT_TF_POS_ATOMIC_WRITES);
ut_ad(dict_tf_get_atomic_writes(*flags) == awrites);
}

if (awrites == ATOMIC_WRITES_ON ||
(awrites == ATOMIC_WRITES_DEFAULT && srv_use_atomic_writes )) {
*flags |= (1 << DICT_TF_POS_ATOMIC_BLOBS);
}

Expand Down
4 changes: 4 additions & 0 deletions storage/innobase/include/srv0srv.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ extern my_bool srv_use_atomic_writes;
/* If this flag IS TRUE, then we use lz4 to compress/decompress pages */
extern my_bool srv_use_lz4;

/* Number of flush threads */
#define MTFLUSH_MAX_WORKER 64
extern ulint srv_mtflush_threads;

#ifdef __WIN__
extern ibool srv_use_native_conditions;
#endif /* __WIN__ */
Expand Down
Loading

0 comments on commit 55fab3d

Please sign in to comment.