Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rumqttc async v5 client request batching #823

Open
wants to merge 13 commits into
base: batching
Choose a base branch
from
Open
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Process multiple outgoing client requests before flushing the network buffer (reduces number of system calls)

* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ matches = "0.1"
pretty_assertions = "1"
pretty_env_logger = "0.5"
serde = { version = "1", features = ["derive"] }
tokio-test = "0.4.4"

[[example]]
name = "tls"
Expand Down
18 changes: 10 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ impl EventLoop {
pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
if self.network.is_none() {
let (network, connack) = match time::timeout(
Duration::from_secs(self.network_options.connection_timeout()),
connect(&self.mqtt_options, self.network_options.clone()),
self.network_options.connection_timeout(),
connect(&self.mqtt_options, self.network_options()),
)
.await
{
Expand Down Expand Up @@ -173,7 +173,7 @@ impl EventLoop {
// let await_acks = self.state.await_acks;
let inflight_full = self.state.inflight >= self.mqtt_options.inflight;
let collision = self.state.collision.is_some();
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
let network_timeout = self.network_options.connection_timeout();

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand Down Expand Up @@ -258,10 +258,12 @@ impl EventLoop {
}
}

pub fn network_options(&self) -> NetworkOptions {
self.network_options.clone()
/// Get network options
pub fn network_options(&self) -> &NetworkOptions {
&self.network_options
}

/// Set network options
pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
self.network_options = network_options;
self
Expand Down Expand Up @@ -293,7 +295,7 @@ impl EventLoop {
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;
Expand All @@ -306,7 +308,7 @@ async fn connect(

pub(crate) async fn socket_connect(
host: String,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> io::Result<TcpStream> {
let addrs = lookup_host(host).await?;
let mut last_err = None;
Expand Down Expand Up @@ -352,7 +354,7 @@ pub(crate) async fn socket_connect(

async fn network_connect(
options: &MqttOptions,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<Network, ConnectionError> {
// Process Unix files early, as proxy is not supported for them.
#[cfg(unix)]
Expand Down
39 changes: 20 additions & 19 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,12 @@ impl From<ClientConfig> for TlsConfiguration {
}

/// Provides a way to configure low level network connection configurations
#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
pub struct NetworkOptions {
tcp_send_buffer_size: Option<u32>,
tcp_recv_buffer_size: Option<u32>,
conn_timeout: u64,
/// Connection timeout
connection_timeout: Duration,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: Option<String>,
}
Expand All @@ -379,7 +380,7 @@ impl NetworkOptions {
NetworkOptions {
tcp_send_buffer_size: None,
tcp_recv_buffer_size: None,
conn_timeout: 5,
connection_timeout: Duration::from_secs(5),
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
bind_device: None,
}
Expand All @@ -393,15 +394,15 @@ impl NetworkOptions {
self.tcp_recv_buffer_size = Some(size);
}

/// set connection timeout in secs
pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
self.conn_timeout = timeout;
/// Set connection timeout
pub fn set_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
self.connection_timeout = timeout;
self
}

/// get timeout in secs
pub fn connection_timeout(&self) -> u64 {
self.conn_timeout
/// Get connection timeout
pub fn connection_timeout(&self) -> Duration {
self.connection_timeout
}

/// bind connection to a specific network device by name
Expand Down Expand Up @@ -443,7 +444,7 @@ pub struct MqttOptions {
/// request (publish, subscribe) channel capacity
request_channel_capacity: usize,
/// Max internal request batching
max_request_batch: usize,
max_batch_size: usize,
/// Minimum delay time between consecutive outgoing packets
/// while retransmitting pending packets
pending_throttle: Duration,
Expand Down Expand Up @@ -483,7 +484,7 @@ impl MqttOptions {
max_incoming_packet_size: 10 * 1024,
max_outgoing_packet_size: 10 * 1024,
request_channel_capacity: 10,
max_request_batch: 0,
max_batch_size: 0,
pending_throttle: Duration::from_micros(0),
inflight: 100,
last_will: None,
Expand Down Expand Up @@ -734,7 +735,7 @@ pub enum OptionError {
RequestChannelCapacity,

#[error("Invalid max-request-batch value.")]
MaxRequestBatch,
MaxBatchSize,

#[error("Invalid pending-throttle value.")]
PendingThrottle,
Expand Down Expand Up @@ -842,12 +843,12 @@ impl std::convert::TryFrom<url::Url> for MqttOptions {
options.request_channel_capacity = request_channel_capacity;
}

if let Some(max_request_batch) = queries
.remove("max_request_batch_num")
.map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
if let Some(max_batch_size) = queries
.remove("max_batch_size")
.map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxBatchSize))
.transpose()?
{
options.max_request_batch = max_request_batch;
options.max_batch_size = max_batch_size;
}

if let Some(pending_throttle) = queries
Expand Down Expand Up @@ -887,7 +888,7 @@ impl Debug for MqttOptions {
.field("credentials", &self.credentials)
.field("max_packet_size", &self.max_incoming_packet_size)
.field("request_channel_capacity", &self.request_channel_capacity)
.field("max_request_batch", &self.max_request_batch)
.field("max_batch_size", &self.max_batch_size)
.field("pending_throttle", &self.pending_throttle)
.field("inflight", &self.inflight)
.field("last_will", &self.last_will)
Expand Down Expand Up @@ -970,8 +971,8 @@ mod test {
OptionError::RequestChannelCapacity
);
assert_eq!(
err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
OptionError::MaxRequestBatch
err("mqtt://host:42?client_id=foo&max_batch_size=foo"),
OptionError::MaxBatchSize
);
assert_eq!(
err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Proxy {
self,
broker_addr: &str,
broker_port: u16,
network_options: NetworkOptions,
network_options: &NetworkOptions,
) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
let proxy_addr = format!("{}:{}", self.addr, self.port);

Expand Down
12 changes: 6 additions & 6 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::valid_topic;

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
use flume::{bounded, SendError, Sender, TrySendError};
use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;
Expand Down Expand Up @@ -54,8 +54,8 @@ impl AsyncClient {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.requests_tx.clone();
let (request_tx, request_rx) = bounded(cap);
let eventloop = EventLoop::new(options, request_rx);

let client = AsyncClient { request_tx };

Expand Down Expand Up @@ -479,15 +479,15 @@ impl Client {
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };

let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let (client, eventloop) = runtime.block_on(async { AsyncClient::new(options, cap) });
let client = Client { client };
let connection = Connection::new(eventloop, runtime);

(client, connection)
}

Expand Down
Loading