Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/aws/http/connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ typedef void(aws_http_connection_manager_on_connection_setup_fn)(

typedef void(aws_http_connection_manager_shutdown_complete_fn)(void *user_data);

/**
* Metrics for logging and debugging purpose.
*/
struct aws_http_manager_metrics {
/**
* The number of additional concurrent requests that can be supported by the HTTP manager without needing to
* establish additional connections to the target server.
*
* For connection manager, it equals to connections that's idle.
* For stream manager, it equals to the number of streams that are possible to be made without creating new
* connection, although the implementation can create new connection without fully filling it.
*/
size_t available_concurrency;
/* The number of requests that are awaiting concurrency to be made available from the HTTP manager. */
size_t pending_concurrency_acquires;
};

/*
* Connection manager configuration struct.
*
Expand Down Expand Up @@ -143,6 +160,14 @@ int aws_http_connection_manager_release_connection(
struct aws_http_connection_manager *manager,
struct aws_http_connection *connection);

/**
* Fetch the current manager metrics from connection manager.
*/
AWS_HTTP_API
void aws_http_connection_manager_fetch_metrics(
const struct aws_http_connection_manager *manager,
struct aws_http_manager_metrics *out_metrics);

AWS_EXTERN_C_END

#endif /* AWS_HTTP_CONNECTION_MANAGER_H */
12 changes: 12 additions & 0 deletions include/aws/http/http2_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct proxy_env_var_settings;
struct aws_http2_setting;
struct aws_http_make_request_options;
struct aws_http_stream;
struct aws_http_manager_metrics;

/**
* Always invoked asynchronously when the stream was created, successfully or not.
Expand Down Expand Up @@ -168,5 +169,16 @@ void aws_http2_stream_manager_acquire_stream(
struct aws_http2_stream_manager *http2_stream_manager,
const struct aws_http2_stream_manager_acquire_stream_options *acquire_stream_option);

/**
* Fetch the current metrics from stream manager.
*
* @param http2_stream_manager
* @param out_metrics The metrics to be fetched
*/
AWS_HTTP_API
void aws_http2_stream_manager_fetch_metrics(
const struct aws_http2_stream_manager *http2_stream_manager,
struct aws_http_manager_metrics *out_metrics);

AWS_EXTERN_C_END
#endif /* AWS_HTTP2_STREAM_MANAGER_H */
13 changes: 10 additions & 3 deletions include/aws/http/private/random_access_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ int aws_random_access_set_remove(struct aws_random_access_set *set, const void *
* Get the pointer to a random element from the data structure. Fails when the data structure is empty.
*/
AWS_HTTP_API
int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void **out);
int aws_random_access_set_random_get_ptr(const struct aws_random_access_set *set, void **out);

AWS_HTTP_API
size_t aws_random_access_set_get_size(struct aws_random_access_set *set);
size_t aws_random_access_set_get_size(const struct aws_random_access_set *set);

/**
* Check the element exist in the data structure or not.
*/
AWS_HTTP_API
int aws_random_access_set_exist(struct aws_random_access_set *set, const void *element, bool *exist);
int aws_random_access_set_exist(const struct aws_random_access_set *set, const void *element, bool *exist);

/**
* Get the pointer to an element that currently stored at that index. It may change if operations like remove and add
* happens. Helpful for debugging and iterating through the whole set.
*/
AWS_HTTP_API
int aws_random_access_set_random_get_ptr_index(const struct aws_random_access_set *set, void **out, size_t index);

AWS_EXTERN_C_END
#endif /* AWS_HTTP_RANDOM_ACCESS_SET_H */
12 changes: 12 additions & 0 deletions source/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1538,3 +1538,15 @@ static void s_cull_task(struct aws_task *task, void *arg, enum aws_task_status s

s_schedule_connection_culling(manager);
}

void aws_http_connection_manager_fetch_metrics(
const struct aws_http_connection_manager *manager,
struct aws_http_manager_metrics *out_metrics) {
AWS_PRECONDITION(manager);
AWS_PRECONDITION(out_metrics);

AWS_FATAL_ASSERT(aws_mutex_lock((struct aws_mutex *)(void *)&manager->lock) == AWS_OP_SUCCESS);
out_metrics->available_concurrency = manager->idle_connection_count;
out_metrics->pending_concurrency_acquires = manager->pending_acquisition_count;
AWS_FATAL_ASSERT(aws_mutex_unlock((struct aws_mutex *)(void *)&manager->lock) == AWS_OP_SUCCESS);
}
35 changes: 33 additions & 2 deletions source/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static struct aws_h2_sm_connection *s_get_best_sm_connection_from_set(struct aws

/* helper function for building the transaction: Try to assign connection for a pending stream acquisition */
/* *_synced should only be called with LOCK HELD or from another synced function */
static void s_sm_try_assign_connection_to_pending_stream_acquisition(
static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
struct aws_http2_stream_manager *stream_manager,
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {

Expand Down Expand Up @@ -344,7 +344,7 @@ static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2
aws_linked_list_pop_front(&stream_manager->synced_data.pending_stream_acquisitions);
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
AWS_CONTAINER_OF(node, struct aws_h2_sm_pending_stream_acquisition, node);
s_sm_try_assign_connection_to_pending_stream_acquisition(stream_manager, pending_stream_acquisition);
s_sm_try_assign_connection_to_pending_stream_acquisition_synced(stream_manager, pending_stream_acquisition);
if (pending_stream_acquisition->sm_connection == NULL) {
/* Cannot find any connection, push it back to the front and break the loop */
aws_linked_list_push_front(&stream_manager->synced_data.pending_stream_acquisitions, node);
Expand Down Expand Up @@ -994,3 +994,34 @@ void aws_http2_stream_manager_acquire_stream(
} /* END CRITICAL SECTION */
s_aws_http2_stream_manager_execute_transaction(&work);
}

static size_t s_get_available_streams_num_from_connection_set(const struct aws_random_access_set *set) {
size_t all_available_streams_num = 0;
size_t ideal_connection_num = aws_random_access_set_get_size(set);
for (size_t i = 0; i < ideal_connection_num; i++) {
struct aws_h2_sm_connection *sm_connection = NULL;
AWS_FATAL_ASSERT(aws_random_access_set_random_get_ptr_index(set, (void **)&sm_connection, i) == AWS_OP_SUCCESS);
uint32_t available_streams = sm_connection->max_concurrent_streams - sm_connection->num_streams_assigned;
all_available_streams_num += (size_t)available_streams;
}
return all_available_streams_num;
}

void aws_http2_stream_manager_fetch_metrics(
const struct aws_http2_stream_manager *stream_manager,
struct aws_http_manager_metrics *out_metrics) {
AWS_PRECONDITION(stream_manager);
AWS_PRECONDITION(out_metrics);
{ /* BEGIN CRITICAL SECTION */
s_lock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
size_t all_available_streams_num = 0;
all_available_streams_num +=
s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.ideal_available_set);
all_available_streams_num +=
s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.nonideal_available_set);
out_metrics->pending_concurrency_acquires =
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION];
out_metrics->available_concurrency = all_available_streams_num;
s_unlock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
} /* END CRITICAL SECTION */
}
12 changes: 9 additions & 3 deletions source/random_access_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ int aws_random_access_set_remove(struct aws_random_access_set *set, const void *
return AWS_OP_SUCCESS;
}

int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void **out) {
int aws_random_access_set_random_get_ptr(const struct aws_random_access_set *set, void **out) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(out != NULL);
size_t length = aws_array_list_length(&set->impl->list);
Expand All @@ -166,11 +166,11 @@ int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void
return aws_array_list_get_at(&set->impl->list, (void *)out, index);
}

size_t aws_random_access_set_get_size(struct aws_random_access_set *set) {
size_t aws_random_access_set_get_size(const struct aws_random_access_set *set) {
return aws_array_list_length(&set->impl->list);
}

int aws_random_access_set_exist(struct aws_random_access_set *set, const void *element, bool *exist) {
int aws_random_access_set_exist(const struct aws_random_access_set *set, const void *element, bool *exist) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(element);
AWS_PRECONDITION(exist);
Expand All @@ -179,3 +179,9 @@ int aws_random_access_set_exist(struct aws_random_access_set *set, const void *e
*exist = find != NULL;
return re;
}

int aws_random_access_set_random_get_ptr_index(const struct aws_random_access_set *set, void **out, size_t index) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(out != NULL);
return aws_array_list_get_at(&set->impl->list, (void *)out, index);
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ add_net_test_case(h2_sm_mock_multiple_connections)
add_net_test_case(h2_sm_mock_bad_connection_acquired)
add_net_test_case(h2_sm_mock_connections_closed_before_request_made)
add_net_test_case(h2_sm_mock_max_concurrent_streams_remote)
add_net_test_case(h2_sm_mock_fetch_metric)
add_net_test_case(h2_sm_mock_complete_stream)
add_net_test_case(h2_sm_mock_ideal_num_streams)
add_net_test_case(h2_sm_mock_large_ideal_num_streams)
Expand Down
50 changes: 50 additions & 0 deletions tests/test_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,56 @@ TEST_CASE(h2_sm_mock_max_concurrent_streams_remote) {
return s_tester_clean_up();
}

/* Test that the remote max concurrent streams setting hit */
TEST_CASE(h2_sm_mock_fetch_metric) {
(void)ctx;
struct sm_tester_options options = {
.max_connections = 5,
.alloc = allocator,
};
ASSERT_SUCCESS(s_tester_init(&options));
s_override_cm_connect_function(s_aws_http_connection_manager_create_connection_sync_mock);
/* Set the remote max to be 2 */
s_tester.max_con_stream_remote = 2;
/* Acquire a stream to trigger */
ASSERT_SUCCESS(s_sm_stream_acquiring(1));
/* waiting for one fake connection made */
ASSERT_SUCCESS(s_wait_on_fake_connection_count(1));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(1));
struct aws_http_manager_metrics out_metrics;
AWS_ZERO_STRUCT(out_metrics);

aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
/* Acquired 1 stream, and we hold one connection, the max streams per connection is 2. */
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 1);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 0);

ASSERT_SUCCESS(s_sm_stream_acquiring(1));

ASSERT_SUCCESS(s_wait_on_fake_connection_count(1));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(2));
aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 0);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 0);

ASSERT_SUCCESS(s_sm_stream_acquiring(10));
ASSERT_SUCCESS(s_wait_on_fake_connection_count(5));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(10));
aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 0);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 2);

ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
/* Still have two more streams that have not been completed */
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());

return s_tester_clean_up();
}

/* Test that the stream completed will free the connection for more streams */
TEST_CASE(h2_sm_mock_complete_stream) {
(void)ctx;
Expand Down