Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Forgotten files for 'dump-to-disk' feature

  • Loading branch information...
commit 3322f4b3e398d2f3c2d84de77943fc4e56c7db11 1 parent 76d73aa
@dkrotx authored
View
1  Makefile.am
@@ -16,6 +16,7 @@ memcached_SOURCES = memcached.c memcached.h \
thread.c daemon.c \
stats.c stats.h \
util.c util.h \
+ dd.c dd.h \
trace.h cache.h sasl_defs.h
if BUILD_CACHE
View
13 README
@@ -1,3 +1,16 @@
+-------------------------------------------------------------------------------
+Memcached-dd
+-------------------------------------------------------------------------------
+
+Fork of memcached providing dump to disk feature.
+Comparing to other solution(s) you may heard about, it's pure memcached with only patch. Not noSQL wich just support memcached protocol (Tarantool or MemBase).
+
+Usage is straightforward: just add `-F file' option to command-line. Memcached will read this `file' at start and write to file.tmp when SIGUSR2 received. Then (after successfull write), it will rename file.tmp -> file. So, `file' should be never truncated.
+Dump performs in separate thread, so it doesn't affect memcached itself.
+
+-------------------------------------------------------------------------------
+Original README:
+-------------------------------------------------------------------------------
Dependencies:
-- libevent, http://www.monkey.org/~provos/libevent/ (libevent-dev)
View
35 assoc.c
@@ -26,6 +26,7 @@
#include <pthread.h>
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t assoc_expansion_lock = PTHREAD_MUTEX_INITIALIZER;
typedef unsigned long int ub4; /* unsigned 4-byte quantities */
@@ -51,6 +52,7 @@ static unsigned int hash_items = 0;
/* Flag: Are we in the middle of expanding now? */
static bool expanding = false;
+static bool expanding_locked = false;
/*
* During expansion we migrate values with bucket granularity; this is how
@@ -160,7 +162,9 @@ int assoc_insert(item *it, const uint32_t hv) {
}
hash_items++;
- if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
+ if (! expanding && !expanding_locked &&
+ hash_items > (hashsize(hashpower) * 3) / 2)
+ {
assoc_expand();
}
@@ -189,15 +193,33 @@ void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
}
+void assoc_get_storage(assoc_storage *storage) {
+ storage->buckets = primary_hashtable;
+ storage->nbuckets = hashsize(hashpower);
+ storage->hashpower = hashpower;
+}
+
+
+bool assoc_lock_expansion(bool lock) {
+ expanding_locked = lock;
+ return expanding;
+}
+
+
static volatile int do_run_maintenance_thread = 1;
#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
static void *assoc_maintenance_thread(void *arg) {
-
+ bool expansion_started = false;
while (do_run_maintenance_thread) {
int ii = 0;
+
+ if (!expansion_started) {
+ pthread_mutex_lock(&assoc_expansion_lock);
+ expansion_started = true;
+ }
/* Lock the cache, and bulk move multiple buckets to the new
* hash table. */
@@ -231,12 +253,21 @@ static void *assoc_maintenance_thread(void *arg) {
}
if (!expanding) {
+ if (expansion_started) {
+ pthread_mutex_unlock(&assoc_expansion_lock);
+ expansion_started = false;
+ }
+
/* We are done expanding.. just wait for next invocation */
pthread_cond_wait(&maintenance_cond, &cache_lock);
}
pthread_mutex_unlock(&cache_lock);
}
+
+ if (expansion_started)
+ pthread_mutex_unlock(&assoc_expansion_lock);
+
return NULL;
}
View
11 assoc.h
@@ -7,3 +7,14 @@ void do_assoc_move_next_bucket(void);
int start_assoc_maintenance_thread(void);
void stop_assoc_maintenance_thread(void);
+typedef struct _assoc_storage
+{
+ item **buckets;
+ unsigned int nbuckets;
+ unsigned int hashpower;
+} assoc_storage;
+
+void assoc_get_storage(assoc_storage *storage);
+bool assoc_lock_expansion(bool lock);
+
+extern pthread_mutex_t assoc_expansion_lock;
View
23 memcached.c
@@ -146,7 +146,7 @@ static void maxconns_handler(const int fd, const short which, void *arg) {
* unix time. Use the fact that delta can't exceed one month (and real time value can't
* be that low).
*/
-static rel_time_t realtime(const time_t exptime) {
+rel_time_t realtime(const time_t exptime) {
/* no. of seconds in 30 days - largest possible delta exptime */
if (exptime == 0) return 0; /* 0 means never expire */
@@ -211,6 +211,7 @@ static void settings_init(void) {
settings.oldest_live = 0;
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
settings.socketpath = NULL; /* by default, not using a unix socket */
+ settings.dump_file = NULL; /* by default, not dump to file */
settings.factor = 1.25;
settings.chunk_size = 48; /* space for a modest key and value */
settings.num_threads = 4; /* N workers */
@@ -4465,6 +4466,7 @@ static void usage(void) {
"-u <username> assume identity of <username> (only when run as root)\n"
"-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
"-M return error on memory exhausted (rather than removing items)\n"
+ "-F file use file for snapshots: recover from file at startup, and write on SIGUSR2\n"
"-c <num> max simultaneous connections (default: 1024)\n"
"-k lock down all paged memory. Note that there is a\n"
" limit on how much memory you may lock. Trying to\n"
@@ -4724,6 +4726,9 @@ int main (int argc, char **argv) {
bool tcp_specified = false;
bool udp_specified = false;
+ /* for restoring previous dump */
+ snapshot_status *psnap = NULL;
+
char *subopts;
char *subopts_value;
enum {
@@ -4781,6 +4786,7 @@ int main (int argc, char **argv) {
"B:" /* Binding protocol */
"I:" /* Max item size */
"S" /* Sasl ON */
+ "F:" /* dump cache content to file (SIGUSR2) */
"o:" /* Extended generic options */
))) {
switch (c) {
@@ -4956,6 +4962,9 @@ int main (int argc, char **argv) {
#endif
settings.sasl = true;
break;
+ case 'F':
+ settings.dump_file = optarg;
+ break;
case 'o': /* It's sub-opts time! */
subopts = optarg;
@@ -5120,6 +5129,14 @@ int main (int argc, char **argv) {
/* initialize main thread libevent instance */
main_base = event_init();
+ if (settings.dump_file) {
+ psnap = dd_open(settings.dump_file);
+ if (psnap) {
+ fprintf(stderr, "Found previous snapshot: %d records (hashsize -> %d)\n", psnap->nelems, psnap->hashpower);
+ settings.hashpower_init = psnap->hashpower;
+ }
+ }
+
/* initialize other stuff */
stats_init();
assoc_init(settings.hashpower_init);
@@ -5149,6 +5166,10 @@ int main (int argc, char **argv) {
/* initialise clock event */
clock_handler(0, 0, 0);
+ if (psnap) {
+ dd_restore(psnap);
+ }
+
/* create unix mode sockets after dropping privileges */
if (settings.socketpath != NULL) {
errno = 0;
View
11 memcached.h
@@ -285,6 +285,7 @@ struct settings {
rel_time_t oldest_live; /* ignore existing items older than this */
int evict_to_free;
char *socketpath; /* path to unix socket if using local socket */
+ char *dump_file;
int access; /* access mask (a la chmod) for unix domain socket */
double factor; /* chunk size growth factor */
int chunk_size;
@@ -360,6 +361,13 @@ typedef struct {
struct event_base *base; /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD;
+
+typedef struct {
+ pthread_t thread_id; /* unique ID of this thread */
+ struct event_base *base; /* libevent handle this thread uses */
+ struct event evdd;
+} LIBEVENT_DD_THREAD;
+
/**
* The structure representing a connection into memcached.
*/
@@ -498,6 +506,7 @@ static inline int mutex_lock(pthread_mutex_t *mutex)
#include "trace.h"
#include "hash.h"
#include "util.h"
+#include "dd.h"
/*
* Functions such as the libevent-related calls that need to do cross-thread
@@ -548,6 +557,8 @@ void append_stat(const char *name, ADD_STAT add_stats, conn *c,
enum store_item_type store_item(item *item, int comm, conn *c);
+rel_time_t realtime(const time_t exptime);
+
#if HAVE_DROP_PRIVILEGES
extern void drop_privileges(void);
#else
View
93 thread.c
@@ -10,6 +10,7 @@
#include <errno.h>
#include <string.h>
#include <pthread.h>
+#include <signal.h>
#ifdef __sun
#include <atomic.h>
@@ -69,6 +70,11 @@ static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
static LIBEVENT_THREAD *threads;
/*
+ * Thread performing dump of cache by request (SIGUSR2).
+ */
+static LIBEVENT_DD_THREAD dd_thread;
+
+/*
* Number of worker threads that have finished setting themselves up.
*/
static int init_count = 0;
@@ -373,6 +379,79 @@ int is_listen_thread() {
return pthread_self() == dispatcher_thread.thread_id;
}
+/*
+ * Dump schema requires assoc expansion to be locked -
+ * it walks throught it's buckets while saving.
+ */
+static void on_sigdump(int evfd, short ev, void *arg)
+{
+ FILE *f;
+ char *tmpname;
+ bool expanding;
+
+ mutex_lock(&cache_lock);
+ expanding = assoc_lock_expansion(true);
+ mutex_unlock(&cache_lock);
+
+
+ tmpname = malloc(strlen(settings.dump_file) + sizeof(".tmp"));
+ if (!tmpname) {
+ fprintf(stderr, "malloc (filename) failed");
+ }
+ strcpy(tmpname, settings.dump_file);
+ strcat(tmpname, ".tmp");
+
+ fprintf(stderr, "Dump cache content to %s\n", settings.dump_file);
+ f = fopen(tmpname, "w");
+ if (f != NULL)
+ {
+ bool ok;
+ if (expanding) {
+ fprintf(stderr, "Waiting for assoc-expansion to end...\n");
+ pthread_mutex_lock(&assoc_expansion_lock);
+ }
+
+ ok = dd_dump(f);
+ fclose(f);
+
+ if (ok) {
+ fprintf(stderr, "Moving temprorary %s -> %s\n", tmpname, settings.dump_file);
+ if (rename(tmpname, settings.dump_file) == -1) {
+ fprintf(stderr, "Failed to rename %s to %s: %s\n", tmpname, settings.dump_file, strerror(errno));
+ }
+ }
+ else {
+ fprintf(stderr, "Failed to dump file to %s: %s", tmpname, strerror(errno));
+ }
+ }
+
+ mutex_lock(&cache_lock);
+ assoc_lock_expansion(false); /* enable assoc expansion */
+ mutex_unlock(&cache_lock);
+
+ if (expanding) {
+ pthread_mutex_unlock(&assoc_expansion_lock);
+ }
+}
+
+
+static void *setup_dd_thread(void *vme) {
+ LIBEVENT_DD_THREAD *me = vme;
+ me->base = event_init();
+ if (! me->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+
+ signal_set(&me->evdd, SIGUSR2, on_sigdump, NULL);
+ event_base_set(me->base, &me->evdd);
+ signal_add(&me->evdd, NULL);
+
+ event_base_loop(me->base, 0);
+ return NULL;
+}
+
+
/********************************* ITEM ACCESS *******************************/
/*
@@ -655,6 +734,17 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
}
}
+
+static void create_dd_thread()
+{
+ int ret;
+ if ((ret = pthread_create(&dd_thread.thread_id, NULL, setup_dd_thread, &dd_thread)) != 0) {
+ fprintf(stderr, "Can't create dd thread: %s\n", strerror(ret));
+ exit(1);
+ }
+ pthread_detach(dd_thread.thread_id);
+}
+
/*
* Initializes the thread subsystem, creating various worker threads.
*
@@ -674,6 +764,9 @@ void thread_init(int nthreads, struct event_base *main_base) {
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
+ if (settings.dump_file != NULL)
+ create_dd_thread();
+
/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
Please sign in to comment.
Something went wrong with that request. Please try again.