Skip to content

Commit 6113eac

Browse files
committed
1 parent b1c29e8 commit 6113eac

10 files changed

+147
-39
lines changed

Diff for: ChangeLog.txt

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
Security:
2+
- CVE-2023-28366: Fix memory leak in broker when clients send multiple QoS 2
3+
messages with the same message ID, but then never respond to the PUBREC
4+
commands.
25
- Broker will now reject Will messages that attempt to publish to $CONTROL/.
36
- Broker now validates usernames provided in a TLS certificate or TLS-PSK
47
identity are valid UTF-8.

Diff for: lib/packet_mosq.c

+15
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,21 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
152152

153153
packet->next = NULL;
154154
pthread_mutex_lock(&mosq->out_packet_mutex);
155+
156+
#ifdef WITH_BROKER
157+
if(mosq->out_packet_count >= db.config->max_queued_messages){
158+
mosquitto__free(packet);
159+
if(mosq->is_dropping == false){
160+
mosq->is_dropping = true;
161+
log__printf(NULL, MOSQ_LOG_NOTICE,
162+
"Outgoing messages are being dropped for client %s.",
163+
mosq->id);
164+
}
165+
G_MSGS_DROPPED_INC();
166+
return MOSQ_ERR_SUCCESS;
167+
}
168+
#endif
169+
155170
if(mosq->out_packet){
156171
mosq->out_packet_last->next = packet;
157172
}else{

Diff for: src/context.c

+25-16
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ struct mosquitto *context__init(mosq_sock_t sock)
8383
}
8484
}
8585
context->bridge = NULL;
86-
context->msgs_in.inflight_maximum = db.config->max_inflight_messages;
86+
context->msgs_in.inflight_maximum = 1;
8787
context->msgs_out.inflight_maximum = db.config->max_inflight_messages;
88-
context->msgs_in.inflight_quota = db.config->max_inflight_messages;
88+
context->msgs_in.inflight_quota = 1;
8989
context->msgs_out.inflight_quota = db.config->max_inflight_messages;
9090
context->max_qos = 2;
9191
#ifdef WITH_TLS
@@ -98,6 +98,27 @@ struct mosquitto *context__init(mosq_sock_t sock)
9898
return context;
9999
}
100100

101+
static void context__cleanup_out_packets(struct mosquitto *context)
102+
{
103+
struct mosquitto__packet *packet;
104+
105+
if(!context) return;
106+
107+
if(context->current_out_packet){
108+
packet__cleanup(context->current_out_packet);
109+
mosquitto__free(context->current_out_packet);
110+
context->current_out_packet = NULL;
111+
}
112+
while(context->out_packet){
113+
packet__cleanup(context->out_packet);
114+
packet = context->out_packet;
115+
context->out_packet = context->out_packet->next;
116+
mosquitto__free(packet);
117+
}
118+
context->out_packet_count = 0;
119+
}
120+
121+
101122
/*
102123
* This will result in any outgoing packets going unsent. If we're disconnected
103124
* forcefully then it is usually an error condition and shouldn't be a problem,
@@ -106,8 +127,6 @@ struct mosquitto *context__init(mosq_sock_t sock)
106127
*/
107128
void context__cleanup(struct mosquitto *context, bool force_free)
108129
{
109-
struct mosquitto__packet *packet;
110-
111130
if(!context) return;
112131

113132
if(force_free){
@@ -121,6 +140,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
121140
#endif
122141

123142
alias__free_all(context);
143+
context__cleanup_out_packets(context);
124144

125145
mosquitto__free(context->auth_method);
126146
context->auth_method = NULL;
@@ -148,18 +168,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
148168
context->id = NULL;
149169
}
150170
packet__cleanup(&(context->in_packet));
151-
if(context->current_out_packet){
152-
packet__cleanup(context->current_out_packet);
153-
mosquitto__free(context->current_out_packet);
154-
context->current_out_packet = NULL;
155-
}
156-
while(context->out_packet){
157-
packet__cleanup(context->out_packet);
158-
packet = context->out_packet;
159-
context->out_packet = context->out_packet->next;
160-
mosquitto__free(packet);
161-
}
162-
context->out_packet_count = 0;
171+
context__cleanup_out_packets(context);
163172
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
164173
if(context->adns){
165174
gai_cancel(context->adns);

Diff for: src/database.c

+15-10
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
555555
}
556556
#endif
557557

558-
msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
558+
msg = mosquitto__calloc(1, sizeof(struct mosquitto_client_msg));
559559
if(!msg) return MOSQ_ERR_NOMEM;
560560
msg->prev = NULL;
561561
msg->next = NULL;
@@ -613,6 +613,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
613613

614614
if(dir == mosq_md_out && msg->qos > 0 && state != mosq_ms_queued){
615615
util__decrement_send_quota(context);
616+
}else if(dir == mosq_md_in && msg->qos > 0 && state != mosq_ms_queued){
617+
util__decrement_receive_quota(context);
616618
}
617619

618620
if(dir == mosq_md_out && update){
@@ -796,23 +798,24 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store
796798
return MOSQ_ERR_SUCCESS;
797799
}
798800

799-
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
801+
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_client_msg **client_msg)
800802
{
801-
struct mosquitto_client_msg *tail;
803+
struct mosquitto_client_msg *cmsg;
804+
805+
*client_msg = NULL;
802806

803807
if(!context) return MOSQ_ERR_INVAL;
804808

805-
*stored = NULL;
806-
DL_FOREACH(context->msgs_in.inflight, tail){
807-
if(tail->store->source_mid == mid){
808-
*stored = tail->store;
809+
DL_FOREACH(context->msgs_in.inflight, cmsg){
810+
if(cmsg->store->source_mid == mid){
811+
*client_msg = cmsg;
809812
return MOSQ_ERR_SUCCESS;
810813
}
811814
}
812815

813-
DL_FOREACH(context->msgs_in.queued, tail){
814-
if(tail->store->source_mid == mid){
815-
*stored = tail->store;
816+
DL_FOREACH(context->msgs_in.queued, cmsg){
817+
if(cmsg->store->source_mid == mid){
818+
*client_msg = cmsg;
816819
return MOSQ_ERR_SUCCESS;
817820
}
818821
}
@@ -914,6 +917,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
914917
}else{
915918
/* Message state can be preserved here because it should match
916919
* whatever the client has got. */
920+
msg->dup = 0;
917921
}
918922
}
919923

@@ -924,6 +928,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
924928
* will be sent out of order.
925929
*/
926930
DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
931+
msg->dup = 0;
927932
db__msg_add_to_queued_stats(&context->msgs_in, msg);
928933
if(db__ready_for_flight(context, mosq_md_in, msg->qos)){
929934
switch(msg->qos){

Diff for: src/handle_publish.c

+24-11
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ int handle__publish(struct mosquitto *context)
4242
uint8_t header = context->in_packet.command;
4343
int res = 0;
4444
struct mosquitto_msg_store *msg, *stored = NULL;
45+
struct mosquitto_client_msg *cmsg_stored = NULL;
4546
size_t len;
4647
uint16_t slen;
4748
char *topic_mount;
@@ -287,24 +288,24 @@ int handle__publish(struct mosquitto *context)
287288
}
288289

289290
if(msg->qos > 0){
290-
db__message_store_find(context, msg->source_mid, &stored);
291+
db__message_store_find(context, msg->source_mid, &cmsg_stored);
291292
}
292293

293-
if(stored && msg->source_mid != 0 &&
294-
(stored->qos != msg->qos
295-
|| stored->payloadlen != msg->payloadlen
296-
|| strcmp(stored->topic, msg->topic)
297-
|| memcmp(stored->payload, msg->payload, msg->payloadlen) )){
294+
if(cmsg_stored && cmsg_stored->store && msg->source_mid != 0 &&
295+
(cmsg_stored->store->qos != msg->qos
296+
|| cmsg_stored->store->payloadlen != msg->payloadlen
297+
|| strcmp(cmsg_stored->store->topic, msg->topic)
298+
|| memcmp(cmsg_stored->store->payload, msg->payload, msg->payloadlen) )){
298299

299300
log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", msg->source_mid, context->id);
300301
db__message_remove_incoming(context, msg->source_mid);
301-
stored = NULL;
302+
cmsg_stored = NULL;
302303
}
303304

304-
if(!stored){
305+
if(!cmsg_stored){
305306
if(msg->qos == 0
306307
|| db__ready_for_flight(context, mosq_md_in, msg->qos)
307-
|| db__ready_for_queue(context, msg->qos, &context->msgs_in)){
308+
){
308309

309310
dup = 0;
310311
rc = db__message_store(context, msg, message_expiry_interval, 0, mosq_mo_client);
@@ -316,10 +317,13 @@ int handle__publish(struct mosquitto *context)
316317
}
317318
stored = msg;
318319
msg = NULL;
320+
dup = 0;
319321
}else{
320322
db__msg_store_free(msg);
321323
msg = NULL;
322-
dup = 1;
324+
stored = cmsg_stored->store;
325+
cmsg_stored->dup++;
326+
dup = cmsg_stored->dup;
323327
}
324328

325329
switch(stored->qos){
@@ -345,11 +349,17 @@ int handle__publish(struct mosquitto *context)
345349
}else{
346350
res = 0;
347351
}
352+
348353
/* db__message_insert() returns 2 to indicate dropped message
349354
* due to queue. This isn't an error so don't disconnect them. */
350355
/* FIXME - this is no longer necessary due to failing early above */
351356
if(!res){
352-
if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1;
357+
if(dup == 0 || dup == 1){
358+
rc2 = send__pubrec(context, stored->source_mid, 0, NULL);
359+
if(rc2) rc = rc2;
360+
}else{
361+
return MOSQ_ERR_PROTOCOL;
362+
}
353363
}else if(res == 1){
354364
rc = 1;
355365
}
@@ -374,6 +384,9 @@ int handle__publish(struct mosquitto *context)
374384
}
375385
db__msg_store_free(msg);
376386
}
387+
if(context->out_packet_count >= db.config->max_queued_messages){
388+
rc = MQTT_RC_QUOTA_EXCEEDED;
389+
}
377390
return rc;
378391
}
379392

Diff for: src/mosquitto_broker_internal.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ struct mosquitto_client_msg{
394394
bool retain;
395395
enum mosquitto_msg_direction direction;
396396
enum mosquitto_msg_state state;
397-
bool dup;
397+
uint8_t dup;
398398
};
399399

400400

@@ -651,7 +651,7 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d
651651
int db__messages_delete(struct mosquitto *context, bool force_free);
652652
int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
653653
int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin);
654-
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
654+
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_client_msg **client_msg);
655655
void db__msg_store_add(struct mosquitto_msg_store *store);
656656
void db__msg_store_remove(struct mosquitto_msg_store *store);
657657
void db__msg_store_ref_inc(struct mosquitto_msg_store *store);

Diff for: test/broker/03-publish-qos2-dup.py

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#!/usr/bin/env python3
2+
3+
from mosq_test_helper import *
4+
5+
def do_test(proto_ver):
6+
rc = 1
7+
connect_packet = mosq_test.gen_connect("03-pub-qos2-dup-test", proto_ver=proto_ver)
8+
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
9+
10+
mid = 1
11+
publish_packet = mosq_test.gen_publish("topic", qos=2, mid=mid, payload="message", proto_ver=proto_ver, dup=1)
12+
pubrec_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver)
13+
14+
disconnect_packet = mosq_test.gen_disconnect(reason_code=130, proto_ver=proto_ver)
15+
16+
port = mosq_test.get_port()
17+
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
18+
19+
try:
20+
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
21+
mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec 1")
22+
mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec 2")
23+
if proto_ver == 5:
24+
mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "disconnect")
25+
rc = 0
26+
else:
27+
try:
28+
mosq_test.do_send_receive(sock, publish_packet, b"", "disconnect1")
29+
rc = 0
30+
except BrokenPipeError:
31+
rc = 0
32+
33+
sock.close()
34+
except Exception as e:
35+
print(e)
36+
except mosq_test.TestError:
37+
pass
38+
finally:
39+
broker.terminate()
40+
broker.wait()
41+
(stdo, stde) = broker.communicate()
42+
if rc:
43+
print(stde.decode('utf-8'))
44+
print("proto_ver=%d" % (proto_ver))
45+
exit(rc)
46+
47+
48+
def all_tests():
49+
rc = do_test(proto_ver=4)
50+
if rc:
51+
return rc;
52+
rc = do_test(proto_ver=5)
53+
if rc:
54+
return rc;
55+
return 0
56+
57+
if __name__ == '__main__':
58+
all_tests()

Diff for: test/broker/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ msg_sequence_test:
8484
./03-publish-qos1-no-subscribers-v5.py
8585
./03-publish-qos1-retain-disabled.py
8686
./03-publish-qos1.py
87+
./03-publish-qos2-dup.py
8788
./03-publish-qos2-max-inflight.py
8889
./03-publish-qos2.py
8990

Diff for: test/broker/test.py

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
(1, './03-publish-qos1-no-subscribers-v5.py'),
6565
(1, './03-publish-qos1-retain-disabled.py'),
6666
(1, './03-publish-qos1.py'),
67+
(1, './03-publish-qos2-dup.py'),
6768
(1, './03-publish-qos2-max-inflight.py'),
6869
(1, './03-publish-qos2.py'),
6970

Diff for: www/pages/security.md

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ follow the steps on [Eclipse Security] page to report it.
1919
Listed with most recent first. Further information on security related issues
2020
can be found in the [security category].
2121

22+
* June 2023: [CVE-2023-28366]: Clients sending unacknowledged QoS 2 messages
23+
with duplicate message ids cause a memory leak. Affecting versions **1.3.2**
24+
to **2.0.15** inclusive, fixed in **2.0.16**.
2225
* August 2022: Deleting the anonymous group in the dynamic security plugin
2326
could lead to a crash. Affecting versions **2.0.0** to **2.0.14** inclusive,
2427
fixed in **2.0.15**.

0 commit comments

Comments
 (0)