Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,9 @@ struct aws_h2err aws_h2_stream_on_decoder_headers_i(

if (stream->base.on_incoming_headers) {
if (stream->base.on_incoming_headers(&stream->base, block_type, header, 1, stream->base.user_data)) {
/* #TODO: callback errors should be Stream Errors, not Connection Errors */
AWS_H2_STREAM_LOGF(
ERROR, stream, "Incoming header callback raised error, %s", aws_error_name(aws_last_error()));
return aws_h2err_from_last_error();
return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
}
}

Expand Down Expand Up @@ -811,7 +810,7 @@ struct aws_h2err aws_h2_stream_on_decoder_headers_end(
stream,
"Incoming-header-block-done callback raised error, %s",
aws_error_name(aws_last_error()));
return aws_h2err_from_last_error();
return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
}
}

Expand Down Expand Up @@ -931,7 +930,7 @@ struct aws_h2err aws_h2_stream_on_decoder_data_i(struct aws_h2_stream *stream, s
if (stream->base.on_incoming_body(&stream->base, &data, stream->base.user_data)) {
AWS_H2_STREAM_LOGF(
ERROR, stream, "Incoming body callback raised error, %s", aws_error_name(aws_last_error()));
return aws_h2err_from_last_error();
return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
}
}

Expand Down
10 changes: 2 additions & 8 deletions source/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ static void s_sm_count_increase_synced(
for (size_t i = 0; i < num; i++) {
aws_ref_count_acquire(&stream_manager->internal_ref_count);
}
s_sm_log_stats_synced(stream_manager);
}

static void s_sm_count_decrease_synced(
Expand All @@ -109,15 +108,12 @@ static void s_sm_count_decrease_synced(
for (size_t i = 0; i < num; i++) {
aws_ref_count_release(&stream_manager->internal_ref_count);
}
s_sm_log_stats_synced(stream_manager);
}

static void s_aws_stream_management_transaction_init(
struct aws_http2_stream_management_transaction *work,
struct aws_http2_stream_manager *stream_manager) {
AWS_ZERO_STRUCT(*work);

STREAM_MANAGER_LOGF(TRACE, stream_manager, "work:%p inits", (void *)work);
aws_linked_list_init(&work->pending_make_requests);
work->stream_manager = stream_manager;
work->allocator = stream_manager->allocator;
Expand All @@ -126,7 +122,6 @@ static void s_aws_stream_management_transaction_init(

static void s_aws_stream_management_transaction_clean_up(struct aws_http2_stream_management_transaction *work) {
(void)work;
STREAM_MANAGER_LOGF(TRACE, work->stream_manager, "work:%p clean up", (void *)work);
AWS_ASSERT(aws_linked_list_empty(&work->pending_make_requests));
aws_ref_count_release(&work->stream_manager->internal_ref_count);
}
Expand Down Expand Up @@ -392,6 +387,7 @@ static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2
stream_manager->synced_data.finish_pending_stream_acquisitions_task_scheduled = true;
}
}
s_sm_log_stats_synced(stream_manager);
}

static struct aws_h2_sm_connection *s_sm_connection_new(
Expand Down Expand Up @@ -455,7 +451,7 @@ static void s_sm_on_connection_acquired(struct aws_http_connection *connection,
s_sm_count_decrease_synced(stream_manager, AWS_SMCT_CONNECTIONS_ACQUIRING, 1);
if (error_code || !connection) {
STREAM_MANAGER_LOGF(
WARN,
ERROR,
stream_manager,
"connection acquired from connection manager failed, with error: %d(%s)",
error_code,
Expand Down Expand Up @@ -626,7 +622,6 @@ static void s_sm_connection_on_scheduled_stream_finishes(
aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection);
work.sm_connection_to_release = sm_connection;
--stream_manager->synced_data.holding_connections_count;
s_sm_log_stats_synced(stream_manager);
}
s_unlock_synced_data(stream_manager);
} /* END CRITICAL SECTION */
Expand Down Expand Up @@ -750,7 +745,6 @@ static void s_make_request_task(struct aws_channel_task *task, void *arg, enum a
static void s_aws_http2_stream_manager_execute_transaction(struct aws_http2_stream_management_transaction *work) {

struct aws_http2_stream_manager *stream_manager = work->stream_manager;
STREAM_MANAGER_LOGF(TRACE, stream_manager, "work:%p executes", (void *)work);

/* Step1: Release connection */
if (work->sm_connection_to_release) {
Expand Down
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ add_test_case(h2_client_get_received_goaway)
add_test_case(h2_client_request_apis_failed_after_connection_begin_shutdown)
add_test_case(h2_client_get_local_settings)
add_test_case(h2_client_get_remote_settings)
add_test_case(h2_client_error_from_outgoing_body_callback_reset_stream)
add_test_case(h2_client_error_from_incoming_headers_callback_reset_stream)
add_test_case(h2_client_error_from_incoming_headers_done_callback_reset_stream)
add_test_case(h2_client_error_from_incoming_body_callback_reset_stream)

add_test_case(server_new_destroy)
add_test_case(connection_setup_shutdown)
Expand Down
235 changes: 235 additions & 0 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -4877,3 +4877,238 @@ TEST_CASE(h2_client_request_apis_failed_after_connection_begin_shutdown) {
aws_http_stream_release(stream);
return s_tester_clean_up();
}

enum request_callback {
REQUEST_CALLBACK_OUTGOING_BODY,
REQUEST_CALLBACK_INCOMING_HEADERS,
REQUEST_CALLBACK_INCOMING_HEADERS_DONE,
REQUEST_CALLBACK_INCOMING_BODY,
REQUEST_CALLBACK_COMPLETE,
REQUEST_CALLBACK_COUNT,
};

struct error_from_callback_tester {
struct aws_input_stream base;
enum request_callback error_at;
int callback_counts[REQUEST_CALLBACK_COUNT];
bool has_errored;
struct aws_stream_status status;
int on_complete_error_code;
};

static const int ERROR_FROM_CALLBACK_ERROR_CODE = (int)0xBEEFCAFE;

static int s_error_from_callback_common(
struct error_from_callback_tester *error_tester,
enum request_callback current_callback) {

error_tester->callback_counts[current_callback]++;

/* After error code returned, no more callbacks should fire (except for on_complete) */
AWS_FATAL_ASSERT(!error_tester->has_errored);
AWS_FATAL_ASSERT(current_callback <= error_tester->error_at);
if (current_callback == error_tester->error_at) {
error_tester->has_errored = true;
return aws_raise_error(ERROR_FROM_CALLBACK_ERROR_CODE);
}

return AWS_OP_SUCCESS;
}

static int s_error_from_outgoing_body_read(struct aws_input_stream *body, struct aws_byte_buf *dest) {

(void)dest;

struct error_from_callback_tester *error_tester = AWS_CONTAINER_OF(body, struct error_from_callback_tester, base);
if (s_error_from_callback_common(error_tester, REQUEST_CALLBACK_OUTGOING_BODY)) {
return AWS_OP_ERR;
}

/* If the common fn was successful, write out some data and end the stream */
ASSERT_TRUE(aws_byte_buf_write(dest, (const uint8_t *)"abcd", 4));
error_tester->status.is_end_of_stream = true;
return AWS_OP_SUCCESS;
}

static int s_error_from_outgoing_body_get_status(struct aws_input_stream *body, struct aws_stream_status *status) {
struct error_from_callback_tester *error_tester = AWS_CONTAINER_OF(body, struct error_from_callback_tester, base);
*status = error_tester->status;
return AWS_OP_SUCCESS;
}

static void s_error_from_outgoing_body_destroy(void *stream) {
/* allocated from stack, nothing to do */
(void)stream;
}
static struct aws_input_stream_vtable s_error_from_outgoing_body_vtable = {
.seek = NULL,
.read = s_error_from_outgoing_body_read,
.get_status = s_error_from_outgoing_body_get_status,
.get_length = NULL,
};

static int s_error_from_incoming_headers(
struct aws_http_stream *stream,
enum aws_http_header_block header_block,
const struct aws_http_header *header_array,
size_t num_headers,
void *user_data) {

(void)stream;
(void)header_block;
(void)header_array;
(void)num_headers;
return s_error_from_callback_common(user_data, REQUEST_CALLBACK_INCOMING_HEADERS);
}

static int s_error_from_incoming_headers_done(
struct aws_http_stream *stream,
enum aws_http_header_block header_block,
void *user_data) {
(void)stream;
(void)header_block;
return s_error_from_callback_common(user_data, REQUEST_CALLBACK_INCOMING_HEADERS_DONE);
}

static int s_error_from_incoming_body(
struct aws_http_stream *stream,
const struct aws_byte_cursor *data,
void *user_data) {

(void)stream;
(void)data;
return s_error_from_callback_common(user_data, REQUEST_CALLBACK_INCOMING_BODY);
}

static void s_error_tester_on_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
(void)stream;
struct error_from_callback_tester *error_tester = user_data;
error_tester->callback_counts[REQUEST_CALLBACK_COMPLETE]++;
error_tester->on_complete_error_code = error_code;
}

static int s_test_error_from_callback(struct aws_allocator *allocator, void *ctx, enum request_callback error_at) {

ASSERT_SUCCESS(s_tester_init(allocator, ctx));
/* get connection preface and acks out of the way */
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
/* send request */
struct aws_http_message *request = aws_http2_message_new_request(allocator);
ASSERT_NOT_NULL(request);

struct aws_http_header request_headers_src[] = {
DEFINE_HEADER(":method", "GET"),
DEFINE_HEADER(":scheme", "https"),
DEFINE_HEADER(":path", "/"),
};
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));

struct error_from_callback_tester error_tester = {
.error_at = error_at,
.status =
{
.is_valid = true,
.is_end_of_stream = false,
},
};
error_tester.base.vtable = &s_error_from_outgoing_body_vtable;
aws_ref_count_init(&error_tester.base.ref_count, &error_tester, s_error_from_outgoing_body_destroy);

aws_http_message_set_body_stream(request, &error_tester.base);

struct aws_http_make_request_options opt = {
.self_size = sizeof(opt),
.request = request,
.on_response_headers = s_error_from_incoming_headers,
.on_response_header_block_done = s_error_from_incoming_headers_done,
.on_response_body = s_error_from_incoming_body,
.on_complete = s_error_tester_on_stream_complete,
.user_data = &error_tester,
};
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &opt);
ASSERT_NOT_NULL(stream);
ASSERT_SUCCESS(aws_http_stream_activate(stream));

testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* Ensure the request can be destroyed after request is sent */
aws_http_message_release(opt.request);

/* fake peer sends response headers */
struct aws_http_header response_headers_src[] = {
DEFINE_HEADER(":status", "200"),
DEFINE_HEADER("date", "Fri, 01 Mar 2019 17:18:55 GMT"),
};

struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
uint32_t stream_id = aws_http_stream_get_id(stream);
struct aws_h2_frame *response_frame =
aws_h2_frame_new_headers(allocator, stream_id, response_headers, false /*end_stream*/, 0, NULL);
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));

struct aws_byte_buf response_body_bufs;
size_t body_length = 5;

/* fake peer sends a DATA frame larger than the window size we have */
ASSERT_SUCCESS(aws_byte_buf_init(&response_body_bufs, allocator, body_length));
ASSERT_TRUE(aws_byte_buf_write_u8_n(&response_body_bufs, (uint8_t)'a', body_length));
struct aws_byte_cursor body_cursor = aws_byte_cursor_from_buf(&response_body_bufs);
ASSERT_SUCCESS(h2_fake_peer_send_data_frame(&s_tester.peer, stream_id, body_cursor, true /*end_stream*/));

/* validate that stream completed with error */
testing_channel_drain_queued_tasks(&s_tester.testing_channel);

/* check that callbacks were invoked before error_at, but not after */
for (int i = 0; i < REQUEST_CALLBACK_COMPLETE; ++i) {
if (i <= error_at) {
ASSERT_TRUE(error_tester.callback_counts[i] > 0);
} else {
ASSERT_INT_EQUALS(0, error_tester.callback_counts[i]);
}
}

/* validate the RST_STREAM sent and connection is still open */
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
struct h2_decoded_frame *rst_stream_frame =
h2_decode_tester_find_frame(&s_tester.peer.decode, AWS_H2_FRAME_T_RST_STREAM, 0, NULL);
ASSERT_NOT_NULL(rst_stream_frame);
ASSERT_UINT_EQUALS(AWS_HTTP2_ERR_INTERNAL_ERROR, rst_stream_frame->error_code);
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));

/* the on_complete callback should always fire though, and should receive the proper error_code */
ASSERT_INT_EQUALS(1, error_tester.callback_counts[REQUEST_CALLBACK_COMPLETE]);
ASSERT_INT_EQUALS(ERROR_FROM_CALLBACK_ERROR_CODE, error_tester.on_complete_error_code);

aws_http_headers_release(response_headers);
aws_byte_buf_clean_up(&response_body_bufs);
aws_http_stream_release(stream);
return s_tester_clean_up();
}

TEST_CASE(h2_client_error_from_outgoing_body_callback_reset_stream) {
(void)ctx;
ASSERT_SUCCESS(s_test_error_from_callback(allocator, ctx, REQUEST_CALLBACK_OUTGOING_BODY));
return AWS_OP_SUCCESS;
}

TEST_CASE(h2_client_error_from_incoming_headers_callback_reset_stream) {
(void)ctx;
ASSERT_SUCCESS(s_test_error_from_callback(allocator, ctx, REQUEST_CALLBACK_INCOMING_HEADERS));
return AWS_OP_SUCCESS;
}

TEST_CASE(h2_client_error_from_incoming_headers_done_callback_reset_stream) {
(void)ctx;
ASSERT_SUCCESS(s_test_error_from_callback(allocator, ctx, REQUEST_CALLBACK_INCOMING_HEADERS_DONE));
return AWS_OP_SUCCESS;
}

TEST_CASE(h2_client_error_from_incoming_body_callback_reset_stream) {
(void)ctx;
ASSERT_SUCCESS(s_test_error_from_callback(allocator, ctx, REQUEST_CALLBACK_INCOMING_BODY));
return AWS_OP_SUCCESS;
}