Skip to content

Commit

Permalink
No need for EventLoop::new to be async. (#121)
Browse files Browse the repository at this point in the history
* No need for EventLoop::new to be async.

It never calls await, so the async declaration is unnecessary.

* Construct Delay when needed rather than storing in EventLoop.

Co-authored-by: Ravi Teja <kraviteza@gmail.com>
  • Loading branch information
qwandor and Ravi Teja committed Sep 7, 2020
1 parent c099f42 commit 07614a5
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion benchmarks/rumqttasync.rs
Expand Up @@ -27,7 +27,7 @@ pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Bo
mqttoptions.set_inflight(1000);
mqttoptions.set_max_request_batch(10);

let mut eventloop = EventLoop::new(mqttoptions, 10).await;
let mut eventloop = EventLoop::new(mqttoptions, 10);
let requests_tx = eventloop.handle();
let client_id = id.to_owned();
let payloads = generate_payloads(count, payload_size);
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/rumqttasyncbulk.rs
Expand Up @@ -23,7 +23,7 @@ pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Bo
// NOTE More the inflight size, better the perf
mqttoptions.set_inflight(100);

let mut eventloop = EventLoop::new(mqttoptions, 10).await;
let mut eventloop = EventLoop::new(mqttoptions, 10);
let requests_tx = eventloop.handle();
let client_id = id.to_owned();
let payloads = generate_payloads(count, payload_size);
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/rumqttasyncqos0.rs
Expand Up @@ -27,7 +27,7 @@ pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Bo
// NOTE More the inflight size, better the perf
mqttoptions.set_inflight(100);

let mut eventloop = EventLoop::new(mqttoptions, 10).await;
let mut eventloop = EventLoop::new(mqttoptions, 10);
let requests_tx = eventloop.handle();
let client_id = id.to_owned();
let payloads = generate_payloads(count, payload_size);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/README.md
Expand Up @@ -56,7 +56,7 @@ use std::error::Error;
#[tokio::main(core_threads = 1)]
async fn main() {
let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
let mut eventloop = EventLoop::new(mqttoptions, 10).await;
let mut eventloop = EventLoop::new(mqttoptions, 10);
let requests_tx = eventloop.handle();

loop {
Expand Down
3 changes: 2 additions & 1 deletion rumqttc/examples/asyncpubsub.rs
Expand Up @@ -13,7 +13,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(5);

let mut eventloop = EventLoop::new(mqttoptions, 10).await;

let mut eventloop = EventLoop::new(mqttoptions, 10);
let requests_tx = eventloop.handle();
task::spawn(async move {
requests(requests_tx).await;
Expand Down
3 changes: 1 addition & 2 deletions rumqttc/src/client.rs
Expand Up @@ -40,8 +40,7 @@ impl Client {
.enable_all()
.build()
.unwrap();
let eventloop = EventLoop::new(options, cap);
let mut eventloop = runtime.block_on(eventloop);
let mut eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.handle();
let cancel_tx = eventloop.take_cancel_handle().unwrap();

Expand Down
30 changes: 15 additions & 15 deletions rumqttc/src/eventloop.rs
Expand Up @@ -52,7 +52,7 @@ pub struct EventLoop {
/// Network connection to the broker
pub(crate) network: Option<Network>,
/// Keep alive time
pub(crate) keepalive_timeout: Delay,
pub(crate) keepalive_timeout: Instant,
/// Handle to read cancellation requests
pub(crate) cancel_rx: Receiver<()>,
/// Handle to send cancellation requests (and drops)
Expand All @@ -66,7 +66,7 @@ impl EventLoop {
///
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub async fn new(options: MqttOptions, cap: usize) -> EventLoop {
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
let keepalive = options.keep_alive;
let (cancel_tx, cancel_rx) = bounded(5);
let (requests_tx, requests_rx) = bounded(cap);
Expand All @@ -84,7 +84,7 @@ impl EventLoop {
pending,
buffered,
network: None,
keepalive_timeout: time::delay_for(keepalive),
keepalive_timeout: Instant::now() + keepalive,
cancel_rx,
cancel_tx: Some(cancel_tx),
reconnection_delay: Duration::from_secs(0),
Expand Down Expand Up @@ -199,8 +199,8 @@ impl EventLoop {
},
// We generate pings irrespective of network activity. This keeps the ping logic
// simple. We can change this behavior in future if necessary (to prevent extra pings)
_ = &mut self.keepalive_timeout => {
self.keepalive_timeout.reset(Instant::now() + self.options.keep_alive);
_ = time::delay_until(self.keepalive_timeout) => {
self.keepalive_timeout = Instant::now() + self.options.keep_alive;
let request = self.state.handle_outgoing_packet(Request::PingReq)?;
let outgoing = network.write(request).await?;
Ok((None, Some(outgoing)))
Expand Down Expand Up @@ -405,7 +405,7 @@ mod test {

time::delay_for(Duration::from_secs(1)).await;
let options = MqttOptions::new("dummy", "127.0.0.1", 1880);
let mut eventloop = EventLoop::new(options, 5).await;
let mut eventloop = EventLoop::new(options, 5);

let start = Instant::now();
let o = eventloop.poll().await;
Expand All @@ -427,7 +427,7 @@ mod test {
let keep_alive = options.keep_alive();

// start sending requests
let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
// start the eventloop
task::spawn(async move {
run(eventloop, false).await.unwrap();
Expand Down Expand Up @@ -463,7 +463,7 @@ mod test {

// start sending qos0 publishes. this makes sure that there is
// outgoing activity but no incomin activity
let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();
task::spawn(async move {
start_requests(10, QoS::AtMostOnce, 1, requests_tx).await;
Expand Down Expand Up @@ -501,7 +501,7 @@ mod test {
options.set_keep_alive(5);
let keep_alive = options.keep_alive();

let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
task::spawn(async move {
run(eventloop, false).await.unwrap();
});
Expand Down Expand Up @@ -532,7 +532,7 @@ mod test {

time::delay_for(Duration::from_secs(1)).await;
let start = Instant::now();
let mut eventloop = EventLoop::new(options, 5).await;
let mut eventloop = EventLoop::new(options, 5);
loop {
if let Err(e) = eventloop.poll().await {
match e {
Expand All @@ -553,7 +553,7 @@ mod test {

// start sending qos0 publishes. this makes sure that there is
// outgoing activity but no incoming activity
let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();
task::spawn(async move {
start_requests(10, QoS::AtLeastOnce, 1, requests_tx).await;
Expand All @@ -579,7 +579,7 @@ mod test {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 1888);
options.set_inflight(3);

let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();

task::spawn(async move {
Expand Down Expand Up @@ -628,7 +628,7 @@ mod test {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 1891);
options.set_inflight(4);

let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();

task::spawn(async move {
Expand Down Expand Up @@ -662,7 +662,7 @@ mod test {
options.set_keep_alive(5);

// start sending qos0 publishes. Makes sure that there is out activity but no in activity
let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();
task::spawn(async move {
start_requests(10, QoS::AtLeastOnce, 1, requests_tx).await;
Expand Down Expand Up @@ -704,7 +704,7 @@ mod test {

// start sending qos0 publishes. this makes sure that there is
// outgoing activity but no incoming activity
let eventloop = EventLoop::new(options, 5).await;
let eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();
task::spawn(async move {
start_requests(10, QoS::AtLeastOnce, 1, requests_tx).await;
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/lib.rs
Expand Up @@ -54,7 +54,7 @@
//! #[tokio::main(core_threads = 1)]
//! async fn main() {
//! let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
//! let mut eventloop = EventLoop::new(mqttoptions, 10).await;
//! let mut eventloop = EventLoop::new(mqttoptions, 10);
//! let requests_tx = eventloop.handle();
//!
//! loop {
Expand Down

0 comments on commit 07614a5

Please sign in to comment.