diff --git a/bin/elasticurl/main.c b/bin/elasticurl/main.c index 2fe32697f..69dbdfd74 100644 --- a/bin/elasticurl/main.c +++ b/bin/elasticurl/main.c @@ -504,6 +504,7 @@ static void s_on_signing_complete(struct aws_http_message *request, int error_co fprintf(stderr, "failed to create request."); exit(1); } + aws_http_stream_activate(stream); /* Connection will stay alive until stream completes */ aws_http_connection_release(app_ctx->connection); diff --git a/include/aws/http/connection.h b/include/aws/http/connection.h index dc8dfb5f3..0c8e290a2 100644 --- a/include/aws/http/connection.h +++ b/include/aws/http/connection.h @@ -210,9 +210,15 @@ struct aws_http_client_connection_options { aws_http_on_client_connection_shutdown_fn *on_shutdown; /** - * If set to true, read back pressure mechanism will be enabled. - */ - bool enable_read_back_pressure; + * Set to true to manually manage the read window size. + * + * If this is false, the connection will maintain a constant window size. + * + * If this is true, the caller must manually increment the window size using aws_http_stream_update_window(). + * If the window is not incremented, it will shrink by the amount of body data received. If the window size + * reaches 0, no further data will be received. + **/ + bool manual_window_management; }; /** diff --git a/include/aws/http/private/connection_impl.h b/include/aws/http/private/connection_impl.h index 9302d04e2..f8d2da255 100644 --- a/include/aws/http/private/connection_impl.h +++ b/include/aws/http/private/connection_impl.h @@ -76,7 +76,7 @@ struct aws_http_connection { struct aws_atomic_var refcount; /* Starts at either 1 or 2, increments by two with each new stream */ - struct aws_atomic_var next_stream_id; + uint32_t next_stream_id; union { struct aws_http_connection_client_data { @@ -93,6 +93,8 @@ struct aws_http_connection { * Opposite is true on server connections */ struct aws_http_connection_client_data *client_data; struct aws_http_connection_server_data *server_data; + + bool manual_window_management; }; /* Gets a client connection up and running. @@ -100,6 +102,7 @@ struct aws_http_connection { struct aws_http_client_bootstrap { struct aws_allocator *alloc; bool is_using_tls; + bool manual_window_management; size_t initial_window_size; struct aws_http_connection_monitoring_options monitoring_options; void *user_data; @@ -135,6 +138,8 @@ struct aws_crt_statistics_http1_channel *aws_h1_connection_get_statistics(struct /** * Gets the next available stream id within the connection. Valid for creating both h1 and h2 streams. * + * This function is not thread-safe. + * * Returns 0 if there was an error. */ AWS_HTTP_API diff --git a/include/aws/http/private/h1_connection.h b/include/aws/http/private/h1_connection.h index 686644815..d19acfaa2 100644 --- a/include/aws/http/private/h1_connection.h +++ b/include/aws/http/private/h1_connection.h @@ -23,11 +23,13 @@ AWS_EXTERN_C_BEGIN AWS_HTTP_API struct aws_http_connection *aws_http_connection_new_http1_1_server( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size); AWS_HTTP_API struct aws_http_connection *aws_http_connection_new_http1_1_client( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size); AWS_EXTERN_C_END diff --git a/include/aws/http/private/h1_stream.h b/include/aws/http/private/h1_stream.h index 5f32bec04..3c2a6a121 100644 --- a/include/aws/http/private/h1_stream.h +++ b/include/aws/http/private/h1_stream.h @@ -58,4 +58,8 @@ struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_re AWS_EXTERN_C_END +/* we don't want this exported. We just want it to have external linkage between h1_stream and h1_connection compilation + * units. it is defined in h1_connection.c */ +int aws_h1_stream_activate(struct aws_http_stream *stream); + #endif /* AWS_HTTP_H1_STREAM_H */ diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h index 856c3dd27..5aa934b1d 100644 --- a/include/aws/http/private/h2_connection.h +++ b/include/aws/http/private/h2_connection.h @@ -84,11 +84,13 @@ AWS_EXTERN_C_BEGIN AWS_HTTP_API struct aws_http_connection *aws_http_connection_new_http2_server( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size); AWS_HTTP_API struct aws_http_connection *aws_http_connection_new_http2_client( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size); AWS_EXTERN_C_END diff --git a/include/aws/http/private/h2_stream.h b/include/aws/http/private/h2_stream.h index b1cc56d2b..0f8f5a9a1 100644 --- a/include/aws/http/private/h2_stream.h +++ b/include/aws/http/private/h2_stream.h @@ -75,4 +75,6 @@ enum aws_h2_stream_state aws_h2_stream_get_state(const struct aws_h2_stream *str /* Connection is ready to send frames from stream now */ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgoing_data); +int aws_h2_stream_activate(struct aws_http_stream *stream); + #endif /* AWS_HTTP_H2_STREAM_H */ diff --git a/include/aws/http/private/request_response_impl.h b/include/aws/http/private/request_response_impl.h index b5f30bd70..c6f82493d 100644 --- a/include/aws/http/private/request_response_impl.h +++ b/include/aws/http/private/request_response_impl.h @@ -25,6 +25,7 @@ struct aws_http_stream_vtable { void (*destroy)(struct aws_http_stream *stream); void (*update_window)(struct aws_http_stream *stream, size_t increment_size); + int (*activate)(struct aws_http_stream *stream); }; /** diff --git a/include/aws/http/private/websocket_impl.h b/include/aws/http/private/websocket_impl.h index 07a31188f..5a675f208 100644 --- a/include/aws/http/private/websocket_impl.h +++ b/include/aws/http/private/websocket_impl.h @@ -85,6 +85,7 @@ struct aws_websocket_client_bootstrap_system_vtable { struct aws_http_stream *(*aws_http_connection_make_request)( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options); + int (*aws_http_stream_activate)(struct aws_http_stream *stream); void (*aws_http_stream_release)(struct aws_http_stream *stream); struct aws_http_connection *(*aws_http_stream_get_connection)(const struct aws_http_stream *stream); int (*aws_http_stream_get_incoming_response_status)(const struct aws_http_stream *stream, int *out_status); diff --git a/include/aws/http/request_response.h b/include/aws/http/request_response.h index aeada382d..13a6b9bb6 100644 --- a/include/aws/http/request_response.h +++ b/include/aws/http/request_response.h @@ -171,7 +171,7 @@ typedef int(aws_http_on_incoming_header_block_done_fn)( * The data must be copied immediately if you wish to preserve it. * This is always invoked on the HTTP connection's event-loop thread. * - * Note that, if the stream is using manual_window_management then the window + * Note that, if the connection is using manual_window_management then the window * size has shrunk by the amount of body data received. If the window size * reaches 0 no further data will be received. Increment the window size with * aws_http_stream_update_window(). @@ -243,17 +243,6 @@ struct aws_http_make_request_options { * See `aws_http_on_stream_complete_fn`. */ aws_http_on_stream_complete_fn *on_complete; - - /** - * Set to true to manually manage the read window size. - * - * If this is false, the connection will maintain a constant window size. - * - * If this is true, the caller must manually increment the window size using aws_http_stream_update_window(). - * If the window is not incremented, it will shrink by the amount of body data received. If the window size - * reaches 0, no further data will be received. - */ - bool manual_window_management; }; struct aws_http_request_handler_options { @@ -305,17 +294,6 @@ struct aws_http_request_handler_options { * See `aws_http_on_stream_complete_fn`. */ aws_http_on_stream_complete_fn *on_complete; - - /** - * Set to true to manually manage the read window size. - * - * If this is false, the connection will maintain a constant window size. - * - * If this is true, the caller must manually increment the window size using aws_http_stream_update_window(). - * If the window is not incremented, it will shrink by the amount of body data received. If the window size - * reaches 0, no further data will be received. - */ - bool manual_window_management; }; #define AWS_HTTP_REQUEST_HANDLER_OPTIONS_INIT \ @@ -634,7 +612,9 @@ int aws_http_message_erase_header(struct aws_http_message *message, size_t index /** * Create a stream, with a client connection sending a request. - * The request starts sending automatically once the stream is created. + * The request does not start sending automatically once the stream is created. You must call + * aws_http_stream_activate to begin execution of the request. + * * The `options` are copied during this call. * * Tip for language bindings: Do not bind the `options` struct. Use something more natural for your language, @@ -664,6 +644,13 @@ struct aws_http_stream *aws_http_stream_new_server_request_handler( AWS_HTTP_API void aws_http_stream_release(struct aws_http_stream *stream); +/** + * Only used for client initiated streams (immediately following a call to aws_http_connection_make_request). + * + * Activates the request's outgoing stream processing. + */ +AWS_HTTP_API int aws_http_stream_activate(struct aws_http_stream *stream); + AWS_HTTP_API struct aws_http_connection *aws_http_stream_get_connection(const struct aws_http_stream *stream); @@ -697,7 +684,8 @@ void aws_http_stream_update_window(struct aws_http_stream *stream, size_t increm /** * Gets the Http/2 id associated with a stream. Even h1 streams have an id (using the same allocation procedure - * as http/2) for easier tracking purposes. + * as http/2) for easier tracking purposes. For client streams, this will only be non-zero after a successful call + * to aws_http_stream_activate() */ uint32_t aws_http_stream_get_id(struct aws_http_stream *stream); diff --git a/include/aws/http/server.h b/include/aws/http/server.h index ef9eadfcd..d935165a4 100644 --- a/include/aws/http/server.h +++ b/include/aws/http/server.h @@ -105,9 +105,15 @@ struct aws_http_server_options { aws_http_server_on_destroy_fn *on_destroy_complete; /** - * If set to true, read back pressure mechanism will be enabled. - */ - bool enable_read_back_pressure; + * Set to true to manually manage the read window size. + * + * If this is false, the connection will maintain a constant window size. + * + * If this is true, the caller must manually increment the window size using aws_http_stream_update_window(). + * If the window is not incremented, it will shrink by the amount of body data received. If the window size + * reaches 0, no further data will be received. + **/ + bool manual_window_management; }; /** diff --git a/source/connection.c b/source/connection.c index c8b3b485a..a9ed07187 100644 --- a/source/connection.c +++ b/source/connection.c @@ -51,6 +51,7 @@ struct aws_http_server { struct aws_allocator *alloc; struct aws_server_bootstrap *bootstrap; bool is_using_tls; + bool manual_window_management; size_t initial_window_size; void *user_data; aws_http_server_on_incoming_connection_fn *on_incoming_connection; @@ -83,6 +84,7 @@ static struct aws_http_connection *s_connection_new( struct aws_channel *channel, bool is_server, bool is_using_tls, + bool manual_window_management, size_t initial_window_size) { struct aws_channel_slot *connection_slot = NULL; @@ -146,17 +148,19 @@ static struct aws_http_connection *s_connection_new( switch (version) { case AWS_HTTP_VERSION_1_1: if (is_server) { - connection = aws_http_connection_new_http1_1_server(alloc, initial_window_size); + connection = + aws_http_connection_new_http1_1_server(alloc, manual_window_management, initial_window_size); } else { - connection = aws_http_connection_new_http1_1_client(alloc, initial_window_size); + connection = + aws_http_connection_new_http1_1_client(alloc, manual_window_management, initial_window_size); } break; case AWS_HTTP_VERSION_2: AWS_FATAL_ASSERT(false && "H2 is not currently supported"); /* lol nice try */ if (is_server) { - connection = aws_http_connection_new_http2_server(alloc, initial_window_size); + connection = aws_http_connection_new_http2_server(alloc, manual_window_management, initial_window_size); } else { - connection = aws_http_connection_new_http2_client(alloc, initial_window_size); + connection = aws_http_connection_new_http2_client(alloc, manual_window_management, initial_window_size); } break; default: @@ -293,7 +297,13 @@ static void s_server_bootstrap_on_accept_channel_setup( goto error; } /* Create connection */ - connection = s_connection_new(server->alloc, channel, true, server->is_using_tls, server->initial_window_size); + connection = s_connection_new( + server->alloc, + channel, + true, + server->is_using_tls, + server->manual_window_management, + server->initial_window_size); if (!connection) { AWS_LOGF_ERROR( AWS_LS_HTTP_SERVER, @@ -466,6 +476,7 @@ struct aws_http_server *aws_http_server_new(const struct aws_http_server_options server->user_data = options->server_user_data; server->on_incoming_connection = options->on_incoming_connection; server->on_destroy_complete = options->on_destroy_complete; + server->manual_window_management = options->manual_window_management; int err = aws_mutex_init(&server->synced_data.lock); if (err) { @@ -490,7 +501,7 @@ struct aws_http_server *aws_http_server_new(const struct aws_http_server_options } struct aws_server_socket_channel_bootstrap_options bootstrap_options = { - .enable_read_back_pressure = options->enable_read_back_pressure, + .enable_read_back_pressure = options->manual_window_management, .tls_options = options->tls_options, .bootstrap = options->bootstrap, .socket_options = options->socket_options, @@ -613,7 +624,12 @@ static void s_client_bootstrap_on_channel_setup( AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "static: Socket connected, creating client connection object."); http_bootstrap->connection = s_connection_new( - http_bootstrap->alloc, channel, false, http_bootstrap->is_using_tls, http_bootstrap->initial_window_size); + http_bootstrap->alloc, + channel, + false, + http_bootstrap->is_using_tls, + http_bootstrap->manual_window_management, + http_bootstrap->initial_window_size); if (!http_bootstrap->connection) { AWS_LOGF_ERROR( AWS_LS_HTTP_CONNECTION, @@ -743,6 +759,7 @@ int aws_http_client_connect_internal( http_bootstrap->alloc = options->allocator; http_bootstrap->is_using_tls = options->tls_options != NULL; + http_bootstrap->manual_window_management = options->manual_window_management; http_bootstrap->initial_window_size = options->initial_window_size; http_bootstrap->user_data = options->user_data; http_bootstrap->on_setup = options->on_setup; @@ -766,7 +783,7 @@ int aws_http_client_connect_internal( .tls_options = options->tls_options, .setup_callback = s_client_bootstrap_on_channel_setup, .shutdown_callback = s_client_bootstrap_on_channel_shutdown, - .enable_read_back_pressure = options->enable_read_back_pressure, + .enable_read_back_pressure = options->manual_window_management, .user_data = http_bootstrap, }; @@ -845,13 +862,15 @@ static const uint32_t MAX_STREAM_ID = UINT32_MAX >> 1; uint32_t aws_http_connection_get_next_stream_id(struct aws_http_connection *connection) { - uint32_t next_id = (uint32_t)aws_atomic_fetch_add(&connection->next_stream_id, 2); - /* If next fetch would overflow next_stream_id, set it to 0 */ + uint32_t next_id = connection->next_stream_id; + if (AWS_UNLIKELY(next_id > MAX_STREAM_ID)) { AWS_LOGF_INFO(AWS_LS_HTTP_CONNECTION, "id=%p: All available stream ids are gone", (void *)connection); next_id = 0; aws_raise_error(AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED); + } else { + connection->next_stream_id += 2; } return next_id; diff --git a/source/connection_manager.c b/source/connection_manager.c index cce7f11d9..4e48fd2de 100644 --- a/source/connection_manager.c +++ b/source/connection_manager.c @@ -691,7 +691,7 @@ static int s_aws_http_connection_manager_new_connection(struct aws_http_connecti options.socket_options = &manager->socket_options; options.on_setup = s_aws_http_connection_manager_on_connection_setup; options.on_shutdown = s_aws_http_connection_manager_on_connection_shutdown; - options.enable_read_back_pressure = manager->enable_read_back_pressure; + options.manual_window_management = manager->enable_read_back_pressure; if (aws_http_connection_monitoring_options_is_valid(&manager->monitoring_options)) { options.monitoring_options = &manager->monitoring_options; diff --git a/source/h1_connection.c b/source/h1_connection.c index a7d22aef4..6e56c5fcb 100644 --- a/source/h1_connection.c +++ b/source/h1_connection.c @@ -418,6 +418,52 @@ static void s_connection_update_window(struct aws_http_connection *connection_ba } } +int aws_h1_stream_activate(struct aws_http_stream *stream) { + struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base); + + struct aws_http_connection *base_connection = stream->owning_connection; + struct h1_connection *connection = AWS_CONTAINER_OF(base_connection, struct h1_connection, base); + + bool should_schedule_task = false; + + { /* BEGIN CRITICAL SECTION */ + s_h1_connection_lock_synced_data(connection); + + if (stream->id) { + /* stream has already been activated. */ + s_h1_connection_unlock_synced_data(connection); + return AWS_OP_SUCCESS; + } + + stream->id = aws_http_connection_get_next_stream_id(base_connection); + + if (stream->id) { + aws_linked_list_push_back(&connection->synced_data.new_client_stream_list, &h1_stream->node); + if (!connection->synced_data.is_outgoing_stream_task_active) { + connection->synced_data.is_outgoing_stream_task_active = true; + should_schedule_task = true; + } + } + + s_h1_connection_unlock_synced_data(connection); + } /* END CRITICAL SECTION */ + + if (!stream->id) { + /* aws_http_connection_get_next_stream_id() raises its own error. */ + return AWS_OP_ERR; + } + + /* activate one more time now that the connection can actually run the stream. */ + aws_atomic_fetch_add(&stream->refcount, 1); + + if (should_schedule_task) { + AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Scheduling outgoing stream task.", (void *)&connection->base); + aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->outgoing_stream_task); + } + + return AWS_OP_SUCCESS; +} + struct aws_http_stream *s_make_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options) { @@ -437,21 +483,12 @@ struct aws_http_stream *s_make_request( /* Insert new stream into pending list, and schedule outgoing_stream_task if it's not already running. */ int new_stream_error_code = AWS_ERROR_SUCCESS; - bool should_schedule_task = false; { /* BEGIN CRITICAL SECTION */ s_h1_connection_lock_synced_data(connection); - if (connection->synced_data.new_stream_error_code) { new_stream_error_code = connection->synced_data.new_stream_error_code; - } else { - aws_linked_list_push_back(&connection->synced_data.new_client_stream_list, &stream->node); - if (!connection->synced_data.is_outgoing_stream_task_active) { - connection->synced_data.is_outgoing_stream_task_active = true; - should_schedule_task = true; - } } - s_h1_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ @@ -482,11 +519,6 @@ struct aws_http_stream *s_make_request( AWS_BYTE_CURSOR_PRI(path), AWS_BYTE_CURSOR_PRI(aws_http_version_to_str(connection->base.http_version))); - if (should_schedule_task) { - AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Scheduling outgoing stream task.", (void *)&connection->base); - aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->outgoing_stream_task); - } - return &stream->base; error: @@ -1208,7 +1240,11 @@ static int s_decoder_on_done(void *user_data) { } /* Common new() logic for server & client */ -static struct h1_connection *s_connection_new(struct aws_allocator *alloc, size_t initial_window_size, bool server) { +static struct h1_connection *s_connection_new( + struct aws_allocator *alloc, + bool manual_window_management, + size_t initial_window_size, + bool server) { struct h1_connection *connection = aws_mem_calloc(alloc, 1, sizeof(struct h1_connection)); if (!connection) { @@ -1222,9 +1258,10 @@ static struct h1_connection *s_connection_new(struct aws_allocator *alloc, size_ connection->base.channel_handler.impl = connection; connection->base.http_version = AWS_HTTP_VERSION_1_1; connection->base.initial_window_size = initial_window_size; + connection->base.manual_window_management = manual_window_management; /* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/ - aws_atomic_init_int(&connection->base.next_stream_id, (server ? 2 : 1)); + connection->base.next_stream_id = server ? 2 : 1; /* 1 refcount for user */ aws_atomic_init_int(&connection->base.refcount, 1); @@ -1282,9 +1319,10 @@ static struct h1_connection *s_connection_new(struct aws_allocator *alloc, size_ struct aws_http_connection *aws_http_connection_new_http1_1_server( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size) { - struct h1_connection *connection = s_connection_new(allocator, initial_window_size, true); + struct h1_connection *connection = s_connection_new(allocator, manual_window_management, initial_window_size, true); if (!connection) { return NULL; } @@ -1296,9 +1334,11 @@ struct aws_http_connection *aws_http_connection_new_http1_1_server( struct aws_http_connection *aws_http_connection_new_http1_1_client( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size) { - struct h1_connection *connection = s_connection_new(allocator, initial_window_size, false); + struct h1_connection *connection = + s_connection_new(allocator, manual_window_management, initial_window_size, false); if (!connection) { return NULL; } diff --git a/source/h1_stream.c b/source/h1_stream.c index fb809571e..d5c56fd1a 100644 --- a/source/h1_stream.c +++ b/source/h1_stream.c @@ -12,10 +12,12 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -#include #include -#include +#include +#include + +#include #include static void s_stream_destroy(struct aws_http_stream *stream_base) { @@ -33,6 +35,7 @@ static void s_stream_update_window(struct aws_http_stream *stream, size_t increm static const struct aws_http_stream_vtable s_stream_vtable = { .destroy = s_stream_destroy, .update_window = s_stream_update_window, + .activate = aws_h1_stream_activate, }; static struct aws_h1_stream *s_stream_new_common( @@ -44,12 +47,6 @@ static struct aws_h1_stream *s_stream_new_common( aws_http_on_incoming_body_fn *on_incoming_body, aws_http_on_stream_complete_fn on_complete) { - uint32_t stream_id = aws_http_connection_get_next_stream_id(owning_connection); - if (stream_id == 0) { - /* stream id exhausted error was already raised*/ - return NULL; - } - struct aws_h1_stream *stream = aws_mem_calloc(owning_connection->alloc, 1, sizeof(struct aws_h1_stream)); if (!stream) { return NULL; @@ -64,10 +61,9 @@ static struct aws_h1_stream *s_stream_new_common( stream->base.on_incoming_header_block_done = on_incoming_header_block_done; stream->base.on_incoming_body = on_incoming_body; stream->base.on_complete = on_complete; - stream->base.id = stream_id; - /* Stream refcount starts at 2. 1 for user and 1 for connection to release it's done with the stream */ - aws_atomic_init_int(&stream->base.refcount, 2); + /* Stream refcount starts at 1 for user and is incremented upon activation for the connection */ + aws_atomic_init_int(&stream->base.refcount, 1); return stream; } @@ -78,7 +74,7 @@ struct aws_h1_stream *aws_h1_stream_new_request( struct aws_h1_stream *stream = s_stream_new_common( client_connection, - options->manual_window_management, + client_connection->manual_window_management, options->user_data, options->on_response_headers, options->on_response_header_block_done, @@ -121,7 +117,7 @@ struct aws_h1_stream *aws_h1_stream_new_request( struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_request_handler_options *options) { struct aws_h1_stream *stream = s_stream_new_common( options->server_connection, - options->manual_window_management, + options->server_connection->manual_window_management, options->user_data, options->on_request_headers, options->on_request_header_block_done, @@ -131,8 +127,13 @@ struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_re return NULL; } + /* This code is only executed in server mode and can only be invoked from the event-loop thread so don't worry + * with the lock here. */ + stream->base.id = aws_http_connection_get_next_stream_id(options->server_connection); + stream->base.server_data = &stream->base.client_or_server_data.server; stream->base.server_data->on_request_done = options->on_request_done; + aws_atomic_fetch_add(&stream->base.refcount, 1); return stream; } diff --git a/source/h2_connection.c b/source/h2_connection.c index bbdcd551c..f06f004e0 100644 --- a/source/h2_connection.c +++ b/source/h2_connection.c @@ -158,6 +158,7 @@ static void s_shutdown_due_to_write_err(struct aws_h2_connection *connection, in /* Common new() logic for server & client */ static struct aws_h2_connection *s_connection_new( struct aws_allocator *alloc, + bool manual_window_management, size_t initial_window_size, bool server) { @@ -176,7 +177,8 @@ static struct aws_h2_connection *s_connection_new( connection->base.http_version = AWS_HTTP_VERSION_2; connection->base.initial_window_size = initial_window_size; /* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/ - aws_atomic_init_int(&connection->base.next_stream_id, (server ? 2 : 1)); + connection->base.next_stream_id = (server ? 2 : 1); + connection->base.manual_window_management = manual_window_management; aws_channel_task_init( &connection->cross_thread_work_task, s_cross_thread_work_task, connection, "HTTP/2 cross-thread work"); @@ -238,9 +240,11 @@ static struct aws_h2_connection *s_connection_new( struct aws_http_connection *aws_http_connection_new_http2_server( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size) { - struct aws_h2_connection *connection = s_connection_new(allocator, initial_window_size, true); + struct aws_h2_connection *connection = + s_connection_new(allocator, manual_window_management, initial_window_size, true); if (!connection) { return NULL; } @@ -252,9 +256,11 @@ struct aws_http_connection *aws_http_connection_new_http2_server( struct aws_http_connection *aws_http_connection_new_http2_client( struct aws_allocator *allocator, + bool manual_window_management, size_t initial_window_size) { - struct aws_h2_connection *connection = s_connection_new(allocator, initial_window_size, false); + struct aws_h2_connection *connection = + s_connection_new(allocator, manual_window_management, initial_window_size, false); if (!connection) { return NULL; } @@ -657,6 +663,8 @@ static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h goto error; } + aws_atomic_fetch_add(&stream->base.refcount, 1); + if (has_outgoing_data) { aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node); } @@ -702,6 +710,46 @@ static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, e s_try_write_outgoing_frames(connection); } +int aws_h2_stream_activate(struct aws_http_stream *stream) { + struct aws_h2_stream *h2_stream = AWS_CONTAINER_OF(stream, struct aws_h2_stream, base); + + struct aws_http_connection *base_connection = stream->owning_connection; + struct aws_h2_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h2_connection, base); + + bool was_cross_thread_work_scheduled = false; + { /* BEGIN CRITICAL SECTION */ + s_lock_synced_data(connection); + + if (stream->id) { + /* stream has already been activated. */ + s_unlock_synced_data(connection); + return AWS_OP_SUCCESS; + } + + stream->id = aws_http_connection_get_next_stream_id(base_connection); + + if (stream->id) { + was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled; + connection->synced_data.is_cross_thread_work_task_scheduled = true; + + aws_linked_list_push_back(&connection->synced_data.pending_stream_list, &h2_stream->node); + } + s_unlock_synced_data(connection); + } /* END CRITICAL SECTION */ + + if (!stream->id) { + /* aws_http_connection_get_next_stream_id() raises its own error. */ + return AWS_OP_ERR; + } + + if (!was_cross_thread_work_scheduled) { + CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task"); + aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task); + } + + return AWS_OP_SUCCESS; +} + static struct aws_http_stream *s_connection_make_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options) { @@ -723,22 +771,13 @@ static struct aws_http_stream *s_connection_make_request( } int new_stream_error_code = AWS_ERROR_SUCCESS; - bool was_cross_thread_work_scheduled = false; { /* BEGIN CRITICAL SECTION */ s_lock_synced_data(connection); if (connection->synced_data.new_stream_error_code) { new_stream_error_code = connection->synced_data.new_stream_error_code; - goto unlock; } - /* success */ - was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled; - connection->synced_data.is_cross_thread_work_task_scheduled = true; - - aws_linked_list_push_back(&connection->synced_data.pending_stream_list, &stream->node); - - unlock: s_unlock_synced_data(connection); } /* END CRITICAL SECTION */ @@ -753,11 +792,6 @@ static struct aws_http_stream *s_connection_make_request( goto error; } - if (!was_cross_thread_work_scheduled) { - CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task"); - aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task); - } - AWS_H2_STREAM_LOG(DEBUG, stream, "Created HTTP/2 request stream"); /* #TODO: print method & path */ return &stream->base; diff --git a/source/h2_stream.c b/source/h2_stream.c index 0b9c3fee5..879dd9336 100644 --- a/source/h2_stream.c +++ b/source/h2_stream.c @@ -25,6 +25,7 @@ static void s_stream_destroy(struct aws_http_stream *stream_base); struct aws_http_stream_vtable s_h2_stream_vtable = { .destroy = s_stream_destroy, .update_window = NULL, + .activate = aws_h2_stream_activate, }; const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state) { @@ -63,12 +64,6 @@ struct aws_h2_stream *aws_h2_stream_new_request( AWS_PRECONDITION(client_connection); AWS_PRECONDITION(options); - /* #TODO optimization: don't make use of atomic here. have connection assign from connection->synced_data */ - uint32_t stream_id = aws_http_connection_get_next_stream_id(client_connection); - if (stream_id == 0) { - return NULL; - } - struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream)); if (!stream) { return NULL; @@ -83,10 +78,9 @@ struct aws_h2_stream *aws_h2_stream_new_request( stream->base.on_incoming_header_block_done = options->on_response_header_block_done; stream->base.on_incoming_body = options->on_response_body; stream->base.on_complete = options->on_complete; - stream->base.id = stream_id; - /* Stream refcount starts at 2. 1 for user and 1 for connection to release when it's done with the stream */ - aws_atomic_init_int(&stream->base.refcount, 2); + /* Stream refcount starts at 1, and gets incremented again for the connection upon a call to activate() */ + aws_atomic_init_int(&stream->base.refcount, 1); /* Init H2 specific stuff */ stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE; diff --git a/source/proxy_connection.c b/source/proxy_connection.c index 3b90b9cfa..d3a0cb0f1 100644 --- a/source/proxy_connection.c +++ b/source/proxy_connection.c @@ -502,6 +502,8 @@ static int s_make_proxy_connect_request( user_data->connect_stream = stream; user_data->connect_request = request; + aws_http_stream_activate(stream); + return AWS_OP_SUCCESS; on_error: diff --git a/source/request_response.c b/source/request_response.c index 5f9fbbf20..a3c1bff77 100644 --- a/source/request_response.c +++ b/source/request_response.c @@ -638,6 +638,16 @@ struct aws_http_stream *aws_http_connection_make_request( return stream; } +int aws_http_stream_activate(struct aws_http_stream *stream) { + AWS_PRECONDITION(stream); + AWS_PRECONDITION(stream->vtable); + AWS_PRECONDITION(stream->vtable->activate); + /* make sure it's actually a client calling us. This is always a programmer bug, so just assert and die. */ + AWS_PRECONDITION(aws_http_connection_is_client(stream->owning_connection)); + + return stream->vtable->activate(stream); +} + struct aws_http_stream *aws_http_stream_new_server_request_handler( const struct aws_http_request_handler_options *options) { AWS_PRECONDITION(options); diff --git a/source/websocket_bootstrap.c b/source/websocket_bootstrap.c index 72b201490..f3e561b02 100644 --- a/source/websocket_bootstrap.c +++ b/source/websocket_bootstrap.c @@ -36,6 +36,7 @@ static const struct aws_websocket_client_bootstrap_system_vtable s_default_syste .aws_http_connection_close = aws_http_connection_close, .aws_http_connection_get_channel = aws_http_connection_get_channel, .aws_http_connection_make_request = aws_http_connection_make_request, + .aws_http_stream_activate = aws_http_stream_activate, .aws_http_stream_release = aws_http_stream_release, .aws_http_stream_get_connection = aws_http_stream_get_connection, .aws_http_stream_get_incoming_response_status = aws_http_stream_get_incoming_response_status, @@ -317,7 +318,8 @@ static void s_ws_bootstrap_on_http_setup(struct aws_http_connection *http_connec struct aws_http_stream *handshake_stream = s_system_vtable->aws_http_connection_make_request(http_connection, &options); - if (!handshake_stream) { + + if (!handshake_stream || s_system_vtable->aws_http_stream_activate(handshake_stream)) { AWS_LOGF_ERROR( AWS_LS_HTTP_WEBSOCKET_SETUP, "id=%p: Failed to initiate websocket upgrade request, error %d (%s).", diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb2a1437b..66263dcb8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -80,7 +80,6 @@ add_test_case(h1_client_response_arrives_before_request_done_sending_is_ok) add_test_case(h1_client_response_without_request_shuts_down_connection) add_test_case(h1_client_response_close_header_ends_connection) add_test_case(h1_client_response_close_header_with_pipelining) -add_test_case(h1_client_window_reopens_by_default) add_test_case(h1_client_window_shrinks_if_user_says_so) add_test_case(h1_client_window_manual_update) add_test_case(h1_client_window_manual_update_off_thread) @@ -93,6 +92,7 @@ add_test_case(h1_client_error_from_incoming_headers_done_callback_stops_decoder) add_test_case(h1_client_error_from_incoming_body_callback_stops_decoder) add_test_case(h1_client_close_from_off_thread_makes_not_open) add_test_case(h1_client_close_from_on_thread_makes_not_open) +add_test_case(h1_client_unactivated_stream_cleans_up) add_test_case(h1_client_midchannel_sanity_check) add_test_case(h1_client_midchannel_read) add_test_case(h1_client_midchannel_read_immediately) @@ -299,6 +299,7 @@ add_h2_decoder_test_set(h2_decoder_err_bad_preface_from_client_3) add_test_case(h2_client_sanity_check) add_test_case(h2_client_request_create) +add_test_case(h2_client_unactivated_stream_cleans_up) add_test_case(h2_client_connection_preface_sent) add_test_case(h2_client_ping_ack) diff --git a/tests/integration_test_proxy.c b/tests/integration_test_proxy.c index 1bb4c50b8..9463c69a1 100644 --- a/tests/integration_test_proxy.c +++ b/tests/integration_test_proxy.c @@ -146,7 +146,8 @@ static int s_do_proxy_request_test( }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.client_connection, &request_options); - (void)stream; + ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); ASSERT_SUCCESS(proxy_tester_wait(&tester, proxy_tester_request_complete_pred_fn)); ASSERT_TRUE(tester.wait_result == AWS_ERROR_SUCCESS); diff --git a/tests/proxy_test_helper.c b/tests/proxy_test_helper.c index d71d30111..9dc5e51d3 100644 --- a/tests/proxy_test_helper.c +++ b/tests/proxy_test_helper.c @@ -256,7 +256,7 @@ int proxy_tester_create_testing_channel_connection(struct proxy_tester *tester) /* Use small window so that we can observe it opening in tests. * Channel may wait until the window is small before issuing the increment command. */ - struct aws_http_connection *connection = aws_http_connection_new_http1_1_client(tester->alloc, 256); + struct aws_http_connection *connection = aws_http_connection_new_http1_1_client(tester->alloc, true, 256); ASSERT_NOT_NULL(connection); connection->user_data = tester->http_bootstrap->user_data; diff --git a/tests/test_connection_monitor.c b/tests/test_connection_monitor.c index 4a1428e6d..4018c961c 100644 --- a/tests/test_connection_monitor.c +++ b/tests/test_connection_monitor.c @@ -117,9 +117,9 @@ static int s_init_monitor_test(struct aws_allocator *allocator, struct aws_crt_s s_test_context.test_channel.channel_shutdown = s_testing_channel_shutdown_callback; s_test_context.test_channel.channel_shutdown_user_data = &s_test_context; - struct aws_http_connection *connection = aws_http_connection_new_http1_1_client(allocator, SIZE_MAX); + struct aws_http_connection *connection = aws_http_connection_new_http1_1_client(allocator, true, SIZE_MAX); ASSERT_NOT_NULL(connection); - aws_atomic_init_int(&connection->next_stream_id, 1); + connection->next_stream_id = 1; struct aws_channel_slot *slot = aws_channel_slot_new(s_test_context.test_channel.channel); ASSERT_NOT_NULL(slot); @@ -1102,6 +1102,7 @@ static void s_add_outgoing_stream(struct test_http_stats_event *event) { request_options.user_data = (void *)aws_array_list_length(&s_test_context.requests); request_info.stream = aws_http_connection_make_request(s_test_context.connection, &request_options); + aws_http_stream_activate(request_info.stream); aws_array_list_push_back(&s_test_context.requests, &request_info); } diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index 54457c369..5c0936d05 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -52,6 +52,7 @@ struct tester { struct testing_channel testing_channel; struct aws_http_connection *connection; struct aws_logger logger; + bool manual_window_management; }; static int s_tester_init(struct tester *tester, struct aws_allocator *alloc) { @@ -73,7 +74,7 @@ static int s_tester_init(struct tester *tester, struct aws_allocator *alloc) { /* Use small window so that we can observe it opening in tests. * Channel may wait until the window is small before issuing the increment command. */ - tester->connection = aws_http_connection_new_http1_1_client(alloc, 256); + tester->connection = aws_http_connection_new_http1_1_client(alloc, true, 256); ASSERT_NOT_NULL(tester->connection); struct aws_channel_slot *slot = aws_channel_slot_new(tester->testing_channel.channel); @@ -118,6 +119,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_1liner) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -161,6 +163,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_headers) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -208,6 +211,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_body) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -251,6 +255,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_content_length_0_ok) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -268,6 +273,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_content_length_0_ok) { stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -323,6 +329,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_large_body) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); /* check result */ const char *expected_head_fmt = "PUT /large.txt HTTP/1.1\r\n" @@ -398,6 +405,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_large_head) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); /* check result */ testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -431,6 +439,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_multiple) { for (size_t i = 0; i < num_streams; ++i) { streams[i] = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(streams[i]); + aws_http_stream_activate(streams[i]); } testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -604,6 +613,7 @@ static int s_response_tester_init_ex( if (!response->stream) { return AWS_OP_ERR; } + aws_http_stream_activate(response->stream); return AWS_OP_SUCCESS; } @@ -1436,42 +1446,6 @@ H1_CLIENT_TEST_CASE(h1_client_request_close_header_with_pipelining) { return AWS_OP_SUCCESS; } -/* By default, after reading an aws_io_message of N bytes, the connection should issue window update of N bytes */ -H1_CLIENT_TEST_CASE(h1_client_window_reopens_by_default) { - (void)ctx; - struct tester tester; - ASSERT_SUCCESS(s_tester_init(&tester, allocator)); - - /* send request */ - struct aws_http_message *request = s_new_default_get_request(allocator); - - struct response_tester response; - ASSERT_SUCCESS(s_response_tester_init(&response, &tester, request)); - - testing_channel_drain_queued_tasks(&tester.testing_channel); - - /* Ensure the request can be destroyed after request is sent */ - aws_http_message_destroy(request); - - /* send response */ - const char *response_str = "HTTP/1.1 200 OK\r\n" - "Content-Length: 9\r\n" - "\r\n" - "Call Momo"; - ASSERT_SUCCESS(testing_channel_push_read_str(&tester.testing_channel, response_str)); - - testing_channel_drain_queued_tasks(&tester.testing_channel); - - /* check result */ - size_t window_update = testing_channel_last_window_update(&tester.testing_channel); - ASSERT_TRUE(window_update == strlen(response_str)); - - /* clean up */ - ASSERT_SUCCESS(s_response_tester_clean_up(&response)); - ASSERT_SUCCESS(s_tester_clean_up(&tester)); - return AWS_OP_SUCCESS; -} - /* The user's body reading callback can prevent the window from fully re-opening. */ H1_CLIENT_TEST_CASE(h1_client_window_shrinks_if_user_says_so) { (void)ctx; @@ -1480,10 +1454,9 @@ H1_CLIENT_TEST_CASE(h1_client_window_shrinks_if_user_says_so) { /* send request */ struct aws_http_message *request = s_new_default_get_request(allocator); - struct aws_http_make_request_options opt_override = {.manual_window_management = true}; struct response_tester response; - ASSERT_SUCCESS(s_response_tester_init_ex(&response, &tester, request, &opt_override, NULL)); + ASSERT_SUCCESS(s_response_tester_init_ex(&response, &tester, request, NULL, NULL)); response.stop_auto_window_update = true; testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -1612,7 +1585,7 @@ static int s_test_content_length_mismatch_is_error( }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); - + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); /* check result */ @@ -1653,6 +1626,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_cancelled_by_channel_shutdown) { }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -1693,6 +1667,7 @@ H1_CLIENT_TEST_CASE(h1_client_multiple_requests_cancelled_by_channel_shutdown) { opt.user_data = &completion_error_codes[i]; streams[i] = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(streams[i]); + aws_http_stream_activate(streams[i]); } /* 2 streams are now in-progress */ @@ -1702,6 +1677,7 @@ H1_CLIENT_TEST_CASE(h1_client_multiple_requests_cancelled_by_channel_shutdown) { opt.user_data = &completion_error_codes[2]; streams[2] = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(streams[2]); + aws_http_stream_activate(streams[2]); /* shutdown channel */ aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_SUCCESS); @@ -1737,6 +1713,7 @@ H1_CLIENT_TEST_CASE(h1_client_new_request_fails_if_channel_shut_down) { struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NULL(stream); + ASSERT_INT_EQUALS(aws_last_error(), AWS_ERROR_HTTP_CONNECTION_CLOSED); aws_http_message_destroy(opt.request); @@ -1896,6 +1873,7 @@ static int s_test_error_from_callback(struct aws_allocator *allocator, enum requ struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); testing_channel_drain_queued_tasks(&tester.testing_channel); @@ -1999,6 +1977,36 @@ H1_CLIENT_TEST_CASE(h1_client_close_from_on_thread_makes_not_open) { return AWS_OP_SUCCESS; } +H1_CLIENT_TEST_CASE(h1_client_unactivated_stream_cleans_up) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + ASSERT_TRUE(aws_http_connection_is_open(tester.connection)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("GET"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/"))); + + struct aws_http_make_request_options options = { + .self_size = sizeof(struct aws_http_make_request_options), + .on_complete = s_on_complete, + .user_data = &tester, + .on_response_body = s_response_tester_on_body, + .on_response_header_block_done = s_response_tester_on_header_block_done, + .on_response_headers = s_response_tester_on_headers, + .request = request, + }; + + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &options); + aws_http_message_release(request); + ASSERT_NOT_NULL(stream); + /* we do not activate, that is the test. */ + aws_http_stream_release(stream); + aws_http_connection_close(tester.connection); + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + struct protocol_switcher { /* Settings */ struct tester *tester; @@ -2066,6 +2074,7 @@ static int s_switch_protocols(struct protocol_switcher *switcher) { struct aws_http_stream *upgrade_stream = aws_http_connection_make_request(switcher->tester->connection, &upgrade_request); ASSERT_NOT_NULL(upgrade_stream); + aws_http_stream_activate(upgrade_stream); testing_channel_drain_queued_tasks(&switcher->tester->testing_channel); /* Ensure the request can be destroyed after request is sent */ diff --git a/tests/test_h1_server.c b/tests/test_h1_server.c index aa76f03ba..43b06f4be 100644 --- a/tests/test_h1_server.c +++ b/tests/test_h1_server.c @@ -193,7 +193,7 @@ static int s_tester_init(struct aws_allocator *alloc) { struct aws_testing_channel_options test_channel_options = {.clock_fn = aws_high_res_clock_get_ticks}; ASSERT_SUCCESS(testing_channel_init(&s_tester.testing_channel, alloc, &test_channel_options)); - s_tester.server_connection = aws_http_connection_new_http1_1_server(alloc, SIZE_MAX); + s_tester.server_connection = aws_http_connection_new_http1_1_server(alloc, true, SIZE_MAX); ASSERT_NOT_NULL(s_tester.server_connection); struct aws_http_server_connection_options options = AWS_HTTP_SERVER_CONNECTION_OPTIONS_INIT; options.connection_user_data = &s_tester; @@ -1476,7 +1476,7 @@ static int s_error_tester_init(struct aws_allocator *alloc, struct error_from_ca struct aws_testing_channel_options test_channel_options = {.clock_fn = aws_high_res_clock_get_ticks}; ASSERT_SUCCESS(testing_channel_init(&tester->testing_channel, alloc, &test_channel_options)); - tester->server_connection = aws_http_connection_new_http1_1_server(alloc, SIZE_MAX); + tester->server_connection = aws_http_connection_new_http1_1_server(alloc, true, SIZE_MAX); ASSERT_NOT_NULL(tester->server_connection); struct aws_http_server_connection_options options = AWS_HTTP_SERVER_CONNECTION_OPTIONS_INIT; options.connection_user_data = tester; diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 4910389eb..5880cdab8 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -40,7 +40,7 @@ static int s_tester_init(struct aws_allocator *alloc, void *ctx) { ASSERT_SUCCESS(testing_channel_init(&s_tester.testing_channel, alloc, &options)); - s_tester.connection = aws_http_connection_new_http2_client(alloc, SIZE_MAX); + s_tester.connection = aws_http_connection_new_http2_client(alloc, true, SIZE_MAX); ASSERT_NOT_NULL(s_tester.connection); { /* re-enact marriage vows of http-connection and channel (handled by http-bootstrap in real world) */ @@ -98,6 +98,7 @@ TEST_CASE(h2_client_request_create) { struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &options); ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); /* shutdown channel so request can be released */ aws_channel_shutdown(s_tester.testing_channel.channel, AWS_ERROR_SUCCESS); @@ -105,6 +106,41 @@ TEST_CASE(h2_client_request_create) { ASSERT_TRUE(testing_channel_is_shutdown_completed(&s_tester.testing_channel)); /* release request */ + aws_http_stream_release(stream); + + aws_http_message_release(request); + + return s_tester_clean_up(); +} + +TEST_CASE(h2_client_unactivated_stream_cleans_up) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* create request */ + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header headers[] = { + {aws_byte_cursor_from_c_str(":method"), aws_byte_cursor_from_c_str("GET")}, + {aws_byte_cursor_from_c_str(":scheme"), aws_byte_cursor_from_c_str("https")}, + {aws_byte_cursor_from_c_str(":path"), aws_byte_cursor_from_c_str("/")}, + }; + ASSERT_SUCCESS(aws_http_headers_add_array(aws_http_message_get_headers(request), headers, AWS_ARRAY_SIZE(headers))); + + struct aws_http_make_request_options options = { + .self_size = sizeof(options), + .request = request, + }; + + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &options); + ASSERT_NOT_NULL(stream); + /* do not activate the stream, that's the test. */ + + /* shutdown channel so request can be released */ + aws_channel_shutdown(s_tester.testing_channel.channel, AWS_ERROR_SUCCESS); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(testing_channel_is_shutdown_completed(&s_tester.testing_channel)); + aws_http_stream_release(stream); aws_http_message_release(request); diff --git a/tests/test_proxy.c b/tests/test_proxy.c index f1b7e0a1e..92aa64141 100644 --- a/tests/test_proxy.c +++ b/tests/test_proxy.c @@ -408,6 +408,9 @@ static int s_do_http_proxy_request_transform_test(struct aws_allocator *allocato }; struct aws_http_stream *stream = aws_http_connection_make_request(tester.client_connection, &request_options); + ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); + testing_channel_run_currently_queued_tasks(tester.testing_channel); s_verify_transformed_request(untransformed_request, request, use_basic_auth, allocator); diff --git a/tests/test_tls.c b/tests/test_tls.c index 66bf05c0f..c5e05ff44 100644 --- a/tests/test_tls.c +++ b/tests/test_tls.c @@ -189,6 +189,7 @@ static int s_test_tls_download_medium_file(struct aws_allocator *allocator, void }; ASSERT_NOT_NULL(test.stream = aws_http_connection_make_request(test.client_connection, &req_options)); + aws_http_stream_activate(test.stream); /* wait for the request to complete */ s_test_wait(&test, s_stream_wait_pred); diff --git a/tests/test_websocket_bootstrap.c b/tests/test_websocket_bootstrap.c index 5db8be5be..0bdc79782 100644 --- a/tests/test_websocket_bootstrap.c +++ b/tests/test_websocket_bootstrap.c @@ -37,6 +37,7 @@ static struct aws_channel *s_mock_http_connection_get_channel(struct aws_http_co static struct aws_http_stream *s_mock_http_connection_make_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options); +static int s_mock_http_stream_activate(struct aws_http_stream *stream); static void s_mock_http_stream_release(struct aws_http_stream *stream); static struct aws_http_connection *s_mock_http_stream_get_connection(const struct aws_http_stream *stream); static int s_mock_http_stream_get_incoming_response_status(const struct aws_http_stream *stream, int *out_status); @@ -48,6 +49,7 @@ static const struct aws_websocket_client_bootstrap_system_vtable s_mock_system_v .aws_http_connection_close = s_mock_http_connection_close, .aws_http_connection_get_channel = s_mock_http_connection_get_channel, .aws_http_connection_make_request = s_mock_http_connection_make_request, + .aws_http_stream_activate = s_mock_http_stream_activate, .aws_http_stream_release = s_mock_http_stream_release, .aws_http_stream_get_connection = s_mock_http_stream_get_connection, .aws_http_stream_get_incoming_response_status = s_mock_http_stream_get_incoming_response_status, @@ -113,6 +115,7 @@ static struct tester { bool websocket_new_called_successfully; bool http_stream_release_called; + bool http_stream_activate_called; bool websocket_setup_invoked; int websocket_setup_error_code; @@ -292,6 +295,15 @@ static struct aws_http_stream *s_mock_http_connection_make_request( return s_mock_stream; } +static int s_mock_http_stream_activate(struct aws_http_stream *stream) { + AWS_FATAL_ASSERT(stream == s_mock_stream); + AWS_FATAL_ASSERT(!s_tester.http_connection_release_called); + AWS_FATAL_ASSERT(!s_tester.http_stream_release_called); + s_tester.http_stream_activate_called = true; + + return AWS_OP_SUCCESS; +} + static void s_mock_http_stream_release(struct aws_http_stream *stream) { AWS_FATAL_ASSERT(stream == s_mock_stream); AWS_FATAL_ASSERT(!s_tester.http_connection_release_called); @@ -522,6 +534,7 @@ static int s_drive_websocket_connect(int *out_error_code) { /* If request was created, it must be released eventually. */ if (s_tester.http_stream_new_called_successfully) { + ASSERT_TRUE(s_tester.http_stream_activate_called); ASSERT_TRUE(s_tester.http_stream_release_called); }