Skip to content

Commit

Permalink
Some more persistence changes along with tests #534
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Craggs committed Sep 7, 2018
1 parent 96ac7e1 commit b4039d9
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 234 deletions.
82 changes: 51 additions & 31 deletions src/MQTTAsync.c
Expand Up @@ -2781,6 +2781,11 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
goto exit;
}
}
if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
{
rc = MQTTASYNC_WRONG_MQTT_VERSION;
goto exit;
}
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
Expand Down Expand Up @@ -3181,45 +3186,50 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
{
MQTTAsyncs* m = handle;
int i = 0;
int rc = MQTTASYNC_FAILURE;
int rc = MQTTASYNC_SUCCESS;
MQTTAsync_queuedCommand* sub;
int msgid = 0;

FUNC_ENTRY;
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
else if (m->c->connected == 0)
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
for (i = 0; i < count; i++)
else for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
break;
}
if (qos[i] < 0 || qos[i] > 2)
{
rc = MQTTASYNC_BAD_QOS;
goto exit;
break;
}
}
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{
if (rc != MQTTASYNC_SUCCESS)
; /* don't overwrite a previous error code */
else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
&& response->subscribeOptionsCount != 0))
{
rc = MQTTASYNC_BAD_MQTT_OPTION;
goto exit;
else if (response)
{
if (m->c->MQTTVersion >= MQTTVERSION_5)
{
if (response->struct_version == 0 || response->onFailure || response->onSuccess)
rc = MQTTASYNC_BAD_MQTT_OPTION;
}
else if (m->c->MQTTVersion < MQTTVERSION_5)
{
if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
rc = MQTTASYNC_BAD_MQTT_OPTION;
}
}
if (rc != MQTTASYNC_SUCCESS)
goto exit;

/* Add subscribe request to operation queue */
sub = malloc(sizeof(MQTTAsync_queuedCommand));
Expand Down Expand Up @@ -3287,34 +3297,42 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
{
MQTTAsyncs* m = handle;
int i = 0;
int rc = SOCKET_ERROR;
int rc = MQTTASYNC_SUCCESS;
MQTTAsync_queuedCommand* unsub;
int msgid = 0;

FUNC_ENTRY;
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
else if (m->c->connected == 0)
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
for (i = 0; i < count; i++)
else for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
break;
}
}
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{
if (rc != MQTTASYNC_SUCCESS)
; /* don't overwrite a previous error code */
else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
else if (response)
{
if (m->c->MQTTVersion >= MQTTVERSION_5)
{
if (response->struct_version == 0 || response->onFailure || response->onSuccess)
rc = MQTTASYNC_BAD_MQTT_OPTION;
}
else if (m->c->MQTTVersion < MQTTVERSION_5)
{
if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
rc = MQTTASYNC_BAD_MQTT_OPTION;
}
}
if (rc != MQTTASYNC_SUCCESS)
goto exit;

/* Add unsubscribe request to operation queue */
unsub = malloc(sizeof(MQTTAsync_queuedCommand));
Expand Down Expand Up @@ -4044,6 +4062,8 @@ const char* MQTTAsync_strerror(int code)
return "Invalid protocol scheme";
case MQTTASYNC_BAD_MQTT_OPTION:
return "Options for wrong MQTT version";
case MQTTASYNC_WRONG_MQTT_VERSION:
return "Client created for another version of MQTT";
}

sprintf(buf, "Unknown error code %d", code);
Expand Down
4 changes: 4 additions & 0 deletions src/MQTTAsync.h
Expand Up @@ -183,6 +183,10 @@
* Return code: don't use options for another version of MQTT
*/
#define MQTTASYNC_BAD_MQTT_OPTION -15
/**
* Return code: call not applicable to the client's version of MQTT
*/
#define MQTTASYNC_WRONG_MQTT_VERSION -16


/**
Expand Down
2 changes: 2 additions & 0 deletions src/MQTTClient.c
Expand Up @@ -2716,6 +2716,8 @@ const char* MQTTClient_strerror(int code)
return "Invalid protocol scheme";
case MQTTCLIENT_BAD_MQTT_OPTION:
return "Options for wrong MQTT version";
case MQTTCLIENT_WRONG_MQTT_VERSION:
return "Client created for another version of MQTT";
}

sprintf(buf, "Unknown error code %d", code);
Expand Down
8 changes: 7 additions & 1 deletion src/MQTTPersistence.c
Expand Up @@ -238,6 +238,11 @@ int MQTTPersistence_restore(Clients *c)
msg->nextMessageType = PUBREL;
/* order does not matter for persisted received messages */
ListAppend(c->inboundMsgs, msg, msg->len);
if (c->MQTTVersion >= MQTTVERSION_5)
{
free(msg->publish->payload);
free(msg->publish->topic);
}
publish->topic = NULL;
MQTTPacket_freePublish(publish);
msgs_rcvd++;
Expand Down Expand Up @@ -468,7 +473,8 @@ int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
if (c->persistence != NULL)
{
char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0) //&& qos == 2 )
if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
{
sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId) ;
rc = c->persistence->premove(c->phandle, key);
Expand Down
63 changes: 40 additions & 23 deletions src/MQTTProtocolClient.c
Expand Up @@ -253,7 +253,7 @@ Publications* MQTTProtocol_storePublication(Publish* publish, int* len)
void MQTTProtocol_removePublication(Publications* p)
{
FUNC_ENTRY;
if (--(p->refcount) == 0)
if (p && --(p->refcount) == 0)
{
free(p->payload);
free(p->topic);
Expand Down Expand Up @@ -323,19 +323,21 @@ int MQTTProtocol_handlePublishes(void* pack, int sock)
rc = MQTTPacket_send_pubrec(publish->msgId, &client->net, client->clientID);
if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0)
{
Publish publish;

publish.header.bits.qos = m->qos;
publish.header.bits.retain = m->retain;
publish.msgId = m->msgid;
publish.topic = m->publish->topic;
publish.topiclen = m->publish->topiclen;
publish.payload = m->publish->payload;
publish.payloadlen = m->publish->payloadlen;
publish.MQTTVersion = m->MQTTVersion;
publish.properties = m->properties;

Protocol_processPublication(&publish, client);
Publish publish1;

publish1.header.bits.qos = m->qos;
publish1.header.bits.retain = m->retain;
publish1.msgId = m->msgid;
publish1.topic = m->publish->topic;
publish1.topiclen = m->publish->topiclen;
publish1.payload = m->publish->payload;
publish1.payloadlen = m->publish->payloadlen;
publish1.MQTTVersion = m->MQTTVersion;
publish1.properties = m->properties;

Protocol_processPublication(&publish1, client);
ListRemove(&(state.publications), m->publish);
m->publish = NULL;
}
publish->topic = NULL;
}
Expand Down Expand Up @@ -372,7 +374,9 @@ int MQTTProtocol_handlePubacks(void* pack, int sock)
{
Log(TRACE_MIN, 6, NULL, "PUBACK", client->clientID, puback->msgId);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, puback->msgId);
rc = MQTTPersistence_remove(client,
(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, puback->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
Expand Down Expand Up @@ -431,7 +435,9 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
Log(TRACE_MIN, -1, "Pubrec error %d received for client %s msgid %d, not sending PUBREL",
pubrec->rc, client->clientID, pubrec->msgId);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubrec->msgId);
rc = MQTTPersistence_remove(client,
(pubrec->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, pubrec->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
Expand Down Expand Up @@ -495,26 +501,33 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
{
Publish publish;

memset(&publish, '\0', sizeof(publish));
/* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_pubcomp(pubrel->msgId, &client->net, client->clientID);
publish.header.bits.qos = m->qos;
publish.header.bits.retain = m->retain;
publish.msgId = m->msgid;
publish.topic = m->publish->topic;
publish.topiclen = m->publish->topiclen;
publish.payload = m->publish->payload;
publish.payloadlen = m->publish->payloadlen;
if (m->publish)
{
publish.topic = m->publish->topic;
publish.topiclen = m->publish->topiclen;
publish.payload = m->publish->payload;
publish.payloadlen = m->publish->payloadlen;
}
publish.MQTTVersion = m->MQTTVersion;
if (publish.MQTTVersion >= MQTTVERSION_5)
publish.properties = m->properties;
else
Protocol_processPublication(&publish, client); /* only for 3.1.1 and lower */
#if !defined(NO_PERSISTENCE)
rc += MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId);
rc += MQTTPersistence_remove(client,
(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_RECEIVED : PERSISTENCE_PUBLISH_RECEIVED,
m->qos, pubrel->msgId);
#endif
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(&(state.publications), m->publish);
if (m->publish)
ListRemove(&(state.publications), m->publish);
ListRemove(client->inboundMsgs, m);
++(state.msgs_received);
}
Expand Down Expand Up @@ -562,7 +575,11 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
{
Log(TRACE_MIN, 6, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubcomp->msgId);
rc = MQTTPersistence_remove(client,
(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, pubcomp->msgId);
if (rc != 0)
Log(LOG_ERROR, -1, "Error removing PUBCOMP msgid id %d from persistence", client->clientID, pubcomp->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
Expand Down

0 comments on commit b4039d9

Please sign in to comment.