Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

connection buffers are per-thread to avoid lock contention

Summary: - A connection buffer group is created per thread.  It records the thread id so that, if asserts are compiled in, it can determine whether it is being used by a different thread.
         - conn_new and dispatch_conn_new no longer need an initial read buffer size.  It is meaningless in the context of connection buffers.
         
         Minor change:
         - udp support for stress test

Reviewed By: ps

Test Plan: ran stress test with asserts on for 1 hr with 4 clients that reconnect every 15 minutes.
           
           ran in production with asserts on for 10 minutes (and still going...)

Revert: OK


git-svn-id: http://svn.facebook.com/svnroot/projects/memcached/trunk@122288 2c7ba8d8-a2f7-0310-a573-de162e16dcc7
  • Loading branch information...
commit c0dbcafaca0d7c6a671c1f754064793348031c47 1 parent 93a414a
authored September 18, 2008
9  binary_sm.c
@@ -125,7 +125,7 @@ void process_binary_protocol(conn* c) {
125 125
         }
126 126
 
127 127
         dispatch_conn_new(sfd, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
128  
-                          DATA_BUFFER_SIZE, false, c->binary, &addr, addrlen);
  128
+                          NULL, false, c->binary, &addr, addrlen);
129 129
         return;
130 130
     }
131 131
 
@@ -406,7 +406,8 @@ static inline bp_handler_res_t handle_header_size_known(conn* c)
406 406
 
407 407
             assert(c->riov == NULL);
408 408
             assert(c->riov_size == 0);
409  
-            c->riov = (struct iovec*) alloc_conn_buffer(0 /* no hint provided,
  409
+            c->riov = (struct iovec*) alloc_conn_buffer(c->cbg,
  410
+                                                        0 /* no hint provided,
410 411
                                                            * because we don't
411 412
                                                            * know how much the
412 413
                                                            * value will
@@ -416,7 +417,7 @@ static inline bp_handler_res_t handle_header_size_known(conn* c)
416 417
                 return retval;
417 418
             }
418 419
             c->riov_size = 1;
419  
-            report_max_rusage(c->riov, sizeof(struct iovec));
  420
+            report_max_rusage(c->cbg, c->riov, sizeof(struct iovec));
420 421
 
421 422
             /* set up the receive. */
422 423
             c->riov[0].iov_base = c->bp_key;
@@ -576,7 +577,7 @@ static inline bp_handler_res_t handle_direct_receive(conn* c)
576 577
         if (c->state == conn_bp_process) {
577 578
             /* going into the process stage.  we can release our receive IOV
578 579
              * buffers. */
579  
-            free_conn_buffer(c->riov, 0);
  580
+            free_conn_buffer(c->cbg, c->riov, 0);
580 581
             c->riov = NULL;
581 582
             c->riov_size = 0;
582 583
         }
401  conn_buffer.c
@@ -25,63 +25,70 @@
25 25
 #define HEAP_LEFT_CHILD(index)           ((((index) + 1) << 1) - 1 + 0)
26 26
 #define HEAP_RIGHT_CHILD(index)          ((((index) + 1) << 1) - 1 + 1)
27 27
 
  28
+
28 29
 #ifdef CONN_BUFFER_CORRUPTION_DETECTION
29 30
 static const bool detect_corruption = true;
30 31
 #else
31 32
 static const bool detect_corruption = false;
32 33
 #endif /* #ifdef CONN_BUFFER_CORRUPTION_DETECTION */
  34
+static struct {
  35
+    size_t page_size;
  36
+    int global_initialized;
  37
+    conn_buffer_group_t* cbg_list;
  38
+    size_t cbg_count;
  39
+} l;
33 40
 
34 41
 
35  
-STATIC int cb_freelist_check(void) {
  42
+STATIC int cb_freelist_check(conn_buffer_group_t* cbg) {
36 43
 #if defined(FREELIST_CHECK)
37 44
     size_t i, found_entries, rsize_total;
38 45
     bool end_found = false;
39 46
 
40 47
     /* num free buffers agrees with reality? */
41 48
     for (i = 0, found_entries = 0, rsize_total = 0;
42  
-         i < cbs.free_buffer_list_capacity;
  49
+         i < cbg->free_buffer_list_capacity;
43 50
          i ++) {
44 51
         size_t left_child, right_child;
45 52
 
46  
-        if (cbs.free_buffers[i] == NULL) {
  53
+        if (cbg->free_buffers[i] == NULL) {
47 54
             end_found = true;
48 55
             continue;
49 56
         }
50 57
 
51 58
         assert(end_found == false);
52  
-        assert(cbs.free_buffers[i]->signature == CONN_BUFFER_SIGNATURE);
53  
-        assert(cbs.free_buffers[i]->in_freelist == true);
54  
-        assert(cbs.free_buffers[i]->used == false);
  59
+        assert(cbg->free_buffers[i]->signature == CONN_BUFFER_SIGNATURE);
  60
+        assert(cbg->free_buffers[i]->in_freelist == true);
  61
+        assert(cbg->free_buffers[i]->used == false);
55 62
         found_entries ++;
56 63
 
57  
-        rsize_total += cbs.free_buffers[i]->max_rusage;
  64
+        rsize_total += cbg->free_buffers[i]->max_rusage;
58 65
 
59 66
         left_child = HEAP_LEFT_CHILD(i);
60 67
         right_child = HEAP_RIGHT_CHILD(i);
61 68
 
62  
-        if (left_child < cbs.num_free_buffers) {
  69
+        if (left_child < cbg->num_free_buffers) {
63 70
             /* valid left child */
64  
-            assert(cbs.free_buffers[left_child] != NULL);
65  
-            assert(cbs.free_buffers[left_child]->signature == CONN_BUFFER_SIGNATURE);
66  
-            assert(cbs.free_buffers[left_child]->in_freelist == true);
67  
-            assert(cbs.free_buffers[left_child]->used == false);
  71
+            assert(cbg->free_buffers[left_child] != NULL);
  72
+            assert(cbg->free_buffers[left_child]->signature == CONN_BUFFER_SIGNATURE);
  73
+            assert(cbg->free_buffers[left_child]->in_freelist == true);
  74
+            assert(cbg->free_buffers[left_child]->used == false);
68 75
 
69  
-            assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[left_child]->max_rusage);
  76
+            assert(cbg->free_buffers[i]->max_rusage >= cbg->free_buffers[left_child]->max_rusage);
70 77
         }
71 78
 
72  
-        if (right_child < cbs.num_free_buffers) {
  79
+        if (right_child < cbg->num_free_buffers) {
73 80
             /* valid right child */
74  
-            assert(cbs.free_buffers[right_child] != NULL);
75  
-            assert(cbs.free_buffers[right_child]->signature == CONN_BUFFER_SIGNATURE);
76  
-            assert(cbs.free_buffers[right_child]->in_freelist == true);
77  
-            assert(cbs.free_buffers[right_child]->used == false);
  81
+            assert(cbg->free_buffers[right_child] != NULL);
  82
+            assert(cbg->free_buffers[right_child]->signature == CONN_BUFFER_SIGNATURE);
  83
+            assert(cbg->free_buffers[right_child]->in_freelist == true);
  84
+            assert(cbg->free_buffers[right_child]->used == false);
78 85
 
79  
-            assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[right_child]->max_rusage);
  86
+            assert(cbg->free_buffers[i]->max_rusage >= cbg->free_buffers[right_child]->max_rusage);
80 87
         }
81 88
     }
82 89
 
83  
-    assert(found_entries == cbs.num_free_buffers);
84  
-    assert(rsize_total == cbs.total_rsize_in_freelist);
  90
+    assert(found_entries == cbg->num_free_buffers);
  91
+    assert(rsize_total == cbg->total_rsize_in_freelist);
85 92
 #endif /* #if defined(FREELIST_CHECK) */
86 93
 
87 94
     return 0;
@@ -89,121 +96,121 @@ STATIC int cb_freelist_check(void) {
89 96
 
90 97
 
91 98
 static size_t round_up_to_page(size_t bytes) {
92  
-    if ((bytes % cbs.settings.page_size) != 0) {
93  
-        bytes = ((bytes + cbs.settings.page_size - 1) & ~ (cbs.settings.page_size - 1));
  99
+    if ((bytes % l.page_size) != 0) {
  100
+        bytes = ((bytes + l.page_size - 1) & ~ (l.page_size - 1));
94 101
     }
95 102
 
96 103
     return bytes;
97 104
 }
98 105
 
99 106
 
100  
-static void add_conn_buffer_to_freelist(conn_buffer_t* buffer) {
  107
+static void add_conn_buffer_to_freelist(conn_buffer_group_t* cbg, conn_buffer_t* buffer) {
101 108
     size_t index;
102 109
 
103  
-    assert(cb_freelist_check() == 0);
  110
+    assert(cb_freelist_check(cbg) == 0);
104 111
     (void) cb_freelist_check;      /* dummy rvalue to avoid compiler warning. */
105 112
 
106 113
     assert(buffer->signature == CONN_BUFFER_SIGNATURE);
107 114
     assert(buffer->in_freelist == false);
108 115
     assert(buffer->used == false);
109 116
 
110  
-    if (cbs.num_free_buffers >= cbs.free_buffer_list_capacity) {
111  
-        if (cbs.free_buffers == NULL) {
112  
-            cbs.free_buffer_list_capacity = cbs.settings.initial_buffer_count;
113  
-            cbs.free_buffers = (conn_buffer_t**) pool_malloc(sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
  117
+    if (cbg->num_free_buffers >= cbg->free_buffer_list_capacity) {
  118
+        if (cbg->free_buffers == NULL) {
  119
+            cbg->free_buffer_list_capacity = cbg->settings.initial_buffer_count;
  120
+            cbg->free_buffers = (conn_buffer_t**) pool_malloc(sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity,
114 121
                                                              CONN_BUFFER_POOL);
115 122
         } else {
116  
-            cbs.free_buffers = pool_realloc(cbs.free_buffers,
117  
-                                            sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity * 2,
118  
-                                            sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
  123
+            cbg->free_buffers = pool_realloc(cbg->free_buffers,
  124
+                                            sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity * 2,
  125
+                                            sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity,
119 126
                                             CONN_BUFFER_POOL);
120  
-            cbs.free_buffer_list_capacity *= 2;
  127
+            cbg->free_buffer_list_capacity *= 2;
121 128
         }
122 129
 
123  
-        memset(&cbs.free_buffers[cbs.num_free_buffers], 0,
124  
-               sizeof(conn_buffer_t*) * (cbs.free_buffer_list_capacity - cbs.num_free_buffers));
  130
+        memset(&cbg->free_buffers[cbg->num_free_buffers], 0,
  131
+               sizeof(conn_buffer_t*) * (cbg->free_buffer_list_capacity - cbg->num_free_buffers));
125 132
     }
126 133
 
127 134
     buffer->in_freelist = true;
128 135
 
129  
-    assert(cbs.free_buffers[cbs.num_free_buffers] == NULL);
130  
-    cbs.free_buffers[cbs.num_free_buffers] = buffer;
131  
-    index = cbs.num_free_buffers;
132  
-    cbs.num_free_buffers ++;
133  
-    cbs.total_rsize_in_freelist += buffer->max_rusage;
  136
+    assert(cbg->free_buffers[cbg->num_free_buffers] == NULL);
  137
+    cbg->free_buffers[cbg->num_free_buffers] = buffer;
  138
+    index = cbg->num_free_buffers;
  139
+    cbg->num_free_buffers ++;
  140
+    cbg->total_rsize_in_freelist += buffer->max_rusage;
134 141
 
135 142
     while (index != 0) {
136 143
         size_t parent_index = HEAP_PARENT(index);
137 144
 
138  
-        if (cbs.free_buffers[index]->max_rusage >
139  
-            cbs.free_buffers[parent_index]->max_rusage) {
  145
+        if (cbg->free_buffers[index]->max_rusage >
  146
+            cbg->free_buffers[parent_index]->max_rusage) {
140 147
             conn_buffer_t* temp;
141 148
 
142 149
             /* swap */
143  
-            temp = cbs.free_buffers[index];
144  
-            cbs.free_buffers[index] = cbs.free_buffers[parent_index];
145  
-            cbs.free_buffers[parent_index] = temp;
  150
+            temp = cbg->free_buffers[index];
  151
+            cbg->free_buffers[index] = cbg->free_buffers[parent_index];
  152
+            cbg->free_buffers[parent_index] = temp;
146 153
         } else {
147 154
             /* no swap occured, so we can stop the reheaping operation */
148 155
             break;
149 156
         }
150 157
     }
151  
-    assert(cb_freelist_check() == 0);
  158
+    assert(cb_freelist_check(cbg) == 0);
152 159
 }
153 160
 
154 161
 
155  
-static conn_buffer_t* remove_conn_buffer_from_freelist(size_t max_rusage_hint) {
  162
+static conn_buffer_t* remove_conn_buffer_from_freelist(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
156 163
     conn_buffer_t* ret;
157 164
     conn_buffer_t* compare;
158 165
     size_t index;
159 166
 
160  
-    assert(cb_freelist_check() == 0);
  167
+    assert(cb_freelist_check(cbg) == 0);
161 168
 
162  
-    if (cbs.num_free_buffers == 0) {
163  
-        assert(cbs.free_buffers[0] == NULL);
  169
+    if (cbg->num_free_buffers == 0) {
  170
+        assert(cbg->free_buffers[0] == NULL);
164 171
         return NULL;
165 172
     }
166 173
 
167  
-    ret = cbs.free_buffers[0];
168  
-    cbs.free_buffers[0] = NULL;
  174
+    ret = cbg->free_buffers[0];
  175
+    cbg->free_buffers[0] = NULL;
169 176
     assert(ret->signature == CONN_BUFFER_SIGNATURE);
170 177
     assert(ret->in_freelist == true);
171 178
     assert(ret->used == false);
172 179
     ret->in_freelist = false;
173 180
 
174  
-    cbs.num_free_buffers --;
175  
-    cbs.total_rsize_in_freelist -= ret->max_rusage;
  181
+    cbg->num_free_buffers --;
  182
+    cbg->total_rsize_in_freelist -= ret->max_rusage;
176 183
 
177  
-    if (cbs.num_free_buffers == 0) {
178  
-        assert(cb_freelist_check() == 0);
  184
+    if (cbg->num_free_buffers == 0) {
  185
+        assert(cb_freelist_check(cbg) == 0);
179 186
         return ret;
180 187
     }
181 188
 
182 189
     index = 0;
183  
-    compare = cbs.free_buffers[cbs.num_free_buffers];
184  
-    cbs.free_buffers[cbs.num_free_buffers] = NULL;
  190
+    compare = cbg->free_buffers[cbg->num_free_buffers];
  191
+    cbg->free_buffers[cbg->num_free_buffers] = NULL;
185 192
 
186 193
     while (true) {
187 194
         size_t left_child_index = HEAP_LEFT_CHILD(index);
188 195
         size_t right_child_index = HEAP_RIGHT_CHILD(index);
189 196
         bool valid_left, valid_right, swap_left, swap_right;
190 197
 
191  
-        valid_left = (left_child_index < cbs.num_free_buffers) ? true : false;
192  
-        valid_right = (right_child_index < cbs.num_free_buffers) ? true : false;
  198
+        valid_left = (left_child_index < cbg->num_free_buffers) ? true : false;
  199
+        valid_right = (right_child_index < cbg->num_free_buffers) ? true : false;
193 200
 
194 201
         swap_left = (valid_left &&
195  
-                     cbs.free_buffers[left_child_index]->max_rusage >
  202
+                     cbg->free_buffers[left_child_index]->max_rusage >
196 203
                      compare->max_rusage) ? true : false;
197 204
         swap_right = (valid_right &&
198  
-                      cbs.free_buffers[right_child_index]->max_rusage >
  205
+                      cbg->free_buffers[right_child_index]->max_rusage >
199 206
                       compare->max_rusage) ? true : false;
200 207
 
201 208
         /* it's possible that we'd want to swap with both (i.e., bigger than
202 209
          * both).  pick the larger one to swap with.
203 210
          */
204 211
         if (swap_left && swap_right) {
205  
-            if (cbs.free_buffers[left_child_index]->max_rusage >
206  
-                cbs.free_buffers[right_child_index]->max_rusage) {
  212
+            if (cbg->free_buffers[left_child_index]->max_rusage >
  213
+                cbg->free_buffers[right_child_index]->max_rusage) {
207 214
                 /* left is greater, swap with left. */
208 215
                 swap_right = false;
209 216
             } else {
@@ -212,31 +219,31 @@ static conn_buffer_t* remove_conn_buffer_from_freelist(size_t max_rusage_hint) {
212 219
         }
213 220
 
214 221
         if (swap_left) {
215  
-            assert(cbs.free_buffers[index] == NULL);
216  
-            cbs.free_buffers[index] = cbs.free_buffers[left_child_index];
217  
-            cbs.free_buffers[left_child_index] = NULL;
  222
+            assert(cbg->free_buffers[index] == NULL);
  223
+            cbg->free_buffers[index] = cbg->free_buffers[left_child_index];
  224
+            cbg->free_buffers[left_child_index] = NULL;
218 225
             index = left_child_index;
219 226
         } else if (swap_right) {
220  
-            assert(cbs.free_buffers[index] == NULL);
221  
-            cbs.free_buffers[index] = cbs.free_buffers[right_child_index];
222  
-            cbs.free_buffers[right_child_index] = NULL;
  227
+            assert(cbg->free_buffers[index] == NULL);
  228
+            cbg->free_buffers[index] = cbg->free_buffers[right_child_index];
  229
+            cbg->free_buffers[right_child_index] = NULL;
223 230
             index = right_child_index;
224 231
         } else {
225  
-            assert(cbs.free_buffers[index] == NULL);
226  
-            cbs.free_buffers[index] = compare;
  232
+            assert(cbg->free_buffers[index] == NULL);
  233
+            cbg->free_buffers[index] = compare;
227 234
             break;
228 235
         }
229 236
     }
230 237
 
231  
-    assert(cb_freelist_check() == 0);
  238
+    assert(cb_freelist_check(cbg) == 0);
232 239
     return ret;
233 240
 }
234 241
 
235 242
 
236  
-static conn_buffer_t* make_conn_buffer(void) {
  243
+static conn_buffer_t* make_conn_buffer(conn_buffer_group_t* cbg) {
237 244
     conn_buffer_t* buffer;
238 245
 
239  
-    if (cbs.total_rsize + cbs.settings.page_size >= cbs.settings.total_rsize_range_top) {
  246
+    if (cbg->total_rsize + l.page_size >= cbg->settings.total_rsize_range_top) {
240 247
         /* we don't start the reclamation here because we didn't actually exceed
241 248
          * the top range.
242 249
          */
@@ -258,7 +265,7 @@ static conn_buffer_t* make_conn_buffer(void) {
258 265
     buffer->in_freelist = false;
259 266
     buffer->used = false;
260 267
 
261  
-    cbs.total_rsize += buffer->max_rusage;
  268
+    cbg->total_rsize += buffer->max_rusage;
262 269
 
263 270
     return buffer;
264 271
 }
@@ -304,16 +311,17 @@ static bool try_remap(void* ptr, const size_t range, unsigned remap_attempts) {
304 311
 }
305 312
 
306 313
 
307  
-static void destroy_conn_buffer(conn_buffer_t* buffer) {
  314
+static void destroy_conn_buffer(conn_buffer_group_t* cbg, conn_buffer_t* buffer) {
308 315
     void* ptr = buffer;
309 316
     size_t range = buffer->max_rusage;
310 317
 
311 318
     assert(buffer->in_freelist == false);
312 319
     assert(buffer->used == false);
313  
-    assert(cbs.total_rsize > 0);
  320
+    assert(cbg->total_rsize > 0);
  321
+
314 322
 
315  
-    cbs.stats.destroys ++;
316  
-    cbs.total_rsize -= buffer->max_rusage;
  323
+    cbg->stats.destroys ++;
  324
+    cbg->total_rsize -= buffer->max_rusage;
317 325
     munmap(buffer, CONN_BUFFER_SIZE);
318 326
 
319 327
     /* if we're trying to detect corruption, we need to freeze out the address
@@ -352,20 +360,34 @@ static conn_buffer_t* get_buffer_from_data_ptr(void* _ptr) {
352 360
 }
353 361
 
354 362
 
355  
-void conn_buffer_init(size_t initial_buffer_count,
356  
-                      size_t buffer_rsize_limit,
357  
-                      size_t total_rsize_range_bottom,
358  
-                      size_t total_rsize_range_top) {
359  
-    size_t i;
  363
+static void conn_buffer_reclamation(conn_buffer_group_t* cbg) {
  364
+    if (cbg->reclamation_in_progress) {
  365
+        if (cbg->num_free_buffers != 0) {
  366
+            /* grab the most space-consuming buffer and reclaim it. */
  367
+            conn_buffer_t* tofree = remove_conn_buffer_from_freelist(cbg, CONN_BUFFER_SIZE);
  368
+
  369
+            destroy_conn_buffer(cbg, tofree);
  370
+        }
  371
+
  372
+        if (cbg->num_free_buffers == 0 ||
  373
+            cbg->total_rsize <= cbg->settings.total_rsize_range_bottom) {
  374
+            cbg->reclamation_in_progress = false;
  375
+        }
  376
+    }
  377
+}
360 378
 
361  
-    always_assert( cbs.initialized == false );
362  
-    always_assert( (CONN_BUFFER_HEADER_SZ % sizeof(void*)) == 0 );
363 379
 
364  
-    memset(&cbs, 0, sizeof(conn_buffer_status_t));
  380
+static void conn_buffer_group_init(conn_buffer_group_t* const cbg,
  381
+                                   size_t initial_buffer_count,
  382
+                                   size_t buffer_rsize_limit,
  383
+                                   size_t total_rsize_range_bottom,
  384
+                                   size_t total_rsize_range_top) {
  385
+    size_t i;
365 386
 
366  
-    cbs.settings.page_size = getpagesize();
  387
+    always_assert( cbg->initialized == false );
  388
+    always_assert( (CONN_BUFFER_HEADER_SZ % sizeof(void*)) == 0 );
367 389
 
368  
-    always_assert( (cbs.settings.page_size & (cbs.settings.page_size - 1)) == 0);
  390
+    always_assert( (l.page_size & (l.page_size - 1)) == 0);
369 391
 
370 392
     /* write in some defaults */
371 393
     if (initial_buffer_count == 0) {
@@ -381,60 +403,71 @@ void conn_buffer_init(size_t initial_buffer_count,
381 403
         total_rsize_range_top = CONN_BUFFER_TOTAL_RSIZE_RANGE_TOP_DEFAULT;
382 404
     }
383 405
 
384  
-    always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_bottom);
385  
-    always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_top);
  406
+    always_assert(initial_buffer_count * l.page_size <= total_rsize_range_bottom);
  407
+    always_assert(initial_buffer_count * l.page_size <= total_rsize_range_top);
386 408
     // always_assert(buffer_rsize_limit < total_rsize_range_bottom);
387 409
     always_assert(total_rsize_range_bottom < total_rsize_range_top);
388  
-    always_assert(buffer_rsize_limit >= cbs.settings.page_size);
  410
+    always_assert(buffer_rsize_limit >= l.page_size);
389 411
 
390  
-    cbs.settings.initial_buffer_count = initial_buffer_count;
391  
-    cbs.settings.buffer_rsize_limit = buffer_rsize_limit;
392  
-    cbs.settings.total_rsize_range_bottom = total_rsize_range_bottom;
393  
-    cbs.settings.total_rsize_range_top = total_rsize_range_top;
  412
+    cbg->settings.initial_buffer_count = initial_buffer_count;
  413
+    cbg->settings.buffer_rsize_limit = buffer_rsize_limit;
  414
+    cbg->settings.total_rsize_range_bottom = total_rsize_range_bottom;
  415
+    cbg->settings.total_rsize_range_top = total_rsize_range_top;
394 416
 
395 417
     for (i = 0; i < initial_buffer_count; i ++) {
396 418
         conn_buffer_t* buffer;
397 419
 
398  
-        buffer = make_conn_buffer();
  420
+        buffer = make_conn_buffer(cbg);
399 421
         always_assert(buffer != NULL);
400  
-        add_conn_buffer_to_freelist(buffer);
  422
+        add_conn_buffer_to_freelist(cbg, buffer);
401 423
     }
402 424
 
403  
-    cbs.initialized = true;
  425
+    pthread_mutex_init(&cbg->lock, NULL);
  426
+
  427
+    cbg->initialized = true;
404 428
 }
405 429
 
406 430
 
407  
-void do_conn_buffer_reclamation(void) {
408  
-    if (cbs.reclamation_in_progress) {
409  
-        if (cbs.num_free_buffers != 0) {
410  
-            /* grab the most space-consuming buffer and reclaim it. */
411  
-            conn_buffer_t* tofree = remove_conn_buffer_from_freelist(CONN_BUFFER_SIZE);
  431
+void conn_buffer_init(unsigned groups,
  432
+                      size_t initial_buffer_count,
  433
+                      size_t buffer_rsize_limit,
  434
+                      size_t total_rsize_range_bottom,
  435
+                      size_t total_rsize_range_top) {
  436
+    unsigned ix;
412 437
 
413  
-            destroy_conn_buffer(tofree);
414  
-        }
  438
+    l.page_size = getpagesize();
  439
+    l.cbg_list = calloc(groups, sizeof(conn_buffer_group_t));
415 440
 
416  
-        if (cbs.num_free_buffers == 0 ||
417  
-            cbs.total_rsize <= cbs.settings.total_rsize_range_bottom) {
418  
-            cbs.reclamation_in_progress = false;
419  
-        }
  441
+    for (ix = 0; ix < groups; ix ++) {
  442
+        conn_buffer_group_init(&l.cbg_list[ix], initial_buffer_count, buffer_rsize_limit,
  443
+                               total_rsize_range_bottom, total_rsize_range_top);
420 444
     }
  445
+    l.cbg_count = groups;
  446
+
  447
+    l.global_initialized = true;
421 448
 }
422 449
 
423 450
 
424 451
 /**
425 452
  * allocate a connection buffer.  max_rusage_hint is a hint for how much
426 453
  * of the buffer will be used in the worst case.  if it is 0, the hint is
427  
- * discarded.  currently, the hint is ignored. */
428  
-void* do_alloc_conn_buffer(size_t max_rusage_hint) {
  454
+ * discarded.  currently, the hint is ignored.
  455
+ *
  456
+ * this is a thread-guarded function, i.e., it should only be called for a
  457
+ * connection buffer group by the thread it is assigned to.
  458
+ */
  459
+static void* do_alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
429 460
     conn_buffer_t* buffer;
430 461
 
431  
-    if ( (buffer = remove_conn_buffer_from_freelist(max_rusage_hint)) == NULL &&
432  
-         (buffer = make_conn_buffer()) == NULL ) {
433  
-        cbs.stats.allocs_failed ++;
  462
+    assert(cbg->settings.tid == pthread_self());
  463
+
  464
+    if ( (buffer = remove_conn_buffer_from_freelist(cbg, max_rusage_hint)) == NULL &&
  465
+         (buffer = make_conn_buffer(cbg)) == NULL ) {
  466
+        cbg->stats.allocs_failed ++;
434 467
         return NULL;
435 468
     }
436 469
 
437  
-    cbs.stats.allocs ++;
  470
+    cbg->stats.allocs ++;
438 471
 
439 472
     assert(buffer->signature == CONN_BUFFER_SIGNATURE);
440 473
     assert(buffer->in_freelist == false);
@@ -443,15 +476,26 @@ void* do_alloc_conn_buffer(size_t max_rusage_hint) {
443 476
     buffer->rusage_updated = false;
444 477
     buffer->prev_rusage = buffer->max_rusage;
445 478
 
446  
-    do_conn_buffer_reclamation();
  479
+    conn_buffer_reclamation(cbg);
447 480
 
448 481
     return buffer->data;
449 482
 }
450 483
 
451 484
 
452  
-void do_free_conn_buffer(void* ptr, ssize_t max_rusage) {
  485
+/**
  486
+ * releases a connection buffer.  max_rusage_hint is a hint for how much of the
  487
+ * buffer was used in the worst case.  if it is 0 and no one has ever called
  488
+ * report_max_rusage on this buffer, it is assumed that the entire buffer has
  489
+ * been accessed.  if it is 0 and someone has called report_max_rusage, then the
  490
+ * largest value reported is used.
  491
+ *
  492
+ * this is a thread-guarded function, i.e., it should only be called for a
  493
+ * connection buffer group by the thread it is assigned to.
  494
+ */
  495
+static void do_free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage) {
453 496
     conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);
454 497
 
  498
+    assert(cbg->settings.tid == pthread_self());
455 499
     assert(buffer->signature == CONN_BUFFER_SIGNATURE);
456 500
     assert(buffer->in_freelist == false);
457 501
     assert(buffer->used == true);
@@ -475,37 +519,44 @@ void do_free_conn_buffer(void* ptr, ssize_t max_rusage) {
475 519
     }
476 520
 
477 521
     // bump counter
478  
-    cbs.stats.frees ++;
  522
+    cbg->stats.frees ++;
479 523
 
480 524
     /* do we reclaim this buffer? */
481  
-    if (max_rusage >= cbs.settings.buffer_rsize_limit ||
  525
+    if (max_rusage >= cbg->settings.buffer_rsize_limit ||
482 526
         detect_corruption) {
483 527
         /* yes, reclaim now...  we must set the max_rusage to the previously
484 528
          * known rusage because that delta was never accounted for. */
485 529
         buffer->max_rusage = buffer->prev_rusage;
486  
-        destroy_conn_buffer(buffer);
  530
+        destroy_conn_buffer(cbg, buffer);
487 531
     } else {
488 532
         /* adjust stats */
489  
-        cbs.total_rsize += (max_rusage - buffer->prev_rusage);
  533
+        cbg->total_rsize += (max_rusage - buffer->prev_rusage);
490 534
 
491 535
         /* return to the free list */
492  
-        add_conn_buffer_to_freelist(buffer);
  536
+        add_conn_buffer_to_freelist(cbg, buffer);
493 537
     }
494 538
 
495 539
     /* should we start a reclamation? */
496  
-    if (cbs.reclamation_in_progress == false &&
497  
-        cbs.total_rsize >= cbs.settings.total_rsize_range_top) {
498  
-        cbs.stats.reclamations_started ++;
499  
-        cbs.reclamation_in_progress = true;
  540
+    if (cbg->reclamation_in_progress == false &&
  541
+        cbg->total_rsize >= cbg->settings.total_rsize_range_top) {
  542
+        cbg->stats.reclamations_started ++;
  543
+        cbg->reclamation_in_progress = true;
500 544
     }
501 545
 
502  
-    do_conn_buffer_reclamation();
  546
+    conn_buffer_reclamation(cbg);
503 547
 }
504 548
 
505 549
 
506  
-void report_max_rusage(void* ptr, size_t max_rusage) {
  550
+/**
  551
+ * report the maximum usage of a connection buffer.
  552
+ *
  553
+ * this is a thread-guarded function, i.e., it should only be called for a
  554
+ * connection buffer group by the thread it is assigned to.
  555
+ */
  556
+static void do_report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage) {
507 557
     conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);
508 558
 
  559
+    assert(cbg->settings.tid == pthread_self());
509 560
     assert(buffer->signature == CONN_BUFFER_SIGNATURE);
510 561
     assert(buffer->in_freelist == false);
511 562
     assert(buffer->used == true);
@@ -517,24 +568,90 @@ void report_max_rusage(void* ptr, size_t max_rusage) {
517 568
         buffer->max_rusage = max_rusage;
518 569
     }
519 570
 
520  
-    /* yeah, we're reading a variable in a thread-unsafe way, but we'll do a
  571
+    /* yeah, we're reading a variable in a group-unsafe way, but we'll do a
521 572
      * second check once we grab the lock. */
522  
-    if (cbs.reclamation_in_progress) {
523  
-        conn_buffer_reclamation();
  573
+    if (cbg->reclamation_in_progress) {
  574
+        conn_buffer_reclamation(cbg);
  575
+    }
  576
+}
  577
+
  578
+
  579
+void* alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
  580
+    void* ret;
  581
+
  582
+    pthread_mutex_lock(&cbg->lock);
  583
+    ret = do_alloc_conn_buffer(cbg, max_rusage_hint);
  584
+    pthread_mutex_unlock(&cbg->lock);
  585
+    return ret;
  586
+}
  587
+
  588
+void free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage) {
  589
+    pthread_mutex_lock(&cbg->lock);
  590
+    do_free_conn_buffer(cbg, ptr, max_rusage);
  591
+    pthread_mutex_unlock(&cbg->lock);
  592
+}
  593
+
  594
+void report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage) {
  595
+    pthread_mutex_lock(&cbg->lock);
  596
+    do_report_max_rusage(cbg, ptr, max_rusage);
  597
+    pthread_mutex_unlock(&cbg->lock);
  598
+}
  599
+
  600
+
  601
+conn_buffer_group_t* get_conn_buffer_group(unsigned group) {
  602
+    assert(group < l.cbg_count);
  603
+    return &l.cbg_list[group];
  604
+}
  605
+
  606
+
  607
+/**
  608
+ * assign a thread id to a connection buffer group.  returns false if no errors
  609
+ * occur.
  610
+ */
  611
+bool assign_thread_id_to_conn_buffer_group(unsigned group, pthread_t tid) {
  612
+    assert(group < l.cbg_count);
  613
+    if (group < l.cbg_count) {
  614
+        assert(l.cbg_list[group].settings.tid == 0);
  615
+        if (l.cbg_list[group].settings.tid == 0) {
  616
+            l.cbg_list[group].settings.tid = tid;
  617
+            return false;
  618
+        }
524 619
     }
  620
+    return true;
525 621
 }
526 622
 
527 623
 
528  
-char* do_conn_buffer_stats(size_t* result_size) {
  624
+char* conn_buffer_stats(size_t* result_size) {
529 625
     size_t bufsize = 2048, offset = 0;
530 626
     char* buffer = malloc(bufsize);
531 627
     char terminator[] = "END\r\n";
  628
+    unsigned ix;
  629
+
  630
+    size_t num_free_buffers = 0;
  631
+    size_t total_rsize = 0;
  632
+    size_t total_rsize_in_freelist = 0;
  633
+    conn_buffer_stats_t stats;
532 634
 
533 635
     if (buffer == NULL) {
534 636
         *result_size = 0;
535 637
         return NULL;
536 638
     }
537 639
 
  640
+    memset(&stats, 0, sizeof(conn_buffer_stats_t));
  641
+
  642
+    for (ix = 0; ix < l.cbg_count; ix ++) {
  643
+        pthread_mutex_lock(&l.cbg_list[ix].lock);
  644
+        num_free_buffers           += l.cbg_list[ix].num_free_buffers;
  645
+        total_rsize                += l.cbg_list[ix].total_rsize;
  646
+        total_rsize_in_freelist    += l.cbg_list[ix].total_rsize_in_freelist;
  647
+        stats.allocs               += l.cbg_list[ix].stats.allocs;
  648
+        stats.frees                += l.cbg_list[ix].stats.frees;
  649
+        stats.destroys             += l.cbg_list[ix].stats.destroys;
  650
+        stats.reclamations_started += l.cbg_list[ix].stats.reclamations_started;
  651
+        stats.allocs_failed        += l.cbg_list[ix].stats.allocs_failed;
  652
+        pthread_mutex_unlock(&l.cbg_list[ix].lock);
  653
+    }
  654
+
538 655
     offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
539 656
                               "STAT num_free_buffers %" PRINTF_INT64_MODIFIER "u\n"
540 657
                               "STAT total_rsize %" PRINTF_INT64_MODIFIER "u\n"
@@ -544,14 +661,14 @@ char* do_conn_buffer_stats(size_t* result_size) {
544 661
                               "STAT failed_allocates %" PRINTF_INT64_MODIFIER "u\n"
545 662
                               "STAT destroys %" PRINTF_INT64_MODIFIER "u\n"
546 663
                               "STAT reclamations_started %" PRINTF_INT64_MODIFIER "u\n",
547  
-                              cbs.num_free_buffers,
548  
-                              cbs.total_rsize,
549  
-                              cbs.total_rsize_in_freelist,
550  
-                              cbs.stats.allocs,
551  
-                              cbs.stats.frees,
552  
-                              cbs.stats.allocs_failed,
553  
-                              cbs.stats.destroys,
554  
-                              cbs.stats.reclamations_started);
  664
+                              num_free_buffers,
  665
+                              total_rsize,
  666
+                              total_rsize_in_freelist,
  667
+                              stats.allocs,
  668
+                              stats.frees,
  669
+                              stats.allocs_failed,
  670
+                              stats.destroys,
  671
+                              stats.reclamations_started);
555 672
 
556 673
     offset = append_to_buffer(buffer, bufsize, offset, 0, terminator);
557 674
 
46  conn_buffer.h
@@ -2,6 +2,8 @@
2 2
 
3 3
 #include "generic.h"
4 4
 
  5
+#include <pthread.h>
  6
+
5 7
 #if !defined(_conn_buffer_h_)
6 8
 #define _conn_buffer_h_
7 9
 
@@ -50,11 +52,21 @@ struct conn_buffer_s {
50 52
 };
51 53
 
52 54
 
53  
-typedef struct conn_buffer_status_s conn_buffer_status_t;
54  
-struct conn_buffer_status_s {
  55
+typedef struct conn_buffer_stats_s conn_buffer_stats_t;
  56
+struct conn_buffer_stats_s {
  57
+    uint64_t allocs;
  58
+    uint64_t frees;
  59
+    uint64_t destroys;
  60
+    uint64_t reclamations_started;
  61
+    uint64_t allocs_failed;
  62
+};
  63
+
  64
+
  65
+typedef struct conn_buffer_group_s conn_buffer_group_t;
  66
+struct conn_buffer_group_s {
55 67
     conn_buffer_t** free_buffers;
56  
-    size_t num_free_buffers;
57 68
     size_t free_buffer_list_capacity;
  69
+    size_t num_free_buffers;
58 70
 
59 71
     size_t total_rsize;
60 72
     size_t total_rsize_in_freelist;
@@ -63,6 +75,7 @@ struct conn_buffer_status_s {
63 75
     bool initialized;
64 76
 
65 77
     struct {
  78
+        pthread_t tid;                  /* associated thread id */
66 79
         size_t initial_buffer_count;    /* initial buffers set up */
67 80
         size_t buffer_rsize_limit;      /* if the reported usage of a block is
68 81
                                          * greater or equal to this limit, the
@@ -80,30 +93,27 @@ struct conn_buffer_status_s {
80 93
         size_t page_size;               /* page size on the OS. */
81 94
     } settings;
82 95
 
83  
-    struct {
84  
-        uint64_t allocs;
85  
-        uint64_t frees;
86  
-        uint64_t destroys;
87  
-        uint64_t reclamations_started;
88  
-        uint64_t allocs_failed;
89  
-    } stats;
  96
+    conn_buffer_stats_t stats;
  97
+    pthread_mutex_t lock;               /* lock for this connection buffer group. */
90 98
 };
91 99
 
92 100
 
93  
-DECL_MT_FUNC(void*, alloc_conn_buffer,       (size_t max_rusage_hint));
94  
-DECL_MT_FUNC(void,  free_conn_buffer,        (void* ptr, ssize_t max_rusage));
95  
-DECL_MT_FUNC(void,  conn_buffer_reclamation, (void));
96  
-DECL_MT_FUNC(char*, conn_buffer_stats,       (size_t* result_size));
  101
+extern void* alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint);
  102
+extern void free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage);
  103
+extern void report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage);
  104
+extern char* conn_buffer_stats(size_t* result_size);
97 105
 
98  
-extern void report_max_rusage(void* ptr, size_t max_rusage);
99 106
 
100  
-extern void conn_buffer_init(size_t initial_buffer_count,
  107
+extern void conn_buffer_init(unsigned threads,
  108
+                             size_t initial_buffer_count,
101 109
                              size_t buffer_rsize_limit,
102 110
                              size_t total_rsize_range_bottom,
103 111
                              size_t total_rsize_range_top);
104 112
 
105  
-STATIC_DECL(int cb_freelist_check(void));
106  
-STATIC_DECL(conn_buffer_status_t cbs);
  113
+extern conn_buffer_group_t* get_conn_buffer_group(unsigned thread);
  114
+extern bool assign_thread_id_to_conn_buffer_group(unsigned group, pthread_t tid);
  115
+
  116
+STATIC_DECL(int cb_freelist_check(conn_buffer_group_t* cbg));
107 117
 
108 118
 #if !defined(CONN_BUFFER_MODULE)
109 119
 #undef STATIC
2  generic.h
@@ -105,7 +105,7 @@ typedef unsigned int rel_time_t;
105 105
 #define always_assert assert
106 106
 #endif /* #if defined(NDEBUG) */
107 107
 
108  
-#define DECL_MT_FUNC(ret_type, func_name, args)  extern ret_type do_ ## func_name args;  extern ret_type mt_ ## func_name args;
  108
+#define DECL_MT_FUNC(ret_type, func_name, args)  extern ret_type do_ ## func_name args;  extern ret_type func_name args;
109 109
 
110 110
 // bump a counter up by one. return 0 if the counter has overflowed, nonzero otherwise.
111 111
 #define BUMP(cntr)  ((++(cntr)) != 0)
55  memcached.c
@@ -384,7 +384,7 @@ static int allocate_udp_reply_port(int sfd, int tries) {
384 384
 #endif
385 385
 
386 386
 conn *conn_new(const int sfd, const int init_state, const int event_flags,
387  
-               const int read_buffer_size, const bool is_udp, const bool is_binary,
  387
+               conn_buffer_group_t* cbg, const bool is_udp, const bool is_binary,
388 388
                const struct sockaddr* const addr, const socklen_t addrlen,
389 389
                struct event_base *base) {
390 390
     conn* c = conn_from_freelist();
@@ -448,6 +448,7 @@ conn *conn_new(const int sfd, const int init_state, const int event_flags,
448 448
     } else {
449 449
         c->request_addr_size = addrlen;
450 450
     }
  451
+    c->cbg = cbg;
451 452
 
452 453
     if (settings.verbose > 1) {
453 454
         if (init_state == conn_listening)
@@ -533,18 +534,18 @@ void conn_cleanup(conn* c) {
533 534
     }
534 535
 
535 536
     if (c->rbuf) {
536  
-        free_conn_buffer(c->rbuf, 0);   /* no idea how much was used... */
  537
+        free_conn_buffer(c->cbg, c->rbuf, 0);   /* no idea how much was used... */
537 538
         c->rbuf = NULL;
538 539
         c->rsize = 0;
539 540
     }
540 541
     if (c->iov) {
541  
-        free_conn_buffer(c->iov, 0);    /* no idea how much was used... */
  542
+        free_conn_buffer(c->cbg, c->iov, 0);    /* no idea how much was used... */
542 543
         c->iov = NULL;
543 544
         c->iovsize = 0;
544 545
     }
545 546
 
546 547
     if (c->riov) {
547  
-        free_conn_buffer(c->riov, 0);   /* no idea how much was used... */
  548
+        free_conn_buffer(c->cbg, c->riov, 0);   /* no idea how much was used... */
548 549
         c->riov = NULL;
549 550
         c->riov_size = 0;
550 551
     }
@@ -560,15 +561,15 @@ void conn_free(conn* c) {
560 561
         if (c->msglist)
561 562
             pool_free(c->msglist, sizeof(struct msghdr) * c->msgsize, CONN_BUFFER_MSGLIST_POOL);
562 563
         if (c->rbuf)
563  
-            free_conn_buffer(c->rbuf, 0);
  564
+            free_conn_buffer(c->cbg, c->rbuf, 0);
564 565
         if (c->wbuf)
565 566
             pool_free(c->wbuf, c->wsize, CONN_BUFFER_WBUF_POOL);
566 567
         if (c->ilist)
567 568
             pool_free(c->ilist, sizeof(item*) * c->isize, CONN_BUFFER_ILIST_POOL);
568 569
         if (c->iov)
569  
-            free_conn_buffer(c->iov, c->iovused * sizeof(struct iovec));
  570
+            free_conn_buffer(c->cbg, c->iov, c->iovused * sizeof(struct iovec));
570 571
         if (c->riov)
571  
-            free_conn_buffer(c->riov, 0);
  572
+            free_conn_buffer(c->cbg, c->riov, 0);
572 573
         if (c->bp_key)
573 574
             pool_free(c->bp_key, sizeof(char) * KEY_MAX_LENGTH + 1, CONN_BUFFER_BP_KEY_POOL);
574 575
         if (c->bp_hdr_pool)
@@ -621,7 +622,7 @@ void conn_shrink(conn* c) {
621 622
 
622 623
     if (c->rbytes == 0 && c->rbuf != NULL) {
623 624
         /* drop the buffer since we have no bytes to preserve. */
624  
-        free_conn_buffer(c->rbuf, 0);
  625
+        free_conn_buffer(c->cbg, c->rbuf, 0);
625 626
         c->rbuf = NULL;
626 627
         c->rcurr = NULL;
627 628
         c->rsize = 0;
@@ -665,13 +666,13 @@ void conn_shrink(conn* c) {
665 666
     }
666 667
 
667 668
     if (c->riov) {
668  
-        free_conn_buffer(c->riov, 0);
  669
+        free_conn_buffer(c->cbg, c->riov, 0);
669 670
         c->riov = NULL;
670 671
         c->riov_size = 0;
671 672
     }
672 673
 
673 674
     if (c->iov != NULL) {
674  
-        free_conn_buffer(c->iov, 0);
  675
+        free_conn_buffer(c->cbg, c->iov, 0);
675 676
         c->iov = NULL;
676 677
         c->iovsize = 0;
677 678
     }
@@ -713,7 +714,7 @@ static int ensure_iov_space(conn* c) {
713 714
     assert(c != NULL);
714 715
 
715 716
     if (c->iovsize == 0) {
716  
-        c->iov = (struct iovec *)alloc_conn_buffer(0);
  717
+        c->iov = (struct iovec *)alloc_conn_buffer(c->cbg, 0);
717 718
         if (c->iov != NULL) {
718 719
             c->iovsize = CONN_BUFFER_DATA_SZ / sizeof(struct iovec);
719 720
         }
@@ -723,7 +724,7 @@ static int ensure_iov_space(conn* c) {
723 724
         return -1;
724 725
     }
725 726
 
726  
-    report_max_rusage(c->iov, (c->iovused + 1) * sizeof(struct iovec));
  727
+    report_max_rusage(c->cbg, c->iov, (c->iovused + 1) * sizeof(struct iovec));
727 728
 
728 729
     return 0;
729 730
 }
@@ -2286,7 +2287,7 @@ int try_read_udp(conn* c) {
2286 2287
 
2287 2288
     if (c->rbuf == NULL) {
2288 2289
         /* no idea how big the buffer will need to be. */
2289  
-        c->rbuf = (char*) alloc_conn_buffer(0);
  2290
+        c->rbuf = (char*) alloc_conn_buffer(c->cbg, 0);
2290 2291
 
2291 2292
         if (c->rbuf != NULL) {
2292 2293
             c->rcurr = c->rbuf;
@@ -2327,7 +2328,7 @@ int try_read_udp(conn* c) {
2327 2328
         }
2328 2329
 
2329 2330
         /* report peak usage here */
2330  
-        report_max_rusage(c->rbuf, res);
  2331
+        report_max_rusage(c->cbg, c->rbuf, res);
2331 2332
 
2332 2333
 #if defined(HAVE_UDP_REPLY_PORTS)
2333 2334
         reply_ports = ntohs(*((uint16_t*)(buf + 6)));
@@ -2350,7 +2351,7 @@ int try_read_udp(conn* c) {
2350 2351
         return 1;
2351 2352
     } else {
2352 2353
         /* return the conn buffer. */
2353  
-        free_conn_buffer(c->rbuf, 8 - 1 /* worst case for memory usage */);
  2354
+        free_conn_buffer(c->cbg, c->rbuf, 8 - 1 /* worst case for memory usage */);
2354 2355
         c->rbuf = NULL;
2355 2356
         c->rcurr = NULL;
2356 2357
         c->rsize = 0;
@@ -2380,7 +2381,7 @@ int try_read_network(conn* c) {
2380 2381
             c->rcurr = c->rbuf;
2381 2382
         }
2382 2383
     } else {
2383  
-        c->rbuf = (char*) alloc_conn_buffer(0);
  2384
+        c->rbuf = (char*) alloc_conn_buffer(c->cbg, 0);
2384 2385
         if (c->rbuf != NULL) {
2385 2386
             c->rcurr = c->rbuf;
2386 2387
             c->rsize = CONN_BUFFER_DATA_SZ;
@@ -2406,7 +2407,7 @@ int try_read_network(conn* c) {
2406 2407
             c->rbytes += res;
2407 2408
 
2408 2409
             /* report peak usage here */
2409  
-            report_max_rusage(c->rbuf, c->rbytes);
  2410
+            report_max_rusage(c->cbg, c->rbuf, c->rbytes);
2410 2411
 
2411 2412
             if (res < avail) {
2412 2413
                 break;
@@ -2425,7 +2426,7 @@ int try_read_network(conn* c) {
2425 2426
             if (errno == EAGAIN || errno == EWOULDBLOCK) {
2426 2427
                 /* if we have no data, release the connection buffer */
2427 2428
                 if (c->rbytes == 0) {
2428  
-                    free_conn_buffer(c->rbuf, 0);
  2429
+                    free_conn_buffer(c->cbg, c->rbuf, 0);
2429 2430
                     c->rbuf = NULL;
2430 2431
                     c->rcurr = NULL;
2431 2432
                     c->rsize = 0;
@@ -2598,7 +2599,7 @@ static void drive_machine(conn* c) {
2598 2599
                 break;
2599 2600
             }
2600 2601
             dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
2601  
-                              DATA_BUFFER_SIZE, false, c->binary,
  2602
+                              NULL, false, c->binary,
2602 2603
                               &addr, addrlen);
2603 2604
 
2604 2605
             break;
@@ -2724,7 +2725,7 @@ static void drive_machine(conn* c) {
2724 2725
                 c->sbytes -= res;
2725 2726
 
2726 2727
                 /* report peak usage here */
2727  
-                report_max_rusage(c->rbuf, res);
  2728
+                report_max_rusage(c->cbg, c->rbuf, res);
2728 2729
 
2729 2730
                 break;
2730 2731
             }
@@ -3577,7 +3578,7 @@ int main (int argc, char **argv) {
3577 3578
 #if defined(USE_FLAT_ALLOCATOR)
3578 3579
     flat_storage_init(settings.maxbytes);
3579 3580
 #endif /* #if defined(USE_FLAT_ALLOCATOR) */
3580  
-    conn_buffer_init(0, 0, settings.max_conn_buffer_bytes / 2, settings.max_conn_buffer_bytes);
  3581
+    conn_buffer_init(settings.num_threads - 1, 0, 0, settings.max_conn_buffer_bytes / 2, settings.max_conn_buffer_bytes);
3581 3582
 
3582 3583
     /* managed instance? alloc and zero a bucket array */
3583 3584
     if (settings.managed) {
@@ -3612,7 +3613,7 @@ int main (int argc, char **argv) {
3612 3613
     /* create the initial listening connection */
3613 3614
     if (l_socket != 0) {
3614 3615
         if (!(listen_conn = conn_new(l_socket, conn_listening,
3615  
-                                     EV_READ | EV_PERSIST, 1, false, false,
  3616
+                                     EV_READ | EV_PERSIST, NULL, false, false,
3616 3617
                                      NULL, 0,
3617 3618
                                      main_base))) {
3618 3619
             fprintf(stderr, "failed to create listening connection");
@@ -3621,7 +3622,7 @@ int main (int argc, char **argv) {
3621 3622
     }
3622 3623
     if ((settings.binary_port != 0) &&
3623 3624
         (listen_binary_conn = conn_new(b_socket, conn_listening,
3624  
-                                       EV_READ | EV_PERSIST, 1, false, true,
  3625
+                                       EV_READ | EV_PERSIST, NULL, false, true,
3625 3626
                                        NULL, 0,
3626 3627
                                        main_base)) == NULL) {
3627 3628
         fprintf(stderr, "failed to create listening connection");
@@ -3647,20 +3648,20 @@ int main (int argc, char **argv) {
3647 3648
     if (u_socket > -1) {
3648 3649
         /* Skip thread 0, the tcp accept socket dispatcher
3649 3650
            if running with > 1 thread. */
3650  
-        for (c = (settings.num_threads > 1 ? 1 : 0); c < settings.num_threads; c++) {
  3651
+        for (c = 1; c < settings.num_threads; c++) {
3651 3652
             /* this is guaranteed to hit all threads because we round-robin */
3652 3653
             dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
3653  
-                              UDP_READ_BUFFER_SIZE, 1, 0, NULL, 0);
  3654
+                              get_conn_buffer_group(c - 1), 1, 0, NULL, 0);
3654 3655
         }
3655 3656
     }
3656 3657
     /* create the initial listening udp connection, monitored on all threads */
3657 3658
     if (bu_socket > -1) {
3658 3659
         /* Skip thread 0, the tcp accept socket dispatcher
3659 3660
            if running with > 1 thread. */
3660  
-        for (c = (settings.num_threads > 1 ? 1 : 0); c < settings.num_threads; c++) {
  3661
+        for (c = 1; c < settings.num_threads; c++) {
3661 3662
             /* this is guaranteed to hit all threads because we round-robin */
3662 3663
             dispatch_conn_new(bu_socket, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
3663  
-                              UDP_READ_BUFFER_SIZE, true, true, NULL, 0);
  3664
+                              get_conn_buffer_group(c - 1), true, true, NULL, 0);
3664 3665
         }
3665 3666
     }
3666 3667
     /* enter the event loop */
12  memcached.h
@@ -17,7 +17,6 @@
17 17
  */
18 18
 #define DATA_BUFFER_SIZE 2048
19 19
 #define BP_HDR_POOL_INIT_SIZE 4096
20  
-#define UDP_READ_BUFFER_SIZE 65536
21 20
 #define UDP_MAX_PAYLOAD_SIZE 1400
22 21
 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
23 22
 
@@ -187,6 +186,7 @@ struct settings_s {
187 186
  */
188 187
 #include "binary_protocol.h"
189 188
 #include "binary_sm.h"
  189
+#include "conn_buffer.h"
190 190
 #include "items.h"
191 191
 
192 192
 
@@ -267,6 +267,8 @@ struct conn_s {
267 267
                          a managed instance. -1 (_not_ 0) means invalid. */
268 268
     int    gen;       /* generation requested for the bucket */
269 269
 
  270
+    conn_buffer_group_t* cbg;
  271
+
270 272
     /* used to process binary protocol messages */
271 273
     bp_cmd_info_t bp_info;
272 274
 
@@ -304,7 +306,7 @@ void do_run_deferred_deletes(void);
304 306
 char *do_add_delta(const char* key, const size_t nkey, const int incr, const unsigned int delta,
305 307
                    char *buf, uint32_t* res_val, const struct in_addr addr);
306 308
 int do_store_item(item *item, int comm, const char* key);
307  
-conn* conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size,
  309
+conn* conn_new(const int sfd, const int init_state, const int event_flags, conn_buffer_group_t* cbg,
308 310
                  const bool is_udp, const bool is_binary,
309 311
                  const struct sockaddr* const addr, const socklen_t addrlen,
310 312
                  struct event_base *base);
@@ -330,7 +332,7 @@ extern int transmit(conn *c);
330 332
 void thread_init(int nthreads, struct event_base *main_base);
331 333
 int  dispatch_event_add(int thread, conn* c);
332 334
 void dispatch_conn_new(int sfd, int init_state, int event_flags,
333  
-                       const int read_buffer_size,
  335
+                       conn_buffer_group_t* cbg,
334 336
                        const bool is_udp, const bool is_binary,
335 337
                        const struct sockaddr* addr, socklen_t addrlen);
336 338
 
@@ -365,17 +367,13 @@ int   mt_store_item(item *item, int comm, const char* key);
365 367
 
366 368
 
367 369
 # define add_delta                   mt_add_delta
368  
-# define alloc_conn_buffer           mt_alloc_conn_buffer
369 370
 # define append_thread_stats         mt_append_thread_stats
370 371
 # define assoc_expire_regex          mt_assoc_expire_regex
371 372
 # define assoc_move_next_bucket      mt_assoc_move_next_bucket
372 373
 # define conn_from_freelist          mt_conn_from_freelist
373 374
 # define conn_add_to_freelist        mt_conn_add_to_freelist
374  
-# define conn_buffer_reclamation     mt_conn_buffer_reclamation
375  
-# define conn_buffer_stats           mt_conn_buffer_stats
376 375
 # define defer_delete                mt_defer_delete
377 376
 # define flat_allocator_stats        mt_flat_allocator_stats
378  
-# define free_conn_buffer            mt_free_conn_buffer
379 377
 # define is_listen_thread            mt_is_listen_thread
380 378
 # define item_alloc                  mt_item_alloc
381 379
 # define item_cachedump              mt_item_cachedump
4  slabs_items_support.h
@@ -47,7 +47,7 @@ static inline bool item_setup_receive(item* it, conn* c) {
47 47
 
48 48
         assert(c->riov == NULL);
49 49
         assert(c->riov_size == 0);
50  
-        c->riov = (struct iovec*) alloc_conn_buffer(sizeof(struct iovec) * iov_len_required);
  50
+        c->riov = (struct iovec*) alloc_conn_buffer(c->cbg, sizeof(struct iovec) * iov_len_required);
51 51
         if (c->riov == NULL) {
52 52
             return false;
53 53
         }
@@ -55,7 +55,7 @@ static inline bool item_setup_receive(item* it, conn* c) {
55 55
         iov_len_required = 1;
56 56
     }
57 57
 
58  
-    report_max_rusage(c->riov, sizeof(struct iovec) * iov_len_required);
  58
+    report_max_rusage(c->cbg, c->riov, sizeof(struct iovec) * iov_len_required);
59 59
     c->riov_size = iov_len_required;
60 60
     c->riov_left = iov_len_required;
61 61
     c->riov_curr = 0;
58  thread.c
@@ -28,11 +28,11 @@ struct conn_queue_item {
28 28
     int     sfd;
29 29
     int     init_state;
30 30
     int     event_flags;
31  
-    int     read_buffer_size;
32 31
     int     is_udp;
33 32
     int     is_binary;
34 33
     struct sockaddr addr;
35 34
     socklen_t addrlen;
  35
+    conn_buffer_group_t* cbg;
36 36
     CQ_ITEM *next;
37 37
 };
38 38
 
@@ -196,7 +196,7 @@ static void cqi_free(CQ_ITEM *item) {
196 196
 /*
197 197
  * Creates a worker thread.
198 198
  */
199  
-static void create_worker(void *(*func)(void *), void *arg) {
  199
+static void create_worker(unsigned worker_num, void *(*func)(void *), void *arg) {
200 200
     pthread_t       thread;
201 201
     pthread_attr_t  attr;
202 202
     int             ret;
@@ -208,6 +208,15 @@ static void create_worker(void *(*func)(void *), void *arg) {
208 208
                 strerror(ret));
209 209
         exit(1);
210 210
     }
  211
+
  212
+    assign_thread_id_to_conn_buffer_group(worker_num - 1 /* worker num is
  213
+                                                            1-based, but since
  214
+                                                            the main thread does
  215
+                                                            not need connection
  216
+                                                            buffers, the
  217
+                                                            connection buffer
  218
+                                                            groups are 0-based. */,
  219
+                                          thread);
211 220
 }
212 221
 
213 222
 
@@ -304,7 +313,7 @@ static void thread_libevent_process(int fd, short which, void *arg) {
304 313
 
305 314
     if (NULL != item) {
306 315
         conn* c = conn_new(item->sfd, item->init_state, item->event_flags,
307  
-                           item->read_buffer_size, item->is_udp,
  316
+                           item->cbg, item->is_udp,
308 317
                            item->is_binary, &item->addr, item->addrlen,
309 318
                            me->base);
310 319
         if (c == NULL) {
@@ -332,7 +341,7 @@ static int last_thread = 0;
332 341
  * of an incoming connection.
333 342
  */