Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
Conflicts:
	src/MQTTAsync.c
  • Loading branch information
Ian Craggs committed Oct 4, 2014
2 parents 69d9be4 + 6d4cf04 commit 8ffd07f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</taskdef>

<property name="output.folder" value="build/output" />
<property name="release.version" value="1.0.1" />
<property name="release.version" value="1.0.2" />

<property name="libname" value="mqttv3c" />
<property name="libname.ssl" value="mqttv3cs" />
Expand Down
80 changes: 68 additions & 12 deletions src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
* Ian Craggs - MQTT 3.1.1 support
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
*******************************************************************************/

/**
Expand Down Expand Up @@ -75,6 +77,8 @@ enum MQTTAsync_threadStates

enum MQTTAsync_threadStates sendThread_state = STOPPED;
enum MQTTAsync_threadStates receiveThread_state = STOPPED;
static thread_id_type sendThread_id = 0,
receiveThread_id = 0;

#if defined(WIN32) || defined(WIN64)
static mutex_type mqttasync_mutex = NULL;
Expand Down Expand Up @@ -325,15 +329,15 @@ void MQTTAsync_lock_mutex(mutex_type amutex)
{
int rc = Thread_lock_mutex(amutex);
if (rc != 0)
Log(LOG_ERROR, 0, "Error %d locking mutex", rc);
Log(LOG_ERROR, 0, "Error %s locking mutex", strerror(rc));
}


void MQTTAsync_unlock_mutex(mutex_type amutex)
{
int rc = Thread_unlock_mutex(amutex);
if (rc != 0)
Log(LOG_ERROR, 0, "Error %d unlocking mutex", rc);
Log(LOG_ERROR, 0, "Error %s unlocking mutex", strerror(rc));
}


Expand Down Expand Up @@ -840,21 +844,19 @@ void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
int i;

for (i = 0; i < command->command.details.sub.count; i++)
{
free(command->command.details.sub.topics[i]);
free(command->command.details.sub.topics);
free(command->command.details.sub.qoss);
}

free(command->command.details.sub.topics);
free(command->command.details.sub.qoss);
}
else if (command->command.type == UNSUBSCRIBE)
{
int i;

for (i = 0; i < command->command.details.unsub.count; i++)
{
free(command->command.details.unsub.topics[i]);
free(command->command.details.unsub.topics);
}

free(command->command.details.unsub.topics);
}
else if (command->command.type == PUBLISH)
{
Expand Down Expand Up @@ -1255,6 +1257,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING;
sendThread_id = Thread_getid();
MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop)
{
Expand All @@ -1281,6 +1284,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
sendThread_state = STOPPING;
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STOPPED;
sendThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
return 0;
Expand Down Expand Up @@ -1455,6 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING;
receiveThread_id = Thread_getid();
while (!tostop)
{
int rc = SOCKET_ERROR;
Expand Down Expand Up @@ -1671,6 +1676,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
}
receiveThread_state = STOPPED;
receiveThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex);
#if !defined(WIN32) && !defined(WIN64)
if (sendThread_state != STOPPED)
Expand Down Expand Up @@ -2143,6 +2149,56 @@ int MQTTAsync_isConnected(MQTTAsync handle)
}


int cmdMessageIDCompare(void* a, void* b)
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)a;
return cmd->command.token == *(int*)b;
}


/**
* Assign a new message id for a client. Make sure it isn't already being used and does
* not exceed the maximum.
* @param m a client structure
* @return the next message id to use, or 0 if none available
*/
int MQTTAsync_assignMsgId(MQTTAsyncs* m)
{
int start_msgid = m->c->msgID;
int msgid = start_msgid;
thread_id_type thread_id = 0;
int locked = 0;

/* need to check: commands list and response list for a client */
FUNC_ENTRY;
/* We might be called in a callback. In which case, this mutex will be already locked. */
thread_id = Thread_getid();
if (thread_id != sendThread_id && thread_id != receiveThread_id)
{
MQTTAsync_lock_mutex(mqttasync_mutex);
locked = 1;
}

msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
while (ListFindItem(commands, &msgid, cmdMessageIDCompare) ||
ListFindItem(m->responses, &msgid, cmdMessageIDCompare))
{
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
if (msgid == start_msgid)
{ /* we've tried them all - none free */
msgid = 0;
break;
}
}
if (msgid != 0)
m->c->msgID = msgid;
if (locked)
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(msgid);
return msgid;
}


int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int* qos, MQTTAsync_responseOptions* response)
{
MQTTAsyncs* m = handle;
Expand Down Expand Up @@ -2175,7 +2231,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
goto exit;
}
}
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
Expand Down Expand Up @@ -2248,7 +2304,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
goto exit;
}
}
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
Expand Down Expand Up @@ -2307,7 +2363,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS;
else if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS;

if (rc != MQTTASYNC_SUCCESS)
Expand Down
3 changes: 2 additions & 1 deletion src/MQTTProtocolClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
*******************************************************************************/

/**
Expand Down Expand Up @@ -596,7 +597,7 @@ void MQTTProtocol_retries(time_t now, Clients* client, int regardless)
else if (m->qos && m->nextMessageType == PUBCOMP)
{
Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);
if (MQTTPacket_send_pubrel(m->msgid, 1, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
if (MQTTPacket_send_pubrel(m->msgid, 0, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
{
client->good = 0;
Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
Expand Down
2 changes: 1 addition & 1 deletion src/Socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ int Socket_error(char* aString, int sock)
if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
{
if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
Log(LOG_ERROR, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock);
Log(TRACE_MINIMUM, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock);
}
FUNC_EXIT_RC(errno);
return errno;
Expand Down
54 changes: 51 additions & 3 deletions test/test4.c
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,16 @@ Test7: Persistence
char* test7_topic = "C client test7";
int test7_messageCount = 0;

void test7_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);

assert("Successful disconnect", 0, "disconnect failed", 0);

test_finished = 1;
}

void test7_onDisconnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
Expand Down Expand Up @@ -1170,7 +1180,6 @@ int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;

MyLog(LOGA_DEBUG, "Test7: received message id %d", message->msgid);

Expand Down Expand Up @@ -1212,6 +1221,24 @@ void test7_onConnect(void* context, MQTTAsync_successData* response)
}


void test7_onConnectOnly(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
int rc;

MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
dopts.context = context;
dopts.timeout = 1000;
dopts.onSuccess = test7_onDisconnect;
rc = MQTTAsync_disconnect(c, &dopts);

assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_finished = 1;
}


/*********************************************************************
Test7: Pending tokens
Expand Down Expand Up @@ -1248,7 +1275,6 @@ int test7(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);

opts.keepAliveInterval = 20;
opts.cleansession = 0;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
Expand All @@ -1259,11 +1285,30 @@ int test7(struct Options options)
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test7_onConnect;

opts.onFailure = NULL;
opts.context = c;

opts.cleansession = 1;
opts.onSuccess = test7_onConnectOnly;
MyLog(LOGA_DEBUG, "Connecting to clean up");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;

while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif

test_finished = 0;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.onSuccess = test7_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
Expand Down Expand Up @@ -1304,6 +1349,7 @@ int test7(struct Options options)
/* disconnect immediately without receiving the incoming messages */
dopts.timeout = 0;
dopts.onSuccess = test7_onDisconnect;
dopts.context = c;
MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */

while (!test_finished)
Expand Down Expand Up @@ -1371,6 +1417,8 @@ int test7(struct Options options)
assertions fail against Mosquitto - needs testing */

dopts.onFailure = test7_onDisconnectFailure;
dopts.onSuccess = test7_onDisconnect;
dopts.timeout = 1000;
MQTTAsync_disconnect(c, &dopts);

Expand Down

0 comments on commit 8ffd07f

Please sign in to comment.