<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array"/>
  <modified type="array">
    <modified>
      <diff>@@ -69,6 +69,17 @@
 
 	* Explicitly compare against NULL or zero in many places.
 
+2007-03-05
+	* Steven Grimm &lt;sgrimm@facebook.com&gt;: Per-object-type stats collection
+	  support. Specify the object type delimiter with the -D command line
+	  option. Turn stats gathering on and off with &quot;stats detail on&quot; and
+	  &quot;stats detail off&quot;. Dump the per-object-type details with
+	  &quot;stats detail dump&quot;.
+
+2007-03-01
+	* Steven Grimm &lt;sgrimm@facebook.com&gt;: Fix an off-by-one error in the
+	  multithreaded version's message passing code.
+
 2006-12-23
 	* fix expirations of items set with absolute expiration times in
 	  the past, before the server's start time.  bug was introduced in
@@ -97,10 +108,20 @@
 	* Steve Peters &lt;steve@fisharerojo.org&gt;: OpenBSD has a malloc.h,
 	but warns to use stdlib.h instead
 
+2006-11-22
+	* Steven Grimm &lt;sgrimm@facebook.com&gt;: Add support for multithreaded
+	  execution. Run configure with &quot;--enable-threads&quot; to enable. See
+	  doc/threads.txt for details.
+
 2006-11-13
 	* Iain Wade &lt;iwade@optusnet.com.au&gt;: Fix for UDP responses on non-&quot;get&quot;
 	 commands.
 
+2006-10-15
+	* Steven Grimm &lt;sgrimm@facebook.com&gt;: Dynamic sizing of hashtable to
+	  reduce collisions on very large caches and conserve memory on
+	  small caches.
+
 2006-10-13
 	* Steven Grimm &lt;sgrimm@facebook.com&gt;: New faster hash function.
 </diff>
      <filename>ChangeLog</filename>
    </modified>
    <modified>
      <diff>@@ -1,6 +1,6 @@
 bin_PROGRAMS = memcached memcached-debug
 
-memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h 
+memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h thread.c stats.c stats.h
 memcached_debug_SOURCES = $(memcached_SOURCES)
 memcached_CPPFLAGS = -DNDEBUG
 memcached_LDADD = @LIBOBJS@</diff>
      <filename>Makefile.am</filename>
    </modified>
    <modified>
      <diff>@@ -12,10 +12,9 @@
  *
  * $Id$
  */
-#include &quot;config.h&quot;
-#include &lt;sys/types.h&gt;
+
+#include &quot;memcached.h&quot;
 #include &lt;sys/stat.h&gt;
-#include &lt;sys/time.h&gt;
 #include &lt;sys/socket.h&gt;
 #include &lt;sys/signal.h&gt;
 #include &lt;sys/resource.h&gt;
@@ -24,13 +23,9 @@
 #include &lt;stdio.h&gt;
 #include &lt;string.h&gt;
 #include &lt;unistd.h&gt;
-#include &lt;netinet/in.h&gt;
 #include &lt;errno.h&gt;
-#include &lt;event.h&gt;
 #include &lt;assert.h&gt;
 
-#include &quot;memcached.h&quot;
-
 /*
  * Since the hash function does bit manipulation, it needs to know
  * whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG
@@ -142,7 +137,7 @@ and these came close:
 }
 
 #if HASH_LITTLE_ENDIAN == 1
-static uint32_t hash(
+uint32_t hash(
   const void *key,       /* the key to hash */
   size_t      length,    /* length of the key */
   const uint32_t    initval)   /* initval */
@@ -323,7 +318,7 @@ static uint32_t hash(
  * from hashlittle() on all machines.  hashbig() takes advantage of
  * big-endian byte ordering.
  */
-static uint32_t hash( const void *key, size_t length, const uint32_t initval)
+uint32_t hash( const void *key, size_t length, const uint32_t initval)
 {
   uint32_t a,b,c;
   union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */
@@ -541,39 +536,41 @@ static void assoc_expand(void) {
 
     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
     if (primary_hashtable) {
-    if (settings.verbose &gt; 1)
-        fprintf(stderr, &quot;Hash table expansion starting\n&quot;);
+        if (settings.verbose &gt; 1)
+            fprintf(stderr, &quot;Hash table expansion starting\n&quot;);
         hashpower++;
         expanding = 1;
         expand_bucket = 0;
-    assoc_move_next_bucket();
+        do_assoc_move_next_bucket();
     } else {
         primary_hashtable = old_hashtable;
-    /* Bad news, but we can keep running. */
+        /* Bad news, but we can keep running. */
     }
 }
 
 /* migrates the next bucket to the primary hashtable if we're expanding. */
-void assoc_move_next_bucket(void) {
+void do_assoc_move_next_bucket(void) {
     item *it, *next;
     int bucket;
 
     if (expanding) {
         for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
-        next = it-&gt;h_next;
+            next = it-&gt;h_next;
 
             bucket = hash(ITEM_key(it), it-&gt;nkey, 0) &amp; hashmask(hashpower);
             it-&gt;h_next = primary_hashtable[bucket];
             primary_hashtable[bucket] = it;
-    }
+        }
 
-    expand_bucket++;
-    if (expand_bucket == hashsize(hashpower - 1)) {
-        expanding = 0;
-        free(old_hashtable);
-        if (settings.verbose &gt; 1)
-            fprintf(stderr, &quot;Hash table expansion done\n&quot;);
-    }
+        old_hashtable[expand_bucket] = NULL;
+
+        expand_bucket++;
+        if (expand_bucket == hashsize(hashpower - 1)) {
+            expanding = 0;
+            free(old_hashtable);
+            if (settings.verbose &gt; 1)
+                fprintf(stderr, &quot;Hash table expansion done\n&quot;);
+        }
     }
 }
 </diff>
      <filename>assoc.c</filename>
    </modified>
    <modified>
      <diff>@@ -3,4 +3,5 @@ void assoc_init(void);
 item *assoc_find(const char *key, const size_t nkey);
 int assoc_insert(item *item);
 void assoc_delete(const char *key, const size_t nkey);
-void assoc_move_next_bucket(void);
+void do_assoc_move_next_bucket(void);
+uint32_t hash( const void *key, size_t length, const uint32_t initval);</diff>
      <filename>assoc.h</filename>
    </modified>
    <modified>
      <diff>@@ -94,6 +94,7 @@ dnl ----------------------------------------------------------------------------
 AC_SEARCH_LIBS(socket, socket)
 AC_SEARCH_LIBS(gethostbyname, nsl)
 AC_SEARCH_LIBS(mallinfo, malloc)
+AC_SEARCH_LIBS(pthread_create, pthread)
 
 AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)])
 
@@ -156,6 +157,15 @@ fi
 
 AC_C_ENDIAN
 
+dnl Check whether the user wants threads or not
+AC_ARG_ENABLE(threads,
+  [AS_HELP_STRING([--enable-threads],[support multithreaded execution])],
+  [if test &quot;$ac_cv_search_pthread_create&quot; != &quot;no&quot;; then
+    AC_DEFINE([USE_THREADS],,[Define this if you want to use pthreads])
+   else
+    AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
+   fi])
+
 AC_CHECK_FUNCS(mlockall)
 
 AC_CONFIG_FILES(Makefile doc/Makefile)</diff>
      <filename>configure.ac</filename>
    </modified>
    <modified>
      <diff>@@ -84,6 +84,18 @@ Print memcached and libevent licenses.
 .TP
 .B \-P &lt;filename&gt;
 Print pidfile to &lt;filename&gt;, only used under -d option.
+.TP
+.B \-t &lt;threads&gt;
+Number of threads to use to process incoming requests. This option is only
+meaningful if memcached was compiled with thread support enabled. It is 
+typically not useful to set this higher than the number of CPU cores on the
+memcached server.
+.TP
+.B \-D &lt;char&gt;
+Use &lt;char&gt; as the delimiter between key prefixes and IDs. This is used for
+per-prefix stats reporting. The default is &quot;:&quot; (colon). If this option is
+specified, stats collection is turned on automatically; if not, then it may
+be turned on by sending the &quot;stats detail on&quot; command to the server.
 .br
 .SH LICENSE
 The memcached daemon is copyright Danga Interactive and is distributed under </diff>
      <filename>doc/memcached.1</filename>
    </modified>
    <modified>
      <diff>@@ -1,8 +1,7 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 /* $Id$ */
-#include &lt;sys/types.h&gt;
+#include &quot;memcached.h&quot;
 #include &lt;sys/stat.h&gt;
-#include &lt;sys/time.h&gt;
 #include &lt;sys/socket.h&gt;
 #include &lt;sys/signal.h&gt;
 #include &lt;sys/resource.h&gt;
@@ -11,14 +10,10 @@
 #include &lt;stdio.h&gt;
 #include &lt;string.h&gt;
 #include &lt;unistd.h&gt;
-#include &lt;netinet/in.h&gt;
 #include &lt;errno.h&gt;
 #include &lt;time.h&gt;
-#include &lt;event.h&gt;
 #include &lt;assert.h&gt;
 
-#include &quot;memcached.h&quot;
-
 /* Forward Declarations */
 static void item_link_q(item *it);
 static void item_unlink_q(item *it);
@@ -44,6 +39,17 @@ void item_init(void) {
     }
 }
 
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+                fprintf(stderr, &quot;item %x refcnt(%c) %d %c%c%c\n&quot;, \
+                        it, op, it-&gt;refcount, \
+                        (it-&gt;it_flags &amp; ITEM_LINKED) ? 'L' : ' ', \
+                        (it-&gt;it_flags &amp; ITEM_SLABBED) ? 'S' : ' ', \
+                        (it-&gt;it_flags &amp; ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
 
 /*
  * Generates the variable-sized part of the header for an object.
@@ -65,7 +71,7 @@ static size_t item_make_header(const uint8_t nkey, const int flags, const int nb
 }
 
 /*@null@*/
-item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
+item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
     uint8_t nsuffix;
     item *it;
     char suffix[40];
@@ -98,9 +104,12 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t
 
         for (search = tails[id]; tries &gt; 0 &amp;&amp; search != NULL; tries--, search=search-&gt;prev) {
             if (search-&gt;refcount == 0) {
-               if (search-&gt;exptime &gt; current_time)
+               if (search-&gt;exptime &gt; current_time) {
+                       STATS_LOCK();
                        stats.evictions++;
-                item_unlink(search);
+                       STATS_UNLOCK();
+                }
+                do_item_unlink(search);
                 break;
             }
         }
@@ -115,7 +124,8 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t
     assert(it != heads[it-&gt;slabs_clsid]);
 
     it-&gt;next = it-&gt;prev = it-&gt;h_next = 0;
-    it-&gt;refcount = 0;
+    it-&gt;refcount = 1;     /* the caller will have a reference */
+    DEBUG_REFCNT(it, '*');
     it-&gt;it_flags = 0;
     it-&gt;nkey = nkey;
     it-&gt;nbytes = nbytes;
@@ -136,6 +146,7 @@ void item_free(item *it) {
     /* so slab size changer can tell later if item is already free or not */
     it-&gt;slabs_clsid = 0;
     it-&gt;it_flags |= ITEM_SLABBED;
+    DEBUG_REFCNT(it, 'F');
     slabs_free(it, ntotal);
 }
 
@@ -192,57 +203,66 @@ static void item_unlink_q(item *it) {
     return;
 }
 
-int item_link(item *it) {
+int do_item_link(item *it) {
     assert((it-&gt;it_flags &amp; (ITEM_LINKED|ITEM_SLABBED)) == 0);
     assert(it-&gt;nbytes &lt; 1048576);
     it-&gt;it_flags |= ITEM_LINKED;
     it-&gt;time = current_time;
     assoc_insert(it);
 
+    STATS_LOCK();
     stats.curr_bytes += ITEM_ntotal(it);
     stats.curr_items += 1;
     stats.total_items += 1;
+    STATS_UNLOCK();
 
     item_link_q(it);
 
     return 1;
 }
 
-void item_unlink(item *it) {
+void do_item_unlink(item *it) {
     if ((it-&gt;it_flags &amp; ITEM_LINKED) != 0) {
         it-&gt;it_flags &amp;= ~ITEM_LINKED;
+        STATS_LOCK();
         stats.curr_bytes -= ITEM_ntotal(it);
         stats.curr_items -= 1;
+        STATS_UNLOCK();
         assoc_delete(ITEM_key(it), it-&gt;nkey);
         item_unlink_q(it);
+        if (it-&gt;refcount == 0) item_free(it);
     }
-    if (it-&gt;refcount == 0) item_free(it);
 }
 
-void item_remove(item *it) {
+void do_item_remove(item *it) {
     assert((it-&gt;it_flags &amp; ITEM_SLABBED) == 0);
-    if (it-&gt;refcount != 0) it-&gt;refcount--;
+    if (it-&gt;refcount != 0) {
+        it-&gt;refcount--;
+        DEBUG_REFCNT(it, '-');
+    }
     assert((it-&gt;it_flags &amp; ITEM_DELETED) == 0 || it-&gt;refcount != 0);
     if (it-&gt;refcount == 0 &amp;&amp; (it-&gt;it_flags &amp; ITEM_LINKED) == 0) {
         item_free(it);
     }
 }
 
-void item_update(item *it) {
+void do_item_update(item *it) {
     if (it-&gt;time &lt; current_time - ITEM_UPDATE_INTERVAL) {
         assert((it-&gt;it_flags &amp; ITEM_SLABBED) == 0);
 
-        item_unlink_q(it);
-        it-&gt;time = current_time;
-        item_link_q(it);
+        if (it-&gt;it_flags &amp; ITEM_LINKED) {
+            item_unlink_q(it);
+            it-&gt;time = current_time;
+            item_link_q(it);
+        }
     }
 }
 
-int item_replace(item *it, item *new_it) {
+int do_item_replace(item *it, item *new_it) {
     assert((it-&gt;it_flags &amp; ITEM_SLABBED) == 0);
 
-    item_unlink(it);
-    return item_link(new_it);
+    do_item_unlink(it);
+    return do_item_link(new_it);
 }
 
 /*@null@*/
@@ -337,8 +357,59 @@ char* item_stats_sizes(int *bytes) {
     return buf;
 }
 
+/* returns true if a deleted item's delete-locked-time is over, and it
+   should be removed from the namespace */
+int item_delete_lock_over (item *it) {
+    assert(it-&gt;it_flags &amp; ITEM_DELETED);
+    return (current_time &gt;= it-&gt;exptime);
+}
+
+/* wrapper around assoc_find which does the lazy expiration/deletion logic */
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
+    item *it = assoc_find(key, nkey);
+    if (delete_locked) *delete_locked = 0;
+    if (it &amp;&amp; (it-&gt;it_flags &amp; ITEM_DELETED)) {
+        /* it's flagged as delete-locked.  let's see if that condition
+           is past due, and the 5-second delete_timer just hasn't
+           gotten to it yet... */
+        if (! item_delete_lock_over(it)) {
+            if (delete_locked) *delete_locked = 1;
+            it = 0;
+        }
+    }
+    if (it &amp;&amp; settings.oldest_live &amp;&amp; settings.oldest_live &lt;= current_time &amp;&amp;
+        it-&gt;time &lt;= settings.oldest_live) {
+        do_item_unlink(it);           // MTSAFE - cache_lock held
+        it = 0;
+    }
+    if (it &amp;&amp; it-&gt;exptime &amp;&amp; it-&gt;exptime &lt;= current_time) {
+        do_item_unlink(it);           // MTSAFE - cache_lock held
+        it = 0;
+    }
+
+    if (it) {
+        it-&gt;refcount++;
+        DEBUG_REFCNT(it, '+');
+    }
+    return it;
+}
+
+item *item_get(char *key, size_t nkey) {
+    return item_get_notedeleted(key, nkey, 0);
+}
+
+/* returns an item whether or not it's delete-locked or expired. */
+item *do_item_get_nocheck(char *key, size_t nkey) {
+    item *it = assoc_find(key, nkey);
+    if (it) {
+        it-&gt;refcount++;
+        DEBUG_REFCNT(it, '+');
+    }
+    return it;
+}
+
 /* expires items that are more recent than the oldest_live setting. */
-void item_flush_expired(void) {
+void do_item_flush_expired(void) {
     int i;
     item *iter, *next;
     if (settings.oldest_live == 0)
@@ -353,7 +424,7 @@ void item_flush_expired(void) {
             if (iter-&gt;time &gt;= settings.oldest_live) {
                 next = iter-&gt;next;
                 if ((iter-&gt;it_flags &amp; ITEM_SLABBED) == 0) {
-                    item_unlink(iter);
+                    do_item_unlink(iter);
                 }
             } else {
                 /* We've hit the first old item. Continue to the next queue. */</diff>
      <filename>items.c</filename>
    </modified>
    <modified>
      <diff>@@ -1,15 +1,15 @@
 /* See items.c */
 void item_init(void);
 /*@null@*/
-item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes);
+item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes);
 void item_free(item *it);
 bool item_size_ok(const size_t nkey, const int flags, const int nbytes);
 
-int item_link(item *it);    /* may fail if transgresses limits */
-void item_unlink(item *it);
-void item_remove(item *it);
-void item_update(item *it);   /* update LRU time to current and reposition */
-int item_replace(item *it, item *new_it);
+int  do_item_link(item *it);     /* may fail if transgresses limits */
+void do_item_unlink(item *it);
+void do_item_remove(item *it);
+void do_item_update(item *it);   /* update LRU time to current and reposition */
+int  do_item_replace(item *it, item *new_it);
 
 /*@null@*/
 char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
@@ -17,4 +17,8 @@ void item_stats(char *buffer, const int buflen);
 
 /*@null@*/
 char *item_stats_sizes(int *bytes);
-void item_flush_expired(void);
+void do_item_flush_expired(void);
+item *item_get(char *key, size_t nkey);
+
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *do_item_get_nocheck(char *key, size_t nkey);</diff>
      <filename>items.h</filename>
    </modified>
    <modified>
      <diff>@@ -15,10 +15,8 @@
  *
  *  $Id$
  */
-#include &quot;config.h&quot;
-#include &lt;sys/types.h&gt;
+#include &quot;memcached.h&quot;
 #include &lt;sys/stat.h&gt;
-#include &lt;sys/time.h&gt;
 #include &lt;sys/socket.h&gt;
 #include &lt;sys/un.h&gt;
 #include &lt;sys/signal.h&gt;
@@ -41,12 +39,10 @@
 #include &lt;stdio.h&gt;
 #include &lt;string.h&gt;
 #include &lt;unistd.h&gt;
-#include &lt;netinet/in.h&gt;
 #include &lt;netinet/tcp.h&gt;
 #include &lt;arpa/inet.h&gt;
 #include &lt;errno.h&gt;
 #include &lt;time.h&gt;
-#include &lt;event.h&gt;
 #include &lt;assert.h&gt;
 #include &lt;limits.h&gt;
 
@@ -64,8 +60,6 @@
 #endif
 #endif
 
-#include &quot;memcached.h&quot;
-
 /*
  * forward declarations
  */
@@ -85,7 +79,6 @@ static void settings_init(void);
 
 /* event handling, network IO */
 static void event_handler(const int fd, const short which, void *arg);
-static conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp);
 static void conn_close(conn *c);
 static void conn_init(void);
 static void accept_new_conns(const bool do_accept);
@@ -115,6 +108,7 @@ static item **todelete = 0;
 static int delcurr;
 static int deltotal;
 static conn *listen_conn;
+static struct event_base *main_base;
 
 #define TRANSMIT_COMPLETE   0
 #define TRANSMIT_INCOMPLETE 1
@@ -159,12 +153,16 @@ static void stats_init(void) {
        like 'settings.oldest_live' which act as booleans as well as
        values are now false in boolean context... */
     stats.started = time(0) - 2;
+    stats_prefix_init();
 }
 
 static void stats_reset(void) {
+    STATS_LOCK();
     stats.total_items = stats.total_conns = 0;
     stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
     stats.bytes_read = stats.bytes_written = 0;
+    stats_prefix_clear();
+    STATS_UNLOCK();
 }
 
 static void settings_init(void) {
@@ -180,6 +178,13 @@ static void settings_init(void) {
     settings.managed = false;
     settings.factor = 1.25;
     settings.chunk_size = 48;         /* space for a modest key and value */
+#ifdef USE_THREADS
+    settings.num_threads = 4;
+#else
+    settings.num_threads = 1;
+#endif
+    settings.prefix_delimiter = ':';
+    settings.detail_enabled = 0;
 }
 
 /* returns true if a deleted item's delete-locked-time is over, and it
@@ -189,37 +194,6 @@ static bool item_delete_lock_over (item *it) {
     return (current_time &gt;= it-&gt;exptime);
 }
 
-/* wrapper around assoc_find which does the lazy expiration/deletion logic */
-static item *get_item_notedeleted(const char *key, const size_t nkey, int *delete_locked) {
-    item *it = assoc_find(key, nkey);
-
-    if (delete_locked) *delete_locked = 0;
-
-    if (it != NULL &amp;&amp; (it-&gt;it_flags &amp; ITEM_DELETED)) {
-        /* it's flagged as delete-locked.  let's see if that condition
-           is past due, and the 5-second delete_timer just hasn't
-           gotten to it yet... */
-        if (! item_delete_lock_over(it)) {
-            if (delete_locked) *delete_locked = 1;
-            it = 0;
-        }
-    }
-    if (it != NULL &amp;&amp; settings.oldest_live != 0 &amp;&amp; settings.oldest_live &lt;= current_time &amp;&amp;
-        it-&gt;time &lt;= settings.oldest_live) {
-        item_unlink(it);
-        it = 0;
-    }
-    if (it != NULL &amp;&amp; it-&gt;exptime != 0 &amp;&amp; it-&gt;exptime &lt;= current_time) {
-        item_unlink(it);
-        it = 0;
-    }
-    return it;
-}
-
-static item *get_item(const char *key, const size_t nkey) {
-    return get_item_notedeleted(key, nkey, 0);
-}
-
 /*
  * Adds a message header to a connection.
  *
@@ -260,10 +234,16 @@ static int add_msghdr(conn *c)
     return 0;
 }
 
+
+/*
+ * Free list management for connections.
+ */
+
 static conn **freeconns;
 static int freetotal;
 static int freecurr;
 
+
 static void conn_init(void) {
     freetotal = 200;
     freecurr = 0;
@@ -273,15 +253,48 @@ static void conn_init(void) {
     return;
 }
 
-/*@null@*/
-static conn *conn_new(const int sfd, const int init_state, const int event_flags,
-                      const int read_buffer_size, const bool is_udp) {
+/*
+ * Returns a connection from the freelist, if any. Should call this using
+ * conn_from_freelist() for thread safety.
+ */
+conn *do_conn_from_freelist() {
     conn *c;
 
-    /* do we have a free conn structure from a previous close? */
     if (freecurr &gt; 0) {
         c = freeconns[--freecurr];
-    } else { /* allocate a new one */
+    } else {
+        c = NULL;
+    }
+
+    return c;
+}
+
+/*
+ * Adds a connection to the freelist. 0 = success. Should call this using
+ * conn_add_to_freelist() for thread safety.
+ */
+int do_conn_add_to_freelist(conn *c) {
+    if (freecurr &lt; freetotal) {
+        freeconns[freecurr++] = c;
+        return 0;
+    } else {
+        /* try to enlarge free connections array */
+        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
+        if (new_freeconns) {
+            freetotal *= 2;
+            freeconns = new_freeconns;
+            freeconns[freecurr++] = c;
+            return 0;
+        }
+    }
+    return 1;
+}
+
+conn *conn_new(const int sfd, const int init_state, const int event_flags,
+                const int read_buffer_size, const bool is_udp, struct event_base *base) {
+    conn *c = conn_from_freelist();
+
+    if (NULL == c) {
         if (!(c = (conn *)malloc(sizeof(conn)))) {
             perror(&quot;malloc()&quot;);
             return NULL;
@@ -317,7 +330,9 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags
             return NULL;
         }
 
+        STATS_LOCK();
         stats.conn_structs++;
+        STATS_UNLOCK();
     }
 
     if (settings.verbose &gt; 1) {
@@ -350,26 +365,20 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags
     c-&gt;gen = 0;
 
     event_set(&amp;c-&gt;event, sfd, event_flags, event_handler, (void *)c);
+    event_base_set(base, &amp;c-&gt;event);
     c-&gt;ev_flags = event_flags;
 
     if (event_add(&amp;c-&gt;event, 0) == -1) {
-        if (freecurr &lt; freetotal) {
-            freeconns[freecurr++] = c;
-        } else {
-            if (c-&gt;hdrbuf)
-                free (c-&gt;hdrbuf);
-            free (c-&gt;msglist);
-            free (c-&gt;rbuf);
-            free (c-&gt;wbuf);
-            free (c-&gt;ilist);
-            free (c-&gt;iov);
-            free (c);
+        if (conn_add_to_freelist(c)) {
+            conn_free(c);
         }
         return NULL;
     }
 
+    STATS_LOCK();
     stats.curr_conns++;
     stats.total_conns++;
+    STATS_UNLOCK();
 
     return c;
 }
@@ -378,7 +387,7 @@ static void conn_cleanup(conn *c) {
     assert(c != NULL);
 
     if (c-&gt;item) {
-        item_free(c-&gt;item);
+        item_remove(c-&gt;item);
         c-&gt;item = 0;
     }
 
@@ -397,7 +406,7 @@ static void conn_cleanup(conn *c) {
 /*
  * Frees a connection.
  */
-static void conn_free(conn *c) {
+void conn_free(conn *c) {
     if (c) {
         if (c-&gt;hdrbuf)
             free(c-&gt;hdrbuf);
@@ -429,24 +438,13 @@ static void conn_close(conn *c) {
     conn_cleanup(c);
 
     /* if the connection has big buffers, just free it */
-    if (c-&gt;rsize &gt; READ_BUFFER_HIGHWAT) {
+    if (c-&gt;rsize &gt; READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
         conn_free(c);
-    } else if (freecurr &lt; freetotal) {
-        /* if we have enough space in the free connections array, put the structure there */
-        freeconns[freecurr++] = c;
-    } else {
-        /* try to enlarge free connections array */
-        conn **new_freeconns = realloc((void *)freeconns, sizeof(conn *) * freetotal * 2);
-        if (new_freeconns) {
-            freetotal *= 2;
-            freeconns = new_freeconns;
-            freeconns[freecurr++] = c;
-        } else {
-            conn_free(c);
-        }
     }
 
+    STATS_LOCK();
     stats.curr_conns--;
+    STATS_UNLOCK();
 
     return;
 }
@@ -689,56 +687,63 @@ static void complete_nread(conn *c) {
 
     item *it = c-&gt;item;
     int comm = c-&gt;item_comm;
-    item *old_it;
-    int delete_locked = 0;
-    char *key = ITEM_key(it);
 
+    STATS_LOCK();
     stats.set_cmds++;
+    STATS_UNLOCK();
 
     if (strncmp(ITEM_data(it) + it-&gt;nbytes - 2, &quot;\r\n&quot;, 2) != 0) {
         out_string(c, &quot;CLIENT_ERROR bad data chunk&quot;);
-        goto err;
+    } else {
+        if (store_item(it, comm)) {
+            out_string(c, &quot;STORED&quot;);
+        } else {
+            out_string(c, &quot;NOT_STORED&quot;);
+        }
     }
 
-    old_it = get_item_notedeleted(key, it-&gt;nkey, &amp;delete_locked);
-
-    if (old_it != NULL &amp;&amp; comm == NREAD_ADD) {
-        item_update(old_it);  /* touches item, promotes to head of LRU */
-        out_string(c, &quot;NOT_STORED&quot;);
-        goto err;
-    }
+    item_remove(c-&gt;item);       /* release the c-&gt;item reference */
+    c-&gt;item = 0;
+}
 
-    if (old_it == NULL &amp;&amp; comm == NREAD_REPLACE) {
-        out_string(c, &quot;NOT_STORED&quot;);
-        goto err;
-    }
+/*
+ * Stores an item in the cache according to the semantics of one of the set
+ * commands. In threaded mode, this is protected by the cache lock.
+ *
+ * Returns true if the item was stored.
+ */
+int do_store_item(item *it, int comm) {
+    char *key = ITEM_key(it);
+    int delete_locked = 0;
+    item *old_it = do_item_get_notedeleted(key, it-&gt;nkey, &amp;delete_locked);
+    int stored = 0;
 
-    if (delete_locked != 0) {
-        if (comm == NREAD_REPLACE || comm == NREAD_ADD) {
-            out_string(c, &quot;NOT_STORED&quot;);
-            goto err;
-        }
+    if (old_it != NULL &amp;&amp; comm == NREAD_ADD) {
+        /* add only adds a nonexistent item, but promote to head of LRU */
+        do_item_update(old_it);
+    } else if (!old_it &amp;&amp; comm == NREAD_REPLACE) {
+        /* replace only replaces an existing value; don't store */
+    } else if (delete_locked &amp;&amp; (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
+        /* replace and add can't override delete locks; don't store */
+    } else {
+        /* &quot;set&quot; commands can override the delete lock
+           window... in which case we have to find the old hidden item
+           that's in the namespace/LRU but wasn't returned by
+           item_get.... because we need to replace it */
+        if (delete_locked)
+            old_it = do_item_get_nocheck(key, it-&gt;nkey);
+
+        if (old_it != NULL)
+            do_item_replace(old_it, it);
+        else
+            do_item_link(it);
 
-        /* but &quot;set&quot; commands can override the delete lock
-         window... in which case we have to find the old hidden item
-         that's in the namespace/LRU but wasn't returned by
-         get_item.... because we need to replace it (below) */
-        old_it = assoc_find(key, it-&gt;nkey);
+        stored = 1;
     }
 
     if (old_it)
-        item_replace(old_it, it);
-    else
-        item_link(it);
-
-    c-&gt;item = 0;
-    out_string(c, &quot;STORED&quot;);
-    return;
-
-err:
-     item_free(it);
-     c-&gt;item = 0;
-     return;
+        do_item_remove(old_it);         /* release our reference */
+    return stored;
 }
 
 typedef struct token_s {
@@ -816,6 +821,36 @@ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_
     return ntokens;
 }
 
+inline void process_stats_detail(conn *c, const char *command) {
+    assert(c != NULL);
+
+    if (strcmp(command, &quot;on&quot;) == 0) {
+        settings.detail_enabled = 1;
+        out_string(c, &quot;OK&quot;);
+    }
+    else if (strcmp(command, &quot;off&quot;) == 0) {
+        settings.detail_enabled = 0;
+        out_string(c, &quot;OK&quot;);
+    }
+    else if (strcmp(command, &quot;dump&quot;) == 0) {
+        int len;
+        char *stats = stats_prefix_dump(&amp;len);
+        if (NULL != stats) {
+            c-&gt;write_and_free = stats;
+            c-&gt;wcurr = stats;
+            c-&gt;wbytes = len;
+            conn_set_state(c, conn_write);
+            c-&gt;write_and_go = conn_read;
+        }
+        else {
+            out_string(c, &quot;SERVER_ERROR&quot;);
+        }
+    }
+    else {
+        out_string(c, &quot;CLIENT_ERROR usage: stats detail on|off|dump&quot;);
+    }
+}
+
 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
     rel_time_t now = current_time;
     char *command;
@@ -838,6 +873,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
 
         getrusage(RUSAGE_SELF, &amp;usage);
 
+        STATS_LOCK();
         pos += sprintf(pos, &quot;STAT pid %u\r\n&quot;, pid);
         pos += sprintf(pos, &quot;STAT uptime %u\r\n&quot;, now);
         pos += sprintf(pos, &quot;STAT time %ld\r\n&quot;, now + stats.started);
@@ -859,7 +895,9 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
         pos += sprintf(pos, &quot;STAT bytes_read %llu\r\n&quot;, stats.bytes_read);
         pos += sprintf(pos, &quot;STAT bytes_written %llu\r\n&quot;, stats.bytes_written);
         pos += sprintf(pos, &quot;STAT limit_maxbytes %llu\r\n&quot;, (unsigned long long)settings.maxbytes);
+        pos += sprintf(pos, &quot;STAT threads %u\r\n&quot;, settings.num_threads);
         pos += sprintf(pos, &quot;END&quot;);
+        STATS_UNLOCK();
         out_string(c, temp);
         return;
     }
@@ -989,6 +1027,14 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
         return;
     }
 
+    if (strcmp(subcommand, &quot;detail&quot;) == 0) {
+        if (ntokens &lt; 4)
+            process_stats_detail(c, &quot;&quot;);  /* outputs the error message */
+        else
+            process_stats_detail(c, tokens[2].value);
+        return;
+    }
+
     if (strcmp(subcommand, &quot;sizes&quot;) == 0) {
         int bytes = 0;
         char *buf = item_stats_sizes(&amp;bytes);
@@ -1042,8 +1088,13 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens)
                 return;
             }
 
+            STATS_LOCK();
             stats.get_cmds++;
-            it = get_item(key, nkey);
+            STATS_UNLOCK();
+            it = item_get(key, nkey);
+            if (settings.detail_enabled) {
+                stats_prefix_record_get(key, NULL != it);
+            }
             if (it) {
                 if (i &gt;= c-&gt;isize) {
                     item **new_list = realloc(c-&gt;ilist, sizeof(item *) * c-&gt;isize * 2);
@@ -1069,13 +1120,19 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens)
                 if (settings.verbose &gt; 1)
                     fprintf(stderr, &quot;&gt;%d sending key %s\n&quot;, c-&gt;sfd, ITEM_key(it));
 
+                /* item_get() has incremented it-&gt;refcount for us */
+                STATS_LOCK();
                 stats.get_hits++;
-                it-&gt;refcount++;
+                STATS_UNLOCK();
                 item_update(it);
                 *(c-&gt;ilist + i) = it;
                 i++;
 
-            } else stats.get_misses++;
+            } else {
+                STATS_LOCK();
+                stats.get_misses++;
+                STATS_UNLOCK();
+            }
 
             key_token++;
         }
@@ -1135,6 +1192,10 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
         return;
     }
 
+    if (settings.detail_enabled) {
+        stats_prefix_record_set(key);
+    }
+
     if (settings.managed) {
         int bucket = c-&gt;bucket;
         if (bucket == -1) {
@@ -1166,18 +1227,14 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
     c-&gt;ritem = ITEM_data(it);
     c-&gt;rlbytes = it-&gt;nbytes;
     conn_set_state(c, conn_nread);
-    return;
 }
 
 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
     char temp[32];
-    unsigned int value;
     item *it;
     unsigned int delta;
     char *key;
     size_t nkey;
-    int res;
-    char *ptr;
 
     assert(c != NULL);
 
@@ -1202,12 +1259,6 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
         }
     }
 
-    it = get_item(key, nkey);
-    if (!it) {
-        out_string(c, &quot;NOT_FOUND&quot;);
-        return;
-    }
-
     delta = strtoul(tokens[2].value, NULL, 10);
 
     if(errno == ERANGE) {
@@ -1215,14 +1266,38 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
         return;
     }
 
+    it = item_get(key, nkey);
+    if (!it) {
+        out_string(c, &quot;NOT_FOUND&quot;);
+        return;
+    }
+
+    out_string(c, add_delta(it, incr, delta, temp));
+    item_remove(it);         /* release our reference */
+}
+
+/*
+ * adds a delta value to a numeric item.
+ *
+ * it    item to adjust
+ * incr  true to increment value, false to decrement
+ * delta amount to adjust value by
+ * buf   buffer for response string
+ *
+ * returns a response string to send back to the client.
+ */
+char *do_add_delta(item *it, int incr, unsigned int delta, char *buf) {
+    char *ptr;
+    unsigned int value;
+    int res;
+
     ptr = ITEM_data(it);
     while ((*ptr != '\0') &amp;&amp; (*ptr &lt; '0' &amp;&amp; *ptr &gt; '9')) ptr++;    // BUG: can't be true
 
     value = strtol(ptr, NULL, 10);
 
     if(errno == ERANGE) {
-        out_string(c, &quot;CLIENT_ERROR cannot increment or decrement non-numeric value&quot;);
-        return;
+        return &quot;CLIENT_ERROR cannot increment or decrement non-numeric value&quot;;
     }
 
     if (incr != 0)
@@ -1231,24 +1306,24 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
         if (delta &gt;= value) value = 0;
         else value -= delta;
     }
-    snprintf(temp, 32, &quot;%u&quot;, value);
-    res = strlen(temp);
+    snprintf(buf, 32, &quot;%u&quot;, value);
+    res = strlen(buf);
     if (res + 2 &gt; it-&gt;nbytes) { /* need to realloc */
         item *new_it;
-        new_it = item_alloc(ITEM_key(it), it-&gt;nkey, atoi(ITEM_suffix(it) + 1), it-&gt;exptime, res + 2 );
+        new_it = do_item_alloc(ITEM_key(it), it-&gt;nkey, atoi(ITEM_suffix(it) + 1), it-&gt;exptime, res + 2 );
         if (new_it == 0) {
-            out_string(c, &quot;SERVER_ERROR out of memory&quot;);
-            return;
+            return &quot;SERVER_ERROR out of memory&quot;;
         }
-        memcpy(ITEM_data(new_it), temp, res);
+        memcpy(ITEM_data(new_it), buf, res);
         memcpy(ITEM_data(new_it) + res, &quot;\r\n&quot;, 3);
-        item_replace(it, new_it);
+        do_item_replace(it, new_it);
+        do_item_remove(new_it);       /* release our reference */
     } else { /* replace in-place */
-        memcpy(ITEM_data(it), temp, res);
+        memcpy(ITEM_data(it), buf, res);
         memset(ITEM_data(it) + res, ' ', it-&gt;nbytes - res - 2);
     }
-    out_string(c, temp);
-    return;
+
+    return buf;
 }
 
 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -1289,17 +1364,32 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
         }
     }
 
-    it = get_item(key, nkey);
-    if (!it) {
-        out_string(c, &quot;NOT_FOUND&quot;);
-        return;
+    if (settings.detail_enabled) {
+        stats_prefix_record_delete(key);
     }
 
-    if (exptime == 0) {
-        item_unlink(it);
-        out_string(c, &quot;DELETED&quot;);
-        return;
+    it = item_get(key, nkey);
+    if (it) {
+        if (exptime == 0) {
+            item_unlink(it);
+            item_remove(it);      /* release our reference */
+            out_string(c, &quot;DELETED&quot;);
+        } else {
+            /* our reference will be transferred to the delete queue */
+            out_string(c, defer_delete(it, exptime));
+        }
+    } else {
+        out_string(c, &quot;NOT_FOUND&quot;);
     }
+}
+
+/*
+ * Adds an item to the deferred-delete list so it can be reaped later.
+ *
+ * Returns the result to send to the client.
+ */
+char *do_defer_delete(item *it, time_t exptime)
+{
     if (delcurr &gt;= deltotal) {
         item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
         if (new_delete) {
@@ -1310,18 +1400,17 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
              * can't delete it immediately, user wants a delay,
              * but we ran out of memory for the delete queue
              */
-            out_string(c, &quot;SERVER_ERROR out of memory&quot;);
-            return;
+            item_remove(it);    /* release reference */
+            return &quot;SERVER_ERROR out of memory&quot;;
         }
     }
 
-    it-&gt;refcount++;
     /* use its expiration time as its deletion time now */
     it-&gt;exptime = realtime(exptime);
     it-&gt;it_flags |= ITEM_DELETED;
     todelete[delcurr++] = it;
-    out_string(c, &quot;DELETED&quot;);
-    return;
+
+    return &quot;DELETED&quot;;
 }
 
 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -1565,7 +1654,9 @@ static int try_read_udp(conn *c) {
                    0, &amp;c-&gt;request_addr, &amp;c-&gt;request_addr_size);
     if (res &gt; 8) {
         unsigned char *buf = (unsigned char *)c-&gt;rbuf;
+        STATS_LOCK();
         stats.bytes_read += res;
+        STATS_UNLOCK();
 
         /* Beginning of UDP packet is the request ID; save it. */
         c-&gt;request_id = buf[0] * 256 + buf[1];
@@ -1632,7 +1723,9 @@ static int try_read_network(conn *c) {
 
         res = read(c-&gt;sfd, c-&gt;rbuf + c-&gt;rbytes, c-&gt;rsize - c-&gt;rbytes);
         if (res &gt; 0) {
+            STATS_LOCK();
             stats.bytes_read += res;
+            STATS_UNLOCK();
             gotdata = 1;
             c-&gt;rbytes += res;
             continue;
@@ -1653,10 +1746,12 @@ static int try_read_network(conn *c) {
 static bool update_event(conn *c, const int new_flags) {
     assert(c != NULL);
 
+    struct event_base *base = c-&gt;event.ev_base;
     if (c-&gt;ev_flags == new_flags)
         return true;
     if (event_del(&amp;c-&gt;event) == -1) return false;
     event_set(&amp;c-&gt;event, c-&gt;sfd, new_flags, event_handler, (void *)c);
+    event_base_set(base, &amp;c-&gt;event);
     c-&gt;ev_flags = new_flags;
     if (event_add(&amp;c-&gt;event, 0) == -1) return false;
     return true;
@@ -1666,6 +1761,8 @@ static bool update_event(conn *c, const int new_flags) {
  * Sets whether we are listening for new connections or not.
  */
 void accept_new_conns(const bool do_accept) {
+    if (! is_listen_thread())
+        return;
     if (do_accept) {
         update_event(listen_conn, EV_READ | EV_PERSIST);
         if (listen(listen_conn-&gt;sfd, 1024) != 0) {
@@ -1704,7 +1801,9 @@ static int transmit(conn *c) {
 
         res = sendmsg(c-&gt;sfd, m, 0);
         if (res &gt; 0) {
+            STATS_LOCK();
             stats.bytes_written += res;
+            STATS_UNLOCK();
 
             /* We've written some of the data. Remove the completed
                iovec entries from the list of pending writes. */
@@ -1762,14 +1861,16 @@ static void drive_machine(conn *c) {
             addrlen = sizeof(addr);
             if ((sfd = accept(c-&gt;sfd, &amp;addr, &amp;addrlen)) == -1) {
                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    /* these are transient, so don't log anything */
                     stop = true;
-                    break;
                 } else if (errno == EMFILE) {
                     if (settings.verbose &gt; 0)
                         fprintf(stderr, &quot;Too many open connections\n&quot;);
                     accept_new_conns(false);
+                    stop = true;
                 } else {
                     perror(&quot;accept()&quot;);
+                    stop = true;
                 }
                 break;
             }
@@ -1779,14 +1880,8 @@ static void drive_machine(conn *c) {
                 close(sfd);
                 break;
             }
-            if (conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
-                            DATA_BUFFER_SIZE, false) == NULL) {
-                if (settings.verbose &gt; 0)
-                    fprintf(stderr, &quot;couldn't create new connection\n&quot;);
-                close(sfd);
-                break;
-            }
-
+            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+                                     DATA_BUFFER_SIZE, false);
             break;
 
         case conn_read:
@@ -1826,7 +1921,9 @@ static void drive_machine(conn *c) {
             /*  now try reading from the socket */
             res = read(c-&gt;sfd, c-&gt;ritem, c-&gt;rlbytes);
             if (res &gt; 0) {
+                STATS_LOCK();
                 stats.bytes_read += res;
+                STATS_UNLOCK();
                 c-&gt;ritem += res;
                 c-&gt;rlbytes -= res;
                 break;
@@ -1870,7 +1967,9 @@ static void drive_machine(conn *c) {
             /*  now try reading from the socket */
             res = read(c-&gt;sfd, c-&gt;rbuf, c-&gt;rsize &gt; c-&gt;sbytes ? c-&gt;sbytes : c-&gt;rsize);
             if (res &gt; 0) {
+                STATS_LOCK();
                 stats.bytes_read += res;
+                STATS_UNLOCK();
                 c-&gt;sbytes -= res;
                 break;
             }
@@ -1955,7 +2054,6 @@ static void drive_machine(conn *c) {
             stop = true;
             break;
         }
-
     }
 
     return;
@@ -1977,7 +2075,6 @@ void event_handler(const int fd, const short which, void *arg) {
         return;
     }
 
-    /* do as much I/O as possible until we block */
     drive_machine(c);
 
     /* wait for next event */
@@ -2188,6 +2285,7 @@ static void clock_handler(const int fd, const short which, void *arg) {
     }
 
     evtimer_set(&amp;clockevent, clock_handler, 0);
+    event_base_set(main_base, &amp;clockevent);
     evtimer_add(&amp;clockevent, &amp;t);
 
     set_current_time();
@@ -2208,22 +2306,28 @@ static void delete_handler(const int fd, const short which, void *arg) {
     }
 
     evtimer_set(&amp;deleteevent, delete_handler, 0);
+    event_base_set(main_base, &amp;deleteevent);
     evtimer_add(&amp;deleteevent, &amp;t);
-    {
-        int i=0, j=0;
-        for (i=0; i&lt;delcurr; i++) {
-            item *it = todelete[i];
-            if (item_delete_lock_over(it)) {
-                assert(it-&gt;refcount &gt; 0);
-                it-&gt;it_flags &amp;= ~ITEM_DELETED;
-                item_unlink(it);
-                item_remove(it);
-            } else {
-                todelete[j++] = it;
-            }
+    run_deferred_deletes();
+}
+
+/* Call run_deferred_deletes instead of this. */
+void do_run_deferred_deletes(void)
+{
+    int i, j = 0;
+
+    for (i = 0; i &lt; delcurr; i++) {
+        item *it = todelete[i];
+        if (item_delete_lock_over(it)) {
+            assert(it-&gt;refcount &gt; 0);
+            it-&gt;it_flags &amp;= ~ITEM_DELETED;
+            do_item_unlink(it);
+            do_item_remove(it);
+        } else {
+            todelete[j++] = it;
         }
-        delcurr = j;
     }
+    delcurr = j;
 }
 
 static void usage(void) {
@@ -2247,6 +2351,9 @@ static void usage(void) {
            &quot;-P &lt;file&gt;     save PID in &lt;file&gt;, only used with -d option\n&quot;
            &quot;-f &lt;factor&gt;   chunk size growth factor, default 1.25\n&quot;
            &quot;-n &lt;bytes&gt;    minimum space allocated for key+value+flags, default 48\n&quot;);
+#ifdef USE_THREADS
+    printf(&quot;-t &lt;num&gt;      number of threads to use, default 4\n&quot;);
+#endif
     return;
 }
 
@@ -2356,7 +2463,6 @@ static void sig_handler(const int sig) {
 
 int main (int argc, char **argv) {
     int c;
-    conn *u_conn;
     struct in_addr addr;
     bool lock_memory = false;
     bool daemonize = false;
@@ -2377,7 +2483,7 @@ int main (int argc, char **argv) {
     setbuf(stderr, NULL);
 
     /* process arguments */
-    while ((c = getopt(argc, argv, &quot;bp:s:U:m:Mc:khirvdl:u:P:f:s:n:&quot;)) != -1) {
+    while ((c = getopt(argc, argv, &quot;bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:&quot;)) != -1) {
         switch (c) {
         case 'U':
             settings.udpport = atoi(optarg);
@@ -2446,6 +2552,21 @@ int main (int argc, char **argv) {
                 return 1;
             }
             break;
+        case 't':
+            settings.num_threads = atoi(optarg);
+            if (settings.num_threads == 0) {
+                fprintf(stderr, &quot;Number of threads must be greater than 0\n&quot;);
+                return 1;
+            }
+            break;
+        case 'D':
+            if (! optarg || ! optarg[0]) {
+                fprintf(stderr, &quot;No delimiter specified\n&quot;);
+                return 1;
+            }
+            settings.prefix_delimiter = optarg[0];
+            settings.detail_enabled = 1;
+            break;
         default:
             fprintf(stderr, &quot;Illegal argument \&quot;%c\&quot;\n&quot;, c);
             return 1;
@@ -2559,10 +2680,11 @@ int main (int argc, char **argv) {
         }
     }
 
+    /* initialize main thread libevent instance */
+    main_base = event_init();
 
     /* initialize other stuff */
     item_init();
-    event_init();
     stats_init();
     assoc_init();
     conn_init();
@@ -2599,16 +2721,16 @@ int main (int argc, char **argv) {
         exit(EXIT_FAILURE);
     }
     /* create the initial listening connection */
-    if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, false))) {
+    if (!(listen_conn = conn_new(l_socket, conn_listening,
+                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
         fprintf(stderr, &quot;failed to create listening connection&quot;);
         exit(EXIT_FAILURE);
     }
-    /* create the initial listening udp connection */
-    if (u_socket &gt; -1 &amp;&amp;
-        !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, true))) {
-        fprintf(stderr, &quot;failed to create udp connection&quot;);
-        exit(EXIT_FAILURE);
-    }
+    /* save the PID in a file if we're a daemon */
+    if (daemonize)
+        save_pid(getpid(), pid_file);
+    /* start up worker threads if MT mode */
+    thread_init(settings.num_threads, main_base);
     /* initialise clock event */
     clock_handler(0, 0, 0);
     /* initialise deletion array and timer event */
@@ -2616,11 +2738,16 @@ int main (int argc, char **argv) {
     delcurr = 0;
     todelete = malloc(sizeof(item *) * deltotal);
     delete_handler(0, 0, 0); /* sets up the event */
-    /* save the PID in if we're a daemon */
-    if (daemonize)
-        save_pid(getpid(), pid_file);
-    /* enter the loop */
-    event_loop(0);
+    /* create the initial listening udp connection, monitored on all threads */
+    if (u_socket &gt; -1) {
+        for (c = 0; c &lt; settings.num_threads; c++) {
+            /* this is guaranteed to hit all threads because we round-robin */
+            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
+                              UDP_READ_BUFFER_SIZE, 1);
+        }
+    }
+    /* enter the event loop */
+    event_base_loop(main_base, 0);
     /* remove the PID file if we're a daemon */
     if (daemonize)
         remove_pidfile(pid_file);</diff>
      <filename>memcached.c</filename>
    </modified>
    <modified>
      <diff>@@ -1,5 +1,11 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 /* $Id$ */
+#include &quot;config.h&quot;
+#include &lt;sys/types.h&gt;
+#include &lt;sys/time.h&gt;
+#include &lt;netinet/in.h&gt;
+#include &lt;event.h&gt;
+
 #define DATA_BUFFER_SIZE 2048
 #define UDP_READ_BUFFER_SIZE 65536
 #define UDP_MAX_PAYLOAD_SIZE 1400
@@ -69,6 +75,9 @@ struct settings {
     char *socketpath;   /* path to unix socket if using local socket */
     double factor;          /* chunk size growth factor */
     int chunk_size;
+    int num_threads;        /* number of libevent threads to run */
+    char prefix_delimiter;  /* character that marks a key prefix (for stats) */
+    int detail_enabled;     /* nonzero if we're collecting detailed stats */
 };
 
 extern struct stats stats;
@@ -199,6 +208,120 @@ extern volatile rel_time_t current_time;
  * Functions
  */
 
+conn *do_conn_from_freelist();
+int do_conn_add_to_freelist(conn *c);
+char *do_defer_delete(item *item, time_t exptime);
+void do_run_deferred_deletes(void);
+char *do_add_delta(item *item, int incr, unsigned int delta, char *buf);
+int do_store_item(item *item, int comm);
+conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base);
+
+
+#include &quot;stats.h&quot;
 #include &quot;slabs.h&quot;
 #include &quot;assoc.h&quot;
 #include &quot;items.h&quot;
+
+
+/*
+ * In multithreaded mode, we wrap certain functions with lock management and
+ * replace the logic of some other functions. All wrapped functions have
+ * &quot;mt_&quot; and &quot;do_&quot; variants. In multithreaded mode, the plain version of a
+ * function is #define-d to the &quot;mt_&quot; variant, which often just grabs a
+ * lock and calls the &quot;do_&quot; function. In singlethreaded mode, the &quot;do_&quot;
+ * function is called directly.
+ *
+ * Functions such as the libevent-related calls that need to do cross-thread
+ * communication in multithreaded mode (rather than actually doing the work
+ * in the current thread) are called via &quot;dispatch_&quot; frontends, which are
+ * also #define-d to directly call the underlying code in singlethreaded mode.
+ */
+#ifdef USE_THREADS
+
+void thread_init(int nthreads, struct event_base *main_base);
+int  dispatch_event_add(int thread, conn *c);
+void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
+
+/* Lock wrappers for cache functions that are called from main loop. */
+char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf);
+conn *mt_conn_from_freelist(void);
+int   mt_conn_add_to_freelist(conn *c);
+char *mt_defer_delete(item *it, time_t exptime);
+int   mt_is_listen_thread(void);
+item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+void  mt_item_flush_expired(void);
+item *mt_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *mt_item_get_nocheck(char *key, size_t nkey);
+int   mt_item_link(item *it);
+void  mt_item_remove(item *it);
+int   mt_item_replace(item *it, item *new_it);
+void  mt_item_unlink(item *it);
+void  mt_item_update(item *it);
+void  mt_run_deferred_deletes(void);
+void *mt_slabs_alloc(size_t size);
+void  mt_slabs_free(void *ptr, size_t size);
+int   mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
+char *mt_slabs_stats(int *buflen);
+void  mt_stats_lock(void);
+void  mt_stats_unlock(void);
+int   mt_store_item(item *item, int comm);
+
+
+# define add_delta(x,y,z,a)          mt_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    mt_assoc_move_next_bucket()
+# define conn_from_freelist()        mt_conn_from_freelist()
+# define conn_add_to_freelist(x)     mt_conn_add_to_freelist(x)
+# define defer_delete(x,y)           mt_defer_delete(x,y)
+# define is_listen_thread()          mt_is_listen_thread()
+# define item_alloc(x,y,z,a,b)       mt_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        mt_item_flush_expired()
+# define item_get_nocheck(x,y)       mt_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
+# define item_link(x)                mt_item_link(x)
+# define item_remove(x)              mt_item_remove(x)
+# define item_replace(x,y)           mt_item_replace(x,y)
+# define item_update(x)              mt_item_update(x)
+# define item_unlink(x)              mt_item_unlink(x)
+# define run_deferred_deletes()      mt_run_deferred_deletes()
+# define slabs_alloc(x)              mt_slabs_alloc(x)
+# define slabs_free(x,y)             mt_slabs_free(x,y)
+# define slabs_reassign(x,y)         mt_slabs_reassign(x,y)
+# define slabs_stats(x)              mt_slabs_stats(x)
+# define store_item(x,y)             mt_store_item(x,y)
+
+# define STATS_LOCK()                mt_stats_lock()
+# define STATS_UNLOCK()              mt_stats_unlock()
+
+#else /* !USE_THREADS */
+
+# define add_delta(x,y,z,a)          do_add_delta(x,y,z,a)
+# define assoc_move_next_bucket()    do_assoc_move_next_bucket()
+# define conn_from_freelist()        do_conn_from_freelist()
+# define conn_add_to_freelist(x)     do_conn_add_to_freelist(x)
+# define defer_delete(x,y)           do_defer_delete(x,y)
+# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
+# define dispatch_event_add(t,c)     event_add(&amp;(c)-&gt;event, 0)
+# define is_listen_thread()          1
+# define item_alloc(x,y,z,a,b)       do_item_alloc(x,y,z,a,b)
+# define item_flush_expired()        do_item_flush_expired()
+# define item_get_nocheck(x,y)       do_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
+# define item_link(x)                do_item_link(x)
+# define item_remove(x)              do_item_remove(x)
+# define item_replace(x,y)           do_item_replace(x,y)
+# define item_unlink(x)              do_item_unlink(x)
+# define item_update(x)              do_item_update(x)
+# define run_deferred_deletes()      do_run_deferred_deletes()
+# define slabs_alloc(x)              do_slabs_alloc(x)
+# define slabs_free(x,y)             do_slabs_free(x,y)
+# define slabs_reassign(x,y)         do_slabs_reassign(x,y)
+# define slabs_stats(x)              do_slabs_stats(x)
+# define store_item(x,y)             do_store_item(x,y)
+# define thread_init(x,y)            0
+
+# define STATS_LOCK()                /**/
+# define STATS_UNLOCK()              /**/
+
+#endif /* !USE_THREADS */
+
+</diff>
      <filename>memcached.h</filename>
    </modified>
    <modified>
      <diff>@@ -9,10 +9,8 @@
  *
  * $Id$
  */
-#include &quot;config.h&quot;
-#include &lt;sys/types.h&gt;
+#include &quot;memcached.h&quot;
 #include &lt;sys/stat.h&gt;
-#include &lt;sys/time.h&gt;
 #include &lt;sys/socket.h&gt;
 #include &lt;sys/signal.h&gt;
 #include &lt;sys/resource.h&gt;
@@ -21,18 +19,15 @@
 #include &lt;stdio.h&gt;
 #include &lt;string.h&gt;
 #include &lt;unistd.h&gt;
-#include &lt;netinet/in.h&gt;
 #include &lt;errno.h&gt;
-#include &lt;event.h&gt;
 #include &lt;assert.h&gt;
 #include &lt;stdbool.h&gt;
 
-#include &quot;memcached.h&quot;
-
 #define POWER_SMALLEST 1
 #define POWER_LARGEST  200
 #define POWER_BLOCK 1048576
 #define CHUNK_ALIGN_BYTES (sizeof(void *))
+#define DONT_PREALLOC_SLABS
 
 /* powers-of-N allocation structures */
 
@@ -63,7 +58,7 @@ static int power_largest;
 /*
  * Forward Declarations
  */
-static int slabs_newslab(const unsigned int id);
+static int do_slabs_newslab(const unsigned int id);
 
 #ifndef DONT_PREALLOC_SLABS
 /* Preallocate as many slab pages as possible (called from slabs_init)
@@ -161,7 +156,7 @@ static void slabs_preallocate (const unsigned int maxslabs) {
     for (i = POWER_SMALLEST; i &lt;= POWER_LARGEST; i++) {
         if (++prealloc &gt; maxslabs)
             return;
-        slabs_newslab(i);
+        do_slabs_newslab(i);
     }
 
 }
@@ -179,7 +174,7 @@ static int grow_slab_list (const unsigned int id) {
     return 1;
 }
 
-static int slabs_newslab(const unsigned int id) {
+static int do_slabs_newslab(const unsigned int id) {
     slabclass_t *p = &amp;slabclass[id];
 #ifdef ALLOW_SLABS_REASSIGN
     int len = POWER_BLOCK;
@@ -206,7 +201,7 @@ static int slabs_newslab(const unsigned int id) {
 }
 
 /*@null@*/
-void *slabs_alloc(const size_t size) {
+void *do_slabs_alloc(const size_t size) {
     slabclass_t *p;
 
     unsigned int id = slabs_clsid(size);
@@ -225,7 +220,7 @@ void *slabs_alloc(const size_t size) {
 
     /* fail unless we have space at the end of a recently allocated page,
        we have something on our freelist, or we could allocate a new page */
-    if (! (p-&gt;end_page_ptr != 0 || p-&gt;sl_curr != 0 || slabs_newslab(id)  != 0))
+    if (! (p-&gt;end_page_ptr != 0 || p-&gt;sl_curr != 0 || do_slabs_newslab(id) != 0))
         return 0;
 
     /* return off our freelist, if we have one */
@@ -246,7 +241,7 @@ void *slabs_alloc(const size_t size) {
     return NULL;  /* shouldn't ever get here */
 }
 
-void slabs_free(void *ptr, const size_t size) {
+void do_slabs_free(void *ptr, const size_t size) {
     unsigned char id = slabs_clsid(size);
     slabclass_t *p;
 
@@ -276,7 +271,7 @@ void slabs_free(void *ptr, const size_t size) {
 }
 
 /*@null@*/
-char* slabs_stats(int *buflen) {
+char* do_slabs_stats(int *buflen) {
     int i, total;
     char *buf = (char *)malloc(power_largest * 200 + 100);
     char *bufcurr = buf;
@@ -318,7 +313,7 @@ char* slabs_stats(int *buflen) {
    1 = success
    0 = fail
    -1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid) {
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid) {
     void *slab, *slab_end;
     slabclass_t *p, *dp;
     void *iter;</diff>
      <filename>slabs.c</filename>
    </modified>
    <modified>
      <diff>@@ -14,17 +14,17 @@ void slabs_init(const size_t limit, const double factor);
 unsigned int slabs_clsid(const size_t size);
 
 /* Allocate object of given length. 0 on error */ /*@null@*/
-void *slabs_alloc(const size_t size);
+void *do_slabs_alloc(const size_t size);
 
 /* Free previously allocated object */
-void slabs_free(void *ptr, size_t size);
+void do_slabs_free(void *ptr, size_t size);
 
 /* Fill buffer with stats */ /*@null@*/
-char* slabs_stats(int *buflen);
+char* do_slabs_stats(int *buflen);
 
 /* Request some slab be moved between classes
   1 = success
    0 = fail
    -1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid);
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid);
 </diff>
      <filename>slabs.h</filename>
    </modified>
    <modified>
      <diff>@@ -37,7 +37,7 @@ my $sock = $server-&gt;sock;
 my $stats = mem_stats($sock);
 
 # Test number of keys
-is(scalar(keys(%$stats)), 21, &quot;21 stats values&quot;);
+is(scalar(keys(%$stats)), 22, &quot;22 stats values&quot;);
 
 # Test initial state
 foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses bytes_written)) {</diff>
      <filename>t/stats.t</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>5b304997cc5e19cc9eb6856b70716574492c2989</id>
    </parent>
  </parents>
  <author>
    <name>Steven Grimm</name>
    <email>sgrimm@facebook.com</email>
  </author>
  <url>http://github.com/dustin/memcached/commit/56b8339e0606c1e59987c8d6368dfe727f3914b8</url>
  <id>56b8339e0606c1e59987c8d6368dfe727f3914b8</id>
  <committed-date>2007-04-16T08:34:03-07:00</committed-date>
  <authored-date>2007-04-16T08:34:03-07:00</authored-date>
  <message>Merge multithreaded into trunk, commit #2 (first commit only did the
new files, not the modified ones.)


git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@509 b0b603af-a30f-0410-a34e-baf09ae79d0b</message>
  <tree>6a3b5a81e147e26321f88ff679d345cb3cea025f</tree>
  <committer>
    <name>Steven Grimm</name>
    <email>sgrimm@facebook.com</email>
  </committer>
</commit>
