Permalink
Browse files

Channel-based memory management

Assign a decoding pool on a per-channel basis. This allows memory to be
released on a per-channel basis which is helpful for clients handling
multiple channels
  • Loading branch information...
alanxz committed Jun 21, 2013
1 parent 837a0b5 commit 4a2d899cd3ae3ef8bb9305eddd88c95d3dfc0463
Showing with 164 additions and 42 deletions.
  1. +4 −0 librabbitmq/amqp.h
  2. +76 −35 librabbitmq/amqp_connection.c
  3. +43 −0 librabbitmq/amqp_mem.c
  4. +17 −2 librabbitmq/amqp_private.h
  5. +24 −5 librabbitmq/amqp_socket.c
View
@@ -460,6 +460,10 @@ AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_maybe_release_buffers(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame);
@@ -71,18 +71,13 @@ amqp_connection_state_t amqp_new_connection(void)
return NULL;
}
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0);
if (0 != res) {
goto out_nomem;
}
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL) {
goto out_nomem;
}
state->inbound_buffer.bytes = state->header_buffer;
state->inbound_buffer.len = sizeof(state->header_buffer);
state->state = CONNECTION_STATE_INITIAL;
/* the server protocol version response is 8 bytes, which conveniently
@@ -99,8 +94,6 @@ amqp_connection_state_t amqp_new_connection(void)
out_nomem:
free(state->sock_inbound_buffer.bytes);
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
free(state);
return NULL;
}
@@ -140,10 +133,6 @@ int amqp_tune_connection(amqp_connection_state_t state,
state->frame_max = frame_max;
state->heartbeat = heartbeat;
empty_amqp_pool(&state->frame_pool);
init_amqp_pool(&state->frame_pool, frame_max);
state->inbound_buffer.len = frame_max;
state->outbound_buffer.len = frame_max;
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
@@ -163,8 +152,17 @@ int amqp_destroy_connection(amqp_connection_state_t state)
{
int status = AMQP_STATUS_OK;
if (state) {
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
int i;
for (i = 0; i < POOL_TABLE_SIZE; ++i) {
amqp_pool_table_entry_t *entry = state->pool_table[i];
while (NULL != entry) {
amqp_pool_table_entry_t *todelete = entry;
empty_amqp_pool(&entry->pool);
entry = entry->next;
free(todelete);
}
}
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
status = amqp_socket_close(state->socket);
@@ -175,7 +173,8 @@ int amqp_destroy_connection(amqp_connection_state_t state)
static void return_to_idle(amqp_connection_state_t state)
{
state->inbound_buffer.bytes = NULL;
state->inbound_buffer.len = sizeof(state->header_buffer);
state->inbound_buffer.bytes = state->header_buffer;
state->inbound_offset = 0;
state->target_size = HEADER_SIZE;
state->state = CONNECTION_STATE_IDLE;
@@ -215,16 +214,6 @@ int amqp_handle_input(amqp_connection_state_t state,
}
if (state->state == CONNECTION_STATE_IDLE) {
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
{
return AMQP_STATUS_NO_MEMORY;
}
state->state = CONNECTION_STATE_HEADER;
}
@@ -261,10 +250,27 @@ int amqp_handle_input(amqp_connection_state_t state,
/* it's not a protocol header; fall through to process it as a
regular frame header */
case CONNECTION_STATE_HEADER:
case CONNECTION_STATE_HEADER: {
amqp_channel_t channel;
amqp_pool_t *channel_pool;
/* frame length is 3 bytes in */
channel = amqp_d16(raw_frame, 1);
channel_pool = amqp_get_or_create_channel_pool(state, channel);
if (NULL == channel_pool) {
return AMQP_STATUS_NO_MEMORY;
}
state->target_size
= amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
amqp_pool_alloc_bytes(channel_pool, state->target_size, &state->inbound_buffer);
if (NULL == state->inbound_buffer.bytes) {
return AMQP_STATUS_NO_MEMORY;
}
memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE);
raw_frame = state->inbound_buffer.bytes;
state->state = CONNECTION_STATE_BODY;
bytes_consumed += consume_data(state, &received_data);
@@ -275,11 +281,13 @@ int amqp_handle_input(amqp_connection_state_t state,
return bytes_consumed;
}
}
/* fall through to process body */
case CONNECTION_STATE_BODY: {
amqp_bytes_t encoded;
int res;
amqp_pool_t *channel_pool;
/* Check frame end marker (footer) */
if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) {
@@ -289,14 +297,19 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->frame_type = amqp_d8(raw_frame, 0);
decoded_frame->channel = amqp_d16(raw_frame, 1);
channel_pool = amqp_get_or_create_channel_pool(state, decoded_frame->channel);
if (NULL == channel_pool) {
return AMQP_STATUS_NO_MEMORY;
}
switch (decoded_frame->frame_type) {
case AMQP_FRAME_METHOD:
decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE);
encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
res = amqp_decode_method(decoded_frame->payload.method.id,
&state->decoding_pool, encoded,
channel_pool, encoded,
&decoded_frame->payload.method.decoded);
if (res < 0) {
return res;
@@ -315,7 +328,7 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->payload.properties.raw = encoded;
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
&state->decoding_pool, encoded,
channel_pool, encoded,
&decoded_frame->payload.properties.decoded);
if (res < 0) {
return res;
@@ -351,19 +364,21 @@ int amqp_handle_input(amqp_connection_state_t state,
amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state)
{
return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
return (state->state == CONNECTION_STATE_IDLE);
}
void amqp_release_buffers(amqp_connection_state_t state)
{
int i;
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
if (state->first_queued_frame) {
amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
}
for (i = 0; i < POOL_TABLE_SIZE; ++i) {
amqp_pool_table_entry_t *entry = state->pool_table[i];
recycle_amqp_pool(&state->frame_pool);
recycle_amqp_pool(&state->decoding_pool);
for ( ;NULL != entry; entry = entry->next) {
amqp_maybe_release_buffers_on_channel(state, entry->channel);
}
}
}
void amqp_maybe_release_buffers(amqp_connection_state_t state)
@@ -373,6 +388,32 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state)
}
}
void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel)
{
amqp_link_t *queued_link;
amqp_pool_t *pool;
if (CONNECTION_STATE_IDLE != state->state) {
return;
}
queued_link = state->first_queued_frame;
while (NULL != queued_link) {
amqp_frame_t *frame = queued_link->data;
if (channel == frame->channel) {
return;
}
queued_link = queued_link->next;
}
pool = amqp_get_channel_pool(state, channel);
if (pool != NULL) {
recycle_amqp_pool(pool);
}
}
int amqp_send_frame(amqp_connection_state_t state,
const amqp_frame_t *frame)
{
View
@@ -202,3 +202,46 @@ void amqp_bytes_free(amqp_bytes_t bytes)
{
free(bytes.bytes);
}
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t state, amqp_channel_t channel)
{
amqp_pool_table_entry_t *entry;
size_t index = channel % POOL_TABLE_SIZE;
entry = state->pool_table[index];
for ( ; NULL != entry; entry = entry->next) {
if (channel == entry->channel) {
return &entry->pool;
}
}
entry = malloc(sizeof(amqp_pool_table_entry_t));
if (NULL == entry) {
return NULL;
}
entry->channel = channel;
entry->next = state->pool_table[index];
state->pool_table[index] = entry;
init_amqp_pool(&entry->pool, state->frame_max);
return &entry->pool;
}
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel)
{
amqp_pool_table_entry_t *entry;
size_t index = channel % POOL_TABLE_SIZE;
entry = state->pool_table[index];
for ( ; NULL != entry; entry = entry->next) {
if (channel == entry->channel) {
return &entry->pool;
}
}
return NULL;
}
View
@@ -119,15 +119,27 @@ typedef struct amqp_link_t_ {
void *data;
} amqp_link_t;
#define POOL_TABLE_SIZE 16
typedef struct amqp_pool_table_entry_t_ {
struct amqp_pool_table_entry_t_ *next;
amqp_pool_t pool;
amqp_channel_t channel;
} amqp_pool_table_entry_t;
struct amqp_connection_state_t_ {
amqp_pool_t frame_pool;
amqp_pool_t decoding_pool;
amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
amqp_connection_state_enum state;
int channel_max;
int frame_max;
int heartbeat;
/* buffer for holding frame headers. Allows us to delay allocating
* the raw frame buffer until the type, channel, and size are all known
*/
char header_buffer[HEADER_SIZE + 1];
amqp_bytes_t inbound_buffer;
size_t inbound_offset;
@@ -147,6 +159,9 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
static inline void *amqp_offset(void *data, size_t offset)
{
return (char *)data + offset;
View
@@ -575,8 +575,19 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
&& (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
)
)) {
amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
amqp_pool_t *channel_pool;
amqp_frame_t *frame_copy;
amqp_link_t *link;
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
if (NULL == channel_pool) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = AMQP_STATUS_NO_MEMORY;
return result;
}
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
@@ -699,9 +710,17 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t default_properties[2];
amqp_table_t default_table;
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
sasl_method, vl);
amqp_pool_t *channel_pool;
amqp_bytes_t response_bytes;
channel_pool = amqp_get_or_create_channel_pool(state, 0);
if (NULL == channel_pool) {
res = AMQP_STATUS_NO_MEMORY;
goto error_res;
}
response_bytes = sasl_response(channel_pool,
sasl_method, vl);
if (response_bytes.bytes == NULL) {
res = AMQP_STATUS_NO_MEMORY;
goto error_res;
@@ -735,7 +754,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
int i;
amqp_table_entry_t *current_entry;
s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool,
s.client_properties.entries = amqp_pool_alloc(channel_pool,
sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
if (NULL == s.client_properties.entries) {
res = AMQP_STATUS_NO_MEMORY;

0 comments on commit 4a2d899

Please sign in to comment.