Browse files

connection buffers are per-thread to avoid lock contention

Summary: - A connection buffer group is created per thread.  It records the thread id so that, if asserts are compiled in, it can determine whether it is being used by a different thread.
         - conn_new and dispatch_conn_new no longer need an initial read buffer size.  It is meaningless in the context of connection buffers.
         
         Minor change:
         - udp support for stress test

Reviewed By: ps

Test Plan: ran stress test with asserts on for 1 hr with 4 clients that reconnect every 15 minutes.
           
           ran in production with asserts on for 10 minutes (and still going...)

Revert: OK


git-svn-id: http://svn.facebook.com/svnroot/projects/memcached/trunk@122288 2c7ba8d8-a2f7-0310-a573-de162e16dcc7
  • Loading branch information...
1 parent 93a414a commit c0dbcafaca0d7c6a671c1f754064793348031c47 ttung committed Sep 18, 2008
Showing with 348 additions and 239 deletions.
  1. +5 −4 binary_sm.c
  2. +259 −142 conn_buffer.c
  3. +28 −18 conn_buffer.h
  4. +1 −1 generic.h
  5. +28 −27 memcached.c
  6. +5 −7 memcached.h
  7. +2 −2 slabs_items_support.h
  8. +20 −38 thread.c
View
9 binary_sm.c
@@ -125,7 +125,7 @@ void process_binary_protocol(conn* c) {
}
dispatch_conn_new(sfd, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, false, c->binary, &addr, addrlen);
+ NULL, false, c->binary, &addr, addrlen);
return;
}
@@ -406,7 +406,8 @@ static inline bp_handler_res_t handle_header_size_known(conn* c)
assert(c->riov == NULL);
assert(c->riov_size == 0);
- c->riov = (struct iovec*) alloc_conn_buffer(0 /* no hint provided,
+ c->riov = (struct iovec*) alloc_conn_buffer(c->cbg,
+ 0 /* no hint provided,
* because we don't
* know how much the
* value will
@@ -416,7 +417,7 @@ static inline bp_handler_res_t handle_header_size_known(conn* c)
return retval;
}
c->riov_size = 1;
- report_max_rusage(c->riov, sizeof(struct iovec));
+ report_max_rusage(c->cbg, c->riov, sizeof(struct iovec));
/* set up the receive. */
c->riov[0].iov_base = c->bp_key;
@@ -576,7 +577,7 @@ static inline bp_handler_res_t handle_direct_receive(conn* c)
if (c->state == conn_bp_process) {
/* going into the process stage. we can release our receive IOV
* buffers. */
- free_conn_buffer(c->riov, 0);
+ free_conn_buffer(c->cbg, c->riov, 0);
c->riov = NULL;
c->riov_size = 0;
}
View
401 conn_buffer.c
@@ -25,185 +25,192 @@
#define HEAP_LEFT_CHILD(index) ((((index) + 1) << 1) - 1 + 0)
#define HEAP_RIGHT_CHILD(index) ((((index) + 1) << 1) - 1 + 1)
+
#ifdef CONN_BUFFER_CORRUPTION_DETECTION
static const bool detect_corruption = true;
#else
static const bool detect_corruption = false;
#endif /* #ifdef CONN_BUFFER_CORRUPTION_DETECTION */
+static struct {
+ size_t page_size;
+ int global_initialized;
+ conn_buffer_group_t* cbg_list;
+ size_t cbg_count;
+} l;
-STATIC int cb_freelist_check(void) {
+STATIC int cb_freelist_check(conn_buffer_group_t* cbg) {
#if defined(FREELIST_CHECK)
size_t i, found_entries, rsize_total;
bool end_found = false;
/* num free buffers agrees with reality? */
for (i = 0, found_entries = 0, rsize_total = 0;
- i < cbs.free_buffer_list_capacity;
+ i < cbg->free_buffer_list_capacity;
i ++) {
size_t left_child, right_child;
- if (cbs.free_buffers[i] == NULL) {
+ if (cbg->free_buffers[i] == NULL) {
end_found = true;
continue;
}
assert(end_found == false);
- assert(cbs.free_buffers[i]->signature == CONN_BUFFER_SIGNATURE);
- assert(cbs.free_buffers[i]->in_freelist == true);
- assert(cbs.free_buffers[i]->used == false);
+ assert(cbg->free_buffers[i]->signature == CONN_BUFFER_SIGNATURE);
+ assert(cbg->free_buffers[i]->in_freelist == true);
+ assert(cbg->free_buffers[i]->used == false);
found_entries ++;
- rsize_total += cbs.free_buffers[i]->max_rusage;
+ rsize_total += cbg->free_buffers[i]->max_rusage;
left_child = HEAP_LEFT_CHILD(i);
right_child = HEAP_RIGHT_CHILD(i);
- if (left_child < cbs.num_free_buffers) {
+ if (left_child < cbg->num_free_buffers) {
/* valid left child */
- assert(cbs.free_buffers[left_child] != NULL);
- assert(cbs.free_buffers[left_child]->signature == CONN_BUFFER_SIGNATURE);
- assert(cbs.free_buffers[left_child]->in_freelist == true);
- assert(cbs.free_buffers[left_child]->used == false);
+ assert(cbg->free_buffers[left_child] != NULL);
+ assert(cbg->free_buffers[left_child]->signature == CONN_BUFFER_SIGNATURE);
+ assert(cbg->free_buffers[left_child]->in_freelist == true);
+ assert(cbg->free_buffers[left_child]->used == false);
- assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[left_child]->max_rusage);
+ assert(cbg->free_buffers[i]->max_rusage >= cbg->free_buffers[left_child]->max_rusage);
}
- if (right_child < cbs.num_free_buffers) {
+ if (right_child < cbg->num_free_buffers) {
/* valid right child */
- assert(cbs.free_buffers[right_child] != NULL);
- assert(cbs.free_buffers[right_child]->signature == CONN_BUFFER_SIGNATURE);
- assert(cbs.free_buffers[right_child]->in_freelist == true);
- assert(cbs.free_buffers[right_child]->used == false);
+ assert(cbg->free_buffers[right_child] != NULL);
+ assert(cbg->free_buffers[right_child]->signature == CONN_BUFFER_SIGNATURE);
+ assert(cbg->free_buffers[right_child]->in_freelist == true);
+ assert(cbg->free_buffers[right_child]->used == false);
- assert(cbs.free_buffers[i]->max_rusage >= cbs.free_buffers[right_child]->max_rusage);
+ assert(cbg->free_buffers[i]->max_rusage >= cbg->free_buffers[right_child]->max_rusage);
}
}
- assert(found_entries == cbs.num_free_buffers);
- assert(rsize_total == cbs.total_rsize_in_freelist);
+ assert(found_entries == cbg->num_free_buffers);
+ assert(rsize_total == cbg->total_rsize_in_freelist);
#endif /* #if defined(FREELIST_CHECK) */
return 0;
}
static size_t round_up_to_page(size_t bytes) {
- if ((bytes % cbs.settings.page_size) != 0) {
- bytes = ((bytes + cbs.settings.page_size - 1) & ~ (cbs.settings.page_size - 1));
+ if ((bytes % l.page_size) != 0) {
+ bytes = ((bytes + l.page_size - 1) & ~ (l.page_size - 1));
}
return bytes;
}
-static void add_conn_buffer_to_freelist(conn_buffer_t* buffer) {
+static void add_conn_buffer_to_freelist(conn_buffer_group_t* cbg, conn_buffer_t* buffer) {
size_t index;
- assert(cb_freelist_check() == 0);
+ assert(cb_freelist_check(cbg) == 0);
(void) cb_freelist_check; /* dummy rvalue to avoid compiler warning. */
assert(buffer->signature == CONN_BUFFER_SIGNATURE);
assert(buffer->in_freelist == false);
assert(buffer->used == false);
- if (cbs.num_free_buffers >= cbs.free_buffer_list_capacity) {
- if (cbs.free_buffers == NULL) {
- cbs.free_buffer_list_capacity = cbs.settings.initial_buffer_count;
- cbs.free_buffers = (conn_buffer_t**) pool_malloc(sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
+ if (cbg->num_free_buffers >= cbg->free_buffer_list_capacity) {
+ if (cbg->free_buffers == NULL) {
+ cbg->free_buffer_list_capacity = cbg->settings.initial_buffer_count;
+ cbg->free_buffers = (conn_buffer_t**) pool_malloc(sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity,
CONN_BUFFER_POOL);
} else {
- cbs.free_buffers = pool_realloc(cbs.free_buffers,
- sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity * 2,
- sizeof(conn_buffer_t*) * cbs.free_buffer_list_capacity,
+ cbg->free_buffers = pool_realloc(cbg->free_buffers,
+ sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity * 2,
+ sizeof(conn_buffer_t*) * cbg->free_buffer_list_capacity,
CONN_BUFFER_POOL);
- cbs.free_buffer_list_capacity *= 2;
+ cbg->free_buffer_list_capacity *= 2;
}
- memset(&cbs.free_buffers[cbs.num_free_buffers], 0,
- sizeof(conn_buffer_t*) * (cbs.free_buffer_list_capacity - cbs.num_free_buffers));
+ memset(&cbg->free_buffers[cbg->num_free_buffers], 0,
+ sizeof(conn_buffer_t*) * (cbg->free_buffer_list_capacity - cbg->num_free_buffers));
}
buffer->in_freelist = true;
- assert(cbs.free_buffers[cbs.num_free_buffers] == NULL);
- cbs.free_buffers[cbs.num_free_buffers] = buffer;
- index = cbs.num_free_buffers;
- cbs.num_free_buffers ++;
- cbs.total_rsize_in_freelist += buffer->max_rusage;
+ assert(cbg->free_buffers[cbg->num_free_buffers] == NULL);
+ cbg->free_buffers[cbg->num_free_buffers] = buffer;
+ index = cbg->num_free_buffers;
+ cbg->num_free_buffers ++;
+ cbg->total_rsize_in_freelist += buffer->max_rusage;
while (index != 0) {
size_t parent_index = HEAP_PARENT(index);
- if (cbs.free_buffers[index]->max_rusage >
- cbs.free_buffers[parent_index]->max_rusage) {
+ if (cbg->free_buffers[index]->max_rusage >
+ cbg->free_buffers[parent_index]->max_rusage) {
conn_buffer_t* temp;
/* swap */
- temp = cbs.free_buffers[index];
- cbs.free_buffers[index] = cbs.free_buffers[parent_index];
- cbs.free_buffers[parent_index] = temp;
+ temp = cbg->free_buffers[index];
+ cbg->free_buffers[index] = cbg->free_buffers[parent_index];
+ cbg->free_buffers[parent_index] = temp;
} else {
/* no swap occured, so we can stop the reheaping operation */
break;
}
}
- assert(cb_freelist_check() == 0);
+ assert(cb_freelist_check(cbg) == 0);
}
-static conn_buffer_t* remove_conn_buffer_from_freelist(size_t max_rusage_hint) {
+static conn_buffer_t* remove_conn_buffer_from_freelist(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
conn_buffer_t* ret;
conn_buffer_t* compare;
size_t index;
- assert(cb_freelist_check() == 0);
+ assert(cb_freelist_check(cbg) == 0);
- if (cbs.num_free_buffers == 0) {
- assert(cbs.free_buffers[0] == NULL);
+ if (cbg->num_free_buffers == 0) {
+ assert(cbg->free_buffers[0] == NULL);
return NULL;
}
- ret = cbs.free_buffers[0];
- cbs.free_buffers[0] = NULL;
+ ret = cbg->free_buffers[0];
+ cbg->free_buffers[0] = NULL;
assert(ret->signature == CONN_BUFFER_SIGNATURE);
assert(ret->in_freelist == true);
assert(ret->used == false);
ret->in_freelist = false;
- cbs.num_free_buffers --;
- cbs.total_rsize_in_freelist -= ret->max_rusage;
+ cbg->num_free_buffers --;
+ cbg->total_rsize_in_freelist -= ret->max_rusage;
- if (cbs.num_free_buffers == 0) {
- assert(cb_freelist_check() == 0);
+ if (cbg->num_free_buffers == 0) {
+ assert(cb_freelist_check(cbg) == 0);
return ret;
}
index = 0;
- compare = cbs.free_buffers[cbs.num_free_buffers];
- cbs.free_buffers[cbs.num_free_buffers] = NULL;
+ compare = cbg->free_buffers[cbg->num_free_buffers];
+ cbg->free_buffers[cbg->num_free_buffers] = NULL;
while (true) {
size_t left_child_index = HEAP_LEFT_CHILD(index);
size_t right_child_index = HEAP_RIGHT_CHILD(index);
bool valid_left, valid_right, swap_left, swap_right;
- valid_left = (left_child_index < cbs.num_free_buffers) ? true : false;
- valid_right = (right_child_index < cbs.num_free_buffers) ? true : false;
+ valid_left = (left_child_index < cbg->num_free_buffers) ? true : false;
+ valid_right = (right_child_index < cbg->num_free_buffers) ? true : false;
swap_left = (valid_left &&
- cbs.free_buffers[left_child_index]->max_rusage >
+ cbg->free_buffers[left_child_index]->max_rusage >
compare->max_rusage) ? true : false;
swap_right = (valid_right &&
- cbs.free_buffers[right_child_index]->max_rusage >
+ cbg->free_buffers[right_child_index]->max_rusage >
compare->max_rusage) ? true : false;
/* it's possible that we'd want to swap with both (i.e., bigger than
* both). pick the larger one to swap with.
*/
if (swap_left && swap_right) {
- if (cbs.free_buffers[left_child_index]->max_rusage >
- cbs.free_buffers[right_child_index]->max_rusage) {
+ if (cbg->free_buffers[left_child_index]->max_rusage >
+ cbg->free_buffers[right_child_index]->max_rusage) {
/* left is greater, swap with left. */
swap_right = false;
} else {
@@ -212,31 +219,31 @@ static conn_buffer_t* remove_conn_buffer_from_freelist(size_t max_rusage_hint) {
}
if (swap_left) {
- assert(cbs.free_buffers[index] == NULL);
- cbs.free_buffers[index] = cbs.free_buffers[left_child_index];
- cbs.free_buffers[left_child_index] = NULL;
+ assert(cbg->free_buffers[index] == NULL);
+ cbg->free_buffers[index] = cbg->free_buffers[left_child_index];
+ cbg->free_buffers[left_child_index] = NULL;
index = left_child_index;
} else if (swap_right) {
- assert(cbs.free_buffers[index] == NULL);
- cbs.free_buffers[index] = cbs.free_buffers[right_child_index];
- cbs.free_buffers[right_child_index] = NULL;
+ assert(cbg->free_buffers[index] == NULL);
+ cbg->free_buffers[index] = cbg->free_buffers[right_child_index];
+ cbg->free_buffers[right_child_index] = NULL;
index = right_child_index;
} else {
- assert(cbs.free_buffers[index] == NULL);
- cbs.free_buffers[index] = compare;
+ assert(cbg->free_buffers[index] == NULL);
+ cbg->free_buffers[index] = compare;
break;
}
}
- assert(cb_freelist_check() == 0);
+ assert(cb_freelist_check(cbg) == 0);
return ret;
}
-static conn_buffer_t* make_conn_buffer(void) {
+static conn_buffer_t* make_conn_buffer(conn_buffer_group_t* cbg) {
conn_buffer_t* buffer;
- if (cbs.total_rsize + cbs.settings.page_size >= cbs.settings.total_rsize_range_top) {
+ if (cbg->total_rsize + l.page_size >= cbg->settings.total_rsize_range_top) {
/* we don't start the reclamation here because we didn't actually exceed
* the top range.
*/
@@ -258,7 +265,7 @@ static conn_buffer_t* make_conn_buffer(void) {
buffer->in_freelist = false;
buffer->used = false;
- cbs.total_rsize += buffer->max_rusage;
+ cbg->total_rsize += buffer->max_rusage;
return buffer;
}
@@ -304,16 +311,17 @@ static bool try_remap(void* ptr, const size_t range, unsigned remap_attempts) {
}
-static void destroy_conn_buffer(conn_buffer_t* buffer) {
+static void destroy_conn_buffer(conn_buffer_group_t* cbg, conn_buffer_t* buffer) {
void* ptr = buffer;
size_t range = buffer->max_rusage;
assert(buffer->in_freelist == false);
assert(buffer->used == false);
- assert(cbs.total_rsize > 0);
+ assert(cbg->total_rsize > 0);
+
- cbs.stats.destroys ++;
- cbs.total_rsize -= buffer->max_rusage;
+ cbg->stats.destroys ++;
+ cbg->total_rsize -= buffer->max_rusage;
munmap(buffer, CONN_BUFFER_SIZE);
/* if we're trying to detect corruption, we need to freeze out the address
@@ -352,20 +360,34 @@ static conn_buffer_t* get_buffer_from_data_ptr(void* _ptr) {
}
-void conn_buffer_init(size_t initial_buffer_count,
- size_t buffer_rsize_limit,
- size_t total_rsize_range_bottom,
- size_t total_rsize_range_top) {
- size_t i;
+static void conn_buffer_reclamation(conn_buffer_group_t* cbg) {
+ if (cbg->reclamation_in_progress) {
+ if (cbg->num_free_buffers != 0) {
+ /* grab the most space-consuming buffer and reclaim it. */
+ conn_buffer_t* tofree = remove_conn_buffer_from_freelist(cbg, CONN_BUFFER_SIZE);
+
+ destroy_conn_buffer(cbg, tofree);
+ }
+
+ if (cbg->num_free_buffers == 0 ||
+ cbg->total_rsize <= cbg->settings.total_rsize_range_bottom) {
+ cbg->reclamation_in_progress = false;
+ }
+ }
+}
- always_assert( cbs.initialized == false );
- always_assert( (CONN_BUFFER_HEADER_SZ % sizeof(void*)) == 0 );
- memset(&cbs, 0, sizeof(conn_buffer_status_t));
+static void conn_buffer_group_init(conn_buffer_group_t* const cbg,
+ size_t initial_buffer_count,
+ size_t buffer_rsize_limit,
+ size_t total_rsize_range_bottom,
+ size_t total_rsize_range_top) {
+ size_t i;
- cbs.settings.page_size = getpagesize();
+ always_assert( cbg->initialized == false );
+ always_assert( (CONN_BUFFER_HEADER_SZ % sizeof(void*)) == 0 );
- always_assert( (cbs.settings.page_size & (cbs.settings.page_size - 1)) == 0);
+ always_assert( (l.page_size & (l.page_size - 1)) == 0);
/* write in some defaults */
if (initial_buffer_count == 0) {
@@ -381,60 +403,71 @@ void conn_buffer_init(size_t initial_buffer_count,
total_rsize_range_top = CONN_BUFFER_TOTAL_RSIZE_RANGE_TOP_DEFAULT;
}
- always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_bottom);
- always_assert(initial_buffer_count * cbs.settings.page_size <= total_rsize_range_top);
+ always_assert(initial_buffer_count * l.page_size <= total_rsize_range_bottom);
+ always_assert(initial_buffer_count * l.page_size <= total_rsize_range_top);
// always_assert(buffer_rsize_limit < total_rsize_range_bottom);
always_assert(total_rsize_range_bottom < total_rsize_range_top);
- always_assert(buffer_rsize_limit >= cbs.settings.page_size);
+ always_assert(buffer_rsize_limit >= l.page_size);
- cbs.settings.initial_buffer_count = initial_buffer_count;
- cbs.settings.buffer_rsize_limit = buffer_rsize_limit;
- cbs.settings.total_rsize_range_bottom = total_rsize_range_bottom;
- cbs.settings.total_rsize_range_top = total_rsize_range_top;
+ cbg->settings.initial_buffer_count = initial_buffer_count;
+ cbg->settings.buffer_rsize_limit = buffer_rsize_limit;
+ cbg->settings.total_rsize_range_bottom = total_rsize_range_bottom;
+ cbg->settings.total_rsize_range_top = total_rsize_range_top;
for (i = 0; i < initial_buffer_count; i ++) {
conn_buffer_t* buffer;
- buffer = make_conn_buffer();
+ buffer = make_conn_buffer(cbg);
always_assert(buffer != NULL);
- add_conn_buffer_to_freelist(buffer);
+ add_conn_buffer_to_freelist(cbg, buffer);
}
- cbs.initialized = true;
+ pthread_mutex_init(&cbg->lock, NULL);
+
+ cbg->initialized = true;
}
-void do_conn_buffer_reclamation(void) {
- if (cbs.reclamation_in_progress) {
- if (cbs.num_free_buffers != 0) {
- /* grab the most space-consuming buffer and reclaim it. */
- conn_buffer_t* tofree = remove_conn_buffer_from_freelist(CONN_BUFFER_SIZE);
+void conn_buffer_init(unsigned groups,
+ size_t initial_buffer_count,
+ size_t buffer_rsize_limit,
+ size_t total_rsize_range_bottom,
+ size_t total_rsize_range_top) {
+ unsigned ix;
- destroy_conn_buffer(tofree);
- }
+ l.page_size = getpagesize();
+ l.cbg_list = calloc(groups, sizeof(conn_buffer_group_t));
- if (cbs.num_free_buffers == 0 ||
- cbs.total_rsize <= cbs.settings.total_rsize_range_bottom) {
- cbs.reclamation_in_progress = false;
- }
+ for (ix = 0; ix < groups; ix ++) {
+ conn_buffer_group_init(&l.cbg_list[ix], initial_buffer_count, buffer_rsize_limit,
+ total_rsize_range_bottom, total_rsize_range_top);
}
+ l.cbg_count = groups;
+
+ l.global_initialized = true;
}
/**
* allocate a connection buffer. max_rusage_hint is a hint for how much
* of the buffer will be used in the worst case. if it is 0, the hint is
- * discarded. currently, the hint is ignored. */
-void* do_alloc_conn_buffer(size_t max_rusage_hint) {
+ * discarded. currently, the hint is ignored.
+ *
+ * this is a thread-guarded function, i.e., it should only be called for a
+ * connection buffer group by the thread it is assigned to.
+ */
+static void* do_alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
conn_buffer_t* buffer;
- if ( (buffer = remove_conn_buffer_from_freelist(max_rusage_hint)) == NULL &&
- (buffer = make_conn_buffer()) == NULL ) {
- cbs.stats.allocs_failed ++;
+ assert(cbg->settings.tid == pthread_self());
+
+ if ( (buffer = remove_conn_buffer_from_freelist(cbg, max_rusage_hint)) == NULL &&
+ (buffer = make_conn_buffer(cbg)) == NULL ) {
+ cbg->stats.allocs_failed ++;
return NULL;
}
- cbs.stats.allocs ++;
+ cbg->stats.allocs ++;
assert(buffer->signature == CONN_BUFFER_SIGNATURE);
assert(buffer->in_freelist == false);
@@ -443,15 +476,26 @@ void* do_alloc_conn_buffer(size_t max_rusage_hint) {
buffer->rusage_updated = false;
buffer->prev_rusage = buffer->max_rusage;
- do_conn_buffer_reclamation();
+ conn_buffer_reclamation(cbg);
return buffer->data;
}
-void do_free_conn_buffer(void* ptr, ssize_t max_rusage) {
+/**
+ * releases a connection buffer. max_rusage_hint is a hint for how much of the
+ * buffer was used in the worst case. if it is 0 and no one has ever called
+ * report_max_rusage on this buffer, it is assumed that the entire buffer has
+ * been accessed. if it is 0 and someone has called report_max_rusage, then the
+ * largest value reported is used.
+ *
+ * this is a thread-guarded function, i.e., it should only be called for a
+ * connection buffer group by the thread it is assigned to.
+ */
+static void do_free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage) {
conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);
+ assert(cbg->settings.tid == pthread_self());
assert(buffer->signature == CONN_BUFFER_SIGNATURE);
assert(buffer->in_freelist == false);
assert(buffer->used == true);
@@ -475,37 +519,44 @@ void do_free_conn_buffer(void* ptr, ssize_t max_rusage) {
}
// bump counter
- cbs.stats.frees ++;
+ cbg->stats.frees ++;
/* do we reclaim this buffer? */
- if (max_rusage >= cbs.settings.buffer_rsize_limit ||
+ if (max_rusage >= cbg->settings.buffer_rsize_limit ||
detect_corruption) {
/* yes, reclaim now... we must set the max_rusage to the previously
* known rusage because that delta was never accounted for. */
buffer->max_rusage = buffer->prev_rusage;
- destroy_conn_buffer(buffer);
+ destroy_conn_buffer(cbg, buffer);
} else {
/* adjust stats */
- cbs.total_rsize += (max_rusage - buffer->prev_rusage);
+ cbg->total_rsize += (max_rusage - buffer->prev_rusage);
/* return to the free list */
- add_conn_buffer_to_freelist(buffer);
+ add_conn_buffer_to_freelist(cbg, buffer);
}
/* should we start a reclamation? */
- if (cbs.reclamation_in_progress == false &&
- cbs.total_rsize >= cbs.settings.total_rsize_range_top) {
- cbs.stats.reclamations_started ++;
- cbs.reclamation_in_progress = true;
+ if (cbg->reclamation_in_progress == false &&
+ cbg->total_rsize >= cbg->settings.total_rsize_range_top) {
+ cbg->stats.reclamations_started ++;
+ cbg->reclamation_in_progress = true;
}
- do_conn_buffer_reclamation();
+ conn_buffer_reclamation(cbg);
}
-void report_max_rusage(void* ptr, size_t max_rusage) {
+/**
+ * report the maximum usage of a connection buffer.
+ *
+ * this is a thread-guarded function, i.e., it should only be called for a
+ * connection buffer group by the thread it is assigned to.
+ */
+static void do_report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage) {
conn_buffer_t* buffer = get_buffer_from_data_ptr(ptr);
+ assert(cbg->settings.tid == pthread_self());
assert(buffer->signature == CONN_BUFFER_SIGNATURE);
assert(buffer->in_freelist == false);
assert(buffer->used == true);
@@ -517,24 +568,90 @@ void report_max_rusage(void* ptr, size_t max_rusage) {
buffer->max_rusage = max_rusage;
}
- /* yeah, we're reading a variable in a thread-unsafe way, but we'll do a
+ /* yeah, we're reading a variable in a group-unsafe way, but we'll do a
* second check once we grab the lock. */
- if (cbs.reclamation_in_progress) {
- conn_buffer_reclamation();
+ if (cbg->reclamation_in_progress) {
+ conn_buffer_reclamation(cbg);
+ }
+}
+
+
+void* alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint) {
+ void* ret;
+
+ pthread_mutex_lock(&cbg->lock);
+ ret = do_alloc_conn_buffer(cbg, max_rusage_hint);
+ pthread_mutex_unlock(&cbg->lock);
+ return ret;
+}
+
+void free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage) {
+ pthread_mutex_lock(&cbg->lock);
+ do_free_conn_buffer(cbg, ptr, max_rusage);
+ pthread_mutex_unlock(&cbg->lock);
+}
+
+void report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage) {
+ pthread_mutex_lock(&cbg->lock);
+ do_report_max_rusage(cbg, ptr, max_rusage);
+ pthread_mutex_unlock(&cbg->lock);
+}
+
+
+conn_buffer_group_t* get_conn_buffer_group(unsigned group) {
+ assert(group < l.cbg_count);
+ return &l.cbg_list[group];
+}
+
+
+/**
+ * assign a thread id to a connection buffer group. returns false if no errors
+ * occur.
+ */
+bool assign_thread_id_to_conn_buffer_group(unsigned group, pthread_t tid) {
+ assert(group < l.cbg_count);
+ if (group < l.cbg_count) {
+ assert(l.cbg_list[group].settings.tid == 0);
+ if (l.cbg_list[group].settings.tid == 0) {
+ l.cbg_list[group].settings.tid = tid;
+ return false;
+ }
}
+ return true;
}
-char* do_conn_buffer_stats(size_t* result_size) {
+char* conn_buffer_stats(size_t* result_size) {
size_t bufsize = 2048, offset = 0;
char* buffer = malloc(bufsize);
char terminator[] = "END\r\n";
+ unsigned ix;
+
+ size_t num_free_buffers = 0;
+ size_t total_rsize = 0;
+ size_t total_rsize_in_freelist = 0;
+ conn_buffer_stats_t stats;
if (buffer == NULL) {
*result_size = 0;
return NULL;
}
+ memset(&stats, 0, sizeof(conn_buffer_stats_t));
+
+ for (ix = 0; ix < l.cbg_count; ix ++) {
+ pthread_mutex_lock(&l.cbg_list[ix].lock);
+ num_free_buffers += l.cbg_list[ix].num_free_buffers;
+ total_rsize += l.cbg_list[ix].total_rsize;
+ total_rsize_in_freelist += l.cbg_list[ix].total_rsize_in_freelist;
+ stats.allocs += l.cbg_list[ix].stats.allocs;
+ stats.frees += l.cbg_list[ix].stats.frees;
+ stats.destroys += l.cbg_list[ix].stats.destroys;
+ stats.reclamations_started += l.cbg_list[ix].stats.reclamations_started;
+ stats.allocs_failed += l.cbg_list[ix].stats.allocs_failed;
+ pthread_mutex_unlock(&l.cbg_list[ix].lock);
+ }
+
offset = append_to_buffer(buffer, bufsize, offset, sizeof(terminator),
"STAT num_free_buffers %" PRINTF_INT64_MODIFIER "u\n"
"STAT total_rsize %" PRINTF_INT64_MODIFIER "u\n"
@@ -544,14 +661,14 @@ char* do_conn_buffer_stats(size_t* result_size) {
"STAT failed_allocates %" PRINTF_INT64_MODIFIER "u\n"
"STAT destroys %" PRINTF_INT64_MODIFIER "u\n"
"STAT reclamations_started %" PRINTF_INT64_MODIFIER "u\n",
- cbs.num_free_buffers,
- cbs.total_rsize,
- cbs.total_rsize_in_freelist,
- cbs.stats.allocs,
- cbs.stats.frees,
- cbs.stats.allocs_failed,
- cbs.stats.destroys,
- cbs.stats.reclamations_started);
+ num_free_buffers,
+ total_rsize,
+ total_rsize_in_freelist,
+ stats.allocs,
+ stats.frees,
+ stats.allocs_failed,
+ stats.destroys,
+ stats.reclamations_started);
offset = append_to_buffer(buffer, bufsize, offset, 0, terminator);
View
46 conn_buffer.h
@@ -2,6 +2,8 @@
#include "generic.h"
+#include <pthread.h>
+
#if !defined(_conn_buffer_h_)
#define _conn_buffer_h_
@@ -50,11 +52,21 @@ struct conn_buffer_s {
};
-typedef struct conn_buffer_status_s conn_buffer_status_t;
-struct conn_buffer_status_s {
+typedef struct conn_buffer_stats_s conn_buffer_stats_t;
+struct conn_buffer_stats_s {
+ uint64_t allocs;
+ uint64_t frees;
+ uint64_t destroys;
+ uint64_t reclamations_started;
+ uint64_t allocs_failed;
+};
+
+
+typedef struct conn_buffer_group_s conn_buffer_group_t;
+struct conn_buffer_group_s {
conn_buffer_t** free_buffers;
- size_t num_free_buffers;
size_t free_buffer_list_capacity;
+ size_t num_free_buffers;
size_t total_rsize;
size_t total_rsize_in_freelist;
@@ -63,6 +75,7 @@ struct conn_buffer_status_s {
bool initialized;
struct {
+ pthread_t tid; /* associated thread id */
size_t initial_buffer_count; /* initial buffers set up */
size_t buffer_rsize_limit; /* if the reported usage of a block is
* greater or equal to this limit, the
@@ -80,30 +93,27 @@ struct conn_buffer_status_s {
size_t page_size; /* page size on the OS. */
} settings;
- struct {
- uint64_t allocs;
- uint64_t frees;
- uint64_t destroys;
- uint64_t reclamations_started;
- uint64_t allocs_failed;
- } stats;
+ conn_buffer_stats_t stats;
+ pthread_mutex_t lock; /* lock for this connection buffer group. */
};
-DECL_MT_FUNC(void*, alloc_conn_buffer, (size_t max_rusage_hint));
-DECL_MT_FUNC(void, free_conn_buffer, (void* ptr, ssize_t max_rusage));
-DECL_MT_FUNC(void, conn_buffer_reclamation, (void));
-DECL_MT_FUNC(char*, conn_buffer_stats, (size_t* result_size));
+extern void* alloc_conn_buffer(conn_buffer_group_t* cbg, size_t max_rusage_hint);
+extern void free_conn_buffer(conn_buffer_group_t* cbg, void* ptr, ssize_t max_rusage);
+extern void report_max_rusage(conn_buffer_group_t* cbg, void* ptr, size_t max_rusage);
+extern char* conn_buffer_stats(size_t* result_size);
-extern void report_max_rusage(void* ptr, size_t max_rusage);
-extern void conn_buffer_init(size_t initial_buffer_count,
+extern void conn_buffer_init(unsigned threads,
+ size_t initial_buffer_count,
size_t buffer_rsize_limit,
size_t total_rsize_range_bottom,
size_t total_rsize_range_top);
-STATIC_DECL(int cb_freelist_check(void));
-STATIC_DECL(conn_buffer_status_t cbs);
+extern conn_buffer_group_t* get_conn_buffer_group(unsigned thread);
+extern bool assign_thread_id_to_conn_buffer_group(unsigned group, pthread_t tid);
+
+STATIC_DECL(int cb_freelist_check(conn_buffer_group_t* cbg));
#if !defined(CONN_BUFFER_MODULE)
#undef STATIC
View
2 generic.h
@@ -105,7 +105,7 @@ typedef unsigned int rel_time_t;
#define always_assert assert
#endif /* #if defined(NDEBUG) */
-#define DECL_MT_FUNC(ret_type, func_name, args) extern ret_type do_ ## func_name args; extern ret_type mt_ ## func_name args;
+#define DECL_MT_FUNC(ret_type, func_name, args) extern ret_type do_ ## func_name args; extern ret_type func_name args;
// bump a counter up by one. return 0 if the counter has overflowed, nonzero otherwise.
#define BUMP(cntr) ((++(cntr)) != 0)
View
55 memcached.c
@@ -384,7 +384,7 @@ static int allocate_udp_reply_port(int sfd, int tries) {
#endif
conn *conn_new(const int sfd, const int init_state, const int event_flags,
- const int read_buffer_size, const bool is_udp, const bool is_binary,
+ conn_buffer_group_t* cbg, const bool is_udp, const bool is_binary,
const struct sockaddr* const addr, const socklen_t addrlen,
struct event_base *base) {
conn* c = conn_from_freelist();
@@ -448,6 +448,7 @@ conn *conn_new(const int sfd, const int init_state, const int event_flags,
} else {
c->request_addr_size = addrlen;
}
+ c->cbg = cbg;
if (settings.verbose > 1) {
if (init_state == conn_listening)
@@ -533,18 +534,18 @@ void conn_cleanup(conn* c) {
}
if (c->rbuf) {
- free_conn_buffer(c->rbuf, 0); /* no idea how much was used... */
+ free_conn_buffer(c->cbg, c->rbuf, 0); /* no idea how much was used... */
c->rbuf = NULL;
c->rsize = 0;
}
if (c->iov) {
- free_conn_buffer(c->iov, 0); /* no idea how much was used... */
+ free_conn_buffer(c->cbg, c->iov, 0); /* no idea how much was used... */
c->iov = NULL;
c->iovsize = 0;
}
if (c->riov) {
- free_conn_buffer(c->riov, 0); /* no idea how much was used... */
+ free_conn_buffer(c->cbg, c->riov, 0); /* no idea how much was used... */
c->riov = NULL;
c->riov_size = 0;
}
@@ -560,15 +561,15 @@ void conn_free(conn* c) {
if (c->msglist)
pool_free(c->msglist, sizeof(struct msghdr) * c->msgsize, CONN_BUFFER_MSGLIST_POOL);
if (c->rbuf)
- free_conn_buffer(c->rbuf, 0);
+ free_conn_buffer(c->cbg, c->rbuf, 0);
if (c->wbuf)
pool_free(c->wbuf, c->wsize, CONN_BUFFER_WBUF_POOL);
if (c->ilist)
pool_free(c->ilist, sizeof(item*) * c->isize, CONN_BUFFER_ILIST_POOL);
if (c->iov)
- free_conn_buffer(c->iov, c->iovused * sizeof(struct iovec));
+ free_conn_buffer(c->cbg, c->iov, c->iovused * sizeof(struct iovec));
if (c->riov)
- free_conn_buffer(c->riov, 0);
+ free_conn_buffer(c->cbg, c->riov, 0);
if (c->bp_key)
pool_free(c->bp_key, sizeof(char) * KEY_MAX_LENGTH + 1, CONN_BUFFER_BP_KEY_POOL);
if (c->bp_hdr_pool)
@@ -621,7 +622,7 @@ void conn_shrink(conn* c) {
if (c->rbytes == 0 && c->rbuf != NULL) {
/* drop the buffer since we have no bytes to preserve. */
- free_conn_buffer(c->rbuf, 0);
+ free_conn_buffer(c->cbg, c->rbuf, 0);
c->rbuf = NULL;
c->rcurr = NULL;
c->rsize = 0;
@@ -665,13 +666,13 @@ void conn_shrink(conn* c) {
}
if (c->riov) {
- free_conn_buffer(c->riov, 0);
+ free_conn_buffer(c->cbg, c->riov, 0);
c->riov = NULL;
c->riov_size = 0;
}
if (c->iov != NULL) {
- free_conn_buffer(c->iov, 0);
+ free_conn_buffer(c->cbg, c->iov, 0);
c->iov = NULL;
c->iovsize = 0;
}
@@ -713,7 +714,7 @@ static int ensure_iov_space(conn* c) {
assert(c != NULL);
if (c->iovsize == 0) {
- c->iov = (struct iovec *)alloc_conn_buffer(0);
+ c->iov = (struct iovec *)alloc_conn_buffer(c->cbg, 0);
if (c->iov != NULL) {
c->iovsize = CONN_BUFFER_DATA_SZ / sizeof(struct iovec);
}
@@ -723,7 +724,7 @@ static int ensure_iov_space(conn* c) {
return -1;
}
- report_max_rusage(c->iov, (c->iovused + 1) * sizeof(struct iovec));
+ report_max_rusage(c->cbg, c->iov, (c->iovused + 1) * sizeof(struct iovec));
return 0;
}
@@ -2286,7 +2287,7 @@ int try_read_udp(conn* c) {
if (c->rbuf == NULL) {
/* no idea how big the buffer will need to be. */
- c->rbuf = (char*) alloc_conn_buffer(0);
+ c->rbuf = (char*) alloc_conn_buffer(c->cbg, 0);
if (c->rbuf != NULL) {
c->rcurr = c->rbuf;
@@ -2327,7 +2328,7 @@ int try_read_udp(conn* c) {
}
/* report peak usage here */
- report_max_rusage(c->rbuf, res);
+ report_max_rusage(c->cbg, c->rbuf, res);
#if defined(HAVE_UDP_REPLY_PORTS)
reply_ports = ntohs(*((uint16_t*)(buf + 6)));
@@ -2350,7 +2351,7 @@ int try_read_udp(conn* c) {
return 1;
} else {
/* return the conn buffer. */
- free_conn_buffer(c->rbuf, 8 - 1 /* worst case for memory usage */);
+ free_conn_buffer(c->cbg, c->rbuf, 8 - 1 /* worst case for memory usage */);
c->rbuf = NULL;
c->rcurr = NULL;
c->rsize = 0;
@@ -2380,7 +2381,7 @@ int try_read_network(conn* c) {
c->rcurr = c->rbuf;
}
} else {
- c->rbuf = (char*) alloc_conn_buffer(0);
+ c->rbuf = (char*) alloc_conn_buffer(c->cbg, 0);
if (c->rbuf != NULL) {
c->rcurr = c->rbuf;
c->rsize = CONN_BUFFER_DATA_SZ;
@@ -2406,7 +2407,7 @@ int try_read_network(conn* c) {
c->rbytes += res;
/* report peak usage here */
- report_max_rusage(c->rbuf, c->rbytes);
+ report_max_rusage(c->cbg, c->rbuf, c->rbytes);
if (res < avail) {
break;
@@ -2425,7 +2426,7 @@ int try_read_network(conn* c) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* if we have no data, release the connection buffer */
if (c->rbytes == 0) {
- free_conn_buffer(c->rbuf, 0);
+ free_conn_buffer(c->cbg, c->rbuf, 0);
c->rbuf = NULL;
c->rcurr = NULL;
c->rsize = 0;
@@ -2598,7 +2599,7 @@ static void drive_machine(conn* c) {
break;
}
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, false, c->binary,
+ NULL, false, c->binary,
&addr, addrlen);
break;
@@ -2724,7 +2725,7 @@ static void drive_machine(conn* c) {
c->sbytes -= res;
/* report peak usage here */
- report_max_rusage(c->rbuf, res);
+ report_max_rusage(c->cbg, c->rbuf, res);
break;
}
@@ -3577,7 +3578,7 @@ int main (int argc, char **argv) {
#if defined(USE_FLAT_ALLOCATOR)
flat_storage_init(settings.maxbytes);
#endif /* #if defined(USE_FLAT_ALLOCATOR) */
- conn_buffer_init(0, 0, settings.max_conn_buffer_bytes / 2, settings.max_conn_buffer_bytes);
+ conn_buffer_init(settings.num_threads - 1, 0, 0, settings.max_conn_buffer_bytes / 2, settings.max_conn_buffer_bytes);
/* managed instance? alloc and zero a bucket array */
if (settings.managed) {
@@ -3612,7 +3613,7 @@ int main (int argc, char **argv) {
/* create the initial listening connection */
if (l_socket != 0) {
if (!(listen_conn = conn_new(l_socket, conn_listening,
- EV_READ | EV_PERSIST, 1, false, false,
+ EV_READ | EV_PERSIST, NULL, false, false,
NULL, 0,
main_base))) {
fprintf(stderr, "failed to create listening connection");
@@ -3621,7 +3622,7 @@ int main (int argc, char **argv) {
}
if ((settings.binary_port != 0) &&
(listen_binary_conn = conn_new(b_socket, conn_listening,
- EV_READ | EV_PERSIST, 1, false, true,
+ EV_READ | EV_PERSIST, NULL, false, true,
NULL, 0,
main_base)) == NULL) {
fprintf(stderr, "failed to create listening connection");
@@ -3647,20 +3648,20 @@ int main (int argc, char **argv) {
if (u_socket > -1) {
/* Skip thread 0, the tcp accept socket dispatcher;
workers (and their connection buffer groups) start at 1. */
- for (c = (settings.num_threads > 1 ? 1 : 0); c < settings.num_threads; c++) {
+ for (c = 1; c < settings.num_threads; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
- UDP_READ_BUFFER_SIZE, 1, 0, NULL, 0);
+ get_conn_buffer_group(c - 1), 1, 0, NULL, 0);
}
}
/* create the initial listening udp connection, monitored on all threads */
if (bu_socket > -1) {
/* Skip thread 0, the tcp accept socket dispatcher;
workers (and their connection buffer groups) start at 1. */
- for (c = (settings.num_threads > 1 ? 1 : 0); c < settings.num_threads; c++) {
+ for (c = 1; c < settings.num_threads; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(bu_socket, conn_bp_header_size_unknown, EV_READ | EV_PERSIST,
- UDP_READ_BUFFER_SIZE, true, true, NULL, 0);
+ get_conn_buffer_group(c - 1), true, true, NULL, 0);
}
}
/* enter the event loop */
View
12 memcached.h
@@ -17,7 +17,6 @@
*/
#define DATA_BUFFER_SIZE 2048
#define BP_HDR_POOL_INIT_SIZE 4096
-#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
@@ -187,6 +186,7 @@ struct settings_s {
*/
#include "binary_protocol.h"
#include "binary_sm.h"
+#include "conn_buffer.h"
#include "items.h"
@@ -267,6 +267,8 @@ struct conn_s {
a managed instance. -1 (_not_ 0) means invalid. */
int gen; /* generation requested for the bucket */
+ conn_buffer_group_t* cbg;
+
/* used to process binary protocol messages */
bp_cmd_info_t bp_info;
@@ -304,7 +306,7 @@ void do_run_deferred_deletes(void);
char *do_add_delta(const char* key, const size_t nkey, const int incr, const unsigned int delta,
char *buf, uint32_t* res_val, const struct in_addr addr);
int do_store_item(item *item, int comm, const char* key);
-conn* conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size,
+conn* conn_new(const int sfd, const int init_state, const int event_flags, conn_buffer_group_t* cbg,
const bool is_udp, const bool is_binary,
const struct sockaddr* const addr, const socklen_t addrlen,
struct event_base *base);
@@ -330,7 +332,7 @@ extern int transmit(conn *c);
void thread_init(int nthreads, struct event_base *main_base);
int dispatch_event_add(int thread, conn* c);
void dispatch_conn_new(int sfd, int init_state, int event_flags,
- const int read_buffer_size,
+ conn_buffer_group_t* cbg,
const bool is_udp, const bool is_binary,
const struct sockaddr* addr, socklen_t addrlen);
@@ -365,17 +367,13 @@ int mt_store_item(item *item, int comm, const char* key);
# define add_delta mt_add_delta
-# define alloc_conn_buffer mt_alloc_conn_buffer
# define append_thread_stats mt_append_thread_stats
# define assoc_expire_regex mt_assoc_expire_regex
# define assoc_move_next_bucket mt_assoc_move_next_bucket
# define conn_from_freelist mt_conn_from_freelist
# define conn_add_to_freelist mt_conn_add_to_freelist
-# define conn_buffer_reclamation mt_conn_buffer_reclamation
-# define conn_buffer_stats mt_conn_buffer_stats
# define defer_delete mt_defer_delete
# define flat_allocator_stats mt_flat_allocator_stats
-# define free_conn_buffer mt_free_conn_buffer
# define is_listen_thread mt_is_listen_thread
# define item_alloc mt_item_alloc
# define item_cachedump mt_item_cachedump
View
4 slabs_items_support.h
@@ -47,15 +47,15 @@ static inline bool item_setup_receive(item* it, conn* c) {
assert(c->riov == NULL);
assert(c->riov_size == 0);
- c->riov = (struct iovec*) alloc_conn_buffer(sizeof(struct iovec) * iov_len_required);
+ c->riov = (struct iovec*) alloc_conn_buffer(c->cbg, sizeof(struct iovec) * iov_len_required);
if (c->riov == NULL) {
return false;
}
} else {
iov_len_required = 1;
}
- report_max_rusage(c->riov, sizeof(struct iovec) * iov_len_required);
+ report_max_rusage(c->cbg, c->riov, sizeof(struct iovec) * iov_len_required);
c->riov_size = iov_len_required;
c->riov_left = iov_len_required;
c->riov_curr = 0;
View
58 thread.c
@@ -28,11 +28,11 @@ struct conn_queue_item {
int sfd;
int init_state;
int event_flags;
- int read_buffer_size;
int is_udp;
int is_binary;
struct sockaddr addr;
socklen_t addrlen;
+ conn_buffer_group_t* cbg;
CQ_ITEM *next;
};
@@ -196,7 +196,7 @@ static void cqi_free(CQ_ITEM *item) {
/*
* Creates a worker thread.
*/
-static void create_worker(void *(*func)(void *), void *arg) {
+static void create_worker(unsigned worker_num, void *(*func)(void *), void *arg) {
pthread_t thread;
pthread_attr_t attr;
int ret;
@@ -208,6 +208,15 @@ static void create_worker(void *(*func)(void *), void *arg) {
strerror(ret));
exit(1);
}
+
+ assign_thread_id_to_conn_buffer_group(worker_num - 1 /* worker num is
+ 1-based, but since
+ the main thread does
+ not need connection
+ buffers, the
+ connection buffer
+ groups are 0-based. */,
+ thread);
}
@@ -304,7 +313,7 @@ static void thread_libevent_process(int fd, short which, void *arg) {
if (NULL != item) {
conn* c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->is_udp,
+ item->cbg, item->is_udp,
item->is_binary, &item->addr, item->addrlen,
me->base);
if (c == NULL) {
@@ -332,7 +341,7 @@ static int last_thread = 0;
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, int init_state, int event_flags,
- int read_buffer_size, const bool is_udp, const bool is_binary,
+ conn_buffer_group_t* cbg, const bool is_udp, const bool is_binary,
const struct sockaddr* const addr, socklen_t addrlen) {
CQ_ITEM *item = cqi_new();
/* Count threads from 1..N to skip the dispatch thread.*/
@@ -345,7 +354,11 @@ void dispatch_conn_new(int sfd, int init_state, int event_flags,
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
- item->read_buffer_size = read_buffer_size;
+ if (cbg) {
+ item->cbg = cbg;
+ } else {
+ item->cbg = get_conn_buffer_group(tix - 1);
+ }
item->is_udp = is_udp;
item->is_binary = is_binary;
memcpy(&item->addr, addr, addrlen);
@@ -603,37 +616,6 @@ char* mt_flat_allocator_stats(size_t* result_size) {
}
#endif /* #if defined(USE_FLAT_ALLOCATOR) */
-/******************************* CONN BUFFER ******************************/
-void* mt_alloc_conn_buffer(size_t max_rusage_hint) {
- void* ret;
-
- pthread_mutex_lock(&conn_buffer_lock);
- ret = do_alloc_conn_buffer(max_rusage_hint);
- pthread_mutex_unlock(&conn_buffer_lock);
- return ret;
-}
-
-void mt_free_conn_buffer(void* ptr, ssize_t max_rusage) {
- pthread_mutex_lock(&conn_buffer_lock);
- do_free_conn_buffer(ptr, max_rusage);
- pthread_mutex_unlock(&conn_buffer_lock);
-}
-
-void mt_conn_buffer_reclamation(void) {
- pthread_mutex_lock(&conn_buffer_lock);
- do_conn_buffer_reclamation();
- pthread_mutex_unlock(&conn_buffer_lock);
-}
-
-char* mt_conn_buffer_stats(size_t* result_size) {
- char* ret;
-
- pthread_mutex_lock(&cache_lock);
- ret = do_conn_buffer_stats(result_size);
- pthread_mutex_unlock(&cache_lock);
- return ret;
-}
-
/******************************* GLOBAL STATS ******************************/
void mt_stats_lock() {
@@ -686,12 +668,12 @@ void thread_init(int nthreads, struct event_base *main_base) {
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
- setup_thread(&threads[i]);
+ setup_thread(&threads[i]);
}
/* Create threads after we've done all the libevent setup. */
for (i = 1; i < nthreads; i++) {
- create_worker(worker_libevent, &threads[i]);
+ create_worker(i, worker_libevent, &threads[i]);
}
/* Wait for all the threads to set themselves up before returning. */

0 comments on commit c0dbcaf

Please sign in to comment.