Skip to content

Commit

Permalink
feat(rumqttc): simplify keepalive interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Felix Obenhuber committed Mar 26, 2024
1 parent 32bc49e commit e389354
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 21 deletions.
35 changes: 17 additions & 18 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::framed::AsyncReadWrite;
use flume::Receiver;
use futures_util::{Stream, StreamExt};
use tokio::select;
use tokio::time::{self, error::Elapsed, Instant, Sleep};
use tokio::time::Interval;
use tokio::time::{self, error::Elapsed};

use std::collections::VecDeque;
use std::io;
Expand Down Expand Up @@ -85,7 +86,7 @@ pub struct EventLoop {
/// Network connection to the broker
network: Option<Network>,
/// Keep alive time
keepalive_timeout: Option<Pin<Box<Sleep>>>,
keepalive_interval: Interval,
}

/// Events which can be yielded by the event loop
Expand All @@ -103,6 +104,8 @@ impl EventLoop {
let pending = IntervalQueue::new(options.pending_throttle);
let batch_size = options.max_batch_size;
let state = MqttState::new(inflight_limit, manual_acks);
assert!(!options.keep_alive.is_zero());
let keepalive_interval = time::interval(options.keep_alive());

EventLoop {
options,
Expand All @@ -111,7 +114,7 @@ impl EventLoop {
requests,
pending,
network: None,
keepalive_timeout: None,
keepalive_interval,
}
}

Expand All @@ -124,7 +127,6 @@ impl EventLoop {
/// > For this reason we recommend setting [`AsycClient`](super::AsyncClient)'s channel capacity to `0`.
pub fn clean(&mut self) {
self.network = None;
self.keepalive_timeout = None;
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
Expand All @@ -144,14 +146,12 @@ impl EventLoop {
.await??;
self.network = Some(network);

if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

// A connack never produces a response packet. Safe to ignore the return value
// of `handle_incoming_packet`
self.state.handle_incoming_packet(connack)?;
self.pending.reset();

self.pending.reset_immediately();
self.keepalive_interval.reset();
}

// Read buffered events from previous polls before calling a new poll
Expand Down Expand Up @@ -216,8 +216,10 @@ impl EventLoop {
network.write(packet).await?;
}
},
// Process next packet received from io
// Process next packet from io
packet = network.read() => {
// Reset keepalive interval due to packet reception
self.keepalive_interval.reset();
match packet? {
Some(packet) => if let Some(packet) = self.state.handle_incoming_packet(packet)? {
let flush = matches!(packet, Packet::PingResp(_));
Expand All @@ -229,11 +231,8 @@ impl EventLoop {
None => return Err(ConnectionError::ConnectionClosed),
}
},
// We generate pings irrespective of network activity. This keeps the ping logic
// simple. We can change this behavior in future if necessary (to prevent extra pings)
_ = self.keepalive_timeout.as_mut().unwrap() => {
let timeout = self.keepalive_timeout.as_mut().unwrap();
timeout.as_mut().reset(Instant::now() + self.options.keep_alive);
// Send a ping request on each interval tick
_ = self.keepalive_interval.tick() => {
if let Some(packet) = self.state.handle_outgoing_packet(Request::PingReq)? {
network.write(packet).await?;
}
Expand Down Expand Up @@ -291,10 +290,10 @@ impl<T> IntervalQueue<T> {
self.queue.extend(requests);
}

/// Reset the pending interval tick
pub fn reset(&mut self) {
/// Reset the pending interval tick. Next tick yields immediately
pub fn reset_immediately(&mut self) {
if let Some(interval) = self.interval.as_mut() {
interval.reset();
interval.reset_immediately();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio_util::codec::Framed;
use crate::framed::AsyncReadWrite;

use super::mqttbytes::v5::Packet;
use super::{Codec, Connect, Login, MqttOptions};
use super::{Codec, Connect, MqttOptions};
use super::{Incoming, StateError};

/// Network transforms packets <-> frames efficiently. It takes
Expand Down Expand Up @@ -41,7 +41,7 @@ impl Network {
match self.framed.next().await {
Some(Ok(packet)) => Ok(Some(packet)),
Some(Err(e)) => Err(StateError::Deserialization(e)),
None => Ok(None)
None => Ok(None),
}
}

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub enum StateError {
#[error("Connection failed with reason '{reason:?}' ")]
ConnFail { reason: ConnectReturnCode },
#[error("Connection closed by peer abruptly")]
ConnectionAborted
ConnectionAborted,
}

impl From<mqttbytes::Error> for StateError {
Expand Down

0 comments on commit e389354

Please sign in to comment.