Skip to content

Commit

Permalink
fix(Event Loop): Allow keep-alives <= 5 secs
Browse files Browse the repository at this point in the history
This PR removes the requirement that the keep alive timeout be at
least 5 seconds. This is in keeping with the MQTT spec, which
states that the keep alive value should be between 0 and 65535.
A value of 0 indicates that no keep-alive pings are sent to the
broker.

Issue: bytebeamio#643 - Keep Alive has an arbitrary limit
  • Loading branch information
danieldougherty committed Jul 3, 2023
1 parent 72007ef commit 4c7d639
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
6 changes: 4 additions & 2 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl EventLoop {
};
self.network = Some(network);

if self.keepalive_timeout.is_none() {
if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

Expand Down Expand Up @@ -169,6 +169,7 @@ impl EventLoop {
return Ok(event);
}

let mut no_sleep = Box::pin(time::sleep(Duration::ZERO));
// this loop is necessary since self.incoming.pop_front() might return None. In that case,
// instead of returning a None event, we try again.
select! {
Expand Down Expand Up @@ -227,7 +228,8 @@ impl EventLoop {
},
// We generate pings irrespective of network activity. This keeps the ping logic
// simple. We can change this behavior in future if necessary (to prevent extra pings)
_ = self.keepalive_timeout.as_mut().unwrap() => {
_ = self.keepalive_timeout.as_mut().unwrap_or(&mut no_sleep),
if self.keepalive_timeout.is_some() && !self.mqtt_options.keep_alive.is_zero() => {
let timeout = self.keepalive_timeout.as_mut().unwrap();
timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);

Expand Down
2 changes: 0 additions & 2 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,6 @@ impl MqttOptions {
/// Set number of seconds after which client should ping the broker
/// if there is no other data exchange
pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
assert!(duration.as_secs() >= 5, "Keep alives should be >= 5 secs");

self.keep_alive = duration;
self
}
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn connection_should_timeout_on_time() {

#[tokio::test]
async fn idle_connection_triggers_pings_on_time() {
let keep_alive = 5;
let keep_alive = 1;

let mut options = MqttOptions::new("dummy", "127.0.0.1", 1885);
options.set_keep_alive(Duration::from_secs(keep_alive));
Expand Down

0 comments on commit 4c7d639

Please sign in to comment.