Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cc67547
remove streams closed long ago from closed-stream structure
TingDaoK May 4, 2020
8e7bb42
test for receive frames after rst stream received
TingDaoK May 5, 2020
d1c8f7f
size_t->uint64_t
TingDaoK May 5, 2020
c2262ca
Merge branch 'master' into remove-stream-closed-long-ago-from-closed-…
TingDaoK May 5, 2020
00d9fbe
trivial comments update
TingDaoK May 5, 2020
06b7ec2
Merge branch 'remove-stream-closed-long-ago-from-closed-stream-struct…
TingDaoK May 5, 2020
5f21c49
Protocol error for closed long ago stream
TingDaoK May 5, 2020
8d9ca2d
clang format
TingDaoK May 5, 2020
958c117
address comments
TingDaoK May 6, 2020
85b8814
clang format
TingDaoK May 6, 2020
65ac13a
type cast to double
TingDaoK May 6, 2020
f484295
LRU cache for closed stream
TingDaoK May 6, 2020
9e89e8f
clean up the detail if failed to put it into cache
TingDaoK May 6, 2020
3bd74af
size_t->enum for the # of streans
TingDaoK May 6, 2020
a027e57
clang format
TingDaoK May 6, 2020
70b5f13
remove the max_closed_stream_cache_size from connection
TingDaoK May 6, 2020
8cc5ebb
Merge branch 'remove-stream-closed-long-ago-from-closed-stream-struct…
TingDaoK May 6, 2020
b90fae1
udpated the comments
TingDaoK May 6, 2020
a3853b4
Merge with master
TingDaoK May 6, 2020
d80d391
we don't need timestamp anymore....
TingDaoK May 7, 2020
3374d60
swtich/case
TingDaoK May 7, 2020
6248691
fifo-cache
TingDaoK May 11, 2020
2954aeb
clang format
TingDaoK May 11, 2020
562fa73
Merge branch 'master' into remove-stream-closed-long-ago-from-closed-…
TingDaoK May 11, 2020
861fd9e
one line
TingDaoK May 11, 2020
3d1989f
Merge branch 'remove-stream-closed-long-ago-from-closed-stream-struct…
TingDaoK May 11, 2020
093f2a2
updated the comments
TingDaoK May 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

#include <aws/common/atomics.h>
#include <aws/common/fifo_cache.h>
#include <aws/common/hash_table.h>
#include <aws/common/mutex.h>

Expand Down Expand Up @@ -75,11 +76,10 @@ struct aws_h2_connection {
* When queue is empty, then we send DATA frames from the outgoing_streams_list */
struct aws_linked_list outgoing_frames_queue;

/* Maps stream-id to aws_h2_stream_closed_when.
* Contains data about streams that were recently closed by this end (sent RST_STREAM frame or END_STREAM flag),
* but might still receive frames that remote peer sent before learning that the stream was closed.
* Entries are removed after a period of time. */
struct aws_hash_table closed_streams_where_frames_might_trickle_in;
/* FIFO cache for closed stream, key: stream-id, value: aws_h2_stream_closed_when.
* Contains data about streams that were recently closed.
* The oldest entry will be removed if the cache is full */
struct aws_cache *closed_streams;

/* Flow-control of connection from peer. Indicating the buffer capacity of our peer.
* Reduce the space after sending a flow-controlled frame. Increment after receiving WINDOW_UPDATE for
Expand Down Expand Up @@ -132,6 +132,7 @@ struct aws_h2_connection {
* The action which caused the stream to close.
*/
enum aws_h2_stream_closed_when {
AWS_H2_STREAM_CLOSED_UNKNOWN,
AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM,
AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED,
AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT,
Expand All @@ -146,6 +147,8 @@ enum aws_h2_data_encode_status {

/* When window size is too small to fit the possible padding into it, we stop sending data and wait for WINDOW_UPDATE */
#define AWS_H2_MIN_WINDOW_SIZE (256)
/* Default value for max closed streams we will keep in memory. */
#define AWS_H2_DEFAULT_MAX_CLOSED_STREAMS (32)

/* Private functions called from tests... */

Expand Down
133 changes: 79 additions & 54 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,12 @@ static struct aws_h2_connection *s_connection_new(
goto error;
}

if (aws_hash_table_init(
&connection->thread_data.closed_streams_where_frames_might_trickle_in,
alloc,
8,
aws_hash_ptr,
aws_ptr_eq,
NULL,
NULL)) {

/* TODO: make the max_items configurable */
connection->thread_data.closed_streams =
aws_cache_new_fifo(alloc, aws_hash_ptr, aws_ptr_eq, NULL, NULL, AWS_H2_DEFAULT_MAX_CLOSED_STREAMS);
if (!connection->thread_data.closed_streams) {
CONNECTION_LOGF(
ERROR, connection, "Hashtable init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
ERROR, connection, "FIFO cache init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
goto error;
}

Expand Down Expand Up @@ -392,7 +387,7 @@ static void s_handler_destroy(struct aws_channel_handler *handler) {
aws_h2_decoder_destroy(connection->thread_data.decoder);
aws_h2_frame_encoder_clean_up(&connection->thread_data.encoder);
aws_hash_table_clean_up(&connection->thread_data.active_streams_map);
aws_hash_table_clean_up(&connection->thread_data.closed_streams_where_frames_might_trickle_in);
aws_cache_destroy(connection->thread_data.closed_streams);
aws_mutex_clean_up(&connection->synced_data.lock);
aws_mem_release(connection->base.alloc, connection);
}
Expand Down Expand Up @@ -840,52 +835,94 @@ struct aws_h2err s_get_active_stream_for_incoming_frame(
return AWS_H2ERR_SUCCESS;
}

void *cached_value = NULL;
/* Stream is closed, check whether it's legal for a few more frames to trickle in */
aws_hash_table_find(&connection->thread_data.closed_streams_where_frames_might_trickle_in, stream_id_key, &found);
if (found) {
enum aws_h2_stream_closed_when closed_when = (enum aws_h2_stream_closed_when)(size_t)found->value;
if (closed_when == AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT) {
/* An endpoint MUST ignore frames that it receives on closed streams after it has sent a RST_STREAM frame */
CONNECTION_LOGF(
TRACE,
connection,
"Ignoring %s frame on stream id=%" PRIu32 " because RST_STREAM was recently sent.",
aws_h2_frame_type_to_str(frame_type),
stream_id);

if (aws_cache_find(connection->thread_data.closed_streams, stream_id_key, &cached_value)) {
return aws_h2err_from_last_error();
}
if (cached_value) {
if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
/* If we support PRIORITY, do something here. Right now just ignore it */
return AWS_H2ERR_SUCCESS;

} else {
AWS_ASSERT(closed_when == AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM);

/* WINDOW_UPDATE or RST_STREAM frames can be received ... for a short period after
* a DATA or HEADERS frame containing an END_STREAM flag is sent.
* Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received in this state */
if (frame_type == AWS_H2_FRAME_T_WINDOW_UPDATE || frame_type == AWS_H2_FRAME_T_RST_STREAM) {
}
enum aws_h2_stream_closed_when closed_when = (enum aws_h2_stream_closed_when)(size_t)cached_value;
switch (closed_when) {
case AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM:
/* WINDOW_UPDATE or RST_STREAM frames can be received ... for a short period after
* a DATA or HEADERS frame containing an END_STREAM flag is sent.
* Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received in this state */
if (frame_type == AWS_H2_FRAME_T_WINDOW_UPDATE || frame_type == AWS_H2_FRAME_T_RST_STREAM) {
CONNECTION_LOGF(
TRACE,
connection,
"Ignoring %s frame on stream id=%" PRIu32 " because END_STREAM flag was recently sent.",
aws_h2_frame_type_to_str(frame_type),
stream_id);

return AWS_H2ERR_SUCCESS;
} else {
CONNECTION_LOGF(
ERROR,
connection,
"Illegal to receive %s frame on stream id=%" PRIu32 " after END_STREAM has been received.",
aws_h2_frame_type_to_str(frame_type),
stream_id);

return aws_h2err_from_h2_code(AWS_H2_ERR_STREAM_CLOSED);
}
break;
case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED:
/* An endpoint that receives any frame other than PRIORITY after receiving a RST_STREAM
* MUST treat that as a stream error (Section 5.4.2) of type STREAM_CLOSED */
CONNECTION_LOGF(
ERROR,
connection,
"Illegal to receive %s frame on stream id=%" PRIu32 " after RST_STREAM has been received",
aws_h2_frame_type_to_str(frame_type),
stream_id);
struct aws_h2_frame *rst_stream =
aws_h2_frame_new_rst_stream(connection->base.alloc, stream_id, AWS_H2_ERR_STREAM_CLOSED);
if (!rst_stream) {
CONNECTION_LOGF(
ERROR, connection, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error()));
return aws_h2err_from_last_error();
}
aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream);
return AWS_H2ERR_SUCCESS;
case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT:
/* An endpoint MUST ignore frames that it receives on closed streams after it has sent a RST_STREAM
* frame */
CONNECTION_LOGF(
TRACE,
connection,
"Ignoring %s frame on stream id=%" PRIu32 " because END_STREAM flag was recently sent.",
"Ignoring %s frame on stream id=%" PRIu32 " because RST_STREAM was recently sent.",
aws_h2_frame_type_to_str(frame_type),
stream_id);

return AWS_H2ERR_SUCCESS;
}
break;
default:
CONNECTION_LOGF(
ERROR, connection, "Invalid state fo cached closed stream, stream id=%" PRIu32, stream_id);
return aws_h2err_from_h2_code(AWS_H2_ERR_INTERNAL_ERROR);
break;
}
}
if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
/* ignored if the stream has been removed from the dependency tree */
return AWS_H2ERR_SUCCESS;
}

/* #TODO An endpoint that receives any frame other than PRIORITY after receiving a RST_STREAM
* MUST treat that as a stream error (Section 5.4.2) of type STREAM_CLOSED */

/* Stream was closed long ago, or didn't fit criteria for being ignored */
/* Stream closed (purged from closed_streams, or implicitly closed when its ID was skipped) */
CONNECTION_LOGF(
ERROR,
connection,
"Illegal to receive %s frame on stream id=%" PRIu32 " state=closed",
"Illegal to receive %s frame on stream id=%" PRIu32
", no memory of closed stream (ID skipped, or removed from cache)",
aws_h2_frame_type_to_str(frame_type),
stream_id);

return aws_h2err_from_h2_code(AWS_H2_ERR_STREAM_CLOSED);
return aws_h2err_from_h2_code(AWS_H2_ERR_PROTOCOL_ERROR);
}

/* Decoder callbacks */
Expand Down Expand Up @@ -1587,21 +1624,9 @@ static int s_record_closed_stream(

AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));

/* Peer might have already sent/queued frames for this stream before learning that we closed it.
* But if peer was the one to close the stream via RST_STREAM, they know better than to send more frames. */
bool frames_might_trickle_in = closed_when != AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED;
if (frames_might_trickle_in) {
if (aws_hash_table_put(
&connection->thread_data.closed_streams_where_frames_might_trickle_in,
(void *)(size_t)stream_id,
(void *)(size_t)closed_when,
NULL)) {

CONNECTION_LOG(ERROR, connection, "Failed inserting ID into map of recently closed streams");
return AWS_OP_ERR;
}

/* #TODO remove entries from closed_streams_where_frames_might_trickle_in after some period of time */
if (aws_cache_put(connection->thread_data.closed_streams, (void *)(size_t)stream_id, (void *)(size_t)closed_when)) {
CONNECTION_LOG(ERROR, connection, "Failed inserting ID into cache of recently closed streams");
return AWS_OP_ERR;
}

return AWS_OP_SUCCESS;
Expand Down
5 changes: 3 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ add_test_case(h2_client_stream_err_malformed_header)
add_test_case(h2_client_stream_err_state_forbids_frame)
add_test_case(h2_client_conn_err_stream_frames_received_for_idle_stream)
add_test_case(h2_client_stream_ignores_some_frames_received_soon_after_closing)
#TODO add_test_case(h2_client_conn_err_stream_frames_received_long_after_closing)
add_test_case(h2_client_conn_err_stream_frames_received_after_rst_stream_received)
add_test_case(h2_client_conn_err_stream_frames_received_soon_after_closing)
add_test_case(h2_client_stream_err_stream_frames_received_soon_after_rst_stream_received)
add_test_case(h2_client_conn_err_stream_frames_received_after_removed_from_cache)
add_test_case(h2_client_stream_receive_info_headers)
add_test_case(h2_client_stream_err_receive_info_headers_after_main)
add_test_case(h2_client_stream_receive_trailing_headers)
Expand Down
Loading