cache_lock refactoring
item_lock() now protects accesses to item structures. cache_lock is just for
LRU and LRU stats. This patch removes cache_lock from a number of places where
it's no longer needed.

Some pre-existing bugs became obvious: flush_all, cachedump, and slab
reassignment's do_item_get short-circuit all need repairs.
dormando committed Jan 1, 2015
1 parent c553002 commit 69d1c69
Showing 5 changed files with 52 additions and 43 deletions.
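
As a rough illustration of the split described in the commit message -- item_lock() guarding the hash table and the item structure itself, cache_lock guarding only the LRU lists -- a simplified link path might look like the sketch below. This is hypothetical code for illustration (example_link is not a real function), not the literal change; the real pattern appears in do_item_link() in the items.c hunks that follow.

    /* Hypothetical sketch of the new locking discipline; simplified from
     * do_item_link(). Not the literal committed code. */
    void example_link(item *it, const uint32_t hv) {
        item_lock(hv);               /* protects the item struct and its hash bucket */
        it->it_flags |= ITEM_LINKED;
        it->time = current_time;
        assoc_insert(it, hv);        /* hash table insert: the item lock is sufficient */

        mutex_lock(&cache_lock);     /* cache_lock now only covers the LRU lists */
        item_link_q(it);             /* splice onto the class LRU */
        mutex_unlock(&cache_lock);

        refcount_incr(&it->refcount);
        item_unlock(hv);
    }
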
43 changes: 28 additions & 15 deletions items.c
@@ -43,6 +43,7 @@ static volatile int do_run_lru_crawler_thread = 0;
static int lru_crawler_initialized = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER;

void item_stats_reset(void) {
mutex_lock(&cache_lock);
@@ -52,9 +53,13 @@ void item_stats_reset(void) {


/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
uint64_t get_cas_id(void) {
static uint64_t cas_id = 0;
return ++cas_id;
pthread_mutex_lock(&cas_id_lock);
uint64_t next_id = ++cas_id;
pthread_mutex_unlock(&cas_id_lock);
return next_id;
}

/* Enable this for reference-count debugging. */
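
The TODO above hints at replacing the new cas_id_lock mutex with an atomic increment. One possible direction -- an assumption about the follow-up, not part of this commit -- using the GCC/Clang __sync builtins:

    /* Hypothetical follow-up to the TODO: a lock-free CAS id counter using an
     * atomic builtin instead of cas_id_lock. Not part of this commit. */
    static uint64_t cas_id = 0;

    uint64_t get_cas_id(void) {
        return __sync_add_and_fetch(&cas_id, (uint64_t)1);
    }
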
@@ -314,7 +319,6 @@ static void item_unlink_q(item *it) {
int do_item_link(item *it, const uint32_t hv) {
MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
mutex_lock(&cache_lock);
it->it_flags |= ITEM_LINKED;
it->time = current_time;

@@ -327,27 +331,28 @@ int do_item_link(item *it, const uint32_t hv) {
/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
assoc_insert(it, hv);
mutex_lock(&cache_lock);
item_link_q(it);
refcount_incr(&it->refcount);
mutex_unlock(&cache_lock);
refcount_incr(&it->refcount);

return 1;
}

void do_item_unlink(item *it, const uint32_t hv) {
MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
mutex_lock(&cache_lock);
if ((it->it_flags & ITEM_LINKED) != 0) {
it->it_flags &= ~ITEM_LINKED;
STATS_LOCK();
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey, hv);
mutex_lock(&cache_lock);
item_unlink_q(it);
mutex_unlock(&cache_lock);
do_item_remove(it);
}
mutex_unlock(&cache_lock);
}

/* FIXME: Is it necessary to keep this copy/pasted code? */
@@ -395,13 +400,13 @@ void do_item_update(item *it) {
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);

mutex_lock(&cache_lock);
if ((it->it_flags & ITEM_LINKED) != 0) {
mutex_lock(&cache_lock);
item_unlink_q(it);
it->time = current_time;
item_link_q(it);
mutex_unlock(&cache_lock);
}
mutex_unlock(&cache_lock);
}
}

@@ -572,21 +577,30 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) {

/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
//mutex_lock(&cache_lock);
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(&it->refcount);
/* Optimization for slab reassignment. prevents popular items from
* jamming in busy wait. Can only do this here to satisfy lock order
* of item_lock, cache_lock, slabs_lock. */
if (slab_rebalance_signal &&
* of item_lock, slabs_lock. */
/* This was made unsafe by removal of the cache_lock:
* slab_rebalance_signal and slab_rebal.* are modified in a separate
* thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
* NULL (0), but slab_end is still equal to some value, this would end
* up unlinking every item fetched.
* This is either an acceptable loss, or if slab_rebalance_signal is
* true, slab_start/slab_end should be put behind the slabs_lock,
* which would cause a huge potential slowdown.
* Could also use a specific lock for slab_rebal.* and
* slab_rebalance_signal (shorter lock?)
*/
/*if (slab_rebalance_signal &&
((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
}
}*/
}
//mutex_unlock(&cache_lock);
int was_found = 0;

if (settings.verbose > 2) {
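
One of the options the comment above raises -- a dedicated lock just for slab_rebalance_signal and the slab_rebal window -- could look roughly like the sketch below. This is hypothetical; slab_rebal_lock does not exist in this commit.

    /* Hypothetical: a short, dedicated lock for the slab reassignment window so
     * the do_item_get short-circuit could be re-enabled safely. Not in this commit. */
    static pthread_mutex_t slab_rebal_lock = PTHREAD_MUTEX_INITIALIZER;

    /* inside do_item_get(), after refcount_incr(): */
    pthread_mutex_lock(&slab_rebal_lock);
    if (slab_rebalance_signal &&
        (void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end) {
        pthread_mutex_unlock(&slab_rebal_lock);
        do_item_unlink(it, hv);
        do_item_remove(it);
        it = NULL;
    } else {
        pthread_mutex_unlock(&slab_rebal_lock);
    }

The rebalance thread would have to take the same lock when it sets slab_rebalance_signal and the slab_start/slab_end window, which is the "shorter lock" the comment speculates about.
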
@@ -912,7 +926,6 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return CRAWLER_RUNNING;
}
pthread_mutex_lock(&cache_lock);

if (strcmp(slabs, "all") == 0) {
for (sid = 0; sid < LARGEST_ID; sid++) {
@@ -925,7 +938,6 @@

if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
|| sid >= POWER_LARGEST) {
pthread_mutex_unlock(&cache_lock);
pthread_mutex_unlock(&lru_crawler_lock);
return CRAWLER_BADCLASS;
}
@@ -934,6 +946,7 @@
}

for (sid = 0; sid < LARGEST_ID; sid++) {
pthread_mutex_lock(&cache_lock);
if (tocrawl[sid] != 0 && tails[sid] != NULL) {
if (settings.verbose > 2)
fprintf(stderr, "Kicking LRU crawler off for slab %d\n", sid);
@@ -948,8 +961,8 @@
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
}
pthread_mutex_unlock(&cache_lock);
}
pthread_mutex_unlock(&cache_lock);
pthread_cond_signal(&lru_crawler_cond);
STATS_LOCK();
stats.lru_crawler_running = true;
2 changes: 0 additions & 2 deletions memcached.c
@@ -3275,9 +3275,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
if (res + 2 <= it->nbytes && it->refcount == 2) { /* replace in-place */
/* When changing the value without replacing the item, we
need to update the CAS on the existing item. */
mutex_lock(&cache_lock); /* FIXME */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
mutex_unlock(&cache_lock);

memcpy(ITEM_data(it), buf, res);
memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2 changes: 2 additions & 0 deletions memcached.h
@@ -344,8 +344,10 @@ extern struct settings settings;
* Structure for storing items within memcached.
*/
typedef struct _stritem {
/* Protected by LRU locks */
struct _stritem *next;
struct _stritem *prev;
/* Rest are protected by an item lock */
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
26 changes: 11 additions & 15 deletions slabs.c
@@ -432,7 +432,6 @@ void slabs_adjust_mem_requested(unsigned int id, size_t old, size_t ntotal)
pthread_mutex_unlock(&slabs_lock);
}

static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t slab_rebalance_cond = PTHREAD_COND_INITIALIZER;
static volatile int do_run_slab_thread = 1;
static volatile int do_run_slab_rebalance_thread = 1;
@@ -444,7 +443,6 @@ static int slab_rebalance_start(void) {
slabclass_t *s_cls;
int no_go = 0;

pthread_mutex_lock(&cache_lock);
pthread_mutex_lock(&slabs_lock);

if (slab_rebal.s_clsid < POWER_SMALLEST ||
@@ -465,7 +463,6 @@

if (no_go != 0) {
pthread_mutex_unlock(&slabs_lock);
pthread_mutex_unlock(&cache_lock);
return no_go; /* Should use a wrapper function... */
}

@@ -485,7 +482,6 @@
}

pthread_mutex_unlock(&slabs_lock);
pthread_mutex_unlock(&cache_lock);

STATS_LOCK();
stats.slab_reassign_running = true;
@@ -498,7 +494,7 @@ enum move_status {
MOVE_PASS=0, MOVE_DONE, MOVE_BUSY, MOVE_LOCKED
};

/* refcount == 0 is safe since nobody can incr while cache_lock is held.
/* refcount == 0 is safe since nobody can incr while item_lock is held.
* refcount != 0 is impossible since flags/etc can be modified in other
* threads. instead, note we found a busy one and bail. logic in do_item_get
* will prevent busy items from continuing to be busy
@@ -510,7 +506,6 @@ static int slab_rebalance_move(void) {
int refcount = 0;
enum move_status status = MOVE_PASS;

pthread_mutex_lock(&cache_lock);
pthread_mutex_lock(&slabs_lock);

s_cls = &slabclass[slab_rebal.s_clsid];
@@ -555,6 +550,9 @@
}
status = MOVE_BUSY;
}
/* Item lock must be held while modifying refcount */
if (status == MOVE_BUSY)
refcount_decr(&it->refcount);
item_trylock_unlock(hold_lock);
}
}
@@ -566,7 +564,6 @@
it->slabs_clsid = 255;
break;
case MOVE_BUSY:
refcount_decr(&it->refcount);
case MOVE_LOCKED:
slab_rebal.busy_items++;
was_busy++;
@@ -591,7 +588,6 @@
}

pthread_mutex_unlock(&slabs_lock);
pthread_mutex_unlock(&cache_lock);

return was_busy;
}
@@ -600,7 +596,6 @@ static void slab_rebalance_finish(void) {
slabclass_t *s_cls;
slabclass_t *d_cls;

pthread_mutex_lock(&cache_lock);
pthread_mutex_lock(&slabs_lock);

s_cls = &slabclass[slab_rebal.s_clsid];
@@ -628,7 +623,6 @@
slab_rebalance_signal = 0;

pthread_mutex_unlock(&slabs_lock);
pthread_mutex_unlock(&cache_lock);

STATS_LOCK();
stats.slab_reassign_running = false;
@@ -667,11 +661,11 @@ static int slab_automove_decision(int *src, int *dst) {
}

item_stats_evictions(evicted_new);
pthread_mutex_lock(&cache_lock);
pthread_mutex_lock(&slabs_lock);
for (i = POWER_SMALLEST; i < power_largest; i++) {
total_pages[i] = slabclass[i].slabs;
}
pthread_mutex_unlock(&cache_lock);
pthread_mutex_unlock(&slabs_lock);

/* Find a candidate source; something with zero evicts 3+ times */
for (i = POWER_SMALLEST; i < power_largest; i++) {
@@ -869,12 +863,14 @@ int start_slab_maintenance_thread(void) {
return 0;
}

/* The maintenance thread is on a sleep/loop cycle, so it should join after a
* short wait */
void stop_slab_maintenance_thread(void) {
mutex_lock(&cache_lock);
mutex_lock(&slabs_rebalance_lock);
do_run_slab_thread = 0;
do_run_slab_rebalance_thread = 0;
pthread_cond_signal(&maintenance_cond);
pthread_mutex_unlock(&cache_lock);
pthread_cond_signal(&slab_rebalance_cond);
pthread_mutex_unlock(&slabs_rebalance_lock);

/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
22 changes: 11 additions & 11 deletions thread.c
@@ -36,7 +36,8 @@ struct conn_queue {
pthread_mutex_t lock;
};

/* Lock for cache operations (item_*, assoc_*) */
/* Lock for cache LRU operations
* Was old global lock for all item_*, assoc_*, etc operations */
pthread_mutex_t cache_lock;

/* Connection lock around accepting new connections */
@@ -111,17 +112,18 @@ unsigned short refcount_decr(unsigned short *refcount) {
#endif
}

/* item_lock() must be held for an item before any modifications to either its
* associated hash bucket, or the structure itself.
* LRU modifications must hold the item lock, and the LRU lock.
* LRUs accessing items must item_trylock() before modifying an item.
* Items accessible from an LRU must not be freed or modified
* without first locking and removing from the LRU.
*/

void item_lock(uint32_t hv) {
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
* no-op, as it's only called from within the item lock if necessary.
* However, we can't mix a no-op and threads which are still synchronizing to
* GLOBAL. So instead we just always try to lock. When in GLOBAL mode this
* turns into an effective no-op. Threads re-synchronize after the power level
* switch so it should stay safe.
*/
void *item_trylock(uint32_t hv) {
pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
if (pthread_mutex_trylock(lock) == 0) {
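
To make the ordering documented in the comment above concrete: a thread walking an LRU (the crawler, slab reassignment) holds cache_lock for the list and must item_trylock() each item before touching it, skipping anything it cannot lock instead of blocking. A hypothetical sketch, not code from this commit:

    /* Hypothetical illustration of the documented lock ordering: LRU walkers
     * hold cache_lock and use item_trylock() per item, never a blocking
     * item_lock(), to avoid deadlock against threads that lock item-then-LRU. */
    static void example_lru_walk(int clsid) {
        mutex_lock(&cache_lock);             /* LRU lists live under cache_lock */
        item *it = tails[clsid];
        if (it != NULL) {
            uint32_t hv = hash(ITEM_key(it), it->nkey);
            void *hold_lock = item_trylock(hv);
            if (hold_lock != NULL) {
                /* safe to modify the item here (unlink from the LRU, etc.) */
                item_trylock_unlock(hold_lock);
            }
            /* else: item is busy in another thread; skip it rather than block */
        }
        mutex_unlock(&cache_lock);
    }
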
@@ -154,9 +156,7 @@ static void register_thread_initialized(void) {
pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held:
* item locks, cache_lock, stats_lock, etc
*/
/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
char buf[1];
int i;
