Skip to content

Commit

Permalink
Add on_closed callback to MQTT311 (#258)
Browse files Browse the repository at this point in the history
Add on_closed callback that is called whenever a mqtt311 connection is fully closed
  • Loading branch information
TwistedTwigleg committed Feb 3, 2023
1 parent b66bb1a commit 13cae5a
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 0 deletions.
31 changes: 31 additions & 0 deletions include/aws/mqtt/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ struct aws_http_proxy_options;
struct aws_socket_options;
struct aws_tls_connection_options;

/**
* Empty struct that is passed when on_connection_closed is called.
* Currently holds nothing but will allow expanding in the future should it be needed.
*/
struct on_connection_closed_data;

struct aws_mqtt_client {
struct aws_allocator *allocator;
struct aws_client_bootstrap *bootstrap;
Expand Down Expand Up @@ -63,6 +69,16 @@ typedef void(aws_mqtt_client_on_connection_interrupted_fn)(
int error_code,
void *userdata);

/**
* Called if the connection to the server is closed by user request
* Note: Currently the "data" argument is always NULL, but this may change in the future if additional data is needed to
* be sent.
*/
typedef void(aws_mqtt_client_on_connection_closed_fn)(
struct aws_mqtt_client_connection *connection,
struct on_connection_closed_data *data,
void *userdata);

/**
* Called when a connection to the server is resumed
* (if clean_session is true, calling aws_mqtt_resubscribe_existing_topics is suggested)
Expand Down Expand Up @@ -403,6 +419,21 @@ int aws_mqtt_client_connection_set_connection_interruption_handlers(
aws_mqtt_client_on_connection_resumed_fn *on_resumed,
void *on_resumed_ud);

/**
* Sets the callback to call when the connection is closed normally by user request.
* This is different than the connection interrupted or lost, this only covers successful
* closure.
*
* \param[in] connection The connection object
* \param[in] on_closed The function to call when a connection is closed
* \param[in] on_closed_ud Userdata for on_closed
*/
AWS_MQTT_API
int aws_mqtt_client_connection_set_connection_closed_handler(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_connection_closed_fn *on_closed,
void *on_closed_ud);

/**
* Sets the callback to call whenever ANY publish packet is received. Only safe to set when connection is not connected.
*
Expand Down
2 changes: 2 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ struct aws_mqtt_client_connection {
void *on_interrupted_ud;
aws_mqtt_client_on_connection_resumed_fn *on_resumed;
void *on_resumed_ud;
aws_mqtt_client_on_connection_closed_fn *on_closed;
void *on_closed_ud;
aws_mqtt_client_publish_received_fn *on_any_publish;
void *on_any_publish_ud;
aws_mqtt_client_on_disconnect_fn *on_disconnect;
Expand Down
19 changes: 19 additions & 0 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,15 @@ static void s_mqtt_client_shutdown(
"id=%p: Caller requested disconnect from on_interrupted callback, aborting reconnect",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
break;
case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Disconnect completed, clearing request queue and calling callback",
(void *)connection);
MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
break;
case AWS_MQTT_CLIENT_STATE_CONNECTING:
AWS_LOGF_TRACE(
Expand Down Expand Up @@ -1086,6 +1088,23 @@ int aws_mqtt_client_connection_set_connection_interruption_handlers(
return AWS_OP_SUCCESS;
}

int aws_mqtt_client_connection_set_connection_closed_handler(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_connection_closed_fn *on_closed,
void *on_closed_ud) {

AWS_PRECONDITION(connection);
if (s_check_connection_state_for_configuration(connection)) {
return aws_raise_error(AWS_ERROR_INVALID_STATE);
}
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting connection closed handler", (void *)connection);

connection->on_closed = on_closed;
connection->on_closed_ud = on_closed_ud;

return AWS_OP_SUCCESS;
}

int aws_mqtt_client_connection_set_on_any_publish_handler(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_publish_received_fn *on_any_publish,
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ add_test_case(mqtt_clean_session_keep_next_session)
add_test_case(mqtt_connection_publish_QoS1_timeout)
add_test_case(mqtt_connection_unsub_timeout)
add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time)
add_test_case(mqtt_connection_close_callback_simple)
add_test_case(mqtt_connection_close_callback_interrupted)
add_test_case(mqtt_connection_close_callback_multi)
# Operation statistics tests
add_test_case(mqtt_operation_statistics_simple_publish)
add_test_case(mqtt_operation_statistics_offline_publish)
Expand Down
146 changes: 146 additions & 0 deletions tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct mqtt_connection_state_test {
struct aws_array_list qos_returned; /* list of uint_8 */
size_t ops_completed;
size_t expected_ops_completed;
size_t connection_close_calls; /* All of the times on_connection_closed has been called */
};

static struct mqtt_connection_state_test test_data = {0};
Expand Down Expand Up @@ -2759,3 +2760,148 @@ AWS_TEST_CASE_FIXTURE(
s_test_mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/* Function called for testing the on_connection_closed callback */
static void s_on_connection_closed_fn(
struct aws_mqtt_client_connection *connection,
struct on_connection_closed_data *data,
void *userdata) {
(void)connection;
(void)data;

struct mqtt_connection_state_test *state_test_data = (struct mqtt_connection_state_test *)userdata;

aws_mutex_lock(&state_test_data->lock);
state_test_data->connection_close_calls += 1;
aws_mutex_unlock(&state_test_data->lock);
}

/**
* Test that the connection close callback is fired only once and when the connection was closed
*/
static int s_test_mqtt_connection_close_callback_simple_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};
aws_mqtt_client_connection_set_connection_closed_handler(
state_test_data->mqtt_connection, s_on_connection_closed_fn, state_test_data);

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

/* sleep for 2 sec, just to make sure the connection is stable */
aws_thread_current_sleep((uint64_t)ONE_SEC * 2);

/* Disconnect */
ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

/* Make sure the callback was called and the value is what we expect */
ASSERT_UINT_EQUALS(1, state_test_data->connection_close_calls);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connection_close_callback_simple,
s_setup_mqtt_server_fn,
s_test_mqtt_connection_close_callback_simple_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Test that the connection close callback is NOT fired during an interrupt
*/
static int s_test_mqtt_connection_close_callback_interrupted_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};
aws_mqtt_client_connection_set_connection_closed_handler(
state_test_data->mqtt_connection, s_on_connection_closed_fn, state_test_data);

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

/* Kill the connection */
aws_channel_shutdown(state_test_data->server_channel, AWS_ERROR_INVALID_STATE);
s_wait_for_reconnect_to_complete(state_test_data);

/* sleep for 2 sec, just to make sure the connection is stable */
aws_thread_current_sleep((uint64_t)ONE_SEC * 2);

/* Disconnect */
ASSERT_SUCCESS(
aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);

/* Make sure the callback was called only ONCE and the value is what we expect */
ASSERT_UINT_EQUALS(1, state_test_data->connection_close_calls);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connection_close_callback_interrupted,
s_setup_mqtt_server_fn,
s_test_mqtt_connection_close_callback_interrupted_fn,
s_clean_up_mqtt_server_fn,
&test_data)

/**
* Test that the connection close callback is called every time a disconnect happens, if it happens multiple times
*/
static int s_test_mqtt_connection_close_callback_multi_fn(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
struct mqtt_connection_state_test *state_test_data = ctx;

struct aws_mqtt_connection_options connection_options = {
.user_data = state_test_data,
.clean_session = false,
.client_id = aws_byte_cursor_from_c_str("client1234"),
.host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
.socket_options = &state_test_data->socket_options,
.on_connection_complete = s_on_connection_complete_fn,
};
aws_mqtt_client_connection_set_connection_closed_handler(
state_test_data->mqtt_connection, s_on_connection_closed_fn, state_test_data);

int disconnect_amount = 10;
for (int i = 0; i < disconnect_amount; i++) {
ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
s_wait_for_connection_to_complete(state_test_data);

/* Disconnect */
ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data));
s_wait_for_disconnect_to_complete(state_test_data);
}

/* Make sure the callback was called disconnect_amount times */
ASSERT_UINT_EQUALS(disconnect_amount, state_test_data->connection_close_calls);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE_FIXTURE(
mqtt_connection_close_callback_multi,
s_setup_mqtt_server_fn,
s_test_mqtt_connection_close_callback_multi_fn,
s_clean_up_mqtt_server_fn,
&test_data)

0 comments on commit 13cae5a

Please sign in to comment.