Skip to content

Commit

Permalink
Merge 3448f90 into d4c30df
Browse files Browse the repository at this point in the history
  • Loading branch information
flxo committed Apr 9, 2024
2 parents d4c30df + 3448f90 commit 00f7ed1
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 272 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Process multiple outgoing client requests before flushing the network buffer (reduces number of system calls)

* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ matches = "0.1"
pretty_assertions = "1"
pretty_env_logger = "0.5"
serde = { version = "1", features = ["derive"] }
tokio-test = "0.4.4"

[[example]]
name = "tls"
Expand Down
18 changes: 10 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ impl EventLoop {
pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
if self.network.is_none() {
let (network, connack) = match time::timeout(
Duration::from_secs(self.network_options.connection_timeout()),
connect(&self.mqtt_options, self.network_options.clone()),
self.network_options.connection_timeout(),
connect(&self.mqtt_options, self.network_options()),
)
.await
{
Expand Down Expand Up @@ -173,7 +173,7 @@ impl EventLoop {
// let await_acks = self.state.await_acks;
let inflight_full = self.state.inflight >= self.mqtt_options.inflight;
let collision = self.state.collision.is_some();
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
let network_timeout = self.network_options.connection_timeout();

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand Down Expand Up @@ -258,10 +258,12 @@ impl EventLoop {
}
}

pub fn network_options(&self) -> NetworkOptions {
self.network_options.clone()
/// Get network options
pub fn network_options(&self) -> &NetworkOptions {
&self.network_options
}

/// Set network options
pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
self.network_options = network_options;
self
Expand Down Expand Up @@ -293,7 +295,7 @@ impl EventLoop {
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;
Expand All @@ -306,7 +308,7 @@ async fn connect(

pub(crate) async fn socket_connect(
host: String,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> io::Result<TcpStream> {
let addrs = lookup_host(host).await?;
let mut last_err = None;
Expand Down Expand Up @@ -352,7 +354,7 @@ pub(crate) async fn socket_connect(

async fn network_connect(
options: &MqttOptions,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<Network, ConnectionError> {
// Process Unix files early, as proxy is not supported for them.
#[cfg(unix)]
Expand Down
39 changes: 20 additions & 19 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,12 @@ impl From<ClientConfig> for TlsConfiguration {
}

/// Provides a way to configure low level network connection configurations
#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
pub struct NetworkOptions {
tcp_send_buffer_size: Option<u32>,
tcp_recv_buffer_size: Option<u32>,
conn_timeout: u64,
/// Connection timeout
connection_timeout: Duration,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: Option<String>,
}
Expand All @@ -379,7 +380,7 @@ impl NetworkOptions {
NetworkOptions {
tcp_send_buffer_size: None,
tcp_recv_buffer_size: None,
conn_timeout: 5,
connection_timeout: Duration::from_secs(5),
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: None,
}
Expand All @@ -393,15 +394,15 @@ impl NetworkOptions {
self.tcp_recv_buffer_size = Some(size);
}

/// set connection timeout in secs
pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
self.conn_timeout = timeout;
/// Set connection timeout
pub fn set_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
self.connection_timeout = timeout;
self
}

/// get timeout in secs
pub fn connection_timeout(&self) -> u64 {
self.conn_timeout
/// Get connection timeout
pub fn connection_timeout(&self) -> Duration {
self.connection_timeout
}

/// bind connection to a specific network device by name
Expand Down Expand Up @@ -443,7 +444,7 @@ pub struct MqttOptions {
/// request (publish, subscribe) channel capacity
request_channel_capacity: usize,
/// Max internal request batching
max_request_batch: usize,
max_batch_size: usize,
/// Minimum delay time between consecutive outgoing packets
/// while retransmitting pending packets
pending_throttle: Duration,
Expand Down Expand Up @@ -483,7 +484,7 @@ impl MqttOptions {
max_incoming_packet_size: 10 * 1024,
max_outgoing_packet_size: 10 * 1024,
request_channel_capacity: 10,
max_request_batch: 0,
max_batch_size: 0,
pending_throttle: Duration::from_micros(0),
inflight: 100,
last_will: None,
Expand Down Expand Up @@ -734,7 +735,7 @@ pub enum OptionError {
RequestChannelCapacity,

#[error("Invalid max-request-batch value.")]
MaxRequestBatch,
MaxBatchSize,

#[error("Invalid pending-throttle value.")]
PendingThrottle,
Expand Down Expand Up @@ -842,12 +843,12 @@ impl std::convert::TryFrom<url::Url> for MqttOptions {
options.request_channel_capacity = request_channel_capacity;
}

if let Some(max_request_batch) = queries
.remove("max_request_batch_num")
.map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
if let Some(max_batch_size) = queries
.remove("max_batch_size")
.map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxBatchSize))
.transpose()?
{
options.max_request_batch = max_request_batch;
options.max_batch_size = max_batch_size;
}

if let Some(pending_throttle) = queries
Expand Down Expand Up @@ -887,7 +888,7 @@ impl Debug for MqttOptions {
.field("credentials", &self.credentials)
.field("max_packet_size", &self.max_incoming_packet_size)
.field("request_channel_capacity", &self.request_channel_capacity)
.field("max_request_batch", &self.max_request_batch)
.field("max_batch_size", &self.max_batch_size)
.field("pending_throttle", &self.pending_throttle)
.field("inflight", &self.inflight)
.field("last_will", &self.last_will)
Expand Down Expand Up @@ -970,8 +971,8 @@ mod test {
OptionError::RequestChannelCapacity
);
assert_eq!(
err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
OptionError::MaxRequestBatch
err("mqtt://host:42?client_id=foo&max_batch_size=foo"),
OptionError::MaxBatchSize
);
assert_eq!(
err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Proxy {
self,
broker_addr: &str,
broker_port: u16,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
let proxy_addr = format!("{}:{}", self.addr, self.port);

Expand Down
12 changes: 6 additions & 6 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::valid_topic;

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
use flume::{bounded, SendError, Sender, TrySendError};
use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;
Expand Down Expand Up @@ -54,8 +54,8 @@ impl AsyncClient {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let (request_tx, request_rx) = bounded(cap);
let eventloop = EventLoop::new(options, request_rx);

let client = AsyncClient { request_tx };

Expand Down Expand Up @@ -479,15 +479,15 @@ impl Client {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };

let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let (client, eventloop) = runtime.block_on(async { AsyncClient::new(options, cap) });
let client = Client { client };
let connection = Connection::new(eventloop, runtime);

(client, connection)
}

Expand Down

0 comments on commit 00f7ed1

Please sign in to comment.