Permalink
Browse files

connection buffer sharing

Summary: - create a pool of connection buffers.  these are giant buffers that are just mmapped.  any pages we never touch are never faulted in.
         - track the maximum usage of a buffer.  the client code must report how much of the buffer it used, otherwise the module assumes the entire buffer was touched.
         - if a buffer goes beyond a certain limit, we throw it away back to the OS.  if a global limit is hit, we also start reclaiming free buffers.
         - when getting a buffer, the module will always return the buffer that was used the most.  this allows us to minimize the amount of memory touched.

Reviewed By: ps

Test Plan: - ran the stress test against memcached with alloc_conn_buffer randomly returning NULL.  libmcc reported a lot of errors (not surprising) but memcached did not crash.
           - ran with freelist_check on, which ensures that the connection buffer free list is sane.
           - ran with conn_buffer_corruption_detection, which takes every buffer returned from memcached, and marks it unreadable/unwritable.  if memcached subsequently accesses this memory, it will segfault.  this ran fine until the OS refused to give us the same page back, but it was at least a few minutes.
           - the test/conn_buffer_test/* code is a stub that i never finished since the two checks embedded in the code are pretty thorough.  maybe one day. :)
           - this has run in production.

Revert: OK


git-svn-id: http://svn.facebook.com/svnroot/projects/memcached/trunk@104567 2c7ba8d8-a2f7-0310-a573-de162e16dcc7
  • Loading branch information...
1 parent 434ed39 commit 5d88e647475597b3cd6b172e5f3577c48788d3d8 ttung committed Jun 16, 2008
Showing with 1,058 additions and 168 deletions.
  1. +1 −1 Makefile.am
  2. +31 −11 binary_sm.c
  3. +22 −4 configure.ac
  4. +561 −0 conn_buffer.c
  5. +115 −0 conn_buffer.h
  6. +27 −9 flat_storage.h
  7. +29 −21 flat_storage_support.h
  8. +10 −0 generic.h
  9. +186 −105 memcached.c
  10. +10 −2 memcached.h
  11. +1 −0 memory_pool_classes.h
  12. +1 −0 slabs_items.c
  13. +28 −12 slabs_items_support.h
  14. +36 −3 thread.c
View
2 Makefile.am
@@ -4,7 +4,7 @@ memcached_SOURCES = memcached.c slabs.c slabs.h \
slabs_items.c slabs_items.h assoc.c assoc.h memcached.h \
thread.c stats.c stats.h binary_sm.c binary_sm.h binary_protocol.h generic.h \
items.h flat_storage.c flat_storage.h flat_storage_support.h \
- sigseg.c sigseg.h \
+ sigseg.c sigseg.h conn_buffer.c conn_buffer.h \
memory_pool.h memory_pool_classes.h
memcached_debug_SOURCES = $(memcached_SOURCES)
memcached_CFLAGS = -Wall
View
42 binary_sm.c
@@ -31,6 +31,7 @@
#include <sys/uio.h>
#include "binary_protocol.h"
+#include "conn_buffer.h"
#include "items.h"
#include "memcached.h"
#include "stats.h"
@@ -123,9 +124,6 @@ void process_binary_protocol(conn* c) {
return;
}
- /* tcp connections mandate at least one riov to receive the key and strings. */
- assert(c->riov_size >= 1);
-
dispatch_conn_new(sfd, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, false, c->binary, &addr, addrlen);
return;
@@ -406,6 +404,20 @@ static inline bp_handler_res_t handle_header_size_known(conn* c)
}
}
+ assert(c->riov == NULL);
+ assert(c->riov_size == 0);
+ c->riov = (struct iovec*) alloc_conn_buffer(0 /* no hint provided,
+ * because we don't
+ * know how much the
+ * value will
+ * require. */);
+ if (c->riov == NULL) {
+ bp_write_err_msg(c, "out of memory");
+ return retval;
+ }
+ c->riov_size = 1;
+ report_max_rusage(c->riov, sizeof(struct iovec));
+
/* set up the receive. */
c->riov[0].iov_base = c->bp_key;
c->riov[0].iov_len = c->u.empty_req.keylen;
@@ -527,7 +539,8 @@ static inline bp_handler_res_t handle_direct_receive(conn* c)
realtime(ntohl(c->u.key_value_req.exptime)),
value_len, get_request_addr(c));
- if (it == NULL) {
+ if (it == NULL ||
+ item_setup_receive(it, c) == false) {
// this is an error condition. head straight to the
// process state, which must handle this and set the
// result field to mc_res_remote_error.
@@ -536,12 +549,6 @@ static inline bp_handler_res_t handle_direct_receive(conn* c)
break;
}
c->item = it;
- c->riov_left = item_setup_receive(it, c->riov,
- false /* do NOT
- expect
- CR-LF */,
- NULL);
- c->riov_curr = 0;
c->state = conn_bp_waiting_for_value;
} else {
// head to processing.
@@ -565,6 +572,14 @@ static inline bp_handler_res_t handle_direct_receive(conn* c)
assert(0);
}
+ if (c->state == conn_bp_process) {
+ /* going into the process stage. we can release our receive IOV
+ * buffers. */
+ free_conn_buffer(c->riov, 0);
+ c->riov = NULL;
+ c->riov_size = 0;
+ }
+
return retval;
}
@@ -622,7 +637,12 @@ static inline bp_handler_res_t handle_process(conn* c)
// if we haven't set up the msghdrs structure to hold the outbound messages,
// do so now.
if (c->msgused == 0) {
- add_msghdr(c);
+ if (add_msghdr(c) != 0) {
+ /* add_msghdr failed. we probably can't reply, so just close the
+ * connection. */
+ c->state = conn_closing;
+ return retval;
+ }
}
switch (c->u.empty_req.cmd) {
View
26 configure.ac
@@ -1,5 +1,5 @@
AC_PREREQ(2.52)
-AC_INIT(memcached, 1.2.3, brad@danga.com)
+AC_INIT(memcached, 1.2.3d-cr0, brad@danga.com)
AC_CANONICAL_SYSTEM
AC_CONFIG_SRCDIR(memcached.c)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION)
@@ -100,7 +100,21 @@ AC_SEARCH_LIBS(pthread_create, pthread)
AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)])
AC_HEADER_STDBOOL
+AC_HEADER_TIME
AC_C_CONST
+AC_C_INLINE
+AC_C_VOLATILE
+
+AC_TYPE_PID_T
+AC_TYPE_SIZE_T
+AC_TYPE_UINT8_T
+AC_TYPE_UINT16_T
+AC_TYPE_UINT32_T
+AC_TYPE_UINT64_T
+AC_TYPE_SIGNAL
+AC_TYPE_SSIZE_T
+
+
AC_CHECK_HEADER(regex.h, AC_DEFINE(HAVE_REGEX_H,,[do we have regex.h?]))
AC_CHECK_HEADER(malloc.h, AC_DEFINE(HAVE_MALLOC_H,,[do we have malloc.h?]))
AC_CHECK_MEMBER([struct mallinfo.arena], [
@@ -111,6 +125,7 @@ AC_CHECK_MEMBER([struct mallinfo.arena], [
)
AC_CHECK_HEADER(execinfo.h, AC_DEFINE(HAVE_EXECINFO_H,,[do we have execinfo.h?]))
AC_CHECK_HEADER(stdarg.h, AC_DEFINE(HAVE_STDARG_H,,[do we have stdarg.h?]))
+AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h malloc.h netdb.h netinet/in.h sys/socket.h sys/time.h syslog.h])
dnl From licq: Copyright (c) 2000 Dirk Mueller
dnl Check if the type socklen_t is defined anywhere
@@ -194,7 +209,7 @@ AC_ARG_ENABLE(udp-reply-ports,
dnl Check whether the user wants the slab allocator or not
AC_ARG_ENABLE(slab_allocator,
[AS_HELP_STRING([--enable-slab-allocator],[use the slab allocator (default=yes)])],
- [if test "$enableval" = "no"; then
+ [if test "$enableval" = "no"; then
want_slab_allocator="no"
else
want_slab_allocator="yes"
@@ -204,7 +219,7 @@ AC_ARG_ENABLE(slab_allocator,
dnl Check whether the user wants the flat allocator or not
AC_ARG_ENABLE(flat_allocator,
[AS_HELP_STRING([--enable-flat-allocator],[use the flat allocator (default=no)])],
- [if test "$enableval" = "no"; then
+ [if test "$enableval" = "no"; then
want_flat_allocator="no"
else
want_flat_allocator="yes"
@@ -225,7 +240,10 @@ elif test "x$want_flat_allocator" = "xyes"; then
]AC_DEFINE([USE_FLAT_ALLOCATOR],,[Define this if you want to use the flat allocator])[
fi]
-AC_CHECK_FUNCS(mlockall)
+AC_CHECK_FUNCS([dup2 socket inet_ntoa])
+AC_CHECK_FUNCS([mlockall getpagesize munmap])
+AC_CHECK_FUNCS([memchr memmove memset strtol strtoul strerror])
+AC_CHECK_FUNCS([regcomp])
AC_CHECK_LIB(dl, dladdr)
AC_CHECK_FUNCS(dladdr)
View
561 conn_buffer.c
@@ -0,0 +1,561 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+#include <assert.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#define CONN_BUFFER_MODULE
+#include "memcached.h"
+
+#include "conn_buffer.h"
+
+// this will enable the rigorous checking of the free list following any
+// free list operation. this could be expensive and is probably generally
+// inadvisable.
+// #define FREELIST_CHECK
+
+// this will enable the rigorous detection of memory corruption bugs by
+// preventing the reuse of address space previously occupied by a connection
+// buffer. connection buffers are always destroyed upon their return to the
+// conn_buffer system.
+// #define CONN_BUFFER_CORRUPTION_DETECTION
+
+#define HEAP_ENTRY_TO_INDEX(level, nth) ((1 << (level)) + (nth) - 1)
+#define HEAP_PARENT(index) ((((index) + 1) >> 1) - 1)
+#define HEAP_LEFT_CHILD(index) ((((index) + 1) << 1) - 1 + 0)
+#define HEAP_RIGHT_CHILD(index) ((((index) + 1) << 1) - 1 + 1)
+
+#ifdef CONN_BUFFER_CORRUPTION_DETECTION
+static const bool detect_corruption = true;
+#else
+static const bool detect_corruption = false;
+#endif /* #ifdef CONN_BUFFER_CORRUPTION_DETECTION */
+
+
/* Exhaustively validate the free-buffer list, which is kept as an array-form
 * max-heap ordered by max_rusage.  For every occupied slot we check the
 * buffer signature, the in_freelist/used flags, and the heap ordering against
 * both children; we also check that occupied slots form a contiguous prefix
 * of the array and that num_free_buffers and total_rsize_in_freelist agree
 * with what is actually present.  Compiles to a no-op returning 0 unless
 * FREELIST_CHECK is defined. */
STATIC int cb_freelist_check(void) {
#if defined(FREELIST_CHECK)
    size_t i, found_entries, rsize_total;
    bool end_found = false;

    /* num free buffers agrees with reality? */
    for (i = 0, found_entries = 0, rsize_total = 0;
         i < cbs.free_buffer_list_capacity;
         i ++) {
        size_t left_child, right_child;

        if (cbs.free_buffers[i] == NULL) {
            end_found = true;
            continue;
        }

        /* once an empty slot is seen, no later slot may be occupied. */
        assert(end_found == false);
        assert(cbs.free_buffers[i]->signature == CONN_BUFFER_SIGNATURE);
        assert(cbs.free_buffers[i]->in_freelist == true);
        assert(cbs.free_buffers[i]->used == false);
        found_entries ++;

        rsize_total += cbs.free_buffers[i]->max_rusage;

        left_child = HEAP_LEFT_CHILD(i);
        right_child = HEAP_RIGHT_CHILD(i);

        if (left_child < cbs.num_free_buffers) {
            /* valid left child */
            assert(cbs.free_buffers[left_child] != NULL);
            assert(cbs.free_buffers[left_child]->signature == CONN_BUFFER_SIGNATURE);
            assert(cbs.free_buffers[left_child]->in_freelist == true);
            assert(cbs.free_buffers[left_child]->used == false);

            /* max-heap property: parent's rusage >= child's rusage. */
            assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[left_child]->max_rusage);
        }

        if (right_child < cbs.num_free_buffers) {
            /* valid right child */
            assert(cbs.free_buffers[right_child] != NULL);
            assert(cbs.free_buffers[right_child]->signature == CONN_BUFFER_SIGNATURE);
            assert(cbs.free_buffers[right_child]->in_freelist == true);
            assert(cbs.free_buffers[right_child]->used == false);

            assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[right_child]->max_rusage);
        }
    }

    assert(found_entries == cbs.num_free_buffers);
    assert(rsize_total == cbs.total_rsize_in_freelist);
#endif /* #if defined(FREELIST_CHECK) */

    return 0;
}
+
+
/* Round bytes up to the next multiple of the OS page size.  The mask trick
 * relies on cbs.settings.page_size being a power of two, which
 * conn_buffer_init always_assert's. */
static size_t round_up_to_page(size_t bytes) {
    if ((bytes % cbs.settings.page_size) != 0) {
        bytes = ((bytes + cbs.settings.page_size - 1) & ~ (cbs.settings.page_size - 1));
    }

    return bytes;
}
+
+
+static void add_conn_buffer_to_freelist(conn_buffer_t* buffer) {
+ size_t index;
+
+ assert(cb_freelist_check() == 0);
+ (void) cb_freelist_check; /* dummy rvalue to avoid compiler warning. */
+
+ assert(buffer->signature == CONN_BUFFER_SIGNATURE);
+ assert(buffer->in_freelist == false);
+ assert(buffer->used == false);
+
+ if (cbs.num_free_buffers >= cbs.free_buffer_list_capacity) {
+ if (cbs.free_buffers == NULL) {
+ cbs.free_buffer_list_capacity = cbs.settings.initial_buffer_count;
+ cbs.free_buffers = (conn_buffer_t**) pool_malloc(sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
+ CONN_BUFFER_POOL);
+ } else {
+ cbs.free_buffers = pool_realloc(cbs.free_buffers,
+ sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity * 2,
+ sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
+ CONN_BUFFER_POOL);
+ cbs.free_buffer_list_capacity *= 2;
+ }
+
+ memset(&cbs.free_buffers[cbs.num_free_buffers], 0,
+ sizeof(conn_buffer_t*) * (cbs.free_buffer_list_capacity - cbs.num_free_buffers));
+ }
+
+ buffer->in_freelist = true;
+
+ assert(cbs.free_buffers[cbs.num_free_buffers] == NULL);
+ cbs.free_buffers[cbs.num_free_buffers] = buffer;
+ index = cbs.num_free_buffers;
+ cbs.num_free_buffers ++;
+ cbs.total_rsize_in_freelist += buffer->max_rusage;
+
+ while (index != 0) {
+ size_t parent_index = HEAP_PARENT(index);
+
+ if (cbs.free_buffers[index]->max_rusage >
+ cbs.free_buffers[parent_index]->max_rusage) {
+ conn_buffer_t* temp;
+
+ /* swap */
+ temp = cbs.free_buffers[index];
+ cbs.free_buffers[index] = cbs.free_buffers[parent_index];
+ cbs.free_buffers[parent_index] = temp;
+ } else {
+ /* no swap occured, so we can stop the reheaping operation */
+ break;
+ }
+ }
+ assert(cb_freelist_check() == 0);
+}
+
+
/* Pop and return the most-used buffer (the heap root) from the free list, or
 * NULL if the list is empty.  max_rusage_hint is currently ignored: the
 * largest buffer is always handed out so that as few fresh pages as possible
 * get touched.  After removing the root, the tail element is re-inserted by
 * sifting the resulting hole down the heap. */
static conn_buffer_t* remove_conn_buffer_from_freelist(size_t max_rusage_hint) {
    conn_buffer_t* ret;
    conn_buffer_t* compare;
    size_t index;

    assert(cb_freelist_check() == 0);

    if (cbs.num_free_buffers == 0) {
        assert(cbs.free_buffers[0] == NULL);
        return NULL;
    }

    ret = cbs.free_buffers[0];
    cbs.free_buffers[0] = NULL;
    assert(ret->signature == CONN_BUFFER_SIGNATURE);
    assert(ret->in_freelist == true);
    assert(ret->used == false);
    ret->in_freelist = false;

    cbs.num_free_buffers --;
    cbs.total_rsize_in_freelist -= ret->max_rusage;

    if (cbs.num_free_buffers == 0) {
        /* heap is now empty; nothing to re-order. */
        assert(cb_freelist_check() == 0);
        return ret;
    }

    /* detach the tail element ("compare") and walk the hole at the root down
     * the heap until compare can be dropped in without violating the
     * ordering. */
    index = 0;
    compare = cbs.free_buffers[cbs.num_free_buffers];
    cbs.free_buffers[cbs.num_free_buffers] = NULL;

    while (true) {
        size_t left_child_index = HEAP_LEFT_CHILD(index);
        size_t right_child_index = HEAP_RIGHT_CHILD(index);
        bool valid_left, valid_right, swap_left, swap_right;

        valid_left = (left_child_index < cbs.num_free_buffers) ? true : false;
        valid_right = (right_child_index < cbs.num_free_buffers) ? true : false;

        swap_left = (valid_left &&
                     cbs.free_buffers[left_child_index]->max_rusage >
                     compare->max_rusage) ? true : false;
        swap_right = (valid_right &&
                      cbs.free_buffers[right_child_index]->max_rusage >
                      compare->max_rusage) ? true : false;

        /* it's possible that we'd want to swap with both (i.e., bigger than
         * both).  pick the larger one to swap with.
         */
        if (swap_left && swap_right) {
            if (cbs.free_buffers[left_child_index]->max_rusage >
                cbs.free_buffers[right_child_index]->max_rusage) {
                /* left is greater, swap with left. */
                swap_right = false;
            } else {
                swap_left = false;
            }
        }

        if (swap_left) {
            assert(cbs.free_buffers[index] == NULL);
            cbs.free_buffers[index] = cbs.free_buffers[left_child_index];
            cbs.free_buffers[left_child_index] = NULL;
            index = left_child_index;
        } else if (swap_right) {
            assert(cbs.free_buffers[index] == NULL);
            cbs.free_buffers[index] = cbs.free_buffers[right_child_index];
            cbs.free_buffers[right_child_index] = NULL;
            index = right_child_index;
        } else {
            /* neither child outranks compare; fill the hole and stop. */
            assert(cbs.free_buffers[index] == NULL);
            cbs.free_buffers[index] = compare;
            break;
        }
    }

    assert(cb_freelist_check() == 0);
    return ret;
}
+
+
+static conn_buffer_t* make_conn_buffer(void) {
+ conn_buffer_t* buffer;
+
+ if (cbs.total_rsize + cbs.settings.page_size >= cbs.settings.total_rsize_range_top) {
+ /* we don't start the reclamation here because we didn't actually exceed
+ * the top range.
+ */
+ return NULL;
+ }
+
+ buffer = mmap(NULL,
+ CONN_BUFFER_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_PRIVATE | MAP_ANON,
+ -1, 0);
+
+ if (buffer == NULL) {
+ return NULL;
+ }
+
+ buffer->signature = CONN_BUFFER_SIGNATURE;
+ buffer->max_rusage = round_up_to_page(CONN_BUFFER_HEADER_SZ);
+ buffer->in_freelist = false;
+ buffer->used = false;
+
+ cbs.total_rsize += buffer->max_rusage;
+
+ return buffer;
+}
+
+
+static bool try_remap(void* ptr, const size_t range, unsigned remap_attempts) {
+ void** remaps = malloc(sizeof(void*) * remap_attempts);
+ unsigned c;
+ bool success = false;
+
+ for (c = 0; c < remap_attempts; c ++) {
+ remaps[c] = mmap(ptr, range, PROT_NONE, MAP_PRIVATE | MAP_ANON, -1, 0);
+
+ if (remaps[c] == ptr) {
+ success = true;
+ break;
+ }
+
+ /* do the address ranges overlap? */
+ if (remaps[c] >= ptr + range ||
+ ptr >= remaps[c] + range) {
+ /* no overlap, we're good to continue */
+ continue;
+ }
+
+ /* overlap, so we can't continue. */
+ break;
+ }
+
+ if (success == true) {
+ /* unmap all the other mmap attempts. */
+ unsigned j;
+
+ for (j = 0; j < c; j ++) {
+ munmap(remaps[j], range);
+ }
+
+ assert(remaps[j] == ptr);
+ }
+
+ free(remaps);
+ return success;
+}
+
+
/* Return a buffer's memory to the OS.  ptr and range are captured before the
 * munmap so the corruption-detection path can reason about the old address
 * range after the buffer object itself is gone. */
static void destroy_conn_buffer(conn_buffer_t* buffer) {
    void* ptr = buffer;
    size_t range = buffer->max_rusage;

    assert(buffer->in_freelist == false);
    assert(buffer->used == false);
    assert(cbs.total_rsize > 0);

    cbs.stats.destroys ++;
    cbs.total_rsize -= buffer->max_rusage;
    munmap(buffer, CONN_BUFFER_SIZE);

    /* if we're trying to detect corruption, we need to freeze out the address
     * space used by the connection buffer that we're destroying. */
    if (detect_corruption) {
        /* NOTE(review): the address range could be grabbed by someone else
         * between the munmap above and this remap, in which case we abort --
         * presumably acceptable for a debugging-only mode; confirm. */
        void* remap = mmap(buffer, range, PROT_NONE, MAP_PRIVATE | MAP_ANON, -1, 0);
        if (remap != ptr) {
            if (! (remap >= ptr + range ||
                   ptr >= remap + range)) {
                /* overlap... can't continue */
                abort();
            }

            /* the remap landed elsewhere; retry with multiple speculative
             * mappings, then drop the stray one. */
            if (try_remap(ptr, range,
                          50 /* how many mmaps will we try to accomplish
                              * our memory corruption detection */) == false) {
                abort();
            }

            munmap(remap, range);
        }
    }
}
+
+
/* Map a pointer into a buffer's data area back to the conn_buffer_t header
 * that immediately precedes it in the same mapping; the signature check
 * guards against being handed a pointer that did not come from
 * do_alloc_conn_buffer. */
static conn_buffer_t* get_buffer_from_data_ptr(void* _ptr) {
    intptr_t ptr = (intptr_t) _ptr;
    conn_buffer_t* buffer;

    ptr -= CONN_BUFFER_HEADER_SZ;
    buffer = (conn_buffer_t*) ptr;

    assert(buffer->signature == CONN_BUFFER_SIGNATURE);

    return buffer;
}
+
+
/* One-time initialization of the connection buffer pool.  Any argument may
 * be passed as 0 to request its compiled-in default.  The settings are
 * validated with always_assert (active even under NDEBUG), and the free list
 * is pre-populated with initial_buffer_count buffers. */
void conn_buffer_init(size_t initial_buffer_count,
                      size_t buffer_rsize_limit,
                      size_t total_rsize_range_bottom,
                      size_t total_rsize_range_top) {
    size_t i;

    always_assert( cbs.initialized == false );
    always_assert( (CONN_BUFFER_HEADER_SZ % sizeof(void*)) == 0 );

    memset(&cbs, 0, sizeof(conn_buffer_status_t));

    cbs.settings.page_size = getpagesize();

    /* round_up_to_page requires the page size to be a power of two. */
    always_assert( (cbs.settings.page_size & (cbs.settings.page_size - 1)) == 0);

    /* write in some defaults */
    if (initial_buffer_count == 0) {
        initial_buffer_count = CONN_BUFFER_INITIAL_BUFFER_COUNT_DEFAULT;
    }
    if (buffer_rsize_limit == 0) {
        buffer_rsize_limit = CONN_BUFFER_RSIZE_LIMIT_DEFAULT;
    }
    if (total_rsize_range_bottom == 0) {
        total_rsize_range_bottom = CONN_BUFFER_TOTAL_RSIZE_RANGE_BOTTOM_DEFAULT;
    }
    if (total_rsize_range_top == 0) {
        total_rsize_range_top = CONN_BUFFER_TOTAL_RSIZE_RANGE_TOP_DEFAULT;
    }

    /* the initial buffers (one accounted page each) must fit inside the
     * configured resident-size range. */
    always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_bottom);
    always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_top);
    // always_assert(buffer_rsize_limit < total_rsize_range_bottom);
    always_assert(total_rsize_range_bottom < total_rsize_range_top);
    always_assert(buffer_rsize_limit >= cbs.settings.page_size);

    cbs.settings.initial_buffer_count = initial_buffer_count;
    cbs.settings.buffer_rsize_limit = buffer_rsize_limit;
    cbs.settings.total_rsize_range_bottom = total_rsize_range_bottom;
    cbs.settings.total_rsize_range_top = total_rsize_range_top;

    for (i = 0; i < initial_buffer_count; i ++) {
        conn_buffer_t* buffer;

        buffer = make_conn_buffer();
        always_assert(buffer != NULL);
        add_conn_buffer_to_freelist(buffer);
    }

    cbs.initialized = true;
}
+
+
/* If a reclamation is in progress, destroy the most space-consuming free
 * buffer (one buffer per call), and end the reclamation once the free list
 * is empty or total_rsize has dropped back to the bottom of the configured
 * range. */
void do_conn_buffer_reclamation(void) {
    if (cbs.reclamation_in_progress) {
        if (cbs.num_free_buffers != 0) {
            /* grab the most space-consuming buffer and reclaim it. */
            conn_buffer_t* tofree = remove_conn_buffer_from_freelist(CONN_BUFFER_SIZE);

            destroy_conn_buffer(tofree);
        }

        if (cbs.num_free_buffers == 0 ||
            cbs.total_rsize <= cbs.settings.total_rsize_range_bottom) {
            cbs.reclamation_in_progress = false;
        }
    }
}
+
+
/**
 * allocate a connection buffer.  max_rusage_hint is a hint for how much
 * of the buffer will be used in the worst case.  if it is 0, the hint is
 * discarded.  currently, the hint is ignored.
 *
 * returns a pointer to the buffer's data area (not the header), or NULL if
 * the free list is empty and a new buffer cannot be mapped without exceeding
 * the configured total_rsize ceiling. */
void* do_alloc_conn_buffer(size_t max_rusage_hint) {
    conn_buffer_t* buffer;

    /* prefer recycling the most-used free buffer; only map a new one when
     * the free list is empty. */
    if ( (buffer = remove_conn_buffer_from_freelist(max_rusage_hint)) == NULL &&
         (buffer = make_conn_buffer()) == NULL ) {
        cbs.stats.allocs_failed ++;
        return NULL;
    }

    cbs.stats.allocs ++;

    assert(buffer->signature == CONN_BUFFER_SIGNATURE);
    assert(buffer->in_freelist == false);
    assert(buffer->used == false);
    buffer->used = true;
    buffer->rusage_updated = false;
    /* remember the rusage already accounted for in cbs.total_rsize so that
     * only the delta is applied when the buffer is freed. */
    buffer->prev_rusage = buffer->max_rusage;

    do_conn_buffer_reclamation();

    return buffer->data;
}
+
+
/* Release a buffer previously handed out by do_alloc_conn_buffer.  ptr is
 * the data-area pointer that was returned to the caller.  max_rusage is the
 * number of data bytes the caller touched, or -1 if unknown; when it is -1
 * and no usage was ever reported via report_max_rusage, the entire buffer is
 * assumed to have been touched. */
void do_free_conn_buffer(void* ptr, ssize_t max_rusage) {
    conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);

    assert(buffer->signature == CONN_BUFFER_SIGNATURE);
    assert(buffer->in_freelist == false);
    assert(buffer->used == true);

    buffer->used = false;

    if (max_rusage == -1) {
        if (buffer->rusage_updated == false) {
            /* no one has reported any usage info on this block.  assume the
             * worst. */
            max_rusage = CONN_BUFFER_SIZE;
        } else {
            max_rusage = buffer->max_rusage;
        }
    } else {
        /* caller reported data bytes; account for the header as well. */
        max_rusage = max_rusage + CONN_BUFFER_HEADER_SZ;
    }
    max_rusage = round_up_to_page(max_rusage);

    /* the observed usage can never shrink below the recorded high-water
     * mark. */
    if (buffer->max_rusage > max_rusage) {
        max_rusage = buffer->max_rusage;
    }

    // bump counter
    cbs.stats.frees ++;

    /* do we reclaim this buffer? */
    if (max_rusage >= cbs.settings.buffer_rsize_limit ||
        detect_corruption) {
        /* yes, reclaim now... we must set the max_rusage to the previously
         * known rusage because that delta was never accounted for. */
        buffer->max_rusage = buffer->prev_rusage;
        destroy_conn_buffer(buffer);
    } else {
        /* adjust stats */
        /* NOTE(review): buffer->max_rusage is not raised to the local
         * max_rusage here, so a caller-supplied value larger than what
         * report_max_rusage recorded makes total_rsize and the buffer's own
         * max_rusage drift apart -- confirm this is intended. */
        cbs.total_rsize += (max_rusage - buffer->prev_rusage);

        /* return to the free list */
        add_conn_buffer_to_freelist(buffer);
    }

    /* should we start a reclamation? */
    if (cbs.reclamation_in_progress == false &&
        cbs.total_rsize >= cbs.settings.total_rsize_range_top) {
        cbs.stats.reclamations_started ++;
        cbs.reclamation_in_progress = true;
    }

    do_conn_buffer_reclamation();
}
+
+
/* Record that the caller has touched max_rusage bytes of the buffer's data
 * area.  The per-buffer high-water mark (header included, page-rounded) only
 * ever grows.  Also kicks a pending reclamation forward, if one is in
 * progress. */
void report_max_rusage(void* ptr, size_t max_rusage) {
    conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);

    assert(buffer->signature == CONN_BUFFER_SIGNATURE);
    assert(buffer->in_freelist == false);
    assert(buffer->used == true);

    buffer->rusage_updated = true;

    max_rusage = round_up_to_page(max_rusage + CONN_BUFFER_HEADER_SZ);
    if (max_rusage > buffer->max_rusage) {
        buffer->max_rusage = max_rusage;
    }

    /* yeah, we're reading a variable in a thread-unsafe way, but we'll do a
     * second check once we grab the lock. */
    if (cbs.reclamation_in_progress) {
        conn_buffer_reclamation();
    }
}
+
+
/* Render the module's counters in "STAT name value" form.  Returns a
 * malloc'd buffer which the caller must free(); *result_size receives the
 * number of bytes written (0 with a NULL return on allocation failure).
 * NOTE(review): the stat lines end in bare \n while the terminator is
 * "END\r\n" -- confirm the stats protocol expects this mix. */
char* do_conn_buffer_stats(size_t* result_size) {
    size_t bufsize = 2048, offset = 0;
    char* buffer = malloc(bufsize);
    char terminator[] = "END\r\n";

    if (buffer == NULL) {
        *result_size = 0;
        return NULL;
    }

    /* sizeof(terminator) is reserved so the final append cannot be starved
     * of space. */
    offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
                              "STAT num_free_buffers %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT total_rsize %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT total_rsize_in_freelist %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT allocates %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT frees %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT failed_allocates %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT destroys %" PRINTF_INT64_MODIFIER "u\n"
                              "STAT reclamations_started %" PRINTF_INT64_MODIFIER "u\n",
                              cbs.num_free_buffers,
                              cbs.total_rsize,
                              cbs.total_rsize_in_freelist,
                              cbs.stats.allocs,
                              cbs.stats.frees,
                              cbs.stats.allocs_failed,
                              cbs.stats.destroys,
                              cbs.stats.reclamations_started);

    offset = append_to_buffer(buffer, bufsize, offset, 0, terminator);

    *result_size = offset;

    return buffer;
}
View
115 conn_buffer.h
@@ -0,0 +1,115 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "generic.h"

#if !defined(_conn_buffer_h_)
#define _conn_buffer_h_

/* STATIC/STATIC_DECL machinery: when CONN_BUFFER_TESTS is defined, the
 * module's internals get external linkage so the test code can reach them;
 * otherwise they are private to conn_buffer.c (CONN_BUFFER_MODULE is defined
 * only by that translation unit). */
#if defined(CONN_BUFFER_TESTS)
#define STATIC
#if defined(CONN_BUFFER_MODULE)
#define STATIC_DECL(decl) decl
#else
#define STATIC_DECL(decl) extern decl
#endif /* #if defined(CONN_BUFFER_MODULE) */

#else
#define STATIC static
#if defined(CONN_BUFFER_MODULE)
#define STATIC_DECL(decl) static decl
#else
#define STATIC_DECL(decl)
#endif /* #if defined(CONN_BUFFER_MODULE) */
#endif /* #if defined(CONN_BUFFER_TESTS) */

#define CONN_BUFFER_INITIAL_BUFFER_COUNT_DEFAULT (8)
#define CONN_BUFFER_RSIZE_LIMIT_DEFAULT (128 * 1024)
#define CONN_BUFFER_TOTAL_RSIZE_RANGE_BOTTOM_DEFAULT (8 * 1024 * 1024)
#define CONN_BUFFER_TOTAL_RSIZE_RANGE_TOP_DEFAULT (16 * 1024 * 1024)

/* virtual size of each buffer's mapping; untouched pages are never faulted
 * in, so resident usage is tracked separately via max_rusage. */
#define CONN_BUFFER_SIZE (16 * 1024 * 1024)
#define CONN_BUFFER_SIGNATURE (0xbeadbeef)

/* header fields shared by the struct definition and the anonymous struct
 * used to compute CONN_BUFFER_HEADER_SZ.  the :5 bitfield is padding that
 * keeps the three flag bits packed into a single byte. */
#define CONN_BUFFER_HEADER_CONTENTS \
    uint32_t signature; \
    uint32_t prev_rusage; \
    uint32_t max_rusage; \
    uint8_t unused[3]; \
    unsigned char :5; \
    unsigned char rusage_updated:1; \
    unsigned char in_freelist:1; \
    unsigned char used:1;

#define CONN_BUFFER_HEADER_SZ sizeof(struct { CONN_BUFFER_HEADER_CONTENTS })

#define CONN_BUFFER_DATA_SZ (CONN_BUFFER_SIZE - CONN_BUFFER_HEADER_SZ)
typedef struct conn_buffer_s conn_buffer_t;
struct conn_buffer_s {
    CONN_BUFFER_HEADER_CONTENTS;
    unsigned char data[CONN_BUFFER_DATA_SZ];
};


/* module-wide state: the free list (a max-heap ordered by max_rusage),
 * resident-size accounting, tunables, and counters. */
typedef struct conn_buffer_status_s conn_buffer_status_t;
struct conn_buffer_status_s {
    conn_buffer_t** free_buffers;
    size_t num_free_buffers;
    size_t free_buffer_list_capacity;

    size_t total_rsize;
    size_t total_rsize_in_freelist;

    bool reclamation_in_progress;
    bool initialized;

    struct {
        size_t initial_buffer_count; /* initial buffers set up */
        size_t buffer_rsize_limit;   /* if the reported usage of a block is
                                      * greater or equal to this limit, the
                                      * block is immediately returned to the
                                      * OS.  otherwise, the block is
                                      * recycled. */
        size_t total_rsize_range_top; /* the total reported usage of the
                                       * conn_buffer system is not to exceed
                                       * this limit. */
        size_t total_rsize_range_bottom;/* when the total_rsize has hit
                                         * total_rsize_range_top, buffers are
                                         * returned to the OS until we reach
                                         * this value *or* there's nothing else
                                         * on the freelist to return. */
        size_t page_size;            /* page size on the OS. */
    } settings;

    struct {
        uint64_t allocs;
        uint64_t frees;
        uint64_t destroys;
        uint64_t reclamations_started;
        uint64_t allocs_failed;
    } stats;
};


DECL_MT_FUNC(void*, alloc_conn_buffer, (size_t max_rusage_hint));
DECL_MT_FUNC(void, free_conn_buffer, (void* ptr, ssize_t max_rusage));
DECL_MT_FUNC(void, conn_buffer_reclamation, (void));
DECL_MT_FUNC(char*, conn_buffer_stats, (size_t* result_size));

extern void report_max_rusage(void* ptr, size_t max_rusage);

extern void conn_buffer_init(size_t initial_buffer_count,
                             size_t buffer_rsize_limit,
                             size_t total_rsize_range_bottom,
                             size_t total_rsize_range_top);

STATIC_DECL(int cb_freelist_check(void));
STATIC_DECL(conn_buffer_status_t cbs);

#if !defined(CONN_BUFFER_MODULE)
#undef STATIC
#undef STATIC_DECL
#else
#undef CONN_BUFFER_MODULE
#endif /* #if !defined(CONN_BUFFER_MODULE) */

#endif /* #if !defined(_conn_buffer_h_) */
View
36 flat_storage.h
@@ -110,14 +110,6 @@
#endif /* #if defined(FLAT_STORAGE_MODULE) */
#endif /* #if defined(FLAT_STORAGE_TESTS) */
-/* create an always_assert macro for tests that are so cheap that they should
- * always be performed, regardless of NDEBUG */
-#if defined(NDEBUG)
-#define always_assert(condition) if (! (condition)) { fprintf(stderr, "%s\n", #condition); abort(); }
-#else
-#define always_assert assert
-#endif /* #if defined(NDEBUG) */
-
/**
* constants
*/
@@ -458,6 +450,32 @@ static inline size_t chunks_in_item(const item* it) {
}
/* returns the number of chunks in the item that hold value data: the total
 * chunk count, minus the title chunk when the key occupies the entirety of
 * the title chunk's data area, and 0 for items carrying no data at all. */
static inline size_t data_chunks_in_item(const item* it) {
    size_t count = chunks_in_item(it);
    size_t title_data_size;

    /* if we have no data, return 0. */
    if (it->empty_header.nbytes == 0) {
        return 0;
    }

    if (is_item_large_chunk(it)) {
        title_data_size = LARGE_TITLE_CHUNK_DATA_SZ;
    } else {
        title_data_size = SMALL_TITLE_CHUNK_DATA_SZ;
    }

    /* if the key takes the entirety of the title block, then we don't count
     * that one. */
    if (title_data_size == it->empty_header.nkey) {
        count --;
    }

    return count;
}
+
+
static inline size_t slackspace(const size_t nkey, const size_t nbytes) {
size_t item_sz = nkey + nbytes;
@@ -665,7 +683,7 @@ static inline void ITEM_clear_has_ip_address(item* it) { it->empty_header.it_f
extern void flat_storage_init(size_t maxbytes);
extern char* do_item_cachedump(const chunk_type_t type, const unsigned int limit, unsigned int *bytes);
-extern char* do_flat_allocator_stats(size_t* bytes);
+DECL_MT_FUNC(char*, flat_allocator_stats, (size_t* bytes));
STATIC_DECL(bool flat_storage_alloc(void));
STATIC_DECL(item* get_lru_item(chunk_type_t chunk_type, small_title_chunk_t* start));
View
50 flat_storage_support.h
@@ -19,6 +19,7 @@
#include "flat_storage.h"
#include "memcached.h"
+#include "conn_buffer.h"
static inline size_t __fss_MIN(size_t a, size_t b) {
if (a < b) {
@@ -55,26 +56,32 @@ static inline int add_item_to_iov(conn *c, const item* it, bool send_cr_lf) {
}
}
-/** the flat storage driver treats MAX_ITEM_SIZE the largest value we can
- * accomodate. */
-static inline unsigned int item_get_max_riov(void) {
- size_t item_sz = MAX_ITEM_SIZE + KEY_MAX_LENGTH;
- if (item_sz < LARGE_TITLE_CHUNK_DATA_SZ) {
- /* can we fit in the title chunk? really unlikely, but since these are
- * compile-time constants, testing is essentially free. */
- return 1;
- }
- /* okay, how many body chunks do we need to store the entire thing? */
- item_sz -= LARGE_TITLE_CHUNK_DATA_SZ;
+static inline size_t item_setup_receive(item* it, conn* c) {
+ struct iovec* current_iov;
+ size_t iov_len_required = data_chunks_in_item(it);
- return ((item_sz + LARGE_BODY_CHUNK_DATA_SZ - 1) / LARGE_BODY_CHUNK_DATA_SZ) +
- 1 /* for the header block */ + 1 /* for the cr-lf */;
-}
+ assert(sizeof(struct iovec) * iov_len_required <= CONN_BUFFER_DATA_SZ);
-static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect_cr_lf,
- char* crlf) {
- struct iovec* current_iov = iov;
+ if (c->binary == false) {
+ iov_len_required ++; /* to accomodate the cr-lf */
+
+ assert(c->riov == NULL);
+ assert(c->riov_size == 0);
+ c->riov = (struct iovec*) alloc_conn_buffer(sizeof(struct iovec) * iov_len_required);
+ if (c->riov == NULL) {
+ return false;
+ }
+ }
+ /* in binary protocol, receiving the key already requires the riov to be set
+ * up. */
+
+ report_max_rusage(c->riov, sizeof(struct iovec) * iov_len_required);
+ c->riov_size = iov_len_required;
+ c->riov_left = iov_len_required;
+ c->riov_curr = 0;
+
+ current_iov = c->riov;
#define ITEM_SETUP_RECEIVE_APPLIER(it, ptr, bytes) \
current_iov->iov_base = ptr; \
@@ -85,14 +92,15 @@ static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect
#undef ITEM_SETUP_RECEIVE_APPLIER
- if (expect_cr_lf) {
- current_iov->iov_base = crlf;
+ if (c->binary == false) {
+ current_iov->iov_base = c->crlf;
current_iov->iov_len = 2;
current_iov ++;
}
- assert(current_iov - iov <= item_get_max_riov());
- return current_iov - iov; /* number of IOVs written. */
+ assert(current_iov - c->riov == iov_len_required);
+
+ return true;
}
static inline int item_strtoul(const item* it, int base) {
View
10 generic.h
@@ -97,6 +97,16 @@ typedef unsigned int rel_time_t;
#include <stdarg.h>
/* create an always_assert macro for tests that are so cheap that they should
 * always be performed, regardless of NDEBUG */
#if defined(NDEBUG)
/* wrapped in do { } while (0) so the expansion is a single statement: the
 * previous bare if { ... } form would swallow a following `else` when used
 * in an unbraced if/else. */
#define always_assert(condition)                                \
    do {                                                        \
        if (! (condition)) {                                    \
            fprintf(stderr, "%s\n", #condition);                \
            abort();                                            \
        }                                                       \
    } while (0)
#else
#define always_assert assert
#endif /* #if defined(NDEBUG) */

/* declare both the unlocked (do_*) and the thread-safe (mt_*) variant of a
 * function with one macro invocation. */
#define DECL_MT_FUNC(ret_type, func_name, args) extern ret_type do_ ## func_name args; extern ret_type mt_ ## func_name args;
+
// bump a counter up by one. return 0 if the counter has overflowed, nonzero otherwise.
#define BUMP(cntr) ((++(cntr)) != 0)
View
291 memcached.c
@@ -53,6 +53,7 @@
#include "memcached.h"
#include "stats.h"
#include "sigseg.h"
+#include "conn_buffer.h"
#if defined(USE_SLAB_ALLOCATOR)
#include "slabs_items_support.h"
@@ -235,6 +236,10 @@ int add_msghdr(conn* c)
msg_flags, the last 3 of which aren't defined on solaris: */
memset(msg, 0, sizeof(struct msghdr));
+ if (ensure_iov_space(c) != 0) {
+ return -1;
+ }
+
msg->msg_iov = &c->iov[c->iovused];
msg->msg_name = &c->request_addr;
msg->msg_namelen = c->request_addr_size;
@@ -401,49 +406,40 @@ conn *conn_new(const int sfd, const int init_state, const int event_flags,
c->request_addr_size = addrlen;
}
- c->rbuf = c->wbuf = 0;
- c->ilist = 0;
- c->iov = 0;
- c->msglist = 0;
- c->hdrbuf = 0;
- c->bp_key = 0;
-
- c->rsize = read_buffer_size;
+ c->rsize = 0;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
- c->iovsize = IOV_LIST_INITIAL;
+ c->iovsize = 0;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
- c->riov_size = item_get_max_riov();
+ c->riov_size = 0;
- c->rbuf = (char *)pool_malloc((size_t)c->rsize, CONN_BUFFER_RBUF_POOL);
+ c->rbuf = NULL;
c->wbuf = (char *)pool_malloc((size_t)c->wsize, CONN_BUFFER_WBUF_POOL);
c->ilist = (item **)pool_malloc(sizeof(item *) * c->isize, CONN_BUFFER_ILIST_POOL);
- c->iov = (struct iovec *)pool_malloc(sizeof(struct iovec) * c->iovsize, CONN_BUFFER_IOV_POOL);
+ c->iov = NULL;
c->msglist = (struct msghdr *)pool_malloc(sizeof(struct msghdr) * c->msgsize, CONN_BUFFER_MSGLIST_POOL);
- c->riov = (struct iovec*)pool_malloc(sizeof(struct iovec) * c->riov_size, CONN_BUFFER_RIOV_POOL);
+ c->hdrbuf = NULL;
+ c->riov = NULL;
if (is_binary) {
// because existing functions expects the key to be null-terminated,
// we must do so as well.
c->bp_key = (char*)pool_malloc(sizeof(char) * KEY_MAX_LENGTH + 1, CONN_BUFFER_BP_KEY_POOL);
c->bp_hdr_pool = bp_allocate_hdr_pool(NULL);
+ } else {
+ c->bp_key = NULL;
+ c->bp_hdr_pool = NULL;
}
- if (c->rbuf == 0 ||
- c->wbuf == 0 ||
+ if (c->wbuf == 0 ||
c->ilist == 0 ||
- c->iov == 0 ||
c->msglist == 0 ||
- c->riov == NULL ||
(is_binary && c->bp_key == 0)) {
- if (c->rbuf != 0) pool_free(c->rbuf, c->rsize, CONN_BUFFER_RBUF_POOL);
if (c->wbuf != 0) pool_free(c->wbuf, c->wsize, CONN_BUFFER_WBUF_POOL);
if (c->ilist !=0) pool_free(c->ilist, sizeof(item*) * c->isize, CONN_BUFFER_ILIST_POOL);
- if (c->iov != 0) pool_free(c->iov, sizeof(struct iovec) * c->iovsize, CONN_BUFFER_IOV_POOL);
if (c->msglist != 0) pool_free(c->msglist, sizeof(struct msghdr) * c->msgsize, CONN_BUFFER_MSGLIST_POOL);
- if (c->riov != NULL) pool_free(c->riov, sizeof(struct iovec) * c->riov_size, CONN_BUFFER_RIOV_POOL);
if (c->bp_key != 0) pool_free(c->bp_key, sizeof(char) * KEY_MAX_LENGTH + 1, CONN_BUFFER_BP_KEY_POOL);
if (c->bp_hdr_pool != NULL) bp_release_hdr_pool(c);
pool_free(c, 1 * sizeof(conn), CONN_POOL);
@@ -485,8 +481,8 @@ conn *conn_new(const int sfd, const int init_state, const int event_flags,
c->binary = is_binary;
c->state = init_state;
c->rbytes = c->wbytes = 0;
- c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
+ c->wcurr = c->wbuf;
c->icurr = c->ilist;
c->ileft = 0;
c->iovused = 0;
@@ -538,6 +534,23 @@ void conn_cleanup(conn* c) {
free(c->write_and_free);
c->write_and_free = 0;
}
+
+ if (c->rbuf) {
+ free_conn_buffer(c->rbuf, 0); /* no idea how much was used... */
+ c->rbuf = NULL;
+ c->rsize = 0;
+ }
+ if (c->iov) {
+ free_conn_buffer(c->iov, 0); /* no idea how much was used... */
+ c->iov = NULL;
+ c->iovsize = 0;
+ }
+
+ if (c->riov) {
+ free_conn_buffer(c->riov, 0); /* no idea how much was used... */
+ c->riov = NULL;
+ c->riov_size = 0;
+ }
}
/*
@@ -550,15 +563,15 @@ void conn_free(conn* c) {
if (c->msglist)
pool_free(c->msglist, sizeof(struct msghdr) * c->msgsize, CONN_BUFFER_MSGLIST_POOL);
if (c->rbuf)
- pool_free(c->rbuf, c->rsize, CONN_BUFFER_RBUF_POOL);
+ free_conn_buffer(c->rbuf, 0);
if (c->wbuf)
pool_free(c->wbuf, c->wsize, CONN_BUFFER_WBUF_POOL);
if (c->ilist)
pool_free(c->ilist, sizeof(item*) * c->isize, CONN_BUFFER_ILIST_POOL);
if (c->iov)
- pool_free(c->iov, sizeof(struct iovec) * c->iovsize, CONN_BUFFER_IOV_POOL);
+ free_conn_buffer(c->iov, c->iovused * sizeof(struct iovec));
if (c->riov)
- pool_free(c->riov, sizeof(struct iovec) * c->riov_size, CONN_BUFFER_RIOV_POOL);
+ free_conn_buffer(c->riov, 0);
if (c->bp_key)
pool_free(c->bp_key, sizeof(char) * KEY_MAX_LENGTH + 1, CONN_BUFFER_BP_KEY_POOL);
if (c->bp_hdr_pool)
@@ -609,20 +622,14 @@ void conn_shrink(conn* c) {
if (c->udp)
return;
- if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
- char *newbuf;
-
- if (c->rcurr != c->rbuf)
- memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
-
- newbuf = (char *)pool_realloc((void *)c->rbuf, DATA_BUFFER_SIZE,
- c->rsize, CONN_BUFFER_RBUF_POOL);
-
- if (newbuf) {
- c->rbuf = newbuf;
- c->rsize = DATA_BUFFER_SIZE;
- }
- /* TODO check other branch... */
+ if (c->rbytes == 0 && c->rbuf != NULL) {
+ /* drop the buffer since we have no bytes to preserve. */
+ free_conn_buffer(c->rbuf, 0);
+ c->rbuf = NULL;
+ c->rcurr = NULL;
+ c->rsize = 0;
+ } else {
+ memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
c->rcurr = c->rbuf;
}
@@ -660,16 +667,16 @@ void conn_shrink(conn* c) {
/* TODO check error condition? */
}
- if (c->iovsize > IOV_LIST_HIGHWAT) {
- struct iovec *newbuf = (struct iovec *) pool_realloc((void *)c->iov,
- IOV_LIST_INITIAL * sizeof(c->iov[0]),
- c->iovsize * sizeof(c->iov[0]),
- CONN_BUFFER_IOV_POOL);
- if (newbuf) {
- c->iov = newbuf;
- c->iovsize = IOV_LIST_INITIAL;
- }
- /* TODO check return value */
+ if (c->riov) {
+ free_conn_buffer(c->riov, 0);
+ c->riov = NULL;
+ c->riov_size = 0;
+ }
+
+ if (c->iov != NULL) {
+ free_conn_buffer(c->iov, 0);
+ c->iov = NULL;
+ c->iovsize = 0;
}
if (c->binary) {
@@ -689,6 +696,10 @@ static void conn_set_state(conn* c, int state) {
if (state == conn_read) {
conn_shrink(c);
assoc_move_next_bucket();
+
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
}
c->state = state;
}
@@ -704,24 +715,19 @@ static void conn_set_state(conn* c, int state) {
static int ensure_iov_space(conn* c) {
assert(c != NULL);
- if (c->iovused >= c->iovsize) {
- int i, iovnum;
- struct iovec *new_iov = (struct iovec *) pool_realloc(c->iov,
- (c->iovsize * 2) * sizeof(struct iovec),
- c->iovsize * sizeof(struct iovec),
- CONN_BUFFER_IOV_POOL);
- if (! new_iov)
- return -1;
- c->iov = new_iov;
- c->iovsize *= 2;
-
- /* Point all the msghdr structures at the new list. */
- for (i = 0, iovnum = 0; i < c->msgused; i++) {
- c->msglist[i].msg_iov = &c->iov[iovnum];
- iovnum += c->msglist[i].msg_iovlen;
+ if (c->iovsize == 0) {
+ c->iov = (struct iovec *)alloc_conn_buffer(0);
+ if (c->iov != NULL) {
+ c->iovsize = CONN_BUFFER_DATA_SZ / sizeof(struct iovec);
}
}
+ if (c->iovused >= c->iovsize) {
+ return -1;
+ }
+
+ report_max_rusage(c->iov, (c->iovused + 1) * sizeof(struct iovec));
+
return 0;
}
@@ -742,6 +748,7 @@ int add_iov(conn* c, const void *buf, int len, bool is_start) {
bool limit_to_mtu;
assert(c != NULL);
+ assert(c->msgused > 0);
do {
m = &c->msglist[c->msgused - 1];
@@ -853,6 +860,9 @@ static void out_string(conn* c, const char *str) {
size_t len;
assert(c != NULL);
+ assert(c->msgcurr == 0);
+ c->msgused = 0;
+ c->iovused = 0;
if (settings.verbose > 1)
fprintf(stderr, ">%d %s\n", c->sfd, str);
@@ -1022,6 +1032,10 @@ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_
/* set up a connection to write a buffer then free it, used for stats */
static void write_and_free(conn* c, char *buf, int bytes) {
+ assert(c->msgcurr == 0);
+ c->msgused = 0;
+ c->iovused = 0;
+
if (buf) {
c->write_and_free = buf;
c->wcurr = buf;
@@ -1335,6 +1349,14 @@ static void process_stat(conn* c, token_t *tokens, const size_t ntokens) {
return;
}
+ if (strcmp(subcommand, "conn_buffer") == 0) {
+ size_t bytes = 0;
+ char* buf = conn_buffer_stats(&bytes);
+
+ write_and_free(c, buf, bytes);
+ return;
+ }
+
out_string(c, "ERROR");
}
@@ -1695,7 +1717,8 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
it = item_alloc(key, nkey, flags, realtime(exptime), vlen,
get_request_addr(c));
- if (it == 0) {
+ if (it == 0 ||
+ item_setup_receive(it, c) == false) {
if (! item_size_ok(nkey, flags, vlen))
out_string(c, "SERVER_ERROR object too large for cache");
else
@@ -1710,10 +1733,9 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
* we get to complete_nread and check
* for the CR-LF, we're sure that we're
* not reading stale data. */
+
c->item_comm = comm;
c->item = it;
- c->riov_left = item_setup_receive(it, c->riov, true /* expect cr-lf */, c->crlf);
- c->riov_curr = 0;
conn_set_state(c, conn_nread);
}
@@ -1970,16 +1992,17 @@ static void process_command(conn* c, char *command) {
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
- /*
- * for commands set/add/replace, we build an item and read the data
- * directly into it, then continue in nread_complete().
+ /* ensure that conn_set_state going into the conn_read state cleared the
+ * c->msg* and c->iov* counters.
*/
+ assert(c->msgcurr == 0);
+ assert(c->msgused == 0);
+ assert(c->iovused == 0);
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
if (add_msghdr(c) != 0) {
- out_string(c, "SERVER_ERROR out of memory");
+ /* if we can't allocate the msghdr, we can't really send the error
+ * message. so just close the connection. */
+ conn_set_state(c, conn_closing);
return;
}
@@ -2187,6 +2210,13 @@ static int try_read_command(conn* c) {
char *el, *cont;
assert(c != NULL);
+
+ /* we have no allocated buffers, so we definitely don't have any commands to
+ * read. */
+ if (c->rbuf == NULL) {
+ return 0;
+ }
+
assert(c->rcurr < (c->rbuf + c->rsize));
if (c->rbytes == 0)
@@ -2223,6 +2253,23 @@ int try_read_udp(conn* c) {
assert(c != NULL);
assert(c->rbytes == 0);
+ if (c->rbuf == NULL) {
+ /* no idea how big the buffer will need to be. */
+ c->rbuf = (char*) alloc_conn_buffer(0);
+
+ if (c->rbuf != NULL) {
+ c->rcurr = c->rbuf;
+ c->rsize = CONN_BUFFER_DATA_SZ;
+ } else {
+ if (c->binary) {
+ bp_write_err_msg(c, "out of memory");
+ } else {
+ out_string(c, "SERVER_ERROR out of memory");
+ }
+ return 0;
+ }
+ }
+
c->request_addr_size = sizeof(c->request_addr);
res = recvfrom(c->sfd, c->rbuf, c->rsize,
0, &c->request_addr, &c->request_addr_size);
@@ -2248,6 +2295,9 @@ int try_read_udp(conn* c) {
return 0;
}
+ /* report peak usage here */
+ report_max_rusage(c->rbuf, res);
+
#if defined(HAVE_UDP_REPLY_PORTS) && defined(USE_THREADS)
reply_ports = ntohs(*((uint16_t*)(buf + 6)));
c->xfd = c->ufd;
@@ -2267,7 +2317,14 @@ int try_read_udp(conn* c) {
c->rbytes += res;
c->rcurr = c->rbuf;
return 1;
+ } else {
+ /* return the conn buffer. */
+ free_conn_buffer(c->rbuf, 8 - 1 /* worst case for memory usage */);
+ c->rbuf = NULL;
+ c->rcurr = NULL;
+ c->rsize = 0;
}
+
return 0;
}
@@ -2285,41 +2342,41 @@ int try_read_network(conn* c) {
assert(c != NULL);
- if (c->rcurr != c->rbuf) {
- if (c->rbytes != 0) /* otherwise there's nothing to copy */
- memmove(c->rbuf, c->rcurr, c->rbytes);
- c->rcurr = c->rbuf;
- }
-
- while (1) {
- if (c->rbytes >= c->rsize) {
- char *new_rbuf = pool_realloc(c->rbuf, c->rsize * 2, c->rsize, CONN_BUFFER_RBUF_POOL);
- if (!new_rbuf) {
- if (settings.verbose > 0) {
- fprintf(stderr, "Couldn't realloc input buffer\n");
- }
-
- if (c->binary) {
- bp_write_err_msg(c, "out of memory");
- } else {
- c->rbytes = 0; /* ignore what we read */
- out_string(c, "SERVER_ERROR out of memory");
- c->write_and_go = conn_closing;
- }
- return 1;
+ if (c->rbuf != NULL) {
+ if (c->rcurr != c->rbuf) {
+ if (c->rbytes != 0) /* otherwise there's nothing to copy */
+ memmove(c->rbuf, c->rcurr, c->rbytes);
+ c->rcurr = c->rbuf;
+ }
+ } else {
+ c->rbuf = (char*) alloc_conn_buffer(0);
+ if (c->rbuf != NULL) {
+ c->rcurr = c->rbuf;
+ c->rsize = CONN_BUFFER_DATA_SZ;
+ } else {
+ if (c->binary) {
+ bp_write_err_msg(c, "out of memory");
+ } else {
+ out_string(c, "SERVER_ERROR out of memory");
}
- c->rcurr = c->rbuf = new_rbuf;
- c->rsize *= 2;
+ return 0;
}
+ }
+ while (1) {
avail = c->rsize - c->rbytes;
+
res = read(c->sfd, c->rbuf + c->rbytes, avail);
if (res > 0) {
STATS_LOCK();
stats.bytes_read += res;
STATS_UNLOCK();
gotdata = 1;
c->rbytes += res;
+
+ /* report peak usage here */
+ report_max_rusage(c->rbuf, c->rbytes);
+
if (res < avail) {
break;
}
@@ -2334,7 +2391,16 @@ int try_read_network(conn* c) {
return 1;
}
else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) break;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* if we have no data, release the connection buffer */
+ if (c->rbytes == 0) {
+ free_conn_buffer(c->rbuf, 0);
+ c->rbuf = NULL;
+ c->rcurr = NULL;
+ c->rsize = 0;
+ }
+ break;
+ }
else return 0;
}
}
@@ -2616,13 +2682,19 @@ static void drive_machine(conn* c) {
break;
}
+ assert(c->rbuf != NULL);
+
/* now try reading from the socket */
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
STATS_LOCK();
stats.bytes_read += res;
STATS_UNLOCK();
c->sbytes -= res;
+
+ /* report peak usage here */
+ report_max_rusage(c->rbuf, res);
+
break;
}
if (res == 0) { /* end of stream */
@@ -2647,12 +2719,15 @@ static void drive_machine(conn* c) {
case conn_write:
/*
- * We want to write out a simple response. If we haven't already,
- * assemble it into a msgbuf list (this will be a single-entry
- * list for TCP or a two-entry list for UDP).
+ * We want to write out a simple response. If we haven't already,
+ * assemble it into a msgbuf list (this will be a single-entry list
+ * for TCP or a two-entry list for UDP).
*/
- if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
- if (add_iov(c, c->wcurr, c->wbytes, true) != 0 ||
+
+ if (c->iovused == 0) {
+ assert(c->msgused == 0);
+ if (add_msghdr(c) != 0 ||
+ add_iov(c, c->wcurr, c->wbytes, true) != 0 ||
(c->udp && build_udp_headers(c) != 0)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't build response\n");
@@ -3030,6 +3105,8 @@ static void usage(void) {
printf("-R Maximum number of requests per event\n"
" limits the number of requests process for a given connection\n"
" to prevent starvation. default 10\n");
+ printf("-C Maximum bytes used for connection buffers\n"
+ " default 16MB\n");
return;
}
@@ -3159,7 +3236,7 @@ int main (int argc, char **argv) {
setbuf(stderr, NULL);
/* process arguments */
- while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:n:N:R:")) != -1) {
+ while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:n:N:R:C:")) != -1) {
switch (c) {
case 'U':
settings.udpport = atoi(optarg);
@@ -3251,6 +3328,10 @@ int main (int argc, char **argv) {
settings.binary_udpport = atoi(optarg);
break;
+ case 'C':
+ settings.max_conn_buffer_bytes = atoi(optarg);
+ break;
+
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
@@ -3386,7 +3467,6 @@ int main (int argc, char **argv) {
}
}
-
/* initialize main thread libevent instance */
main_base = event_init();
@@ -3401,6 +3481,7 @@ int main (int argc, char **argv) {
#if defined(USE_FLAT_ALLOCATOR)
flat_storage_init(settings.maxbytes);
#endif /* #if defined(USE_FLAT_ALLOCATOR) */
+ conn_buffer_init(0, 0, settings.max_conn_buffer_bytes / 2, settings.max_conn_buffer_bytes);
/* managed instance? alloc and zero a bucket array */
if (settings.managed) {
View
12 memcached.h
@@ -174,6 +174,8 @@ struct settings_s {
int detail_enabled; /* nonzero if we're collecting detailed stats */
int reqs_per_event; /* Maximum number of requests to process on each
io-event. */
+ size_t max_conn_buffer_bytes; /* high-water mark for memory taken by
+ * connection buffers. */
};
@@ -371,17 +373,20 @@ char *mt_slabs_stats(int *buflen);
void mt_stats_lock(void);
void mt_stats_unlock(void);
int mt_store_item(item *item, int comm);
-char *mt_flat_allocator_stats(size_t* result_length);
# define add_delta mt_add_delta
+# define alloc_conn_buffer mt_alloc_conn_buffer
# define append_thread_stats mt_append_thread_stats
# define assoc_expire_regex mt_assoc_expire_regex
# define assoc_move_next_bucket mt_assoc_move_next_bucket
# define conn_from_freelist mt_conn_from_freelist
# define conn_add_to_freelist mt_conn_add_to_freelist
+# define conn_buffer_reclamation mt_conn_buffer_reclamation
+# define conn_buffer_stats mt_conn_buffer_stats
# define defer_delete mt_defer_delete
# define flat_allocator_stats mt_flat_allocator_stats
+# define free_conn_buffer mt_free_conn_buffer
# define is_listen_thread mt_is_listen_thread
# define item_alloc mt_item_alloc
# define item_cachedump mt_item_cachedump
@@ -401,22 +406,25 @@ char *mt_flat_allocator_stats(size_t* result_length);
# define slabs_rebalance mt_slabs_rebalance
# define slabs_stats mt_slabs_stats
# define store_item mt_store_item
-
# define STATS_LOCK() mt_stats_lock()
# define STATS_UNLOCK() mt_stats_unlock()
#else /* !USE_THREADS */
# define add_delta do_add_delta
+# define alloc_conn_buffer do_alloc_conn_buffer
# define append_thread_stats(b,s,o,r) o
# define assoc_expire_regex do_assoc_expire_regex
# define assoc_move_next_bucket do_assoc_move_next_bucket
# define conn_from_freelist do_conn_from_freelist
# define conn_add_to_freelist do_conn_add_to_freelist
+# define conn_buffer_reclamation do_conn_buffer_reclamation
+# define conn_buffer_stats do_conn_buffer_stats
# define defer_delete do_defer_delete
# define dispatch_conn_new(x,y,z,a,b,c,d,e) conn_new(x,y,z,a,b,c,d,e,main_base)
# define dispatch_event_add(t,c) event_add(&(c)->event, 0)
# define flat_allocator_stats do_flat_allocator_stats
+# define free_conn_buffer do_free_conn_buffer
# define is_listen_thread() 1
# define item_alloc do_item_alloc
# define item_cachedump do_item_cachedump
View
1 memory_pool_classes.h
@@ -4,6 +4,7 @@
MEMORY_POOL(ASSOC_POOL, assoc_alloc, "assoc")
MEMORY_POOL(CONN_POOL, conn_alloc, "conn")
+MEMORY_POOL(CONN_BUFFER_POOL, conn_buffer_alloc, "conn_buffer")
MEMORY_POOL(CONN_BUFFER_RBUF_POOL, conn_buffer_rbuf_alloc, "conn_buffer_rbuf")
MEMORY_POOL(CONN_BUFFER_WBUF_POOL, conn_buffer_wbuf_alloc, "conn_buffer_wbuf")
MEMORY_POOL(CONN_BUFFER_ILIST_POOL, conn_buffer_ilist_alloc, "conn_buffer_ilist")
View
1 slabs_items.c
@@ -22,6 +22,7 @@
#include "assoc.h"
#include "slabs.h"
#include "stats.h"
+#include "conn_buffer.h"
#include "slabs_items_support.h"
/* Forward Declarations */
View
40 slabs_items_support.h
@@ -33,22 +33,38 @@ static inline int add_item_to_iov(conn *c, const item* it, bool send_cr_lf) {
}
}
-static inline unsigned int item_get_max_riov(void) {
- return 2;
-}
-static inline size_t item_setup_receive(item* it, struct iovec* iov, bool expect_cr_lf,
- char* crlf) {
- iov->iov_base = ITEM_data(it);
- iov->iov_len = it->nbytes;
+static inline bool item_setup_receive(item* it, conn* c) {
+ size_t iov_len_required;
+
+ if (c->binary == false) {
+ iov_len_required = 2;
+
+ assert(c->riov == NULL);
+ assert(c->riov_size == 0);
+ c->riov = (struct iovec*) alloc_conn_buffer(sizeof(struct iovec) * iov_len_required);
+ if (c->riov == NULL) {
+ return false;
+ }
+ } else {
+ iov_len_required = 1;
+ }
+
+ report_max_rusage(c->riov, sizeof(struct iovec) * iov_len_required);
+ c->riov_size = iov_len_required;
+ c->riov_left = iov_len_required;
+ c->riov_curr = 0;
+
+ c->riov[0].iov_base = ITEM_data(it);
+ c->riov[0].iov_len = it->nbytes;
- if (! expect_cr_lf) {
- return 1;
+ if (c->binary) {
+ return true;
} else {
- (iov + 1)->iov_base = crlf;
- (iov + 1)->iov_len = 2;
+ c->riov[1].iov_base = c->crlf;
+ c->riov[1].iov_len = 2;
- return 2;
+ return true;
}
}
View
39 thread.c
@@ -20,6 +20,7 @@
#include "assoc.h"
#include "items.h"
#include "stats.h"
+#include "conn_buffer.h"
#define ITEMS_PER_ALLOC 64
@@ -61,6 +62,9 @@ static pthread_mutex_t slabs_lock;
/* Lock for global stats */
static pthread_mutex_t stats_lock;
+/* Lock for the connection buffer pool */
+static pthread_mutex_t conn_buffer_lock;
+
/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
@@ -614,9 +618,6 @@ void mt_slabs_rebalance() {
#if defined(USE_FLAT_ALLOCATOR)
/******************************* FLAT ALLOCATOR ******************************/
-/*
- * Stores an item in the cache (high level, obeys set/add/replace semantics)
- */
char* mt_flat_allocator_stats(size_t* result_size) {
char* ret;
@@ -627,6 +628,37 @@ char* mt_flat_allocator_stats(size_t* result_size) {
}
#endif /* #if defined(USE_FLAT_ALLOCATOR) */
+/******************************* CONN BUFFER ******************************/
+void* mt_alloc_conn_buffer(size_t max_rusage_hint) {
+ void* ret;
+
+ pthread_mutex_lock(&conn_buffer_lock);
+ ret = do_alloc_conn_buffer(max_rusage_hint);
+ pthread_mutex_unlock(&conn_buffer_lock);
+ return ret;
+}
+
+void mt_free_conn_buffer(void* ptr, ssize_t max_rusage) {
+ pthread_mutex_lock(&conn_buffer_lock);
+ do_free_conn_buffer(ptr, max_rusage);
+ pthread_mutex_unlock(&conn_buffer_lock);
+}
+
+void mt_conn_buffer_reclamation(void) {
+ pthread_mutex_lock(&conn_buffer_lock);
+ do_conn_buffer_reclamation();
+ pthread_mutex_unlock(&conn_buffer_lock);
+}
+
+/* thread-safe wrapper for do_conn_buffer_stats().  takes conn_buffer_lock —
+ * the same mutex guarding every other conn-buffer entry point above
+ * (mt_alloc_conn_buffer / mt_free_conn_buffer / mt_conn_buffer_reclamation) —
+ * not cache_lock, which protects unrelated state and would let the stats
+ * walk race against concurrent buffer alloc/free. */
+char* mt_conn_buffer_stats(size_t* result_size) {
+    char* ret;
+
+    pthread_mutex_lock(&conn_buffer_lock);
+    ret = do_conn_buffer_stats(result_size);
+    pthread_mutex_unlock(&conn_buffer_lock);
+    return ret;
+}
+
+
/******************************* GLOBAL STATS ******************************/
void mt_stats_lock() {
@@ -652,6 +684,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
pthread_mutex_init(&slabs_lock, NULL);
#endif /* #if defined(USE_SLAB_ALLOCATOR) */
pthread_mutex_init(&stats_lock, NULL);
+ pthread_mutex_init(&conn_buffer_lock, NULL);
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);

0 comments on commit 5d88e64

Please sign in to comment.