Skip to content

Commit

Permalink
Fix race condition in dbengine (netdata#7533)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfundul committed Dec 16, 2019
1 parent c8c72f1 commit 53ab093
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 72 deletions.
12 changes: 8 additions & 4 deletions daemon/main.c
Expand Up @@ -340,10 +340,12 @@ int help(int exitcode) {
" -W unittest Run internal unittests and exit.\n\n"
#ifdef ENABLE_DBENGINE
" -W createdataset=N Create a DB engine dataset of N seconds and exit.\n\n"
" -W stresstest=A,B,C,D,E Run a DB engine stress test for A seconds,\n"
" -W stresstest=A,B,C,D,E,F\n"
" Run a DB engine stress test for A seconds,\n"
" with B writers and C readers, with a ramp up\n"
" time of D seconds for writers, a page cache\n"
" size of E MiB, and exit.\n\n"
" size of E MiB, an optional disk space limit\n"
" of F MiB and exit.\n\n"
#endif
" -W set section option value\n"
" set netdata.conf option from the command line.\n\n"
Expand Down Expand Up @@ -956,7 +958,7 @@ int main(int argc, char **argv) {
else if(strncmp(optarg, stresstest_string, strlen(stresstest_string)) == 0) {
char *endptr;
unsigned test_duration_sec = 0, dset_charts = 0, query_threads = 0, ramp_up_seconds = 0,
page_cache_mb = 0;
page_cache_mb = 0, disk_space_mb = 0;

optarg += strlen(stresstest_string);
test_duration_sec = (unsigned)strtoul(optarg, &endptr, 0);
Expand All @@ -968,8 +970,10 @@ int main(int argc, char **argv) {
ramp_up_seconds = (unsigned)strtoul(endptr + 1, &endptr, 0);
if (',' == *endptr)
page_cache_mb = (unsigned)strtoul(endptr + 1, &endptr, 0);
if (',' == *endptr)
disk_space_mb = (unsigned)strtoul(endptr + 1, &endptr, 0);
dbengine_stress_test(test_duration_sec, dset_charts, query_threads, ramp_up_seconds,
page_cache_mb);
page_cache_mb, disk_space_mb);
return 0;
}
#endif
Expand Down
83 changes: 56 additions & 27 deletions daemon/unit_test.c
Expand Up @@ -1723,7 +1723,7 @@ int test_dbengine(void)

default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;

debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
fprintf(stderr, "Initializing localhost with hostname 'unittest-dbengine'\n");
host = dbengine_rrdhost_find_or_create("unittest-dbengine");
if (NULL == host)
return 1;
Expand Down Expand Up @@ -1915,6 +1915,9 @@ static void generate_dbengine_chart(void *arg)
rrdset_done(st);
thread_info->time_max = time_current;
}
for (j = 0; j < DSET_DIMS; ++j) {
rrdeng_store_metric_finalize(rd[j]);
}
}

void generate_dbengine_dataset(unsigned history_seconds)
Expand All @@ -1935,7 +1938,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;

error_log_limit_unlimited();
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-dataset'");
fprintf(stderr, "Initializing localhost with hostname 'dbengine-dataset'\n");

host = dbengine_rrdhost_find_or_create("dbengine-dataset");
if (NULL == host)
Expand Down Expand Up @@ -1986,6 +1989,7 @@ struct dbengine_query_thread {
unsigned history_seconds; /* how far back in the past to go */
volatile long done; /* initialize to 0, set to 1 to stop thread */
unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */
uint8_t delete_old_data; /* if non zero then data are deleted when disk space is exhausted */

struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */
};
Expand All @@ -1995,7 +1999,7 @@ static void query_dbengine_chart(void *arg)
struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
const int DSET_CHARTS = thread_info->dset_charts;
const int DSET_DIMS = thread_info->dset_dims;
time_t time_after, time_before, time_min, time_max, duration;
time_t time_after, time_before, time_min, time_approx_min, time_max, duration;
int i, j, update_every = 1;
RRDSET *st;
RRDDIM *rd;
Expand All @@ -2015,6 +2019,13 @@ static void query_dbengine_chart(void *arg)

time_min = thread_info->time_present - thread_info->history_seconds + 1;
time_max = thread_info->chart_threads[i]->time_max;

if (thread_info->delete_old_data) {
/* A time window of twice the disk space is sufficient for compression space savings of up to 50% */
time_approx_min = time_max - (default_rrdeng_disk_quota_mb * 2 * 1024 * 1024) /
(((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number));
time_min = MAX(time_min, time_approx_min);
}
if (!time_max) {
time_before = time_after = time_min;
} else {
Expand All @@ -2030,52 +2041,64 @@ static void query_dbengine_chart(void *arg)
expected = unpack_storage_number(pack_storage_number((calculated_number) generatedv, SN_EXISTS));

if (unlikely(rd->state->query_ops.is_finished(&handle))) {
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
}
break;
}
n = rd->state->query_ops.next_metric(&handle, &time_retrieved);
if (SN_EMPTY_SLOT == n) {
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
}
break;
}
++thread_info->queried_metrics_nr;
value = unpack_storage_number(n);

same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if (!same) {
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected, value);
++thread_info->errors;
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT
", ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected, value);
++thread_info->errors;
}
}
if (time_retrieved != time_now) {
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
++thread_info->errors;
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr,
" DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
++thread_info->errors;
}
}
}
rd->state->query_ops.finalize(&handle);
} while(!thread_info->done);
}

void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB)
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB)
{
const unsigned DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
const unsigned HISTORY_SECONDS = 3600 * 24 * 365; /* 1 year of history */
const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 years of history */
RRDHOST *host = NULL;
struct dbengine_chart_thread **chart_threads;
struct dbengine_query_thread **query_threads;
unsigned i, j;
time_t time_start, time_end;

error_log_limit_unlimited();

if (!TEST_DURATION_SEC)
TEST_DURATION_SEC = 10;
if (!DSET_CHARTS)
Expand All @@ -2087,13 +2110,18 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi

default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
default_rrdeng_page_cache_mb = PAGE_CACHE_MB;
// Worst case for uncompressible data
default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) /
(1024 * 1024);
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
if (DISK_SPACE_MB) {
fprintf(stderr, "By setting disk space limit data are allowed to be deleted. "
"Data validation is turned off for this run.\n");
default_rrdeng_disk_quota_mb = DISK_SPACE_MB;
} else {
// Worst case for uncompressible data
default_rrdeng_disk_quota_mb =
(((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / (1024 * 1024);
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
}

error_log_limit_unlimited();
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-stress-test'");
fprintf(stderr, "Initializing localhost with hostname 'dbengine-stress-test'\n");

host = dbengine_rrdhost_find_or_create("dbengine-stress-test");
if (NULL == host)
Expand All @@ -2112,7 +2140,7 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi
"%u MiB of page cache.\n",
RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB);

time_start = now_realtime_sec();
time_start = now_realtime_sec() + HISTORY_SECONDS; /* move history to the future */
for (i = 0 ; i < DSET_CHARTS ; ++i) {
chart_threads[i]->host = host;
chart_threads[i]->chartname = "random";
Expand Down Expand Up @@ -2146,6 +2174,7 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi
for (j = 0 ; j < DSET_CHARTS ; ++j) {
query_threads[i]->chart_threads[j] = chart_threads[j];
}
query_threads[i]->delete_old_data = DISK_SPACE_MB ? 1 : 0;
assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i]));
}
sleep(TEST_DURATION_SEC);
Expand Down
2 changes: 1 addition & 1 deletion daemon/unit_test.h
Expand Up @@ -12,7 +12,7 @@ extern int unit_test_buffer(void);
extern int test_dbengine(void);
extern void generate_dbengine_dataset(unsigned history_seconds);
extern void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB);
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB);

#endif

Expand Down
34 changes: 21 additions & 13 deletions database/engine/pagecache.c
Expand Up @@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wake_up_waiters_unsafe(descr);
rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
* The caller must hold page descriptor lock.
* The lock will be released and re-acquired. The descriptor is not guaranteed
Expand Down Expand Up @@ -135,36 +142,34 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_

/*
* The caller must hold page descriptor lock.
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
if (exclusive_access)
pg_cache_descr->flags |= RRD_PAGE_LOCKED;
++pg_cache_descr->refcnt;

return 1;
}

/*
* The caller must hold page descriptor lock.
* Same return values as pg_cache_try_get_unsafe() without doing anything.
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
(exclusive_access && pg_cache_descr->refcnt)) {
if (!pg_cache_can_get_unsafe(descr, exclusive_access))
return 0;
}

if (exclusive_access)
pg_cache_descr->flags |= RRD_PAGE_LOCKED;
++pg_cache_descr->refcnt;

return 1;
}
Expand Down Expand Up @@ -429,8 +434,11 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
pg_cache_put(ctx, descr);
if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED)
rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
(void)sleep_usec(1000); /* 1 msec */
}
destroy:
freez(descr);
pg_cache_update_metric_times(page_index);
Expand Down
1 change: 1 addition & 0 deletions database/engine/pagecache.h
Expand Up @@ -148,6 +148,7 @@ struct page_cache { /* TODO: add statistics */
};

extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
Expand Down
10 changes: 5 additions & 5 deletions database/engine/rrdengine.c
Expand Up @@ -121,19 +121,19 @@ void read_extent_cb(uv_fs_t* req)
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
rrdeng_page_descr_mutex_unlock(ctx, descr);
pg_cache_replaceQ_insert(ctx, descr);
if (xt_io_descr->release_descr) {
pg_cache_put_unsafe(descr);
pg_cache_put(ctx, descr);
} else {
pg_cache_wake_up_waiters_unsafe(descr);
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
pg_cache_wake_up_waiters(ctx, descr);
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
Expand Down
10 changes: 5 additions & 5 deletions database/engine/rrdengineapi.c
Expand Up @@ -473,16 +473,16 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
next_page_time = handle->next_page_time * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
next_page_time = handle->next_page_time * USEC_PER_SEC;
}
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
Expand Down

0 comments on commit 53ab093

Please sign in to comment.