Skip to content

Commit

Permalink
add payload to MQTT_EVENT_SUBSCRIBE
Browse files Browse the repository at this point in the history
+ documentation
+ cleanup logging

Closes #200
Merges #203
  • Loading branch information
bertmelis authored and david-cermak committed Oct 4, 2021
1 parent e1d5a94 commit de47f1c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
10 changes: 8 additions & 2 deletions include/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ typedef enum {
MQTT_EVENT_ERROR = 0, /*!< on error event, additional context: connection return code, error handle from esp_tls (if supported) */
MQTT_EVENT_CONNECTED, /*!< connected event, additional context: session_present flag */
MQTT_EVENT_DISCONNECTED, /*!< disconnected event */
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context: msg_id */
MQTT_EVENT_SUBSCRIBED, /*!< subscribed event, additional context:
- msg_id message id
- data pointer to the received data
- data_len length of the data for this event
*/
MQTT_EVENT_UNSUBSCRIBED, /*!< unsubscribed event */
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
MQTT_EVENT_DATA, /*!< data event, additional context:
Expand All @@ -51,6 +55,8 @@ typedef enum {
- current_data_offset offset of the current data for this event
- total_data_len total length of the data received
- retain retain flag of the message
- qos qos level of the message
- dup dup flag of the message
Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is
longer than internal buffer. In that case only first event contains topic
pointer and length, other contain data only with current data length
Expand Down Expand Up @@ -154,7 +160,7 @@ typedef struct {
esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal mqtt errors */
bool retain; /*!< Retained flag of the message associated with this event */
int qos; /*!< qos of the messages associated with this event */
int dup; /*!< Dup flag of the message associated with this event */
bool dup; /*!< dup flag of the message associated with this event */
} esp_mqtt_event_t;

typedef esp_mqtt_event_t *esp_mqtt_event_handle_t;
Expand Down
1 change: 1 addition & 0 deletions lib/include/mqtt_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length);
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len);
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length);
char *mqtt_get_publish_data(uint8_t *buffer, size_t *length);
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length);
uint16_t mqtt_get_id(uint8_t *buffer, size_t length);
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length);

Expand Down
12 changes: 12 additions & 0 deletions lib/mqtt_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ char *mqtt_get_publish_data(uint8_t *buffer, size_t *length)
return (char *)(buffer + i);
}

char *mqtt_get_suback_data(uint8_t *buffer, size_t *length)
{
// SUBACK payload length = total length - (fixed header (2 bytes) + variable header (2 bytes))
// This requires the remaining length to be encoded in 1 byte.
if (*length > 4) {
*length -= 4;
return (char *)(buffer + 4);
}
*length = 0;
return NULL;
}

uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
{
if (length < 1) {
Expand Down
32 changes: 28 additions & 4 deletions mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
client->event.retain = mqtt_get_retain(msg_buf);
client->event.msg_id = mqtt_get_id(msg_buf, msg_data_len);
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buff);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;

post_data_event:
Expand Down Expand Up @@ -1016,6 +1016,28 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
return ESP_OK;
}

static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;

msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Failed to acquire suback data");
return ESP_FAIL;
}
// post data event
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
client->event.data = msg_data;
client->event.current_data_offset = 0;
esp_mqtt_dispatch_event_with_msgid(client);

return ESP_OK;
}

static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{
ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count);
Expand Down Expand Up @@ -1228,9 +1250,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
ESP_LOGD(TAG, "deliver_suback, message_length_read=%zu, message_length=%zu", client->mqtt_state.in_buffer_read_len, client->mqtt_state.message_length);
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
}
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
Expand Down

0 comments on commit de47f1c

Please sign in to comment.