From 686dcc35a67b258efad71ea6fdb6b3c1ea014c4a Mon Sep 17 00:00:00 2001 From: Colden Cullen Date: Wed, 24 Apr 2019 16:42:53 -0700 Subject: [PATCH] Add logging to MQTT (#68) * Setup logging subjects * Add logging to topic tree * Add logging to client.c * Add logging to client_channel_handler.c * iot test: Load io subjects, log publish packet id --- include/aws/mqtt/mqtt.h | 19 ++- source/client.c | 197 +++++++++++++++++++++++++++++--- source/client_channel_handler.c | 9 ++ source/mqtt.c | 34 ++++-- source/topic_tree.c | 132 +++++++++++++++++++-- tests/aws_iot_client_test.c | 5 +- 6 files changed, 357 insertions(+), 39 deletions(-) diff --git a/include/aws/mqtt/mqtt.h b/include/aws/mqtt/mqtt.h index df3aaac9..7cc30934 100644 --- a/include/aws/mqtt/mqtt.h +++ b/include/aws/mqtt/mqtt.h @@ -58,6 +58,12 @@ enum aws_mqtt_error { AWS_ERROR_END_MQTT_RANGE = 0x1800, }; +enum aws_mqtt_log_subject { + AWS_LS_MQTT_GENERAL = 0x1400, + AWS_LS_MQTT_CLIENT, + AWS_LS_MQTT_TOPIC_TREE, +}; + /** Function called on cleanup of a userdata. */ typedef void(aws_mqtt_userdata_cleanup_fn)(void *userdata); @@ -68,11 +74,18 @@ bool aws_mqtt_is_valid_topic(const struct aws_byte_cursor *topic); AWS_MQTT_API bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter); -/* - * Loads error strings for debugging and logging purposes. +/** + * Initializes internal datastructures used by aws-c-mqtt. + * Must be called before using any functionality in aws-c-mqtt. + */ +AWS_MQTT_API +void aws_mqtt_library_init(struct aws_allocator *allocator); + +/** + * Shuts down the internal datastructures used by aws-c-mqtt. */ AWS_MQTT_API -void aws_mqtt_load_error_strings(void); +void aws_mqtt_library_clean_up(void); AWS_EXTERN_C_END diff --git a/source/client.c b/source/client.c index 153de41c..85dc652c 100644 --- a/source/client.c +++ b/source/client.c @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -27,6 +28,7 @@ #include #include +#include #ifdef _MSC_VER # pragma warning(disable : 4204) @@ -44,6 +46,8 @@ int aws_mqtt_client_init( struct aws_allocator *allocator, struct aws_client_bootstrap *bootstrap) { + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "client=%p: Initalizing MQTT client", (void *)client); + AWS_ZERO_STRUCT(*client); client->allocator = allocator; client->bootstrap = bootstrap; @@ -57,6 +61,8 @@ int aws_mqtt_client_init( void aws_mqtt_client_clean_up(struct aws_mqtt_client *client) { + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "client=%p: Cleaning up MQTT client", (void *)client); + AWS_ZERO_STRUCT(*client); } @@ -71,6 +77,9 @@ static void s_mqtt_client_shutdown( struct aws_mqtt_client_connection *connection = user_data; + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code); + /* Always clear slot, as that's what's been shutdown */ if (connection->slot) { if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTING) { @@ -87,6 +96,8 @@ static void s_mqtt_client_shutdown( /* If reconnect attempt failed, schedule the next attempt */ struct aws_event_loop *el = aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group); + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Reconnect failed, retrying", (void *)connection); + aws_event_loop_schedule_task_future( el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt); @@ -94,6 +105,11 @@ static void s_mqtt_client_shutdown( connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Disconnect completed, clearing request queue and calling callback", + (void *)connection); + /* Successfully shutdown, so clear the outstanding requests */ aws_hash_table_clear(&connection->outstanding_requests.table); @@ -101,6 +117,9 @@ static void s_mqtt_client_shutdown( } else if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTING) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, "id=%p: Initial connection attempt failed, calling callback", (void *)connection); + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false); @@ -113,6 +132,11 @@ static void s_mqtt_client_shutdown( if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED) { + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Connection lost, calling callback and attempting reconnect", + (void *)connection); + connection->state = AWS_MQTT_CLIENT_STATE_RECONNECTING; MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code); } @@ -124,6 +148,12 @@ static void s_mqtt_client_shutdown( /* This will only be true if the user called disconnect from the on_interrupted callback */ if (connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) { + + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Caller requested disconnect from on_interrupted callback, aborting reconnect", + (void *)connection); + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); @@ -159,6 +189,9 @@ static void s_mqtt_client_init( return; } + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, "id=%p: Connection successfully opened, sending CONNECT packet", (void *)connection); + /* Reset the current timeout timer */ connection->reconnect_timeouts.current = connection->reconnect_timeouts.min; @@ -166,6 +199,10 @@ static void s_mqtt_client_init( connection->slot = aws_channel_slot_new(channel); if (!connection->slot) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_CLIENT, + "id=%p: Failed to create new slot, something has gone horribly wrong", + (void *)connection); aws_channel_shutdown(channel, aws_last_error()); return; } @@ -207,14 +244,20 @@ static void s_mqtt_client_init( struct aws_io_message *message = mqtt_get_message_for_packet(connection, &connect.fixed_header); if (!message) { + + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to get message from pool", (void *)connection); goto handle_error; } if (aws_mqtt_packet_connect_encode(&message->message_data, &connect)) { + + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to encode CONNECT packet", (void *)connection); goto handle_error; } if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) { + + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to send encoded CONNECT packet upstream", (void *)connection); goto handle_error; } @@ -242,6 +285,12 @@ static void s_attempt_reconect(struct aws_task *task, void *userdata, enum aws_t connection->reconnect_timeouts.next_attempt += aws_timestamp_convert( connection->reconnect_timeouts.current, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds", + (void *)connection, + connection->reconnect_timeouts.current); + /* Check before multipying to avoid potential overflow */ if (connection->reconnect_timeouts.current > connection->reconnect_timeouts.max / 2) { connection->reconnect_timeouts.current = connection->reconnect_timeouts.max; @@ -296,6 +345,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt return NULL; } + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Creating new connection", (void *)connection); + /* Initialize the client */ AWS_ZERO_STRUCT(*connection); connection->allocator = client->allocator; @@ -308,18 +359,21 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt if (aws_mutex_init(&connection->pending_requests.mutex)) { - goto handle_error; + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to initialize pending_requests mutex", (void *)connection); + goto failed_init_pending_requests_mutex; } if (aws_mqtt_topic_tree_init(&connection->subscriptions, connection->allocator)) { - goto handle_error; + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to initialize subscriptions topic_tree", (void *)connection); + goto failed_init_subscriptions; } if (aws_memory_pool_init( &connection->requests_pool, connection->allocator, 32, sizeof(struct aws_mqtt_outstanding_request))) { - goto handle_error; + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to initialize request pool", (void *)connection); + goto failed_init_request_pool; } if (aws_hash_table_init( @@ -331,7 +385,9 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt NULL, &s_outstanding_request_destroy)) { - goto handle_error; + AWS_LOGF_ERROR( + AWS_LS_MQTT_CLIENT, "id=%p: Failed to initialize outstanding requests table", (void *)connection); + goto failed_init_outstanding_requests_table; } /* Initialize the handler */ @@ -341,19 +397,17 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt return connection; -handle_error: +failed_init_outstanding_requests_table: + aws_memory_pool_clean_up(&connection->requests_pool); +failed_init_request_pool: aws_mqtt_topic_tree_clean_up(&connection->subscriptions); - aws_hash_table_clean_up(&connection->outstanding_requests.table); - - if (connection->requests_pool.data_ptr) { - aws_memory_pool_clean_up(&connection->requests_pool); - } +failed_init_subscriptions: + aws_mutex_clean_up(&connection->outstanding_requests.mutex); - if (connection) { - aws_mem_release(client->allocator, connection); - } +failed_init_pending_requests_mutex: + aws_mem_release(client->allocator, connection); return NULL; } @@ -367,6 +421,8 @@ void aws_mqtt_client_connection_destroy(struct aws_mqtt_client_connection *conne assert(connection); assert(connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTED); + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Destroying connection", (void *)connection); + aws_string_destroy(connection->host_name); /* Clear the credentials */ @@ -413,12 +469,20 @@ int aws_mqtt_client_connection_set_will( bool retain, const struct aws_byte_cursor *payload) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Setting last will with topic \"" PRInSTR "\"", + (void *)connection, + AWS_BYTE_CURSOR_PRI(*topic)); + if (!aws_mqtt_is_valid_topic(topic)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Will topic is invalid", (void *)connection); return aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC); } struct aws_byte_buf topic_buf = aws_byte_buf_from_array(topic->ptr, topic->len); if (aws_byte_buf_init_copy(&connection->will.topic, connection->allocator, &topic_buf)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy will topic", (void *)connection); goto cleanup; } @@ -427,6 +491,7 @@ int aws_mqtt_client_connection_set_will( struct aws_byte_buf payload_buf = aws_byte_buf_from_array(payload->ptr, payload->len); if (aws_byte_buf_init_copy(&connection->will.payload, connection->allocator, &payload_buf)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy will body", (void *)connection); goto cleanup; } @@ -447,14 +512,19 @@ int aws_mqtt_client_connection_set_login( assert(connection); assert(username); + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting username and password", (void *)connection); + connection->username = aws_string_new_from_array(connection->allocator, username->ptr, username->len); if (!connection->username) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy username", (void *)connection); return AWS_OP_ERR; } if (password) { connection->password = aws_string_new_from_array(connection->allocator, password->ptr, password->len); if (!connection->password) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy password", (void *)connection); + aws_string_destroy(connection->username); return AWS_OP_ERR; } } @@ -469,6 +539,13 @@ int aws_mqtt_client_connection_set_reconnect_timeout( assert(connection); + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Setting reconnect timeouts min: %" PRIu64 " max: %" PRIu64, + (void *)connection, + min_timeout, + max_timeout); + connection->reconnect_timeouts.min = min_timeout; connection->reconnect_timeouts.max = max_timeout; @@ -482,6 +559,9 @@ int aws_mqtt_client_connection_set_connection_interruption_handlers( aws_mqtt_client_on_connection_resumed_fn *on_resumed, void *on_resumed_ud) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, "id=%p: Setting connection interrupted and resumed handlers", (void *)connection); + connection->on_interrupted = on_interrupted; connection->on_interrupted_ud = on_interrupted_ud; connection->on_resumed = on_resumed; @@ -498,6 +578,8 @@ int aws_mqtt_client_connection_connect( struct aws_mqtt_client_connection *connection, const struct aws_mqtt_connection_options *connection_options) { + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Opening connection", (void *)connection); + if (connection->state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) { return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED); } @@ -525,6 +607,9 @@ int aws_mqtt_client_connection_connect( /* Cheat and set the tls_options host_name to our copy if they're the same */ if (connection_options->tls_options) { if (aws_tls_connection_options_copy(&connection->tls_options, connection_options->tls_options)) { + + AWS_LOGF_ERROR( + AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy TLS Connection Options into connection", (void *)connection); return AWS_OP_ERR; } @@ -532,6 +617,9 @@ int aws_mqtt_client_connection_connect( struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(connection->host_name); if (aws_tls_connection_options_set_server_name( &connection->tls_options, connection->allocator, &host_name_cur)) { + + AWS_LOGF_ERROR( + AWS_LS_MQTT_CLIENT, "id=%p: Failed to set TLS Connection Options server name", (void *)connection); goto error; } } @@ -549,6 +637,7 @@ int aws_mqtt_client_connection_connect( assert(!connection->reconnect_task); connection->reconnect_task = aws_mem_acquire(connection->allocator, sizeof(struct aws_mqtt_reconnect_task)); if (!connection->reconnect_task) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to allocate reconnect task", (void *)connection); goto error; } aws_atomic_init_ptr(&connection->reconnect_task->connection_ptr, connection); @@ -559,6 +648,7 @@ int aws_mqtt_client_connection_connect( struct aws_byte_buf client_id_buf = aws_byte_buf_from_array(connection_options->client_id.ptr, connection_options->client_id.len); if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy client_id into connection", (void *)connection); goto client_id_alloc_failed; } @@ -623,6 +713,7 @@ int aws_mqtt_client_connection_reconnect( } if (result) { /* Connection attempt failed */ + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to begin connection routine", (void *)connection); return AWS_OP_ERR; } @@ -641,6 +732,8 @@ int aws_mqtt_client_connection_disconnect( if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED || connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING) { + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Closing connection", (void *)connection); + connection->on_disconnect = on_disconnect; connection->on_disconnect_ud = userdata; @@ -650,6 +743,7 @@ int aws_mqtt_client_connection_disconnect( return AWS_OP_SUCCESS; } + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Connection is not open, and may not be closed", (void *)connection); return aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED); } @@ -713,6 +807,13 @@ static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t message_id, bool initing_packet = task_arg->subscribe.fixed_header.packet_type == 0; struct aws_io_message *message = NULL; + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting send of subscribe %" PRIu16 " (%s)", + (void *)task_arg->connection, + message_id, + is_first_attempt ? "first attempt" : "resend"); + if (initing_packet) { /* Init the subscribe packet */ if (aws_mqtt_packet_subscribe_init(&task_arg->subscribe, task_arg->connection->allocator, message_id)) { @@ -798,6 +899,13 @@ static void s_subscribe_complete( struct subscribe_task_arg *task_arg = userdata; + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Subscribe %" PRIu16 " completed with error_code %d", + (void *)connection, + packet_id, + error_code); + if (task_arg->on_suback) { task_arg->on_suback(connection, packet_id, &task_arg->topics, error_code, task_arg->on_suback_ud); } @@ -831,6 +939,8 @@ uint16_t aws_mqtt_client_connection_subscribe_multiple( goto handle_error; } + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting multi-topic subscribe", (void *)connection); + for (size_t i = 0; i < num_topics; ++i) { struct aws_mqtt_topic_subscription *request = NULL; @@ -860,6 +970,12 @@ uint16_t aws_mqtt_client_connection_subscribe_multiple( /* Update request topic cursor to refer to owned string */ task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter); + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Adding topic \"" PRInSTR "\"", + (void *)connection, + AWS_BYTE_CURSOR_PRI(task_topic->request.topic)); + /* Push into the list */ aws_array_list_push_back(&task_arg->topics, &request); } @@ -867,6 +983,8 @@ uint16_t aws_mqtt_client_connection_subscribe_multiple( uint16_t packet_id = mqtt_create_request(task_arg->connection, &s_subscribe_send, task_arg, &s_subscribe_complete, task_arg); + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Sending multi-topic subscribe %" PRIu16, (void *)connection, packet_id); + if (packet_id) { return packet_id; } @@ -907,6 +1025,13 @@ static void s_subscribe_single_complete( struct subscribe_task_arg *task_arg = userdata; + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Subscribe %" PRIu16 " completed with error code %d", + (void *)connection, + packet_id, + error_code); + assert(aws_array_list_length(&task_arg->topics) == 1); if (task_arg->on_suback) { @@ -989,6 +1114,13 @@ uint16_t aws_mqtt_client_connection_subscribe( uint16_t packet_id = mqtt_create_request(task_arg->connection, &s_subscribe_send, task_arg, &s_subscribe_single_complete, task_arg); + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Starting subscribe %" PRIu16 " on topic " PRInSTR, + (void *)connection, + packet_id, + AWS_BYTE_CURSOR_PRI(task_topic->request.topic)); + if (packet_id) { return packet_id; } @@ -1036,6 +1168,13 @@ static enum aws_mqtt_client_request_state s_unsubscribe_send( struct unsubscribe_task_arg *task_arg = userdata; struct aws_io_message *message = NULL; + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting send of unsubscribe %" PRIu16 " %s", + (void *)task_arg->connection, + message_id, + is_first_attempt ? "first attempt" : "resend"); + static const size_t num_topics = 1; AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size); @@ -1101,8 +1240,7 @@ static void s_unsubscribe_complete( struct unsubscribe_task_arg *task_arg = userdata; - if (error_code != AWS_OP_ERR) { - } + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Unsubscribe %" PRIu16 " complete", (void *)connection, packet_id); if (task_arg->on_unsuback) { task_arg->on_unsuback(connection, packet_id, error_code, task_arg->on_unsuback_ud); @@ -1135,7 +1273,12 @@ uint16_t aws_mqtt_client_connection_unsubscribe( task_arg->on_unsuback = on_unsuback; task_arg->on_unsuback_ud = on_unsuback_ud; - return mqtt_create_request(connection, &s_unsubscribe_send, task_arg, s_unsubscribe_complete, task_arg); + uint16_t packet_id = + mqtt_create_request(connection, &s_unsubscribe_send, task_arg, s_unsubscribe_complete, task_arg); + + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting unsubscribe %" PRIu16, (void *)connection, packet_id); + + return packet_id; } /******************************************************************************* @@ -1159,6 +1302,13 @@ struct publish_task_arg { static enum aws_mqtt_client_request_state s_publish_send(uint16_t message_id, bool is_first_attempt, void *userdata) { struct publish_task_arg *task_arg = userdata; + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting send of publish %" PRIu16 " %s", + (void *)task_arg->connection, + message_id, + is_first_attempt ? "first attempt" : "resend"); + bool is_qos_0 = task_arg->qos == AWS_MQTT_QOS_AT_MOST_ONCE; if (is_qos_0) { message_id = 0; @@ -1229,6 +1379,8 @@ static void s_publish_complete( void *userdata) { struct publish_task_arg *task_arg = userdata; + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Publish %" PRIu16 " complete", (void *)connection, packet_id); + if (task_arg->on_complete) { task_arg->on_complete(connection, packet_id, error_code, task_arg->userdata); } @@ -1266,7 +1418,16 @@ uint16_t aws_mqtt_client_connection_publish( arg->on_complete = on_complete; arg->userdata = userdata; - return mqtt_create_request(connection, &s_publish_send, arg, &s_publish_complete, arg); + uint16_t packet_id = mqtt_create_request(connection, &s_publish_send, arg, &s_publish_complete, arg); + + AWS_LOGF_DEBUG( + AWS_LS_MQTT_CLIENT, + "id=%p: Starting publish %" PRIu16 " to topic " PRInSTR, + (void *)connection, + packet_id, + AWS_BYTE_CURSOR_PRI(*topic)); + + return packet_id; } /******************************************************************************* @@ -1319,6 +1480,8 @@ static enum aws_mqtt_client_request_state s_pingreq_send(uint16_t message_id, bo int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection) { + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting ping", (void *)connection); + mqtt_create_request(connection, &s_pingreq_send, connection, NULL, NULL); return AWS_OP_SUCCESS; diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index bee898e2..75fdc777 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -18,6 +18,8 @@ #include #include +#include + #include #include #include @@ -41,6 +43,7 @@ static int s_packet_handler_default( (void)connection; (void)message_cursor; + AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Unhandled packet type received", (void *)connection); return aws_raise_error(AWS_ERROR_MQTT_INVALID_PACKET_TYPE); } @@ -74,6 +77,8 @@ static int s_packet_handler_connack( struct aws_mqtt_client_connection *connection, struct aws_byte_cursor message_cursor) { + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: CONNACK received", (void *)connection); + struct aws_mqtt_packet_connack connack; if (aws_mqtt_packet_connack_decode(&message_cursor, &connack)) { return AWS_OP_ERR; @@ -254,6 +259,8 @@ static int s_packet_handler_pingresp( (void)message_cursor; + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: PINGRESP received", (void *)connection); + /* Store the timestamp this was received */ aws_channel_current_clock_time(connection->slot->channel, &connection->last_pingresp_timestamp); @@ -670,6 +677,8 @@ static void s_mqtt_disconnect_task(struct aws_channel_task *channel_task, void * struct mqtt_shutdown_task *task = AWS_CONTAINER_OF(channel_task, struct mqtt_shutdown_task, task); struct aws_mqtt_client_connection *connection = arg; + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Doing disconnect", (void *)connection); + /* If there is an outstanding reconnect task, cancel it */ if (connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING && connection->reconnect_task) { aws_atomic_store_ptr(&connection->reconnect_task->connection_ptr, NULL); diff --git a/source/mqtt.c b/source/mqtt.c index 27e7fd00..8dba23e9 100644 --- a/source/mqtt.c +++ b/source/mqtt.c @@ -15,6 +15,8 @@ #include +#include + /******************************************************************************* * Topic Validation ******************************************************************************/ @@ -91,15 +93,17 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter) } /******************************************************************************* - * Load Error String + * Library Init ******************************************************************************/ -void aws_mqtt_load_error_strings() { +void aws_mqtt_library_init(struct aws_allocator *allocator) { + + (void)allocator; - static bool s_error_strings_loaded = false; - if (!s_error_strings_loaded) { + static bool s_library_initialized = false; + if (!s_library_initialized) { - s_error_strings_loaded = true; + s_library_initialized = true; #define AWS_DEFINE_ERROR_INFO_MQTT(C, ES) AWS_DEFINE_ERROR_INFO(C, ES, "libaws-c-mqtt") /* clang-format off */ @@ -147,10 +151,26 @@ void aws_mqtt_load_error_strings() { /* clang-format on */ #undef AWS_DEFINE_ERROR_INFO_MQTT - static struct aws_error_info_list s_list = { + static struct aws_error_info_list s_error_list = { .error_list = s_errors, .count = AWS_ARRAY_SIZE(s_errors), }; - aws_register_error_info(&s_list); + aws_register_error_info(&s_error_list); + + /* clang-format off */ + static struct aws_log_subject_info s_logging_subjects[] = { + DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_GENERAL, "mqtt", "Misc MQTT logging"), + DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_CLIENT, "mqtt-client", "MQTT client and connections"), + DEFINE_LOG_SUBJECT_INFO(AWS_LS_MQTT_TOPIC_TREE, "mqtt-topic-tree", "MQTT subscription tree"), + }; + /* clang-format on */ + + static struct aws_log_subject_info_list s_logging_subjects_list = { + .subject_list = s_logging_subjects, + .count = AWS_ARRAY_SIZE(s_logging_subjects), + }; + aws_register_log_subject_info_list(&s_logging_subjects_list); } } + +void aws_mqtt_library_clean_up(void) {} diff --git a/source/topic_tree.c b/source/topic_tree.c index 7abbbad9..346694f4 100644 --- a/source/topic_tree.c +++ b/source/topic_tree.c @@ -15,6 +15,8 @@ #include +#include + #include #include @@ -62,35 +64,36 @@ static struct topic_tree_action *s_topic_tree_action_create(struct aws_array_lis struct topic_tree_action *action = NULL; - bool was_pushed = false; - /* Push an empty action into the transaction and get a pointer to it. */ struct topic_tree_action empty_action; AWS_ZERO_STRUCT(empty_action); if (aws_array_list_push_back(transaction, &empty_action)) { - goto handle_error; + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to insert action into transaction, array_list_push_back failed"); + goto push_back_failed; } - was_pushed = true; if (aws_array_list_get_at_ptr(transaction, (void **)&action, aws_array_list_length(transaction) - 1)) { - goto handle_error; + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to retrieve most recent action from transaction"); + goto get_at_failed; } - return action; + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Created action", (void *)action); -handle_error: + return action; - if (was_pushed) { - aws_array_list_pop_back(transaction); - } +get_at_failed: + aws_array_list_pop_back(transaction); +push_back_failed: return NULL; } static void s_topic_tree_action_destroy(struct topic_tree_action *action) { + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Destroying action", (void *)action); + if (action->mode == AWS_MQTT_TOPIC_TREE_REMOVE) { aws_array_list_clean_up(&action->to_remove); } @@ -105,6 +108,9 @@ static int s_topic_tree_action_to_remove( if (action->mode != AWS_MQTT_TOPIC_TREE_REMOVE) { if (aws_array_list_init_dynamic(&action->to_remove, allocator, size_hint, sizeof(void *))) { + + AWS_LOGF_ERROR( + AWS_LS_MQTT_TOPIC_TREE, "action=%p: Failed to initialize to_remove list in action", (void *)action); return AWS_OP_ERR; } action->mode = AWS_MQTT_TOPIC_TREE_REMOVE; @@ -131,11 +137,20 @@ static struct aws_mqtt_topic_node *s_topic_node_new( struct aws_mqtt_topic_node *node = aws_mem_acquire(allocator, sizeof(struct aws_mqtt_topic_node)); if (!node) { + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to allocate new topic node"); return NULL; } AWS_ZERO_STRUCT(*node); assert(!topic_filter || full_topic); + if (topic_filter) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "node=%p: Creating new node with topic filter " PRInSTR, + (void *)node, + AWS_BYTE_CURSOR_PRI(*topic_filter)); + } + if (topic_filter) { node->topic = *topic_filter; node->topic_filter = full_topic; @@ -144,6 +159,8 @@ static struct aws_mqtt_topic_node *s_topic_node_new( /* Init the sub topics map */ if (aws_hash_table_init(&node->subtopics, allocator, 0, aws_hash_byte_cursor_ptr, byte_cursor_eq, NULL, NULL)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_TOPIC_TREE, "node=%p: Failed to initialize subtopics table in topic node", (void *)node); aws_mem_release(allocator, node); return NULL; } @@ -155,6 +172,8 @@ static int s_topic_node_destroy_hash_foreach_wrap(void *context, struct aws_hash static void s_topic_node_destroy(struct aws_mqtt_topic_node *node, struct aws_allocator *allocator) { + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Destroying topic tree node", (void *)node); + /* Traverse all children and remove */ aws_hash_table_foreach(&node->subtopics, s_topic_node_destroy_hash_foreach_wrap, allocator); @@ -182,6 +201,8 @@ int aws_mqtt_topic_tree_init(struct aws_mqtt_topic_tree *tree, struct aws_alloca assert(tree); assert(allocator); + AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Creating new topic tree", (void *)tree); + tree->root = s_topic_node_new(allocator, NULL, NULL); if (!tree->root) { return AWS_OP_ERR; @@ -199,6 +220,8 @@ void aws_mqtt_topic_tree_clean_up(struct aws_mqtt_topic_tree *tree) { assert(tree); + AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Cleaning up topic tree", (void *)tree); + if (tree->allocator && tree->root) { s_topic_node_destroy(tree->root, tree->allocator); @@ -234,9 +257,12 @@ static int s_topic_node_string_finder(void *userdata, struct aws_hash_element *e return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes); + return 0; } + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes); *topic_filter = node->topic_filter; return 0; } @@ -250,6 +276,13 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct case AWS_MQTT_TOPIC_TREE_ADD: case AWS_MQTT_TOPIC_TREE_UPDATE: { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p action=%p: Committing %s topic tree action", + (void *)tree, + (void *)action, + (action->mode == AWS_MQTT_TOPIC_TREE_ADD) ? "add" : "update"); + /* Destroy old userdata */ if (action->node_to_update->cleanup && action->node_to_update->userdata) { /* If there was userdata assigned to this node, pass it out. */ @@ -273,6 +306,12 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct case AWS_MQTT_TOPIC_TREE_REMOVE: { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p action=%p: Committing remove topic tree action", + (void *)tree, + (void *)action); + struct aws_mqtt_topic_node *current = action->node_to_update; const size_t sub_parts_len = aws_array_list_length(&action->to_remove) - 1; @@ -282,6 +321,8 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct /* "unsubscribe" current. */ if (current->cleanup && current->userdata) { + AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Cleaning up node's userdata", (void *)current); + /* If there was userdata assigned to this node, pass it out. */ current->cleanup(current->userdata); } @@ -308,6 +349,14 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct aws_array_list_get_at(&action->to_remove, &grandma, i - 1); assert(grandma); /* Must be in bounds */ + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p node=%p: Removing child node %p with topic \"" PRInSTR "\"", + (void *)tree, + (void *)grandma, + (void *)node, + AWS_BYTE_CURSOR_PRI(node->topic)); + aws_hash_table_remove(&grandma->subtopics, &node->topic, NULL, NULL); /* Make sure the following loop doesn't hit this node. */ @@ -322,6 +371,14 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct } } else { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Node %p with topic \"" PRInSTR + "\" has children or is a subscription, leaving in place", + (void *)tree, + (void *)node, + AWS_BYTE_CURSOR_PRI(node->topic)); + /* Once we've found one node with children, the rest are guaranteed to. */ break; } @@ -354,6 +411,12 @@ static void s_topic_tree_action_commit(struct topic_tree_action *action, struct /* Uh oh, Mom's using my topic string again! Steal it and replace it with a new one, Indiana * Jones style. */ + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Found node %p reusing topic filter part, replacing with next child", + (void *)tree, + (void *)parent); + if (!new_topic_filter) { /* Set new_tf to old_tf so it's easier to check against the existing node. * Basically, it's an INOUT param. */ @@ -403,6 +466,12 @@ static void s_topic_tree_action_roll_back(struct topic_tree_action *action, stru switch (action->mode) { case AWS_MQTT_TOPIC_TREE_ADD: { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p action=%p: Rolling back add transaction action", + (void *)tree, + (void *)action); + /* Remove the first new node from it's parent's map */ aws_hash_table_remove(&action->last_found->subtopics, &action->first_created->topic, NULL, NULL); /* Recursively destroy all other created nodes */ @@ -416,6 +485,12 @@ static void s_topic_tree_action_roll_back(struct topic_tree_action *action, stru } case AWS_MQTT_TOPIC_TREE_REMOVE: case AWS_MQTT_TOPIC_TREE_UPDATE: { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p action=%p: Rolling back remove/update transaction, no changes made", + (void *)tree, + (void *)action); + /* Aborting a remove or update doesn't require any actions. */ break; } @@ -442,6 +517,12 @@ int aws_mqtt_topic_tree_transaction_insert( assert(topic_filter); assert(callback); + AWS_LOGF_DEBUG( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Inserting topic filter %s into topic tree", + (void *)tree, + topic_filter->bytes); + struct aws_mqtt_topic_node *current = tree->root; struct topic_tree_action *action = s_topic_tree_action_create(transaction); @@ -488,6 +569,12 @@ int aws_mqtt_topic_tree_transaction_insert( elem->value = current; if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Topic part \"" PRInSTR "\" is new, it and all children will be added as new nodes", + (void *)tree, + AWS_BYTE_CURSOR_PRI(sub_part)); + /* Store the node we just made, and make sure we don't store again */ action->mode = AWS_MQTT_TOPIC_TREE_ADD; action->first_created = current; @@ -504,6 +591,13 @@ int aws_mqtt_topic_tree_transaction_insert( /* Node found (or created), add the topic filter and callbacks */ if (current->owns_topic_filter) { + + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p node=%p: Updating existing node that alrady owns its topic_filter, throwing out parameter", + (void *)tree, + (void *)current); + /* If the topic filter was already here, this is already a subscription. Free the new topic_filter so all existing byte_cursors remain valid. */ aws_string_destroy((void *)topic_filter); @@ -529,6 +623,12 @@ int aws_mqtt_topic_tree_transaction_remove( assert(transaction); assert(topic_filter); + AWS_LOGF_DEBUG( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Removing topic filter \"" PRInSTR "\" from topic tree", + (void *)tree, + AWS_BYTE_CURSOR_PRI(*topic_filter)); + struct topic_tree_action *action = s_topic_tree_action_create(transaction); if (!action) { return AWS_OP_ERR; @@ -538,20 +638,24 @@ int aws_mqtt_topic_tree_transaction_remove( AWS_ZERO_STRUCT(sub_topic_parts); if (aws_array_list_init_dynamic(&sub_topic_parts, tree->allocator, 1, sizeof(struct aws_byte_cursor))) { + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to initialize topic parts array", (void *)tree); goto handle_error; } if (aws_byte_cursor_split_on_char(topic_filter, '/', &sub_topic_parts)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to split topic filter", (void *)tree); goto handle_error; } const size_t sub_parts_len = aws_array_list_length(&sub_topic_parts); if (!sub_parts_len) { + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to get topic parts length", (void *)tree); goto handle_error; } s_topic_tree_action_to_remove(action, tree->allocator, sub_parts_len); struct aws_mqtt_topic_node *current = tree->root; if (aws_array_list_push_back(&action->to_remove, ¤t)) { + AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert root node into to_remove list", (void *)tree); goto handle_error; } @@ -568,6 +672,8 @@ int aws_mqtt_topic_tree_transaction_remove( /* If the node exists, just traverse it */ current = elem->value; if (aws_array_list_push_back(&action->to_remove, ¤t)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert topic node into to_remove list", (void *)tree); goto handle_error; } } else { @@ -719,6 +825,12 @@ int aws_mqtt_topic_tree_publish(const struct aws_mqtt_topic_tree *tree, struct a assert(tree); assert(pub); + AWS_LOGF_TRACE( + AWS_LS_MQTT_TOPIC_TREE, + "tree=%p: Publishing on topic " PRInSTR, + (void *)tree, + AWS_BYTE_CURSOR_PRI(pub->topic_name)); + struct aws_byte_cursor sub_part; AWS_ZERO_STRUCT(sub_part); s_topic_tree_publish_do_recurse(&sub_part, tree->root, pub); diff --git a/tests/aws_iot_client_test.c b/tests/aws_iot_client_test.c index 7bc2177f..3500b94f 100644 --- a/tests/aws_iot_client_test.c +++ b/tests/aws_iot_client_test.c @@ -201,10 +201,11 @@ int main(int argc, char **argv) { aws_tls_init_static_state(args.allocator); aws_load_error_strings(); aws_io_load_error_strings(); - aws_mqtt_load_error_strings(); + aws_io_load_log_subject_strings(); + aws_mqtt_library_init(args.allocator); struct aws_logger_standard_options logger_options = { - .level = AWS_LL_DEBUG, + .level = AWS_LL_TRACE, .file = stdout, };