diff --git a/include/aws/http/http.h b/include/aws/http/http.h index e681542f1..5a38cac07 100644 --- a/include/aws/http/http.h +++ b/include/aws/http/http.h @@ -48,6 +48,7 @@ enum aws_http_errors { AWS_ERROR_HTTP_CONNECTION_MANAGER_SHUTTING_DOWN, AWS_ERROR_HTTP_PROTOCOL_ERROR, AWS_ERROR_HTTP_STREAM_CLOSED, + AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED, AWS_ERROR_HTTP_INVALID_FRAME_SIZE, AWS_ERROR_HTTP_COMPRESSION, diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h index ab33f01bb..fa7522c4a 100644 --- a/include/aws/http/private/h2_connection.h +++ b/include/aws/http/private/h2_connection.h @@ -27,18 +27,40 @@ struct aws_h2_decoder; struct aws_h2_connection { struct aws_http_connection base; + struct aws_channel_task cross_thread_work_task; + /* Only the event-loop thread may touch this data */ struct { struct aws_h2_decoder *decoder; struct aws_h2_frame_encoder encoder; + + /* True when reading/writing has stopped, whether due to errors or normal channel shutdown. */ + bool is_reading_stopped; + bool is_writing_stopped; + + /* Maps stream-id to aws_h2_frame* */ + struct aws_hash_table active_streams_map; + } thread_data; /* Any thread may touch this data, but the lock must be held */ struct { struct aws_mutex lock; + /* New `aws_h2_stream *` that haven't moved to `thread_data` yet */ + struct aws_linked_list pending_stream_list; + /* Refers to the next stream id to vend */ uint32_t next_stream_id; + + /* If non-zero, reason to immediately reject new streams. (ex: closing) */ + int new_stream_error_code; + + bool is_cross_thread_work_task_scheduled; + + /* For checking status from outside the event-loop thread. */ + bool is_open; + } synced_data; }; @@ -54,9 +76,6 @@ struct aws_http_connection *aws_http_connection_new_http2_client( struct aws_allocator *allocator, size_t initial_window_size); -AWS_HTTP_API -uint32_t aws_h2_connection_get_next_stream_id(struct aws_h2_connection *connection); - AWS_EXTERN_C_END #endif /* AWS_HTTP_H2_CONNECTION_H */ diff --git a/include/aws/http/private/h2_stream.h b/include/aws/http/private/h2_stream.h index 0187a9446..4d410e52f 100644 --- a/include/aws/http/private/h2_stream.h +++ b/include/aws/http/private/h2_stream.h @@ -21,6 +21,18 @@ #include +#include + +#define AWS_H2_STREAM_LOGF(level, stream, text, ...) \ + AWS_LOGF_##level( \ + AWS_LS_HTTP_STREAM, \ + "id=%" PRIu32 " connection=%p state=%s: " text, \ + (stream)->id, \ + (void *)(stream)->base.owning_connection, \ + aws_h2_stream_state_to_str((stream)->thread_data.state), \ + __VA_ARGS__) +#define AWS_H2_STREAM_LOG(level, stream, text) AWS_H2_STREAM_LOGF(level, (stream), "%s", (text)) + enum aws_h2_stream_state { AWS_H2_STREAM_STATE_IDLE, AWS_H2_STREAM_STATE_RESERVED_LOCAL, @@ -36,7 +48,9 @@ enum aws_h2_stream_state { struct aws_h2_stream { struct aws_http_stream base; - const uint32_t id; + uint32_t id; + + struct aws_linked_list_node node; /* Only the event-loop thread may touch this data */ struct { @@ -60,7 +74,7 @@ AWS_HTTP_API const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state); AWS_HTTP_API -struct aws_h2_stream *aws_h1_stream_new_request( +struct aws_h2_stream *aws_h2_stream_new_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options); diff --git a/source/h1_connection.c b/source/h1_connection.c index 79f427730..784421541 100644 --- a/source/h1_connection.c +++ b/source/h1_connection.c @@ -1576,6 +1576,11 @@ static int s_handler_process_read_message( } AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Done processing message.", (void *)&connection->base); + if (message) { + /* release message back to pool before re-opening window */ + aws_mem_release(message->allocator, message); + message = NULL; + } /* Increment read window */ if (incoming_message_size > connection->thread_data.incoming_message_window_shrink_size) { @@ -1593,9 +1598,6 @@ static int s_handler_process_read_message( } } - if (message) { - aws_mem_release(message->allocator, message); - } return AWS_OP_SUCCESS; shutdown: @@ -1735,7 +1737,7 @@ static int s_handler_shutdown( } /* It's OK to access synced_data.pending_stream_list without holding the lock because - * no more streams can be added after s_shutdown_connection() has been invoked. */ + * no more streams can be added after s_stop() has been invoked. */ while (!aws_linked_list_empty(&connection->synced_data.pending_stream_list)) { struct aws_linked_list_node *node = aws_linked_list_front(&connection->synced_data.pending_stream_list); s_stream_complete(AWS_CONTAINER_OF(node, struct aws_h1_stream, node), stream_error_code); diff --git a/source/h2_connection.c b/source/h2_connection.c index 0bac35ce8..89b0d6c1a 100644 --- a/source/h2_connection.c +++ b/source/h2_connection.c @@ -14,7 +14,9 @@ */ #include + #include +#include #include @@ -54,6 +56,11 @@ static int s_handler_shutdown( static size_t s_handler_initial_window_size(struct aws_channel_handler *handler); static size_t s_handler_message_overhead(struct aws_channel_handler *handler); static void s_handler_destroy(struct aws_channel_handler *handler); +static struct aws_http_stream *s_connection_make_request( + struct aws_http_connection *client_connection, + const struct aws_http_make_request_options *options); + +static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status); static struct aws_http_connection_vtable s_h2_connection_vtable = { .channel_handler_vtable = @@ -67,7 +74,7 @@ static struct aws_http_connection_vtable s_h2_connection_vtable = { .destroy = s_handler_destroy, }, - .make_request = NULL, + .make_request = s_connection_make_request, .new_server_request_handler_stream = NULL, .stream_send_response = NULL, .close = NULL, @@ -79,6 +86,67 @@ static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = { .on_data = NULL, }; +static void s_lock_synced_data(struct aws_h2_connection *connection) { + int err = aws_mutex_lock(&connection->synced_data.lock); + AWS_ASSERT(!err && "lock failed"); + (void)err; +} + +static void s_unlock_synced_data(struct aws_h2_connection *connection) { + int err = aws_mutex_unlock(&connection->synced_data.lock); + AWS_ASSERT(!err && "unlock failed"); + (void)err; +} + +/** + * Internal function for bringing connection to a stop. + * Invoked multiple times, including when: + * - Channel is shutting down in the read direction. + * - Channel is shutting down in the write direction. + * - An error occurs that will shutdown the channel. + * - User wishes to close the connection (this is the only case where the function may run off-thread). + */ +static void s_stop( + struct aws_h2_connection *connection, + bool stop_reading, + bool stop_writing, + bool schedule_shutdown, + int error_code) { + + AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */ + + if (stop_reading) { + AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); + connection->thread_data.is_reading_stopped = true; + } + + if (stop_writing) { + AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); + connection->thread_data.is_writing_stopped = true; + } + { /* BEGIN CRITICAL SECTION */ + s_lock_synced_data(connection); + + /* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response) + * we don't consider the connection "open" anymore so user can't create more streams */ + connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED; + connection->synced_data.is_open = false; + + s_unlock_synced_data(connection); + } /* END CRITICAL SECTION */ + + if (schedule_shutdown) { + AWS_LOGF_INFO( + AWS_LS_HTTP_CONNECTION, + "id=%p: Shutting down connection with error code %d (%s).", + (void *)&connection->base, + error_code, + aws_error_name(error_code)); + + aws_channel_shutdown(connection->base.channel_slot->channel, error_code); + } +} + /* Common new() logic for server & client */ static struct aws_h2_connection *s_connection_new( struct aws_allocator *alloc, @@ -92,13 +160,6 @@ static struct aws_h2_connection *s_connection_new( return NULL; } - /* Init mutex first, because its error handling is different than every other init failure */ - if (aws_mutex_init(&connection->synced_data.lock)) { - CONNECTION_LOGF( - ERROR, connection, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error())); - goto error_mutex; - } - connection->base.vtable = &s_h2_connection_vtable; connection->base.alloc = alloc; connection->base.channel_handler.vtable = &s_h2_connection_vtable.channel_handler_vtable; @@ -107,12 +168,32 @@ static struct aws_h2_connection *s_connection_new( connection->base.http_version = AWS_HTTP_VERSION_2; connection->base.initial_window_size = initial_window_size; + aws_channel_task_init( + &connection->cross_thread_work_task, s_cross_thread_work_task, connection, "HTTP/2 cross-thread work"); + /* 1 refcount for user */ aws_atomic_init_int(&connection->base.refcount, 1); /* Init the next stream id (server must use odd ids, client even [RFC 7540 5.1.1])*/ connection->synced_data.next_stream_id = (server ? 2 : 1); + connection->synced_data.is_open = true; + aws_linked_list_init(&connection->synced_data.pending_stream_list); + + if (aws_mutex_init(&connection->synced_data.lock)) { + CONNECTION_LOGF( + ERROR, connection, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error())); + goto error; + } + + if (aws_hash_table_init( + &connection->thread_data.active_streams_map, alloc, 8, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) { + + CONNECTION_LOGF( + ERROR, connection, "Hashtable init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error())); + goto error; + } + /* Create a new decoder */ struct aws_h2_decoder_params params = { .alloc = alloc, @@ -135,12 +216,7 @@ static struct aws_h2_connection *s_connection_new( return connection; -error_mutex: - /* If mutex fails, don't invoke its clean_up() */ - aws_mem_release(alloc, connection); - return NULL; error: - /* Everything else has idempotent clean_up()/destroy() functions, so we can naively call our own destroy() */ s_handler_destroy(&connection->base.channel_handler); return NULL; } @@ -177,36 +253,166 @@ static void s_handler_destroy(struct aws_channel_handler *handler) { struct aws_h2_connection *connection = handler->impl; CONNECTION_LOG(TRACE, connection, "Destroying connection"); + /* No streams should be left in internal datastructures */ + AWS_ASSERT( + !aws_hash_table_is_valid(&connection->thread_data.active_streams_map) || + aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0); + + AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_stream_list)); + aws_h2_decoder_destroy(connection->thread_data.decoder); aws_h2_frame_encoder_clean_up(&connection->thread_data.encoder); + aws_hash_table_clean_up(&connection->thread_data.active_streams_map); aws_mutex_clean_up(&connection->synced_data.lock); aws_mem_release(connection->base.alloc, connection); } -uint32_t aws_h2_connection_get_next_stream_id(struct aws_h2_connection *connection) { +static void s_stream_complete(struct aws_h2_stream *stream, int error_code) { + /* Nice logging */ + if (error_code) { + AWS_H2_STREAM_LOGF( + ERROR, stream, "Stream completed with error %d (%s).", error_code, aws_error_name(error_code)); + } else if (stream->base.client_data) { + int status = stream->base.client_data->response_status; + AWS_H2_STREAM_LOGF( + DEBUG, stream, "Client stream complete, response status %d (%s)", status, aws_http_status_text(status)); + } else { + AWS_H2_STREAM_LOG(DEBUG, stream, "Server stream complete"); + } + + /* Invoke callback */ + if (stream->base.on_complete) { + stream->base.on_complete(&stream->base, error_code, stream->base.user_data); + } + + /* release connection's hold on stream */ + aws_http_stream_release(&stream->base); +} + +static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h2_stream *stream) { + /* #TODO: don't exceed peer's max-concurrent-streams setting */ + + if (aws_hash_table_put(&connection->thread_data.active_streams_map, (void *)(size_t)stream->id, stream, NULL)) { + goto error; + } + + /* #TODO: start encoding this stream's frames */ + + return; +error: + s_stream_complete(stream, aws_last_error()); +} + +/* Perform on-thread work that is triggered by calls to the connection/stream API */ +static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) { + (void)task; + if (status != AWS_TASK_STATUS_RUN_READY) { + return; + } - uint32_t next_id = 0; + struct aws_h2_connection *connection = arg; + + struct aws_linked_list pending_streams; + aws_linked_list_init(&pending_streams); { /* BEGIN CRITICAL SECTION */ - int err = aws_mutex_lock(&connection->synced_data.lock); - AWS_FATAL_ASSERT(err == AWS_OP_SUCCESS); + s_lock_synced_data(connection); + connection->synced_data.is_cross_thread_work_task_scheduled = false; + + aws_linked_list_swap_contents(&connection->synced_data.pending_stream_list, &pending_streams); + + s_unlock_synced_data(connection); + } /* END CRITICAL SECTION */ + + /* Process new pending_streams */ + while (!aws_linked_list_empty(&pending_streams)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams); + struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node); + s_activate_stream(connection, stream); + } + + /* #TODO: process stuff from other API calls (ex: window-updates) */ +} + +/* Set ID for new stream. Lock must be held while calling this. */ +static int s_acquire_next_stream_id(struct aws_h2_connection *connection, uint32_t *out_stream_id) { + if (AWS_UNLIKELY(connection->synced_data.next_stream_id > MAX_STREAM_ID)) { + CONNECTION_LOG(ERROR, connection, "Connection exhausted all possible stream IDs."); + return aws_raise_error(AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED); + } + + *out_stream_id = connection->synced_data.next_stream_id; + connection->synced_data.next_stream_id += 2; + return AWS_OP_SUCCESS; +} - next_id = connection->synced_data.next_stream_id; - connection->synced_data.next_stream_id += 2; +static struct aws_http_stream *s_connection_make_request( + struct aws_http_connection *client_connection, + const struct aws_http_make_request_options *options) { - /* If next fetch would overflow next_stream_id, set it to 0 */ - if (AWS_UNLIKELY(next_id > MAX_STREAM_ID)) { - CONNECTION_LOG(INFO, connection, "All available stream ids are gone, closing the connection"); + struct aws_h2_connection *connection = AWS_CONTAINER_OF(client_connection, struct aws_h2_connection, base); + + /* #TODO: http/2-ify the request (ex: add ":method" header). Should we mutate a copy or the original? */ + + struct aws_h2_stream *stream = aws_h2_stream_new_request(client_connection, options); + if (!stream) { + CONNECTION_LOGF( + ERROR, + connection, + "Failed to create stream, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + return NULL; + } + + int new_stream_error_code = AWS_ERROR_SUCCESS; + bool was_cross_thread_work_scheduled = false; + { /* BEGIN CRITICAL SECTION */ + s_lock_synced_data(connection); + + if (connection->synced_data.new_stream_error_code) { + new_stream_error_code = connection->synced_data.new_stream_error_code; + goto unlock; + } - next_id = 0; - aws_raise_error(AWS_ERROR_HTTP_PROTOCOL_ERROR); + if (s_acquire_next_stream_id(connection, &stream->id)) { + new_stream_error_code = aws_last_error(); + goto unlock; } - err = aws_mutex_unlock(&connection->synced_data.lock); - AWS_FATAL_ASSERT(err == AWS_OP_SUCCESS); + /* success */ + was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled; + connection->synced_data.is_cross_thread_work_task_scheduled = true; + + aws_linked_list_push_back(&connection->synced_data.pending_stream_list, &stream->node); + + unlock: + s_unlock_synced_data(connection); } /* END CRITICAL SECTION */ - return next_id; + if (new_stream_error_code) { + aws_raise_error(new_stream_error_code); + CONNECTION_LOGF( + ERROR, + connection, + "Cannot create request stream, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + goto error; + } + + if (!was_cross_thread_work_scheduled) { + CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task"); + aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task); + } + + AWS_H2_STREAM_LOG(DEBUG, stream, "Created HTTP/2 request stream"); /* #TODO: print method & path */ + return &stream->base; + +error: + /* Force destruction of the stream, avoiding ref counting */ + stream->base.vtable->destroy(&stream->base); + return NULL; } static int s_handler_process_read_message( @@ -217,6 +423,11 @@ static int s_handler_process_read_message( (void)handler; (void)slot; (void)message; + + /* HTTP/2 protocol uses WINDOW_UPDATE frames to coordinate data rates with peer, + * so we can just keep the aws_channel's read-window wide open */ + /* #TODO update read window by however much we just read */ + return aws_raise_error(AWS_ERROR_UNIMPLEMENTED); } @@ -258,13 +469,43 @@ static int s_handler_shutdown( error_code, aws_error_name(error_code)); + if (dir == AWS_CHANNEL_DIR_READ) { + /* This call ensures that no further streams will be created. */ + s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code); + + } else /* AWS_CHANNEL_DIR_WRITE */ { + s_stop(connection, false /*stop_reading*/, true /*stop_writing*/, false /*schedule_shutdown*/, error_code); + + /* Remove remaining streams from internal datastructures and mark them as complete. */ + + struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map); + while (!aws_hash_iter_done(&stream_iter)) { + struct aws_h2_stream *stream = stream_iter.element.value; + aws_hash_iter_delete(&stream_iter, true); + aws_hash_iter_next(&stream_iter); + + s_stream_complete(stream, AWS_ERROR_HTTP_CONNECTION_CLOSED); + } + + /* It's OK to access synced_data.pending_stream_list without holding the lock because + * no more streams can be added after s_stop() has been invoked. */ + while (!aws_linked_list_empty(&connection->synced_data.pending_stream_list)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_stream_list); + struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node); + s_stream_complete(stream, AWS_ERROR_HTTP_CONNECTION_CLOSED); + } + } + aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately); return AWS_OP_SUCCESS; } static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) { - struct aws_h2_connection *connection = handler->impl; - return connection->base.initial_window_size; + (void)handler; + + /* HTTP/2 protocol uses WINDOW_UPDATE frames to coordinate data rates with peer, + * so we can just keep the aws_channel's read-window wide open */ + return SIZE_MAX; } static size_t s_handler_message_overhead(struct aws_channel_handler *handler) { diff --git a/source/h2_stream.c b/source/h2_stream.c index 929ebd9b0..b8f393aaf 100644 --- a/source/h2_stream.c +++ b/source/h2_stream.c @@ -20,18 +20,6 @@ #include #include -#include - -#define STREAM_LOGF(level, stream, text, ...) \ - AWS_LOGF_##level( \ - AWS_LS_HTTP_STREAM, \ - "id=%" PRIu32 "(%p) state=%s: " text, \ - (stream)->id, \ - (void *)(stream), \ - aws_h2_stream_state_to_str((stream)->thread_data.state), \ - __VA_ARGS__) -#define STREAM_LOG(level, stream, text) STREAM_LOGF(level, stream, "%s", text) - static void s_stream_destroy(struct aws_http_stream *stream_base); struct aws_http_stream_vtable s_h2_stream_vtable = { @@ -41,7 +29,7 @@ struct aws_http_stream_vtable s_h2_stream_vtable = { /* Shortcut for logging invalid stream and raising an error */ static int s_h2_stream_raise_invalid_frame(struct aws_h2_stream *stream, enum aws_h2_frame_type type, int error_code) { - STREAM_LOGF( + AWS_H2_STREAM_LOGF( ERROR, stream, "Not allowed to receive frame of type %s when in %s state, raising %s", @@ -52,7 +40,7 @@ static int s_h2_stream_raise_invalid_frame(struct aws_h2_stream *stream, enum aw } static void s_h2_stream_set_state(struct aws_h2_stream *stream, enum aws_h2_stream_state new_state) { - STREAM_LOGF( + AWS_H2_STREAM_LOGF( DEBUG, stream, "Stream moving from state %s to %s", @@ -80,6 +68,7 @@ const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state) { return "CLOSED"; default: /* unreachable */ + AWS_ASSERT(0); return "*** UNKNOWN ***"; } } @@ -109,7 +98,7 @@ static int s_h2_stream_handle_headers(struct aws_h2_stream *stream, struct aws_h struct aws_h2_frame_headers frame; if (aws_h2_frame_headers_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode HEADERS frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode HEADERS frame"); return AWS_OP_ERR; } @@ -121,10 +110,11 @@ static int s_h2_stream_handle_headers(struct aws_h2_stream *stream, struct aws_h stream->base.user_data); if (frame.end_headers) { - STREAM_LOG(DEBUG, stream, "HEADERS frame is self-containing, calling header_block_done"); + AWS_H2_STREAM_LOG(DEBUG, stream, "HEADERS frame is self-containing, calling header_block_done"); stream->base.on_incoming_header_block_done(&stream->base, false, stream->base.user_data); } else { - STREAM_LOG(DEBUG, stream, "HEADERS frame does not have END_HEADERS set, expecting following CONTINUATION"); + AWS_H2_STREAM_LOG( + DEBUG, stream, "HEADERS frame does not have END_HEADERS set, expecting following CONTINUATION"); stream->thread_data.expects_continuation = true; } @@ -135,7 +125,7 @@ static int s_h2_stream_handle_priority(struct aws_h2_stream *stream, struct aws_ struct aws_h2_frame_priority frame; if (aws_h2_frame_priority_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode PRIORITY frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode PRIORITY frame"); return AWS_OP_ERR; } @@ -149,7 +139,7 @@ static int s_h2_stream_handle_rst_stream(struct aws_h2_stream *stream, struct aw struct aws_h2_frame_rst_stream frame; if (aws_h2_frame_rst_stream_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode RST_STREAM frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode RST_STREAM frame"); return AWS_OP_ERR; } @@ -165,12 +155,12 @@ static int s_h2_stream_handle_push_promise(struct aws_h2_stream *stream, struct struct aws_h2_frame_push_promise frame; if (aws_h2_frame_push_promise_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode PUSH_PROMISE frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode PUSH_PROMISE frame"); return AWS_OP_ERR; } if (!frame.end_headers) { - STREAM_LOG(TRACE, stream, "PUSH_PROMISE END_HEADERS not set, expecting CONTINUATION frame next"); + AWS_H2_STREAM_LOG(TRACE, stream, "PUSH_PROMISE END_HEADERS not set, expecting CONTINUATION frame next"); stream->thread_data.expects_continuation = true; } @@ -183,7 +173,7 @@ static int s_h2_stream_handle_window_update(struct aws_h2_stream *stream, struct struct aws_h2_frame_window_update frame; if (aws_h2_frame_window_update_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode WINDOW_UPDATE frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode WINDOW_UPDATE frame"); return AWS_OP_ERR; } @@ -196,7 +186,7 @@ static int s_h2_stream_handle_continuation(struct aws_h2_stream *stream, struct AWS_PRECONDITION(decoder->header.type == AWS_H2_FRAME_T_CONTINUATION); if (!stream->thread_data.expects_continuation) { - STREAM_LOG( + AWS_H2_STREAM_LOG( ERROR, stream, "Received CONTINUATION frame following a frame with END_HEADERS set, raising PROTOCOL_ERROR"); @@ -205,16 +195,16 @@ static int s_h2_stream_handle_continuation(struct aws_h2_stream *stream, struct struct aws_h2_frame_continuation frame; if (aws_h2_frame_continuation_decode(&frame, decoder)) { - STREAM_LOG(ERROR, stream, "Failed to decode CONTINUATION frame"); + AWS_H2_STREAM_LOG(ERROR, stream, "Failed to decode CONTINUATION frame"); return AWS_OP_ERR; } if (frame.end_headers) { - STREAM_LOG(TRACE, stream, "CONTINUATION frames complete, calling header_block_done"); + AWS_H2_STREAM_LOG(TRACE, stream, "CONTINUATION frames complete, calling header_block_done"); stream->thread_data.expects_continuation = false; stream->base.on_incoming_header_block_done(&stream->base, false, stream->base.user_data); } else { - STREAM_LOG(TRACE, stream, "CONTINUATION END_HEADERS not set, expecting CONTINUATION frame next"); + AWS_H2_STREAM_LOG(TRACE, stream, "CONTINUATION END_HEADERS not set, expecting CONTINUATION frame next"); stream->thread_data.expects_continuation = true; } @@ -372,14 +362,12 @@ static int (*s_state_handlers[])(struct aws_h2_stream *, struct aws_h2_frame_dec * Public API **********************************************************************************************************************/ -struct aws_h2_stream *aws_h2_stream_new( +struct aws_h2_stream *aws_h2_stream_new_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options) { AWS_PRECONDITION(client_connection); AWS_PRECONDITION(options); - struct aws_h2_connection *connection = AWS_CONTAINER_OF(client_connection, struct aws_h2_connection, base); - struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream)); if (!stream) { return NULL; @@ -395,14 +383,12 @@ struct aws_h2_stream *aws_h2_stream_new( stream->base.on_incoming_body = options->on_response_body; stream->base.on_complete = options->on_complete; - /* Stream refcount starts at 2. 1 for user and 1 for connection to release it's done with the stream */ + /* Stream refcount starts at 2. 1 for user and 1 for connection to release when it's done with the stream */ aws_atomic_init_int(&stream->base.refcount, 2); /* Init H2 specific stuff */ - *((uint32_t *)&stream->id) = aws_h2_connection_get_next_stream_id(connection); - s_h2_stream_set_state(stream, AWS_H2_STREAM_STATE_IDLE); - - STREAM_LOG(DEBUG, stream, "Created stream"); + stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE; + aws_linked_list_node_reset(&stream->node); return stream; } @@ -410,7 +396,7 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) { AWS_PRECONDITION(stream_base); struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base); - STREAM_LOG(DEBUG, stream, "Destroying stream"); + AWS_H2_STREAM_LOG(DEBUG, stream, "Destroying stream"); aws_mem_release(stream->base.alloc, stream); } @@ -419,7 +405,7 @@ int aws_h2_stream_handle_frame(struct aws_h2_stream *stream, struct aws_h2_frame AWS_PRECONDITION(stream); AWS_PRECONDITION(decoder); - STREAM_LOGF(DEBUG, stream, "Received frame of type %s", aws_h2_frame_type_to_str(decoder->header.type)); + AWS_H2_STREAM_LOGF(DEBUG, stream, "Received frame of type %s", aws_h2_frame_type_to_str(decoder->header.type)); return s_state_handlers[stream->thread_data.state](stream, decoder); } diff --git a/source/http.c b/source/http.c index b3d90614c..da06aca67 100644 --- a/source/http.c +++ b/source/http.c @@ -112,6 +112,9 @@ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_STREAM_CLOSED, "Received frame on a closed stream"), + AWS_DEFINE_ERROR_INFO_HTTP( + AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED, + "Connection exhausted all possible stream IDs. Establish a new connection for new streams."), AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_INVALID_FRAME_SIZE, "Received frame with an illegal frame size"), diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 56467523a..760db0728 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -198,6 +198,7 @@ add_test_case(h2_frame_window_update) add_test_case(h2_frame_continuation) add_test_case(h2_client_sanity_check) +add_test_case(h2_client_request_create) add_test_case(server_new_destroy) add_test_case(connection_setup_shutdown) diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 5fa6c660c..40939cb13 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -14,6 +14,8 @@ */ #include + +#include #include #define TEST_CASE(NAME) \ @@ -23,7 +25,6 @@ /* Singleton used by tests in this file */ struct tester { struct aws_allocator *alloc; - struct aws_logger logger; struct aws_http_connection *connection; struct testing_channel testing_channel; } s_tester; @@ -34,13 +35,6 @@ static int s_tester_init(struct aws_allocator *alloc, void *ctx) { s_tester.alloc = alloc; - struct aws_logger_standard_options logger_options = { - .level = AWS_LOG_LEVEL_TRACE, - .file = stderr, - }; - ASSERT_SUCCESS(aws_logger_init_standard(&s_tester.logger, alloc, &logger_options)); - aws_logger_set(&s_tester.logger); - ASSERT_SUCCESS(testing_channel_init(&s_tester.testing_channel, alloc)); s_tester.connection = aws_http_connection_new_http2_client(alloc, SIZE_MAX); @@ -63,7 +57,6 @@ static int s_tester_clean_up(void) { aws_http_connection_release(s_tester.connection); ASSERT_SUCCESS(testing_channel_clean_up(&s_tester.testing_channel)); aws_http_library_clean_up(); - aws_logger_clean_up(&s_tester.logger); return AWS_OP_SUCCESS; } @@ -72,3 +65,38 @@ TEST_CASE(h2_client_sanity_check) { ASSERT_SUCCESS(s_tester_init(allocator, ctx)); return s_tester_clean_up(); } + +/* Test that a stream can be created and destroyed. */ +TEST_CASE(h2_client_request_create) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* create request */ + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header headers[] = { + {aws_byte_cursor_from_c_str(":method"), aws_byte_cursor_from_c_str("GET")}, + {aws_byte_cursor_from_c_str(":scheme"), aws_byte_cursor_from_c_str("https")}, + {aws_byte_cursor_from_c_str(":path"), aws_byte_cursor_from_c_str("/")}, + }; + ASSERT_SUCCESS(aws_http_headers_add_array(aws_http_message_get_headers(request), headers, AWS_ARRAY_SIZE(headers))); + + struct aws_http_make_request_options options = { + .self_size = sizeof(options), + .request = request, + }; + + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &options); + ASSERT_NOT_NULL(stream); + + /* shutdown channel so request can be released */ + aws_channel_shutdown(s_tester.testing_channel.channel, AWS_ERROR_SUCCESS); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(testing_channel_is_shutdown_completed(&s_tester.testing_channel)); + + /* release request */ + aws_http_stream_release(stream); + aws_http_message_release(request); + + return s_tester_clean_up(); +}