From ffd0a59dcea1ab93ec8d4fdd6c614b78cfec0f35 Mon Sep 17 00:00:00 2001 From: Grzegorz Baranski Date: Sun, 27 Mar 2022 15:22:06 +0200 Subject: [PATCH] fix(client):improve reconnecting, remove .closed() --- examples/chat-client/src/main.rs | 4 --- examples/simple-client/src/main.rs | 4 --- src/client.rs | 54 +++++++++++++----------------- 3 files changed, 23 insertions(+), 39 deletions(-) diff --git a/examples/chat-client/src/main.rs b/examples/chat-client/src/main.rs index 856d438..435cb6a 100644 --- a/examples/chat-client/src/main.rs +++ b/examples/chat-client/src/main.rs @@ -23,10 +23,6 @@ impl ezsockets::ClientExt for Client { Ok(()) } - async fn closed(&mut self) -> Result<(), BoxError> { - Ok(()) - } - async fn call(&mut self, message: Self::Message) { match message { () => {} diff --git a/examples/simple-client/src/main.rs b/examples/simple-client/src/main.rs index a369061..ab0a8a9 100644 --- a/examples/simple-client/src/main.rs +++ b/examples/simple-client/src/main.rs @@ -20,10 +20,6 @@ impl ezsockets::ClientExt for Client { Ok(()) } - async fn closed(&mut self) -> Result<(), BoxError> { - Ok(()) - } - async fn call(&mut self, message: Self::Message) { match message { () => {} diff --git a/src/client.rs b/src/client.rs index 9f97269..afbf49a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,5 @@ use crate::socket::Config; use crate::BoxError; -use crate::CloseFrame; use crate::Message; use crate::Socket; use async_trait::async_trait; @@ -71,7 +70,6 @@ pub trait ClientExt: Send { async fn text(&mut self, text: String) -> Result<(), BoxError>; async fn binary(&mut self, bytes: Vec) -> Result<(), BoxError>; - async fn closed(&mut self) -> Result<(), BoxError>; async fn call(&mut self, message: Self::Message); } @@ -116,7 +114,8 @@ pub async fn connect( let future = tokio::spawn(async move { let http_request = config.connect_http_request(); tracing::info!("connecting to {}...", config.url); - let (stream, _) = tokio_tungstenite::connect_async(http_request).await?; + let (stream, _) = tokio_tungstenite::connect_async(http_request) + .await?; let socket = Socket::new(stream, Config::default()); tracing::info!("connected to {}", config.url); let mut actor = ClientActor { @@ -126,21 +125,7 @@ pub async fn connect( heartbeat: Instant::now(), config, }; - loop { - let result = actor.run().await; - match result { - Ok(Some(CloseFrame { code, reason })) => { - tracing::info!(?code, %reason, "connection closed"); - } - Ok(None) => { - tracing::info!("connection closed"); - break; // TODO: Should I really break here? - } - Err(err) => tracing::warn!("connection error: {err}"), - }; - actor.reconnect().await; - } - Ok::<_, BoxError>(()) + actor.run().await }); let future = async move { future.await.unwrap() }; (handle, future) @@ -155,15 +140,15 @@ struct ClientActor { } impl ClientActor { - async fn run(&mut self) -> Result, BoxError> { + async fn run(&mut self) -> Result<(), BoxError> { loop { tokio::select! { Some(message) = self.receiver.recv() => { match message { ClientMessage::Socket(message) => { self.socket.send(message.clone().into()).await; - if let Message::Close(frame) = message { - return Ok(frame) + if let Message::Close(_frame) = message { + return Ok(()) } } ClientMessage::Call(message) => { @@ -171,21 +156,27 @@ impl ClientActor { } } } - Some(message) = self.socket.recv() => { - match message.to_owned() { - Message::Text(text) => self.client.text(text).await?, - Message::Binary(bytes) => self.client.binary(bytes).await?, - Message::Close(_frame) => self.client.closed().await? + message = self.socket.recv() => { + match message { + Some(message) => { + match message.to_owned() { + Message::Text(text) => self.client.text(text).await?, + Message::Binary(bytes) => self.client.binary(bytes).await?, + Message::Close(_frame) => { + self.reconnect().await; + } + }; + } + None => { + self.reconnect().await; + } }; - if let Message::Close(frame) = message { - return Ok(frame); - } } else => break, } } - Ok(None) + Ok(()) } async fn reconnect(&mut self) { @@ -193,7 +184,9 @@ impl ClientActor { .config .reconnect_interval .expect("reconnect interval should be set for reconnecting"); + tracing::info!("reconnecting in {}s", reconnect_interval.as_secs()); for i in 1.. { + tokio::time::sleep(reconnect_interval).await; tracing::info!("reconnecting attempt no: {}...", i); let result = tokio_tungstenite::connect_async(&self.config.url).await; match result { @@ -210,7 +203,6 @@ impl ClientActor { err, reconnect_interval.as_secs() ); - tokio::time::sleep(reconnect_interval).await; } }; }