Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase test coverage of iot_mqtt_operation.c #771

Merged
merged 10 commits into from Feb 4, 2020
40 changes: 22 additions & 18 deletions libraries/standard/mqtt/src/iot_mqtt_operation.c
Expand Up @@ -1045,11 +1045,12 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
* since a network response could modify the status. */
if( networkPending == false )
{
/* Operations that are not waiting for a network response either failed or
* completed successfully. Check that a status was set. */
IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );

/* Notify of operation completion if this job set a status. */
if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )
{
_IotMqtt_Notify( pOperation );
}
_IotMqtt_Notify( pOperation );
}
}
}
Expand All @@ -1060,13 +1061,15 @@ void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
IotTaskPoolJob_t pOperationJob,
void * pContext )
{
bool destroyOperation = false;
_mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
IotMqttCallbackParam_t callbackParam = { 0 };

/* Check parameters. The task pool and job parameter is not used when asserts
* are disabled. */
( void ) pTaskPool;
( void ) pOperationJob;
( void ) destroyOperation;
IotMqtt_Assert( pOperationJob == pOperation->job );

/* The operation's callback function and status must be set. */
Expand All @@ -1082,11 +1085,11 @@ void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,
&callbackParam );

/* Attempt to destroy the operation once the user callback returns. */
if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
{
_IotMqtt_DestroyOperation( pOperation );
}
/* Decrement the operation reference count. This function is at the end of the
* operation lifecycle, so the operation must be destroyed here. */
destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );
IotMqtt_Assert( destroyOperation == true );
_IotMqtt_DestroyOperation( pOperation );
}

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -1289,17 +1292,18 @@ void _IotMqtt_Notify( _mqttOperation_t * pOperation )
}
else
{
/* Only waitable operations will have a reference count greater than 1.
* Non-waitable operations will not reach this block. */
IotMqtt_Assert( waitable == true );

/* Post to a waitable operation's semaphore. */
if( waitable == true )
{
IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
"notified of completion.",
pOperation->pMqttConnection,
IotMqtt_OperationType( pOperation->u.operation.type ),
pOperation );
IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
"notified of completion.",
pOperation->pMqttConnection,
IotMqtt_OperationType( pOperation->u.operation.type ),
pOperation );

IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
}
IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
}
}
else
Expand Down
179 changes: 176 additions & 3 deletions libraries/standard/mqtt/test/unit/iot_tests_mqtt_api.c
Expand Up @@ -132,7 +132,7 @@
* @brief Length of an arbitrary packet for testing. A buffer will be allocated
* for it, but its contents don't matter.
*/
#define PACKET_LENGTH ( 1 )
#define PACKET_LENGTH ( 32 )

/*-----------------------------------------------------------*/

Expand Down Expand Up @@ -558,8 +558,8 @@ static IotMqttError_t _getNextByte( IotNetworkConnection_t pNetworkInterface,
/**
* @brief A PINGREQ serializer that attempts to allocate memory (unlike the default).
*/
IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket,
size_t * pPacketSize )
static IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket,
size_t * pPacketSize )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;

Expand All @@ -582,6 +582,18 @@ IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket,

/*-----------------------------------------------------------*/

/**
* @brief A completion callback that does nothing.
*/
static void _completionCallback( void * pContext,
IotMqttCallbackParam_t * pCallbackParam )
{
( void ) pContext;
( void ) pCallbackParam;
}

/*-----------------------------------------------------------*/

/**
* @brief Test group for MQTT API tests.
*/
Expand Down Expand Up @@ -635,6 +647,8 @@ TEST_GROUP_RUNNER( MQTT_Unit_API )
RUN_TEST_CASE( MQTT_Unit_API, StringCoverage );
RUN_TEST_CASE( MQTT_Unit_API, OperationCreateDestroy );
RUN_TEST_CASE( MQTT_Unit_API, OperationWaitTimeout );
RUN_TEST_CASE( MQTT_Unit_API, OperationFindMatch );
RUN_TEST_CASE( MQTT_Unit_API, OperationLists );
RUN_TEST_CASE( MQTT_Unit_API, ConnectParameters );
RUN_TEST_CASE( MQTT_Unit_API, ConnectMallocFail );
RUN_TEST_CASE( MQTT_Unit_API, ConnectRestoreSessionMallocFail );
Expand All @@ -643,6 +657,7 @@ TEST_GROUP_RUNNER( MQTT_Unit_API )
RUN_TEST_CASE( MQTT_Unit_API, PublishQoS0Parameters );
RUN_TEST_CASE( MQTT_Unit_API, PublishQoS0MallocFail );
RUN_TEST_CASE( MQTT_Unit_API, PublishQoS1 );
RUN_TEST_CASE( MQTT_Unit_API, PublishRetryPeriod );
RUN_TEST_CASE( MQTT_Unit_API, PublishDuplicates );
RUN_TEST_CASE( MQTT_Unit_API, SubscribeUnsubscribeParameters );
RUN_TEST_CASE( MQTT_Unit_API, SubscribeMallocFail );
Expand Down Expand Up @@ -946,6 +961,107 @@ TEST( MQTT_Unit_API, OperationWaitTimeout )

/*-----------------------------------------------------------*/

/**
* @brief Test edge cases when searching for operations.
*/
TEST( MQTT_Unit_API, OperationFindMatch )
{
int32_t i = 0;
uint16_t packetIdentifier = 0;
IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
_mqttOperation_t * pMatchedOperation = NULL;
_mqttOperation_t * pOperation[ 2 ] = { NULL, NULL };
gordonwang0 marked this conversation as resolved.
Show resolved Hide resolved

/* Create a new MQTT connection. */
_pMqttConnection = IotTestMqtt_createMqttConnection( AWS_IOT_MQTT_SERVER,
&_networkInfo,
0 );
TEST_ASSERT_NOT_NULL( _pMqttConnection );

/* Set up operations. */
for( i = 0; i < 2; i++ )
{
status = _IotMqtt_CreateOperation( _pMqttConnection, 0, NULL, &( pOperation[ i ] ) );
TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, status );

TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_CreateJob( _IotMqtt_ProcessCompletedOperation,
pOperation[ i ],
&( pOperation[ i ]->jobStorage ),
&( pOperation[ i ]->job ) ) );

IotListDouble_Remove( &( pOperation[ i ]->link ) );
IotListDouble_InsertHead( &( _pMqttConnection->pendingResponse ), &( pOperation[ i ]->link ) );

pOperation[ i ]->u.operation.packetIdentifier = ( uint16_t ) ( i + 1 );
pOperation[ i ]->u.operation.periodic.retry.nextPeriodMs = DUP_CHECK_RETRY_MS;
pOperation[ i ]->u.operation.periodic.retry.limit = DUP_CHECK_RETRY_LIMIT;
}

pOperation[ 0 ]->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
pOperation[ 1 ]->u.operation.type = IOT_MQTT_SUBSCRIBE;

/* Set one operation's job to an invalid state, then try to find it. The invalid state
* will cause that job to be ignored. */
packetIdentifier = 1;
pOperation[ 0 ]->jobStorage.status = IOT_TASKPOOL_STATUS_COMPLETED;
pMatchedOperation = _IotMqtt_FindOperation( _pMqttConnection,
IOT_MQTT_PUBLISH_TO_SERVER,
&packetIdentifier );
TEST_ASSERT_NULL( pMatchedOperation );

/* Clean up operations. */
for( i = 0; i < 2; i++ )
{
TEST_ASSERT_EQUAL_INT( true, _IotMqtt_DecrementOperationReferences( pOperation[ i ], false ) );
_IotMqtt_DestroyOperation( pOperation[ i ] );
}

/* Disconnect the MQTT connection. */
IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
}

/*-----------------------------------------------------------*/

/**
* @brief Tests the behavior of send and notify with different link statuses.
*/
TEST( MQTT_Unit_API, OperationLists )
{
_mqttOperation_t * pOperation = NULL;
IotMqttCallbackInfo_t callbackInfo = IOT_MQTT_CALLBACK_INFO_INITIALIZER;

/* Create a new MQTT connection. */
_networkInterface.send = _sendSuccess;
_pMqttConnection = IotTestMqtt_createMqttConnection( AWS_IOT_MQTT_SERVER,
&_networkInfo,
0 );
TEST_ASSERT_NOT_NULL( _pMqttConnection );

/* Create a new MQTT operation. */
callbackInfo.function = _completionCallback;
TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( _pMqttConnection,
0,
&callbackInfo,
&pOperation ) );
TEST_ASSERT_NOT_NULL( pOperation );
pOperation->u.operation.pMqttPacket = IotMqtt_MallocMessage( PACKET_LENGTH );
pOperation->u.operation.packetSize = PACKET_LENGTH;

/* Process a send with operation unlinked. Check that operation gets linked afterwards. */
IotListDouble_Remove( &( pOperation->link ) );
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );
TEST_ASSERT_EQUAL_INT( true, IotLink_IsLinked( &( pOperation->link ) ) );

/* Notify with the operation linked. */
pOperation->u.operation.status = IOT_MQTT_SUCCESS;
_IotMqtt_Notify( pOperation );

/* Disconnect the MQTT connection. */
IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
}

/*-----------------------------------------------------------*/

/**
* @brief Tests the behavior of @ref mqtt_function_connect with various
* invalid parameters.
Expand Down Expand Up @@ -1396,6 +1512,63 @@ TEST( MQTT_Unit_API, PublishQoS1 )

/*-----------------------------------------------------------*/

/**
* @brief Tests that PUBLISH retry periods are calculated correctly.
*/
TEST( MQTT_Unit_API, PublishRetryPeriod )
{
_mqttOperation_t * pOperation = NULL;
uint32_t periodMs = IOT_MQTT_RETRY_MS_CEILING / 2;

/* Create a new MQTT connection. */
_networkInterface.send = _sendSuccess;
_pMqttConnection = IotTestMqtt_createMqttConnection( false,
&_networkInfo,
0 );
TEST_ASSERT_NOT_NULL( _pMqttConnection );

/* Create a PUBLISH with retry operation. */
TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( _pMqttConnection,
IOT_MQTT_FLAG_WAITABLE,
NULL,
&pOperation ) );
TEST_ASSERT_NOT_NULL( pOperation );
pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
pOperation->u.operation.pMqttPacket = IotMqtt_MallocMessage( PACKET_LENGTH );
pOperation->u.operation.packetSize = PACKET_LENGTH;
pOperation->u.operation.periodic.retry.limit = DUP_CHECK_RETRY_LIMIT;
pOperation->u.operation.periodic.retry.nextPeriodMs = periodMs;
IotListDouble_Remove( &( pOperation->link ) );

/* Simulate send of PUBLISH. */
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );

/* Immediately cancel retried PUBLISH, then check statuses set by send. */
TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pOperation->job,
NULL ) );
TEST_ASSERT_EQUAL( IOT_MQTT_STATUS_PENDING, pOperation->u.operation.status );
TEST_ASSERT_EQUAL( 1, pOperation->u.operation.periodic.retry.count );
TEST_ASSERT_EQUAL( 2 * periodMs, pOperation->u.operation.periodic.retry.nextPeriodMs );

/* Simulate another send. Check that the retry ceiling is respected. */
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );

/* Immediately cancel retried PUBLISH, then check statuses set by send. */
TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pOperation->job,
NULL ) );
TEST_ASSERT_EQUAL( IOT_MQTT_STATUS_PENDING, pOperation->u.operation.status );
TEST_ASSERT_EQUAL( 2, pOperation->u.operation.periodic.retry.count );
TEST_ASSERT_EQUAL( IOT_MQTT_RETRY_MS_CEILING, pOperation->u.operation.periodic.retry.nextPeriodMs );

/* Clean up. */
TEST_ASSERT_EQUAL_INT( false, _IotMqtt_DecrementOperationReferences( pOperation, false ) );
IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
}

/*-----------------------------------------------------------*/

/**
* @brief Tests that duplicate QoS 1 PUBLISH packets are different from the
* original.
Expand Down