From 7f594f8163760040f3247a94d47a6a8c87829907 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 10 May 2024 22:50:26 +0530 Subject: [PATCH] fix: ordering of events returned on `poll()` (#862) * fix: ordering of events as mentioned in #860 * fix: clippy suggestion https://github.com/bytebeamio/rumqtt/actions/runs/9013022148/job/24763146160?pr=862#step:7:216 * chore: clippy suggestions https://github.com/bytebeamio/rumqtt/actions/runs/9028175682/job/24808221311?pr=862 * chore: update CHANGELOG --- rumqttc/CHANGELOG.md | 1 + rumqttc/Cargo.toml | 2 +- rumqttc/src/state.rs | 5 +++-- rumqttc/src/v5/state.rs | 9 +++++---- rumqttd/src/router/alertlog.rs | 6 +++--- rumqttd/src/router/mod.rs | 14 +++++++------- rumqttd/src/router/routing.rs | 2 +- rumqttd/src/segments/mod.rs | 1 - rumqttd/src/server/mod.rs | 6 +++--- 9 files changed, 24 insertions(+), 22 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 1045cfcf1..d85c89032 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Validate filters while creating subscription requests. * Make v4::Connect::write return correct value +* Ordering of `State.events` related to `QoS > 0` publishes ### Security diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index bba64822c..1e5a4ba93 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -23,7 +23,7 @@ websocket = ["dep:async-tungstenite", "dep:ws_stream_tungstenite", "dep:http"] proxy = ["dep:async-http-proxy"] [dependencies] -futures-util = { version = "0.3", default_features = false, features = ["std", "sink"] } +futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] } tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time"] } tokio-util = { version = "0.7", features = ["codec"] } bytes = "1.5" diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 408741e35..41a0cd4c3 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -90,7 +90,7 @@ impl MqttState { // index 0 is wasted as 0 is not a valid packet id outgoing_pub: vec![None; max_inflight as usize + 1], outgoing_rel: vec![None; max_inflight as usize + 1], - incoming_pub: vec![None; std::u16::MAX as usize + 1], + incoming_pub: vec![None; u16::MAX as usize + 1], collision: None, // TODO: Optimize these sizes later events: VecDeque::with_capacity(100), @@ -165,6 +165,8 @@ impl MqttState { &mut self, packet: Incoming, ) -> Result, StateError> { + self.events.push_back(Event::Incoming(packet.clone())); + let outgoing = match &packet { Incoming::PingResp => self.handle_incoming_pingresp()?, Incoming::Publish(publish) => self.handle_incoming_publish(publish)?, @@ -179,7 +181,6 @@ impl MqttState { return Err(StateError::WrongPacket); } }; - self.events.push_back(Event::Incoming(packet)); self.last_incoming = Instant::now(); Ok(outgoing) diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 854aa7b0f..0d8f11f65 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -65,7 +65,7 @@ pub enum StateError { #[error("Connection failed with reason '{reason:?}' ")] ConnFail { reason: ConnectReturnCode }, #[error("Connection closed by peer abruptly")] - ConnectionAborted + ConnectionAborted, } impl From for StateError { @@ -138,7 +138,7 @@ impl MqttState { // index 0 is wasted as 0 is not a valid packet id outgoing_pub: vec![None; max_inflight as usize + 1], outgoing_rel: vec![None; max_inflight as usize + 1], - incoming_pub: vec![None; std::u16::MAX as usize + 1], + incoming_pub: vec![None; u16::MAX as usize + 1], collision: None, // TODO: Optimize these sizes later events: VecDeque::with_capacity(100), @@ -217,6 +217,8 @@ impl MqttState { &mut self, mut packet: Incoming, ) -> Result, StateError> { + self.events.push_back(Event::Incoming(packet.to_owned())); + let outgoing = match &mut packet { Incoming::PingResp(_) => self.handle_incoming_pingresp()?, Incoming::Publish(publish) => self.handle_incoming_publish(publish)?, @@ -234,7 +236,6 @@ impl MqttState { } }; - self.events.push_back(Event::Incoming(packet)); self.last_incoming = Instant::now(); Ok(outgoing) } @@ -331,7 +332,7 @@ impl MqttState { } } else if let Some(alias) = topic_alias { if let Some(topic) = self.topic_alises.get(&alias) { - publish.topic = topic.to_owned(); + topic.clone_into(&mut publish.topic); } else { self.handle_protocol_error()?; }; diff --git a/rumqttd/src/router/alertlog.rs b/rumqttd/src/router/alertlog.rs index 5d7ceb344..10b569f53 100644 --- a/rumqttd/src/router/alertlog.rs +++ b/rumqttd/src/router/alertlog.rs @@ -71,14 +71,14 @@ pub mod alert { pub use alert::*; pub struct AlertLog { - pub config: RouterConfig, + pub _config: RouterConfig, pub alerts: VecDeque, } impl AlertLog { - pub fn new(config: RouterConfig) -> AlertLog { + pub fn new(_config: RouterConfig) -> AlertLog { AlertLog { - config, + _config, alerts: VecDeque::with_capacity(100), } } diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index b76843735..a75cad34f 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -212,7 +212,7 @@ pub struct Message { /// Log to sweep pub topic: String, /// Qos of the topic - pub qos: u8, + pub _qos: u8, /// Reply data chain pub payload: Bytes, } @@ -250,12 +250,12 @@ impl fmt::Debug for Data { } } -#[derive(Debug, Clone)] -pub struct Disconnection { - pub id: String, - pub execute_will: bool, - pub pending: Vec, -} +// #[derive(Debug, Clone)] +// pub struct Disconnection { +// pub id: String, +// pub execute_will: bool, +// pub pending: Vec, +// } #[derive(Debug, Clone)] pub struct ShadowRequest { diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 3d285a99f..ebf362a71 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -322,7 +322,7 @@ impl Router { |session_state| { connection.subscriptions = session_state.subscriptions; // for using in acklog - pending_acks = session_state.unacked_pubrels.clone(); + pending_acks.clone_from(&session_state.unacked_pubrels); outgoing.unacked_pubrels = session_state.unacked_pubrels; session_state.tracker }, diff --git a/rumqttd/src/segments/mod.rs b/rumqttd/src/segments/mod.rs index 251c3040e..2a3b93e12 100644 --- a/rumqttd/src/segments/mod.rs +++ b/rumqttd/src/segments/mod.rs @@ -1,5 +1,4 @@ use crate::Offset; -use std::usize; use std::{collections::VecDeque, io}; mod segment; diff --git a/rumqttd/src/server/mod.rs b/rumqttd/src/server/mod.rs index c099396fa..fb151b1bf 100644 --- a/rumqttd/src/server/mod.rs +++ b/rumqttd/src/server/mod.rs @@ -1,4 +1,4 @@ -use tokio::io::{AsyncRead, AsyncWrite}; +// use tokio::io::{AsyncRead, AsyncWrite}; mod broker; #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] @@ -6,5 +6,5 @@ mod tls; pub use broker::Broker; -pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {} -impl IO for T {} +// pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {} +// impl IO for T {}