Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into auth_support
Browse files Browse the repository at this point in the history
  • Loading branch information
ting-ms committed May 11, 2024
2 parents db2ef37 + 7f594f8 commit 355ace3
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 21 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes

### Security

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ websocket = ["dep:async-tungstenite", "dep:ws_stream_tungstenite", "dep:http"]
proxy = ["dep:async-http-proxy"]

[dependencies]
futures-util = { version = "0.3", default_features = false, features = ["std", "sink"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time"] }
tokio-util = { version = "0.7", features = ["codec"] }
bytes = "1.5"
Expand Down
5 changes: 3 additions & 2 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl MqttState {
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; std::u16::MAX as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand Down Expand Up @@ -165,6 +165,8 @@ impl MqttState {
&mut self,
packet: Incoming,
) -> Result<Option<Packet>, StateError> {
self.events.push_back(Event::Incoming(packet.clone()));

let outgoing = match &packet {
Incoming::PingResp => self.handle_incoming_pingresp()?,
Incoming::Publish(publish) => self.handle_incoming_publish(publish)?,
Expand All @@ -179,7 +181,6 @@ impl MqttState {
return Err(StateError::WrongPacket);
}
};
self.events.push_back(Event::Incoming(packet));
self.last_incoming = Instant::now();

Ok(outgoing)
Expand Down
7 changes: 4 additions & 3 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl MqttState {
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; std::u16::MAX as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand Down Expand Up @@ -231,6 +231,8 @@ impl MqttState {
&mut self,
mut packet: Incoming,
) -> Result<Option<Packet>, StateError> {
self.events.push_back(Event::Incoming(packet.to_owned()));

let outgoing = match &mut packet {
Incoming::PingResp(_) => self.handle_incoming_pingresp()?,
Incoming::Publish(publish) => self.handle_incoming_publish(publish)?,
Expand All @@ -249,7 +251,6 @@ impl MqttState {
}
};

self.events.push_back(Event::Incoming(packet));
self.last_incoming = Instant::now();
Ok(outgoing)
}
Expand Down Expand Up @@ -346,7 +347,7 @@ impl MqttState {
}
} else if let Some(alias) = topic_alias {
if let Some(topic) = self.topic_alises.get(&alias) {
publish.topic = topic.to_owned();
topic.clone_into(&mut publish.topic);
} else {
self.handle_protocol_error()?;
};
Expand Down
6 changes: 3 additions & 3 deletions rumqttd/src/router/alertlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ pub mod alert {
pub use alert::*;

pub struct AlertLog {
pub config: RouterConfig,
pub _config: RouterConfig,
pub alerts: VecDeque<Alert>,
}

impl AlertLog {
pub fn new(config: RouterConfig) -> AlertLog {
pub fn new(_config: RouterConfig) -> AlertLog {
AlertLog {
config,
_config,
alerts: VecDeque::with_capacity(100),
}
}
Expand Down
14 changes: 7 additions & 7 deletions rumqttd/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub struct Message {
/// Log to sweep
pub topic: String,
/// Qos of the topic
pub qos: u8,
pub _qos: u8,
/// Reply data chain
pub payload: Bytes,
}
Expand Down Expand Up @@ -250,12 +250,12 @@ impl fmt::Debug for Data {
}
}

#[derive(Debug, Clone)]
pub struct Disconnection {
pub id: String,
pub execute_will: bool,
pub pending: Vec<Notification>,
}
// #[derive(Debug, Clone)]
// pub struct Disconnection {
// pub id: String,
// pub execute_will: bool,
// pub pending: Vec<Notification>,
// }

#[derive(Debug, Clone)]
pub struct ShadowRequest {
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl Router {
|session_state| {
connection.subscriptions = session_state.subscriptions;
// for using in acklog
pending_acks = session_state.unacked_pubrels.clone();
pending_acks.clone_from(&session_state.unacked_pubrels);
outgoing.unacked_pubrels = session_state.unacked_pubrels;
session_state.tracker
},
Expand Down
1 change: 0 additions & 1 deletion rumqttd/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::Offset;
use std::usize;
use std::{collections::VecDeque, io};

mod segment;
Expand Down
6 changes: 3 additions & 3 deletions rumqttd/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use tokio::io::{AsyncRead, AsyncWrite};
// use tokio::io::{AsyncRead, AsyncWrite};

mod broker;
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
mod tls;

pub use broker::Broker;

pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> IO for T {}
// pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
// impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> IO for T {}

0 comments on commit 355ace3

Please sign in to comment.