Skip to content

Commit

Permalink
mqtt: convert transaction list to vecdeque
Browse files Browse the repository at this point in the history
Allows for more efficient removal from front.

Ticket: #5294
  • Loading branch information
jasonish authored and victorjulien committed Apr 30, 2022
1 parent 3189414 commit 3b422e2
Showing 1 changed file with 35 additions and 34 deletions.
69 changes: 35 additions & 34 deletions rust/src/mqtt/mqtt.rs
@@ -1,4 +1,4 @@
/* Copyright (C) 2020 Open Information Security Foundation
/* Copyright (C) 20222 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand All @@ -25,6 +25,7 @@ use crate::conf::conf_get;
use crate::core::*;
use nom7::Err;
use std;
use std::collections::VecDeque;
use std::ffi::CString;

// Used as a special pseudo packet identifier to denote the first CONNECT
Expand Down Expand Up @@ -98,7 +99,7 @@ impl Transaction for MQTTTransaction {
pub struct MQTTState {
tx_id: u64,
pub protocol_version: u8,
transactions: Vec<MQTTTransaction>,
transactions: VecDeque<MQTTTransaction>,
connected: bool,
skip_request: usize,
skip_response: usize,
Expand All @@ -120,7 +121,7 @@ impl MQTTState {
Self {
tx_id: 0,
protocol_version: 0,
transactions: Vec::new(),
transactions: VecDeque::new(),
connected: false,
skip_request: 0,
skip_response: 0,
Expand Down Expand Up @@ -202,18 +203,18 @@ impl MQTTState {
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push(tx);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
Expand All @@ -222,31 +223,31 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push(tx);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let pkt_id = subscribe.message_id as u32;
Expand All @@ -256,25 +257,25 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let pkt_id = unsubscribe.message_id as u32;
Expand All @@ -284,17 +285,17 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
}
Expand All @@ -307,29 +308,29 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
(*tx).msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
Expand All @@ -339,14 +340,14 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
Expand All @@ -356,14 +357,14 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
Expand All @@ -373,41 +374,41 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
MQTTOperation::UNASSIGNED => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
MQTTOperation::TRUNCATED(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
}
Expand Down Expand Up @@ -577,7 +578,7 @@ impl MQTTState {
}
tx.complete = true;
tx.tx_data.set_event(event as u8);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}

Expand Down

0 comments on commit 3b422e2

Please sign in to comment.