Skip to content

Commit

Permalink
Resolve issue knolleary#832 (Potential memory corruption)
Browse files Browse the repository at this point in the history
The memmove command is executed without any sanity checking on used values causing random reboots when an MQTT package gets processed with an impossible topic length.
With a high rate test flow of MQTT messages this would sometimes corrupt memory after a few seconds.

Extra sanity checks and calculation for payload_offset and message length have been implemented to better handle messages and prevent this memory corruption.

Tested with a flow of about 50.000 messages per hour for multiple days. Went from hunderds of errors per hour til a few per day, the last errors mainly because the MQTT server couldn't keep up with the message flow. Never crashed again.
  • Loading branch information
arjenhiemstra committed Feb 27, 2021
1 parent 2d228f2 commit c4a0dc0
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 49 deletions.
130 changes: 81 additions & 49 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,82 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
return len;
}

boolean PubSubClient::loop_read() {
if (_client == nullptr) {
return false;
}
if (!_client->available()) {
return false;
}
uint8_t llen;
uint16_t len = readPacket(&llen);
if (len == 0) {
return false;
}
unsigned long t = millis();
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;

switch(type) {
case MQTTPUBLISH:
{
if (callback) {
const boolean msgId_present = (buffer[0]&0x06) == MQTTQOS1;
const uint16_t tl_offset = llen+1;
const uint16_t tl = (buffer[tl_offset]<<8)+buffer[tl_offset+1]; /* topic length in bytes */
const uint16_t topic_offset = tl_offset+2;
const uint16_t msgId_offset = topic_offset+tl;
const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset;
if ((payload_offset) >= this->bufferSize) {return false;}
if (len < payload_offset) {return false;}
memmove(buffer+topic_offset-1,buffer+topic_offset,tl); /* move topic inside buffer 1 byte to front */
buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+topic_offset-1;
uint8_t *payload;
// msgId only present for QOS>0
if (msgId_present) {
const uint16_t msgId = (buffer[msgId_offset]<<8)+buffer[msgId_offset+1];
payload = buffer+payload_offset;
callback(topic,payload,len-payload_offset);
if (_client->connected()) {
buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
if (_client->write(buffer,4) != 0) {
lastOutActivity = t;
}
}
} else {
// No msgId
payload = buffer+payload_offset;
callback(topic,payload,len-payload_offset);
}
}
break;
}
case MQTTPINGREQ:
{
if (_client->connected()) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
}
break;
}
case MQTTPINGRESP:
{
pingOutstanding = false;
break;
}
default:
return false;
}
return true;
}

boolean PubSubClient::loop() {
loop_read();
if (connected()) {
unsigned long t = millis();
if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
Expand All @@ -376,56 +451,13 @@ boolean PubSubClient::loop() {
_client->stop();
return false;
} else {
this->buffer[0] = MQTTPINGREQ;
this->buffer[1] = 0;
_client->write(this->buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = this->buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) this->buffer+llen+2;
// msgId only present for QOS>0
if ((this->buffer[0]&0x06) == MQTTQOS1) {
msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
payload = this->buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);

this->buffer[0] = MQTTPUBACK;
this->buffer[1] = 2;
this->buffer[2] = (msgId >> 8);
this->buffer[3] = (msgId & 0xFF);
_client->write(this->buffer,4);
lastOutActivity = t;

} else {
payload = this->buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
this->buffer[0] = MQTTPINGRESP;
this->buffer[1] = 0;
_client->write(this->buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
buffer[0] = MQTTPINGREQ;
buffer[1] = 0;
if (_client->write(buffer,2) != 0) {
lastOutActivity = t;
lastInActivity = t;
}
} else if (!connected()) {
// readPacket has closed the connection
return false;
pingOutstanding = true;
}
}
return true;
Expand Down
1 change: 1 addition & 0 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class PubSubClient : public Print {
boolean subscribe(const char* topic);
boolean subscribe(const char* topic, uint8_t qos);
boolean unsubscribe(const char* topic);
boolean loop_read();
boolean loop();
boolean connected();
int state();
Expand Down

0 comments on commit c4a0dc0

Please sign in to comment.