diff --git a/src/mqtt/mqtt_test.c b/src/mqtt/mqtt_test.c index 4566fbd..fe65f00 100644 --- a/src/mqtt/mqtt_test.c +++ b/src/mqtt/mqtt_test.c @@ -91,6 +91,26 @@ */ #define TEST_MQTT_TOPIC MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test" +/** + * @brief Sample topic filter 2 to use in tests. + */ +#define TEST_MQTT_TOPIC_2 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test2" + +/** + * @brief Sample topic filter 3 to use in tests. + */ +#define TEST_MQTT_TOPIC_3 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopic3" + +/** + * @brief Sample topic filter 4 to use in tests. + */ +#define TEST_MQTT_TOPIC_4 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testFour" + +/** + * @brief Sample topic filter 5 to use in tests. + */ +#define TEST_MQTT_TOPIC_5 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopicName5" + /** * @brief Sample topic filter to test MQTT retainted message. */ @@ -1593,6 +1613,169 @@ TEST( MqttTest, MQTT_Publish_With_Retain_Flag ) /*-----------------------------------------------------------*/ +/** + * @brief Tests Subscribe and Unsubscribe operations to multiple topic filters + * in a single API call. + * The test subscribes to 5 topics, and then publishes to the same topics one + * at a time. The broker is expected to route the publish message back to the + * test for all topics. + * The test then unsubscribes from the 5 topics which should also succeed. + */ +TEST( MqttTest, MQTT_SubUnsub_Multiple_Topics ) + +{ + MQTTSubscribeInfo_t subscribeParams[ 5 ]; + char * topicList[ 5 ]; + size_t i; + const size_t topicCount = 5U; + MQTTQoS_t qos; + MQTTStatus_t xMQTTStatus; + uint32_t entryTime; + + topicList[ 0 ] = TEST_MQTT_TOPIC; + topicList[ 1 ] = TEST_MQTT_TOPIC_2; + topicList[ 2 ] = TEST_MQTT_TOPIC_3; + topicList[ 3 ] = TEST_MQTT_TOPIC_4; + topicList[ 4 ] = TEST_MQTT_TOPIC_5; + + for( i = 0; i < topicCount; i++ ) + { + subscribeParams[ i ].pTopicFilter = topicList[ i ]; + subscribeParams[ i ].topicFilterLength = strlen( topicList[ i ] ); + subscribeParams[ i ].qos = ( i % 2 ); + } + + globalSubscribePacketIdentifier = MQTT_GetPacketId( &context ); + /* Check that the packet ID is valid according to the MQTT spec. */ + TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalSubscribePacketIdentifier ); + TEST_ASSERT_NOT_EQUAL( 0U, globalSubscribePacketIdentifier ); + + /* Subscribe to all topics. */ + TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Subscribe( &context, + subscribeParams, + topicCount, + globalSubscribePacketIdentifier ) ); + + /* Expect a SUBACK from the broker for the subscribe operation. */ + TEST_ASSERT_FALSE( receivedSubAck ); + entryTime = FRTest_GetTimeMs(); + do + { + xMQTTStatus = MQTT_ProcessLoop( &context ); + + if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) ) + { + /* Timeout. */ + break; + } + else if( receivedSubAck != 0 ) + { + /* No need to loop anymore since we received the SUBACK. */ + break; + } + else + { + /* Do nothing. */ + } + }while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + + TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + TEST_ASSERT_TRUE( receivedSubAck ); + + /* Publish to the same topic, that we subscribed to. */ + for( i = 0; i < topicCount; i++ ) + { + /* Set Qos to be either 1 or 0. */ + qos = ( i % 2 ); + + TEST_ASSERT_EQUAL( MQTTSuccess, publishToTopic( + &context, + topicList[ i ], + false, /* setRetainFlag */ + false, /* isDuplicate */ + qos, /* QoS */ + MQTT_GetPacketId( &context ) ) ); + + /* Reset the PUBACK flag. */ + receivedPubAck = false; + + configPRINTF( ( "%u Entered1", xTaskGetTickCount() ) ); + entryTime = FRTest_GetTimeMs(); + do + { + xMQTTStatus = MQTT_ProcessLoop( &context ); + + if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) ) + { + /* Timeout. */ + break; + } + else + { + /* Do nothing. */ + } + }while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + + TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + + /* Only wait for PUBACK if QoS is not QoS0. */ + if( qos != MQTTQoS0 ) + { + /* Make sure we have received PUBACK response. */ + TEST_ASSERT_TRUE( receivedPubAck ); + } + + /* Make sure that we have received the same message from the server, + * that was published (as we have subscribed to the same topic). */ + TEST_ASSERT_EQUAL( qos, incomingInfo.qos ); + TEST_ASSERT_EQUAL( strlen( topicList[ i ] ), incomingInfo.topicNameLength ); + TEST_ASSERT_EQUAL_MEMORY( topicList[ i ], + incomingInfo.pTopicName, + strlen( topicList[ i ] ) ); + TEST_ASSERT_EQUAL( strlen( MQTT_EXAMPLE_MESSAGE ), incomingInfo.payloadLength ); + TEST_ASSERT_EQUAL_MEMORY( MQTT_EXAMPLE_MESSAGE, + incomingInfo.pPayload, + incomingInfo.payloadLength ); + } + + globalUnsubscribePacketIdentifier = MQTT_GetPacketId( &context ); + /* Check that the packet ID is valid according to the MQTT spec. */ + TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalUnsubscribePacketIdentifier ); + TEST_ASSERT_NOT_EQUAL( 0U, globalUnsubscribePacketIdentifier ); + + /* Un-subscribe from all the topics. */ + TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Unsubscribe( + &context, subscribeParams, topicCount, globalUnsubscribePacketIdentifier ) ); + + receivedUnsubAck = false; + + /* Expect an UNSUBACK from the broker for the unsubscribe operation. */ + entryTime = FRTest_GetTimeMs(); + do + { + xMQTTStatus = MQTT_ProcessLoop( &context ); + + if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) ) + { + /* Timeout. */ + break; + } + else if( receivedUnsubAck != 0 ) + { + break; + } + else + { + /* Do nothing. */ + } + }while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + + TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) ); + TEST_ASSERT_TRUE( receivedUnsubAck ); +} + +/*-----------------------------------------------------------*/ + /** * @brief Test group runner for MQTT test against MQTT broker. */ @@ -1605,6 +1788,7 @@ TEST_GROUP_RUNNER( MqttTest ) RUN_TEST_CASE( MqttTest, MQTT_Resend_Unacked_Publish_QoS1 ); RUN_TEST_CASE( MqttTest, MQTT_Restore_Session_Duplicate_Incoming_Publish_Qos1 ); RUN_TEST_CASE( MqttTest, MQTT_Publish_With_Retain_Flag ); + RUN_TEST_CASE( MqttTest, MQTT_SubUnsub_Multiple_Topics ); } /*-----------------------------------------------------------*/