diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h index 796eedcf8..a48e8844b 100644 --- a/include/aws/http/private/h2_connection.h +++ b/include/aws/http/private/h2_connection.h @@ -99,12 +99,6 @@ struct aws_h2_connection { /* Any thread may touch this data, but the lock must be held (unless it's an atomic) */ struct { - /* For checking status from outside the event-loop thread. */ - struct aws_atomic_var is_open; - - /* If non-zero, reason to immediately reject new streams. (ex: closing) */ - struct aws_atomic_var new_stream_error_code; - struct aws_mutex lock; /* New `aws_h2_stream *` that haven't moved to `thread_data` yet */ @@ -113,6 +107,14 @@ struct aws_h2_connection { bool is_cross_thread_work_task_scheduled; } synced_data; + + struct { + /* For checking status from outside the event-loop thread. */ + struct aws_atomic_var is_open; + + /* If non-zero, reason to immediately reject new streams. (ex: closing) */ + struct aws_atomic_var new_stream_error_code; + } atomic; }; /** diff --git a/source/h1_connection.c b/source/h1_connection.c index 8a6f93b2c..75a7f4de3 100644 --- a/source/h1_connection.c +++ b/source/h1_connection.c @@ -455,7 +455,7 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) { return AWS_OP_ERR; } - /* activate one more time now that the connection can actually run the stream. */ + /* connection keeps activated stream alive until stream completes */ aws_atomic_fetch_add(&stream->refcount, 1); if (should_schedule_task) { diff --git a/source/h2_connection.c b/source/h2_connection.c index 632e8d4ee..f2c68e4d5 100644 --- a/source/h2_connection.c +++ b/source/h2_connection.c @@ -183,8 +183,8 @@ static void s_stop( /* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response) * we don't consider the connection "open" anymore so user can't create more streams */ - aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED); - aws_atomic_store_int(&connection->synced_data.is_open, 0); + aws_atomic_store_int(&connection->atomic.new_stream_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED); + aws_atomic_store_int(&connection->atomic.is_open, 0); if (schedule_shutdown) { AWS_LOGF_INFO( @@ -242,8 +242,8 @@ static struct aws_h2_connection *s_connection_new( /* 1 refcount for user */ aws_atomic_init_int(&connection->base.refcount, 1); - aws_atomic_init_int(&connection->synced_data.is_open, 1); - aws_atomic_init_int(&connection->synced_data.new_stream_error_code, 0); + aws_atomic_init_int(&connection->atomic.is_open, 1); + aws_atomic_init_int(&connection->atomic.new_stream_error_code, 0); aws_linked_list_init(&connection->synced_data.pending_stream_list); aws_linked_list_init(&connection->thread_data.outgoing_streams_list); @@ -1257,7 +1257,7 @@ struct aws_h2err s_decoder_on_goaway_begin( return aws_h2err_from_h2_code(AWS_H2_ERR_PROTOCOL_ERROR); } /* stop sending any new stream and making new request */ - aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED); + aws_atomic_store_int(&connection->atomic.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED); connection->thread_data.goaway_received_last_stream_id = last_stream; CONNECTION_LOGF( DEBUG, @@ -1569,10 +1569,12 @@ int aws_h2_connection_send_rst_and_close_reserved_stream( } /* Move stream into "active" datastructures and notify stream that it can send frames now */ -static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h2_stream *stream) { +static void s_move_stream_to_thread( + struct aws_h2_connection *connection, + struct aws_h2_stream *stream, + int new_stream_error_code) { AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); - int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code); if (new_stream_error_code) { aws_raise_error(new_stream_error_code); AWS_H2_STREAM_LOGF( @@ -1601,8 +1603,6 @@ static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h goto error; } - aws_atomic_fetch_add(&stream->base.refcount, 1); - if (has_outgoing_data) { aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node); } @@ -1635,10 +1635,13 @@ static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, e } /* END CRITICAL SECTION */ /* Process new pending_streams */ - while (!aws_linked_list_empty(&pending_streams)) { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams); - struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node); - s_activate_stream(connection, stream); + if (!aws_linked_list_empty(&pending_streams)) { + int new_stream_error_code = (int)aws_atomic_load_int(&connection->atomic.new_stream_error_code); + do { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams); + struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node); + s_move_stream_to_thread(connection, stream, new_stream_error_code); + } while (!aws_linked_list_empty(&pending_streams)); } /* #TODO: process stuff from other API calls (ex: window-updates) */ @@ -1680,6 +1683,9 @@ int aws_h2_stream_activate(struct aws_http_stream *stream) { return AWS_OP_ERR; } + /* connection keeps activated stream alive until stream completes */ + aws_atomic_fetch_add(&stream->refcount, 1); + if (!was_cross_thread_work_scheduled) { CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task"); aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task); @@ -1708,7 +1714,7 @@ static struct aws_http_stream *s_connection_make_request( return NULL; } - int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code); + int new_stream_error_code = (int)aws_atomic_load_int(&connection->atomic.new_stream_error_code); if (new_stream_error_code) { aws_raise_error(new_stream_error_code); CONNECTION_LOGF( @@ -1731,7 +1737,7 @@ static struct aws_http_stream *s_connection_make_request( static bool s_connection_is_open(const struct aws_http_connection *connection_base) { struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base); - bool is_open = aws_atomic_load_int(&connection->synced_data.is_open); + bool is_open = aws_atomic_load_int(&connection->atomic.is_open); return is_open; }