Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions src/mqtt/mqtt_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,26 @@
*/
#define TEST_MQTT_TOPIC MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test"

/**
* @brief Sample topic filter 2 to use in tests.
*/
#define TEST_MQTT_TOPIC_2 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test2"

/**
* @brief Sample topic filter 3 to use in tests.
*/
#define TEST_MQTT_TOPIC_3 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopic3"

/**
* @brief Sample topic filter 4 to use in tests.
*/
#define TEST_MQTT_TOPIC_4 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testFour"

/**
* @brief Sample topic filter 5 to use in tests.
*/
#define TEST_MQTT_TOPIC_5 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopicName5"

/**
* @brief Sample topic filter to test MQTT retainted message.
*/
Expand Down Expand Up @@ -1593,6 +1613,169 @@ TEST( MqttTest, MQTT_Publish_With_Retain_Flag )

/*-----------------------------------------------------------*/

/**
* @brief Tests Subscribe and Unsubscribe operations to multiple topic filters
* in a single API call.
* The test subscribes to 5 topics, and then publishes to the same topics one
* at a time. The broker is expected to route the publish message back to the
* test for all topics.
* The test then unsubscribes from the 5 topics which should also succeed.
*/
TEST( MqttTest, MQTT_SubUnsub_Multiple_Topics )

{
MQTTSubscribeInfo_t subscribeParams[ 5 ];
char * topicList[ 5 ];
size_t i;
const size_t topicCount = 5U;
MQTTQoS_t qos;
MQTTStatus_t xMQTTStatus;
uint32_t entryTime;

topicList[ 0 ] = TEST_MQTT_TOPIC;
topicList[ 1 ] = TEST_MQTT_TOPIC_2;
topicList[ 2 ] = TEST_MQTT_TOPIC_3;
topicList[ 3 ] = TEST_MQTT_TOPIC_4;
topicList[ 4 ] = TEST_MQTT_TOPIC_5;

for( i = 0; i < topicCount; i++ )
{
subscribeParams[ i ].pTopicFilter = topicList[ i ];
subscribeParams[ i ].topicFilterLength = strlen( topicList[ i ] );
subscribeParams[ i ].qos = ( i % 2 );
}

globalSubscribePacketIdentifier = MQTT_GetPacketId( &context );
/* Check that the packet ID is valid according to the MQTT spec. */
TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalSubscribePacketIdentifier );
TEST_ASSERT_NOT_EQUAL( 0U, globalSubscribePacketIdentifier );

/* Subscribe to all topics. */
TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Subscribe( &context,
subscribeParams,
topicCount,
globalSubscribePacketIdentifier ) );

/* Expect a SUBACK from the broker for the subscribe operation. */
TEST_ASSERT_FALSE( receivedSubAck );
entryTime = FRTest_GetTimeMs();
do
{
xMQTTStatus = MQTT_ProcessLoop( &context );

if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
{
/* Timeout. */
break;
}
else if( receivedSubAck != 0 )
{
/* No need to loop anymore since we received the SUBACK. */
break;
}
else
{
/* Do nothing. */
}
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );

TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
TEST_ASSERT_TRUE( receivedSubAck );

/* Publish to the same topic, that we subscribed to. */
for( i = 0; i < topicCount; i++ )
{
/* Set Qos to be either 1 or 0. */
qos = ( i % 2 );

TEST_ASSERT_EQUAL( MQTTSuccess, publishToTopic(
&context,
topicList[ i ],
false, /* setRetainFlag */
false, /* isDuplicate */
qos, /* QoS */
MQTT_GetPacketId( &context ) ) );

/* Reset the PUBACK flag. */
receivedPubAck = false;

configPRINTF( ( "%u Entered1", xTaskGetTickCount() ) );
entryTime = FRTest_GetTimeMs();
do
{
xMQTTStatus = MQTT_ProcessLoop( &context );

if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
{
/* Timeout. */
break;
}
else
{
/* Do nothing. */
}
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );

TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );

/* Only wait for PUBACK if QoS is not QoS0. */
if( qos != MQTTQoS0 )
{
/* Make sure we have received PUBACK response. */
TEST_ASSERT_TRUE( receivedPubAck );
}

/* Make sure that we have received the same message from the server,
* that was published (as we have subscribed to the same topic). */
TEST_ASSERT_EQUAL( qos, incomingInfo.qos );
TEST_ASSERT_EQUAL( strlen( topicList[ i ] ), incomingInfo.topicNameLength );
TEST_ASSERT_EQUAL_MEMORY( topicList[ i ],
incomingInfo.pTopicName,
strlen( topicList[ i ] ) );
TEST_ASSERT_EQUAL( strlen( MQTT_EXAMPLE_MESSAGE ), incomingInfo.payloadLength );
TEST_ASSERT_EQUAL_MEMORY( MQTT_EXAMPLE_MESSAGE,
incomingInfo.pPayload,
incomingInfo.payloadLength );
}

globalUnsubscribePacketIdentifier = MQTT_GetPacketId( &context );
/* Check that the packet ID is valid according to the MQTT spec. */
TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalUnsubscribePacketIdentifier );
TEST_ASSERT_NOT_EQUAL( 0U, globalUnsubscribePacketIdentifier );

/* Un-subscribe from all the topics. */
TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Unsubscribe(
&context, subscribeParams, topicCount, globalUnsubscribePacketIdentifier ) );

receivedUnsubAck = false;

/* Expect an UNSUBACK from the broker for the unsubscribe operation. */
entryTime = FRTest_GetTimeMs();
do
{
xMQTTStatus = MQTT_ProcessLoop( &context );

if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
{
/* Timeout. */
break;
}
else if( receivedUnsubAck != 0 )
{
break;
}
else
{
/* Do nothing. */
}
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );

TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
TEST_ASSERT_TRUE( receivedUnsubAck );
}

/*-----------------------------------------------------------*/

/**
* @brief Test group runner for MQTT test against MQTT broker.
*/
Expand All @@ -1605,6 +1788,7 @@ TEST_GROUP_RUNNER( MqttTest )
RUN_TEST_CASE( MqttTest, MQTT_Resend_Unacked_Publish_QoS1 );
RUN_TEST_CASE( MqttTest, MQTT_Restore_Session_Duplicate_Incoming_Publish_Qos1 );
RUN_TEST_CASE( MqttTest, MQTT_Publish_With_Retain_Flag );
RUN_TEST_CASE( MqttTest, MQTT_SubUnsub_Multiple_Topics );
}

/*-----------------------------------------------------------*/
Expand Down