diff --git a/include/aws/http/connection_manager.h b/include/aws/http/connection_manager.h index acd7d99b1..f3802d3da 100644 --- a/include/aws/http/connection_manager.h +++ b/include/aws/http/connection_manager.h @@ -69,6 +69,12 @@ struct aws_http_connection_manager_options { * If set to true, the read back pressure mechanism will be enabled. */ bool enable_read_back_pressure; + + /** + * If set to a non-zero value, then connections that stay in the pool longer than the specified + * timeout will be closed automatically. + */ + uint64_t max_connection_idle_in_milliseconds; }; AWS_EXTERN_C_BEGIN diff --git a/include/aws/http/private/connection_manager_system_vtable.h b/include/aws/http/private/connection_manager_system_vtable.h index 76ac2ec1a..fcf0c3dd0 100644 --- a/include/aws/http/private/connection_manager_system_vtable.h +++ b/include/aws/http/private/connection_manager_system_vtable.h @@ -24,6 +24,9 @@ typedef int(aws_http_connection_manager_create_connection_fn)(const struct aws_h typedef void(aws_http_connection_manager_close_connection_fn)(struct aws_http_connection *connection); typedef void(aws_http_connection_manager_release_connection_fn)(struct aws_http_connection *connection); typedef bool(aws_http_connection_manager_is_connection_open_fn)(const struct aws_http_connection *connection); +typedef bool(aws_http_connection_manager_is_callers_thread_fn)(struct aws_channel *channel); +typedef struct aws_channel *(aws_http_connection_manager_connection_get_channel_fn)( + struct aws_http_connection *connection); struct aws_http_connection_manager_system_vtable { /* @@ -33,6 +36,9 @@ struct aws_http_connection_manager_system_vtable { aws_http_connection_manager_close_connection_fn *close_connection; aws_http_connection_manager_release_connection_fn *release_connection; aws_http_connection_manager_is_connection_open_fn *is_connection_open; + aws_io_clock_fn *get_monotonic_time; + aws_http_connection_manager_is_callers_thread_fn *is_callers_thread; + 
aws_http_connection_manager_connection_get_channel_fn *connection_get_channel; }; AWS_HTTP_API diff --git a/source/connection_manager.c b/source/connection_manager.c index 45a5ae354..788d8a201 100644 --- a/source/connection_manager.c +++ b/source/connection_manager.c @@ -22,15 +22,27 @@ #include #include +#include #include #include #include +#include #include #include #include #include +/* + * Established connections not currently in use are tracked via this structure. + */ +struct aws_idle_connection { + struct aws_allocator *allocator; + struct aws_linked_list_node node; + uint64_t cull_timestamp; + struct aws_http_connection *connection; +}; + /* * System vtable to use under normal circumstances */ @@ -39,6 +51,9 @@ static struct aws_http_connection_manager_system_vtable s_default_system_vtable .release_connection = aws_http_connection_release, .close_connection = aws_http_connection_close, .is_connection_open = aws_http_connection_is_open, + .get_monotonic_time = aws_high_res_clock_get_ticks, + .is_callers_thread = aws_channel_thread_is_callers_thread, + .connection_get_channel = aws_http_connection_get_channel, }; const struct aws_http_connection_manager_system_vtable *g_aws_http_connection_manager_default_system_vtable_ptr = @@ -143,9 +158,22 @@ struct aws_http_connection_manager { enum aws_http_connection_manager_state_type state; /* - * The set of all available, ready-to-be-used connections + * The number of all established, idle connections. So + * that we don't have to compute the size of a linked list every time. + */ + size_t idle_connection_count; + + /* + * The set of all available, ready-to-be-used connections, as aws_idle_connection structs. + * + * This must be a LIFO stack. When connections are released by the user, they must be added on to the back. + * When we vend connections to the user, they must be removed from the back first. + * In this way, the list will always be sorted from oldest (in terms of time spent idle) to newest. 
This means + * we can always use the cull timestamp of the front connection as the next scheduled time for culling. + * It also means that when we cull connections, we can quit the loop as soon as we find a connection + * whose timestamp is greater than the current timestamp. */ - struct aws_array_list connections; + struct aws_linked_list idle_connections; /* * The set of all incomplete connection acquisition requests @@ -212,12 +240,25 @@ struct aws_http_connection_manager { * if set to true, read back pressure mechanism will be enabled. */ bool enable_read_back_pressure; + + /** + * If set to a non-zero value, then connections that stay in the pool longer than the specified + * timeout will be closed automatically. + */ + uint64_t max_connection_idle_in_milliseconds; + + /* + * Task to cull idle connections. This task is run periodically on the cull_event_loop if a non-zero + * culling time interval is specified. + */ + struct aws_task *cull_task; + struct aws_event_loop *cull_event_loop; }; struct aws_http_connection_manager_snapshot { enum aws_http_connection_manager_state_type state; - size_t held_connection_count; + size_t idle_connection_count; size_t pending_acquisition_count; size_t pending_connects_count; size_t vended_connection_count; @@ -234,7 +275,7 @@ static void s_aws_http_connection_manager_get_snapshot( struct aws_http_connection_manager_snapshot *snapshot) { snapshot->state = manager->state; - snapshot->held_connection_count = aws_array_list_length(&manager->connections); + snapshot->idle_connection_count = manager->idle_connection_count; snapshot->pending_acquisition_count = manager->pending_acquisition_count; snapshot->pending_connects_count = manager->pending_connects_count; snapshot->vended_connection_count = manager->vended_connection_count; @@ -249,11 +290,11 @@ static void s_aws_http_connection_manager_log_snapshot( if (snapshot->state != AWS_HCMST_UNINITIALIZED) { AWS_LOGF_DEBUG( AWS_LS_HTTP_CONNECTION_MANAGER, - "id=%p: snapshot - 
state=%d, held_connection_count=%zu, pending_acquire_count=%zu, " + "id=%p: snapshot - state=%d, idle_connection_count=%zu, pending_acquire_count=%zu, " "pending_connect_count=%zu, vended_connection_count=%zu, open_connection_count=%zu, ref_count=%zu", (void *)manager, (int)snapshot->state, - snapshot->held_connection_count, + snapshot->idle_connection_count, snapshot->pending_acquisition_count, snapshot->pending_connects_count, snapshot->vended_connection_count, @@ -294,6 +335,8 @@ static bool s_aws_http_connection_manager_should_destroy(struct aws_http_connect return false; } + AWS_FATAL_ASSERT(manager->idle_connection_count == 0); + return true; } @@ -369,12 +412,14 @@ static void s_aws_http_connection_manager_complete_acquisitions( AWS_CONTAINER_OF(node, struct aws_http_connection_acquisition, node); if (pending_acquisition->error_code == AWS_OP_SUCCESS) { - struct aws_channel *channel = aws_http_connection_get_channel(pending_acquisition->connection); + + struct aws_channel *channel = + pending_acquisition->manager->system_vtable->connection_get_channel(pending_acquisition->connection); AWS_PRECONDITION(channel); /* For some workloads, going ahead and moving the connection callback to the connection's thread is a * substantial performance improvement so let's do that */ - if (!aws_channel_thread_is_callers_thread(channel)) { + if (!pending_acquisition->manager->system_vtable->is_callers_thread(channel)) { aws_channel_task_init( &pending_acquisition->acquisition_task, s_connection_acquisition_task, @@ -459,7 +504,7 @@ struct aws_connection_management_transaction { struct aws_allocator *allocator; struct aws_linked_list completions; struct aws_http_connection *connection_to_release; - struct aws_array_list connections_to_release; + struct aws_linked_list connections_to_release; /* */ struct aws_http_connection_manager_snapshot snapshot; size_t new_connections; bool should_destroy_manager; @@ -470,19 +515,15 @@ static void 
s_aws_connection_management_transaction_init( struct aws_http_connection_manager *manager) { AWS_ZERO_STRUCT(*work); - /* 0-size, does no allocation, cannot fail */ - AWS_FATAL_ASSERT( - aws_array_list_init_dynamic( - &work->connections_to_release, manager->allocator, 0, sizeof(struct aws_http_connection *)) == - AWS_OP_SUCCESS); - + aws_linked_list_init(&work->connections_to_release); aws_linked_list_init(&work->completions); work->manager = manager; work->allocator = manager->allocator; } static void s_aws_connection_management_transaction_clean_up(struct aws_connection_management_transaction *work) { - aws_array_list_clean_up(&work->connections_to_release); + AWS_FATAL_ASSERT(aws_linked_list_empty(&work->connections_to_release)); + AWS_FATAL_ASSERT(aws_linked_list_empty(&work->completions)); } static void s_aws_http_connection_manager_build_transaction(struct aws_connection_management_transaction *work) { @@ -492,11 +533,18 @@ static void s_aws_http_connection_manager_build_transaction(struct aws_connectio /* * Step 1 - If there's free connections, complete acquisition requests */ - while (aws_array_list_length(&manager->connections) > 0 && manager->pending_acquisition_count > 0) { - struct aws_http_connection *connection = NULL; - aws_array_list_back(&manager->connections, &connection); - - aws_array_list_pop_back(&manager->connections); + while (!aws_linked_list_empty(&manager->idle_connections) > 0 && manager->pending_acquisition_count > 0) { + AWS_FATAL_ASSERT(manager->idle_connection_count >= 1); + /* + * It is absolutely critical that this is pop_back and not front. By making the idle connections + * a LIFO stack, the list will always be sorted from oldest (in terms of idle time) to newest. This means + * we can always use the cull timestamp of the first connection as the next scheduled time for culling. 
+ * It also means that when we cull connections, we can quit the loop as soon as we find a connection + * whose timestamp is greater than the current timestamp. + */ + struct aws_linked_list_node *node = aws_linked_list_pop_back(&manager->idle_connections); + struct aws_idle_connection *idle_connection = AWS_CONTAINER_OF(node, struct aws_idle_connection, node); + struct aws_http_connection *connection = idle_connection->connection; AWS_LOGF_DEBUG( AWS_LS_HTTP_CONNECTION_MANAGER, @@ -506,6 +554,8 @@ static void s_aws_http_connection_manager_build_transaction(struct aws_connectio s_aws_http_connection_manager_move_front_acquisition( manager, connection, AWS_ERROR_SUCCESS, &work->completions); ++manager->vended_connection_count; + --manager->idle_connection_count; + aws_mem_release(idle_connection->allocator, idle_connection); } /* @@ -527,9 +577,11 @@ static void s_aws_http_connection_manager_build_transaction(struct aws_connectio } } else { /* - * swap our internal connection set with the zeroed work set + * swap our internal connection set with the empty work set */ - aws_array_list_swap_contents(&manager->connections, &work->connections_to_release); + AWS_FATAL_ASSERT(aws_linked_list_empty(&work->connections_to_release)); + aws_linked_list_swap_contents(&manager->idle_connections, &work->connections_to_release); + manager->idle_connection_count = 0; /* * Move all manager pending acquisitions to the work completion list @@ -558,21 +610,28 @@ static void s_aws_http_connection_manager_build_transaction(struct aws_connectio static void s_aws_http_connection_manager_execute_transaction(struct aws_connection_management_transaction *work); -static void s_aws_http_connection_manager_destroy(struct aws_http_connection_manager *manager) { +/* + * The final last gasp of a connection manager where memory is cleaned up. Destruction is split up into two parts, + * a begin and a finish. Idle connection culling requires a scheduled task on an arbitrary event loop. 
If idle + * connection culling is on then this task must be cancelled before destruction can finish, but you can only cancel + * a task from the same event loop that it is scheduled on. To resolve this, when using idle connection culling, + * we schedule a finish destruction task on the event loop that the culling task is on. This finish task + * cancels the culling task and then calls this function. If we are not using idle connection culling, we can + * call this function immediately from the start of destruction. + */ +static void s_aws_http_connection_manager_finish_destroy(struct aws_http_connection_manager *manager) { if (manager == NULL) { return; } AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Destroying self", (void *)manager); - AWS_ASSERT(manager->pending_connects_count == 0); - AWS_ASSERT(manager->vended_connection_count == 0); - AWS_ASSERT(manager->pending_acquisition_count == 0); - AWS_ASSERT(manager->open_connection_count == 0); - AWS_ASSERT(aws_linked_list_empty(&manager->pending_acquisitions)); - AWS_ASSERT(aws_array_list_length(&manager->connections) == 0); - - aws_array_list_clean_up(&manager->connections); + AWS_FATAL_ASSERT(manager->pending_connects_count == 0); + AWS_FATAL_ASSERT(manager->vended_connection_count == 0); + AWS_FATAL_ASSERT(manager->pending_acquisition_count == 0); + AWS_FATAL_ASSERT(manager->open_connection_count == 0); + AWS_FATAL_ASSERT(aws_linked_list_empty(&manager->pending_acquisitions)); + AWS_FATAL_ASSERT(aws_linked_list_empty(&manager->idle_connections)); aws_string_destroy(manager->host); if (manager->tls_connection_options) { @@ -584,6 +643,15 @@ static void s_aws_http_connection_manager_destroy(struct aws_http_connection_man aws_http_proxy_config_destroy(manager->proxy_config); } + /* + * If this task exists then we are actually in the corresponding event loop running the final destruction task. + * In that case, we've already cancelled this task and when you cancel, it runs synchronously. 
So in that + * case the task has run as cancelled, it was not rescheduled, and so we can safely release the memory. + */ + if (manager->cull_task) { + aws_mem_release(manager->allocator, manager->cull_task); + } + aws_mutex_clean_up(&manager->lock); if (manager->shutdown_complete_callback) { @@ -593,6 +661,105 @@ static void s_aws_http_connection_manager_destroy(struct aws_http_connection_man aws_mem_release(manager->allocator, manager); } +/* This is scheduled to run on the cull task's event loop. If there's no cull task we just destroy the + * manager directly without a cross-thread task. */ +static void s_final_destruction_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)status; + struct aws_http_connection_manager *manager = arg; + struct aws_allocator *allocator = manager->allocator; + + if (manager->cull_task) { + AWS_FATAL_ASSERT(manager->cull_event_loop != NULL); + aws_event_loop_cancel_task(manager->cull_event_loop, manager->cull_task); + } + + s_aws_http_connection_manager_finish_destroy(manager); + + aws_mem_release(allocator, task); +} + +static void s_aws_http_connection_manager_begin_destroy(struct aws_http_connection_manager *manager) { + if (manager == NULL) { + return; + } + + /* + * If we have a cull task running then we have to cancel it. But you can only cancel tasks within the event + * loop that the task is scheduled on. So to solve this case, if there's a cull task, rather than doing + * cleanup synchronously, we schedule a final destruction task (on the cull event loop) which cancels the + * cull task before going on to release all the memory and notify the shutdown callback. + * + * If there's no cull task we can just cleanup synchronously. 
+ */ + if (manager->cull_event_loop != NULL) { + AWS_FATAL_ASSERT(manager->cull_task); + struct aws_task *final_destruction_task = aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_task)); + aws_task_init(final_destruction_task, s_final_destruction_task, manager, "final_scheduled_destruction"); + aws_event_loop_schedule_task_now(manager->cull_event_loop, final_destruction_task); + } else { + s_aws_http_connection_manager_finish_destroy(manager); + } +} + +static void s_cull_task(struct aws_task *task, void *arg, enum aws_task_status status); +static void s_schedule_connection_culling(struct aws_http_connection_manager *manager) { + if (manager->max_connection_idle_in_milliseconds == 0) { + return; + } + + if (manager->cull_task == NULL) { + manager->cull_task = aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_task)); + if (manager->cull_task == NULL) { + return; + } + + aws_task_init(manager->cull_task, s_cull_task, manager, "cull_idle_connections"); + } + + if (manager->cull_event_loop == NULL) { + manager->cull_event_loop = aws_event_loop_group_get_next_loop(manager->bootstrap->event_loop_group); + } + + if (manager->cull_event_loop == NULL) { + goto on_error; + } + + uint64_t cull_task_time = 0; + const struct aws_linked_list_node *end = aws_linked_list_end(&manager->idle_connections); + struct aws_linked_list_node *oldest_node = aws_linked_list_begin(&manager->idle_connections); + if (oldest_node != end) { + /* + * Since the connections are in LIFO order in the list, the front of the list has the closest + * cull time. + */ + struct aws_idle_connection *oldest_idle_connection = + AWS_CONTAINER_OF(oldest_node, struct aws_idle_connection, node); + cull_task_time = oldest_idle_connection->cull_timestamp; + } else { + /* + * There are no connections in the list, so the absolute minimum anything could be culled is the full + * culling interval from now. 
+ */ + uint64_t now = 0; + if (manager->system_vtable->get_monotonic_time(&now)) { + goto on_error; + } + cull_task_time = + now + aws_timestamp_convert( + manager->max_connection_idle_in_milliseconds, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + } + + aws_event_loop_schedule_task_future(manager->cull_event_loop, manager->cull_task, cull_task_time); + + return; + +on_error: + + manager->cull_event_loop = NULL; + aws_mem_release(manager->allocator, manager->cull_task); + manager->cull_task = NULL; +} + struct aws_http_connection_manager *aws_http_connection_manager_new( struct aws_allocator *allocator, struct aws_http_connection_manager_options *options) { @@ -623,11 +790,7 @@ struct aws_http_connection_manager *aws_http_connection_manager_new( goto on_error; } - if (aws_array_list_init_dynamic( - &manager->connections, allocator, options->max_connections, sizeof(struct aws_http_connection *))) { - goto on_error; - } - + aws_linked_list_init(&manager->idle_connections); aws_linked_list_init(&manager->pending_acquisitions); manager->host = aws_string_new_from_array(allocator, options->host.ptr, options->host.len); @@ -664,6 +827,9 @@ struct aws_http_connection_manager *aws_http_connection_manager_new( manager->shutdown_complete_callback = options->shutdown_complete_callback; manager->shutdown_complete_user_data = options->shutdown_complete_user_data; manager->enable_read_back_pressure = options->enable_read_back_pressure; + manager->max_connection_idle_in_milliseconds = options->max_connection_idle_in_milliseconds; + + s_schedule_connection_culling(manager); AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Successfully created", (void *)manager); @@ -671,7 +837,7 @@ struct aws_http_connection_manager *aws_http_connection_manager_new( on_error: - s_aws_http_connection_manager_destroy(manager); + s_aws_http_connection_manager_begin_destroy(manager); return NULL; } @@ -781,16 +947,17 @@ static void s_aws_http_connection_manager_execute_transaction(struct 
aws_connect /* * Step 2 - Perform any requested connection releases */ - size_t release_count = aws_array_list_length(&work->connections_to_release); - for (size_t i = 0; i < release_count; ++i) { - struct aws_http_connection *connection = NULL; - if (aws_array_list_get_at(&work->connections_to_release, &connection, i)) { - continue; - } + while (!aws_linked_list_empty(&work->connections_to_release)) { + struct aws_linked_list_node *node = aws_linked_list_pop_back(&work->connections_to_release); + struct aws_idle_connection *idle_connection = AWS_CONTAINER_OF(node, struct aws_idle_connection, node); AWS_LOGF_INFO( - AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Releasing connection (id=%p)", (void *)manager, (void *)connection); - manager->system_vtable->release_connection(connection); + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Releasing connection (id=%p)", + (void *)manager, + (void *)idle_connection->connection); + manager->system_vtable->release_connection(idle_connection->connection); + aws_mem_release(idle_connection->allocator, idle_connection); } if (work->connection_to_release) { @@ -880,7 +1047,7 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect * Step 5 - destroy the manager if necessary */ if (should_destroy) { - s_aws_http_connection_manager_destroy(manager); + s_aws_http_connection_manager_begin_destroy(manager); } /* @@ -906,6 +1073,7 @@ void aws_http_connection_manager_acquire_connection( request->allocator = manager->allocator; request->callback = callback; request->user_data = user_data; + request->manager = manager; struct aws_connection_management_transaction work; s_aws_connection_management_transaction_init(&work, manager); @@ -931,6 +1099,38 @@ void aws_http_connection_manager_acquire_connection( s_aws_http_connection_manager_execute_transaction(&work); } +static int s_idle_connection(struct aws_http_connection_manager *manager, struct aws_http_connection *connection) { + struct aws_idle_connection 
*idle_connection = + aws_mem_calloc(manager->allocator, 1, sizeof(struct aws_idle_connection)); + if (idle_connection == NULL) { + return AWS_OP_ERR; + } + + idle_connection->allocator = manager->allocator; + idle_connection->connection = connection; + + uint64_t idle_start_timestamp = 0; + if (manager->system_vtable->get_monotonic_time(&idle_start_timestamp)) { + goto on_error; + } + + idle_connection->cull_timestamp = + idle_start_timestamp + + aws_timestamp_convert( + manager->max_connection_idle_in_milliseconds, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + + aws_linked_list_push_back(&manager->idle_connections, &idle_connection->node); + ++manager->idle_connection_count; + + return AWS_OP_SUCCESS; + +on_error: + + aws_mem_release(idle_connection->allocator, idle_connection); + + return AWS_OP_ERR; +} + int aws_http_connection_manager_release_connection( struct aws_http_connection_manager *manager, struct aws_http_connection *connection) { @@ -961,7 +1161,7 @@ int aws_http_connection_manager_release_connection( --manager->vended_connection_count; if (!should_release_connection) { - if (aws_array_list_push_back(&manager->connections, &connection)) { + if (s_idle_connection(manager, connection)) { should_release_connection = true; } } @@ -1012,16 +1212,13 @@ static void s_aws_http_connection_manager_on_connection_setup( --manager->pending_connects_count; if (connection != NULL) { - if (!is_shutting_down) { - /* We reserved enough room for max_connections, this should never fail */ - AWS_FATAL_ASSERT(aws_array_list_push_back(&manager->connections, &connection) == AWS_OP_SUCCESS); - } else { + if (is_shutting_down || s_idle_connection(manager, connection)) { /* - * We won't add the connection to the pool; just release it immediately + * release it immediately */ AWS_LOGF_DEBUG( AWS_LS_HTTP_CONNECTION_MANAGER, - "id=%p: New connection (id=%p) releasing immediately due to shutdown state", + "id=%p: New connection (id=%p) releasing immediately", (void *)manager, 
(void *)connection); work.connection_to_release = connection; @@ -1058,8 +1255,6 @@ static void s_aws_http_connection_manager_on_connection_shutdown( void *user_data) { (void)error_code; - bool should_release_connection = false; - struct aws_http_connection_manager *manager = user_data; AWS_LOGF_DEBUG( @@ -1076,38 +1271,87 @@ static void s_aws_http_connection_manager_on_connection_shutdown( AWS_FATAL_ASSERT(manager->open_connection_count > 0); --manager->open_connection_count; - size_t connection_count = aws_array_list_length(&manager->connections); - /* - * Find and, if found, remove it from connections + * Find and, if found, remove it from idle connections */ - if (connection_count > 0) { - AWS_ASSERT(manager->state == AWS_HCMST_READY); + const struct aws_linked_list_node *end = aws_linked_list_end(&manager->idle_connections); + for (struct aws_linked_list_node *node = aws_linked_list_begin(&manager->idle_connections); node != end; + node = aws_linked_list_next(node)) { + struct aws_idle_connection *current_idle_connection = AWS_CONTAINER_OF(node, struct aws_idle_connection, node); + if (current_idle_connection->connection == connection) { + aws_linked_list_remove(node); + work.connection_to_release = connection; + aws_mem_release(current_idle_connection->allocator, current_idle_connection); + --manager->idle_connection_count; + break; + } + } + + s_aws_http_connection_manager_build_transaction(&work); + + aws_mutex_unlock(&manager->lock); + + s_aws_http_connection_manager_execute_transaction(&work); +} + +static void s_cull_idle_connections(struct aws_http_connection_manager *manager) { + AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: culling idle connections", (void *)manager); - struct aws_http_connection *last_connection = NULL; - AWS_FATAL_ASSERT( - aws_array_list_get_at(&manager->connections, &last_connection, connection_count - 1) == AWS_OP_SUCCESS); + if (manager == NULL || manager->max_connection_idle_in_milliseconds == 0) { + return; + } + + 
uint64_t now = 0; + if (manager->system_vtable->get_monotonic_time(&now)) { + return; + } + + struct aws_connection_management_transaction work; + s_aws_connection_management_transaction_init(&work, manager); - for (size_t i = 0; i < connection_count; ++i) { - struct aws_http_connection *current_connection = NULL; - aws_array_list_get_at(&manager->connections, ¤t_connection, i); + aws_mutex_lock(&manager->lock); - if (current_connection == connection) { - should_release_connection = true; - aws_array_list_set_at(&manager->connections, &last_connection, i); + /* Only if we're not shutting down */ + if (manager->state == AWS_HCMST_READY) { + const struct aws_linked_list_node *end = aws_linked_list_end(&manager->idle_connections); + struct aws_linked_list_node *current_node = aws_linked_list_begin(&manager->idle_connections); + while (current_node != end) { + struct aws_linked_list_node *node = current_node; + struct aws_idle_connection *current_idle_connection = + AWS_CONTAINER_OF(node, struct aws_idle_connection, node); + if (current_idle_connection->cull_timestamp > now) { break; } - } - if (should_release_connection) { - aws_array_list_pop_back(&manager->connections); - work.connection_to_release = connection; + current_node = aws_linked_list_next(current_node); + aws_linked_list_remove(node); + aws_linked_list_push_back(&work.connections_to_release, node); + --manager->idle_connection_count; + + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: culling idle connection (%p)", + (void *)manager, + (void *)current_idle_connection->connection); } } - s_aws_http_connection_manager_build_transaction(&work); + s_aws_http_connection_manager_get_snapshot(manager, &work.snapshot); aws_mutex_unlock(&manager->lock); s_aws_http_connection_manager_execute_transaction(&work); } + +static void s_cull_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + if (status != AWS_TASK_STATUS_RUN_READY) { + return; + } + + struct 
aws_http_connection_manager *manager = arg; + + s_cull_idle_connections(manager); + + s_schedule_connection_culling(manager); +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eabac37f4..3ac73325f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -420,16 +420,24 @@ add_test_case(connection_setup_shutdown) #add_test_case(connection_destroy_server_with_multiple_connections_existing) #add_test_case(connection_server_shutting_down_new_connection_setup_fail) -add_net_test_case(test_connection_manager_setup_shutdown) +# connection manager tests +# unit tests where connections are mocked +add_test_case(test_connection_manager_setup_shutdown) +add_test_case(test_connection_manager_acquire_release_mix_synchronous) +add_test_case(test_connection_manager_connect_callback_failure) +add_test_case(test_connection_manager_connect_immediate_failure) +add_test_case(test_connection_manager_proxy_setup_shutdown) +add_test_case(test_connection_manager_idle_culling_single) +add_test_case(test_connection_manager_idle_culling_many) +add_test_case(test_connection_manager_idle_culling_mixture) + +# tests where we establish real connections add_net_test_case(test_connection_manager_single_connection) add_net_test_case(test_connection_manager_many_connections) add_net_test_case(test_connection_manager_acquire_release) add_net_test_case(test_connection_manager_close_and_release) add_net_test_case(test_connection_manager_acquire_release_mix) -add_net_test_case(test_connection_manager_acquire_release_mix_synchronous) -add_net_test_case(test_connection_manager_connect_callback_failure) -add_net_test_case(test_connection_manager_connect_immediate_failure) -add_net_test_case(test_connection_manager_proxy_setup_shutdown) + add_test_case(h1_server_sanity_check) add_test_case(h1_server_receive_1line_request) diff --git a/tests/test_connection_manager.c b/tests/test_connection_manager.c index 47f146b3e..9d42a0859 100644 --- a/tests/test_connection_manager.c +++ 
b/tests/test_connection_manager.c @@ -43,11 +43,12 @@ struct cm_tester_options { struct aws_http_connection_manager_system_vtable *mock_table; struct aws_http_proxy_options *proxy_options; size_t max_connections; + uint64_t max_connection_idle_in_ms; + uint64_t starting_mock_time; }; struct cm_tester { struct aws_allocator *allocator; - struct aws_logger logger; struct aws_event_loop_group event_loop_group; struct aws_host_resolver host_resolver; @@ -76,10 +77,27 @@ struct cm_tester { struct aws_atomic_var next_connection_id; struct aws_array_list mock_connections; aws_http_on_client_connection_shutdown_fn *release_connection_fn; + + struct aws_mutex mock_time_lock; + uint64_t mock_time; }; static struct cm_tester s_tester; +static int s_tester_get_mock_time(uint64_t *current_time) { + aws_mutex_lock(&s_tester.mock_time_lock); + *current_time = s_tester.mock_time; + aws_mutex_unlock(&s_tester.mock_time_lock); + + return AWS_OP_SUCCESS; +} + +static void s_tester_set_mock_time(uint64_t current_time) { + aws_mutex_lock(&s_tester.mock_time_lock); + s_tester.mock_time = current_time; + aws_mutex_unlock(&s_tester.mock_time_lock); +} + static void s_cm_tester_on_cm_shutdown_complete(void *user_data) { struct cm_tester *tester = user_data; AWS_FATAL_ASSERT(tester == &s_tester); @@ -101,6 +119,15 @@ static void s_cm_tester_on_client_bootstrap_shutdown_complete(void *user_data) { aws_condition_variable_notify_one(&tester->signal); } +static struct aws_event_loop *s_new_event_loop( + struct aws_allocator *alloc, + aws_io_clock_fn *clock, + void *new_loop_user_data) { + (void)new_loop_user_data; + + return aws_event_loop_new_default(alloc, clock); +} + static int s_cm_tester_init(struct cm_tester_options *options) { struct cm_tester *tester = &s_tester; @@ -113,18 +140,19 @@ static int s_cm_tester_init(struct cm_tester_options *options) { ASSERT_SUCCESS(aws_mutex_init(&tester->lock)); ASSERT_SUCCESS(aws_condition_variable_init(&tester->signal)); - struct 
aws_logger_standard_options logger_options = { - .level = AWS_LOG_LEVEL_TRACE, - .file = stderr, - }; + ASSERT_SUCCESS( + aws_array_list_init_dynamic(&tester->connections, tester->allocator, 10, sizeof(struct aws_http_connection *))); - ASSERT_SUCCESS(aws_logger_init_standard(&tester->logger, tester->allocator, &logger_options)); - aws_logger_set(&tester->logger); + aws_mutex_init(&tester->mock_time_lock); + s_tester_set_mock_time(options->starting_mock_time); + aws_io_clock_fn *clock_fn = &aws_high_res_clock_get_ticks; + if (options->mock_table) { + clock_fn = options->mock_table->get_monotonic_time; + } ASSERT_SUCCESS( - aws_array_list_init_dynamic(&tester->connections, tester->allocator, 10, sizeof(struct aws_http_connection *))); + aws_event_loop_group_init(&tester->event_loop_group, tester->allocator, clock_fn, 1, s_new_event_loop, NULL)); - ASSERT_SUCCESS(aws_event_loop_group_default_init(&tester->event_loop_group, tester->allocator, 1)); ASSERT_SUCCESS( aws_host_resolver_init_default(&tester->host_resolver, tester->allocator, 8, &tester->event_loop_group)); struct aws_client_bootstrap_options bootstrap_options = { @@ -162,8 +190,13 @@ static int s_cm_tester_init(struct cm_tester_options *options) { .max_connections = options->max_connections, .shutdown_complete_user_data = tester, .shutdown_complete_callback = s_cm_tester_on_cm_shutdown_complete, + .max_connection_idle_in_milliseconds = options->max_connection_idle_in_ms, }; + if (options->mock_table) { + g_aws_http_connection_manager_default_system_vtable_ptr = options->mock_table; + } + tester->connection_manager = aws_http_connection_manager_new(tester->allocator, &cm_options); ASSERT_NOT_NULL(tester->connection_manager); @@ -387,9 +420,9 @@ static int s_cm_tester_clean_up(void) { aws_mutex_clean_up(&tester->lock); aws_condition_variable_clean_up(&tester->signal); - aws_http_library_clean_up(); + aws_mutex_clean_up(&tester->mock_time_lock); - aws_logger_clean_up(&tester->logger); + 
aws_http_library_clean_up(); return AWS_OP_SUCCESS; } @@ -494,7 +527,10 @@ AWS_TEST_CASE(test_connection_manager_close_and_release, s_test_connection_manag static int s_test_connection_manager_acquire_release_mix(struct aws_allocator *allocator, void *ctx) { (void)ctx; - struct cm_tester_options options = {.allocator = allocator, .max_connections = 5}; + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = 5, + }; ASSERT_SUCCESS(s_cm_tester_init(&options)); @@ -581,11 +617,28 @@ static bool s_aws_http_connection_manager_is_connection_open_sync_mock(const str return !proxy->is_closed_on_release; } +static bool s_aws_http_connection_manager_is_callers_thread_sync_mock(struct aws_channel *channel) { + (void)channel; + + return true; +} + +static struct aws_channel *s_aws_http_connection_manager_connection_get_channel_sync_mock( + struct aws_http_connection *connection) { + (void)connection; + + return (struct aws_channel *)1; +} + static struct aws_http_connection_manager_system_vtable s_synchronous_mocks = { .create_connection = s_aws_http_connection_manager_create_connection_sync_mock, .release_connection = s_aws_http_connection_manager_release_connection_sync_mock, .close_connection = s_aws_http_connection_manager_close_connection_sync_mock, - .is_connection_open = s_aws_http_connection_manager_is_connection_open_sync_mock}; + .is_connection_open = s_aws_http_connection_manager_is_connection_open_sync_mock, + .get_monotonic_time = aws_high_res_clock_get_ticks, + .connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock, + .is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock, +}; static int s_test_connection_manager_acquire_release_mix_synchronous(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -593,6 +646,7 @@ static int s_test_connection_manager_acquire_release_mix_synchronous(struct aws_ struct cm_tester_options options = { .allocator = allocator, .max_connections 
= 5, + .mock_table = &s_synchronous_mocks, }; ASSERT_SUCCESS(s_cm_tester_init(&options)); @@ -631,7 +685,10 @@ static int s_test_connection_manager_connect_callback_failure(struct aws_allocat (void)ctx; struct cm_tester_options options = { - .allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks}; + .allocator = allocator, + .max_connections = 5, + .mock_table = &s_synchronous_mocks, + }; ASSERT_SUCCESS(s_cm_tester_init(&options)); @@ -653,7 +710,10 @@ static int s_test_connection_manager_connect_immediate_failure(struct aws_alloca (void)ctx; struct cm_tester_options options = { - .allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks}; + .allocator = allocator, + .max_connections = 5, + .mock_table = &s_synchronous_mocks, + }; ASSERT_SUCCESS(s_cm_tester_init(&options)); @@ -693,3 +753,235 @@ static int s_test_connection_manager_proxy_setup_shutdown(struct aws_allocator * return AWS_OP_SUCCESS; } AWS_TEST_CASE(test_connection_manager_proxy_setup_shutdown, s_test_connection_manager_proxy_setup_shutdown); + +static struct aws_http_connection_manager_system_vtable s_idle_mocks = { + .create_connection = s_aws_http_connection_manager_create_connection_sync_mock, + .release_connection = s_aws_http_connection_manager_release_connection_sync_mock, + .close_connection = s_aws_http_connection_manager_close_connection_sync_mock, + .is_connection_open = s_aws_http_connection_manager_is_connection_open_sync_mock, + .get_monotonic_time = s_tester_get_mock_time, + .connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock, + .is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock, +}; + +static int s_register_acquired_connections(struct aws_array_list *seen_connections) { + aws_mutex_lock(&s_tester.lock); + + size_t acquired_count = aws_array_list_length(&s_tester.connections); + for (size_t i = 0; i < acquired_count; ++i) { + struct aws_http_connection *connection = NULL; + 
aws_array_list_get_at(&s_tester.connections, &connection, i); + aws_array_list_push_back(seen_connections, &connection); + } + + aws_mutex_unlock(&s_tester.lock); + + return AWS_OP_SUCCESS; +} + +static size_t s_get_acquired_connections_seen_count(struct aws_array_list *seen_connections) { + size_t actual_seen_count = 0; + aws_mutex_lock(&s_tester.lock); + + size_t seen_count = aws_array_list_length(seen_connections); + size_t acquired_count = aws_array_list_length(&s_tester.connections); + for (size_t i = 0; i < acquired_count; ++i) { + struct aws_http_connection *acquired_connection = NULL; + aws_array_list_get_at(&s_tester.connections, &acquired_connection, i); + + for (size_t j = 0; j < seen_count; ++j) { + struct aws_http_connection *seen_connection = NULL; + aws_array_list_get_at(seen_connections, &seen_connection, j); + + if (seen_connection == acquired_connection) { + actual_seen_count++; + } + } + } + + aws_mutex_unlock(&s_tester.lock); + + return actual_seen_count; +} + +static int s_test_connection_manager_idle_culling_single(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_array_list seen_connections; + AWS_ZERO_STRUCT(seen_connections); + ASSERT_SUCCESS(aws_array_list_init_dynamic(&seen_connections, allocator, 10, sizeof(struct aws_http_connection *))); + + uint64_t now = 0; + + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = 1, + .mock_table = &s_idle_mocks, + .max_connection_idle_in_ms = 1000, + .starting_mock_time = now, + }; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + /* add enough fake connections to cover all the acquires */ + s_add_mock_connections(2, AWS_NCRT_SUCCESS, false); + + /* acquire some connections */ + s_acquire_connections(1); + + ASSERT_SUCCESS(s_wait_on_connection_reply_count(1)); + + /* remember what connections we acquired */ + s_register_acquired_connections(&seen_connections); + + /* release the connections */ + s_release_connections(1, false); + + /* advance 
fake time enough to cause the connections to be culled, also sleep for real to give the cull task + * a chance to run in the real event loop + */ + uint64_t one_sec_in_nanos = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + s_tester_set_mock_time(now + one_sec_in_nanos); + aws_thread_current_sleep(2 * one_sec_in_nanos); + + /* acquire some connections */ + s_acquire_connections(1); + ASSERT_SUCCESS(s_wait_on_connection_reply_count(2)); + + /* make sure the connections acquired were not ones that we expected to cull */ + ASSERT_INT_EQUALS(s_get_acquired_connections_seen_count(&seen_connections), 0); + + /* release everything and clean up */ + s_release_connections(1, false); + + ASSERT_SUCCESS(s_cm_tester_clean_up()); + + aws_array_list_clean_up(&seen_connections); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_idle_culling_single, s_test_connection_manager_idle_culling_single); + +static int s_test_connection_manager_idle_culling_many(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_array_list seen_connections; + AWS_ZERO_STRUCT(seen_connections); + ASSERT_SUCCESS(aws_array_list_init_dynamic(&seen_connections, allocator, 10, sizeof(struct aws_http_connection *))); + + uint64_t now = 0; + + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = 5, + .mock_table = &s_idle_mocks, + .max_connection_idle_in_ms = 1000, + .starting_mock_time = now, + }; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + /* add enough fake connections to cover all the acquires */ + s_add_mock_connections(10, AWS_NCRT_SUCCESS, false); + + /* acquire some connections */ + s_acquire_connections(5); + + ASSERT_SUCCESS(s_wait_on_connection_reply_count(5)); + + /* remember what connections we acquired */ + s_register_acquired_connections(&seen_connections); + + /* release the connections */ + s_release_connections(5, false); + + /* advance fake time enough to cause the connections to be 
culled, also sleep for real to give the cull task + * a chance to run in the real event loop + */ + uint64_t one_sec_in_nanos = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + s_tester_set_mock_time(now + one_sec_in_nanos); + aws_thread_current_sleep(2 * one_sec_in_nanos); + + /* acquire some connections */ + s_acquire_connections(5); + ASSERT_SUCCESS(s_wait_on_connection_reply_count(10)); + + /* make sure the connections acquired were not ones that we expected to cull */ + ASSERT_INT_EQUALS(s_get_acquired_connections_seen_count(&seen_connections), 0); + + /* release everything and clean up */ + s_release_connections(5, false); + + ASSERT_SUCCESS(s_cm_tester_clean_up()); + + aws_array_list_clean_up(&seen_connections); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_idle_culling_many, s_test_connection_manager_idle_culling_many); + +static int s_test_connection_manager_idle_culling_mixture(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_array_list seen_connections; + AWS_ZERO_STRUCT(seen_connections); + ASSERT_SUCCESS(aws_array_list_init_dynamic(&seen_connections, allocator, 10, sizeof(struct aws_http_connection *))); + + uint64_t now = 0; + + struct cm_tester_options options = { + .allocator = allocator, + .max_connections = 10, + .mock_table = &s_idle_mocks, + .max_connection_idle_in_ms = 1000, + .starting_mock_time = now, + }; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + /* add enough fake connections to cover all the acquires */ + s_add_mock_connections(15, AWS_NCRT_SUCCESS, false); + + /* acquire some connections */ + s_acquire_connections(10); + + ASSERT_SUCCESS(s_wait_on_connection_reply_count(10)); + + /* remember what connections we acquired */ + s_register_acquired_connections(&seen_connections); + + /* + * release the connections + * Previous tests created situations where the entire block of idle connections end up getting culled. 
We also + * want to create a situation where just some of the connections get culled. + */ + s_release_connections(5, false); + s_tester_set_mock_time(now + 1); + s_release_connections(5, false); + s_tester_set_mock_time(now); + uint64_t one_sec_in_nanos = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + + /* + * advance fake time enough to cause half of the connections to be culled, also sleep for real to give the cull task + * a chance to run in the real event loop. + */ + s_tester_set_mock_time(now + one_sec_in_nanos); + aws_thread_current_sleep(2 * one_sec_in_nanos); + + /* acquire some connections */ + s_acquire_connections(10); + ASSERT_SUCCESS(s_wait_on_connection_reply_count(20)); + + /* make sure the connections acquired are half old and half new */ + ASSERT_INT_EQUALS(s_get_acquired_connections_seen_count(&seen_connections), 5); + + /* release everything and clean up */ + s_release_connections(10, false); + + ASSERT_SUCCESS(s_cm_tester_clean_up()); + + aws_array_list_clean_up(&seen_connections); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_idle_culling_mixture, s_test_connection_manager_idle_culling_mixture); \ No newline at end of file