Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ struct aws_h2_connection {

/* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
struct {
/* For checking status from outside the event-loop thread. */
struct aws_atomic_var is_open;

/* If non-zero, reason to immediately reject new streams. (ex: closing) */
struct aws_atomic_var new_stream_error_code;

struct aws_mutex lock;

/* New `aws_h2_stream *` that haven't moved to `thread_data` yet */
Expand All @@ -113,6 +107,14 @@ struct aws_h2_connection {
bool is_cross_thread_work_task_scheduled;

} synced_data;

struct {
/* For checking status from outside the event-loop thread. */
struct aws_atomic_var is_open;

/* If non-zero, reason to immediately reject new streams. (ex: closing) */
struct aws_atomic_var new_stream_error_code;
} atomic;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ int aws_h1_stream_activate(struct aws_http_stream *stream) {
return AWS_OP_ERR;
}

/* activate one more time now that the connection can actually run the stream. */
/* connection keeps activated stream alive until stream completes */
aws_atomic_fetch_add(&stream->refcount, 1);

if (should_schedule_task) {
Expand Down
36 changes: 21 additions & 15 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ static void s_stop(

/* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response)
* we don't consider the connection "open" anymore so user can't create more streams */
aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED);
aws_atomic_store_int(&connection->synced_data.is_open, 0);
aws_atomic_store_int(&connection->atomic.new_stream_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED);
aws_atomic_store_int(&connection->atomic.is_open, 0);

if (schedule_shutdown) {
AWS_LOGF_INFO(
Expand Down Expand Up @@ -242,8 +242,8 @@ static struct aws_h2_connection *s_connection_new(
/* 1 refcount for user */
aws_atomic_init_int(&connection->base.refcount, 1);

aws_atomic_init_int(&connection->synced_data.is_open, 1);
aws_atomic_init_int(&connection->synced_data.new_stream_error_code, 0);
aws_atomic_init_int(&connection->atomic.is_open, 1);
aws_atomic_init_int(&connection->atomic.new_stream_error_code, 0);
aws_linked_list_init(&connection->synced_data.pending_stream_list);

aws_linked_list_init(&connection->thread_data.outgoing_streams_list);
Expand Down Expand Up @@ -1257,7 +1257,7 @@ struct aws_h2err s_decoder_on_goaway_begin(
return aws_h2err_from_h2_code(AWS_H2_ERR_PROTOCOL_ERROR);
}
/* stop sending any new stream and making new request */
aws_atomic_store_int(&connection->synced_data.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
aws_atomic_store_int(&connection->atomic.new_stream_error_code, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
connection->thread_data.goaway_received_last_stream_id = last_stream;
CONNECTION_LOGF(
DEBUG,
Expand Down Expand Up @@ -1569,10 +1569,12 @@ int aws_h2_connection_send_rst_and_close_reserved_stream(
}

/* Move stream into "active" datastructures and notify stream that it can send frames now */
static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h2_stream *stream) {
static void s_move_stream_to_thread(
struct aws_h2_connection *connection,
struct aws_h2_stream *stream,
int new_stream_error_code) {
AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));

int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code);
if (new_stream_error_code) {
aws_raise_error(new_stream_error_code);
AWS_H2_STREAM_LOGF(
Expand Down Expand Up @@ -1601,8 +1603,6 @@ static void s_activate_stream(struct aws_h2_connection *connection, struct aws_h
goto error;
}

aws_atomic_fetch_add(&stream->base.refcount, 1);

if (has_outgoing_data) {
aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
}
Expand Down Expand Up @@ -1635,10 +1635,13 @@ static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, e
} /* END CRITICAL SECTION */

/* Process new pending_streams */
while (!aws_linked_list_empty(&pending_streams)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams);
struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
s_activate_stream(connection, stream);
if (!aws_linked_list_empty(&pending_streams)) {
int new_stream_error_code = (int)aws_atomic_load_int(&connection->atomic.new_stream_error_code);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Why we fetch the error code here? So when the connection closes while we are iterating the pending_streams, it will not be updated, right? To improve performance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to improve performance. It's 1 atomic_load for the N streams being activated, vs N atomic_loads.

It's ok if we're not perfectly in sync here. Eventually, the connection shuts down and any streams remaining in synced_data.pending_streams or thread_data.active_streams_map are completed with AWS_ERROR_HTTP_CONNECTION_CLOSED. This function is running on the event-loop thread, and so are (almost) all the other things that might set the atomic.new_stream_error_code.

The only other thing that can set it is a user calling aws_http_connection_close(). So, if a user calls close() in the middle of this loop running, then the streams will still activate successfully, but most likely they'll fail when the connection shuts down over the next few ticks of the event-loop.

do {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams);
struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
s_move_stream_to_thread(connection, stream, new_stream_error_code);
} while (!aws_linked_list_empty(&pending_streams));
}

/* #TODO: process stuff from other API calls (ex: window-updates) */
Expand Down Expand Up @@ -1680,6 +1683,9 @@ int aws_h2_stream_activate(struct aws_http_stream *stream) {
return AWS_OP_ERR;
}

/* connection keeps activated stream alive until stream completes */
aws_atomic_fetch_add(&stream->refcount, 1);

if (!was_cross_thread_work_scheduled) {
CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
Expand Down Expand Up @@ -1708,7 +1714,7 @@ static struct aws_http_stream *s_connection_make_request(
return NULL;
}

int new_stream_error_code = (int)aws_atomic_load_int(&connection->synced_data.new_stream_error_code);
int new_stream_error_code = (int)aws_atomic_load_int(&connection->atomic.new_stream_error_code);
if (new_stream_error_code) {
aws_raise_error(new_stream_error_code);
CONNECTION_LOGF(
Expand All @@ -1731,7 +1737,7 @@ static struct aws_http_stream *s_connection_make_request(

static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
bool is_open = aws_atomic_load_int(&connection->synced_data.is_open);
bool is_open = aws_atomic_load_int(&connection->atomic.is_open);
return is_open;
}

Expand Down