fixes to flat allocator, merge from trunk

Summary: you guys didn't think you would get off that easy, did you? :)
         
         main changes:
         1) merged from trunk.
         2) instead of using item* in the assoc table, use a generic typedef'ed type called item_ptr_t.  this allows the flat allocator to use shrunken pointers for the h_next pointers (see the sketches after this list).
         3) use shrunken pointers for the h_next pointers.
         4) pass a pointer to the item we expect to delete to assoc_delete.  assert that the item we find based on (key, nkey) matches that item.
         5) got rid of the conn_t change (good idea, but terrible for merging in the future)
         6) use always_assert for one-time checks that should always run, even in NDEBUG builds.
         7) refactor out type casting from flat_storage module.
         8) added a lifetime stat (oldest_item_lifetime) to the flat allocator.
         9) added item_memcpy_from to support timestamp retrieval.
         10) added a memory pool interface to all alloc/free/realloc calls.  this allows us to track how much memory each class of buffers is using (sketched after this list as well).
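
         a minimal sketch of the shrunken-pointer scheme behind (2) and (3).
         the names ITEM(), ITEM_PTR(), and NULL_ITEM_PTR match the diff
         below; the 32-bit offset representation, the granularity constant,
         and the reserved zero slot are illustrative assumptions, not the
         exact flat_storage.h definitions:

             #include <stdint.h>
             #include <stddef.h>

             typedef struct item item;           /* opaque here */
             typedef uint32_t item_ptr_t;        /* shrunken pointer: arena offset */
             #define NULL_ITEM_PTR ((item_ptr_t) 0)

             extern char* arena_base;            /* start of the mmap'ed storage */
             #define ADDRESSING_GRANULARITY 64   /* hypothetical chunk granularity */

             /* expand a shrunken pointer back into a real item*.  offset 0
              * is reserved so that NULL_ITEM_PTR can mean "no item". */
             static inline item* ITEM(item_ptr_t iptr) {
                 if (iptr == NULL_ITEM_PTR)
                     return NULL;
                 return (item*) (arena_base + (size_t) iptr * ADDRESSING_GRANULARITY);
             }

             /* shrink a real item* down to an arena-relative offset. */
             static inline item_ptr_t ITEM_PTR(item* it) {
                 if (it == NULL)
                     return NULL_ITEM_PTR;
                 return (item_ptr_t) (((char*) it - arena_base) / ADDRESSING_GRANULARITY);
             }

         with h_next stored as an item_ptr_t, a hash-chain walk becomes
         for (iptr = table[bucket]; iptr; iptr = ITEM_PTR_h_next(iptr)),
         which is exactly the shape of the new assoc_find loop below.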
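
         likewise, a sketch of the memory-pool wrappers from (10).  the
         call signatures (size passed to free, old size passed to realloc)
         and the pool names match the call sites in the diff; the enum
         layout and the per-pool counter array are illustrative
         assumptions:

             #include <stdlib.h>

             typedef enum {
                 ASSOC_POOL,
                 CONN_BUFFER_ILIST_POOL,
                 CONN_BUFFER_BP_STRING_POOL,
                 CONN_BUFFER_BP_HDRPOOL_POOL,
                 POOL_COUNT
             } pool_id_t;

             static size_t pool_bytes[POOL_COUNT];   /* bytes currently held per pool */

             static inline void* pool_malloc(size_t sz, pool_id_t pool) {
                 void* p = malloc(sz);
                 if (p != NULL) pool_bytes[pool] += sz;
                 return p;
             }

             static inline void* pool_calloc(size_t n, size_t sz, pool_id_t pool) {
                 void* p = calloc(n, sz);
                 if (p != NULL) pool_bytes[pool] += n * sz;
                 return p;
             }

             /* the caller supplies the old size because the accounting
              * lives outside the underlying allocator. */
             static inline void* pool_realloc(void* ptr, size_t new_sz,
                                              size_t old_sz, pool_id_t pool) {
                 void* p = realloc(ptr, new_sz);
                 if (p != NULL) pool_bytes[pool] += new_sz - old_sz;
                 return p;
             }

             static inline void pool_free(void* ptr, size_t sz, pool_id_t pool) {
                 free(ptr);
                 pool_bytes[pool] -= sz;
             }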
         
         bug fixes:
         1) the previous implementation of find_unreferenced_broken_chunk used a master refcount per large broken chunk.  this does not work when a referenced small item has a body chunk in the large broken chunk but its title chunk, which holds the refcount, lives elsewhere.  now we properly follow the prev pointers to the title block to account for the refcount (see the sketch after this list).
         2) we were improperly clobbering the return value from find_unreferenced_broken_chunk
         3) migrating small chunks was always treating the blocks as body blocks, even though they could very well be title blocks.
         4) flushing expired items should handle both LRUs.
         5) in item_get_max_riov for the flat allocator, provision an additional riov for the CR-LF.
         6) item_memcpy_to was totally buggy.
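
         the essence of fix (1), condensed from small_chunk_referenced() and
         large_broken_chunk_referenced() in the flat_storage.c hunk below
         (the flag assertions are dropped here for brevity):

             #include <stdbool.h>

             static bool small_chunk_referenced(const small_chunk_t* sc) {
                 if (sc->flags & SMALL_CHUNK_FREE)
                     return false;   /* free chunks count as refcount 0 */
                 /* a body chunk carries no refcount of its own, so walk the
                  * prev_chunk links back to the title chunk, which does. */
                 while ((sc->flags & SMALL_CHUNK_TITLE) == 0)
                     sc = &get_chunk_address(sc->sc_body.prev_chunk)->sc;
                 return sc->sc_title.refcount != 0;
             }

             /* a broken large chunk is unreferenced only if every small
              * chunk carved out of it is unreferenced. */
             static bool large_broken_chunk_referenced(const large_broken_chunk_t* lc) {
                 unsigned i;
                 for (i = 0; i < SMALL_CHUNKS_PER_LARGE_CHUNK; i ++)
                     if (small_chunk_referenced(&lc->lbc[i]))
                         return true;
                 return false;
             }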

Reviewed By: marc

Test Plan: - libmcc test/test.py -a passes.
           - flat_storage unit tests pass.
           - ran a stress test with more arithmetic ops to catch some more corner cases

Revert: OK


git-svn-id: http://svn.facebook.com/svnroot/projects/memcached/branches/memcached-storage@97173 2c7ba8d8-a2f7-0310-a573-de162e16dcc7
commit 7a1f4fa71f14d532453e2ca462137685b57e029c 1 parent f045fd8
ttung authored
10 Makefile.am
@@ -3,12 +3,18 @@ bin_PROGRAMS = memcached memcached-debug
memcached_SOURCES = memcached.c slabs.c slabs.h \
slabs_items.c slabs_items.h assoc.c assoc.h memcached.h \
thread.c stats.c stats.h binary_sm.c binary_sm.h binary_protocol.h generic.h \
- items.h flat_storage.c flat_storage.h flat_storage_support.h
+ items.h flat_storage.c flat_storage.h flat_storage_support.h \
+ sigseg.c sigseg.h \
+ memory_pool.h memory_pool_classes.h
memcached_debug_SOURCES = $(memcached_SOURCES)
memcached_CFLAGS = -Wall
-memcached_CPPFLAGS = -DNDEBUG
+memcached_CPPFLAGS = -DNDEBUG -DNO_CPP_DEMANGLE
memcached_LDADD = @LIBOBJS@
+memcached_LDFLAGS = -ldl -rdynamic
+memcached_debug_CFLAGS = $(memcached_CFLAGS)
+memcached_debug_CPPFLAGS = -DNO_CPP_DEMANGLE
memcached_debug_LDADD = $(memcached_LDADD)
+memcached_debug_LDFLAGS = $(memcached_LDFLAGS)
SUBDIRS = doc
DIST_DIRS = scripts
88 assoc.c
@@ -461,13 +461,13 @@ static unsigned int hashpower = 16;
#define hashmask(n) (hashsize(n)-1)
/* Main hash table. This is where we look except during expansion. */
-static item** primary_hashtable = 0;
+static item_ptr_t* primary_hashtable = 0;
/*
* Previous hash table. During expansion, we look here for keys that haven't
* been moved over to the primary yet.
*/
-static item** old_hashtable = 0;
+static item_ptr_t* old_hashtable = 0;
/* Number of items in the hash table. */
static unsigned int hash_items = 0;
@@ -482,8 +482,8 @@ static bool expanding = false;
static unsigned int expand_bucket = 0;
void assoc_init(void) {
- unsigned int hash_size = hashsize(hashpower) * sizeof(void*);
- primary_hashtable = malloc(hash_size);
+ unsigned int hash_size = hashsize(hashpower) * sizeof(item_ptr_t);
+ primary_hashtable = pool_malloc(hash_size, ASSOC_POOL);
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
@@ -493,23 +493,23 @@ void assoc_init(void) {
item *assoc_find(const char *key, const size_t nkey) {
uint32_t hv = hash(key, nkey, 0);
- item *it;
+ item_ptr_t iptr;
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
- it = old_hashtable[oldbucket];
+ iptr = old_hashtable[oldbucket];
} else {
- it = primary_hashtable[hv & hashmask(hashpower)];
+ iptr = primary_hashtable[hv & hashmask(hashpower)];
}
- while (it) {
- if ((nkey == ITEM_nkey(it)) &&
- (memcmp(key, ITEM_key(it), nkey) == 0)) {
- return it;
+ while (iptr) {
+ if ((nkey == ITEM_nkey(ITEM(iptr))) &&
+ (memcmp(key, ITEM_key(ITEM(iptr)), nkey) == 0)) {
+ return ITEM(iptr);
}
- it = ITEM_h_next(it);
+ iptr = ITEM_PTR_h_next(iptr);
}
return 0;
}
@@ -517,9 +517,9 @@ item *assoc_find(const char *key, const size_t nkey) {
/* returns the address of the item pointer before the key. if *item == 0,
the item wasn't found */
-static item** _hashitem_before (const char *key, const size_t nkey) {
+static item_ptr_t* _hashitem_before (const char *key, const size_t nkey) {
uint32_t hv = hash(key, nkey, 0);
- item **pos;
+ item_ptr_t* pos;
unsigned int oldbucket;
if (expanding &&
@@ -530,8 +530,8 @@ static item** _hashitem_before (const char *key, const size_t nkey) {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
- while (*pos && ((nkey != ITEM_nkey(*pos)) || memcmp(key, ITEM_key(*pos), nkey))) {
- pos = ITEM_h_next_p(*pos);
+ while (*pos && ((nkey != ITEM_nkey(ITEM(*pos))) || memcmp(key, ITEM_key(ITEM(*pos)), nkey))) {
+ pos = ITEM_h_next_p(ITEM(*pos));
}
return pos;
}
@@ -540,7 +540,7 @@ static item** _hashitem_before (const char *key, const size_t nkey) {
static void assoc_expand(void) {
old_hashtable = primary_hashtable;
- primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
+ primary_hashtable = pool_calloc(hashsize(hashpower + 1), sizeof(item_ptr_t), ASSOC_POOL);
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
@@ -556,24 +556,26 @@ static void assoc_expand(void) {
/* migrates the next bucket to the primary hashtable if we're expanding. */
void do_assoc_move_next_bucket(void) {
- item *it, *next;
+ item_ptr_t iptr, next;
int bucket;
if (expanding) {
- for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- next = ITEM_h_next(it);
+ for (iptr = old_hashtable[expand_bucket]; ITEM_PTR_IS_NULL(iptr); iptr = next) {
+ next = ITEM_PTR_h_next(iptr);
- bucket = hash(ITEM_key(it), ITEM_nkey(it), 0) & hashmask(hashpower);
- ITEM_set_h_next(it, primary_hashtable[bucket]);
- primary_hashtable[bucket] = it;
+ bucket = hash(ITEM_key(ITEM(iptr)), ITEM_nkey(ITEM(iptr)), 0) & hashmask(hashpower);
+ ITEM_set_h_next(ITEM(iptr), primary_hashtable[bucket]);
+ primary_hashtable[bucket] = iptr;
}
- old_hashtable[expand_bucket] = NULL;
+ old_hashtable[expand_bucket] = NULL_ITEM_PTR;
expand_bucket++;
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
- free(old_hashtable);
+ pool_free(old_hashtable,
+ (hashsize(hashpower - 1) * sizeof(item_ptr_t)),
+ ASSOC_POOL);
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
@@ -592,10 +594,10 @@ int assoc_insert(item *it) {
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
ITEM_set_h_next(it, old_hashtable[oldbucket]);
- old_hashtable[oldbucket] = it;
+ old_hashtable[oldbucket] = ITEM_PTR(it);
} else {
ITEM_set_h_next(it, primary_hashtable[hv & hashmask(hashpower)]);
- primary_hashtable[hv & hashmask(hashpower)] = it;
+ primary_hashtable[hv & hashmask(hashpower)] = ITEM_PTR(it);
}
hash_items++;
@@ -611,26 +613,26 @@ int assoc_insert(item *it) {
* old_it with (ITEM_key(it), ITEM_nkey(it)) -> it. returns old_it.
*/
item* assoc_update(item *it) {
- item** before = _hashitem_before(ITEM_key(it), ITEM_nkey(it));
+ item_ptr_t* before = _hashitem_before(ITEM_key(it), ITEM_nkey(it));
item* old_it;
-
+
assert(before != NULL);
- old_it = *before;
+ old_it = ITEM(*before);
- *before = it;
+ *before = ITEM_PTR(it);
return old_it;
}
-void assoc_delete(const char *key, const size_t nkey) {
- item **before = _hashitem_before(key, nkey);
+void assoc_delete(const char *key, const size_t nkey, item_ptr_t to_be_deleted) {
+ item_ptr_t* before = _hashitem_before(key, nkey);
+ assert(*before == to_be_deleted);
if (*before) {
- item *nxt = ITEM_h_next(*before);
- ITEM_set_h_next(*before, 0); /* probably pointless, but whatever. */
- *before = nxt;
+ item_ptr_t next = ITEM_PTR_h_next(*before);
+ *before = next;
hash_items--;
return;
}
@@ -644,24 +646,24 @@ int do_assoc_expire_regex(char *pattern) {
#ifdef HAVE_REGEX_H
regex_t regex;
int bucket;
- item *it;
+ item_ptr_t iptr;
if (regcomp(&regex, pattern, REG_EXTENDED | REG_NOSUB))
return 0;
for (bucket = 0; bucket < hashsize(hashpower); bucket++) {
- for (it = primary_hashtable[bucket]; it != NULL; it = ITEM_h_next(it)) {
- if (regexec(&regex, ITEM_key(it), 0, NULL, 0) == 0) {
+ for (iptr = primary_hashtable[bucket]; ITEM_PTR_IS_NULL(iptr); iptr = ITEM_PTR_h_next(iptr)) {
+ if (regexec(&regex, ITEM_key(ITEM(iptr)), 0, NULL, 0) == 0) {
/* the item matches; mark it expired. */
- ITEM_set_exptime(it, 1);
+ ITEM_set_exptime(ITEM(iptr), 1);
}
}
}
if (expanding) {
for (bucket = expand_bucket; bucket < hashsize(hashpower-1); bucket++) {
- for (it = old_hashtable[bucket]; it != NULL; it = ITEM_h_next(it)) {
- if (regexec(&regex, ITEM_key(it), 0, NULL, 0) == 0) {
+ for (iptr = old_hashtable[bucket]; ITEM_PTR_IS_NULL(iptr); iptr = ITEM_PTR_h_next(iptr)) {
+ if (regexec(&regex, ITEM_key(ITEM(iptr)), 0, NULL, 0) == 0) {
/* the item matches; mark it expired. */
- ITEM_set_exptime(it, 1);
+ ITEM_set_exptime(ITEM(iptr), 1);
}
}
}
2  assoc.h
@@ -9,7 +9,7 @@ void assoc_init(void);
item *assoc_find(const char *key, const size_t nkey);
int assoc_insert(item *item);
item* assoc_update(item *it);
-void assoc_delete(const char *key, const size_t nkey);
+void assoc_delete(const char *key, const size_t nkey, item_ptr_t iptr);
void do_assoc_move_next_bucket(void);
uint32_t hash( const void *key, size_t length, const uint32_t initval);
int do_assoc_expire_regex(char *pattern);
2  binary_protocol.h
@@ -161,7 +161,7 @@ typedef struct empty_rep_s {
typedef struct value_rep_s {
// this handles the following replies:
// get
- // getr
+ // getq
BINARY_PROTOCOL_REPLY_HEADER;
uint32_t flags;
// value goes here.
327 binary_sm.c
@@ -74,38 +74,30 @@ extern struct event_base* main_base;
static inline void bp_get_req_cmd_info(bp_cmd_t cmd, bp_cmd_info_t* info);
// prototypes for the state machine.
-static inline void binary_sm(conn_t* c);
+static inline void binary_sm(conn* c);
// prototypes for handlers of the various states in the SM.
-static inline bp_handler_res_t handle_header_size_unknown(conn_t* c);
-static inline bp_handler_res_t handle_header_size_known(conn_t* c);
-static inline bp_handler_res_t handle_direct_receive(conn_t* c);
-static inline bp_handler_res_t handle_process(conn_t* c);
-static inline bp_handler_res_t handle_writing(conn_t* c);
+static inline bp_handler_res_t handle_header_size_unknown(conn* c);
+static inline bp_handler_res_t handle_header_size_known(conn* c);
+static inline bp_handler_res_t handle_direct_receive(conn* c);
+static inline bp_handler_res_t handle_process(conn* c);
+static inline bp_handler_res_t handle_writing(conn* c);
// prototypes for handlers of various commands/command classes.
-static void handle_echo_cmd(conn_t* c);
-static void handle_version_cmd(conn_t* c);
-static void handle_get_cmd(conn_t* c);
-static void handle_update_cmd(conn_t* c);
-static void handle_delete_cmd(conn_t* c);
-static void handle_arith_cmd(conn_t* c);
-
-static void* allocate_reply_header(conn_t* c, size_t size, void* req);
-static void release_reply_headers(conn_t* c);
-
-/* TT FIXME: these are binary protocol versions of the same functions.
- * reintegrate them with the mainline functions to avoid future divergence. */
-static void bp_write_err_msg(conn_t* c, const char* str);
-static int bp_try_read_network(conn_t* c);
-static int bp_try_read_udp(conn_t* c);
-static int bp_transmit(conn_t* c);
+static void handle_echo_cmd(conn* c);
+static void handle_version_cmd(conn* c);
+static void handle_get_cmd(conn* c);
+static void handle_update_cmd(conn* c);
+static void handle_delete_cmd(conn* c);
+static void handle_arith_cmd(conn* c);
+
+static void* allocate_reply_header(conn* c, size_t size, void* req);
/**
* when libevent tells us that a socket has data to read, we read it and process
* it.
*/
-void process_binary_protocol(conn_t* c) {
+void process_binary_protocol(conn* c) {
int sfd, flags;
socklen_t addrlen;
struct sockaddr addr;
@@ -135,7 +127,7 @@ void process_binary_protocol(conn_t* c) {
assert(c->riov_size >= 1);
dispatch_conn_new(sfd, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, false, c->binary);
+ DATA_BUFFER_SIZE, false, c->binary, &addr, addrlen);
return;
}
@@ -148,10 +140,12 @@ bp_hdr_pool_t* bp_allocate_hdr_pool(bp_hdr_pool_t* next)
long memchunk, memchunk_start;
bp_hdr_pool_t* retval;
- memchunk_start = memchunk = (long) malloc(sizeof(bp_hdr_pool_t) + BP_HDR_POOL_INIT_SIZE);
+ memchunk_start = memchunk = (long) pool_malloc(sizeof(bp_hdr_pool_t) + BP_HDR_POOL_INIT_SIZE,
+ CONN_BUFFER_BP_HDRPOOL_POOL);
if (memchunk_start == (long) NULL) {
return NULL;
}
+
retval = (bp_hdr_pool_t*) memchunk;
memchunk += sizeof(bp_hdr_pool_t);
memchunk += BUFFER_ALIGNMENT - 1;
@@ -164,15 +158,39 @@ bp_hdr_pool_t* bp_allocate_hdr_pool(bp_hdr_pool_t* next)
}
+void bp_shrink_hdr_pool(conn* c)
+{
+ bp_hdr_pool_t* bph;
+ while (c->bp_hdr_pool->next != NULL) {
+ bph = c->bp_hdr_pool;
+ c->bp_hdr_pool = c->bp_hdr_pool->next;
+ pool_free(bph, sizeof(bp_hdr_pool_t) + BP_HDR_POOL_INIT_SIZE, CONN_BUFFER_BP_HDRPOOL_POOL);
+ }
+}
+
+
+void bp_release_hdr_pool(conn* c) {
+ bp_hdr_pool_t* bph;
+ while (c->bp_hdr_pool != NULL) {
+ bph = c->bp_hdr_pool;
+ c->bp_hdr_pool = c->bp_hdr_pool->next;
+ pool_free(bph, sizeof(bp_hdr_pool_t) + BP_HDR_POOL_INIT_SIZE, CONN_BUFFER_BP_HDRPOOL_POOL);
+ }
+}
+
+
/**
* handles the state machine.
*
* @param c the connection to process the state machine for.
*/
-static inline void binary_sm(conn_t* c) {
+static inline void binary_sm(conn* c) {
bp_handler_res_t result = {0, 0};
+ conn_states_t prev_state;
while (! result.stop) {
+ prev_state = c->state;
+
switch (c->state) {
case conn_bp_header_size_unknown:
result = handle_header_size_unknown(c);
@@ -209,13 +227,19 @@ static inline void binary_sm(conn_t* c) {
assert(0);
}
+ if (prev_state == conn_bp_writing &&
+ c->state == conn_bp_header_size_unknown) {
+ /* in between requests. shrink connection buffers. */
+ conn_shrink(c);
+ }
+
if (result.try_buffer_read) {
result.try_buffer_read = 0;
if ((c->udp &&
- bp_try_read_udp(c)) ||
+ try_read_udp(c)) ||
(c->udp == 0 &&
- bp_try_read_network(c)))
+ try_read_network(c)))
continue;
result.stop = 1;
@@ -313,7 +337,7 @@ static inline void bp_get_req_cmd_info(bp_cmd_t cmd, bp_cmd_info_t* info)
}
-static inline bp_handler_res_t handle_header_size_unknown(conn_t* c)
+static inline bp_handler_res_t handle_header_size_unknown(conn* c)
{
empty_req_t* null_empty_header;
char* empty_header_ptr, * cmd_ptr;
@@ -354,7 +378,7 @@ static inline bp_handler_res_t handle_header_size_unknown(conn_t* c)
}
-static inline bp_handler_res_t handle_header_size_known(conn_t* c)
+static inline bp_handler_res_t handle_header_size_known(conn* c)
{
size_t bytes_needed = c->bp_info.header_size;
size_t bytes_available = c->rbytes;
@@ -404,7 +428,7 @@ static inline bp_handler_res_t handle_header_size_known(conn_t* c)
// NOTE: null-terminating the string!
str_size = ntohl(c->u.string_req.body_length) - (sizeof(string_req_t) - BINARY_PROTOCOL_REQUEST_HEADER_SZ);
- c->bp_string = malloc(str_size + 1);
+ c->bp_string = pool_malloc(str_size + 1, CONN_BUFFER_BP_STRING_POOL);
if (c->bp_string == NULL) {
// not enough memory, skip straight to the process step, which
// should deal with this situation.
@@ -429,7 +453,7 @@ static inline bp_handler_res_t handle_header_size_known(conn_t* c)
}
-static inline bp_handler_res_t handle_direct_receive(conn_t* c)
+static inline bp_handler_res_t handle_direct_receive(conn* c)
{
bp_handler_res_t retval = {0, 0};
@@ -437,8 +461,8 @@ static inline bp_handler_res_t handle_direct_receive(conn_t* c)
* check if the receive buffer has any more content. move that to the
* destination.
*/
- if (c->rbytes > 0 &&
- c->riov_left > 0) {
+ while (c->rbytes > 0 &&
+ c->riov_left > 0) {
struct iovec* current_iov = &c->riov[c->riov_curr];
size_t bytes_to_copy = (c->rbytes <= current_iov->iov_len) ? c->rbytes : current_iov->iov_len;
@@ -497,10 +521,11 @@ static inline bp_handler_res_t handle_direct_receive(conn_t* c)
if (settings.verbose > 1) {
fprintf(stderr, ">%d receiving key %s\n", c->sfd, c->bp_key);
}
+
it = item_alloc(c->bp_key, c->u.key_value_req.keylen,
ntohl(c->u.key_value_req.flags),
realtime(ntohl(c->u.key_value_req.exptime)),
- value_len);
+ value_len, get_request_addr(c));
if (it == NULL) {
// this is an error condition. head straight to the
@@ -571,7 +596,7 @@ static inline bp_handler_res_t handle_direct_receive(conn_t* c)
if (res == 0) {
c->state = conn_closing;
- } else if (res == -1 && (errno = EAGAIN || errno == EWOULDBLOCK)) {
+ } else if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0) {
fprintf(stderr, "Couldn't update event\n");
@@ -590,7 +615,7 @@ static inline bp_handler_res_t handle_direct_receive(conn_t* c)
}
-static inline bp_handler_res_t handle_process(conn_t* c)
+static inline bp_handler_res_t handle_process(conn* c)
{
bp_handler_res_t retval = {0, 0};
@@ -665,11 +690,11 @@ static inline bp_handler_res_t handle_process(conn_t* c)
}
-static inline bp_handler_res_t handle_writing(conn_t* c)
+static inline bp_handler_res_t handle_writing(conn* c)
{
bp_handler_res_t retval = {0, 0};
- switch (bp_transmit(c)) {
+ switch (transmit(c)) {
case TRANSMIT_COMPLETE:
c->icurr = c->ilist;
while (c->ileft > 0) {
@@ -686,7 +711,6 @@ static inline bp_handler_res_t handle_writing(conn_t* c)
c->msgused = 0;
c->iovused = 0;
- release_reply_headers(c);
break;
case TRANSMIT_INCOMPLETE:
@@ -702,7 +726,7 @@ static inline bp_handler_res_t handle_writing(conn_t* c)
}
-static void handle_echo_cmd(conn_t* c)
+static void handle_echo_cmd(conn* c)
{
empty_rep_t* rep;
@@ -729,7 +753,7 @@ static void handle_echo_cmd(conn_t* c)
}
-static void handle_version_cmd(conn_t* c)
+static void handle_version_cmd(conn* c)
{
string_rep_t* rep;
@@ -757,7 +781,7 @@ static void handle_version_cmd(conn_t* c)
}
-static void handle_get_cmd(conn_t* c)
+static void handle_get_cmd(conn* c)
{
value_rep_t* rep;
item* it;
@@ -771,6 +795,7 @@ static void handle_get_cmd(conn_t* c)
stats.get_cmds ++;
if (it) {
stats.get_hits ++;
+ stats_size_buckets_get(ITEM_nkey(it) + ITEM_nbytes(it));
} else {
stats.get_misses ++;
}
@@ -795,7 +820,8 @@ static void handle_get_cmd(conn_t* c)
if (it) {
// the cache hit case.
if (c->ileft >= c->isize) {
- item **new_list = realloc(c->ilist, sizeof(item *)*c->isize*2);
+ item **new_list = pool_realloc(c->ilist, sizeof(item *)*c->isize*2,
+ sizeof(item*) * c->isize, CONN_BUFFER_ILIST_POOL);
if (new_list) {
c->isize *= 2;
c->ilist = new_list;
@@ -807,11 +833,6 @@ static void handle_get_cmd(conn_t* c)
*(c->ilist + c->ileft) = it;
item_update(it);
- STATS_LOCK();
- stats.get_hits++;
- stats_size_buckets_get(ITEM_nkey(it) + ITEM_nbytes(it));
- STATS_UNLOCK();
-
// fill out the headers.
rep->status = mcc_res_found;
rep->body_length = htonl((sizeof(*rep) - BINARY_PROTOCOL_REPLY_HEADER_SZ) +
@@ -827,10 +848,6 @@ static void handle_get_cmd(conn_t* c)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
}
} else {
- STATS_LOCK();
- stats.get_misses++;
- STATS_UNLOCK();
-
if (c->u.key_req.cmd == BP_GET_CMD) {
// cache miss on the terminating GET command.
rep->status = mcc_res_notfound;
@@ -857,7 +874,7 @@ static void handle_get_cmd(conn_t* c)
}
-static void handle_update_cmd(conn_t* c)
+static void handle_update_cmd(conn* c)
{
empty_rep_t* rep;
item* it = c->item;
@@ -924,7 +941,7 @@ static void handle_update_cmd(conn_t* c)
}
-static void handle_delete_cmd(conn_t* c)
+static void handle_delete_cmd(conn* c)
{
empty_rep_t* rep;
item* it;
@@ -996,7 +1013,7 @@ static void handle_delete_cmd(conn_t* c)
}
-static void handle_arith_cmd(conn_t* c)
+static void handle_arith_cmd(conn* c)
{
number_rep_t* rep;
item* it;
@@ -1015,9 +1032,11 @@ static void handle_arith_cmd(conn_t* c)
if (it) {
char* out;
uint32_t val;
+
delta = ntohl(c->u.key_number_req.number);
+
out = add_delta(it, (c->u.key_number_req.cmd == BP_INCR_CMD),
- delta, temp, &val);
+ delta, temp, &val, get_request_addr(c));
if (out != temp) {
// some error occured.
@@ -1031,8 +1050,17 @@ static void handle_arith_cmd(conn_t* c)
rep->value = val;
rep->status = mcc_res_stored;
}
+
+ STATS_LOCK();
+ stats.arith_cmds ++;
+ stats.arith_hits ++;
+ STATS_UNLOCK();
} else {
rep->status = mcc_res_notfound;
+
+ STATS_LOCK();
+ stats.arith_cmds ++;
+ STATS_UNLOCK();
}
if (add_iov(c, rep, sizeof(number_rep_t), true)) {
@@ -1048,7 +1076,7 @@ static void handle_arith_cmd(conn_t* c)
}
-static void* allocate_reply_header(conn_t* c, size_t size, void* req)
+static void* allocate_reply_header(conn* c, size_t size, void* req)
{
empty_req_t* srcreq = (empty_req_t*) req;
empty_rep_t* retval;
@@ -1072,19 +1100,7 @@ static void* allocate_reply_header(conn_t* c, size_t size, void* req)
}
-static void release_reply_headers(conn_t* c)
-{
- bp_hdr_pool_t* bph;
- while (c->bp_hdr_pool->next != NULL) {
- bph = c->bp_hdr_pool;
- c->bp_hdr_pool = c->bp_hdr_pool->next;
- free(bph);
- }
-}
-
-
-static void bp_write_err_msg(conn_t* c, const char* str)
-{
+void bp_write_err_msg(conn* c, const char* str) {
string_rep_t* rep;
rep = (string_rep_t*) c->wbuf;
@@ -1106,172 +1122,3 @@ static void bp_write_err_msg(conn_t* c, const char* str)
c->state = conn_bp_error;
}
-
-
-/*
- * read a UDP request.
- * return 0 if there's nothing to read.
- */
-static int bp_try_read_udp(conn_t* c) {
- int res;
-
- c->request_addr_size = sizeof(c->request_addr);
- res = recvfrom(c->sfd, c->rbuf, c->rsize,
- 0, &c->request_addr, &c->request_addr_size);
- if (res > 8) {
- unsigned char *buf = (unsigned char *)c->rbuf;
- STATS_LOCK();
- stats.bytes_read += res;
- STATS_UNLOCK();
-
- /* Beginning of UDP packet is the request ID; save it. */
- c->request_id = buf[0] * 256 + buf[1];
-
- /* If this is a multi-packet request, drop it. */
- if (buf[4] != 0 || buf[5] != 1) {
- bp_write_err_msg(c, "multi-packet request not supported");
- return 0;
- }
-
- /* Don't care about any of the rest of the header. */
- res -= 8;
- memmove(c->rbuf, c->rbuf + 8, res);
-
- c->rbytes += res;
- c->rcurr = c->rbuf;
- return 1;
- }
- return 0;
-}
-
-/*
- * read from network as much as we can, handle buffer overflow and connection
- * close.
- * before reading, move the remaining incomplete fragment of a command
- * (if any) to the beginning of the buffer.
- * return 0 if there's nothing to read on the first read.
- */
-static int bp_try_read_network(conn_t* c) {
- int gotdata = 0;
- int res;
-
- if (c->rcurr != c->rbuf) {
- if (c->rbytes != 0) /* otherwise there's nothing to copy */
- memmove(c->rbuf, c->rcurr, c->rbytes);
- c->rcurr = c->rbuf;
- }
-
- while (1) {
- if (c->rbytes >= c->rsize) {
- char *new_rbuf = realloc(c->rbuf, c->rsize*2);
- if (!new_rbuf) {
- if (settings.verbose > 0) {
- fprintf(stderr, "Couldn't realloc input buffer\n");
- }
- bp_write_err_msg(c, "out of memory");
- return 1;
- }
- c->rcurr = c->rbuf = new_rbuf;
- c->rsize *= 2;
- }
-
- /* unix socket mode doesn't need this, so zeroed out. but why
- * is this done for every command? presumably for UDP
- * mode. */
- if (!settings.socketpath) {
- c->request_addr_size = sizeof(c->request_addr);
- } else {
- c->request_addr_size = 0;
- }
-
- res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
- if (res > 0) {
- STATS_LOCK();
- stats.bytes_read += res;
- STATS_UNLOCK();
- gotdata = 1;
- c->rbytes += res;
- continue;
- }
- if (res == 0) {
- /* connection closed */
- c->state = conn_closing;
- return 1;
- }
- if (res == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) break;
- else return 0;
- }
- }
- return gotdata;
-}
-
-
-/*
- * Transmit the next chunk of data from our list of msgbuf structures.
- *
- * Returns:
- * TRANSMIT_COMPLETE All done writing.
- * TRANSMIT_INCOMPLETE More data remaining to write.
- * TRANSMIT_SOFT_ERROR Can't write any more right now.
- * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
- */
-static int bp_transmit(conn_t* c) {
- int res;
-
- if (c->msgcurr < c->msgused &&
- c->msglist[c->msgcurr].msg_iovlen == 0) {
- /* Finished writing the current msg; advance to the next. */
- c->msgcurr++;
- }
- if (c->msgcurr < c->msgused) {
- struct msghdr *m = &c->msglist[c->msgcurr];
- res = sendmsg(c->sfd, m, 0);
- if (res > 0) {
- STATS_LOCK();
- stats.bytes_written += res;
- STATS_UNLOCK();
-
- /* We've written some of the data. Remove the completed
- iovec entries from the list of pending writes. */
- while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
- res -= m->msg_iov->iov_len;
- m->msg_iovlen--;
- m->msg_iov++;
- }
-
- /* Might have written just part of the last iovec entry;
- adjust it so the next write will do the rest. */
- if (res > 0) {
- m->msg_iov->iov_base += res;
- m->msg_iov->iov_len -= res;
- }
- return TRANSMIT_INCOMPLETE;
- }
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- if (!update_event(c, EV_WRITE | EV_PERSIST)) {
- if (settings.verbose > 0) {
- fprintf(stderr, "Couldn't update event\n");
- }
- c->state = conn_closing;
- return TRANSMIT_HARD_ERROR;
- }
- return TRANSMIT_SOFT_ERROR;
- }
- /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
- we have a real error, on which we close the connection */
- if (settings.verbose > 0) {
- perror("Failed to write, and not due to blocking");
- }
-
- if (c->udp) {
- c->state = conn_bp_header_size_unknown;
- } else {
- c->state = conn_closing;
- }
- return TRANSMIT_HARD_ERROR;
- } else {
- return TRANSMIT_COMPLETE;
- }
-}
-
5 binary_sm.h
@@ -27,7 +27,10 @@ struct bp_hdr_pool_s {
#include "memcached.h"
-extern void process_binary_protocol(conn_t* c);
+extern void process_binary_protocol(conn* c);
extern bp_hdr_pool_t* bp_allocate_hdr_pool(bp_hdr_pool_t* next);
+extern void bp_write_err_msg(conn* c, const char* str);
+extern void bp_shrink_hdr_pool(conn* c);
+extern void bp_release_hdr_pool(conn* c);
#endif /* #if !defined(_binary_sm_h_) */
471 flat_storage.c
@@ -15,15 +15,13 @@
#if defined(USE_FLAT_ALLOCATOR)
-#define FLAT_STORAGE_STATIC_DECL
+#define FLAT_STORAGE_MODULE
#include "assoc.h"
#include "flat_storage.h"
#include "memcached.h"
#include "stats.h"
-#undef FLAT_STORAGE_STATIC_DECL
-
typedef enum {
COALESCE_NO_PROGRESS, /* no progress was made in coalescing a block */
COALESCE_LARGE_CHUNK_FORMED, /* a large chunk was formed */
@@ -43,23 +41,15 @@ static void unbreak_large_chunk(large_chunk_t* lc, bool mandatory);
static void item_free(item *it);
-
-#if defined(FLAT_STORAGE_TESTS)
-#define STATIC
-#else
-#define STATIC static
-#endif /* #if defined(FLAT_STORAGE_TESTS) */
-
-
/**
* flat storage code
*/
void flat_storage_init(size_t maxbytes) {
intptr_t addr;
- assert(fsi.initialized == false);
- assert(maxbytes % LARGE_CHUNK_SZ == 0);
- assert(maxbytes % FLAT_STORAGE_INCREMENT_DELTA == 0);
+ always_assert(fsi.initialized == false);
+ always_assert(maxbytes % LARGE_CHUNK_SZ == 0);
+ always_assert(maxbytes % FLAT_STORAGE_INCREMENT_DELTA == 0);
fsi.mmap_start = mmap(NULL,
maxbytes + LARGE_CHUNK_SZ - 1, /* alloc extra to
* ensure we can align
@@ -91,62 +81,66 @@ void flat_storage_init(size_t maxbytes) {
/* shouldn't fail here.... right? */
flat_storage_alloc();
- assert(fsi.large_free_list_sz != 0);
+ always_assert(fsi.large_free_list_sz != 0);
fsi.initialized = 1;
}
void item_init(void) {
- /* run a bunch of asserts to make sure that there are no inconsistent or
+ /* run a bunch of always_asserts to make sure that there are no inconsistent or
* inherently wrong implementation details. */
- assert(LARGE_CHUNK_SZ >= SMALL_CHUNK_SZ);
- assert(LARGE_CHUNK_SZ >= CHUNK_ADDRESSING_SZ);
- assert( (LARGE_CHUNK_SZ % CHUNK_ADDRESSING_SZ) == 0 );
- assert( (LARGE_CHUNK_SZ / SMALL_CHUNK_SZ) <=
- (LARGE_CHUNK_SZ / CHUNK_ADDRESSING_SZ) );
- assert(LARGE_TITLE_CHUNK_DATA_SZ >= KEY_MAX_LENGTH);
- assert(SMALL_CHUNKS_PER_LARGE_CHUNK >= 2);
+ always_assert(LARGE_CHUNK_SZ >= SMALL_CHUNK_SZ);
+ always_assert(LARGE_CHUNK_SZ >= CHUNK_ADDRESSING_SZ);
+ always_assert( (LARGE_CHUNK_SZ % CHUNK_ADDRESSING_SZ) == 0 );
+ always_assert( (LARGE_CHUNK_SZ / SMALL_CHUNK_SZ) <=
+ (LARGE_CHUNK_SZ / CHUNK_ADDRESSING_SZ) );
+ always_assert(LARGE_TITLE_CHUNK_DATA_SZ >= KEY_MAX_LENGTH);
+ always_assert(SMALL_CHUNKS_PER_LARGE_CHUNK >= 2);
/* make sure that the size of the structure is what they're supposed to be. */
- assert(sizeof(large_chunk_t) == LARGE_CHUNK_SZ);
- assert(sizeof(large_title_chunk_t) + LARGE_CHUNK_TAIL_SZ == LARGE_CHUNK_SZ);
- assert(sizeof(large_body_chunk_t) + LARGE_CHUNK_TAIL_SZ == LARGE_CHUNK_SZ);
- assert(sizeof(small_chunk_t) == SMALL_CHUNK_SZ);
- assert(sizeof(small_title_chunk_t) + SMALL_CHUNK_TAIL_SZ == SMALL_CHUNK_SZ);
- assert(sizeof(small_body_chunk_t) + SMALL_CHUNK_TAIL_SZ == SMALL_CHUNK_SZ);
+ always_assert(sizeof(large_chunk_t) == LARGE_CHUNK_SZ);
+ always_assert(sizeof(large_title_chunk_t) + LARGE_CHUNK_TAIL_SZ == LARGE_CHUNK_SZ);
+ always_assert(sizeof(large_body_chunk_t) + LARGE_CHUNK_TAIL_SZ == LARGE_CHUNK_SZ);
+ always_assert(sizeof(small_chunk_t) == SMALL_CHUNK_SZ);
+ always_assert(sizeof(small_title_chunk_t) + SMALL_CHUNK_TAIL_SZ == SMALL_CHUNK_SZ);
+ always_assert(sizeof(small_body_chunk_t) + SMALL_CHUNK_TAIL_SZ == SMALL_CHUNK_SZ);
/* make sure that the fields line up in item */
- assert( &(((item*) 0)->empty_header.h_next) == &(((item*) 0)->large_title.h_next) );
- assert( &(((item*) 0)->empty_header.h_next) == &(((item*) 0)->small_title.h_next) );
- assert( &(((item*) 0)->empty_header.next) == &(((item*) 0)->large_title.next) );
- assert( &(((item*) 0)->empty_header.next) == &(((item*) 0)->small_title.next) );
- assert( &(((item*) 0)->empty_header.prev) == &(((item*) 0)->large_title.prev) );
- assert( &(((item*) 0)->empty_header.prev) == &(((item*) 0)->small_title.prev) );
- assert( &(((item*) 0)->empty_header.next_chunk) == &(((item*) 0)->large_title.next_chunk) );
- assert( &(((item*) 0)->empty_header.next_chunk) == &(((item*) 0)->small_title.next_chunk) );
- assert( &(((item*) 0)->empty_header.time) == &(((item*) 0)->large_title.time) );
- assert( &(((item*) 0)->empty_header.time) == &(((item*) 0)->small_title.time) );
- assert( &(((item*) 0)->empty_header.exptime) == &(((item*) 0)->large_title.exptime) );
- assert( &(((item*) 0)->empty_header.exptime) == &(((item*) 0)->small_title.exptime) );
- assert( &(((item*) 0)->empty_header.nbytes) == &(((item*) 0)->large_title.nbytes) );
- assert( &(((item*) 0)->empty_header.nbytes) == &(((item*) 0)->small_title.nbytes) );
- assert( &(((item*) 0)->empty_header.refcount) == &(((item*) 0)->large_title.refcount) );
- assert( &(((item*) 0)->empty_header.refcount) == &(((item*) 0)->small_title.refcount) );
- assert( &(((item*) 0)->empty_header.nkey) == &(((item*) 0)->large_title.nkey) );
- assert( &(((item*) 0)->empty_header.nkey) == &(((item*) 0)->small_title.nkey) );
+ always_assert( &(((item*) 0)->empty_header.h_next) == &(((item*) 0)->large_title.h_next) );
+ always_assert( &(((item*) 0)->empty_header.h_next) == &(((item*) 0)->small_title.h_next) );
+ always_assert( &(((item*) 0)->empty_header.next) == &(((item*) 0)->large_title.next) );
+ always_assert( &(((item*) 0)->empty_header.next) == &(((item*) 0)->small_title.next) );
+ always_assert( &(((item*) 0)->empty_header.prev) == &(((item*) 0)->large_title.prev) );
+ always_assert( &(((item*) 0)->empty_header.prev) == &(((item*) 0)->small_title.prev) );
+ always_assert( &(((item*) 0)->empty_header.next_chunk) == &(((item*) 0)->large_title.next_chunk) );
+ always_assert( &(((item*) 0)->empty_header.next_chunk) == &(((item*) 0)->small_title.next_chunk) );
+ always_assert( &(((item*) 0)->empty_header.time) == &(((item*) 0)->large_title.time) );
+ always_assert( &(((item*) 0)->empty_header.time) == &(((item*) 0)->small_title.time) );
+ always_assert( &(((item*) 0)->empty_header.exptime) == &(((item*) 0)->large_title.exptime) );
+ always_assert( &(((item*) 0)->empty_header.exptime) == &(((item*) 0)->small_title.exptime) );
+ always_assert( &(((item*) 0)->empty_header.nbytes) == &(((item*) 0)->large_title.nbytes) );
+ always_assert( &(((item*) 0)->empty_header.nbytes) == &(((item*) 0)->small_title.nbytes) );
+ always_assert( &(((item*) 0)->empty_header.refcount) == &(((item*) 0)->large_title.refcount) );
+ always_assert( &(((item*) 0)->empty_header.refcount) == &(((item*) 0)->small_title.refcount) );
+ always_assert( &(((item*) 0)->empty_header.nkey) == &(((item*) 0)->large_title.nkey) );
+ always_assert( &(((item*) 0)->empty_header.nkey) == &(((item*) 0)->small_title.nkey) );
+
+ /* make sure that the casting functions in flat_storage.h are sane. */
+ always_assert( (void*) &(((item*) 0)->small_title) == ((void*) 0));
+ always_assert( (void*) &(((item*) 0)->large_title) == ((void*) 0));
/* ensure that our increment delta is large enough to accomodate one
* instance of the largest item we support. */
- assert( (FLAT_STORAGE_INCREMENT_DELTA / LARGE_CHUNK_SZ) * MIN_LARGE_CHUNK_CAPACITY >=
+ always_assert( (FLAT_STORAGE_INCREMENT_DELTA / LARGE_CHUNK_SZ) * MIN_LARGE_CHUNK_CAPACITY >=
MAX_ITEM_SIZE );
/* ensure that the first piece of a broken large chunk is the same address
* as the large_title_chunk / large_body_chunk. this is so that the
* chunkptrs map correctly. */
- assert( (intptr_t) &(((large_chunk_t*) 0)->lc_broken.lbc[0]) ==
+ always_assert( (intptr_t) &(((large_chunk_t*) 0)->lc_broken.lbc[0]) ==
(intptr_t) &(((large_chunk_t*) 0)->lc_title) );
- assert(FLAT_STORAGE_INCREMENT_DELTA % LARGE_CHUNK_SZ == 0);
+ always_assert(FLAT_STORAGE_INCREMENT_DELTA % LARGE_CHUNK_SZ == 0);
}
@@ -160,6 +154,7 @@ STATIC bool flat_storage_alloc(void) {
}
initialize_end = fsi.uninitialized_start + (FLAT_STORAGE_INCREMENT_DELTA / LARGE_CHUNK_SZ);
+ stats.item_storage_allocated += FLAT_STORAGE_INCREMENT_DELTA;
/* initialize the large chunks. */
for (;
fsi.uninitialized_start < initialize_end;
@@ -331,7 +326,6 @@ static void break_large_chunk(chunk_t* chunk) {
}
chunk->lc.lc_broken.small_chunks_allocated = 0;
- chunk->lc.lc_broken.refcount = 0;
/* STATS: update */
fsi.stats.large_broken_chunks ++;
@@ -435,7 +429,7 @@ STATIC item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start) {
/* it is silly that we have to make this typecast, but
* there's no other way to make this assignment without
* a cast, even though it ought to be possible. */
- item* small_item = (item*) small_title;
+ item* small_item = get_item_from_small_title(small_title);
return small_item;
}
@@ -471,7 +465,7 @@ STATIC item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start) {
/* it is silly that we have to make this typecast, but
* there's no other way to make this assignment without
* a cast, even though it ought to be possible. */
- item* large_item = (item*) large_title;
+ item* large_item = get_item_from_large_title(large_title);
return large_item;
}
@@ -490,6 +484,42 @@ STATIC item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start) {
}
+static bool small_chunk_referenced(const small_chunk_t* sc) {
+ assert((sc->flags & SMALL_CHUNK_INITIALIZED) != 0);
+ if (sc->flags & SMALL_CHUNK_FREE) {
+ return false; /* free nodes count as refcount = 0. */
+ } else {
+ for (;
+ (sc->flags & SMALL_CHUNK_TITLE) == 0;
+ sc = &get_chunk_address(sc->sc_body.prev_chunk)->sc) {
+ assert((sc->flags & (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED)) ==
+ (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED));
+ }
+
+ assert((sc->flags & (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED | SMALL_CHUNK_TITLE)) ==
+ (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED | SMALL_CHUNK_TITLE));
+ return (sc->sc_title.refcount == 0) ? false : true;
+ }
+}
+
+
+static bool large_broken_chunk_referenced(const large_broken_chunk_t* lc) {
+ unsigned counter;
+
+ for (counter = 0;
+ counter < SMALL_CHUNKS_PER_LARGE_CHUNK;
+ counter ++) {
+ const small_chunk_t* iter = &(lc->lbc[counter]);
+
+ if (small_chunk_referenced(iter)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
/*
* if search_depth is zero, then the search depth is not limited. if the search
* depth is non-zero, constrain search to the first search_depth items on the
@@ -497,7 +527,7 @@ STATIC item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start) {
*/
static large_chunk_t* find_unreferenced_broken_chunk(size_t search_depth) {
small_chunk_t* small_chunk_iter;
- size_t counter;
+ unsigned counter;
for (counter = 0,
small_chunk_iter = fsi.small_free_list;
@@ -507,12 +537,12 @@ static large_chunk_t* find_unreferenced_broken_chunk(size_t search_depth) {
large_chunk_t* lc = get_parent_chunk(small_chunk_iter);
large_broken_chunk_t* pc = &(lc->lc_broken);
- if (pc->refcount == 0) {
+ if (large_broken_chunk_referenced(pc) == false) {
return lc;
}
}
- return 0;
+ return NULL;
}
@@ -565,21 +595,10 @@ static coalesce_progress_t coalesce_free_small_chunks(rel_time_t large_lru_item_
}
lc = find_unreferenced_broken_chunk(0);
- small_lru_item = NULL;
-
- for (i = 0,
- small_lru_item = get_lru_item(SMALL_CHUNK, &small_lru_item->small_title);
- i < SMALL_LRU_SEARCH_DEPTH && small_lru_item != NULL;
- i ++) {
- /* does this chunk have refcount == 0? if so, free use it. */
- large_broken_chunk_t* lc = &(get_parent_chunk((small_chunk_t*) small_lru_item)->lc_broken);
- if (lc->refcount == 0) {
- break;
- }
- }
- if (i == SMALL_LRU_SEARCH_DEPTH) {
- return ( small_free_list_sz_pre == fsi.small_free_list_sz ) ?
- COALESCE_NO_PROGRESS : COALESCE_FORWARD_PROGRESS;
+ if (lc == NULL) {
+ /* we don't want to be stuck in an infinite loop if we can't find a
+ * large unreferenced chunk, so just report no progress. */
+ return COALESCE_NO_PROGRESS;
}
/* STATS: update */
@@ -616,6 +635,9 @@ static coalesce_progress_t coalesce_free_small_chunks(rel_time_t large_lru_item_
for (i = 0; i < SMALL_CHUNKS_PER_LARGE_CHUNK; i ++) {
small_chunk_t* iter = &(lc->lc_broken.lbc[i]);
chunk_t* old_chunk = (chunk_t*) iter;
+ (void) old_chunk; /* when optimizing, old_chunk is not
+ * used. this is to quiesce the
+ * compiler warning. */
assert( (iter->flags & SMALL_CHUNK_INITIALIZED) ==
SMALL_CHUNK_INITIALIZED);
@@ -637,55 +659,59 @@ static coalesce_progress_t coalesce_free_small_chunks(rel_time_t large_lru_item_
chunk_t* next, * prev;
small_chunk_t* next_chunk;
- /* another meaningless cast that we're forced to do */
- new_it = (item*) &(replacement->sc_title);
+ new_it = get_item_from_small_title(&(replacement->sc_title));
/* edit the forward and backward links. */
if (replacement->sc_title.next != NULL_CHUNKPTR) {
next = get_chunk_address(replacement->sc_title.next);
assert(next->sc.sc_title.prev == get_chunkptr(old_chunk));
next->sc.sc_title.prev = replacement_chunkptr;
- } else if (fsi.small_lru_tail == (item*) old_chunk) {
- fsi.small_lru_tail = (item*) replacement;
+ } else {
+ assert(fsi.small_lru_tail == get_item_from_small_title(&old_chunk->sc.sc_title));
+ fsi.small_lru_tail = get_item_from_small_title(&replacement->sc_title);
}
if (replacement->sc_title.prev != NULL_CHUNKPTR) {
prev = get_chunk_address(replacement->sc_title.prev);
assert(prev->sc.sc_title.next == get_chunkptr(old_chunk));
prev->sc.sc_title.next = replacement_chunkptr;
- } else if (fsi.small_lru_head == (item*) old_chunk) {
- fsi.small_lru_head = (item*) replacement;
+ } else {
+ assert(fsi.small_lru_head == get_item_from_small_title(&old_chunk->sc.sc_title));
+ fsi.small_lru_head = get_item_from_small_title(&replacement->sc_title);
}
- /* edit the next_chunk's prev_next_chunk link */
+ /* edit the next_chunk's prev_chunk link */
next_chunk = &(get_chunk_address(replacement->sc_title.next_chunk))->sc;
if (next_chunk != NULL) {
- assert(next_chunk->sc_body.prev_next_chunk == &iter->sc_title.next_chunk);
- next_chunk->sc_body.prev_next_chunk = &replacement->sc_title.next_chunk;
+ assert(next_chunk->sc_body.prev_chunk == get_chunkptr(old_chunk));
+ next_chunk->sc_body.prev_chunk = replacement_chunkptr;
}
+ /* update flags */
+ replacement->flags |= (SMALL_CHUNK_USED | SMALL_CHUNK_TITLE);
+
/* do the replacement in the mapping. */
assoc_update(new_it);
-
- /* update flags */
- assert(replacement->flags == (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED | SMALL_CHUNK_TITLE));
} else {
/* body block. this is more straightforward */
- chunkptr_t new_chunk_ptr = get_chunkptr(_replacement);
- small_chunk_t* next_chunk;
+ small_chunk_t* prev_chunk = &(get_chunk_address(replacement->sc_body.prev_chunk))->sc;
+ small_chunk_t* next_chunk = &(get_chunk_address(replacement->sc_body.next_chunk))->sc;
/* update the previous block's next pointer */
- *(replacement->sc_body.prev_next_chunk) = new_chunk_ptr;
+ if (prev_chunk->flags & SMALL_CHUNK_TITLE) {
+ prev_chunk->sc_title.next_chunk = replacement_chunkptr;
+ } else {
+ prev_chunk->sc_body.next_chunk = replacement_chunkptr;
+ }
- /* edit the next_chunk's prev_next_chunk link */
- next_chunk = &(get_chunk_address(replacement->sc_body.next_chunk))->sc;
+ /* edit the next_chunk's prev_chunk link */
if (next_chunk != NULL) {
- assert(next_chunk->sc_body.prev_next_chunk == &iter->sc_body.next_chunk);
- next_chunk->sc_body.prev_next_chunk = &replacement->sc_body.next_chunk;
+ assert(next_chunk->sc_body.prev_chunk == get_chunkptr(old_chunk));
+ next_chunk->sc_body.prev_chunk = replacement_chunkptr;
}
/* update flags */
- assert(replacement->flags == (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED));
+ replacement->flags |= (SMALL_CHUNK_USED);
}
/* don't push this onto the free list. if we do, we'll immediately
@@ -861,7 +887,8 @@ static bool flat_storage_lru_evict(chunk_type_t chunk_type, size_t nchunks) {
/* allocates one item capable of storing a key of size nkey and a value field of
* size nbytes. stores the key, flags, and exptime. the value field is not
* initialized. if there is insufficient memory, NULL is returned. */
-item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const size_t nbytes) {
+item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime,
+ const size_t nbytes, const struct in_addr addr) {
if (is_large_chunk(nkey, nbytes)) {
/* allocate a large chunk */
@@ -878,6 +905,7 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
large_title_chunk_t* title;
large_body_chunk_t* body;
chunkptr_t* prev_next;
+ size_t write_offset = nkey + nbytes;
while (fsi.large_free_list_sz < needed) {
assert(prev_free != fsi.large_free_list_sz);
@@ -911,7 +939,7 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
temp->lc.flags |= (LARGE_CHUNK_USED | LARGE_CHUNK_TITLE);
assert(temp != NULL);
title = &(temp->lc.lc_title);
- title->h_next = NULL;
+ title->h_next = NULL_ITEM_PTR;
title->next = title->prev = title->next_chunk = NULL_CHUNKPTR;
title->refcount = 1; /* the caller will have a reference */
title->it_flags = ITEM_VALID;
@@ -922,7 +950,17 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
title->flags = flags;
prev_next = &title->next_chunk;
+ if (needed == 1) {
+ /* ip address might be written on this block */
+ assert(write_offset <= LARGE_TITLE_CHUNK_DATA_SZ);
+ if (LARGE_TITLE_CHUNK_DATA_SZ - write_offset >= sizeof(addr)) {
+ memcpy(&(title->data[write_offset]), &addr, sizeof(addr));
+ title->it_flags |= ITEM_HAS_IP_ADDRESS;
+ }
+ }
+
needed --;
+ write_offset -= LARGE_TITLE_CHUNK_DATA_SZ;
/* STATS: update */
fsi.stats.large_title_chunks ++;
@@ -936,11 +974,21 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
*(prev_next) = get_chunkptr(temp);
prev_next = &body->next_chunk;
+ if (needed == 1) {
+ /* ip address might be written on this block */
+ assert(write_offset <= LARGE_BODY_CHUNK_DATA_SZ);
+ if (LARGE_BODY_CHUNK_DATA_SZ - write_offset >= sizeof(addr)) {
+ memcpy(&(temp->lc.lc_body.data[write_offset]), &addr, sizeof(addr));
+ title->it_flags |= ITEM_HAS_IP_ADDRESS;
+ }
+ }
+
needed --;
+ write_offset -= LARGE_BODY_CHUNK_DATA_SZ;
}
*(prev_next) = NULL_CHUNKPTR;
- return (item*) title;
+ return get_item_from_large_title(title);
} else {
/* allocate a small chunk */
@@ -956,8 +1004,9 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
chunk_t* temp;
small_title_chunk_t* title;
small_body_chunk_t* body;
+ chunkptr_t prev;
chunkptr_t* prev_next;
- large_broken_chunk_t* pc;
+ size_t write_offset = nkey + nbytes;
while (fsi.small_free_list_sz < needed) {
assert(small_prev_free != fsi.small_free_list_sz ||
@@ -991,39 +1040,62 @@ item* do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
temp->sc.flags |= (SMALL_CHUNK_USED | SMALL_CHUNK_TITLE);
assert(temp != NULL);
title = &(temp->sc.sc_title);
- title->h_next = NULL;
+ title->h_next = NULL_ITEM_PTR;
title->next = title->prev = title->next_chunk = NULL_CHUNKPTR;
title->refcount = 1; /* the caller will have a reference */
- pc = &(get_parent_chunk(&temp->sc)->lc_broken);
- pc->refcount ++;
title->it_flags = ITEM_VALID;
title->nkey = nkey;
title->nbytes = nbytes;
memcpy(title->data, key, nkey);
title->exptime = exptime;
title->flags = flags;
+ prev = get_chunkptr(temp);
prev_next = &title->next_chunk;
+ if (needed == 1) {
+ /* ip address might be written on this block */
+ assert(write_offset <= SMALL_TITLE_CHUNK_DATA_SZ);
+ if (SMALL_TITLE_CHUNK_DATA_SZ - write_offset >= sizeof(addr)) {
+ memcpy(&(title->data[write_offset]), &addr, sizeof(addr));
+ title->it_flags |= ITEM_HAS_IP_ADDRESS;
+ }
+ }
+
needed --;
+ write_offset -= SMALL_TITLE_CHUNK_DATA_SZ;
/* STATS: update */
fsi.stats.small_title_chunks ++;
fsi.stats.small_body_chunks += needed;
while (needed > 0) {
+ chunkptr_t current_chunkptr;
temp = free_list_pop(SMALL_CHUNK);
temp->sc.flags |= SMALL_CHUNK_USED;
assert(temp != NULL);
+
+ current_chunkptr = get_chunkptr(temp);
body = &(temp->sc.sc_body);
- *(prev_next) = get_chunkptr(temp);
- body->prev_next_chunk = prev_next;
+ *(prev_next) = current_chunkptr;
+ body->prev_chunk = prev;
prev_next = &body->next_chunk;
+ prev = current_chunkptr;
+
+ if (needed == 1) {
+ /* ip address might be written on this block */
+ assert(write_offset <= SMALL_BODY_CHUNK_DATA_SZ);
+ if (SMALL_BODY_CHUNK_DATA_SZ - write_offset >= sizeof(addr)) {
+ memcpy(&(temp->sc.sc_body.data[write_offset]), &addr, sizeof(addr));
+ title->it_flags |= ITEM_HAS_IP_ADDRESS;
+ }
+ }
needed --;
+ write_offset -= SMALL_BODY_CHUNK_DATA_SZ;
}
*(prev_next) = NULL_CHUNKPTR;
- return (item*) title;
+ return get_item_from_small_title(title);
}
}
@@ -1038,11 +1110,11 @@ static void item_free(item *it) {
#endif /* #if !defined(NDEBUG) */
bool is_large_chunks = is_item_large_chunk(it);
- assert(it->empty_header.it_flags == ITEM_VALID);
+ assert((it->empty_header.it_flags & ~(ITEM_HAS_IP_ADDRESS))== ITEM_VALID);
assert(it->empty_header.refcount == 0);
assert(it->empty_header.next == NULL_CHUNKPTR);
assert(it->empty_header.prev == NULL_CHUNKPTR);
- assert(it->empty_header.h_next == NULL);
+ assert(it->empty_header.h_next == NULL_ITEM_PTR);
/* find all the chunks and liberate them. */
next_chunk = it->empty_header.next_chunk;
@@ -1130,6 +1202,13 @@ bool item_size_ok(const size_t nkey, const int flags, const int nbytes) {
}
+bool item_need_realloc(const item* it,
+ const size_t new_nkey, const int new_flags, const size_t new_nbytes) {
+ return (is_item_large_chunk(it) != is_large_chunk(new_nkey, new_nbytes) ||
+ chunks_in_item(it) != chunks_needed(new_nkey, new_nbytes));
+}
+
+
static void item_link_q(item *it) {
assert(it->empty_header.next == NULL_CHUNKPTR);
assert(it->empty_header.prev == NULL_CHUNKPTR);
@@ -1164,8 +1243,8 @@ static void item_unlink_q(item* it) {
if (is_item_large_chunk(it)) {
item* next, * prev;
- next = (item*) get_chunk_address(it->empty_header.next);
- prev = (item*) get_chunk_address(it->empty_header.prev);
+ next = get_item_from_chunk(get_chunk_address(it->empty_header.next));
+ prev = get_item_from_chunk(get_chunk_address(it->empty_header.prev));
if (it == fsi.large_lru_head) {
assert(prev == NULL);
@@ -1185,8 +1264,8 @@ static void item_unlink_q(item* it) {
} else {
item* next, * prev;
- next = (item*) get_chunk_address(it->empty_header.next);
- prev = (item*) get_chunk_address(it->empty_header.prev);
+ next = get_item_from_chunk(get_chunk_address(it->empty_header.next));
+ prev = get_item_from_chunk(get_chunk_address(it->empty_header.prev));
if (it == fsi.small_lru_head) {
assert(prev == NULL);
@@ -1222,7 +1301,7 @@ int do_item_link(item* it) {
assoc_insert(it);
STATS_LOCK();
- stats.curr_bytes += ITEM_ntotal(it);
+ stats.item_total_size += ITEM_nkey(it) + ITEM_nbytes(it);
stats.curr_items += 1;
stats.total_items += 1;
STATS_UNLOCK();
@@ -1238,7 +1317,7 @@ void do_item_unlink(item* it, long flags) {
if (it->empty_header.it_flags & ITEM_LINKED) {
it->empty_header.it_flags &= ~(ITEM_LINKED);
STATS_LOCK();
- stats.curr_bytes -= ITEM_ntotal(it);
+ stats.item_total_size -= ITEM_nkey(it) + ITEM_nbytes(it);
stats.curr_items -= 1;
if (flags & UNLINK_IS_EVICT) {
stats_size_buckets_evict(ITEM_nkey(it) + ITEM_nbytes(it));
@@ -1247,7 +1326,8 @@ void do_item_unlink(item* it, long flags) {
if (settings.detail_enabled) {
stats_prefix_record_removal(ITEM_key(it), ITEM_ntotal(it), it->empty_header.time, flags);
}
- assoc_delete(ITEM_key(it), ITEM_nkey(it));
+ assoc_delete(ITEM_key(it), ITEM_nkey(it), ITEM_PTR(it));
+ it->empty_header.h_next = NULL_ITEM_PTR;
item_unlink_q(it);
if (it->empty_header.refcount == 0) {
item_free(it);
@@ -1263,11 +1343,6 @@ void do_item_deref(item* it) {
/* may not be ITEM_LINKED because the unlink may have preceeded the remove. */
if (it->empty_header.refcount != 0) {
it->empty_header.refcount --;
- if (is_item_large_chunk(it) == false) {
- /* need to decrement the refcount in the parent block */
- large_broken_chunk_t* pc = &(get_parent_chunk((small_chunk_t*) it)->lc_broken);
- pc->refcount --;
- }
}
assert((it->empty_header.it_flags & ITEM_DELETED) == 0 ||
it->empty_header.refcount != 0);
@@ -1292,16 +1367,30 @@ void do_item_update(item* it) {
}
int do_item_replace(item* it, item* new_it) {
- assert(it->empty_header.it_flags & ITEM_VALID);
- assert(new_it->empty_header.it_flags & ITEM_VALID);
+ int retval;
- do_item_unlink(it, UNLINK_NORMAL);
- return do_item_link(new_it);
+ // if item is already unlinked by another thread, we'd get the
+ // current one
+ if ((it->empty_header.it_flags & ITEM_LINKED) == 0) {
+ it = assoc_find(ITEM_key(it), ITEM_nkey(it));
+ }
+
+ if (it != NULL) {
+ // though there might not be a current one if the other thread did a
+ // delete.
+ assert((it->empty_header.it_flags & (ITEM_VALID | ITEM_LINKED)) ==
+ (ITEM_VALID | ITEM_LINKED));
+ do_item_unlink(it, UNLINK_NORMAL);
+ }
+
+ assert(new_it->empty_header.it_flags & ITEM_VALID);
+ retval = do_item_link(new_it);
+ return retval;
}
char* do_item_cachedump(const chunk_type_t type, const unsigned int limit, unsigned int* bytes) {
- unsigned int memlimit = 2 * 1024 * 1024; /* 2MB max response size */
+ unsigned int memlimit = ITEM_CACHEDUMP_LIMIT; /* 2MB max response size */
char *buffer;
unsigned int bufcurr;
item *it;
@@ -1329,7 +1418,7 @@ char* do_item_cachedump(const chunk_type_t type, const unsigned int limit, unsig
strcpy(buffer + bufcurr, temp);
bufcurr += len;
shown++;
- it = (item*) get_chunk_address(it->empty_header.next);
+ it = get_item_from_chunk(get_chunk_address(it->empty_header.next));
}
memcpy(buffer + bufcurr, "END\r\n", 6);
@@ -1344,7 +1433,7 @@ char* do_item_stats_sizes(int* bytes) {
const size_t max_item_size = sizeof(large_chunk_t) + KEY_MAX_LENGTH + MAX_ITEM_SIZE;
const int num_buckets = (max_item_size + 32 - 1) / 32; /* max object, divided into 32 bytes size buckets */
unsigned int *histogram = (unsigned int *)malloc((size_t)num_buckets * sizeof(int));
- char *buf = (char *)malloc(2 * 1024 * 1024); /* 2MB max response size */
+ char *buf = (char *)malloc(ITEM_STATS_SIZES); /* 2MB max response size */
int i;
if (histogram == 0 || buf == 0) {
@@ -1362,7 +1451,7 @@ char* do_item_stats_sizes(int* bytes) {
int bucket = ntotal / 32;
if ((ntotal % 32) != 0) bucket++;
if (bucket < num_buckets) histogram[bucket]++;
- iter = (item*) get_chunk_address(iter->small_title.next);
+ iter = get_item_from_chunk(get_chunk_address(iter->small_title.next));
}
iter = fsi.large_lru_head;
@@ -1371,7 +1460,7 @@ char* do_item_stats_sizes(int* bytes) {
int bucket = ntotal / 32;
if ((ntotal % 32) != 0) bucket++;
if (bucket < num_buckets) histogram[bucket]++;
- iter = (item*) get_chunk_address(iter->large_title.next);
+ iter = get_item_from_chunk(get_chunk_address(iter->large_title.next));
}
/* write the buffer */
@@ -1396,7 +1485,21 @@ void do_item_flush_expired(void) {
iter != NULL;
iter = next) {
if (iter->small_title.time >= settings.oldest_live) {
- next = (item*) get_chunk_address(iter->small_title.next);
+ next = get_item_from_chunk(get_chunk_address(iter->small_title.next));
+ assert( (iter->empty_header.it_flags & (ITEM_VALID | ITEM_LINKED)) ==
+ (ITEM_VALID | ITEM_LINKED) );
+ do_item_unlink(iter, UNLINK_NORMAL);
+ } else {
+ /* We've hit the first old item. Continue to the next queue. */
+ break;
+ }
+ }
+
+ for (iter = fsi.large_lru_head;
+ iter != NULL;
+ iter = next) {
+ if (iter->large_title.time >= settings.oldest_live) {
+ next = get_item_from_chunk(get_chunk_address(iter->large_title.next));
assert( (iter->empty_header.it_flags & (ITEM_VALID | ITEM_LINKED)) ==
(ITEM_VALID | ITEM_LINKED) );
do_item_unlink(iter, UNLINK_NORMAL);
@@ -1437,11 +1540,6 @@ item* do_item_get_notedeleted(const char* key, const size_t nkey, bool* delete_l
if (it != NULL) {
it->empty_header.refcount ++;
- if (is_item_large_chunk(it) == false) {
- /* need to decrement the refcount in the parent block */
- large_broken_chunk_t* pc = &(get_parent_chunk((small_chunk_t*) it)->lc_broken);
- pc->refcount ++;
- }
}
return it;
}
@@ -1451,11 +1549,6 @@ item* do_item_get_nocheck(const char* key, const size_t nkey) {
item *it = assoc_find(key, nkey);
if (it) {
it->empty_header.refcount ++;
- if (is_item_large_chunk(it) == false) {
- /* need to decrement the refcount in the parent block */
- large_broken_chunk_t* pc = &(get_parent_chunk((small_chunk_t*) it)->lc_broken);
- pc->refcount ++;
- }
}
return it;
}
@@ -1469,53 +1562,83 @@ bool item_delete_lock_over(item* it) {
char* do_flat_allocator_stats(size_t* result_size) {
- size_t left = 2048, written, i;
- char* buffer = malloc(left), * now;
+ size_t bufsize = 2048, offset = 0, i;
+ char* buffer = malloc(bufsize);
+ char terminator[] = "END\r\n";
+ item* small_lru_item = NULL, * large_lru_item = NULL;
+ rel_time_t * small_lru_item_timestamp, * large_lru_item_timestamp;
+ rel_time_t oldest_item_lifetime;
if (buffer == NULL) {
*result_size = 0;
return NULL;
}
- now = buffer;
-
- written = snprintf(now, left,
- "STAT large_title_chunks %" PRINTF_INT64_MODIFIER "u\n"
- "STAT large_body_chunks %" PRINTF_INT64_MODIFIER "u\n"
- "STAT large_broken_chunks %" PRINTF_INT64_MODIFIER "u\n"
- "STAT small_title_chunks %" PRINTF_INT64_MODIFIER "u\n"
- "STAT small_body_chunks %" PRINTF_INT64_MODIFIER "u\n",
- fsi.stats.large_title_chunks,
- fsi.stats.large_body_chunks,
- fsi.stats.large_broken_chunks,
- fsi.stats.small_title_chunks,
- fsi.stats.small_body_chunks);
- now += written;
- left -= written;
- for (i = 0; i < SMALL_CHUNKS_PER_LARGE_CHUNK + 1; i ++) {
- written = snprintf(now, left, "STAT broken_chunk_histogram %lu %" PRINTF_INT64_MODIFIER "u\n", i, fsi.stats.broken_chunk_histogram[i]);
-
- now += written;
- left -= written;
+ /* get the LRU items */
+ small_lru_item = get_lru_item(SMALL_CHUNK, NULL);
+ large_lru_item = get_lru_item(LARGE_CHUNK, NULL);
+
+ /* it is possible that get_lru_item(..) will return NULL, but we'll
+ * never use these pointers without checking the return values. this
+ * is just an elaborate way to circumvent the compiler's warning that
+ * large_lru_item_timestamp may be used uninitialized. */
+ small_lru_item_timestamp = &small_lru_item->small_title.time;
+ large_lru_item_timestamp = &large_lru_item->large_title.time;
+
+ /* have the items, figure out which item to release. */
+ if (small_lru_item == NULL &&
+ large_lru_item == NULL) {
+ oldest_item_lifetime = 0;
+ } else if (small_lru_item == NULL && large_lru_item != NULL) {
+ oldest_item_lifetime = current_time - *large_lru_item_timestamp;
+ } else if (small_lru_item != NULL && large_lru_item == NULL) {
+ oldest_item_lifetime = current_time - *small_lru_item_timestamp;
+ } else {
+ /* tie goes to large items, because they're easier to deal with. in
+ * any case, this is extraordinarily unlikely. */
+ if (*small_lru_item_timestamp < *large_lru_item_timestamp) {
+ oldest_item_lifetime = current_time - *small_lru_item_timestamp;
+ } else {
+ oldest_item_lifetime = current_time - *large_lru_item_timestamp;
+ }
}
- written = snprintf(now, left,
- "STAT break_events %" PRINTF_INT64_MODIFIER "u\n"
- "STAT unbreak_events %" PRINTF_INT64_MODIFIER "u\n"
- "STAT migrates %" PRINTF_INT64_MODIFIER "u\n"
- "STAT unused_memory %" PRINTF_INT64_MODIFIER "u\n"
- "STAT large_free_list_sz %" PRINTF_INT64_MODIFIER "u\n"
- "STAT small_free_list_sz %" PRINTF_INT64_MODIFIER "u\n",
- fsi.stats.break_events,
- fsi.stats.unbreak_events,
- fsi.stats.migrates,
- fsi.unused_memory,
- fsi.large_free_list_sz,
- fsi.small_free_list_sz);
- now += written;
- left -= written;
-
- *result_size = (now - buffer);
+ offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
+ "STAT large_title_chunks %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT large_body_chunks %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT large_broken_chunks %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT small_title_chunks %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT small_body_chunks %" PRINTF_INT64_MODIFIER "u\n",
+ fsi.stats.large_title_chunks,
+ fsi.stats.large_body_chunks,
+ fsi.stats.large_broken_chunks,
+ fsi.stats.small_title_chunks,
+ fsi.stats.small_body_chunks);
+
+ for (i = 0; i < SMALL_CHUNKS_PER_LARGE_CHUNK + 1; i ++) {
+ offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
+ "STAT broken_chunk_histogram %lu %" PRINTF_INT64_MODIFIER "u\n", i, fsi.stats.broken_chunk_histogram[i]);
+ }
+
+ offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
+ "STAT break_events %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT unbreak_events %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT migrates %" PRINTF_INT64_MODIFIER "u\n"
+ "STAT unused_memory %lu\n"
+ "STAT large_free_list_sz %lu\n"
+ "STAT small_free_list_sz %lu\n"
+ "STAT oldest_item_lifetime %lus\n",
+ fsi.stats.break_events,
+ fsi.stats.unbreak_events,
+ fsi.stats.migrates,
+ fsi.unused_memory,
+ fsi.large_free_list_sz,
+ fsi.small_free_list_sz,
+ oldest_item_lifetime);
+
+ offset = append_to_buffer(buffer, bufsize, offset, 0, terminator);
+
+ *result_size = offset;
return buffer;
}
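
append_to_buffer itself is not defined in this diff; judging from the call sites above, it advances an offset into a fixed-size buffer while keeping a reserved region free at the tail, so the final call (with reserved == 0) can always fit the "END\r\n" terminator. A plausible sketch under those assumptions:

    #include <stdarg.h>
    #include <stdio.h>

    static size_t append_to_buffer(char* buffer, size_t bufsize, size_t offset,
                                   size_t reserved, const char* fmt, ...) {
        va_list ap;
        size_t avail;
        int written;

        if (offset + reserved >= bufsize) {
            return offset;                      /* no room; drop this line. */
        }
        avail = bufsize - reserved - offset;

        va_start(ap, fmt);
        written = vsnprintf(buffer + offset, avail, fmt, ap);
        va_end(ap);

        if (written < 0 || (size_t) written >= avail) {
            return offset;                      /* truncated; drop this line. */
        }
        return offset + written;
    }
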
View
88 flat_storage.h
@@ -95,16 +95,29 @@
#if defined(FLAT_STORAGE_TESTS)
#define STATIC
+#if defined(FLAT_STORAGE_MODULE)
+#define STATIC_DECL(decl) decl
+#else
#define STATIC_DECL(decl) extern decl
+#endif /* #if defined(FLAT_STORAGE_MODULE) */
+
#else
#define STATIC static
-#if defined(FLAT_STORAGE_STATIC_DECL)
+#if defined(FLAT_STORAGE_MODULE)
#define STATIC_DECL(decl) static decl
-#elif !defined(STATIC_DECL)
+#else
#define STATIC_DECL(decl)
-#endif /* #if !defined(STATIC_DECL) */
+#endif /* #if defined(FLAT_STORAGE_MODULE) */
#endif /* #if defined(FLAT_STORAGE_TESTS) */
+/* create an always_assert macro for tests that are so cheap that they should
+ * always be performed, regardless of NDEBUG */
+#if defined(NDEBUG)
+#define always_assert(condition) do { if (! (condition)) { fprintf(stderr, "%s\n", #condition); abort(); } } while (0)
+#else
+#define always_assert assert
+#endif /* #if defined(NDEBUG) */
+
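
Unlike assert, the expansion above survives -DNDEBUG builds, so it suits one-time startup invariants that cost nothing at runtime. A hypothetical call site (the invariant shown is illustrative, not taken from this commit):

    /* cheap one-time sanity check; runs even in NDEBUG builds. */
    always_assert(sizeof(small_chunk_t) == SMALL_CHUNK_SZ);
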
/**
* constants
*/
@@ -139,6 +152,7 @@ typedef enum it_flags_e {
ITEM_VALID = 0x1,
ITEM_LINKED = 0x2, /* linked into the LRU. */
ITEM_DELETED = 0x4, /* deferred delete. */
+ ITEM_HAS_IP_ADDRESS = 0x10,
} it_flags_t;
@@ -149,7 +163,7 @@ typedef enum chunk_type_e {
#define LARGE_CHUNK_SZ 1024 /* large chunk size */
-#define SMALL_CHUNK_SZ 120 /* small chunk size */
+#define SMALL_CHUNK_SZ 124 /* small chunk size */
#define FLAT_STORAGE_INCREMENT_DELTA (LARGE_CHUNK_SZ * 2048) /* initialize 2k
* chunks at a time. */
@@ -184,7 +198,9 @@ typedef enum chunk_type_e {
*/
typedef uint32_t chunkptr_t;
+typedef chunkptr_t item_ptr_t;
#define NULL_CHUNKPTR ((chunkptr_t) (0))
+#define NULL_ITEM_PTR ((item_ptr_t) (0))
/**
* forward declarations
@@ -195,7 +211,7 @@ typedef struct large_chunk_s large_chunk_t;
typedef struct small_chunk_s small_chunk_t;
#define TITLE_CHUNK_HEADER_CONTENTS \
- item* h_next; /* hash next */ \
+ item_ptr_t h_next; /* hash next */ \
chunkptr_t next; /* LRU next */ \
chunkptr_t prev; /* LRU prev */ \
chunkptr_t next_chunk; /* next chunk */ \
@@ -212,7 +228,7 @@ typedef struct small_chunk_s small_chunk_t;
chunkptr_t next_chunk;
#define SMALL_BODY_CHUNK_HEADER \
- chunkptr_t* prev_next_chunk; \
+ chunkptr_t prev_chunk; \
chunkptr_t next_chunk;
#define LARGE_CHUNK_TAIL \
@@ -297,7 +313,6 @@ typedef struct large_broken_chunk_s large_broken_chunk_t;
struct large_broken_chunk_s {
small_chunk_t lbc[SMALL_CHUNKS_PER_LARGE_CHUNK];
uint8_t small_chunks_allocated;
- uint8_t refcount;
};
@@ -465,7 +480,7 @@ static inline chunk_t* get_chunk_address(chunkptr_t chunkptr) {
/**
* this takes a chunk address and translates it to a chunkptr_t.
*/
-static inline chunkptr_t get_chunkptr(chunk_t* _addr) {
+static inline chunkptr_t get_chunkptr(const chunk_t* _addr) {
intptr_t addr = (intptr_t) _addr;
intptr_t diff = addr - ((intptr_t) fsi.flat_storage_start);
intptr_t large_chunk_index = diff / LARGE_CHUNK_SZ;
@@ -513,6 +528,45 @@ static inline const large_chunk_t* get_parent_chunk_const(const small_chunk_t* s
}
+/* the following are a set of abstractions to remove casting from flat_storage.c */
+static inline item* get_item_from_small_title(small_title_chunk_t* small_title) {
+ return (item*) small_title;
+}
+
+static inline item* get_item_from_large_title(large_title_chunk_t* large_title) {
+ return (item*) large_title;
+}
+
+static inline item* get_item_from_chunk(chunk_t* chunk) {
+ item* it = (item*) chunk;
+
+ if (it != NULL) {
+ assert( is_item_large_chunk(it) ?
+ (chunk->lc.flags == (LARGE_CHUNK_INITIALIZED | LARGE_CHUNK_USED | LARGE_CHUNK_TITLE)) :
+ (chunk->sc.flags == (SMALL_CHUNK_INITIALIZED | SMALL_CHUNK_USED | SMALL_CHUNK_TITLE)) );
+ }
+
+ return it;
+}
+
+static inline chunk_t* get_chunk_from_item(item* it) {
+ return (chunk_t*) it;
+}
+
+
+static inline chunk_t* get_chunk_from_small_chunk(small_chunk_t* sc) {
+ return (chunk_t*) sc;
+}
+
+
+static inline const chunk_t* get_chunk_from_small_chunk_const(const small_chunk_t* sc) {
+ return (const chunk_t*) sc;
+}
+
+
+static inline item* ITEM(item_ptr_t iptr) { return get_item_from_chunk(get_chunk_address( (chunkptr_t) iptr)); }
+static inline item_ptr_t ITEM_PTR(item* it) { return (item_ptr_t) get_chunkptr(get_chunk_from_item(it)); }
+static inline bool ITEM_PTR_IS_NULL(item_ptr_t iptr) { return iptr == NULL_ITEM_PTR; }
static inline char* ITEM_key(item* it) {
if (is_item_large_chunk(it)) {
return it->large_title.data;
@@ -552,20 +606,25 @@ static inline size_t ITEM_ntotal(item* it) {
}
static inline unsigned int ITEM_flags(item* it) { return it->empty_header.flags; }
+static inline rel_time_t ITEM_time(item* it) { return it->empty_header.time; }
static inline rel_time_t ITEM_exptime(item* it) { return it->empty_header.exptime; }
static inline unsigned short ITEM_refcount(item* it) { return it->empty_header.refcount; }
+static inline void ITEM_set_nbytes(item* it, int nbytes) { it->empty_header.nbytes = nbytes; }
static inline void ITEM_set_exptime(item* it, rel_time_t t) { it->empty_header.exptime = t; }
-static inline item* ITEM_h_next(item* it) { return it->empty_header.h_next; }
-static inline item** ITEM_h_next_p(item* it) { return &it->empty_header.h_next; }
+static inline item_ptr_t ITEM_PTR_h_next(item_ptr_t iptr) { return ITEM(iptr)->empty_header.h_next; }
+static inline item_ptr_t* ITEM_h_next_p(item* it) { return &it->empty_header.h_next; }
-static inline void ITEM_set_h_next(item* it, item* next) { it->empty_header.h_next = next; }
+static inline void ITEM_set_h_next(item* it, item_ptr_t next) { it->empty_header.h_next = next; }
static inline bool ITEM_is_valid(item* it) { return it->empty_header.it_flags & ITEM_VALID; }
+static inline bool ITEM_has_ip_address(item* it) { return it->empty_header.it_flags & ITEM_HAS_IP_ADDRESS; }
static inline void ITEM_mark_deleted(item* it) { it->empty_header.it_flags |= ITEM_DELETED; }
static inline void ITEM_unmark_deleted(item* it) { it->empty_header.it_flags &= ~ITEM_DELETED; }
+static inline void ITEM_set_has_ip_address(item* it) { it->empty_header.it_flags |= ITEM_HAS_IP_ADDRESS; }
+static inline void ITEM_clear_has_ip_address(item* it) { it->empty_header.it_flags &= ~(ITEM_HAS_IP_ADDRESS); }
extern void flat_storage_init(size_t maxbytes);
extern char* do_item_cachedump(const chunk_type_t type, const unsigned int limit, unsigned int *bytes);
@@ -575,4 +634,11 @@ extern char* do_flat_allocator_stats(size_t* bytes);
STATIC_DECL(bool flat_storage_alloc(void));
STATIC_DECL(item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start));
+#if !defined(FLAT_STORAGE_MODULE)
+#undef STATIC
+#undef STATIC_DECL
+#else
+#undef FLAT_STORAGE_MODULE
+#endif /* #if !defined(FLAT_STORAGE_MODULE) */
+
#endif /* #if !defined(_flat_storage_h_) */
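
The net effect of the item_ptr_t change: hash-chain links (h_next) are stored as 32-bit arena offsets rather than native pointers, and the ITEM()/ITEM_PTR() inlines convert at the boundary. A hypothetical chain walk using only the accessors declared above (memcmp from <string.h>; not code from this commit):

    /* offsets are widened to pointers only when an item is inspected. */
    static item* walk_hash_chain(item_ptr_t head, const char* key, size_t nkey) {
        item_ptr_t iptr;

        for (iptr = head; ! ITEM_PTR_IS_NULL(iptr); iptr = ITEM_PTR_h_next(iptr)) {
            item* it = ITEM(iptr);              /* 32-bit offset -> address */

            if (it->empty_header.nkey == nkey &&
                memcmp(ITEM_key(it), key, nkey) == 0) {
                return it;
            }
        }
        return NULL;
    }

On a 64-bit build this halves the storage for each hash link, and because the offsets are relative to fsi.flat_storage_start they are independent of where the arena happens to be mapped.
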
View
289 flat_storage_support.h
@@ -36,13 +36,13 @@ static inline size_t __fss_MAX(size_t a, size_t b) {
}
}
-static inline int add_item_to_iov(conn_t *c, const item* it, bool send_cr_lf) {
+static inline int add_item_to_iov(conn *c, const item* it, bool send_cr_lf) {
const chunk_t* next;
const char* ptr;
size_t to_copy; /* bytes left in the current */
/* chunk. */
size_t title_data_size; /* this is a stupid kludge because if
- * we directly test nkey <
+ * we directly test nkey <
* LARGE_TITLE_CHUNK_DATA_SZ, it will
* always return true. this offends
* the compiler, so here we go. */
@@ -53,7 +53,7 @@ static inline int add_item_to_iov(conn_t *c, const item* it, bool send_cr_lf) {
if (is_item_large_chunk(it)) {
/* large chunk handling code. */
-
+
title_data_size = LARGE_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -78,7 +78,7 @@ static inline int add_item_to_iov(conn_t *c, const item* it, bool send_cr_lf) {
if ((retval = add_iov(c, ptr, to_copy, false)) != 0) {
return retval;
}
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -95,7 +95,7 @@ static inline int add_item_to_iov(conn_t *c, const item* it, bool send_cr_lf) {
}
} else {
/* small chunk handling code. */
-
+
title_data_size = SMALL_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -120,7 +120,7 @@ static inline int add_item_to_iov(conn_t *c, const item* it, bool send_cr_lf) {
if ((retval = add_iov(c, ptr, to_copy, false)) != 0) {
return retval;
}
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -158,7 +158,7 @@ static inline unsigned int item_get_max_riov(void) {
item_sz -= LARGE_TITLE_CHUNK_DATA_SZ;
return ((item_sz + LARGE_BODY_CHUNK_DATA_SZ - 1) / LARGE_BODY_CHUNK_DATA_SZ) +
- 1 /* for the header block */;
+ 1 /* for the header block */ + 1 /* for the cr-lf */;
}
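
As a worked example (figures illustrative, not the real constants): if 4000 bytes of key+data remain after the title chunk and each body chunk holds 1000 data bytes, the ceiling division gives (4000 + 999) / 1000 = 4 body-chunk riovs, plus 1 for the header block and 1 for the cr-lf, for 6 in total. Without the extra riov, the trailing "\r\n" of a maximally sized item would have nowhere to land.
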
static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect_cr_lf,
@@ -168,7 +168,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
size_t to_copy; /* bytes left in the current */
/* chunk. */
size_t title_data_size; /* this is a stupid kludge because if
- * we directly test nkey <
+ * we directly test nkey <
* LARGE_TITLE_CHUNK_DATA_SZ, it will
* always return true. this offends
* the compiler, so here we go. */
@@ -179,7 +179,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
if (is_item_large_chunk(it)) {
/* large chunk handling code. */
-
+
title_data_size = LARGE_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -204,7 +204,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
current_iov->iov_base = ptr;
current_iov->iov_len = to_copy;
current_iov ++;
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -221,7 +221,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
}
} else {
/* small chunk handling code. */
-
+
title_data_size = SMALL_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -246,7 +246,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
current_iov->iov_base = ptr;
current_iov->iov_len = to_copy;
current_iov ++;
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -268,7 +268,7 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
current_iov->iov_len = 2;
current_iov ++;
}
-
+
assert(current_iov - iov <= item_get_max_riov());
return current_iov - iov; /* number of IOVs written. */
}
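
item_setup_receive (its trailing parameters are truncated in this hunk) emits one iovec per chunk plus an optional one for the cr-lf, letting the network layer fill a chunked item in a single scatter read. A generic illustration of that pattern with readv(2); the buffers and fd are placeholders, not code from this module:

    #include <sys/uio.h>

    /* read a value that spans two non-contiguous chunks plus a cr-lf. */
    static ssize_t read_chunked_value(int fd, char* chunk0, size_t sz0,
                                      char* chunk1, size_t sz1, char crlf[2]) {
        struct iovec iov[3];

        iov[0].iov_base = chunk0;  iov[0].iov_len = sz0;
        iov[1].iov_base = chunk1;  iov[1].iov_len = sz1;
        iov[2].iov_base = crlf;    iov[2].iov_len = 2;   /* "\r\n" */

        return readv(fd, iov, 3);  /* one syscall, three destinations */
    }
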
@@ -279,7 +279,7 @@ static inline int item_strtoul(const item* it, int base) {
size_t to_copy; /* bytes left in the current */
/* chunk. */
size_t title_data_size; /* this is a stupid kludge because if
- * we directly test nkey <
+ * we directly test nkey <
* LARGE_TITLE_CHUNK_DATA_SZ, it will
* always return true. this offends
* the compiler, so here we go. */
@@ -288,7 +288,7 @@ static inline int item_strtoul(const item* it, int base) {
if (is_item_large_chunk(it)) {
/* large chunk handling code. */
-
+
title_data_size = LARGE_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -328,7 +328,7 @@ static inline int item_strtoul(const item* it, int base) {
}
}
}
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -345,7 +345,7 @@ static inline int item_strtoul(const item* it, int base) {
}
} else {
/* small chunk handling code. */
-
+
title_data_size = SMALL_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
@@ -385,7 +385,7 @@ static inline int item_strtoul(const item* it, int base) {
}
}
}
-
+
nbytes -= to_copy;
/* break if we're done. */
@@ -405,29 +405,37 @@ static inline int item_strtoul(const item* it, int base) {
return value;
}
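
Because the numeric value can straddle chunk boundaries, item_strtoul cannot hand the bytes to strtoul directly; it walks the chunks and folds digits into a running value. The core accumulation step in isolation (a sketch assuming base 10 and pre-validated digits; the real function also rejects non-digit bytes and handles both chunk sizes):

    /* fold one buffer's worth of ASCII decimal digits into value. */
    static int accumulate_digits(int value, int base, const char* p, size_t n) {
        size_t i;

        for (i = 0; i < n; i ++) {
            value = (value * base) + (p[i] - '0');
        }
        return value;
    }
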
-static inline void item_memcpy_to(item* it, const void* src, size_t nbytes) {
+
+static inline void item_memcpy_to(item* it, size_t offset, const void* src, size_t nbytes,
+ bool beyond_item_boundary) {
chunk_t* next;
char* ptr;
- size_t to_copy; /* bytes left in the current */
- /* chunk. */
+ size_t to_scan; /* bytes left in current chunk. */
+ size_t start_offset, end_offset; /* offsets (from the start of the value
+ * segment) spanned by the current
+ * chunk's data area. */
+ size_t left; /* bytes left in the item */
size_t title_data_size; /* this is a stupid kludge because if
- * we directly test nkey <
+ * we directly test nkey <
* LARGE_TITLE_CHUNK_DATA_SZ, it will
* always return true. this offends
* the compiler, so here we go. */
- assert(it->empty_header.nbytes >= nbytes);
+ assert(it->empty_header.nbytes >= offset + nbytes || beyond_item_boundary);
+ left = it->empty_header.nbytes;
if (is_item_large_chunk(it)) {
/* large chunk handling code. */
-
+
title_data_size = LARGE_TITLE_CHUNK_DATA_SZ;
/* is there any data in the title block? */
if (it->empty_header.nkey < title_data_size) {
/* some data in the title block. */
next = get_chunk_address(it->empty_header.next_chunk);
ptr = &it->large_title.data[it->empty_header.nkey];
- to_copy = __fss_MIN(nbytes, LARGE_TITLE_CHUNK_DATA_SZ - (it->empty_header.nkey));
+ start_offset = 0;
+ end_offset = __fss_MIN(left, LARGE_TITLE_CHUNK_DATA_SZ - (it->empty_header.nkey)) - 1;
+ to_scan = end_offset - start_offset + 1;
} else {
/* no data in the title block, that means the key is exactly the
* same size as LARGE_TITLE_CHUNK_DATA_SZ.
@@ -435,40 +443,60 @@ static inline void item_memcpy_to(item* it, const void* src, size_t nbytes) {
next = get_chunk_address(it->empty_header.next_chunk);
assert( (LARGE_CHUNK_INITIALIZED | LARGE_CHUNK_USED) == next->lc.flags );
ptr = next->lc.lc_body.data;
- to_copy = __fss_MIN(nbytes, LARGE_TITLE_CHUNK_DATA_SZ);
+ start_offset = 0;
+ end_offset = __fss_MIN(left, LARGE_BODY_CHUNK_DATA_SZ) - 1;
+ to_scan = end_offset - start_offset + 1;
/* move on to the next one. */
next = get_chunk_address(next->lc.lc_body.next_chunk);
}
- while (nbytes > 0) {
- memcpy(ptr, src, to_copy);
-
- src += to_copy;
- nbytes -= to_copy;
+ /* advance over the chunks, doing the appropriate memcpys along the way. */
+ do {
+ /* does the copy region [offset, offset + nbytes - 1] overlap this
+ * chunk's window [start_offset, end_offset]? this also covers the
+ * common case where the copy region sits entirely inside one chunk. */
+ if (start_offset <= (offset + nbytes - 1) &&
+ end_offset >= offset) {
+ /* we have some memcpying to do. */