Skip to content

Commit

Permalink
fix(client):improve reconnecting, remove .closed()
Browse files Browse the repository at this point in the history
  • Loading branch information
gbaranski committed Mar 27, 2022
1 parent 287b69f commit ffd0a59
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 39 deletions.
4 changes: 0 additions & 4 deletions examples/chat-client/src/main.rs
Expand Up @@ -23,10 +23,6 @@ impl ezsockets::ClientExt for Client {
Ok(())
}

async fn closed(&mut self) -> Result<(), BoxError> {
Ok(())
}

async fn call(&mut self, message: Self::Message) {
match message {
() => {}
Expand Down
4 changes: 0 additions & 4 deletions examples/simple-client/src/main.rs
Expand Up @@ -20,10 +20,6 @@ impl ezsockets::ClientExt for Client {
Ok(())
}

async fn closed(&mut self) -> Result<(), BoxError> {
Ok(())
}

async fn call(&mut self, message: Self::Message) {
match message {
() => {}
Expand Down
54 changes: 23 additions & 31 deletions src/client.rs
@@ -1,6 +1,5 @@
use crate::socket::Config;
use crate::BoxError;
use crate::CloseFrame;
use crate::Message;
use crate::Socket;
use async_trait::async_trait;
Expand Down Expand Up @@ -71,7 +70,6 @@ pub trait ClientExt: Send {

async fn text(&mut self, text: String) -> Result<(), BoxError>;
async fn binary(&mut self, bytes: Vec<u8>) -> Result<(), BoxError>;
async fn closed(&mut self) -> Result<(), BoxError>;
async fn call(&mut self, message: Self::Message);
}

Expand Down Expand Up @@ -116,7 +114,8 @@ pub async fn connect<E: ClientExt + 'static>(
let future = tokio::spawn(async move {
let http_request = config.connect_http_request();
tracing::info!("connecting to {}...", config.url);
let (stream, _) = tokio_tungstenite::connect_async(http_request).await?;
let (stream, _) = tokio_tungstenite::connect_async(http_request)
.await?;
let socket = Socket::new(stream, Config::default());
tracing::info!("connected to {}", config.url);
let mut actor = ClientActor {
Expand All @@ -126,21 +125,7 @@ pub async fn connect<E: ClientExt + 'static>(
heartbeat: Instant::now(),
config,
};
loop {
let result = actor.run().await;
match result {
Ok(Some(CloseFrame { code, reason })) => {
tracing::info!(?code, %reason, "connection closed");
}
Ok(None) => {
tracing::info!("connection closed");
break; // TODO: Should I really break here?
}
Err(err) => tracing::warn!("connection error: {err}"),
};
actor.reconnect().await;
}
Ok::<_, BoxError>(())
actor.run().await
});
let future = async move { future.await.unwrap() };
(handle, future)
Expand All @@ -155,45 +140,53 @@ struct ClientActor<E: ClientExt> {
}

impl<E: ClientExt> ClientActor<E> {
async fn run(&mut self) -> Result<Option<CloseFrame>, BoxError> {
async fn run(&mut self) -> Result<(), BoxError> {
loop {
tokio::select! {
Some(message) = self.receiver.recv() => {
match message {
ClientMessage::Socket(message) => {
self.socket.send(message.clone().into()).await;
if let Message::Close(frame) = message {
return Ok(frame)
if let Message::Close(_frame) = message {
return Ok(())
}
}
ClientMessage::Call(message) => {
self.client.call(message).await;
}
}
}
Some(message) = self.socket.recv() => {
match message.to_owned() {
Message::Text(text) => self.client.text(text).await?,
Message::Binary(bytes) => self.client.binary(bytes).await?,
Message::Close(_frame) => self.client.closed().await?
message = self.socket.recv() => {
match message {
Some(message) => {
match message.to_owned() {
Message::Text(text) => self.client.text(text).await?,
Message::Binary(bytes) => self.client.binary(bytes).await?,
Message::Close(_frame) => {
self.reconnect().await;
}
};
}
None => {
self.reconnect().await;
}
};
if let Message::Close(frame) = message {
return Ok(frame);
}
}
else => break,
}
}

Ok(None)
Ok(())
}

async fn reconnect(&mut self) {
let reconnect_interval = self
.config
.reconnect_interval
.expect("reconnect interval should be set for reconnecting");
tracing::info!("reconnecting in {}s", reconnect_interval.as_secs());
for i in 1.. {
tokio::time::sleep(reconnect_interval).await;
tracing::info!("reconnecting attempt no: {}...", i);
let result = tokio_tungstenite::connect_async(&self.config.url).await;
match result {
Expand All @@ -210,7 +203,6 @@ impl<E: ClientExt> ClientActor<E> {
err,
reconnect_interval.as_secs()
);
tokio::time::sleep(reconnect_interval).await;
}
};
}
Expand Down

0 comments on commit ffd0a59

Please sign in to comment.