From 5174bf7415843c3ce11527b16c61e01414f05378 Mon Sep 17 00:00:00 2001 From: Gaurav Aggarwal Date: Wed, 24 Apr 2024 22:29:06 +0530 Subject: [PATCH] Fix out of order PUBACK and PUBLISH handling The current implementation uses task notification to wait for acknowledgements and incoming publishes and therefore, cannot differentiate between acknowledgements and incoming publishes. This causes a crash when an incoming PUBLISH is received before PUBACK for the previous outgoing publish. This commit uses an event group to wait for acknowledgements and incoming publishes. This allows us to differentiate between acknowledgements and incoming publishes by waiting for different bits. This was reported here - https://github.com/FreeRTOS/iot-reference-esp32c3/issues/63. Signed-off-by: Gaurav Aggarwal --- .../sub_pub_unsub_demo/sub_pub_unsub_demo.c | 197 +++++++++--------- 1 file changed, 94 insertions(+), 103 deletions(-) diff --git a/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c b/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c index 492be99..8fd56fc 100644 --- a/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c +++ b/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c @@ -84,6 +84,12 @@ #define CORE_MQTT_AGENT_CONNECTED_BIT ( 1 << 0 ) #define CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT ( 1 << 1 ) +/* MQTT event group bit definitions. */ +#define MQTT_INCOMING_PUBLISH_RECEIVED_BIT ( 1 << 0 ) +#define MQTT_PUBLISH_COMMAND_COMPLETED_BIT ( 1 << 1 ) +#define MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ( 1 << 2 ) +#define MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ( 1 << 3 ) + /* Struct definitions *********************************************************/ /** @@ -92,8 +98,7 @@ */ typedef struct IncomingPublishCallbackContext { - TaskHandle_t xTaskToNotify; - uint32_t ulNotificationValue; + EventGroupHandle_t xMqttEventGroup; char pcIncomingPublish[ subpubunsubconfigSTRING_BUFFER_LENGTH ]; } IncomingPublishCallbackContext_t; @@ -104,8 +109,7 @@ typedef struct IncomingPublishCallbackContext struct MQTTAgentCommandContext { MQTTStatus_t xReturnStatus; - TaskHandle_t xTaskToNotify; - uint32_t ulNotificationValue; + EventGroupHandle_t xMqttEventGroup; IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext; void * pArgs; }; @@ -204,19 +208,19 @@ static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandCont MQTTAgentReturnInfo_t * pxReturnInfo ); /** - * @brief Called by the task to wait for a notification from a callback function + * @brief Called by the task to wait for event from a callback function * after the task first executes either MQTTAgent_Publish()* or * MQTTAgent_Subscribe(). * * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call * - * @param[in] pxCommandContext Context of the initial command. - * @param[out] pulNotifiedValue The task's notification value after it receives - * a notification from the callback. + * @param[in] xMqttEventGroup Event group used for MQTT events. + * @param[in] uxBitsToWaitFor Event to wait for. * - * @return pdTRUE if the task received a notification, otherwise pdFALSE. + * @return Received event. */ -static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue ); +static EventBits_t prvWaitForEvent( EventGroupHandle_t xMqttEventGroup, + EventBits_t uxBitsToWaitFor ); /** * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when @@ -243,10 +247,12 @@ static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext, * for all MQTT brokers. Can also be QoS2 if supported by the broker. AWS IoT * does not support QoS2. * @param[in] pcTopicFilter Topic filter to subscribe to. + * @param[in] xMqttEventGroup Event group used for MQTT events. */ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext, MQTTQoS_t xQoS, - char * pcTopicFilter ); + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ); /** * @brief Unsubscribe to the topic the demo task will also publish to. @@ -255,9 +261,11 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu * for all MQTT brokers. Can also be QoS2 if supported by the broker. AWS IoT * does not support QoS2. * @param[in] pcTopicFilter Topic filter to unsubscribe from. + * @param[in] xMqttEventGroup Event group used for MQTT events. */ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, - char * pcTopicFilter ); + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ); /** * @brief The function that implements the task demonstrated by this file. @@ -323,14 +331,10 @@ static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandCont * initiated the publish can check the operation's status. */ pxCommandContext->xReturnStatus = pxReturnInfo->returnCode; - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - /* Send the context's ulNotificationValue as the notification value so - * the receiving task can check the value it set in the context matches - * the value it receives in the notification. */ - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_PUBLISH_COMMAND_COMPLETED_BIT ); } } @@ -364,11 +368,10 @@ static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandCo } } - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ); } } @@ -390,24 +393,23 @@ static void prvUnsubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommand pxUnsubscribeArgs->pSubscribeInfo->topicFilterLength ); } - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ); } } -static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue ) +static EventBits_t prvWaitForEvent( EventGroupHandle_t xMqttEventGroup, + EventBits_t uxBitsToWaitFor ) { - BaseType_t xReturn; - - /* Wait for this task to get notified, passing out the value it gets - * notified with. */ - xReturn = xTaskNotifyWait( 0, - 0, - pulNotifiedValue, - portMAX_DELAY ); + EventBits_t xReturn; + + xReturn = xEventGroupWaitBits( xMqttEventGroup, + uxBitsToWaitFor, + pdTRUE, /* xClearOnExit. */ + pdTRUE, /* xWaitForAllBits. */ + portMAX_DELAY ); return xReturn; } @@ -435,19 +437,19 @@ static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext, ( pxIncomingPublishCallbackContext->pcIncomingPublish )[ subpubunsubconfigSTRING_BUFFER_LENGTH - 1 ] = 0x00; } - xTaskNotify( pxIncomingPublishCallbackContext->xTaskToNotify, - pxIncomingPublishCallbackContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxIncomingPublishCallbackContext->xMqttEventGroup, + MQTT_INCOMING_PUBLISH_RECEIVED_BIT ); } static void prvPublishToTopic( MQTTQoS_t xQoS, char * pcTopicName, - char * pcPayload ) + char * pcPayload, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulPublishMessageId, ulNotifiedValue = 0; + uint32_t ulPublishMessageId = 0; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTPublishInfo_t xPublishInfo = { 0 }; @@ -481,8 +483,7 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulPublishMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS; xCommandParams.cmdCompleteCallback = prvPublishCommandCallback; @@ -500,17 +501,11 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, ESP_LOGI( TAG, "Task \"%s\" sending publish request to coreMQTT-Agent with message \"%s\" on topic \"%s\" with ID %" PRIu32 ".", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), pcPayload, pcTopicName, ulPublishMessageId ); - /* To ensure ulNotification doesn't accidentally hold the expected value - * as it is to be checked against the value sent from the callback.. */ - ulNotifiedValue = ~ulPublishMessageId; - - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext, &xPublishInfo, &xCommandParams ); @@ -521,10 +516,11 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, * wait for the publish to be sent. */ ESP_LOGI( TAG, "Task \"%s\" waiting for publish %" PRIu32 " to complete.", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), ulPublishMessageId ); - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_PUBLISH_COMMAND_COMPLETED_BIT ); } else { @@ -535,9 +531,8 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulPublishMessageId ) ) + if( ( ( xReceivedEvent & MQTT_PUBLISH_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack for publish message %" PRIu32 ". Re-attempting publish.", @@ -548,21 +543,21 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, ESP_LOGI( TAG, "Publish %" PRIu32 " succeeded for task \"%s\".", ulPublishMessageId, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulPublishMessageId ) ); + } while( ( xReceivedEvent & MQTT_PUBLISH_COMMAND_COMPLETED_BIT ) == 0 || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext, MQTTQoS_t xQoS, - char * pcTopicFilter ) + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulSubscribeMessageId, ulNotifiedValue = 0; + uint32_t ulSubscribeMessageId; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 }; MQTTSubscribeInfo_t xSubscribeInfo = { 0 }; @@ -598,8 +593,7 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulSubscribeMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandContext.pxIncomingPublishCallbackContext = pxIncomingPublishCallbackContext; xCommandContext.pArgs = ( void * ) &xSubscribeArgs; @@ -619,12 +613,10 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu ESP_LOGI( TAG, "Task \"%s\" sending subscribe request to coreMQTT-Agent for topic filter: %s with id %" PRIu32 "", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), pcTopicFilter, ulSubscribeMessageId ); - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Subscribe( &xGlobalMqttAgentContext, &xSubscribeArgs, &xCommandParams ); @@ -633,7 +625,8 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu { /* For QoS 1 and 2, wait for the subscription acknowledgment. For QoS0, * wait for the subscribe to be sent. */ - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ); } else { @@ -644,9 +637,8 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulSubscribeMessageId ) ) + if( ( ( xReceivedEvent & MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack to subscribe message %" PRIu32 ". Re-attempting subscribe.", @@ -658,20 +650,20 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu "Subscribe %" PRIu32 " for topic filter %s succeeded for task \"%s\".", ulSubscribeMessageId, pcTopicFilter, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulSubscribeMessageId ) ); + } while( ( ( xReceivedEvent & MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, - char * pcTopicFilter ) + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulUnsubscribeMessageId, ulNotifiedValue = 0; + uint32_t ulUnsubscribeMessageId; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTAgentSubscribeArgs_t xUnsubscribeArgs = { 0 }; MQTTSubscribeInfo_t xUnsubscribeInfo = { 0 }; @@ -707,8 +699,7 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulUnsubscribeMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandContext.pArgs = ( void * ) &xUnsubscribeArgs; xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS; @@ -726,12 +717,10 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, portMAX_DELAY ); ESP_LOGI( TAG, "Task \"%s\" sending unsubscribe request to coreMQTT-Agent for topic filter: %s with id %" PRIu32 "", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL), pcTopicFilter, ulUnsubscribeMessageId ); - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Unsubscribe( &xGlobalMqttAgentContext, &xUnsubscribeArgs, &xCommandParams ); @@ -740,7 +729,8 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, { /* For QoS 1 and 2, wait for the subscription acknowledgment. For QoS0, * wait for the subscribe to be sent. */ - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ); } else { @@ -751,9 +741,8 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulUnsubscribeMessageId ) ) + if( ( ( xReceivedEvent & MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack to unsubscribe message %" PRIu32 ". Re-attempting subscribe.", @@ -765,27 +754,26 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, "Unsubscribe %" PRIu32 " for topic filter %s succeeded for task \"%s\".", ulUnsubscribeMessageId, pcTopicFilter, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulUnsubscribeMessageId ) ); + } while( ( ( xReceivedEvent & MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) { struct DemoParams * pxParams = ( struct DemoParams * ) pvParameters; - uint32_t ulNotifiedValue; uint32_t ulTaskNumber = pxParams->ulTaskNumber; + EventGroupHandle_t xMqttEventGroup; IncomingPublishCallbackContext_t xIncomingPublishCallbackContext; MQTTQoS_t xQoS; char * pcTopicBuffer = topicBuf[ ulTaskNumber ]; char pcPayload[ subpubunsubconfigSTRING_BUFFER_LENGTH ]; - xIncomingPublishCallbackContext.ulNotificationValue = ulTaskNumber; - xIncomingPublishCallbackContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xMqttEventGroup = xEventGroupCreate(); + xIncomingPublishCallbackContext.xMqttEventGroup = xMqttEventGroup; xQoS = ( MQTTQoS_t ) subpubunsubconfigQOS_LEVEL; @@ -793,7 +781,7 @@ static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) snprintf( pcTopicBuffer, subpubunsubconfigSTRING_BUFFER_LENGTH, "/filter/%s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); while( 1 ) { @@ -802,33 +790,36 @@ static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) * the target. */ prvSubscribeToTopic( &xIncomingPublishCallbackContext, xQoS, - pcTopicBuffer ); + pcTopicBuffer, + xMqttEventGroup ); snprintf( pcPayload, subpubunsubconfigSTRING_BUFFER_LENGTH, "%s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); prvPublishToTopic( xQoS, pcTopicBuffer, - pcPayload ); + pcPayload, + xMqttEventGroup ); - prvWaitForNotification( &ulNotifiedValue ); + prvWaitForEvent( xMqttEventGroup, MQTT_INCOMING_PUBLISH_RECEIVED_BIT ); ESP_LOGI( TAG, "Task \"%s\" received: %s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ), + pcTaskGetName( NULL ), xIncomingPublishCallbackContext.pcIncomingPublish ); - prvUnsubscribeToTopic( xQoS, pcTopicBuffer ); + prvUnsubscribeToTopic( xQoS, pcTopicBuffer, xMqttEventGroup ); ESP_LOGI( TAG, "Task \"%s\" completed a loop. Delaying before next loop.", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); vTaskDelay( pdMS_TO_TICKS( subpubunsubconfigDELAY_BETWEEN_SUB_PUB_UNSUB_LOOPS_MS ) ); } + vEventGroupDelete( xMqttEventGroup ); vTaskDelete( NULL ); }