diff --git a/include/aws/http/connection_manager.h b/include/aws/http/connection_manager.h new file mode 100644 index 000000000..a89a919b3 --- /dev/null +++ b/include/aws/http/connection_manager.h @@ -0,0 +1,113 @@ +#ifndef AWS_HTTP_CONNECTION_MANAGER_H +#define AWS_HTTP_CONNECTION_MANAGER_H + +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include + +#include + +struct aws_client_bootstrap; +struct aws_http_connection; +struct aws_http_connection_manager; +struct aws_http_connection_manager_mocks; +struct aws_socket_options; +struct aws_tls_connection_options; + +typedef void(aws_http_connection_manager_on_connection_setup_fn)( + struct aws_http_connection *connection, + int error_code, + void *user_data); + +/* + * Connection manager configuration struct. + * + * Contains all of the configuration needed to create an http connection as well as + * the maximum number of connections to ever have in existence. + */ +struct aws_http_connection_manager_options { + /* + * http connection configuration + */ + struct aws_client_bootstrap *bootstrap; + size_t initial_window_size; + struct aws_socket_options *socket_options; + struct aws_tls_connection_options *tls_connection_options; + struct aws_byte_cursor host; + uint16_t port; + + /* + * Maximum number of connections this manager is allowed to contain + */ + size_t max_connections; +}; + +AWS_EXTERN_C_BEGIN + +/* + * Connection managers are ref counted. Adds one external ref to the manager. + */ +AWS_HTTP_API +void aws_http_connection_manager_acquire(struct aws_http_connection_manager *manager); + +/* + * Connection managers are ref counted. Removes one external ref from the manager. + * + * When the ref count goes to zero, the connection manager begins its shut down + * process. All pending connection acquisitions are failed (with callbacks + * invoked) and any (erroneous) subsequent attempts to acquire a connection + * fail immediately. The connection manager destroys itself once all pending + * asynchronous activities have resolved. + */ +AWS_HTTP_API +void aws_http_connection_manager_release(struct aws_http_connection_manager *manager); + +/* + * Creates a new connection manager with the supplied configuration options. + * + * The returned connection manager begins with a ref count of 1. + */ +AWS_HTTP_API +struct aws_http_connection_manager *aws_http_connection_manager_new( + struct aws_allocator *allocator, + struct aws_http_connection_manager_options *options); + +/* + * Requests a connection from the manager. The requester is notified of + * an acquired connection (or failure to acquire) via the supplied callback. + * + * Once a connection has been successfully acquired from the manager it + * must be released back (via aws_http_connection_manager_release_connection) + * at some point. Failure to do so will cause a resource leak. + */ +AWS_HTTP_API +void aws_http_connection_manager_acquire_connection( + struct aws_http_connection_manager *manager, + aws_http_connection_manager_on_connection_setup_fn *callback, + void *user_data); + +/* + * Returns a connection back to the manager. All acquired connections must + * eventually be released back to the manager in order to avoid a resource leak. + */ +AWS_HTTP_API +int aws_http_connection_manager_release_connection( + struct aws_http_connection_manager *manager, + struct aws_http_connection *connection); + +AWS_EXTERN_C_END + +#endif /* AWS_HTTP_CONNECTION_MANAGER_H */ diff --git a/include/aws/http/http.h b/include/aws/http/http.h index 6af06ed9e..6b2b1102e 100644 --- a/include/aws/http/http.h +++ b/include/aws/http/http.h @@ -31,6 +31,9 @@ enum aws_http_errors { AWS_ERROR_HTTP_CALLBACK_FAILURE, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT, AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER, + AWS_ERROR_HTTP_CONNECTION_MANAGER_INVALID_STATE_FOR_ACQUIRE, + AWS_ERROR_HTTP_CONNECTION_MANAGER_VENDED_CONNECTION_UNDERFLOW, + AWS_ERROR_HTTP_END_RANGE = 0x0C00, }; @@ -39,6 +42,7 @@ enum aws_http_log_subject { AWS_LS_HTTP_CONNECTION, AWS_LS_HTTP_SERVER, AWS_LS_HTTP_STREAM, + AWS_LS_HTTP_CONNECTION_MANAGER, AWS_LS_HTTP_WEBSOCKET, }; diff --git a/include/aws/http/private/connection_manager_function_table.h b/include/aws/http/private/connection_manager_function_table.h new file mode 100644 index 000000000..d815d36ca --- /dev/null +++ b/include/aws/http/private/connection_manager_function_table.h @@ -0,0 +1,51 @@ +#ifndef AWS_HTTP_CONNECTION_MANAGER_FUNCTION_TABLE_H +#define AWS_HTTP_CONNECTION_MANAGER_FUNCTION_TABLE_H + +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include + +#include + +typedef int(aws_http_connection_manager_create_connection_fn)(const struct aws_http_client_connection_options *options); +typedef void(aws_http_connection_manager_close_connection_fn)(struct aws_http_connection *connection); +typedef void(aws_http_connection_manager_release_connection_fn)(struct aws_http_connection *connection); +typedef bool(aws_http_connection_manager_is_connection_open_fn)(const struct aws_http_connection *connection); + +struct aws_http_connection_manager_function_table { + /* + * Downstream http functions + */ + aws_http_connection_manager_create_connection_fn *create_connection; + aws_http_connection_manager_close_connection_fn *close_connection; + aws_http_connection_manager_release_connection_fn *release_connection; + aws_http_connection_manager_is_connection_open_fn *is_connection_open; +}; + +AWS_HTTP_API +bool aws_http_connection_manager_function_table_is_valid( + const struct aws_http_connection_manager_function_table *table); + +AWS_HTTP_API +void aws_http_connection_manager_set_function_table( + struct aws_http_connection_manager *manager, + const struct aws_http_connection_manager_function_table *function_table); + +AWS_HTTP_API +extern const struct aws_http_connection_manager_function_table + *g_aws_http_connection_manager_default_function_table_ptr; + +#endif /* AWS_HTTP_CONNECTION_MANAGER_FUNCTION_TABLE_H */ diff --git a/source/connection_manager.c b/source/connection_manager.c new file mode 100644 index 000000000..6c226e7aa --- /dev/null +++ b/source/connection_manager.c @@ -0,0 +1,967 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * Function table to use under normal circumstances + */ +static struct aws_http_connection_manager_function_table s_default_function_table = { + .create_connection = aws_http_client_connect, + .release_connection = aws_http_connection_release, + .close_connection = aws_http_connection_close, + .is_connection_open = aws_http_connection_is_open}; + +const struct aws_http_connection_manager_function_table *g_aws_http_connection_manager_default_function_table_ptr = + &s_default_function_table; + +bool aws_http_connection_manager_function_table_is_valid( + const struct aws_http_connection_manager_function_table *table) { + return table->create_connection && table->close_connection && table->release_connection && + table->is_connection_open; +} + +enum aws_http_connection_manager_state_type { AWS_HCMST_UNINITIALIZED, AWS_HCMST_READY, AWS_HCMST_SHUTTING_DOWN }; + +/** + * Vocabulary + * Acquisition - a request by a user for a connection + * Pending Acquisition - a request by a user for a new connection that has not been completed. It may be + * waiting on http, a release by another user, or the manager itself. + * Pending Connect - a request to the http layer for a new connection that has not been resolved yet + * Vended Connection - a successfully established connection that is currently in use by something; must + * be released (through the connection manager) by the user before anyone else can use it. The connection + * manager does not explicitly track vended connections. + * Task Set - A set of operations that should be attempted once the lock is released. A task set includes + * completion callbacks (which can't fail) and connection attempts (which can fail either immediately or + * asynchronously). + * + * Requirements/Assumptions + * (1) Don't invoke user callbacks while holding the internal state lock + * (2) Don't invoke downstream http calls while holding the internal state lock + * (3) Only log unusual or rare events while the lock is held. Common-path logging should be while it is + * not held. + * (4) Don't crash or do awful things (leaking resources is ok though) if the interface contract + * (ref counting + balanced acquire/release of connections) is violated by the user + * + * In order to fulfill (1) and (2), all operations within the connection manager follow a pattern: + * + * (1) Lock + * (2) Make state changes based on the operation + * (3) Build a task set (completions and connect calls) as appropriate to the operation + * (4) Unlock + * (5) Execute the task set + * + * Asynchronous work order failures are handled in the async callback, but immediate failures require + * us to relock and update the internal state. When there's an immediate connect failure, we use a + * conservative policy to fail all excess (beyond the # of pending connects) acquisitions; this allows us + * to avoid a possible recursive invocation (and potential failures) to connect again. + * + * Lifecycle + * Our connection manager implementation has a reasonably complex lifecycle. + * + * All state around the life cycle is protected by a lock. It seemed too risky and error-prone + * to try and mix an atomic ref count with the internal tracking counters we need. + * + * Over the course of its lifetime, a connection manager moves through two states: + * + * READY - connections may be acquired and released. When the external ref count for the manager + * drops to zero, the manager moves to: + * + * SHUTTING_DOWN - connections may no longer be acquired and released (how could they if the external + * ref count was accurate?) but in case of user ref errors, we simply fail attempts to do so rather + * than crash or underflow. While in this state, we wait for a set of tracking counters to all fall to zero: + * + * pending_connect_count - the # of unresolved calls to the http layer's connect logic + * open_connection_count - the # of connections for whom the release callback (from http) has not been invoked + * vended_connection_count - the # of connections held by external users that haven't been released. Under correct + * usage this should be zero before SHUTTING_DOWN is entered, but we attempt to handle incorrect usage gracefully. + * + * While shutting down, as pending connects resolve, we immediately release new incoming (from http) connections + * + * During the transition from READY to SHUTTING_DOWN, we flush the pending acquisition queue (with failure callbacks) + * and since we disallow new acquires, pending_acquisition_count should always be zero after the transition. + * + */ +struct aws_http_connection_manager { + struct aws_allocator *allocator; + + /* + * A union of external downstream dependencies (primarily global http API functions) and + * internal implementation references. Selectively overridden by tests in order to + * enable strong coverage of internal implementation details. + */ + const struct aws_http_connection_manager_function_table *function_table; + + /* + * Controls access to all mutable state on the connection manager + */ + struct aws_mutex lock; + + /* + * A manager can be in one of two states, READY or SHUTTING_DOWN. The state transition + * takes place when ref_count drops to zero. + */ + enum aws_http_connection_manager_state_type state; + + /* + * The set of all available, ready-to-be-used connections + */ + struct aws_array_list connections; + + /* + * The set of all incomplete connection acquisition requests + */ + struct aws_linked_list pending_acquisitions; + + /* + * The number of all incomplete connection acquisition requests. So + * that we don't have compute the size of a linked list every time. + */ + size_t pending_acquisition_count; + + /* + * The number of pending new connection requests we have outstanding to the http + * layer. + */ + size_t pending_connects_count; + + /* + * The number of connections currently being used by external users. + */ + size_t vended_connection_count; + + /* + * Always equal to # of connection shutdown callbacks not yet invoked + * or equivalently: + * + * # of connections ever created by the manager - # shutdown callbacks received + */ + size_t open_connection_count; + + /* + * All the options needed to create an http connection + */ + struct aws_client_bootstrap *bootstrap; + size_t initial_window_size; + struct aws_socket_options socket_options; + struct aws_tls_connection_options *tls_connection_options; + struct aws_string *host; + uint16_t port; + + /* + * The maximum number of connections this manager should ever have at once. + */ + size_t max_connections; + + /* + * Lifecycle tracking for the connection manager. Starts at 1. + * + * Once this drops to zero, the manager state transitions to shutting down + * + * The manager is deleted when all other tracking counters have returned to zero. + * + * We don't use an atomic here because the shutdown phase wants to check many different + * values. You could argue that we could use a sum of everything, but we still need the + * individual values for proper behavior and error checking during the ready state. Also, + * a hybrid atomic/lock solution felt excessively complicated and delicate. + */ + size_t external_ref_count; +}; + +struct aws_http_connection_manager_snapshot { + enum aws_http_connection_manager_state_type state; + + size_t held_connection_count; + size_t pending_acquisition_count; + size_t pending_connects_count; + size_t vended_connection_count; + size_t open_connection_count; + + size_t external_ref_count; +}; + +/* + * Correct usage requires AWS_ZERO_STRUCT to have been called beforehand. + */ +static void s_aws_http_connection_manager_get_snapshot( + struct aws_http_connection_manager *manager, + struct aws_http_connection_manager_snapshot *snapshot) { + + snapshot->state = manager->state; + snapshot->held_connection_count = aws_array_list_length(&manager->connections); + snapshot->pending_acquisition_count = manager->pending_acquisition_count; + snapshot->pending_connects_count = manager->pending_connects_count; + snapshot->vended_connection_count = manager->vended_connection_count; + snapshot->open_connection_count = manager->open_connection_count; + + snapshot->external_ref_count = manager->external_ref_count; +} + +static void s_aws_http_connection_manager_log_snapshot( + struct aws_http_connection_manager *manager, + struct aws_http_connection_manager_snapshot *snapshot) { + if (snapshot->state != AWS_HCMST_UNINITIALIZED) { + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: snapshot - state=%d, held_connection_count=%zu, pending_acquire_count=%zu, " + "pending_connect_count=%zu, vended_connection_count=%zu, open_connection_count=%zu, ref_count=%zu", + (void *)manager, + (int)snapshot->state, + snapshot->held_connection_count, + snapshot->pending_acquisition_count, + snapshot->pending_connects_count, + snapshot->vended_connection_count, + snapshot->open_connection_count, + snapshot->external_ref_count); + } else { + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: snapshot not initialized by control flow", (void *)manager); + } +} + +void aws_http_connection_manager_set_function_table( + struct aws_http_connection_manager *manager, + const struct aws_http_connection_manager_function_table *function_table) { + AWS_FATAL_ASSERT(aws_http_connection_manager_function_table_is_valid(function_table)); + + manager->function_table = function_table; +} + +/* + * Hard Requirement: Manager's lock must held somewhere in the call stack + */ +static bool s_aws_http_connection_manager_should_destroy(struct aws_http_connection_manager *manager) { + if (manager->state != AWS_HCMST_SHUTTING_DOWN) { + return false; + } + + if (manager->external_ref_count != 0) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: ref count is non zero while in the shut down state", + (void *)manager); + return false; + } + + if (manager->vended_connection_count > 0 || manager->pending_connects_count > 0 || + manager->open_connection_count > 0) { + return false; + } + + return true; +} + +/* + * A struct that functions as both the pending acquisition tracker and the about-to-complete data. + * + * The list in the connection manager (pending_acquisitions) is the set of all acquisition requests that we + * haven't yet resolved. + * + * In order to make sure we never invoke callbacks while holding the manager's lock, in a number of places + * we build a list of one or more acquisitions to complete. Once the lock is released + * we complete all the acquisitions in the list using the data within the struct (hence why we have + * "result-oriented" members like connection and error_code). This means we can fail an acquisition + * simply by setting the error_code and moving it to the current task set. + */ +struct aws_http_connection_acquisition { + struct aws_linked_list_node node; + struct aws_http_connection_manager *manager; /* Only used by logging */ + aws_http_connection_manager_on_connection_setup_fn *callback; + void *user_data; + struct aws_http_connection *connection; + int error_code; +}; + +/* + * Invokes a set of connection acquisition completion callbacks. + * + * Soft Requirement: The manager's lock must not be held in the callstack. + * + * Assumes that internal state (like pending_acquisition_count, vended_connection_count, etc...) have already been + * updated according to the list's contents. + */ +static void s_aws_http_connection_manager_complete_acquisitions( + struct aws_linked_list *acquisitions, + struct aws_allocator *allocator) { + + while (!aws_linked_list_empty(acquisitions)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(acquisitions); + struct aws_http_connection_acquisition *pending_acquisition = + AWS_CONTAINER_OF(node, struct aws_http_connection_acquisition, node); + + pending_acquisition->callback( + pending_acquisition->connection, pending_acquisition->error_code, pending_acquisition->user_data); + + if (pending_acquisition->error_code != 0) { + AWS_LOGF_WARN( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Failed to completed connection acquisition with error_code %d(%s)", + (void *)pending_acquisition->manager, + pending_acquisition->error_code, + aws_error_str(pending_acquisition->error_code)); + } else { + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Successfully completed connection acquisition with connection id=%p", + (void *)pending_acquisition->manager, + (void *)pending_acquisition->connection); + } + + aws_mem_release(allocator, pending_acquisition); + } +} + +/* + * Moves the first pending connection acquisition into a (task set) list. Call this while holding the lock to + * build the set of callbacks to be completed once the lock is released. + * + * Hard Requirement: Manager's lock must held somewhere in the call stack + * + * If this was a successful acquisition then connection is non-null + * If this was a failed acquisition then connection is null and error_code is hopefully a useful diagnostic (extreme + * edge cases exist where it may not be though) + */ +static void s_aws_http_connection_manager_move_front_acquisition( + struct aws_http_connection_manager *manager, + struct aws_http_connection *connection, + int error_code, + struct aws_linked_list *output_list) { + + AWS_FATAL_ASSERT(!aws_linked_list_empty(&manager->pending_acquisitions)); + struct aws_linked_list_node *node = aws_linked_list_pop_front(&manager->pending_acquisitions); + + AWS_FATAL_ASSERT(manager->pending_acquisition_count > 0); + --manager->pending_acquisition_count; + + struct aws_http_connection_acquisition *pending_acquisition = + AWS_CONTAINER_OF(node, struct aws_http_connection_acquisition, node); + pending_acquisition->connection = connection; + pending_acquisition->error_code = error_code; + + aws_linked_list_push_back(output_list, node); +} + +static void s_aws_http_connection_manager_destroy(struct aws_http_connection_manager *manager) { + if (manager == NULL) { + return; + } + + AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Destroying self", (void *)manager); + + AWS_ASSERT(manager->pending_connects_count == 0); + AWS_ASSERT(manager->vended_connection_count == 0); + AWS_ASSERT(manager->pending_acquisition_count == 0); + AWS_ASSERT(manager->open_connection_count == 0); + AWS_ASSERT(aws_linked_list_empty(&manager->pending_acquisitions)); + AWS_ASSERT(aws_array_list_length(&manager->connections) == 0); + + aws_array_list_clean_up(&manager->connections); + + aws_string_destroy(manager->host); + if (manager->tls_connection_options) { + aws_tls_connection_options_clean_up(manager->tls_connection_options); + aws_mem_release(manager->allocator, manager->tls_connection_options); + } + + aws_mutex_clean_up(&manager->lock); + + aws_mem_release(manager->allocator, manager); +} + +struct aws_http_connection_manager *aws_http_connection_manager_new( + struct aws_allocator *allocator, + struct aws_http_connection_manager_options *options) { + + if (!options || !options->socket_options || options->max_connections == 0) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_http_connection_manager *manager = + aws_mem_acquire(allocator, sizeof(struct aws_http_connection_manager)); + if (manager == NULL) { + return NULL; + } + + AWS_ZERO_STRUCT(*manager); + manager->allocator = allocator; + + if (aws_mutex_init(&manager->lock)) { + goto on_error; + } + + if (aws_array_list_init_dynamic( + &manager->connections, allocator, options->max_connections, sizeof(struct aws_http_connection *))) { + goto on_error; + } + + aws_linked_list_init(&manager->pending_acquisitions); + + manager->host = aws_string_new_from_array(allocator, options->host.ptr, options->host.len); + if (manager->host == NULL) { + goto on_error; + } + + if (options->tls_connection_options) { + manager->tls_connection_options = aws_mem_acquire(allocator, sizeof(struct aws_tls_connection_options)); + AWS_ZERO_STRUCT(*manager->tls_connection_options); + if (aws_tls_connection_options_copy(manager->tls_connection_options, options->tls_connection_options)) { + goto on_error; + } + } + + manager->state = AWS_HCMST_READY; + manager->initial_window_size = options->initial_window_size; + manager->port = options->port; + manager->max_connections = options->max_connections; + manager->socket_options = *options->socket_options; + manager->bootstrap = options->bootstrap; + manager->function_table = g_aws_http_connection_manager_default_function_table_ptr; + manager->external_ref_count = 1; + + AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Successfully created", (void *)manager); + + return manager; + +on_error: + + s_aws_http_connection_manager_destroy(manager); + + return NULL; +} + +void aws_http_connection_manager_acquire(struct aws_http_connection_manager *manager) { + aws_mutex_lock(&manager->lock); + AWS_FATAL_ASSERT(manager->external_ref_count > 0); + manager->external_ref_count += 1; + aws_mutex_unlock(&manager->lock); +} + +void aws_http_connection_manager_release(struct aws_http_connection_manager *manager) { + /* + * Swap targets in case we need to start the shut down process (clean up done outside the lock) + */ + struct aws_array_list connections_to_release; + AWS_ZERO_STRUCT(connections_to_release); + + struct aws_linked_list pending_acquisitions_to_fail; + aws_linked_list_init(&pending_acquisitions_to_fail); + + bool should_destroy = false; + + AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: release", (void *)manager); + + aws_mutex_lock(&manager->lock); + + if (manager->external_ref_count > 0) { + manager->external_ref_count -= 1; + + if (manager->external_ref_count == 0) { + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: ref count now zero, starting shut down process", + (void *)manager); + manager->state = AWS_HCMST_SHUTTING_DOWN; + should_destroy = s_aws_http_connection_manager_should_destroy(manager); + + /* + * swap our internal connection set with the zeroed local set + */ + aws_array_list_init_dynamic( + &connections_to_release, manager->allocator, 0, sizeof(struct aws_http_connection *)); + aws_array_list_swap_contents(&manager->connections, &connections_to_release); + + /* + * Swap our pending acquisitions with the local list + */ + aws_linked_list_swap_contents(&manager->pending_acquisitions, &pending_acquisitions_to_fail); + + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: manager release, failing %zu pending acquisitions", + (void *)manager, + manager->pending_acquisition_count); + manager->pending_acquisition_count = 0; + } + } else { + AWS_LOGF_ERROR( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Connection manager release called with a zero reference count", + (void *)manager); + } + + aws_mutex_unlock(&manager->lock); + + size_t connection_count = aws_array_list_length(&connections_to_release); + if (connection_count > 0) { + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: manager release, releasing %zu held connections", + (void *)manager, + connection_count); + } + + for (size_t i = 0; i < connection_count; ++i) { + struct aws_http_connection *connection = NULL; + if (aws_array_list_get_at(&connections_to_release, &connection, i)) { + continue; + } + + manager->function_table->release_connection(connection); + } + + aws_array_list_clean_up(&connections_to_release); + s_aws_http_connection_manager_complete_acquisitions(&pending_acquisitions_to_fail, manager->allocator); + + if (should_destroy) { + s_aws_http_connection_manager_destroy(manager); + } +} + +static void s_aws_http_connection_manager_build_task_set( + struct aws_http_connection_manager *manager, + struct aws_linked_list *completions, + size_t *new_connections) { + + *new_connections = 0; + + /* + * Step 1 - If there's free connections, complete acquisition requests + */ + while (aws_array_list_length(&manager->connections) > 0 && manager->pending_acquisition_count > 0) { + struct aws_http_connection *connection = NULL; + aws_array_list_back(&manager->connections, &connection); + + aws_array_list_pop_back(&manager->connections); + + s_aws_http_connection_manager_move_front_acquisition(manager, connection, AWS_ERROR_SUCCESS, completions); + ++manager->vended_connection_count; + } + + /* + * Step 2 - if there's excess pending acquisitions and we have room to make more, make more + */ + if (manager->pending_acquisition_count > manager->pending_connects_count) { + AWS_FATAL_ASSERT( + manager->max_connections >= manager->vended_connection_count + manager->pending_connects_count); + + *new_connections = manager->pending_acquisition_count - manager->pending_connects_count; + size_t max_new_connections = + manager->max_connections - (manager->vended_connection_count + manager->pending_connects_count); + + if (*new_connections > max_new_connections) { + *new_connections = max_new_connections; + } + + manager->pending_connects_count += *new_connections; + } +} + +static void s_aws_http_connection_manager_on_connection_setup( + struct aws_http_connection *connection, + int error_code, + void *user_data); + +static void s_aws_http_connection_manager_on_connection_shutdown( + struct aws_http_connection *connection, + int error_code, + void *user_data); + +static int s_aws_http_connection_manager_new_connection(struct aws_http_connection_manager *manager) { + struct aws_http_client_connection_options options; + AWS_ZERO_STRUCT(options); + options.self_size = sizeof(struct aws_http_client_connection_options); + options.bootstrap = manager->bootstrap; + options.tls_options = manager->tls_connection_options; + options.allocator = manager->allocator; + options.user_data = manager; + options.host_name = aws_byte_cursor_from_string(manager->host); + options.port = manager->port; + options.initial_window_size = manager->initial_window_size; + options.socket_options = &manager->socket_options; + options.on_setup = s_aws_http_connection_manager_on_connection_setup; + options.on_shutdown = s_aws_http_connection_manager_on_connection_shutdown; + + if (manager->function_table->create_connection(&options)) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: http connection creation failed with error code %d(%s)", + (void *)manager, + aws_last_error(), + aws_error_str(aws_last_error())); + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + +static void s_aws_http_connection_manager_execute_task_set( + struct aws_http_connection_manager *manager, + struct aws_linked_list *completions, + size_t new_connections) { + + int representative_error = 0; + size_t new_connection_failures = 0; + + struct aws_array_list errors; + AWS_ZERO_STRUCT(errors); + + if (new_connections > 0) { + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Requesting %zu new connections from http", + (void *)manager, + new_connections); + } + + /* Even if we can't init this array, we still need to invoke error callbacks properly */ + bool push_errors = + aws_array_list_init_dynamic(&errors, manager->allocator, new_connections, sizeof(int)) == AWS_ERROR_SUCCESS; + + for (size_t i = 0; i < new_connections; ++i) { + if (s_aws_http_connection_manager_new_connection(manager)) { + ++new_connection_failures; + representative_error = aws_last_error(); + if (push_errors) { + AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &representative_error) == AWS_OP_SUCCESS); + } + } + } + + if (new_connection_failures > 0) { + /* + * We failed and aren't going to receive a callback, but the current state assumes we will receive + * a callback. So we need to re-lock and update the state ourselves. + */ + aws_mutex_lock(&manager->lock); + + AWS_FATAL_ASSERT(manager->pending_connects_count >= new_connection_failures); + manager->pending_connects_count -= new_connection_failures; + + /* + * Rather than failing one acquisition for each connection failure, if there's at least one + * connection failure, we instead fail all excess acquisitions, since there's no pending + * connect that will necessarily resolve them. + * + * Try to correspond an error with the acquisition failure, but as a fallback just use the + * representative error. + */ + size_t i = 0; + while (manager->pending_acquisition_count > manager->pending_connects_count) { + int error = representative_error; + if (i < aws_array_list_length(&errors)) { + aws_array_list_get_at(&errors, &error, i); + } + + s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error, completions); + ++i; + } + + aws_mutex_unlock(&manager->lock); + } + + s_aws_http_connection_manager_complete_acquisitions(completions, manager->allocator); + + aws_array_list_clean_up(&errors); +} + +void aws_http_connection_manager_acquire_connection( + struct aws_http_connection_manager *manager, + aws_http_connection_manager_on_connection_setup_fn *callback, + void *user_data) { + + struct aws_http_connection_acquisition *request = + aws_mem_acquire(manager->allocator, sizeof(struct aws_http_connection_acquisition)); + if (request == NULL) { + callback(NULL, aws_last_error(), user_data); + return; + } + + AWS_LOGF_DEBUG(AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Acquire connection", (void *)manager); + + AWS_ZERO_STRUCT(*request); + request->callback = callback; + request->user_data = user_data; + + struct aws_linked_list completions; + aws_linked_list_init(&completions); + + size_t new_connections = 0; + + struct aws_http_connection_manager_snapshot snapshot; + AWS_ZERO_STRUCT(snapshot); + + aws_mutex_lock(&manager->lock); + + if (manager->state == AWS_HCMST_READY) { + aws_linked_list_push_back(&manager->pending_acquisitions, &request->node); + ++manager->pending_acquisition_count; + + s_aws_http_connection_manager_build_task_set(manager, &completions, &new_connections); + } else { + AWS_LOGF_ERROR( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Acquire connection called when manager in shut down state", + (void *)manager); + + request->error_code = AWS_ERROR_HTTP_CONNECTION_MANAGER_INVALID_STATE_FOR_ACQUIRE; + aws_linked_list_push_back(&completions, &request->node); + + aws_raise_error(AWS_ERROR_HTTP_CONNECTION_MANAGER_INVALID_STATE_FOR_ACQUIRE); + } + + s_aws_http_connection_manager_get_snapshot(manager, &snapshot); + + aws_mutex_unlock(&manager->lock); + + s_aws_http_connection_manager_log_snapshot(manager, &snapshot); + + s_aws_http_connection_manager_execute_task_set(manager, &completions, new_connections); +} + +int aws_http_connection_manager_release_connection( + struct aws_http_connection_manager *manager, + struct aws_http_connection *connection) { + struct aws_linked_list completions; + aws_linked_list_init(&completions); + + bool should_destroy = false; + int result = AWS_OP_ERR; + size_t new_connections = 0; + bool should_release_connection = !manager->function_table->is_connection_open(connection); + + struct aws_http_connection_manager_snapshot snapshot; + AWS_ZERO_STRUCT(snapshot); + + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, "id=%p: Releasing connection (id=%p)", (void *)manager, (void *)connection); + + aws_mutex_lock(&manager->lock); + + /* We're probably hosed in this case, but let's not underflow */ + if (manager->vended_connection_count == 0) { + AWS_LOGF_FATAL( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Connection released when vended connection count is zero", + (void *)manager); + aws_raise_error(AWS_ERROR_HTTP_CONNECTION_MANAGER_VENDED_CONNECTION_UNDERFLOW); + goto release; + } + + result = AWS_OP_SUCCESS; + + --manager->vended_connection_count; + + if (!should_release_connection) { + if (aws_array_list_push_back(&manager->connections, &connection)) { + should_release_connection = true; + } + } + + s_aws_http_connection_manager_build_task_set(manager, &completions, &new_connections); + + /* + * This could be the last connection and we might have already gotten the release callback + * from http. In that case, this would be our last chance to detect a destroyable state. + */ + should_destroy = s_aws_http_connection_manager_should_destroy(manager); + s_aws_http_connection_manager_get_snapshot(manager, &snapshot); + +release: + + aws_mutex_unlock(&manager->lock); + + s_aws_http_connection_manager_log_snapshot(manager, &snapshot); + + s_aws_http_connection_manager_execute_task_set(manager, &completions, new_connections); + + if (should_release_connection) { + manager->function_table->release_connection(connection); + } + + if (should_destroy) { + s_aws_http_connection_manager_destroy(manager); + } + + return result; +} + +static void s_aws_http_connection_manager_on_connection_setup( + struct aws_http_connection *connection, + int error_code, + void *user_data) { + struct aws_http_connection_manager *manager = user_data; + + struct aws_linked_list completions; + aws_linked_list_init(&completions); + + size_t new_connections = 0; + + if (connection != NULL) { + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Received new connection (id=%p) from http layer", + (void *)manager, + (void *)connection); + } else { + AWS_LOGF_WARN( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Failed to obtain new connection from http layer, error %d(%s)", + (void *)manager, + error_code, + aws_error_str(error_code)); + } + + struct aws_http_connection_manager_snapshot snapshot; + AWS_ZERO_STRUCT(snapshot); + + aws_mutex_lock(&manager->lock); + + bool is_shutting_down = manager->state == AWS_HCMST_SHUTTING_DOWN; + + AWS_FATAL_ASSERT(manager->pending_connects_count > 0); + --manager->pending_connects_count; + + if (connection != NULL) { + if (!is_shutting_down) { + /* We reserved enough room for max_connections, this should never fail */ + AWS_FATAL_ASSERT(aws_array_list_push_back(&manager->connections, &connection) == AWS_OP_SUCCESS); + } + ++manager->open_connection_count; + } else { + /* + * To be safe, if we have an excess of pending acquisitions (beyond the number of pending + * connects), we need to fail all of the excess. Technically, we might be able to try and + * make a new connection, if there's room, but that could lead to some bad failure loops. + * + * This won't happen during shutdown since there are no pending acquisitions at that point. + */ + while (manager->pending_acquisition_count > manager->pending_connects_count) { + s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error_code, &completions); + } + } + + s_aws_http_connection_manager_build_task_set(manager, &completions, &new_connections); + + bool should_destroy = s_aws_http_connection_manager_should_destroy(manager); + s_aws_http_connection_manager_get_snapshot(manager, &snapshot); + + aws_mutex_unlock(&manager->lock); + + s_aws_http_connection_manager_log_snapshot(manager, &snapshot); + + s_aws_http_connection_manager_execute_task_set(manager, &completions, new_connections); + + if (is_shutting_down && connection != NULL) { + /* + * We didn't add the connection to the pool; just release it immediately + */ + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: New connection (id=%p) releasing immediately due to shutdown state", + (void *)manager, + (void *)connection); + manager->function_table->release_connection(connection); + } + + if (should_destroy) { + s_aws_http_connection_manager_destroy(manager); + } +} + +static void s_aws_http_connection_manager_on_connection_shutdown( + struct aws_http_connection *connection, + int error_code, + void *user_data) { + (void)error_code; + + bool should_release_connection = false; + + struct aws_http_connection_manager *manager = user_data; + + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: shutdown received for connection (id=%p)", + (void *)manager, + (void *)connection); + + struct aws_http_connection_manager_snapshot snapshot; + AWS_ZERO_STRUCT(snapshot); + + aws_mutex_lock(&manager->lock); + + AWS_FATAL_ASSERT(manager->open_connection_count > 0); + --manager->open_connection_count; + + size_t connection_count = aws_array_list_length(&manager->connections); + + /* + * Find and, if found, remove it from connections + */ + if (connection_count > 0) { + AWS_ASSERT(manager->state == AWS_HCMST_READY); + + struct aws_http_connection *last_connection = NULL; + AWS_FATAL_ASSERT( + aws_array_list_get_at(&manager->connections, &last_connection, connection_count - 1) == AWS_OP_SUCCESS); + + for (size_t i = 0; i < connection_count; ++i) { + struct aws_http_connection *current_connection = NULL; + aws_array_list_get_at(&manager->connections, ¤t_connection, i); + + if (current_connection == connection) { + should_release_connection = true; + aws_array_list_set_at(&manager->connections, &last_connection, i); + break; + } + } + + if (should_release_connection) { + aws_array_list_pop_back(&manager->connections); + } + } + + bool should_destroy = s_aws_http_connection_manager_should_destroy(manager); + s_aws_http_connection_manager_get_snapshot(manager, &snapshot); + + aws_mutex_unlock(&manager->lock); + + s_aws_http_connection_manager_log_snapshot(manager, &snapshot); + + if (should_release_connection) { + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION_MANAGER, + "id=%p: Releasing held connection (id=%p)", + (void *)manager, + (void *)connection); + manager->function_table->release_connection(connection); + } + + if (should_destroy) { + s_aws_http_connection_manager_destroy(manager); + } +} diff --git a/source/http.c b/source/http.c index 29c7153c3..a634f93e2 100644 --- a/source/http.c +++ b/source/http.c @@ -61,6 +61,12 @@ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER, "Operation cannot be performed because websocket has been converted to a midchannel handler."), + AWS_DEFINE_ERROR_INFO_HTTP( + AWS_ERROR_HTTP_CONNECTION_MANAGER_INVALID_STATE_FOR_ACQUIRE, + "Acquire called after the connection manager's ref count has reached zero"), + AWS_DEFINE_ERROR_INFO_HTTP( + AWS_ERROR_HTTP_CONNECTION_MANAGER_VENDED_CONNECTION_UNDERFLOW, + "Release called when the connection manager's vended connection count was zero"), AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_END_RANGE, "Not a real error and should never be seen."), @@ -77,6 +83,7 @@ static struct aws_log_subject_info s_log_subject_infos[] = { DEFINE_LOG_SUBJECT_INFO(AWS_LS_HTTP_CONNECTION, "http-connection", "HTTP client or server connection"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_HTTP_SERVER, "http-server", "HTTP server socket listening for incoming connections"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_HTTP_STREAM, "http-stream", "HTTP request-response exchange"), + DEFINE_LOG_SUBJECT_INFO(AWS_LS_HTTP_CONNECTION_MANAGER, "connection-manager", "Http connection manager"), DEFINE_LOG_SUBJECT_INFO(AWS_LS_HTTP_WEBSOCKET, "websocket", "Websocket"), }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index faa7ec712..1b66d79c2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -125,6 +125,17 @@ add_test_case(hpack_static_table_get) add_test_case(hpack_dynamic_table_find) add_test_case(hpack_dynamic_table_get) +add_test_case(test_connection_manager_setup_shutdown) +add_test_case(test_connection_manager_single_connection) +add_test_case(test_connection_manager_many_connections) +add_test_case(test_connection_manager_acquire_release) +add_test_case(test_connection_manager_close_and_release) +add_test_case(test_connection_manager_acquire_release_mix) +add_test_case(test_connection_manager_acquire_release_mix_synchronous) +add_test_case(test_connection_manager_connect_callback_failure) +add_test_case(test_connection_manager_connect_immediate_failure) +add_test_case(test_connection_manager_success_then_cancel_pending_from_failure) + set(TEST_BINARY_NAME ${CMAKE_PROJECT_NAME}-tests) generate_test_driver(${TEST_BINARY_NAME}) diff --git a/tests/test_connection_manager.c b/tests/test_connection_manager.c new file mode 100644 index 000000000..67e65734c --- /dev/null +++ b/tests/test_connection_manager.c @@ -0,0 +1,587 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +enum new_connection_result_type { AWS_NCRT_SUCCESS, AWS_NCRT_ERROR_VIA_CALLBACK, AWS_NCRT_ERROR_FROM_CREATE }; + +struct mock_connection_proxy { + enum new_connection_result_type result; + bool is_closed_on_release; +}; + +struct cm_tester_options { + struct aws_allocator *allocator; + struct aws_http_connection_manager_function_table *mock_table; + size_t max_connections; +}; + +struct cm_tester { + struct aws_allocator *allocator; + struct aws_event_loop_group event_loop_group; + struct aws_host_resolver host_resolver; + + struct aws_client_bootstrap *client_bootstrap; + + struct aws_http_connection_manager *connection_manager; + + struct aws_tls_ctx *tls_ctx; + struct aws_tls_ctx_options tls_ctx_options; + struct aws_tls_connection_options tls_connection_options; + + struct aws_mutex lock; + struct aws_condition_variable signal; + + struct aws_array_list connections; + size_t connection_errors; + size_t connection_releases; + + size_t wait_for_connection_count; + + struct aws_http_connection_manager_function_table *mock_table; + + struct aws_atomic_var next_connection_id; + struct aws_array_list mock_connections; + aws_http_on_client_connection_shutdown_fn *release_connection_fn; +}; + +static struct cm_tester s_tester; + +int s_cm_tester_init(struct cm_tester_options *options) { + struct cm_tester *tester = &s_tester; + + AWS_ZERO_STRUCT(*tester); + + aws_tls_init_static_state(options->allocator); + aws_http_library_init(options->allocator); + aws_load_error_strings(); + aws_io_load_error_strings(); + + tester->allocator = options->allocator; + + ASSERT_SUCCESS( + aws_array_list_init_dynamic(&tester->connections, tester->allocator, 10, sizeof(struct aws_http_connection *))); + + ASSERT_SUCCESS(aws_event_loop_group_default_init(&tester->event_loop_group, tester->allocator, 1)); + ASSERT_SUCCESS( + aws_host_resolver_init_default(&tester->host_resolver, tester->allocator, 8, &tester->event_loop_group)); + tester->client_bootstrap = + aws_client_bootstrap_new(tester->allocator, &tester->event_loop_group, &tester->host_resolver, NULL); + ASSERT_NOT_NULL(tester->client_bootstrap); + + struct aws_socket_options socket_options = { + .type = AWS_SOCKET_STREAM, + .domain = AWS_SOCKET_IPV4, + .connect_timeout_ms = (uint32_t)aws_timestamp_convert(60, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_MILLIS, NULL), + }; + + aws_tls_ctx_options_init_default_client(&tester->tls_ctx_options, options->allocator); + + tester->tls_ctx = aws_tls_client_ctx_new(options->allocator, &tester->tls_ctx_options); + ASSERT_NOT_NULL(tester->tls_ctx); + + aws_tls_connection_options_init_from_ctx(&tester->tls_connection_options, tester->tls_ctx); + + struct aws_http_connection_manager_options cm_options = {.bootstrap = tester->client_bootstrap, + .initial_window_size = SIZE_MAX, + .socket_options = &socket_options, + .tls_connection_options = + NULL, //&tester->tls_connection_options, + .host = aws_byte_cursor_from_c_str("www.google.com"), + .port = 80, + .max_connections = options->max_connections}; + + tester->connection_manager = aws_http_connection_manager_new(tester->allocator, &cm_options); + ASSERT_NOT_NULL(tester->connection_manager); + + if (options->mock_table) { + aws_http_connection_manager_set_function_table(tester->connection_manager, options->mock_table); + } + + tester->mock_table = options->mock_table; + + aws_atomic_store_int(&tester->next_connection_id, 0); + + ASSERT_SUCCESS(aws_array_list_init_dynamic( + &tester->mock_connections, tester->allocator, 10, sizeof(struct mock_connection_proxy *))); + + return AWS_OP_SUCCESS; +} + +void s_add_mock_connections(size_t count, enum new_connection_result_type result, bool closed_on_release) { + struct cm_tester *tester = &s_tester; + + for (size_t i = 0; i < count; ++i) { + struct mock_connection_proxy *mock = aws_mem_acquire(tester->allocator, sizeof(struct mock_connection_proxy)); + AWS_ZERO_STRUCT(*mock); + + mock->result = result; + mock->is_closed_on_release = closed_on_release; + + aws_array_list_push_back(&tester->mock_connections, &mock); + } +} + +void s_release_connections(size_t count, bool close_first) { + + struct cm_tester *tester = &s_tester; + + aws_mutex_lock(&tester->lock); + + size_t release_count = aws_array_list_length(&tester->connections); + if (release_count > count) { + release_count = count; + } + + if (release_count == 0) { + return; + } + + struct aws_array_list to_release; + if (aws_array_list_init_dynamic( + &to_release, tester->allocator, release_count, sizeof(struct aws_http_connection *))) { + return; + } + + for (size_t i = 0; i < release_count; ++i) { + struct aws_http_connection *connection = NULL; + if (aws_array_list_back(&tester->connections, &connection)) { + continue; + } + + aws_array_list_pop_back(&tester->connections); + + aws_array_list_push_back(&to_release, &connection); + } + + aws_mutex_unlock(&tester->lock); + + for (size_t i = 0; i < aws_array_list_length(&to_release); ++i) { + struct aws_http_connection *connection = NULL; + if (aws_array_list_get_at(&to_release, &connection, i)) { + continue; + } + + if (close_first) { + if (tester->mock_table) { + tester->mock_table->close_connection(connection); + } else { + aws_http_connection_close(connection); + } + } + + aws_http_connection_manager_release_connection(tester->connection_manager, connection); + + aws_mutex_lock(&tester->lock); + ++tester->connection_releases; + aws_condition_variable_notify_one(&tester->signal); + aws_mutex_unlock(&tester->lock); + } + + aws_array_list_clean_up(&to_release); +} + +void s_on_acquire_connection(struct aws_http_connection *connection, int error_code, void *user_data) { + (void)error_code; + (void)user_data; + + struct cm_tester *tester = &s_tester; + + aws_mutex_lock(&tester->lock); + + if (connection == NULL) { + ++tester->connection_errors; + } else { + aws_array_list_push_back(&tester->connections, &connection); + } + + aws_condition_variable_notify_one(&tester->signal); + + aws_mutex_unlock(&tester->lock); +} + +static void s_acquire_connections(size_t count) { + struct cm_tester *tester = &s_tester; + + for (size_t i = 0; i < count; ++i) { + aws_http_connection_manager_acquire_connection(tester->connection_manager, s_on_acquire_connection, tester); + } +} + +static bool s_is_connection_reply_count_at_least(void *context) { + (void)context; + + struct cm_tester *tester = &s_tester; + + return tester->wait_for_connection_count <= + aws_array_list_length(&tester->connections) + tester->connection_errors + tester->connection_releases; +} + +static void s_wait_on_connection_reply_count(size_t count) { + struct cm_tester *tester = &s_tester; + + aws_mutex_lock(&tester->lock); + + tester->wait_for_connection_count = count; + aws_condition_variable_wait_pred(&tester->signal, &tester->lock, s_is_connection_reply_count_at_least, tester); + + aws_mutex_unlock(&tester->lock); +} + +void s_cm_tester_clean_up(void) { + struct cm_tester *tester = &s_tester; + + s_release_connections(aws_array_list_length(&tester->connections), false); + + aws_array_list_clean_up(&tester->connections); + + for (size_t i = 0; i < aws_array_list_length(&tester->mock_connections); ++i) { + struct mock_connection_proxy *mock = NULL; + + if (aws_array_list_get_at(&tester->mock_connections, &mock, i)) { + continue; + } + + aws_mem_release(tester->allocator, mock); + } + aws_array_list_clean_up(&tester->mock_connections); + + aws_http_connection_manager_release(tester->connection_manager); + + aws_client_bootstrap_release(tester->client_bootstrap); + + aws_host_resolver_clean_up(&tester->host_resolver); + aws_event_loop_group_clean_up(&tester->event_loop_group); + + aws_tls_ctx_options_clean_up(&tester->tls_ctx_options); + aws_tls_connection_options_clean_up(&tester->tls_connection_options); + aws_tls_ctx_destroy(tester->tls_ctx); + + aws_http_library_clean_up(); + aws_tls_clean_up_static_state(); +} + +static int s_test_connection_manager_setup_shutdown(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 5}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_setup_shutdown, s_test_connection_manager_setup_shutdown); + +static int s_test_connection_manager_single_connection(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 5}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_acquire_connections(1); + + s_wait_on_connection_reply_count(1); + + s_release_connections(1, false); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_single_connection, s_test_connection_manager_single_connection); + +static int s_test_connection_manager_many_connections(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 20}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_acquire_connections(20); + + s_wait_on_connection_reply_count(20); + + s_release_connections(20, false); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_many_connections, s_test_connection_manager_many_connections); + +static int s_test_connection_manager_acquire_release(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 4}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_acquire_connections(20); + + s_wait_on_connection_reply_count(4); + + for (size_t i = 4; i < 20; ++i) { + s_release_connections(1, false); + + s_wait_on_connection_reply_count(i + 1); + } + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_acquire_release, s_test_connection_manager_acquire_release); + +static int s_test_connection_manager_close_and_release(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 4}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_acquire_connections(20); + + s_wait_on_connection_reply_count(4); + + for (size_t i = 4; i < 20; ++i) { + s_release_connections(1, i % 1 == 0); + + s_wait_on_connection_reply_count(i + 1); + } + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_close_and_release, s_test_connection_manager_close_and_release); + +static int s_test_connection_manager_acquire_release_mix(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = {.allocator = allocator, .max_connections = 5}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + for (size_t i = 0; i < 10; ++i) { + s_acquire_connections(2); + + s_wait_on_connection_reply_count(i + 1); + + s_release_connections(1, i % 1 == 0); + } + + s_wait_on_connection_reply_count(15); + + for (size_t i = 15; i < 20; ++i) { + s_release_connections(1, i % 1 == 0); + + s_wait_on_connection_reply_count(i + 1); + } + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_acquire_release_mix, s_test_connection_manager_acquire_release_mix); + +static int s_aws_http_connection_manager_create_connection_sync_mock( + const struct aws_http_client_connection_options *options) { + struct cm_tester *tester = &s_tester; + + size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1); + + aws_mutex_lock(&tester->lock); + tester->release_connection_fn = options->on_shutdown; + aws_mutex_unlock(&tester->lock); + + struct mock_connection_proxy *connection = NULL; + + if (next_connection_id < aws_array_list_length(&tester->mock_connections)) { + aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id); + } + + if (connection) { + if (connection->result == AWS_NCRT_SUCCESS) { + options->on_setup((struct aws_http_connection *)connection, AWS_ERROR_SUCCESS, options->user_data); + } else if (connection->result == AWS_NCRT_ERROR_VIA_CALLBACK) { + options->on_setup(NULL, AWS_ERROR_HTTP_UNKNOWN, options->user_data); + } + + if (connection->result != AWS_NCRT_ERROR_FROM_CREATE) { + return AWS_OP_SUCCESS; + } + } + + return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN); +} + +static void s_aws_http_connection_manager_release_connection_sync_mock(struct aws_http_connection *connection) { + (void)connection; + + struct cm_tester *tester = &s_tester; + + tester->release_connection_fn(connection, AWS_ERROR_SUCCESS, tester->connection_manager); +} + +static void s_aws_http_connection_manager_close_connection_sync_mock(struct aws_http_connection *connection) { + (void)connection; +} + +static bool s_aws_http_connection_manager_is_connection_open_sync_mock(const struct aws_http_connection *connection) { + (void)connection; + + struct mock_connection_proxy *proxy = (struct mock_connection_proxy *)(void *)connection; + + return !proxy->is_closed_on_release; +} + +static struct aws_http_connection_manager_function_table s_synchronous_mocks = { + .create_connection = s_aws_http_connection_manager_create_connection_sync_mock, + .release_connection = s_aws_http_connection_manager_release_connection_sync_mock, + .close_connection = s_aws_http_connection_manager_close_connection_sync_mock, + .is_connection_open = s_aws_http_connection_manager_is_connection_open_sync_mock}; + +static int s_test_connection_manager_acquire_release_mix_synchronous(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = { + .allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + for (size_t i = 0; i < 20; ++i) { + s_add_mock_connections(1, AWS_NCRT_SUCCESS, i % 1 == 0); + } + + for (size_t i = 0; i < 10; ++i) { + s_acquire_connections(2); + + s_wait_on_connection_reply_count(i + 1); + + s_release_connections(1, false); + } + + s_wait_on_connection_reply_count(15); + + for (size_t i = 15; i < 20; ++i) { + s_release_connections(1, false); + + s_wait_on_connection_reply_count(i + 1); + } + + ASSERT_TRUE(s_tester.connection_errors == 0); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE( + test_connection_manager_acquire_release_mix_synchronous, + s_test_connection_manager_acquire_release_mix_synchronous); + +static int s_test_connection_manager_connect_callback_failure(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = { + .allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_add_mock_connections(5, AWS_NCRT_ERROR_VIA_CALLBACK, false); + + s_acquire_connections(5); + + s_wait_on_connection_reply_count(5); + + ASSERT_TRUE(s_tester.connection_errors == 5); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_connect_callback_failure, s_test_connection_manager_connect_callback_failure); + +static int s_test_connection_manager_connect_immediate_failure(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cm_tester_options options = { + .allocator = allocator, .max_connections = 5, .mock_table = &s_synchronous_mocks}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_add_mock_connections(5, AWS_NCRT_ERROR_FROM_CREATE, false); + + s_acquire_connections(5); + + s_wait_on_connection_reply_count(5); + + ASSERT_TRUE(s_tester.connection_errors == 5); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(test_connection_manager_connect_immediate_failure, s_test_connection_manager_connect_immediate_failure); + +static int s_test_connection_manager_success_then_cancel_pending_from_failure( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + struct cm_tester_options options = { + .allocator = allocator, .max_connections = 1, .mock_table = &s_synchronous_mocks}; + + ASSERT_SUCCESS(s_cm_tester_init(&options)); + + s_add_mock_connections(1, AWS_NCRT_SUCCESS, true); + s_add_mock_connections(1, AWS_NCRT_ERROR_FROM_CREATE, false); + + s_acquire_connections(5); + + s_wait_on_connection_reply_count(1); + + ASSERT_TRUE(s_tester.connection_errors == 0); + + s_release_connections(1, true); + + s_wait_on_connection_reply_count(5); + + ASSERT_TRUE(s_tester.connection_errors == 4); + + s_cm_tester_clean_up(); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE( + test_connection_manager_success_then_cancel_pending_from_failure, + s_test_connection_manager_success_then_cancel_pending_from_failure); \ No newline at end of file diff --git a/tests/test_websocket_encoder.c b/tests/test_websocket_encoder.c index 4bb2c93af..09888e936 100644 --- a/tests/test_websocket_encoder.c +++ b/tests/test_websocket_encoder.c @@ -153,6 +153,7 @@ ENCODER_TEST_CASE(websocket_encoder_rsv) { 0x89, // fin | rsv1 | rsv2 | rsv3 | 4bit opcode 0x00, // mask | 7bit payload len }; + expected_output[0] |= (1 << (6 - rsv)); tester.out_buf.len = 0; /* reset output buffer */