diff --git a/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c b/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c index 492be99..12f093f 100644 --- a/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c +++ b/main/demo_tasks/sub_pub_unsub_demo/sub_pub_unsub_demo.c @@ -84,6 +84,12 @@ #define CORE_MQTT_AGENT_CONNECTED_BIT ( 1 << 0 ) #define CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT ( 1 << 1 ) +/* MQTT event group bit definitions. */ +#define MQTT_INCOMING_PUBLISH_RECEIVED_BIT ( 1 << 0 ) +#define MQTT_PUBLISH_COMMAND_COMPLETED_BIT ( 1 << 1 ) +#define MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ( 1 << 2 ) +#define MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ( 1 << 3 ) + /* Struct definitions *********************************************************/ /** @@ -92,8 +98,7 @@ */ typedef struct IncomingPublishCallbackContext { - TaskHandle_t xTaskToNotify; - uint32_t ulNotificationValue; + EventGroupHandle_t xMqttEventGroup; char pcIncomingPublish[ subpubunsubconfigSTRING_BUFFER_LENGTH ]; } IncomingPublishCallbackContext_t; @@ -104,8 +109,7 @@ typedef struct IncomingPublishCallbackContext struct MQTTAgentCommandContext { MQTTStatus_t xReturnStatus; - TaskHandle_t xTaskToNotify; - uint32_t ulNotificationValue; + EventGroupHandle_t xMqttEventGroup; IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext; void * pArgs; }; @@ -204,19 +208,19 @@ static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandCont MQTTAgentReturnInfo_t * pxReturnInfo ); /** - * @brief Called by the task to wait for a notification from a callback function + * @brief Called by the task to wait for event from a callback function * after the task first executes either MQTTAgent_Publish()* or * MQTTAgent_Subscribe(). * * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call * - * @param[in] pxCommandContext Context of the initial command. - * @param[out] pulNotifiedValue The task's notification value after it receives - * a notification from the callback. + * @param[in] xMqttEventGroup Event group used for MQTT events. + * @param[in] uxBitsToWaitFor Event to wait for. * - * @return pdTRUE if the task received a notification, otherwise pdFALSE. + * @return Received event. */ -static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue ); +static EventBits_t prvWaitForEvent( EventGroupHandle_t xMqttEventGroup, + EventBits_t uxBitsToWaitFor ); /** * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when @@ -243,10 +247,12 @@ static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext, * for all MQTT brokers. Can also be QoS2 if supported by the broker. AWS IoT * does not support QoS2. * @param[in] pcTopicFilter Topic filter to subscribe to. + * @param[in] xMqttEventGroup Event group used for MQTT events. */ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext, MQTTQoS_t xQoS, - char * pcTopicFilter ); + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ); /** * @brief Unsubscribe to the topic the demo task will also publish to. @@ -255,9 +261,11 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu * for all MQTT brokers. Can also be QoS2 if supported by the broker. AWS IoT * does not support QoS2. * @param[in] pcTopicFilter Topic filter to unsubscribe from. + * @param[in] xMqttEventGroup Event group used for MQTT events. */ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, - char * pcTopicFilter ); + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ); /** * @brief The function that implements the task demonstrated by this file. @@ -323,14 +331,10 @@ static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandCont * initiated the publish can check the operation's status. */ pxCommandContext->xReturnStatus = pxReturnInfo->returnCode; - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - /* Send the context's ulNotificationValue as the notification value so - * the receiving task can check the value it set in the context matches - * the value it receives in the notification. */ - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_PUBLISH_COMMAND_COMPLETED_BIT ); } } @@ -364,11 +368,10 @@ static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandCo } } - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ); } } @@ -390,24 +393,23 @@ static void prvUnsubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommand pxUnsubscribeArgs->pSubscribeInfo->topicFilterLength ); } - if( pxCommandContext->xTaskToNotify != NULL ) + if( pxCommandContext->xMqttEventGroup != NULL ) { - xTaskNotify( pxCommandContext->xTaskToNotify, - pxCommandContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxCommandContext->xMqttEventGroup, + MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ); } } -static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue ) +static EventBits_t prvWaitForEvent( EventGroupHandle_t xMqttEventGroup, + EventBits_t uxBitsToWaitFor ) { - BaseType_t xReturn; - - /* Wait for this task to get notified, passing out the value it gets - * notified with. */ - xReturn = xTaskNotifyWait( 0, - 0, - pulNotifiedValue, - portMAX_DELAY ); + EventBits_t xReturn; + + xReturn = xEventGroupWaitBits( xMqttEventGroup, + uxBitsToWaitFor, + pdTRUE, /* xClearOnExit. */ + pdTRUE, /* xWaitForAllBits. */ + portMAX_DELAY ); return xReturn; } @@ -435,19 +437,19 @@ static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext, ( pxIncomingPublishCallbackContext->pcIncomingPublish )[ subpubunsubconfigSTRING_BUFFER_LENGTH - 1 ] = 0x00; } - xTaskNotify( pxIncomingPublishCallbackContext->xTaskToNotify, - pxIncomingPublishCallbackContext->ulNotificationValue, - eSetValueWithOverwrite ); + xEventGroupSetBits( pxIncomingPublishCallbackContext->xMqttEventGroup, + MQTT_INCOMING_PUBLISH_RECEIVED_BIT ); } static void prvPublishToTopic( MQTTQoS_t xQoS, char * pcTopicName, - char * pcPayload ) + char * pcPayload, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulPublishMessageId, ulNotifiedValue = 0; + uint32_t ulPublishMessageId = 0; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTPublishInfo_t xPublishInfo = { 0 }; @@ -481,8 +483,7 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulPublishMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS; xCommandParams.cmdCompleteCallback = prvPublishCommandCallback; @@ -500,17 +501,11 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, ESP_LOGI( TAG, "Task \"%s\" sending publish request to coreMQTT-Agent with message \"%s\" on topic \"%s\" with ID %" PRIu32 ".", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), pcPayload, pcTopicName, ulPublishMessageId ); - /* To ensure ulNotification doesn't accidentally hold the expected value - * as it is to be checked against the value sent from the callback.. */ - ulNotifiedValue = ~ulPublishMessageId; - - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext, &xPublishInfo, &xCommandParams ); @@ -521,10 +516,11 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, * wait for the publish to be sent. */ ESP_LOGI( TAG, "Task \"%s\" waiting for publish %" PRIu32 " to complete.", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), ulPublishMessageId ); - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_PUBLISH_COMMAND_COMPLETED_BIT ); } else { @@ -535,9 +531,8 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulPublishMessageId ) ) + if( ( ( xReceivedEvent & MQTT_PUBLISH_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack for publish message %" PRIu32 ". Re-attempting publish.", @@ -548,21 +543,21 @@ static void prvPublishToTopic( MQTTQoS_t xQoS, ESP_LOGI( TAG, "Publish %" PRIu32 " succeeded for task \"%s\".", ulPublishMessageId, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulPublishMessageId ) ); + } while( ( xReceivedEvent & MQTT_PUBLISH_COMMAND_COMPLETED_BIT ) == 0 || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext, MQTTQoS_t xQoS, - char * pcTopicFilter ) + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulSubscribeMessageId, ulNotifiedValue = 0; + uint32_t ulSubscribeMessageId; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 }; MQTTSubscribeInfo_t xSubscribeInfo = { 0 }; @@ -598,8 +593,7 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulSubscribeMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandContext.pxIncomingPublishCallbackContext = pxIncomingPublishCallbackContext; xCommandContext.pArgs = ( void * ) &xSubscribeArgs; @@ -619,12 +613,10 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu ESP_LOGI( TAG, "Task \"%s\" sending subscribe request to coreMQTT-Agent for topic filter: %s with id %" PRIu32 "", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), pcTopicFilter, ulSubscribeMessageId ); - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Subscribe( &xGlobalMqttAgentContext, &xSubscribeArgs, &xCommandParams ); @@ -633,7 +625,8 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu { /* For QoS 1 and 2, wait for the subscription acknowledgment. For QoS0, * wait for the subscribe to be sent. */ - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ); } else { @@ -644,9 +637,8 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulSubscribeMessageId ) ) + if( ( ( xReceivedEvent & MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack to subscribe message %" PRIu32 ". Re-attempting subscribe.", @@ -658,20 +650,20 @@ static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPu "Subscribe %" PRIu32 " for topic filter %s succeeded for task \"%s\".", ulSubscribeMessageId, pcTopicFilter, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulSubscribeMessageId ) ); + } while( ( ( xReceivedEvent & MQTT_SUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, - char * pcTopicFilter ) + char * pcTopicFilter, + EventGroupHandle_t xMqttEventGroup ) { - uint32_t ulUnsubscribeMessageId, ulNotifiedValue = 0; + uint32_t ulUnsubscribeMessageId; MQTTStatus_t xCommandAdded; - BaseType_t xCommandAcknowledged = pdFALSE; + EventBits_t xReceivedEvent = 0; MQTTAgentSubscribeArgs_t xUnsubscribeArgs = { 0 }; MQTTSubscribeInfo_t xUnsubscribeInfo = { 0 }; @@ -707,8 +699,7 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, * message. * This gets updated in the callback function so the variable must persist * until the callback executes. */ - xCommandContext.ulNotificationValue = ulUnsubscribeMessageId; - xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xCommandContext.xMqttEventGroup = xMqttEventGroup; xCommandContext.pArgs = ( void * ) &xUnsubscribeArgs; xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS; @@ -726,12 +717,10 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, portMAX_DELAY ); ESP_LOGI( TAG, "Task \"%s\" sending unsubscribe request to coreMQTT-Agent for topic filter: %s with id %" PRIu32 "", - pcTaskGetName( xCommandContext.xTaskToNotify ), + pcTaskGetName( NULL ), pcTopicFilter, ulUnsubscribeMessageId ); - xCommandAcknowledged = pdFALSE; - xCommandAdded = MQTTAgent_Unsubscribe( &xGlobalMqttAgentContext, &xUnsubscribeArgs, &xCommandParams ); @@ -740,7 +729,8 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, { /* For QoS 1 and 2, wait for the subscription acknowledgment. For QoS0, * wait for the subscribe to be sent. */ - xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue ); + xReceivedEvent = prvWaitForEvent( xMqttEventGroup, + MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ); } else { @@ -751,9 +741,8 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, /* Check all ways the status was passed back just for demonstration * purposes. */ - if( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulUnsubscribeMessageId ) ) + if( ( ( xReceivedEvent & MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ) { ESP_LOGW( TAG, "Error or timed out waiting for ack to unsubscribe message %" PRIu32 ". Re-attempting subscribe.", @@ -765,27 +754,26 @@ static void prvUnsubscribeToTopic( MQTTQoS_t xQoS, "Unsubscribe %" PRIu32 " for topic filter %s succeeded for task \"%s\".", ulUnsubscribeMessageId, pcTopicFilter, - pcTaskGetName( xCommandContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); } - } while( ( xCommandAcknowledged != pdTRUE ) || - ( xCommandContext.xReturnStatus != MQTTSuccess ) || - ( ulNotifiedValue != ulUnsubscribeMessageId ) ); + } while( ( ( xReceivedEvent & MQTT_UNSUBSCRIBE_COMMAND_COMPLETED_BIT ) == 0 ) || + ( xCommandContext.xReturnStatus != MQTTSuccess ) ); } static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) { struct DemoParams * pxParams = ( struct DemoParams * ) pvParameters; - uint32_t ulNotifiedValue; uint32_t ulTaskNumber = pxParams->ulTaskNumber; + EventGroupHandle_t xMqttEventGroup; IncomingPublishCallbackContext_t xIncomingPublishCallbackContext; MQTTQoS_t xQoS; char * pcTopicBuffer = topicBuf[ ulTaskNumber ]; char pcPayload[ subpubunsubconfigSTRING_BUFFER_LENGTH ]; - xIncomingPublishCallbackContext.ulNotificationValue = ulTaskNumber; - xIncomingPublishCallbackContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); + xMqttEventGroup = xEventGroupCreate(); + xIncomingPublishCallbackContext.xMqttEventGroup = xMqttEventGroup; xQoS = ( MQTTQoS_t ) subpubunsubconfigQOS_LEVEL; @@ -793,7 +781,7 @@ static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) snprintf( pcTopicBuffer, subpubunsubconfigSTRING_BUFFER_LENGTH, "/filter/%s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); while( 1 ) { @@ -802,33 +790,36 @@ static void prvSubscribePublishUnsubscribeTask( void * pvParameters ) * the target. */ prvSubscribeToTopic( &xIncomingPublishCallbackContext, xQoS, - pcTopicBuffer ); + pcTopicBuffer, + xMqttEventGroup ); snprintf( pcPayload, subpubunsubconfigSTRING_BUFFER_LENGTH, "%s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); prvPublishToTopic( xQoS, pcTopicBuffer, - pcPayload ); + pcPayload, + xMqttEventGroup ); - prvWaitForNotification( &ulNotifiedValue ); + prvWaitForEvent( xMqttEventGroup, MQTT_INCOMING_PUBLISH_RECEIVED_BIT ); ESP_LOGI( TAG, "Task \"%s\" received: %s", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ), + pcTaskGetName( NULL ), xIncomingPublishCallbackContext.pcIncomingPublish ); - prvUnsubscribeToTopic( xQoS, pcTopicBuffer ); + prvUnsubscribeToTopic( xQoS, pcTopicBuffer, xMqttEventGroup ); ESP_LOGI( TAG, "Task \"%s\" completed a loop. Delaying before next loop.", - pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) ); + pcTaskGetName( NULL ) ); vTaskDelay( pdMS_TO_TICKS( subpubunsubconfigDELAY_BETWEEN_SUB_PUB_UNSUB_LOOPS_MS ) ); } + vEventGroupDelete( xMqttEventGroup ); vTaskDelete( NULL ); }