Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/aws/http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum aws_http_errors {
AWS_ERROR_HTTP_CHANNEL_THROUGHPUT_FAILURE,
AWS_ERROR_HTTP_PROTOCOL_ERROR,
AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED,
AWS_ERROR_HTTP_GOAWAY_RECEIVED,
AWS_ERROR_HTTP_RST_STREAM_RECEIVED,

AWS_ERROR_HTTP_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_HTTP_PACKAGE_ID)
Expand Down
5 changes: 5 additions & 0 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ struct aws_h2_connection {
* Reduce the space after receiving a flow-controlled frame. Increment after sending WINDOW_UPDATE for
* connection */
size_t window_size_self;

/* Highest self-initiated stream-id that peer might have processed.
* Defaults to max stream-id, may be lowered when GOAWAY frame received. */
uint32_t goaway_received_last_stream_id;

} thread_data;

/* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
Expand Down
68 changes: 68 additions & 0 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static size_t s_handler_initial_window_size(struct aws_channel_handler *handler)
static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
static void s_handler_destroy(struct aws_channel_handler *handler);
static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot);
static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code);
static struct aws_http_stream *s_connection_make_request(
struct aws_http_connection *client_connection,
const struct aws_http_make_request_options *options);
Expand Down Expand Up @@ -98,6 +99,11 @@ static struct aws_h2err s_decoder_on_settings(
void *userdata);
static struct aws_h2err s_decoder_on_settings_ack(void *userdata);
static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata);
struct aws_h2err s_decoder_on_goaway_begin(
uint32_t last_stream,
uint32_t error_code,
uint32_t debug_data_length,
void *userdata);

static struct aws_http_connection_vtable s_h2_connection_vtable = {
.channel_handler_vtable =
Expand Down Expand Up @@ -133,6 +139,7 @@ static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = {
.on_settings = s_decoder_on_settings,
.on_settings_ack = s_decoder_on_settings_ack,
.on_window_update = s_decoder_on_window_update,
.on_goaway_begin = s_decoder_on_goaway_begin,
};

static void s_lock_synced_data(struct aws_h2_connection *connection) {
Expand Down Expand Up @@ -279,6 +286,8 @@ static struct aws_h2_connection *s_connection_new(
connection->thread_data.window_size_peer = aws_h2_settings_initial[AWS_H2_SETTINGS_INITIAL_WINDOW_SIZE];
connection->thread_data.window_size_self = aws_h2_settings_initial[AWS_H2_SETTINGS_INITIAL_WINDOW_SIZE];

connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;

/* Create a new decoder */
struct aws_h2_decoder_params params = {
.alloc = alloc,
Expand Down Expand Up @@ -1230,6 +1239,53 @@ static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t
return AWS_H2ERR_SUCCESS;
}

struct aws_h2err s_decoder_on_goaway_begin(
uint32_t last_stream,
uint32_t error_code,
uint32_t debug_data_length,
void *userdata) {
(void)debug_data_length;
struct aws_h2_connection *connection = userdata;

if (last_stream > connection->thread_data.goaway_received_last_stream_id) {
CONNECTION_LOGF(
ERROR,
connection,
"Received GOAWAY with invalid last-stream-id=%" PRIu32 ", must not exceed previous last-stream-id=%" PRIu32,
last_stream,
connection->thread_data.goaway_received_last_stream_id);
return aws_h2err_from_h2_code(AWS_H2_ERR_PROTOCOL_ERROR);
}
/* stop sending any new stream and making new request */
aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
connection->thread_data.goaway_received_last_stream_id = last_stream;
CONNECTION_LOGF(
DEBUG,
connection,
"Received GOAWAY error-code=%s(0x%x) last-stream-id=%" PRIu32,
aws_h2_error_code_to_str(error_code),
error_code,
last_stream);
/* Complete activated streams whose id is higher than last_stream, since they will not process by peer. We should
* treat them as they had never been created at all.
* This would be more efficient if we could iterate streams in reverse-id order */
struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
while (!aws_hash_iter_done(&stream_iter)) {
struct aws_h2_stream *stream = stream_iter.element.value;
aws_hash_iter_next(&stream_iter);
if (stream->base.id > last_stream) {
AWS_H2_STREAM_LOG(
DEBUG,
stream,
"stream ID is higher than GOAWAY last stream ID, please retry this stream on a new connection.");
s_stream_complete(connection, stream, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
}
}

/* #TODO inform our user about the error_code and debug data by fire some kind of API */
return AWS_H2ERR_SUCCESS;
}

/* End decoder callbacks */

static int s_send_connection_preface_client_string(struct aws_h2_connection *connection) {
Expand Down Expand Up @@ -1516,6 +1572,18 @@ int aws_h2_connection_send_rst_and_close_reserved_stream(
static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h2_stream *stream) {
AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));

int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code);
if (new_stream_error_code) {
aws_raise_error(new_stream_error_code);
AWS_H2_STREAM_LOGF(
ERROR,
stream,
"Failed activating stream, error %d (%s)",
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}

uint32_t max_concurrent_streams = connection->thread_data.settings_peer[AWS_H2_SETTINGS_MAX_CONCURRENT_STREAMS];
if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) >= max_concurrent_streams) {
AWS_H2_STREAM_LOG(ERROR, stream, "Failed activating stream, max concurrent streams are reached");
Expand Down
3 changes: 3 additions & 0 deletions source/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_HTTP(
AWS_ERROR_HTTP_STREAM_IDS_EXHAUSTED,
"Connection exhausted all possible stream IDs. Establish a new connection for new streams."),
AWS_DEFINE_ERROR_INFO_HTTP(
AWS_ERROR_HTTP_GOAWAY_RECEIVED,
"Peer sent GOAWAY to initiate connection shutdown. Establish a new connection to retry the streams."),
AWS_DEFINE_ERROR_INFO_HTTP(
AWS_ERROR_HTTP_RST_STREAM_RECEIVED,
"Peer sent RST_STREAM to terminate stream"),
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ add_test_case(h2_client_stream_err_initial_window_size_cause_window_exceed_max)
add_test_case(h2_client_stream_err_receive_rst_stream)
add_test_case(h2_client_stream_receive_rst_stream_after_complete_response_ok)
add_test_case(h2_client_push_promise_automatically_rejected)
add_test_case(h2_client_conn_receive_goaway)
add_test_case(h2_client_conn_err_invalid_last_stream_id_goaway)


add_test_case(server_new_destroy)
Expand Down
122 changes: 122 additions & 0 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1978,3 +1978,125 @@ TEST_CASE(h2_client_push_promise_automatically_rejected) {
client_stream_tester_clean_up(&stream_tester);
return s_tester_clean_up();
}

/* Test client receives the GOAWAY frame, stop creating new stream and complete the streams whose id are higher than the
* last stream id included in GOAWAY frame */
TEST_CASE(h2_client_conn_receive_goaway) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));

/* send multiple requests */
enum { NUM_STREAMS = 3 };
struct aws_http_message *requests[NUM_STREAMS];
struct aws_http_header request_headers_src[NUM_STREAMS][3] = {
{
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/a.txt"),
},
{
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/b.txt"),
},
{
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/c.txt"),
},
};
struct client_stream_tester stream_testers[NUM_STREAMS];
for (size_t i = 0; i < NUM_STREAMS; ++i) {
requests[i] = aws_http_message_new_request(allocator);
aws_http_message_add_header_array(requests[i], request_headers_src[i], AWS_ARRAY_SIZE(request_headers_src[i]));
}
/* Send the first two requests */
ASSERT_SUCCESS(s_stream_tester_init(&stream_testers[0], requests[0]));
ASSERT_SUCCESS(s_stream_tester_init(&stream_testers[1], requests[1]));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* fake peer send a GOAWAY frame indicating only the first request will be processed */
uint32_t stream_id = aws_http_stream_get_id(stream_testers[0].stream);
struct aws_byte_cursor debug_info;
AWS_ZERO_STRUCT(debug_info);
struct aws_h2_frame *peer_frame = aws_h2_frame_new_goaway(allocator, stream_id, AWS_H2_ERR_NO_ERROR, debug_info);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* validate the connection is still open, and the second request finished with GOAWAY_RECEIVED */
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));
ASSERT_FALSE(stream_testers[0].complete);
ASSERT_TRUE(stream_testers[1].complete);
ASSERT_INT_EQUALS(AWS_ERROR_HTTP_GOAWAY_RECEIVED, stream_testers[1].on_complete_error_code);

/* validate the new requst will no be accepted */
ASSERT_FAILS(s_stream_tester_init(&stream_testers[2], requests[2]));

/* Try gracefully shutting down the connection */
struct aws_http_header response_headers_src[] = {DEFINE_HEADER(":status", "200")};
struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
struct aws_h2_frame *response_frame = aws_h2_frame_new_headers(
allocator, aws_http_stream_get_id(stream_testers[0].stream), response_headers, true /* end_stream */, 0, NULL);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));
/* shutdown channel */
aws_channel_shutdown(s_tester.testing_channel.channel, AWS_ERROR_SUCCESS);
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_TRUE(testing_channel_is_shutdown_completed(&s_tester.testing_channel));

/* validate the first request finishes successfully */
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_TRUE(stream_testers[0].complete);
ASSERT_INT_EQUALS(200, stream_testers[0].response_status);

ASSERT_FALSE(aws_http_connection_is_open(s_tester.connection));

/* clean up */
aws_http_headers_release(response_headers);
for (size_t i = 0; i < NUM_STREAMS; ++i) {
client_stream_tester_clean_up(&stream_testers[i]);
aws_http_message_release(requests[i]);
}
return s_tester_clean_up();
}

/* Test client receives the GOAWAY frame, stop creating new stream and complete the streams whose id are higher than the
* last stream id included in GOAWAY frame */
TEST_CASE(h2_client_conn_err_invalid_last_stream_id_goaway) {
ASSERT_SUCCESS(s_tester_init(allocator, ctx));

/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));

/* fake peer send multiple GOAWAY frames */
struct aws_byte_cursor debug_info;
AWS_ZERO_STRUCT(debug_info);
/* First on with last_stream_id as AWS_H2_STREAM_ID_MAX */
struct aws_h2_frame *peer_frame =
aws_h2_frame_new_goaway(allocator, AWS_H2_STREAM_ID_MAX, AWS_H2_ERR_NO_ERROR, debug_info);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
/* Second one with last_stream_id as 1 and some error */
peer_frame = aws_h2_frame_new_goaway(allocator, 1, AWS_H2_ERR_FLOW_CONTROL_ERROR, debug_info);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* validate the connection is still open, everything is fine */
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));

/* Another GOAWAY with higher last stream id will cause connection closed with an error */
peer_frame = aws_h2_frame_new_goaway(allocator, 3, AWS_H2_ERR_FLOW_CONTROL_ERROR, debug_info);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame));
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

ASSERT_FALSE(aws_http_connection_is_open(s_tester.connection));
ASSERT_INT_EQUALS(
AWS_ERROR_HTTP_PROTOCOL_ERROR, testing_channel_get_shutdown_error_code(&s_tester.testing_channel));
/* clean up */
return s_tester_clean_up();
}