Skip to content

Commit

Permalink
Automatic ping and interupt notification fixes. (#58)
Browse files Browse the repository at this point in the history
* Automatic ping and interupt notification fixes.
  • Loading branch information
JonathanHenson committed Mar 20, 2019
1 parent 86aee87 commit 39f285f
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 75 deletions.
56 changes: 34 additions & 22 deletions include/aws/mqtt/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,39 @@ struct aws_mqtt_topic_subscription {
void *on_publish_ud;
};

/**
* host_name The server name to connect to. This resource may be freed immediately on return.
* port The port on the server to connect to
* client_id The clientid to place in the CONNECT packet.
* socket_options The socket options to pass to the aws_client_bootstrap functions.
* This is copied into the connection
* tls_options TLS settings to use when opening a connection.
* This is copied into the connection
* Pass NULL to connect without TLS (NOT RECOMMENDED)
* clean_session True to discard all server session data and start fresh
* keep_alive_time_secs The keep alive value to place in the CONNECT PACKET, a PING will automatically
* be sent at this interval as well. If you specify 0, defaults will be used
* and a ping will be sent once per 60 minutes.
* ping_timeout_ms Network connection is re-established if a ping response is not received within
* this amount of time (milliseconds). If you specify 0, a default value of 3 seconds is used.
* Alternatively, tcp keep-alive may be away to accomplish this in a more efficient
* (low-power) scenario, but keep-alive options may not work the same way on every platform and OS version.
* on_connection_complete The callback to fire when the connection attempt completes
* user_data Passed to the userdata param of on_connection_complete
*/
struct aws_mqtt_connection_options {
const struct aws_byte_cursor host_name;
uint16_t port;
const struct aws_socket_options *socket_options;
const struct aws_tls_connection_options *tls_options;
const struct aws_byte_cursor client_id;
uint16_t keep_alive_time_secs;
uint32_t ping_timeout_ms;
aws_mqtt_client_on_connection_complete_fn *on_connection_complete;
void *user_data;
bool clean_session;
};

AWS_EXTERN_C_BEGIN

/**
Expand Down Expand Up @@ -219,34 +252,13 @@ int aws_mqtt_client_connection_set_connection_interruption_handlers(
* Once the connection is opened, on_connack will be called.
*
* \param[in] connection The connection object
* \param[in] host_name The server name to connect to. This resource may be freed immediately on return.
* \param[in] port The port on the server to connect to
* \param[in] client_id The clientid to place in the CONNECT packet.
* \param[in] socket_options The socket options to pass to the aws_client_bootstrap functions
* This is copied into the connection
* \param[in] tls_options TLS settings to use when opening a connection.
* This is copied into the connection
* Pass NULL to connect without TLS (NOT RECOMMENDED)
* \param[in] clean_session True to discard all server session data and start fresh
* \param[in] keep_alive_time The keep alive value to place in the CONNECT PACKET
* \param[in] on_connection_complete The callback to fire when the connection attempt completes
* \param[in] userdata Passed to the userdata param of on_connection_complete
*
* \returns AWS_OP_SUCCESS if the connection has been successfully initiated,
* otherwise AWS_OP_ERR and aws_last_error() will be set.
*/
AWS_MQTT_API
int aws_mqtt_client_connection_connect(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *host_name,
uint16_t port,
const struct aws_socket_options *socket_options,
const struct aws_tls_connection_options *tls_options,
const struct aws_byte_cursor *client_id,
bool clean_session,
uint16_t keep_alive_time,
aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
void *userdata);
const struct aws_mqtt_connection_options *connection_options);

/**
* Opens the actual connection defined by aws_mqtt_client_connection_new.
Expand Down
6 changes: 3 additions & 3 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ enum aws_mqtt_client_request_state {
AWS_MQTT_CLIENT_REQUEST_ERROR,
};

extern const uint64_t request_timeout_ns;

/* Called after the timeout if a matching ack packet hasn't arrived.
Return AWS_MQTT_CLIENT_REQUEST_ONGOING to check on the task later.
Return AWS_MQTT_CLIENT_REQUEST_COMPLETE to consider request complete.
Expand Down Expand Up @@ -136,6 +134,7 @@ struct aws_mqtt_client_connection {
struct aws_mutex mutex;
} pending_requests;
struct aws_mqtt_reconnect_task *reconnect_task;
struct aws_channel_task ping_task;

uint64_t last_pingresp_timestamp;

Expand All @@ -152,7 +151,8 @@ struct aws_mqtt_client_connection {
/* Connect parameters */
struct aws_byte_buf client_id;
bool clean_session;
uint16_t keep_alive_time;
uint16_t keep_alive_time_secs;
uint64_t request_timeout_ns;
struct aws_string *username;
struct aws_string *password;
struct {
Expand Down
51 changes: 30 additions & 21 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
# pragma warning(disable : 4204)
#endif

/* 3 seconds */
static const uint64_t s_default_request_timeout_ns = 3000000000;

/*******************************************************************************
* Client Init
******************************************************************************/
Expand Down Expand Up @@ -82,6 +85,9 @@ static void s_mqtt_client_init(
aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group);
aws_event_loop_schedule_task_future(
el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt);
} else {
connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED;
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false);
}
return;
}
Expand All @@ -106,7 +112,7 @@ static void s_mqtt_client_init(
&connect,
aws_byte_cursor_from_buf(&connection->client_id),
connection->clean_session,
connection->keep_alive_time);
connection->keep_alive_time_secs);

if (connection->will.topic.buffer) {
/* Add will if present */
Expand Down Expand Up @@ -470,15 +476,7 @@ int aws_mqtt_client_connection_set_connection_interruption_handlers(

int aws_mqtt_client_connection_connect(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *host_name,
uint16_t port,
const struct aws_socket_options *socket_options,
const struct aws_tls_connection_options *tls_options,
const struct aws_byte_cursor *client_id,
bool clean_session,
uint16_t keep_alive_time,
aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
void *userdata) {
const struct aws_mqtt_connection_options *connection_options) {

if (connection->state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED);
Expand All @@ -488,16 +486,24 @@ int aws_mqtt_client_connection_connect(
aws_string_destroy(connection->host_name);
}

connection->host_name = aws_string_new_from_array(connection->allocator, host_name->ptr, host_name->len);
connection->port = port;
connection->socket_options = *socket_options;
connection->host_name = aws_string_new_from_array(
connection->allocator, connection_options->host_name.ptr, connection_options->host_name.len);
connection->port = connection_options->port;
connection->socket_options = *connection_options->socket_options;
connection->state = AWS_MQTT_CLIENT_STATE_CONNECTING;
connection->clean_session = clean_session;
connection->keep_alive_time = keep_alive_time;
connection->clean_session = connection_options->clean_session;
connection->keep_alive_time_secs = connection_options->keep_alive_time_secs;

if (!connection_options->ping_timeout_ms) {
connection->request_timeout_ns = s_default_request_timeout_ns;
} else {
connection->request_timeout_ns = aws_timestamp_convert(
(uint64_t)connection_options->ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
}

/* Cheat and set the tls_options host_name to our copy if they're the same */
if (tls_options) {
connection->tls_options = *tls_options;
if (connection_options->tls_options) {
connection->tls_options = *connection_options->tls_options;
struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(connection->host_name);
aws_tls_connection_options_set_server_name(&connection->tls_options, connection->allocator, &host_name_cur);
} else {
Expand All @@ -520,13 +526,15 @@ int aws_mqtt_client_connection_connect(
aws_task_init(&connection->reconnect_task->task, s_attempt_reconect, connection->reconnect_task);

/* Only set connection->client_id if a new one was provided */
struct aws_byte_buf client_id_buf = aws_byte_buf_from_array(client_id->ptr, client_id->len);
struct aws_byte_buf client_id_buf =
aws_byte_buf_from_array(connection_options->client_id.ptr, connection_options->client_id.len);
if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) {
aws_mem_release(connection->allocator, connection->reconnect_task);
return AWS_OP_ERR;
}

if (aws_mqtt_client_connection_reconnect(connection, on_connection_complete, userdata)) {
if (aws_mqtt_client_connection_reconnect(
connection, connection_options->on_connection_complete, connection_options->user_data)) {
aws_mem_release(connection->allocator, connection->reconnect_task);
aws_byte_buf_clean_up(&connection->client_id);
return AWS_OP_ERR;
Expand Down Expand Up @@ -611,6 +619,7 @@ int aws_mqtt_client_connection_disconnect(
connection->on_disconnect = on_disconnect;
connection->on_disconnect_ud = userdata;

connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTING;
mqtt_disconnect_impl(connection, AWS_OP_SUCCESS);

return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -1269,12 +1278,12 @@ static enum aws_mqtt_client_request_state s_pingreq_send(uint16_t message_id, bo
return AWS_MQTT_CLIENT_REQUEST_ONGOING;
}

/* Check that a pingresp has been recieved since pingreq was sent */
/* Check that a pingresp has been received since pingreq was sent */

uint64_t current_time = 0;
aws_channel_current_clock_time(connection->slot->channel, &current_time);

if (current_time - connection->last_pingresp_timestamp > request_timeout_ns) {
if (current_time - connection->last_pingresp_timestamp > connection->request_timeout_ns) {
/* It's been too long since the last ping, close the connection */

mqtt_disconnect_impl(connection, AWS_ERROR_MQTT_TIMEOUT);
Expand Down
40 changes: 34 additions & 6 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
#include <aws/mqtt/private/packets.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/clock.h>
#include <aws/common/math.h>
#include <aws/common/task_scheduler.h>

#ifdef _MSC_VER
# pragma warning(disable : 4204)
#endif

const uint64_t request_timeout_ns = 3000000000;

/*******************************************************************************
* Packet State Machine
******************************************************************************/

/* one hour */
static const uint16_t s_default_keep_alive_ping_freq_secs = 3600;

typedef int(packet_handler_fn)(struct aws_mqtt_client_connection *connection, struct aws_byte_cursor message_cursor);

static int s_packet_handler_default(
Expand All @@ -41,6 +44,32 @@ static int s_packet_handler_default(
return aws_raise_error(AWS_ERROR_MQTT_INVALID_PACKET_TYPE);
}

static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status);
static void s_schedule_ping(struct aws_mqtt_client_connection *connection) {
aws_channel_task_init(&connection->ping_task, s_on_time_to_ping, connection);

uint64_t schedule_time = 0;
aws_channel_current_clock_time(connection->slot->channel, &schedule_time);
if (connection->keep_alive_time_secs) {
schedule_time +=
aws_timestamp_convert(connection->keep_alive_time_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
} else {
schedule_time +=
aws_timestamp_convert(s_default_keep_alive_ping_freq_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
}

aws_channel_schedule_task_future(connection->slot->channel, &connection->ping_task, schedule_time);
}

static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
(void)channel_task;

if (status == AWS_TASK_STATUS_RUN_READY) {
struct aws_mqtt_client_connection *connection = arg;
aws_mqtt_client_connection_ping(connection);
s_schedule_ping(connection);
}
}
static int s_packet_handler_connack(
struct aws_mqtt_client_connection *connection,
struct aws_byte_cursor message_cursor) {
Expand Down Expand Up @@ -94,6 +123,7 @@ static int s_packet_handler_connack(
mqtt_disconnect_impl(connection, AWS_ERROR_MQTT_PROTOCOL_ERROR);
}

s_schedule_ping(connection);
return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -529,7 +559,7 @@ static void s_request_timeout_task(struct aws_channel_task *task, void *arg, enu

uint64_t ttr = 0;
aws_channel_current_clock_time(request->connection->slot->channel, &ttr);
ttr += request_timeout_ns;
ttr += request->connection->request_timeout_ns;

aws_channel_schedule_task_future(request->connection->slot->channel, task, ttr);
} else {
Expand Down Expand Up @@ -628,10 +658,8 @@ void mqtt_request_complete(struct aws_mqtt_client_connection *connection, int er

void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int error_code) {

connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTING;

/* If there is an outstanding reconnect task, cancel it */
if (connection->reconnect_task) {
if (connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING && connection->reconnect_task) {
aws_atomic_store_ptr(&connection->reconnect_task->connection_ptr, NULL);
connection->reconnect_task = NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion source/packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ int aws_mqtt_packet_ack_decode(struct aws_byte_cursor *cur, struct aws_mqtt_pack
}

/* Validate flags */
if (packet->fixed_header.flags != (aws_mqtt_packet_has_flags(&packet->fixed_header) ? S_BIT_1_FLAGS : 0u)) {
if (packet->fixed_header.flags != (aws_mqtt_packet_has_flags(&packet->fixed_header) ? S_BIT_1_FLAGS : 0U)) {

return aws_raise_error(AWS_ERROR_MQTT_INVALID_RESERVED_BITS);
}
Expand Down
26 changes: 15 additions & 11 deletions tests/aws_iot_client_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,21 @@ int main(int argc, char **argv) {
aws_mqtt_client_connection_set_will(args.connection, &subscribe_topic_cur, 1, false, &will_cur);

struct aws_byte_cursor client_id_cur = aws_byte_cursor_from_string(s_client_id);
aws_mqtt_client_connection_connect(
args.connection,
&host_name_cur,
8883,
&socket_options,
&tls_con_opt,
&client_id_cur,
true,
0,
s_mqtt_on_connection_complete,
&args);

struct aws_mqtt_connection_options conn_options = {
.host_name = host_name_cur,
.port = 8883,
.socket_options = &socket_options,
.tls_options = &tls_con_opt,
.client_id = client_id_cur,
.keep_alive_time_secs = 0,
.ping_timeout_ms = 0,
.on_connection_complete = s_mqtt_on_connection_complete,
.user_data = &args,
.clean_session = true,
};

aws_mqtt_client_connection_connect(args.connection, &conn_options);

aws_mutex_lock(&mutex);
ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex));
Expand Down
25 changes: 14 additions & 11 deletions tests/paho_client_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,20 @@ int main(int argc, char **argv) {
aws_mqtt_client_connection_set_connection_interruption_handlers(
args.connection, s_mqtt_on_interrupted, NULL, s_mqtt_on_resumed, NULL);

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(
args.connection,
&host_name_cur,
1883,
&options,
NULL,
&s_client_id,
true,
0,
s_mqtt_on_connection_complete,
&args));
struct aws_mqtt_connection_options conn_options = {
.host_name = host_name_cur,
.port = 8883,
.socket_options = &options,
.tls_options = NULL,
.client_id = s_client_id,
.keep_alive_time_secs = 0,
.ping_timeout_ms = 0,
.on_connection_complete = s_mqtt_on_connection_complete,
.user_data = &args,
.clean_session = true,
};

ASSERT_SUCCESS(aws_mqtt_client_connection_connect(args.connection, &conn_options));

/* Wait for connack */
aws_mutex_lock(&mutex);
Expand Down

0 comments on commit 39f285f

Please sign in to comment.