diff --git a/include/aws/http/http.h b/include/aws/http/http.h index 4ae2d2276..0c3764bf0 100644 --- a/include/aws/http/http.h +++ b/include/aws/http/http.h @@ -51,6 +51,7 @@ enum aws_http_errors { AWS_ERROR_HTTP_CHANNEL_THROUGHPUT_FAILURE, AWS_ERROR_HTTP_PROTOCOL_ERROR, AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED, + AWS_ERROR_HTTP_GOAWAY_RECEIVED, AWS_ERROR_HTTP_RST_STREAM_RECEIVED, AWS_ERROR_HTTP_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_HTTP_PACKAGE_ID) diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h index cefef2e3f..796eedcf8 100644 --- a/include/aws/http/private/h2_connection.h +++ b/include/aws/http/private/h2_connection.h @@ -90,6 +90,11 @@ struct aws_h2_connection { * Reduce the space after receiving a flow-controlled frame. Increment after sending WINDOW_UPDATE for * connection */ size_t window_size_self; + + /* Highest self-initiated stream-id that peer might have processed. + * Defaults to max stream-id, may be lowered when GOAWAY frame received. */ + uint32_t goaway_received_last_stream_id; + } thread_data; /* Any thread may touch this data, but the lock must be held (unless it's an atomic) */ diff --git a/source/h2_connection.c b/source/h2_connection.c index f8d1ef3eb..632e8d4ee 100644 --- a/source/h2_connection.c +++ b/source/h2_connection.c @@ -55,6 +55,7 @@ static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) static size_t s_handler_message_overhead(struct aws_channel_handler *handler); static void s_handler_destroy(struct aws_channel_handler *handler); static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot); +static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code); static struct aws_http_stream *s_connection_make_request( struct aws_http_connection *client_connection, const struct aws_http_make_request_options *options); @@ -98,6 +99,11 @@ static struct aws_h2err s_decoder_on_settings( void *userdata); static struct aws_h2err s_decoder_on_settings_ack(void *userdata); static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata); +struct aws_h2err s_decoder_on_goaway_begin( + uint32_t last_stream, + uint32_t error_code, + uint32_t debug_data_length, + void *userdata); static struct aws_http_connection_vtable s_h2_connection_vtable = { .channel_handler_vtable = @@ -133,6 +139,7 @@ static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = { .on_settings = s_decoder_on_settings, .on_settings_ack = s_decoder_on_settings_ack, .on_window_update = s_decoder_on_window_update, + .on_goaway_begin = s_decoder_on_goaway_begin, }; static void s_lock_synced_data(struct aws_h2_connection *connection) { @@ -279,6 +286,8 @@ static struct aws_h2_connection *s_connection_new( connection->thread_data.window_size_peer = aws_h2_settings_initial[AWS_H2_SETTINGS_INITIAL_WINDOW_SIZE]; connection->thread_data.window_size_self = aws_h2_settings_initial[AWS_H2_SETTINGS_INITIAL_WINDOW_SIZE]; + connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX; + /* Create a new decoder */ struct aws_h2_decoder_params params = { .alloc = alloc, @@ -1230,6 +1239,53 @@ static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t return AWS_H2ERR_SUCCESS; } +struct aws_h2err s_decoder_on_goaway_begin( + uint32_t last_stream, + uint32_t error_code, + uint32_t debug_data_length, + void *userdata) { + (void)debug_data_length; + struct aws_h2_connection *connection = userdata; + + if (last_stream > connection->thread_data.goaway_received_last_stream_id) { + CONNECTION_LOGF( + ERROR, + connection, + "Received GOAWAY with invalid last-stream-id=%" PRIu32 ", must not exceed previous last-stream-id=%" PRIu32, + last_stream, + connection->thread_data.goaway_received_last_stream_id); + return aws_h2err_from_h2_code(AWS_H2_ERR_PROTOCOL_ERROR); + } + /* stop sending any new stream and making new request */ + aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED); + connection->thread_data.goaway_received_last_stream_id = last_stream; + CONNECTION_LOGF( + DEBUG, + connection, + "Received GOAWAY error-code=%s(0x%x) last-stream-id=%" PRIu32, + aws_h2_error_code_to_str(error_code), + error_code, + last_stream); + /* Complete activated streams whose id is higher than last_stream, since they will not process by peer. We should + * treat them as they had never been created at all. + * This would be more efficient if we could iterate streams in reverse-id order */ + struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map); + while (!aws_hash_iter_done(&stream_iter)) { + struct aws_h2_stream *stream = stream_iter.element.value; + aws_hash_iter_next(&stream_iter); + if (stream->base.id > last_stream) { + AWS_H2_STREAM_LOG( + DEBUG, + stream, + "stream ID is higher than GOAWAY last stream ID, please retry this stream on a new connection."); + s_stream_complete(connection, stream, AWS_ERROR_HTTP_GOAWAY_RECEIVED); + } + } + + /* #TODO inform our user about the error_code and debug data by fire some kind of API */ + return AWS_H2ERR_SUCCESS; +} + /* End decoder callbacks */ static int s_send_connection_preface_client_string(struct aws_h2_connection *connection) { @@ -1516,6 +1572,18 @@ int aws_h2_connection_send_rst_and_close_reserved_stream( static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h2_stream *stream) { AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); + int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code); + if (new_stream_error_code) { + aws_raise_error(new_stream_error_code); + AWS_H2_STREAM_LOGF( + ERROR, + stream, + "Failed activating stream, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + goto error; + } + uint32_t max_concurrent_streams = connection->thread_data.settings_peer[AWS_H2_SETTINGS_MAX_CONCURRENT_STREAMS]; if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) >= max_concurrent_streams) { AWS_H2_STREAM_LOG(ERROR, stream, "Failed activating stream, max concurrent streams are reached"); diff --git a/source/http.c b/source/http.c index 7227a243b..6cdaa3524 100644 --- a/source/http.c +++ b/source/http.c @@ -116,6 +116,9 @@ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED, "Connection exhausted all possible stream IDs. Establish a new connection for new streams."), + AWS_DEFINE_ERROR_INFO_HTTP( + AWS_ERROR_HTTP_GOAWAY_RECEIVED, + "Peer sent GOAWAY to initiate connection shutdown. Establish a new connection to retry the streams."), AWS_DEFINE_ERROR_INFO_HTTP( AWS_ERROR_HTTP_RST_STREAM_RECEIVED, "Peer sent RST_STREAM to terminate stream"), diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 025f27ce1..62b172ff0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -365,6 +365,8 @@ add_test_case(h2_client_stream_err_initial_window_size_cause_window_exceed_max) add_test_case(h2_client_stream_err_receive_rst_stream) add_test_case(h2_client_stream_receive_rst_stream_after_complete_response_ok) add_test_case(h2_client_push_promise_automatically_rejected) +add_test_case(h2_client_conn_receive_goaway) +add_test_case(h2_client_conn_err_invalid_last_stream_id_goaway) add_test_case(server_new_destroy) diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 220c9f87f..ee88c4553 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -1978,3 +1978,125 @@ TEST_CASE(h2_client_push_promise_automatically_rejected) { client_stream_tester_clean_up(&stream_tester); return s_tester_clean_up(); } + +/* Test client receives the GOAWAY frame, stop creating new stream and complete the streams whose id are higher than the + * last stream id included in GOAWAY frame */ +TEST_CASE(h2_client_conn_receive_goaway) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + + /* send multiple requests */ + enum { NUM_STREAMS = 3 }; + struct aws_http_message *requests[NUM_STREAMS]; + struct aws_http_header request_headers_src[NUM_STREAMS][3] = { + { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/a.txt"), + }, + { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/b.txt"), + }, + { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/c.txt"), + }, + }; + struct client_stream_tester stream_testers[NUM_STREAMS]; + for (size_t i = 0; i < NUM_STREAMS; ++i) { + requests[i] = aws_http_message_new_request(allocator); + aws_http_message_add_header_array(requests[i], request_headers_src[i], AWS_ARRAY_SIZE(request_headers_src[i])); + } + /* Send the first two requests */ + ASSERT_SUCCESS(s_stream_tester_init(&stream_testers[0], requests[0])); + ASSERT_SUCCESS(s_stream_tester_init(&stream_testers[1], requests[1])); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* fake peer send a GOAWAY frame indicating only the first request will be processed */ + uint32_t stream_id = aws_http_stream_get_id(stream_testers[0].stream); + struct aws_byte_cursor debug_info; + AWS_ZERO_STRUCT(debug_info); + struct aws_h2_frame *peer_frame = aws_h2_frame_new_goaway(allocator, stream_id, AWS_H2_ERR_NO_ERROR, debug_info); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* validate the connection is still open, and the second request finished with GOAWAY_RECEIVED */ + ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection)); + ASSERT_FALSE(stream_testers[0].complete); + ASSERT_TRUE(stream_testers[1].complete); + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_GOAWAY_RECEIVED, stream_testers[1].on_complete_error_code); + + /* validate the new requst will no be accepted */ + ASSERT_FAILS(s_stream_tester_init(&stream_testers[2], requests[2])); + + /* Try gracefully shutting down the connection */ + struct aws_http_header response_headers_src[] = {DEFINE_HEADER(":status", "200")}; + struct aws_http_headers *response_headers = aws_http_headers_new(allocator); + aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src)); + struct aws_h2_frame *response_frame = aws_h2_frame_new_headers( + allocator, aws_http_stream_get_id(stream_testers[0].stream), response_headers, true /* end_stream */, 0, NULL); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame)); + /* shutdown channel */ + aws_channel_shutdown(s_tester.testing_channel.channel, AWS_ERROR_SUCCESS); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(testing_channel_is_shutdown_completed(&s_tester.testing_channel)); + + /* validate the first request finishes successfully */ + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_TRUE(stream_testers[0].complete); + ASSERT_INT_EQUALS(200, stream_testers[0].response_status); + + ASSERT_FALSE(aws_http_connection_is_open(s_tester.connection)); + + /* clean up */ + aws_http_headers_release(response_headers); + for (size_t i = 0; i < NUM_STREAMS; ++i) { + client_stream_tester_clean_up(&stream_testers[i]); + aws_http_message_release(requests[i]); + } + return s_tester_clean_up(); +} + +/* Test client receives the GOAWAY frame, stop creating new stream and complete the streams whose id are higher than the + * last stream id included in GOAWAY frame */ +TEST_CASE(h2_client_conn_err_invalid_last_stream_id_goaway) { + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + + /* fake peer send multiple GOAWAY frames */ + struct aws_byte_cursor debug_info; + AWS_ZERO_STRUCT(debug_info); + /* First on with last_stream_id as AWS_H2_STREAM_ID_MAX */ + struct aws_h2_frame *peer_frame = + aws_h2_frame_new_goaway(allocator, AWS_H2_STREAM_ID_MAX, AWS_H2_ERR_NO_ERROR, debug_info); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + /* Second one with last_stream_id as 1 and some error */ + peer_frame = aws_h2_frame_new_goaway(allocator, 1, AWS_H2_ERR_FLOW_CONTROL_ERROR, debug_info); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + /* validate the connection is still open, everything is fine */ + ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection)); + + /* Another GOAWAY with higher last stream id will cause connection closed with an error */ + peer_frame = aws_h2_frame_new_goaway(allocator, 3, AWS_H2_ERR_FLOW_CONTROL_ERROR, debug_info); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + + ASSERT_FALSE(aws_http_connection_is_open(s_tester.connection)); + ASSERT_INT_EQUALS( + AWS_ERROR_HTTP_PROTOCOL_ERROR, testing_channel_get_shutdown_error_code(&s_tester.testing_channel)); + /* clean up */ + return s_tester_clean_up(); +}