Skip to content

Commit

Permalink
Configurable storage engine for Netdata agents: step 3 (netdata#12892)
Browse files Browse the repository at this point in the history
* storage engine: add host context API

Add a new API to allow storage engines to manage host contexts.
* Replace single global context with per-engine global context
* Context is fully managed by storage engines: a storage engine
  can use no context, a global engine context, per host contexts,
  or a mix of these.
* Currently, only dbengine uses contexts.
  Following the current logic, legacy hosts use their own context,
  while non-legacy hosts share the global context.

* storage engine: use empty function instead of null for context ops

* rrdhost: don't check return value for void call

* rrdhost: create context with host

* storage engine: move rrddim ops to rrddim_mem.{c,h}

* storage engine: don't use NULL for end-of-list marker

* storage engine: fallback to default engine
  • Loading branch information
aberaud committed Jun 16, 2022
1 parent 131e5f5 commit 100a12c
Show file tree
Hide file tree
Showing 19 changed files with 320 additions and 217 deletions.
3 changes: 3 additions & 0 deletions daemon/analytics.c
Expand Up @@ -2,6 +2,9 @@

#include "common.h"
#include "buildinfo.h"
#ifdef ENABLE_DBENGINE
#include "database/engine/rrdengineapi.h"
#endif

struct analytics_data analytics_data;
extern void analytics_exporting_connectors (BUFFER *b);
Expand Down
5 changes: 4 additions & 1 deletion daemon/global_statistics.c
@@ -1,6 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "common.h"
#ifdef ENABLE_DBENGINE
#include "database/engine/rrdengineapi.h"
#endif

#define GLOBAL_STATS_RESET_WEB_USEC_MAX 0x01

Expand Down Expand Up @@ -456,7 +459,7 @@ static void dbengine_statistics_charts(void) {

rrdhost_foreach_read(host) {
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && !rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
if (&multidb_ctx == host->rrdeng_ctx) {
if (host->rrdeng_ctx == host->rrdeng_ctx->engine->context) {
if (counted_multihost_db)
continue; /* Only count multi-host DB once */
counted_multihost_db = 1;
Expand Down
21 changes: 15 additions & 6 deletions daemon/main.c
Expand Up @@ -3,6 +3,10 @@
#include "common.h"
#include "buildinfo.h"
#include "static_threads.h"
#include "database/storage_engine.h"
#ifdef ENABLE_DBENGINE
#include "database/engine/rrdengineapi.h"
#endif

int netdata_zero_metrics_enabled;
int netdata_anonymous_statistics_enabled;
Expand Down Expand Up @@ -54,13 +58,18 @@ void netdata_cleanup_and_exit(int ret) {

// free the database
info("EXIT: freeing database memory...");
#ifdef ENABLE_DBENGINE
rrdeng_prepare_exit(&multidb_ctx);
#endif
for (STORAGE_ENGINE* eng = storage_engine_foreach_init(); eng; eng = storage_engine_foreach_next(eng)) {
if (eng->context)
eng->api.engine_ops.exit(eng->context);
}

rrdhost_free_all();
#ifdef ENABLE_DBENGINE
rrdeng_exit(&multidb_ctx);
#endif
for (STORAGE_ENGINE* eng = storage_engine_foreach_init(); eng; eng = storage_engine_foreach_next(eng)) {
if (eng->context) {
eng->api.engine_ops.destroy(eng->context);
eng->context = NULL;
}
}
}
sql_close_database();

Expand Down
7 changes: 5 additions & 2 deletions daemon/unit_test.c
@@ -1,6 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "common.h"
#ifdef ENABLE_DBENGINE
#include "database/engine/rrdengineapi.h"
#endif

static int check_number_printing(void) {
struct {
Expand Down Expand Up @@ -1157,7 +1160,7 @@ int run_test(struct test *test)
RRDSET *st = rrdset_create_localhost("netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest", NULL, 1
, test->update_every, RRDSET_TYPE_LINE);
RRDDIM *rd = rrddim_add(st, "dim1", NULL, test->multiplier, test->divisor, test->algorithm);

RRDDIM *rd2 = NULL;
if(test->feed2)
rd2 = rrddim_add(st, "dim2", NULL, test->multiplier, test->divisor, test->algorithm);
Expand All @@ -1173,7 +1176,7 @@ int run_test(struct test *test)

if(c) {
time_now += test->feed[c].microseconds;
fprintf(stderr, " > %s: feeding position %lu, after %0.3f seconds (%0.3f seconds from start), delta " CALCULATED_NUMBER_FORMAT ", rate " CALCULATED_NUMBER_FORMAT "\n",
fprintf(stderr, " > %s: feeding position %lu, after %0.3f seconds (%0.3f seconds from start), delta " CALCULATED_NUMBER_FORMAT ", rate " CALCULATED_NUMBER_FORMAT "\n",
test->name, c+1,
(float)test->feed[c].microseconds / 1000000.0,
(float)time_now / 1000000.0,
Expand Down
9 changes: 4 additions & 5 deletions database/engine/rrdengine.c
Expand Up @@ -1339,13 +1339,12 @@ void rrdeng_worker(void* arg)
*/
void rrdengine_main(void)
{
int ret;
struct rrdengine_instance *ctx;
STORAGE_ENGINE_INSTANCE* ctx;

sanity_check();
ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB);
if (ret) {
exit(ret);
ctx = rrdeng_init(storage_engine_get(RRD_MEMORY_MODE_DBENGINE), NULL);
if (!ctx) {
exit(1);
}
rrdeng_exit(ctx);
fprintf(stderr, "Hello world!");
Expand Down
2 changes: 2 additions & 0 deletions database/engine/rrdengine.h
Expand Up @@ -13,6 +13,7 @@
#include <openssl/evp.h>
#include "daemon/common.h"
#include "../rrd.h"
#include "../storage_engine.h"
#include "rrddiskprotocol.h"
#include "rrdenginelib.h"
#include "datafile.h"
Expand Down Expand Up @@ -226,6 +227,7 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele
#define QUIESCED (2) /* is set after all threads have finished running */

struct rrdengine_instance {
STORAGE_ENGINE_INSTANCE parent;
struct metalog_instance *metalog_ctx;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
Expand Down
99 changes: 57 additions & 42 deletions database/engine/rrdengineapi.c
@@ -1,8 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"

/* Default global database instance */
struct rrdengine_instance multidb_ctx;
#include "../storage_engine.h"

int db_engine_use_malloc = 0;
int default_rrdeng_page_cache_mb = 32;
Expand All @@ -11,9 +9,16 @@ int default_multidb_disk_quota_mb = 256;
/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;

void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id);
void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);

static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host)
{
return host->rrdeng_ctx;
return (struct rrdengine_instance*) host->rrdeng_ctx;
}

/* This UUID is not unique across hosts */
Expand Down Expand Up @@ -69,7 +74,7 @@ void rrdeng_metric_init(RRDDIM *rd)
pg_cache = &ctx->pg_cache;

rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);
if (host != localhost && host->rrdeng_ctx == &multidb_ctx)
if (host != localhost && host->rrdeng_ctx->engine && host->rrdeng_ctx == host->rrdeng_ctx->engine->context)
is_multihost_child = 1;

uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
Expand Down Expand Up @@ -196,16 +201,12 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
struct rrdeng_page_descr *descr;
struct rrdengine_instance *ctx = handle->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = handle->descr;
storage_number *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;

ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;

if (descr) {
/* Make alignment decisions */

Expand Down Expand Up @@ -820,10 +821,11 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
void rrdeng_get_37_statistics(STORAGE_ENGINE_INSTANCE* context, unsigned long long *array)
{
if (ctx == NULL)
if (context == NULL)
return;
struct rrdengine_instance* ctx = (struct rrdengine_instance*) context;

struct page_cache *pg_cache = &ctx->pg_cache;

Expand Down Expand Up @@ -874,19 +876,23 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
}

/*
* Returns 0 on success, negative on error
*/
int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
unsigned disk_space_mb)

STORAGE_ENGINE_INSTANCE*
rrdeng_init(STORAGE_ENGINE* eng, RRDHOST *host)
{
struct rrdengine_instance *ctx;
int error;
uint32_t max_open_files;

max_open_files = rlimit_nofile.rlim_cur / 4;
bool is_legacy = is_legacy_child(host->machine_guid);
if (!is_legacy && eng->context) {
if (host->rrd_memory_mode == eng->id && host->rrdeng_ctx == NULL) {
host->rrdeng_ctx = eng->context;
}
return eng->context;
}

/* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
uint32_t max_open_files = rlimit_nofile.rlim_cur / 4;
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
if (rrdeng_reserved_file_descriptors > max_open_files) {
error(
Expand All @@ -895,15 +901,18 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p

rrd_stat_atomic_add(&global_fs_errors, 1);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return UV_EMFILE;
return NULL;//UV_EMFILE;
}
char dbfiles_path[FILENAME_MAX + 1];

if (NULL == ctxp) {
ctx = &multidb_ctx;
memset(ctx, 0, sizeof(*ctx));
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
snprintfz(dbfiles_path, FILENAME_MAX, "%s/dbengine", host->cache_dir);
mkdir(dbfiles_path, 0775);

int page_cache_mb = default_rrdeng_page_cache_mb;
int disk_space_mb = default_rrdeng_disk_quota_mb;

ctx = callocz(1, sizeof(*ctx));
ctx->parent.engine = eng;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
Expand All @@ -926,6 +935,15 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */
ctx->host = host;

// Attach context as the global context
if (!is_legacy && !eng->context) {
eng->context = (STORAGE_ENGINE_INSTANCE *)ctx;
}
// Attach context as the host context
if (host->rrd_memory_mode == eng->id && host->rrdeng_ctx == NULL) {
host->rrdeng_ctx = eng->context;
}

memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
ctx->worker_config.ctx = ctx;
init_page_cache(ctx);
Expand All @@ -950,30 +968,30 @@ int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_p
goto error_after_rrdeng_worker;
}

return 0;
return (STORAGE_ENGINE_INSTANCE *)ctx;

error_after_rrdeng_worker:
finalize_rrd_files(ctx);
error_after_init_rrd_files:
free_page_cache(ctx);
if (ctx != &multidb_ctx) {
if ((STORAGE_ENGINE_INSTANCE *)ctx != eng->context) {
freez(ctx);
*ctxp = NULL;
}
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return UV_EIO;
return NULL;//UV_EIO;
}

/*
* Returns 0 on success, 1 on error
*/
int rrdeng_exit(struct rrdengine_instance *ctx)
void rrdeng_exit(STORAGE_ENGINE_INSTANCE* context)
{
struct rrdeng_cmd cmd;

if (NULL == ctx) {
return 1;
if (NULL == context) {
return;
}
struct rrdengine_instance* ctx = (struct rrdengine_instance*)context;

/* TODO: add page to page cache */
cmd.opcode = RRDENG_SHUTDOWN;
Expand All @@ -984,21 +1002,18 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
finalize_rrd_files(ctx);
//metalog_exit(ctx->metalog_ctx);
free_page_cache(ctx);

if (ctx != &multidb_ctx) {
freez(ctx);
}
freez(ctx);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
}

void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
void rrdeng_prepare_exit(STORAGE_ENGINE_INSTANCE* context)
{
struct rrdeng_cmd cmd;

if (NULL == ctx) {
if (NULL == context) {
return;
}
struct rrdengine_instance* ctx = (struct rrdengine_instance*)context;

completion_init(&ctx->rrdengine_completion);
cmd.opcode = RRDENG_QUIESCE;
Expand Down
17 changes: 4 additions & 13 deletions database/engine/rrdengineapi.h
Expand Up @@ -17,21 +17,13 @@ extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
extern int default_multidb_disk_quota_mb;
extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure;
extern struct rrdengine_instance multidb_ctx;

/*
 * Summary of one region: when it starts, its update interval and how many
 * points it holds.
 * NOTE(review): presumably a "region" is a contiguous span of stored samples
 * that share the same update_every -- confirm against the code that fills it.
 */
struct rrdeng_region_info {
time_t start_time;
int update_every;
unsigned points;
};

extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id);
extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle);
extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);

extern void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid);
extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid,
uuid_t *ret_uuid);
Expand All @@ -52,14 +44,13 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
extern void rrdeng_get_37_statistics(STORAGE_ENGINE_INSTANCE *ctx, unsigned long long *array);

/* must call once before using anything */
extern int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
unsigned disk_space_mb);
extern STORAGE_ENGINE_INSTANCE* rrdeng_init(STORAGE_ENGINE* eng, RRDHOST *host);

extern int rrdeng_exit(struct rrdengine_instance *ctx);
extern void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
extern void rrdeng_exit(STORAGE_ENGINE_INSTANCE *ctx);
extern void rrdeng_prepare_exit(STORAGE_ENGINE_INSTANCE *ctx);
extern int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t);

#endif /* NETDATA_RRDENGINEAPI_H */
14 changes: 14 additions & 0 deletions database/ram/rrddim_mem.c
@@ -1,6 +1,20 @@
// SPDX-License-Identifier: GPL-3.0-or-later

#include "rrddim_mem.h"
#include "../storage_engine.h"

/*
 * Instance constructor for this storage engine.
 *
 * Nothing is allocated or attached here: both arguments are ignored and the
 * function always returns NULL, i.e. no engine instance is created.
 */
STORAGE_ENGINE_INSTANCE *rrddim_storage_engine_instance_new(STORAGE_ENGINE *engine, RRDHOST *host)
{
    /* Silence unused-parameter warnings; neither argument is needed. */
    (void)host;
    (void)engine;

    return NULL;
}

/*
 * Exit hook for an engine instance.
 *
 * Deliberately empty: the argument is ignored and nothing is done.
 */
void rrddim_storage_engine_instance_exit(STORAGE_ENGINE_INSTANCE *context)
{
    /* Unused on purpose. */
    (void)context;
}

/*
 * Destroy hook for an engine instance.
 *
 * Deliberately empty: the argument is ignored and nothing is released.
 */
void rrddim_storage_engine_instance_destroy(STORAGE_ENGINE_INSTANCE *context)
{
    /* Unused on purpose. */
    (void)context;
}

// ----------------------------------------------------------------------------
// RRDDIM legacy data collection functions
Expand Down

0 comments on commit 100a12c

Please sign in to comment.