Implement snapshotting at shutdown #363

Merged
merged 35 commits into from
Apr 22, 2023
23b3d3f
Implement a mechanism to store and get the type of a iouring fd (stor…
danielealbano Apr 22, 2023
8f03a03
Ensure that the networking code mark the request to enqueue an fd to …
danielealbano Apr 22, 2023
7749aa2
Introduce a new snapshot_at_shutdown parameter under database.snapshots
danielealbano Apr 22, 2023
1c539ea
Rework the worker event loop to initialize and terminate the networki…
danielealbano Apr 22, 2023
4be5c8d
Do not report certain kind of errors if the worker is terminating
danielealbano Apr 22, 2023
c081f69
To ensure that the service threads terminate after the worker split t…
danielealbano Apr 22, 2023
b82f3e0
Fix memory leak
danielealbano Apr 22, 2023
7b360e4
The shutdown command can now simply control the behavior of snapshot_…
danielealbano Apr 22, 2023
ee5b7fd
Update headers after refactoring
danielealbano Apr 22, 2023
3fab6a5
Improve logging for the redis snapshot load
danielealbano Apr 22, 2023
9a86c45
Reduce the wait time in the wait loop for the epoch gc
danielealbano Apr 22, 2023
23828c9
Rework the epoch gc shutdown code to ensure they will always be termi…
danielealbano Apr 22, 2023
c9ae094
Update the tests for the signal handler thread to match the new imple…
danielealbano Apr 22, 2023
b30bea5
Fix headers
danielealbano Apr 22, 2023
665f0fd
Drop unused variable
danielealbano Apr 22, 2023
04fbda5
Update comments
danielealbano Apr 22, 2023
7f2d54d
Increase the wait time to 10ms to ensure that a snapshot will run
danielealbano Apr 22, 2023
1d5a272
Don't generate snapshots at the shutdown by default during the tests
danielealbano Apr 22, 2023
7c09f5f
Implement a mechanism to terminate running worker fibers
danielealbano Apr 22, 2023
2113eb5
Terminate the fiber that generate the snapshots before marking a snap…
danielealbano Apr 22, 2023
4bfae27
Introduce an iteration sequence number to track how many times a snap…
danielealbano Apr 22, 2023
3b3cd2f
Implement a test for SHUTDOWN NOSAVE and rework how the operations ar…
danielealbano Apr 22, 2023
f988224
Use the iteration to detect if a new backup was generated, don't test…
danielealbano Apr 22, 2023
e0ed252
Store the worker internal interface in a new field
danielealbano Apr 22, 2023
420f503
Drop functions that are not required anymore
danielealbano Apr 22, 2023
2941150
Update tests after new parameter introduced
danielealbano Apr 22, 2023
f62db7f
Update test-program after refactoring of the termination switches
danielealbano Apr 22, 2023
19d8723
Ensure the available cpus for the affinity is set to null during tests
danielealbano Apr 22, 2023
f1557f8
No reason to wait this long for the iterations of the internal wait loop
danielealbano Apr 22, 2023
e2ce842
No reason to wait
danielealbano Apr 22, 2023
82954aa
Don't access the general config to check or change the snapshot on sh…
danielealbano Apr 22, 2023
722d828
Doesn't really matter if close fails, the internal checks have been r…
danielealbano Apr 22, 2023
1e49f00
Refactor the Prometheus tests after the updates to the termination me…
danielealbano Apr 22, 2023
5ecb1da
Fix signal handler tests after termination refactoring
danielealbano Apr 22, 2023
0fdc880
No reason to wait half a second to exit the thread if the termination…
danielealbano Apr 22, 2023
2 changes: 2 additions & 0 deletions etc/cachegrand.yaml.skel
@@ -134,6 +134,8 @@ database:
path: /var/lib/cachegrand/dump.rdb
# The interval between the snapshots, the allowed units are s, m, h, if not specified the default is seconds.
interval: 30m
+ # If enabled, a snapshot will be taken at shutdown
+ snapshot_at_shutdown: true
# The number of keys that must be changed before a snapshot is taken, 0 means no limit
min_keys_changed: 1
# The amount of data that must be changed before a snapshot is taken, the allowed units are b, k, m, g, 0 means no
1 change: 1 addition & 0 deletions src/config.h
@@ -248,6 +248,7 @@ struct config_database_snapshots {
char *path;
char *interval_str;
char *min_data_changed_str;
+ bool snapshot_at_shutdown;
int64_t interval_ms;
int64_t min_keys_changed;
int64_t min_data_changed;
3 changes: 3 additions & 0 deletions src/config_cyaml_schema.c
@@ -318,6 +318,9 @@ const cyaml_schema_field_t config_database_snapshots_schema[] = {
CYAML_FIELD_STRING_PTR(
"interval", CYAML_FLAG_DEFAULT,
config_database_snapshots_t, interval_str, 0, 20),
+ CYAML_FIELD_BOOL(
+ "snapshot_at_shutdown", CYAML_FLAG_DEFAULT,
+ config_database_snapshots_t, snapshot_at_shutdown),
CYAML_FIELD_UINT(
"min_keys_changed", CYAML_FLAG_DEFAULT | CYAML_FLAG_OPTIONAL,
config_database_snapshots_t, min_keys_changed),
2 changes: 1 addition & 1 deletion src/epoch_gc_worker.h
@@ -5,7 +5,7 @@
extern "C" {
#endif

- #define EPOCH_GC_THREAD_LOOP_WAIT_TIME_MS 5000
+ #define EPOCH_GC_THREAD_LOOP_WAIT_TIME_MS 3
#define EPOCH_GC_THREAD_LOG_PRODUCER_PREFIX_TEMPLATE "[epoch gc %d]"
#define EPOCH_GC_THREAD_NAME_TEMPLATE "epoch_gc_%d"

15 changes: 3 additions & 12 deletions src/module/redis/command/module_redis_command_shutdown.c
@@ -49,18 +49,9 @@ MODULE_REDIS_COMMAND_FUNCPTR_COMMAND_END(shutdown) {
module_redis_command_shutdown_context_t *context = connection_context->command.context;

if (context->nosave_save.value.save_save.has_token) {
- uint64_t start_time_ms;
- // Check if the snapshot is already running
- if (!module_redis_command_helper_save_is_running(connection_context)) {
- start_time_ms = module_redis_command_helper_save_request(connection_context);
- } else {
- start_time_ms = clock_monotonic_int64_ms() - 1;
- }
-
- if (!module_redis_command_helper_save_wait(connection_context, start_time_ms)) {
- // If the operation fails it means that something really bad just happened, skip directly to the termination
- goto end;
- }
+ connection_context->db->config->snapshot.snapshot_at_shutdown = true;
+ } else if (context->nosave_save.value.nosave_nosave.has_token) {
+ connection_context->db->config->snapshot.snapshot_at_shutdown = false;
}

if (!module_redis_connection_send_ok(connection_context)) {
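Note on the hunk above: the SHUTDOWN handler no longer requests a snapshot and waits for it; it only records the caller's choice in snapshot_at_shutdown, and a SHUTDOWN without a modifier leaves the configured value untouched. A minimal sketch of that decision, assuming a hypothetical `shutdown_modifier_t` enum and `shutdown_resolve_snapshot_flag` helper that are not part of cachegrand (the real code reads the `has_token` fields of the parsed command context):

```c
#include <assert.h>
#include <stdbool.h>

// Hypothetical stand-in for the parsed SAVE / NOSAVE tokens of the command.
typedef enum {
    SHUTDOWN_MODIFIER_NONE,
    SHUTDOWN_MODIFIER_SAVE,
    SHUTDOWN_MODIFIER_NOSAVE
} shutdown_modifier_t;

// Returns the value snapshot_at_shutdown should hold once SHUTDOWN is handled:
// SAVE forces it on, NOSAVE forces it off, no modifier keeps the configured value.
bool shutdown_resolve_snapshot_flag(bool configured, shutdown_modifier_t modifier) {
    switch (modifier) {
        case SHUTDOWN_MODIFIER_SAVE:
            return true;
        case SHUTDOWN_MODIFIER_NOSAVE:
            return false;
        case SHUTDOWN_MODIFIER_NONE:
            return configured;
    }

    // Only reachable if an out-of-range value is cast to the enum.
    assert(0 && "unknown shutdown modifier");
    return configured;
}
```

For example, `shutdown_resolve_snapshot_flag(true, SHUTDOWN_MODIFIER_NOSAVE)` yields `false`, which matches what the SHUTDOWN NOSAVE branch writes into the database config.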
4 changes: 3 additions & 1 deletion src/module/redis/snapshot/module_redis_snapshot_load.c
@@ -484,14 +484,16 @@ bool module_redis_snapshot_load(
counter_expires = 0;
counter_expires_expired = 0;

+ LOG_I(TAG, "Loading the snapshot file <%s>", path);
+
// Load the data
module_redis_snapshot_load_data(snapshot_channel);

// Close the snapshot file
storage_close(snapshot_channel);

// Report the results
- LOG_I(TAG, "Loaded the snapshot file <%s>", path);
+ LOG_I(TAG, "Snapshot loaded");
LOG_I(TAG, "Found:");
LOG_I(TAG, "> %lu string(s)", counter_strings);
LOG_I(TAG, "> %lu value(s) with expirations", counter_expires - counter_expires_expired);
9 changes: 3 additions & 6 deletions src/network/io/network_io_common.c
@@ -208,12 +208,9 @@ bool network_io_common_socket_close(
ret = false;
}

- // Try to close the socket anyway if the shutdown fails
- if (close(fd)) {
- LOG_E(TAG, "Error closing the socket with fd <%d>", fd);
- LOG_E_OS_ERROR(TAG);
- ret = false;
- }
+ // If the close fails it means the socket is already closed, we don't really need to care
+ close(fd);
+ ret = true;

return ret;
}
31 changes: 13 additions & 18 deletions src/program.c
@@ -72,7 +72,6 @@
#define TAG "program"

static program_context_t program_context_global = { 0 };
- bool_volatile_t program_terminate_event_loop = false;

static char* config_path_default = CACHEGRAND_CONFIG_PATH_DEFAULT;

@@ -85,13 +84,13 @@ void program_reset_context() {
}

signal_handler_thread_context_t* program_signal_handler_thread_initialize(
- bool_volatile_t *terminate_event_loop,
program_context_t *program_context) {
signal_handler_thread_context_t *signal_handler_thread_context;

program_context->signal_handler_thread_context = signal_handler_thread_context =
xalloc_alloc_zero(sizeof(signal_handler_thread_context_t));
- signal_handler_thread_context->terminate_event_loop = terminate_event_loop;
+ signal_handler_thread_context->workers_terminate_event_loop = &program_context->workers_terminate_event_loop;
+ signal_handler_thread_context->program_terminate_event_loop = &program_context->program_terminate_event_loop;

LOG_V(TAG, "Creating signal handler thread");

@@ -110,21 +109,21 @@ signal_handler_thread_context_t* program_signal_handler_thread_initialize(
}

bool program_epoch_gc_workers_initialize(
- bool_volatile_t *terminate_event_loop,
program_context_t *program_context) {

int epoch_gc_workers_count = (int)EPOCH_GC_OBJECT_TYPE_MAX;
program_context->epoch_gc_workers_count = epoch_gc_workers_count;
program_context->epoch_gc_workers_context =
xalloc_alloc_zero(epoch_gc_workers_count * sizeof(epoch_gc_worker_context_t));

LOG_V(TAG, "Creating epoch gc workers");

- for(int object_type_index = 0; object_type_index < epoch_gc_workers_count; object_type_index++) {
+ for(int object_type_index = 0; object_type_index < program_context->epoch_gc_workers_count; object_type_index++) {
epoch_gc_worker_context_t *epoch_gc_worker_context =
&program_context->epoch_gc_workers_context[object_type_index];

epoch_gc_worker_context->epoch_gc = epoch_gc_init(object_type_index);
- epoch_gc_worker_context->terminate_event_loop = terminate_event_loop;
+ epoch_gc_worker_context->terminate_event_loop = &program_context->program_terminate_event_loop;
epoch_gc_worker_context->stats.collected_objects = 0;

if (pthread_create(
@@ -148,7 +147,6 @@ void program_workers_initialize_count(
}

worker_context_t* program_workers_initialize_context(
- bool_volatile_t *terminate_event_loop,
program_context_t *program_context) {
timespec_t started_on_timestamp = { 0 };
worker_context_t *workers_context;
@@ -168,7 +166,7 @@ worker_context_t* program_workers_initialize_context(
&started_on_timestamp,
program_context->workers_count,
worker_index,
- terminate_event_loop,
+ &program_context->workers_terminate_event_loop,
&program_context->storage_db_loaded,
program_context->config,
program_context->db);
@@ -220,10 +218,6 @@ bool program_workers_ensure_started(
return true;
}

- bool* program_get_terminate_event_loop() {
- return (bool*)&program_terminate_event_loop;
- }
-
void program_request_terminate(
bool_volatile_t *terminate_event_loop) {
*terminate_event_loop = true;
@@ -265,7 +259,7 @@ void program_wait_loop(
while(
!program_should_terminate(terminate_event_loop) &&
!program_has_aborted_workers(workers_context, workers_count)) {
- usleep(WORKER_LOOP_MAX_WAIT_TIME_MS * 1000);
+ usleep(WORKER_LOOP_MAX_WAIT_TIME_MS * 100);
}

LOG_V(TAG, "Wait loop terminated");
@@ -617,6 +611,7 @@ bool program_config_setup_storage_db(
config->snapshot.rotation_max_files = program_context->config->database->snapshots->rotation != NULL
? program_context->config->database->snapshots->rotation->max_files
: 0;
+ config->snapshot.snapshot_at_shutdown = program_context->config->database->snapshots->snapshot_at_shutdown;
}

if (program_context->config->database->backend == CONFIG_DATABASE_BACKEND_FILE) {
@@ -689,6 +684,9 @@ void program_cleanup(
xalloc_free(program_context->workers_context);
}

+ // Ensure that that everything will start to shutdown after the workers have terminated
+ program_request_terminate(&program_context->program_terminate_event_loop);
+
if (program_context->signal_handler_thread_context) {
program_signal_handler_thread_cleanup(
program_context->signal_handler_thread_context);
@@ -804,21 +802,18 @@ int program_main(

// Initialize the epoch gc workers
if (program_signal_handler_thread_initialize(
- &program_terminate_event_loop,
program_context) == NULL) {
goto end;
}

// Initialize the epoch gc workers
if (program_epoch_gc_workers_initialize(
- &program_terminate_event_loop,
program_context) == false) {
goto end;
}

// Initialize the workers
if (program_workers_initialize_context(
- &program_terminate_event_loop,
program_context) == NULL) {
goto end;
}
@@ -835,14 +830,14 @@ int program_main(
program_wait_loop(
program_context->workers_context,
program_context->workers_count,
- &program_terminate_event_loop);
+ &program_context->workers_terminate_event_loop);

return_res = 0;

end:
// The program_request_terminate is invoked to be sure that if the termination is being triggered because a worker
// thread is aborting, every other thread is also notified and will terminate the execution
- program_request_terminate(&program_terminate_event_loop);
+ program_request_terminate(&program_context->workers_terminate_event_loop);

LOG_I(TAG, "Terminating");
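
Note on the program.c changes: the single global termination flag is split into two fields of program_context. workers_terminate_event_loop is raised by the signal handler and is the flag handed to the workers and to the wait loop; program_terminate_event_loop is raised in program_cleanup once the workers context has been freed, and is the flag watched by the epoch gc workers and by the signal handler thread. The ordering can be sketched as below. This is an illustration only: it uses C11 atomics in place of the project's bool_volatile_t and MEMORY_FENCE_* macros, and every name in it (`termination_flags_t` and the `termination_*` helpers) is hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical container for the two flags added to program_context.
typedef struct {
    atomic_bool workers_terminate;  // stage 1: stop the workers
    atomic_bool program_terminate;  // stage 2: stop the service threads
} termination_flags_t;

void termination_flags_init(termination_flags_t *flags) {
    assert(flags != NULL);
    atomic_init(&flags->workers_terminate, false);
    atomic_init(&flags->program_terminate, false);
}

// Stage 1: what the signal handler does on a termination signal.
void termination_request_workers_stop(termination_flags_t *flags) {
    assert(flags != NULL);
    atomic_store(&flags->workers_terminate, true);
}

// Stage 2: what the cleanup path does after the workers are gone.
void termination_request_program_stop(termination_flags_t *flags) {
    assert(flags != NULL);
    atomic_store(&flags->program_terminate, true);
}

// Polled by the workers and by the main wait loop.
bool termination_workers_should_stop(termination_flags_t *flags) {
    assert(flags != NULL);
    return atomic_load(&flags->workers_terminate);
}

// Polled by the service threads (epoch gc, signal handler).
bool termination_services_should_stop(termination_flags_t *flags) {
    assert(flags != NULL);
    return atomic_load(&flags->program_terminate);
}
```

Keeping the second flag down until cleanup means the service threads are still running while the workers wind down, which is presumably what the shutdown snapshot relies on.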

6 changes: 2 additions & 4 deletions src/program.h
@@ -20,6 +20,8 @@ struct program_context {
uint32_t epoch_gc_workers_count;
epoch_gc_worker_context_t *epoch_gc_workers_context;
bool_volatile_t storage_db_loaded;
+ bool_volatile_t workers_terminate_event_loop;
+ bool_volatile_t program_terminate_event_loop;
};

program_context_t *program_get_context();
@@ -30,7 +32,6 @@ bool program_config_setup_storage_db(
program_context_t* program_context);

bool program_epoch_gc_workers_initialize(
- bool_volatile_t *terminate_event_loop,
program_context_t *program_context);

void program_epoch_gc_workers_cleanup(
@@ -41,11 +42,8 @@ void program_workers_initialize_count(
program_context_t *program_context);

worker_context_t* program_workers_initialize_context(
- volatile bool *terminate_event_loop,
program_context_t *program_context);

- bool* program_get_terminate_event_loop();
-
void program_request_terminate(
volatile bool *terminate_event_loop);

6 changes: 3 additions & 3 deletions src/signal_handler_thread.c
@@ -58,18 +58,18 @@ void signal_handler_thread_handle_signal(

LOG_I(
TAG,
- "Received signal <%s (%d)>, requesting loop termination",
+ "Received signal <%s (%d)>, requesting workers termination",
signal_name,
signal_number);

- *signal_handler_thread_internal_context->terminate_event_loop = true;
+ *signal_handler_thread_internal_context->workers_terminate_event_loop = true;
MEMORY_FENCE_STORE();
}

bool signal_handler_thread_should_terminate(
signal_handler_thread_context_t *context) {
MEMORY_FENCE_LOAD();
- return *context->terminate_event_loop;
+ return *context->program_terminate_event_loop;
}

void signal_handler_thread_register_signal_handlers(
5 changes: 3 additions & 2 deletions src/signal_handler_thread.h
@@ -5,7 +5,7 @@
extern "C" {
#endif

- #define SIGNAL_HANDLER_THREAD_LOOP_MAX_WAIT_TIME_MS 500
+ #define SIGNAL_HANDLER_THREAD_LOOP_MAX_WAIT_TIME_MS 3
#define SIGNAL_HANDLER_THREAD_LOG_PRODUCER_PREFIX_FORMAT_STRING "[signal handler thread]"
#define SIGNAL_HANDLER_THREAD_NAME "signal_handler"

@@ -17,7 +17,8 @@ extern uint8_t signal_handler_thread_managed_signals_count;

struct signal_handler_thread_context {
pthread_t pthread;
- volatile bool *terminate_event_loop;
+ volatile bool *workers_terminate_event_loop;
+ volatile bool *program_terminate_event_loop;
};

void signal_handler_thread_handle_signal(
2 changes: 1 addition & 1 deletion src/storage/db/storage_db.c
@@ -275,7 +275,7 @@ storage_db_t* storage_db_new(
}

// Import the hard and soft limits for the keys eviction
- memcpy(&db->limits, &config->limits, sizeof(storage_db_limits_t));
+ memcpy(&db->limits, &config->limits, sizeof(storage_db_config_limits_t));

return db;

18 changes: 9 additions & 9 deletions src/storage/db/storage_db.h
@@ -38,7 +38,7 @@ struct storage_db_keys_eviction_kv_list_entry {
} __attribute__((__aligned__(16)));
typedef struct storage_db_keys_eviction_kv_list_entry storage_db_keys_eviction_kv_list_entry_t;

- struct storage_db_limits {
+ struct storage_db_config_limits {
struct {
uint64_t soft_limit;
uint64_t hard_limit;
@@ -48,9 +48,7 @@ struct storage_db_limits {
hashtable_bucket_count_t hard_limit;
} keys_count;
};
- typedef struct storage_db_limits storage_db_limits_t;
-
- typedef int (*storage_db_keys_eviction_list_sort_cb)(const void *, const void*);
+ typedef struct storage_db_config_limits storage_db_config_limits_t;

struct storage_db_keys_eviction_list_entry {
uint16_t key_size;
@@ -83,23 +81,24 @@ enum storage_db_entry_index_value_type {
};
typedef enum storage_db_entry_index_value_type storage_db_entry_index_value_type_t;

- struct storage_db_snapshot {
+ struct storage_db_config_snapshot {
bool enabled;
uint64_t interval_ms;
uint64_t min_keys_changed;
uint64_t min_data_changed;
char *path;
uint64_t rotation_max_files;
+ bool snapshot_at_shutdown;
};
- typedef struct storage_db_snapshot storage_db_snapshot_t;
+ typedef struct storage_db_config_snapshot storage_db_config_snapshot_t;

// general config parameters to initialize and use the internal storage db (e.g. storage backend, amount of memory for
// the hashtable, other optional stuff)
typedef struct storage_db_config storage_db_config_t;
struct storage_db_config {
storage_db_backend_type_t backend_type;
- storage_db_limits_t limits;
- storage_db_snapshot_t snapshot;
+ storage_db_config_limits_t limits;
+ storage_db_config_snapshot_t snapshot;
union {
struct {
char *basedir_path;
@@ -158,6 +157,7 @@ struct storage_db {
} shards;
struct {
spinlock_lock_t spinlock;
+ uint64_t iteration;
uint64_volatile_t next_run_time_ms;
uint64_volatile_t start_time_ms;
uint64_volatile_t end_time_ms;
@@ -181,7 +181,7 @@ struct storage_db {
storage_db_config_t *config;
storage_db_worker_t *workers;
uint16_t workers_count;
- storage_db_limits_t limits;
+ storage_db_config_limits_t limits;
slots_bitmap_mpmc_t *counters_slots_bitmap;
storage_db_counters_t counters[STORAGE_DB_WORKERS_MAX];
};
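
Note on the new `iteration` field: commit f988224 says the iteration is used "to detect if a new backup was generated". A sketch of how such a check might look, assuming the counter is bumped once each time a snapshot completes; that increment point, the `snapshot_state_t` type, and both helper names are assumptions made for illustration and are not taken from the diff:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical reduction of the snapshot section of struct storage_db.
typedef struct {
    uint64_t iteration;  // assumed: incremented once per completed snapshot
} snapshot_state_t;

// Assumed hook called by the snapshot code when a run finishes.
void snapshot_mark_completed(snapshot_state_t *state) {
    assert(state != NULL);
    state->iteration++;
}

// True if at least one snapshot completed since iteration_before was read.
bool snapshot_ran_since(const snapshot_state_t *state, uint64_t iteration_before) {
    assert(state != NULL);
    return state->iteration != iteration_before;
}
```

A test would read `iteration` before issuing the command under test and call `snapshot_ran_since` afterwards, which avoids comparing timestamps that may share the same millisecond.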