Skip to content

Commit

Permalink
mqtt5: Fix flow control will regard the DUP packet and not consider P…
Browse files Browse the repository at this point in the history
…UBCOMP packet

Closes #243
  • Loading branch information
ESP-YJM committed Jan 18, 2023
1 parent c96f6f8 commit ed76036
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
3 changes: 2 additions & 1 deletion lib/include/mqtt5_client_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ typedef struct {
mqtt5_topic_alias_handle_t peer_topic_alias;
} mqtt5_config_storage_t;

void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client);
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client);
void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client);
Expand Down
27 changes: 15 additions & 12 deletions mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a
static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle);
static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old);

void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client)
void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data);
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
if (msg_qos > 0) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}
bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data);
int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data);
if ((msg_dup == false) && (msg_qos > 0)) {
client->send_publish_packet_count ++;
ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count);
}
}

void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client)
{
if (client->send_publish_packet_count > 0) {
client->send_publish_packet_count --;
ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count);
}
}

Expand All @@ -51,7 +55,6 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client)
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.current_data_offset = 0;
client->send_publish_packet_count --;
}
}

Expand Down Expand Up @@ -291,7 +294,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q
}

/* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/
if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) {
if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) {
ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum);
return ESP_FAIL;
}
Expand Down
15 changes: 14 additions & 1 deletion mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
client->send_publish_packet_count = 0;
return ESP_OK;
}
#endif
Expand Down Expand Up @@ -943,7 +944,9 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_flow_control(client);
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}
#endif
return ESP_OK;
}
Expand Down Expand Up @@ -1367,6 +1370,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
}
break;
case MQTT_MSG_TYPE_PUBACK:
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
#ifdef MQTT_PROTOCOL_5
Expand Down Expand Up @@ -1413,6 +1421,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
#ifdef MQTT_PROTOCOL_5
if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}
#endif
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
#ifdef MQTT_PROTOCOL_5
Expand Down

0 comments on commit ed76036

Please sign in to comment.