Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(socketioxide): disconnect handler #41

Merged
merged 24 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cde5a7f
feat(socket): add a on_disconnect callback
Totodore Jun 30, 2023
0bacda1
feat: move the disconnect/close mecanism in the `Socket` instance
Totodore Jun 30, 2023
f97c84f
fix: remove unnecessary deps
Totodore Jun 30, 2023
8d6098a
feat: switch from `RwLock` to `Mutex` for the `ack_message` hashmap s…
Totodore Jun 30, 2023
68cfb0b
fix: build was failing because of lock on rwlock instead of `write`
Totodore Jun 30, 2023
20f2c87
Feature: Adding a `DisconnectReason!` enum
Totodore Jul 1, 2023
5fd0d55
feature: SocketIo `DisconnectReason` for the `on_disconnect` handler
Totodore Jul 1, 2023
f4579d6
fix: add `DisconnectReason` params to tests
Totodore Jul 1, 2023
fd8cd48
Merge branch 'main' into ft-disconnect-handler
Totodore Jul 2, 2023
29e9cfe
chore(format): fmt codebase
Totodore Jul 2, 2023
18153ef
ft: adding `TransportError` for `DisconnectReasons`
Totodore Jul 2, 2023
7084ce7
Merge branch 'main' into ft-disconnect-handler
Totodore Aug 20, 2023
42cbd0d
Merge branch 'main' into ft-disconnect-handler
Totodore Sep 15, 2023
5fcd902
feat(tests): add integration tests for disconnect reasons
Totodore Sep 15, 2023
bbbfb59
feat(engineioxide/service) add `TransportType` into string impl
Totodore Sep 15, 2023
2dc58bb
feat(engineioxide/socket): Add eq impl on `DisconnectReason`
Totodore Sep 15, 2023
a148028
feat(engineioxide/socket): flatten `DisconnectReason` errors
Totodore Sep 16, 2023
9c699b0
refactor(engineioxide/socket):
Totodore Sep 16, 2023
caf2bea
feat(socketio): add conversion between `Error` and engine.io `Disconn…
Totodore Sep 17, 2023
62ea742
feat(socketio): add `on_disconnect` callback to examples
Totodore Sep 17, 2023
f6ee3c4
fix(clippy): option.map used incorrectly
Totodore Sep 17, 2023
69e182f
doc(socketio/socket): improve docs
Totodore Sep 17, 2023
6605011
fix(doc): fix doc test
Totodore Sep 17, 2023
6efd60d
tests(socketio): add integration tests for `DisconnectReason`
Totodore Sep 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
}
fn on_disconnect(&self, socket: &EIoSocket<Self>) {
debug!("eio socket disconnect {}", socket.sid);
self.ns.values().for_each(|ns| {
if let Err(e) = ns.remove_socket(socket.sid) {
error!("Adapter error when disconnecting {}: {}, in a multiple server scenario it could leads to desyncronisation issues", socket.sid, e);
}
});
let data = self
.ns
.values()
.filter_map(|ns| ns.get_socket(socket.sid).ok())
.map(|s| s.close());
if let Err(e) = data.collect::<Result<Vec<_>, _>>() {
error!("Adapter error when disconnecting {}: {}, in a multiple server scenario it could leads to desyncronisation issues", socket.sid, e);
}
}

fn on_message(&self, msg: String, socket: &EIoSocket<Self>) {
Expand Down
24 changes: 4 additions & 20 deletions socketioxide/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::{
sync::{Arc, RwLock},
};

use crate::errors::{AdapterError, SendError};
use crate::errors::AdapterError;
use crate::{
adapter::{Adapter, LocalAdapter},
errors::Error,
handshake::Handshake,
packet::{Packet, PacketData},
packet::PacketData,
socket::Socket,
SocketIoConfig,
};
Expand Down Expand Up @@ -67,15 +67,7 @@ impl<A: Adapter> Namespace<A> {
socket
}

pub fn disconnect(&self, sid: Sid) -> Result<(), SendError> {
if let Some(socket) = self.sockets.write().unwrap().remove(&sid) {
self.adapter
.del_all(sid)
.map_err(|err| AdapterError(Box::new(err)))?;
socket.send(Packet::disconnect(self.path.clone()))?;
}
Ok(())
}
/// Remove a socket from a namespace and propagate the event to the adapter
pub fn remove_socket(&self, sid: Sid) -> Result<(), AdapterError> {
self.sockets.write().unwrap().remove(&sid);
self.adapter
Expand All @@ -87,19 +79,11 @@ impl<A: Adapter> Namespace<A> {
self.sockets.read().unwrap().values().any(|s| s.sid == sid)
}

/// Called when a namespace receive a particular packet that should be transmitted to the socket
pub fn socket_recv(&self, sid: Sid, packet: PacketData) -> Result<(), Error> {
self.get_socket(sid)?.recv(packet)
}

pub fn recv(&self, sid: Sid, packet: PacketData) -> Result<(), Error> {
match packet {
PacketData::Disconnect => self
.remove_socket(sid)
.map_err(|err| AdapterError(Box::new(err)).into()),
PacketData::Connect(_) => unreachable!("connect packets should be handled before"),
PacketData::ConnectError(_) => Ok(()),
packet => self.socket_recv(sid, packet),
packet => self.get_socket(sid)?.recv(packet),
}
}
pub fn get_socket(&self, sid: Sid) -> Result<Arc<Socket<A>>, Error> {
Expand Down
71 changes: 57 additions & 14 deletions socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
fmt::Debug,
sync::{
atomic::{AtomicI64, Ordering},
Arc, RwLock,
Arc, Mutex, RwLock,
},
time::Duration,
};

use engineioxide::{sid_generator::Sid, SendPacket as EnginePacket};
use futures::Future;
use futures::{future::BoxFuture, Future};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use tokio::sync::oneshot;
use tokio::sync::{oneshot};

use crate::errors::SendError;
use crate::errors::{SendError, AdapterError};
use crate::retryer::Retryer;
use crate::{
adapter::{Adapter, Room},
Expand All @@ -28,13 +28,17 @@
SocketIoConfig,
};

pub type DisconnectCallback<A> =
Box<dyn FnOnce(Arc<Socket<A>>) -> BoxFuture<'static, ()> + Send + Sync + 'static>;

/// A Socket represents a client connected to a namespace.
/// It is used to send and receive messages from the client, join and leave rooms, etc.
pub struct Socket<A: Adapter> {
config: Arc<SocketIoConfig>,
ns: Arc<Namespace<A>>,
message_handlers: RwLock<HashMap<String, BoxedHandler<A>>>,
ack_message: RwLock<HashMap<i64, oneshot::Sender<AckResponse<Value>>>>,
disconnect_handler: Mutex<Option<DisconnectCallback<A>>>,
ack_message: Mutex<HashMap<i64, oneshot::Sender<AckResponse<Value>>>>,
ack_counter: AtomicI64,
tx: tokio::sync::mpsc::Sender<EnginePacket>,
pub handshake: Handshake,
Expand All @@ -54,7 +58,8 @@
tx,
ns,
message_handlers: RwLock::new(HashMap::new()),
ack_message: RwLock::new(HashMap::new()),
disconnect_handler: Mutex::new(None),
ack_message: Mutex::new(HashMap::new()),
ack_counter: AtomicI64::new(0),
handshake,
sid,
Expand Down Expand Up @@ -123,11 +128,35 @@
{
let handler = Box::new(move |s, v, p, ack_fn| Box::pin(callback(s, v, p, ack_fn)) as _);
self.message_handlers
.write()
.lock()
Fixed Show fixed Hide fixed
.unwrap()
.insert(event.into(), MessageHandler::boxed(handler));
}

/// ### Register a disconnect handler.
/// The callback will be called when the socket is disconnected from the server or the client or when the underlying connection crashes.
/// ##### Example
/// ```
/// # use socketioxide::Namespace;
/// # use serde_json::Value;
/// Namespace::builder().add("/", |socket| async move {
/// socket.on("test", |socket, data: Value, bin, _| async move {
/// // Close the current socket
/// socket.disconnect().ok();
/// });
/// socket.on_disconnect(|socket| async move {
/// println!("Socket {} on ns {} disconnected", socket.sid, socket.ns());
/// });
/// });
pub fn on_disconnect<C, F>(&self, callback: C)
where
C: Fn(Arc<Socket<A>>) -> F + Send + Sync + 'static,
F: Future<Output = ()> + Send + 'static,
{
let handler = Box::new(move |s| Box::pin(callback(s)) as _);
*self.disconnect_handler.lock().unwrap() = Some(handler);
}

/// Emit a message to the client
/// ##### Example
/// ```
Expand Down Expand Up @@ -341,9 +370,13 @@
Operators::new(self.ns.clone(), self.sid).broadcast()
}

/// Disconnect the socket from the current namespace.
pub fn disconnect(&self) -> Result<(), SendError> {
self.ns.disconnect(self.sid)
/// Disconnect the socket from the current namespace,
///
/// It will also call the disconnect handler if it is set.
pub fn disconnect(self: Arc<Self>) -> Result<(), SendError> {
self.send(Packet::disconnect(self.ns.path.clone()))?;
self.close()?;
Ok(())
}

/// Get the current namespace path.
Expand Down Expand Up @@ -371,22 +404,32 @@
) -> Result<AckResponse<V>, AckError> {
let (tx, rx) = oneshot::channel();
let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1;
self.ack_message.write().unwrap().insert(ack, tx);
self.ack_message.lock().unwrap().insert(ack, tx);
packet.inner.set_ack_id(ack);
self.send(packet)?;
let timeout = timeout.unwrap_or(self.config.ack_timeout);
let v = tokio::time::timeout(timeout, rx).await??;
Ok((serde_json::from_value(v.0)?, v.1))
}

// Receive data from client:
/// Called when the socket is gracefully [`disconnect`](Socket::disconnect)ed from the server or the client
///
/// It maybe also closed when the underlying transport is closed or failed.
pub(crate) fn close(self: Arc<Self>) -> Result<(), AdapterError> {
if let Some(handler) = self.disconnect_handler.lock().unwrap().take() {
tokio::spawn(handler(self.clone()));
}
self.ns.remove_socket(self.sid)
}

// Receive data from client:
pub(crate) fn recv(self: Arc<Self>, packet: PacketData) -> Result<(), Error> {
match packet {
PacketData::Event(e, data, ack) => self.recv_event(e, data, ack),
PacketData::EventAck(data, ack_id) => self.recv_ack(data, ack_id),
PacketData::BinaryEvent(e, packet, ack) => self.recv_bin_event(e, packet, ack),
PacketData::BinaryAck(packet, ack) => self.recv_bin_ack(packet, ack),
PacketData::Disconnect => self.close().map_err(Error::from),
_ => unreachable!(),
}
}
Expand All @@ -411,14 +454,14 @@
}

fn recv_ack(self: Arc<Self>, data: Value, ack: i64) -> Result<(), Error> {
if let Some(tx) = self.ack_message.write().unwrap().remove(&ack) {
if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) {
tx.send((data, vec![])).ok();
}
Ok(())
}

fn recv_bin_ack(self: Arc<Self>, packet: BinaryPacket, ack: i64) -> Result<(), Error> {
if let Some(tx) = self.ack_message.write().unwrap().remove(&ack) {
if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) {
tx.send((packet.data, packet.bin)).ok();
}
Ok(())
Expand Down
Loading