From cde5a7f05e14106f7a54fa57949159ae528bbe0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 30 Jun 2023 17:04:06 +0200 Subject: [PATCH 01/22] feat(socket): add a on_disconnect callback --- socketioxide/src/socket.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 17949e09..86ffc7c9 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -3,13 +3,13 @@ use std::{ fmt::Debug, sync::{ atomic::{AtomicI64, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, time::Duration, }; use engineioxide::{sid_generator::Sid, SendPacket as EnginePacket}; -use futures::Future; +use futures::{future::BoxFuture, Future}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tokio::sync::oneshot; @@ -28,12 +28,16 @@ use crate::{ SocketIoConfig, }; +pub type DisconnectCallback = + Box>) -> BoxFuture<'static, ()> + Send + Sync + 'static>; + /// A Socket represents a client connected to a namespace. /// It is used to send and receive messages from the client, join and leave rooms, etc. pub struct Socket { config: Arc, ns: Arc>, message_handlers: RwLock>>, + disconnect_handler: Mutex>>, ack_message: RwLock>>>, ack_counter: AtomicI64, tx: tokio::sync::mpsc::Sender, @@ -54,6 +58,7 @@ impl Socket { tx, ns, message_handlers: RwLock::new(HashMap::new()), + disconnect_handler: Mutex::new(None), ack_message: RwLock::new(HashMap::new()), ack_counter: AtomicI64::new(0), handshake, @@ -128,6 +133,30 @@ impl Socket { .insert(event.into(), MessageHandler::boxed(handler)); } + /// ### Register a disconnect handler. + /// The callback will be called when the socket is disconnected from the server or the client or when the underlying connection crashes. + /// ##### Example + /// ``` + /// # use socketioxide::Namespace; + /// # use serde_json::Value; + /// Namespace::builder().add("/", |socket| async move { + /// socket.on("test", |socket, data: Value, bin, _| async move { + /// // Close the current socket + /// socket.disconnect().ok(); + /// }); + /// socket.on_disconnect(|socket| async move { + /// println!("Socket {} on ns {} disconnected", socket.sid, socket.ns()); + /// }); + /// }); + pub fn on_disconnect(&self, callback: C) + where + C: Fn(Arc>) -> F + Send + Sync + 'static, + F: Future + Send + 'static, + { + let handler = Box::new(move |s| Box::pin(callback(s)) as _); + *self.disconnect_handler.lock().unwrap() = Some(handler); + } + /// Emit a message to the client /// ##### Example /// ``` From 0bacda1d0aa0e899af2691692f674a47f439f3da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 30 Jun 2023 17:04:41 +0200 Subject: [PATCH 02/22] feat: move the disconnect/close mecanism in the `Socket` instance --- socketioxide/src/client.rs | 14 +++++++++----- socketioxide/src/ns.rs | 24 ++++-------------------- socketioxide/src/socket.rs | 24 +++++++++++++++++++----- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index 0f6503bd..473de40d 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex}; use engineioxide::handler::EngineIoHandler; use engineioxide::socket::Socket as EIoSocket; +use itertools::Itertools; use serde_json::Value; use engineioxide::sid_generator::Sid; @@ -120,11 +121,14 @@ impl EngineIoHandler for Client { } fn on_disconnect(&self, socket: &EIoSocket) { debug!("eio socket disconnect {}", socket.sid); - self.ns.values().for_each(|ns| { - if let Err(e) = ns.remove_socket(socket.sid) { - error!("Adapter error when disconnecting {}: {}, in a multiple server scenario it could leads to desyncronisation issues", socket.sid, e); - } - }); + let data = self + .ns + .values() + .filter_map(|ns| ns.get_socket(socket.sid).ok()) + .map(|s| s.close()); + if let Err(e) = data.collect::, _>>() { + error!("Adapter error when disconnecting {}: {}, in a multiple server scenario it could leads to desyncronisation issues", socket.sid, e); + } } fn on_message(&self, msg: String, socket: &EIoSocket) { diff --git a/socketioxide/src/ns.rs b/socketioxide/src/ns.rs index 84d501a7..c060541e 100644 --- a/socketioxide/src/ns.rs +++ b/socketioxide/src/ns.rs @@ -3,12 +3,12 @@ use std::{ sync::{Arc, RwLock}, }; -use crate::errors::{AdapterError, SendError}; +use crate::errors::AdapterError; use crate::{ adapter::{Adapter, LocalAdapter}, errors::Error, handshake::Handshake, - packet::{Packet, PacketData}, + packet::PacketData, socket::Socket, SocketIoConfig, }; @@ -67,15 +67,7 @@ impl Namespace { socket } - pub fn disconnect(&self, sid: Sid) -> Result<(), SendError> { - if let Some(socket) = self.sockets.write().unwrap().remove(&sid) { - self.adapter - .del_all(sid) - .map_err(|err| AdapterError(Box::new(err)))?; - socket.send(Packet::disconnect(self.path.clone()))?; - } - Ok(()) - } + /// Remove a socket from a namespace and propagate the event to the adapter pub fn remove_socket(&self, sid: Sid) -> Result<(), AdapterError> { self.sockets.write().unwrap().remove(&sid); self.adapter @@ -87,19 +79,11 @@ impl Namespace { self.sockets.read().unwrap().values().any(|s| s.sid == sid) } - /// Called when a namespace receive a particular packet that should be transmitted to the socket - pub fn socket_recv(&self, sid: Sid, packet: PacketData) -> Result<(), Error> { - self.get_socket(sid)?.recv(packet) - } - pub fn recv(&self, sid: Sid, packet: PacketData) -> Result<(), Error> { match packet { - PacketData::Disconnect => self - .remove_socket(sid) - .map_err(|err| AdapterError(Box::new(err)).into()), PacketData::Connect(_) => unreachable!("connect packets should be handled before"), PacketData::ConnectError(_) => Ok(()), - packet => self.socket_recv(sid, packet), + packet => self.get_socket(sid)?.recv(packet), } } pub fn get_socket(&self, sid: Sid) -> Result>, Error> { diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 86ffc7c9..9476717a 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -14,7 +14,7 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tokio::sync::oneshot; -use crate::errors::SendError; +use crate::errors::{SendError, AdapterError}; use crate::retryer::Retryer; use crate::{ adapter::{Adapter, Room}, @@ -370,9 +370,13 @@ impl Socket { Operators::new(self.ns.clone(), self.sid).broadcast() } - /// Disconnect the socket from the current namespace. - pub fn disconnect(&self) -> Result<(), SendError> { - self.ns.disconnect(self.sid) + /// Disconnect the socket from the current namespace, + /// + /// It will also call the disconnect handler if it is set. + pub fn disconnect(self: Arc) -> Result<(), SendError> { + self.send(Packet::disconnect(self.ns.path.clone()))?; + self.close()?; + Ok(()) } /// Get the current namespace path. @@ -408,14 +412,24 @@ impl Socket { Ok((serde_json::from_value(v.0)?, v.1)) } - // Receive data from client: + /// Called when the socket is gracefully [`disconnect`](Socket::disconnect)ed from the server or the client + /// + /// It maybe also closed when the underlying transport is closed or failed. + pub(crate) fn close(self: Arc) -> Result<(), AdapterError> { + if let Some(handler) = self.disconnect_handler.lock().unwrap().take() { + tokio::spawn(handler(self.clone())); + } + self.ns.remove_socket(self.sid) + } + // Receive data from client: pub(crate) fn recv(self: Arc, packet: PacketData) -> Result<(), Error> { match packet { PacketData::Event(e, data, ack) => self.recv_event(e, data, ack), PacketData::EventAck(data, ack_id) => self.recv_ack(data, ack_id), PacketData::BinaryEvent(e, packet, ack) => self.recv_bin_event(e, packet, ack), PacketData::BinaryAck(packet, ack) => self.recv_bin_ack(packet, ack), + PacketData::Disconnect => self.close().map_err(Error::from), _ => unreachable!(), } } From f97c84f0e04e05f670d814cc62d582a2d3e36391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 30 Jun 2023 17:07:58 +0200 Subject: [PATCH 03/22] fix: remove unnecessary deps --- socketioxide/src/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index 473de40d..f82c7f54 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, Mutex}; use engineioxide::handler::EngineIoHandler; use engineioxide::socket::Socket as EIoSocket; -use itertools::Itertools; use serde_json::Value; use engineioxide::sid_generator::Sid; From 8d6098ad33a7c6b9f595ef3c59fb55ee5a6d6df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 30 Jun 2023 17:10:35 +0200 Subject: [PATCH 04/22] feat: switch from `RwLock` to `Mutex` for the `ack_message` hashmap since there were only `write()` calls --- socketioxide/src/socket.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 9476717a..65d62bd9 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -12,7 +12,7 @@ use engineioxide::{sid_generator::Sid, SendPacket as EnginePacket}; use futures::{future::BoxFuture, Future}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; -use tokio::sync::oneshot; +use tokio::sync::{oneshot}; use crate::errors::{SendError, AdapterError}; use crate::retryer::Retryer; @@ -38,7 +38,7 @@ pub struct Socket { ns: Arc>, message_handlers: RwLock>>, disconnect_handler: Mutex>>, - ack_message: RwLock>>>, + ack_message: Mutex>>>, ack_counter: AtomicI64, tx: tokio::sync::mpsc::Sender, pub handshake: Handshake, @@ -59,7 +59,7 @@ impl Socket { ns, message_handlers: RwLock::new(HashMap::new()), disconnect_handler: Mutex::new(None), - ack_message: RwLock::new(HashMap::new()), + ack_message: Mutex::new(HashMap::new()), ack_counter: AtomicI64::new(0), handshake, sid, @@ -128,7 +128,7 @@ impl Socket { { let handler = Box::new(move |s, v, p, ack_fn| Box::pin(callback(s, v, p, ack_fn)) as _); self.message_handlers - .write() + .lock() .unwrap() .insert(event.into(), MessageHandler::boxed(handler)); } @@ -404,7 +404,7 @@ impl Socket { ) -> Result, AckError> { let (tx, rx) = oneshot::channel(); let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1; - self.ack_message.write().unwrap().insert(ack, tx); + self.ack_message.lock().unwrap().insert(ack, tx); packet.inner.set_ack_id(ack); self.send(packet)?; let timeout = timeout.unwrap_or(self.config.ack_timeout); @@ -454,14 +454,14 @@ impl Socket { } fn recv_ack(self: Arc, data: Value, ack: i64) -> Result<(), Error> { - if let Some(tx) = self.ack_message.write().unwrap().remove(&ack) { + if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) { tx.send((data, vec![])).ok(); } Ok(()) } fn recv_bin_ack(self: Arc, packet: BinaryPacket, ack: i64) -> Result<(), Error> { - if let Some(tx) = self.ack_message.write().unwrap().remove(&ack) { + if let Some(tx) = self.ack_message.lock().unwrap().remove(&ack) { tx.send((packet.data, packet.bin)).ok(); } Ok(()) From 68cfb0b260f3360314eb6d317db2e4b715c6194d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 30 Jun 2023 17:19:55 +0200 Subject: [PATCH 05/22] fix: build was failing because of lock on rwlock instead of `write` --- socketioxide/src/socket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 65d62bd9..5fe9890a 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -128,7 +128,7 @@ impl Socket { { let handler = Box::new(move |s, v, p, ack_fn| Box::pin(callback(s, v, p, ack_fn)) as _); self.message_handlers - .lock() + .write() .unwrap() .insert(event.into(), MessageHandler::boxed(handler)); } From 20f2c87468f29edeb8fe9528458216873fb6b464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 2 Jul 2023 00:19:46 +0200 Subject: [PATCH 06/22] Feature: Adding a `DisconnectReason!` enum The `on_disconnect` fn will then be called with the reason in param. --- e2e/src/engineioxide.rs | 6 +++--- engineioxide/benches/benchmark_polling.rs | 3 ++- engineioxide/src/engine.rs | 20 ++++++++++---------- engineioxide/src/handler.rs | 4 ++-- engineioxide/src/socket.rs | 19 +++++++++++++------ examples/src/engineio-echo/axum_echo.rs | 6 +++--- examples/src/engineio-echo/hyper_echo.rs | 6 +++--- examples/src/engineio-echo/warp_echo.rs | 6 +++--- 8 files changed, 39 insertions(+), 31 deletions(-) diff --git a/e2e/src/engineioxide.rs b/e2e/src/engineioxide.rs index ec206545..93ade0e2 100644 --- a/e2e/src/engineioxide.rs +++ b/e2e/src/engineioxide.rs @@ -3,7 +3,7 @@ use std::time::Duration; use engineioxide::{ - config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, socket::Socket, + config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}, }; use hyper::Server; use tracing::{info, Level}; @@ -19,8 +19,8 @@ impl EngineIoHandler for MyHandler { fn on_connect(&self, socket: &Socket) { println!("socket connect {}", socket.sid); } - fn on_disconnect(&self, socket: &Socket) { - println!("socket disconnect {}", socket.sid); + fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason) { + println!("socket disconnect {}: {:?}", socket.sid, reason); } fn on_message(&self, msg: String, socket: &Socket) { diff --git a/engineioxide/benches/benchmark_polling.rs b/engineioxide/benches/benchmark_polling.rs index 4626d481..03b17080 100644 --- a/engineioxide/benches/benchmark_polling.rs +++ b/engineioxide/benches/benchmark_polling.rs @@ -3,6 +3,7 @@ use std::time::Duration; use bytes::{Buf, Bytes}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use engineioxide::socket::DisconnectReason; use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::Socket}; use engineioxide::sid_generator::Sid; @@ -29,7 +30,7 @@ impl EngineIoHandler for Client { fn on_connect(&self, _: &Socket) {} - fn on_disconnect(&self, _: &Socket) {} + fn on_disconnect(&self, _: &Socket, _reason: DisconnectReason) {} fn on_message(&self, msg: String, socket: &Socket) { socket.emit(msg).unwrap(); diff --git a/engineioxide/src/engine.rs b/engineioxide/src/engine.rs index 8a0ca991..32571f54 100644 --- a/engineioxide/src/engine.rs +++ b/engineioxide/src/engine.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, RwLock}, }; -use crate::sid_generator::Sid; +use crate::{sid_generator::Sid, socket::DisconnectReason}; use crate::{ body::ResponseBody, config::EngineIoConfig, @@ -64,7 +64,7 @@ impl EngineIo B: Send + 'static, { let engine = self.clone(); - let close_fn = Box::new(move |sid: Sid| engine.close_session(sid)); + let close_fn = Box::new(move |sid: Sid, reason: DisconnectReason| engine.close_session(sid, reason)); let sid = generate_sid(); let socket = Socket::new( sid, @@ -108,7 +108,7 @@ impl EngineIo let mut rx = match socket.internal_rx.try_lock() { Ok(s) => s, Err(_) => { - socket.close(); + socket.close(DisconnectReason::TransportError); return Err(Error::HttpErrorResponse(StatusCode::BAD_REQUEST)); } }; @@ -164,13 +164,13 @@ impl EngineIo match Packet::try_from(packet?) { Err(e) => { debug!("[sid={sid}] error parsing packet: {:?}", e); - self.close_session(sid); + self.close_session(sid, DisconnectReason::TransportError); Err(e) } Ok(Packet::Close) => { debug!("[sid={sid}] closing session"); socket.send(Packet::Noop)?; - self.close_session(sid); + self.close_session(sid, DisconnectReason::TransportClose); break; } Ok(Packet::Pong) => socket @@ -251,7 +251,7 @@ impl EngineIo } else { let sid = generate_sid(); let engine = self.clone(); - let close_fn = Box::new(move |sid: Sid| engine.close_session(sid)); + let close_fn = Box::new(move |sid: Sid, reason: DisconnectReason| engine.close_session(sid, reason)); let socket = Socket::new( sid, ConnectionType::WebSocket, @@ -298,7 +298,7 @@ impl EngineIo if let Err(e) = self.ws_forward_to_handler(rx, &socket).await { debug!("[sid={}] error when handling packet: {:?}", socket.sid, e); } - self.close_session(socket.sid); + self.close_session(socket.sid, DisconnectReason::TransportClose); rx_handle.abort(); Ok(()) } @@ -315,7 +315,7 @@ impl EngineIo Message::Text(msg) => match Packet::try_from(msg)? { Packet::Close => { debug!("[sid={}] closing session", socket.sid); - self.close_session(socket.sid); + self.close_session(socket.sid, DisconnectReason::TransportClose); break; } Packet::Pong => socket @@ -414,10 +414,10 @@ impl EngineIo /// Close an engine.io session by removing the socket from the socket map and closing the socket /// It should be the only way to close a session and to remove a socket from the socket map - fn close_session(&self, sid: Sid) { + fn close_session(&self, sid: Sid, reason: DisconnectReason) { let socket = self.sockets.write().unwrap().remove(&sid); if let Some(socket) = socket { - self.handler.on_disconnect(&socket); + self.handler.on_disconnect(&socket, reason); socket.abort_heartbeat(); debug!( "remaining sockets: {:?}", diff --git a/engineioxide/src/handler.rs b/engineioxide/src/handler.rs index 3e0a698e..6c1cab3d 100644 --- a/engineioxide/src/handler.rs +++ b/engineioxide/src/handler.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use crate::socket::Socket; +use crate::socket::{Socket, DisconnectReason}; /// An handler for engine.io events for each sockets. #[async_trait] @@ -12,7 +12,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + Clone + 'static { fn on_connect(&self, socket: &Socket); /// Called when a socket is disconnected. - fn on_disconnect(&self, socket: &Socket); + fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason); /// Called when a message is received from the client. fn on_message(&self, msg: String, socket: &Socket); diff --git a/engineioxide/src/socket.rs b/engineioxide/src/socket.rs index 8880485f..aba3482e 100644 --- a/engineioxide/src/socket.rs +++ b/engineioxide/src/socket.rs @@ -54,6 +54,13 @@ impl From for SocketReq { } } +#[derive(Debug, Clone, PartialEq)] +pub enum DisconnectReason { + TransportClose, + TransportError, + HeartbeatTimeout, +} + /// A [`Socket`] represents a connection to the server. /// It is agnostic to the [`TransportType`](crate::service::TransportType). /// It handles : @@ -96,7 +103,7 @@ where heartbeat_handle: Mutex>>, /// Function to call when the socket is closed - close_fn: Box, + close_fn: Box, /// User data bound to the socket pub data: H::Data, @@ -113,7 +120,7 @@ where conn: ConnectionType, config: &EngineIoConfig, req_data: SocketReq, - close_fn: Box, + close_fn: Box, ) -> Self { let (internal_tx, internal_rx) = mpsc::channel(config.max_buffer_size); let (tx, rx) = mpsc::channel(config.max_buffer_size); @@ -161,7 +168,7 @@ where let handle = tokio::spawn(async move { if let Err(e) = socket.heartbeat_job(interval, timeout).await { - socket.close(); + socket.close(DisconnectReason::HeartbeatTimeout); debug!("[sid={}] heartbeat error: {:?}", socket.sid, e); } }); @@ -232,8 +239,8 @@ where /// Immediately closes the socket and the underlying connection. /// The socket will be removed from the `Engine` and the [`Handler`](crate::handler::EngineIoHandler) will be notified. - pub fn close(&self) { - (self.close_fn)(self.sid); + pub fn close(&self, reason: DisconnectReason) { + (self.close_fn)(self.sid, reason); self.send(Packet::Close).ok(); } @@ -252,7 +259,7 @@ where #[cfg(test)] impl Socket { - pub fn new_dummy(sid: Sid, close_fn: Box) -> Socket { + pub fn new_dummy(sid: Sid, close_fn: Box) -> Socket { let (internal_tx, internal_rx) = mpsc::channel(200); let (tx, rx) = mpsc::channel(200); let (pong_tx, pong_rx) = mpsc::channel(1); diff --git a/examples/src/engineio-echo/axum_echo.rs b/examples/src/engineio-echo/axum_echo.rs index 42def30c..09c30ebe 100644 --- a/examples/src/engineio-echo/axum_echo.rs +++ b/examples/src/engineio-echo/axum_echo.rs @@ -1,6 +1,6 @@ use axum::routing::get; use axum::Server; -use engineioxide::{handler::EngineIoHandler, layer::EngineIoLayer, socket::Socket}; +use engineioxide::{handler::EngineIoHandler, layer::EngineIoLayer, socket::{Socket, DisconnectReason}}; use tracing::info; use tracing_subscriber::FmtSubscriber; @@ -14,8 +14,8 @@ impl EngineIoHandler for MyHandler { fn on_connect(&self, socket: &Socket) { println!("socket connect {}", socket.sid); } - fn on_disconnect(&self, socket: &Socket) { - println!("socket disconnect {}", socket.sid); + fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason) { + println!("socket disconnect {}: {:?}", socket.sid, reason); } fn on_message(&self, msg: String, socket: &Socket) { diff --git a/examples/src/engineio-echo/hyper_echo.rs b/examples/src/engineio-echo/hyper_echo.rs index 63485ff0..0a2cb3c9 100644 --- a/examples/src/engineio-echo/hyper_echo.rs +++ b/examples/src/engineio-echo/hyper_echo.rs @@ -1,4 +1,4 @@ -use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::Socket}; +use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}}; use hyper::Server; use tracing::info; use tracing_subscriber::FmtSubscriber; @@ -13,8 +13,8 @@ impl EngineIoHandler for MyHandler { fn on_connect(&self, socket: &Socket) { println!("socket connect {}", socket.sid); } - fn on_disconnect(&self, socket: &Socket) { - println!("socket disconnect {}", socket.sid); + fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason) { + println!("socket disconnect {}: {:?}", socket.sid, reason); } fn on_message(&self, msg: String, socket: &Socket) { diff --git a/examples/src/engineio-echo/warp_echo.rs b/examples/src/engineio-echo/warp_echo.rs index f9da5a5b..cca04dd4 100644 --- a/examples/src/engineio-echo/warp_echo.rs +++ b/examples/src/engineio-echo/warp_echo.rs @@ -1,4 +1,4 @@ -use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::Socket}; +use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}}; use hyper::Server; use tracing::info; use tracing_subscriber::FmtSubscriber; @@ -14,8 +14,8 @@ impl EngineIoHandler for MyHandler { fn on_connect(&self, socket: &Socket) { println!("socket connect {}", socket.sid); } - fn on_disconnect(&self, socket: &Socket) { - println!("socket disconnect {}", socket.sid); + fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason) { + println!("socket disconnect {}: {:?}", socket.sid, reason); } fn on_message(&self, msg: String, socket: &Socket) { From 5fd0d559aa2bf9d0f8fb2a8efc96912ede9f5271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 2 Jul 2023 00:22:51 +0200 Subject: [PATCH 07/22] feature: SocketIo `DisconnectReason` for the `on_disconnect` handler --- socketioxide/src/client.rs | 12 ++++---- socketioxide/src/lib.rs | 2 +- socketioxide/src/socket.rs | 56 +++++++++++++++++++++++++++++--------- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index f82c7f54..a9b45703 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use engineioxide::handler::EngineIoHandler; -use engineioxide::socket::Socket as EIoSocket; +use engineioxide::socket::{Socket as EIoSocket, DisconnectReason as EIoDisconnectReason}; use serde_json::Value; use engineioxide::sid_generator::Sid; @@ -118,13 +118,13 @@ impl EngineIoHandler for Client { fn on_connect(&self, socket: &EIoSocket) { debug!("eio socket connect {}", socket.sid); } - fn on_disconnect(&self, socket: &EIoSocket) { + fn on_disconnect(&self, socket: &EIoSocket, reason: EIoDisconnectReason) { debug!("eio socket disconnect {}", socket.sid); let data = self .ns .values() .filter_map(|ns| ns.get_socket(socket.sid).ok()) - .map(|s| s.close()); + .map(|s| s.close(reason.clone().into())); if let Err(e) = data.collect::, _>>() { error!("Adapter error when disconnecting {}: {}, in a multiple server scenario it could leads to desyncronisation issues", socket.sid, e); } @@ -136,7 +136,7 @@ impl EngineIoHandler for Client { Ok(packet) => packet, Err(e) => { debug!("socket serialization error: {}", e); - socket.close(); + socket.close(EIoDisconnectReason::TransportError); return; } }; @@ -161,7 +161,7 @@ impl EngineIoHandler for Client { }; if let Err(err) = res { error!("error while processing packet: {:?}", err); - socket.close(); + socket.close(EIoDisconnectReason::TransportError); } } @@ -176,7 +176,7 @@ impl EngineIoHandler for Client { "error while propagating packet to socket {}: {}", socket.sid, e ); - socket.close(); + socket.close(EIoDisconnectReason::TransportError); } } } diff --git a/socketioxide/src/lib.rs b/socketioxide/src/lib.rs index 13a4102a..4d3ca685 100644 --- a/socketioxide/src/lib.rs +++ b/socketioxide/src/lib.rs @@ -68,7 +68,7 @@ pub use errors::{AckError, Error as SocketError}; pub use layer::SocketIoLayer; pub use ns::Namespace; pub use service::SocketIoService; -pub use socket::Socket; +pub use socket::{Socket, DisconnectReason}; mod client; mod config; diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 5fe9890a..9b5a007f 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -8,13 +8,15 @@ use std::{ time::Duration, }; -use engineioxide::{sid_generator::Sid, SendPacket as EnginePacket}; +use engineioxide::{ + sid_generator::Sid, socket::DisconnectReason as EIoDisconnectReason, SendPacket as EnginePacket, +}; use futures::{future::BoxFuture, Future}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; -use tokio::sync::{oneshot}; +use tokio::sync::oneshot; -use crate::errors::{SendError, AdapterError}; +use crate::errors::{AdapterError, SendError}; use crate::retryer::Retryer; use crate::{ adapter::{Adapter, Room}, @@ -28,8 +30,34 @@ use crate::{ SocketIoConfig, }; -pub type DisconnectCallback = - Box>) -> BoxFuture<'static, ()> + Send + Sync + 'static>; +pub type DisconnectCallback = Box< + dyn FnOnce(Arc>, DisconnectReason) -> BoxFuture<'static, ()> + Send + Sync + 'static, +>; + +/// All the possible reasons for a socket to be disconnected. +#[derive(Debug, Clone)] +pub enum DisconnectReason { + /// The connection was closed (example: the user has lost connection, or the network was changed from WiFi to 4G) + TransportClose, + /// The connection has encountered an error + TransportError, + /// The client did not send a PONG packet in the [ping timeout](crate::SocketIoConfigBuilder) delay + HeartbeatTimeout, + /// The client has manually disconnected the socket using [`socket.disconnect()`](https://socket.io/fr/docs/v4/client-api/#socketdisconnect) + ClientNSDisconnect, + /// The socket was forcefully disconnected from the namespace with `Socket::disconnect` + ServerNSDisconnect, +} + +impl From for DisconnectReason { + fn from(reason: EIoDisconnectReason) -> Self { + match reason { + EIoDisconnectReason::TransportClose => DisconnectReason::TransportClose, + EIoDisconnectReason::TransportError => DisconnectReason::TransportError, + EIoDisconnectReason::HeartbeatTimeout => DisconnectReason::HeartbeatTimeout, + } + } +} /// A Socket represents a client connected to a namespace. /// It is used to send and receive messages from the client, join and leave rooms, etc. @@ -150,10 +178,10 @@ impl Socket { /// }); pub fn on_disconnect(&self, callback: C) where - C: Fn(Arc>) -> F + Send + Sync + 'static, + C: Fn(Arc>, DisconnectReason) -> F + Send + Sync + 'static, F: Future + Send + 'static, { - let handler = Box::new(move |s| Box::pin(callback(s)) as _); + let handler = Box::new(move |s, r| Box::pin(callback(s, r)) as _); *self.disconnect_handler.lock().unwrap() = Some(handler); } @@ -375,7 +403,7 @@ impl Socket { /// It will also call the disconnect handler if it is set. pub fn disconnect(self: Arc) -> Result<(), SendError> { self.send(Packet::disconnect(self.ns.path.clone()))?; - self.close()?; + self.close(DisconnectReason::ServerNSDisconnect)?; Ok(()) } @@ -412,12 +440,12 @@ impl Socket { Ok((serde_json::from_value(v.0)?, v.1)) } - /// Called when the socket is gracefully [`disconnect`](Socket::disconnect)ed from the server or the client - /// + /// Called when the socket is gracefully disconnected from the server or the client + /// /// It maybe also closed when the underlying transport is closed or failed. - pub(crate) fn close(self: Arc) -> Result<(), AdapterError> { + pub(crate) fn close(self: Arc, reason: DisconnectReason) -> Result<(), AdapterError> { if let Some(handler) = self.disconnect_handler.lock().unwrap().take() { - tokio::spawn(handler(self.clone())); + tokio::spawn(handler(self.clone(), reason)); } self.ns.remove_socket(self.sid) } @@ -429,7 +457,9 @@ impl Socket { PacketData::EventAck(data, ack_id) => self.recv_ack(data, ack_id), PacketData::BinaryEvent(e, packet, ack) => self.recv_bin_event(e, packet, ack), PacketData::BinaryAck(packet, ack) => self.recv_bin_ack(packet, ack), - PacketData::Disconnect => self.close().map_err(Error::from), + PacketData::Disconnect => self + .close(DisconnectReason::ClientNSDisconnect) + .map_err(Error::from), _ => unreachable!(), } } From f4579d611b640c8cd235e9fbbcf336a32564a316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 2 Jul 2023 00:54:37 +0200 Subject: [PATCH 08/22] fix: add `DisconnectReason` params to tests --- engineioxide/src/config.rs | 4 ++-- socketioxide/src/socket.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/engineioxide/src/config.rs b/engineioxide/src/config.rs index cae3cf72..b3c58415 100644 --- a/engineioxide/src/config.rs +++ b/engineioxide/src/config.rs @@ -81,7 +81,7 @@ impl EngineIoConfigBuilder { /// # use engineioxide::{ /// layer::EngineIoLayer, /// handler::EngineIoHandler, - /// socket::Socket, + /// socket::{Socket, DisconnectReason}, /// }; /// # use std::sync::Arc; /// #[derive(Debug, Clone)] @@ -94,7 +94,7 @@ impl EngineIoConfigBuilder { /// fn on_connect(&self, socket: &Socket) { /// println!("socket connect {}", socket.sid); /// } - /// fn on_disconnect(&self, socket: &Socket) { + /// fn on_disconnect(&self, socket: &Socket, reason: DisconnectReason) { /// println!("socket disconnect {}", socket.sid); /// } /// diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 9b5a007f..cf241dc1 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -172,8 +172,8 @@ impl Socket { /// // Close the current socket /// socket.disconnect().ok(); /// }); - /// socket.on_disconnect(|socket| async move { - /// println!("Socket {} on ns {} disconnected", socket.sid, socket.ns()); + /// socket.on_disconnect(|socket, reason| async move { + /// println!("Socket {} on ns {} disconnected, reason: {:?}", socket.sid, socket.ns(), reason); /// }); /// }); pub fn on_disconnect(&self, callback: C) From 29e9cfe771bc5a30884d754a8f9c75a03fb65447 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 2 Jul 2023 02:31:59 +0200 Subject: [PATCH 09/22] chore(format): fmt codebase --- e2e/src/engineioxide.rs | 5 ++++- engineioxide/src/engine.rs | 9 ++++++--- engineioxide/src/handler.rs | 2 +- engineioxide/src/socket.rs | 5 ++++- examples/src/engineio-echo/axum_echo.rs | 6 +++++- examples/src/engineio-echo/hyper_echo.rs | 6 +++++- examples/src/engineio-echo/warp_echo.rs | 6 +++++- socketioxide/src/client.rs | 2 +- socketioxide/src/lib.rs | 2 +- 9 files changed, 32 insertions(+), 11 deletions(-) diff --git a/e2e/src/engineioxide.rs b/e2e/src/engineioxide.rs index 93ade0e2..620c090b 100644 --- a/e2e/src/engineioxide.rs +++ b/e2e/src/engineioxide.rs @@ -3,7 +3,10 @@ use std::time::Duration; use engineioxide::{ - config::EngineIoConfig, handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}, + config::EngineIoConfig, + handler::EngineIoHandler, + service::EngineIoService, + socket::{DisconnectReason, Socket}, }; use hyper::Server; use tracing::{info, Level}; diff --git a/engineioxide/src/engine.rs b/engineioxide/src/engine.rs index c29f46cd..5500e8b0 100644 --- a/engineioxide/src/engine.rs +++ b/engineioxide/src/engine.rs @@ -5,7 +5,6 @@ use std::{ sync::{Arc, RwLock}, }; -use crate::{sid_generator::Sid, socket::DisconnectReason}; use crate::{ body::ResponseBody, config::EngineIoConfig, @@ -17,6 +16,7 @@ use crate::{ sid_generator::generate_sid, socket::{ConnectionType, Socket, SocketReq}, }; +use crate::{sid_generator::Sid, socket::DisconnectReason}; use bytes::Buf; use futures::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt}; use http::{Request, Response, StatusCode}; @@ -61,7 +61,8 @@ impl EngineIo { B: Send + 'static, { let engine = self.clone(); - let close_fn = Box::new(move |sid: Sid, reason: DisconnectReason| engine.close_session(sid, reason)); + let close_fn = + Box::new(move |sid: Sid, reason: DisconnectReason| engine.close_session(sid, reason)); let sid = generate_sid(); let socket = Socket::new( sid, @@ -248,7 +249,9 @@ impl EngineIo { } else { let sid = generate_sid(); let engine = self.clone(); - let close_fn = Box::new(move |sid: Sid, reason: DisconnectReason| engine.close_session(sid, reason)); + let close_fn = Box::new(move |sid: Sid, reason: DisconnectReason| { + engine.close_session(sid, reason) + }); let socket = Socket::new( sid, ConnectionType::WebSocket, diff --git a/engineioxide/src/handler.rs b/engineioxide/src/handler.rs index 6c1cab3d..36fb8033 100644 --- a/engineioxide/src/handler.rs +++ b/engineioxide/src/handler.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use crate::socket::{Socket, DisconnectReason}; +use crate::socket::{DisconnectReason, Socket}; /// An handler for engine.io events for each sockets. #[async_trait] diff --git a/engineioxide/src/socket.rs b/engineioxide/src/socket.rs index aba3482e..49a874ef 100644 --- a/engineioxide/src/socket.rs +++ b/engineioxide/src/socket.rs @@ -259,7 +259,10 @@ where #[cfg(test)] impl Socket { - pub fn new_dummy(sid: Sid, close_fn: Box) -> Socket { + pub fn new_dummy( + sid: Sid, + close_fn: Box, + ) -> Socket { let (internal_tx, internal_rx) = mpsc::channel(200); let (tx, rx) = mpsc::channel(200); let (pong_tx, pong_rx) = mpsc::channel(1); diff --git a/examples/src/engineio-echo/axum_echo.rs b/examples/src/engineio-echo/axum_echo.rs index 09c30ebe..e4c90735 100644 --- a/examples/src/engineio-echo/axum_echo.rs +++ b/examples/src/engineio-echo/axum_echo.rs @@ -1,6 +1,10 @@ use axum::routing::get; use axum::Server; -use engineioxide::{handler::EngineIoHandler, layer::EngineIoLayer, socket::{Socket, DisconnectReason}}; +use engineioxide::{ + handler::EngineIoHandler, + layer::EngineIoLayer, + socket::{DisconnectReason, Socket}, +}; use tracing::info; use tracing_subscriber::FmtSubscriber; diff --git a/examples/src/engineio-echo/hyper_echo.rs b/examples/src/engineio-echo/hyper_echo.rs index 0a2cb3c9..bd79f271 100644 --- a/examples/src/engineio-echo/hyper_echo.rs +++ b/examples/src/engineio-echo/hyper_echo.rs @@ -1,4 +1,8 @@ -use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}}; +use engineioxide::{ + handler::EngineIoHandler, + service::EngineIoService, + socket::{DisconnectReason, Socket}, +}; use hyper::Server; use tracing::info; use tracing_subscriber::FmtSubscriber; diff --git a/examples/src/engineio-echo/warp_echo.rs b/examples/src/engineio-echo/warp_echo.rs index cca04dd4..a0e4f689 100644 --- a/examples/src/engineio-echo/warp_echo.rs +++ b/examples/src/engineio-echo/warp_echo.rs @@ -1,4 +1,8 @@ -use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::{Socket, DisconnectReason}}; +use engineioxide::{ + handler::EngineIoHandler, + service::EngineIoService, + socket::{DisconnectReason, Socket}, +}; use hyper::Server; use tracing::info; use tracing_subscriber::FmtSubscriber; diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index a9b45703..d2c3eae8 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use engineioxide::handler::EngineIoHandler; -use engineioxide::socket::{Socket as EIoSocket, DisconnectReason as EIoDisconnectReason}; +use engineioxide::socket::{DisconnectReason as EIoDisconnectReason, Socket as EIoSocket}; use serde_json::Value; use engineioxide::sid_generator::Sid; diff --git a/socketioxide/src/lib.rs b/socketioxide/src/lib.rs index 4d3ca685..12a9f1c3 100644 --- a/socketioxide/src/lib.rs +++ b/socketioxide/src/lib.rs @@ -68,7 +68,7 @@ pub use errors::{AckError, Error as SocketError}; pub use layer::SocketIoLayer; pub use ns::Namespace; pub use service::SocketIoService; -pub use socket::{Socket, DisconnectReason}; +pub use socket::{DisconnectReason, Socket}; mod client; mod config; From 18153efa302f872d32fd8568d7434f59ee5c8cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 2 Jul 2023 02:48:28 +0200 Subject: [PATCH 10/22] ft: adding `TransportError` for `DisconnectReasons` --- engineioxide/src/engine.rs | 20 ++++++++++++++++---- engineioxide/src/errors.rs | 10 ++++++++++ engineioxide/src/socket.rs | 6 +++--- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/engineioxide/src/engine.rs b/engineioxide/src/engine.rs index 5500e8b0..7cf1b286 100644 --- a/engineioxide/src/engine.rs +++ b/engineioxide/src/engine.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ body::ResponseBody, config::EngineIoConfig, - errors::Error, + errors::{Error, TransportError}, futures::{http_response, ws_response}, handler::EngineIoHandler, packet::{OpenPacket, Packet}, @@ -106,7 +106,9 @@ impl EngineIo { let mut rx = match socket.internal_rx.try_lock() { Ok(s) => s, Err(_) => { - socket.close(DisconnectReason::TransportError); + socket.close(DisconnectReason::TransportError( + TransportError::MultipleHttpPolling, + )); return Err(Error::HttpErrorResponse(StatusCode::BAD_REQUEST)); } }; @@ -162,7 +164,10 @@ impl EngineIo { match Packet::try_from(packet?) { Err(e) => { debug!("[sid={sid}] error parsing packet: {:?}", e); - self.close_session(sid, DisconnectReason::TransportError); + self.close_session( + sid, + DisconnectReason::TransportError(TransportError::PacketParsing), + ); Err(e) } Ok(Packet::Close) => { @@ -297,8 +302,14 @@ impl EngineIo { self.handler.on_connect(&socket); if let Err(e) = self.ws_forward_to_handler(rx, &socket).await { debug!("[sid={}] error when handling packet: {:?}", socket.sid, e); + //TODO: error here is not always a PacketParsing error + self.close_session( + socket.sid, + DisconnectReason::TransportError(TransportError::PacketParsing), + ); + } else { + self.close_session(socket.sid, DisconnectReason::TransportClose); } - self.close_session(socket.sid, DisconnectReason::TransportClose); rx_handle.abort(); Ok(()) } @@ -309,6 +320,7 @@ impl EngineIo { mut rx: SplitStream>, socket: &Arc>, ) -> Result<(), Error> { + //TODO: handle the rx error here while let Ok(msg) = rx.try_next().await { let Some(msg) = msg else { continue }; match msg { diff --git a/engineioxide/src/errors.rs b/engineioxide/src/errors.rs index 5e307019..c7f872f3 100644 --- a/engineioxide/src/errors.rs +++ b/engineioxide/src/errors.rs @@ -96,3 +96,13 @@ impl From for Response> { } } } + +/// An error used to specify [`crate::socket::DisconnectReason::TransportError`] +#[derive(thiserror::Error, Clone, Debug)] +pub enum TransportError { + #[error("simultaneaous http polling request")] + MultipleHttpPolling, + + #[error("parse error")] + PacketParsing, +} diff --git a/engineioxide/src/socket.rs b/engineioxide/src/socket.rs index 49a874ef..5f57f2cb 100644 --- a/engineioxide/src/socket.rs +++ b/engineioxide/src/socket.rs @@ -13,11 +13,11 @@ use tokio::{ }; use tracing::debug; -use crate::sid_generator::Sid; use crate::{ config::EngineIoConfig, errors::Error, handler::EngineIoHandler, packet::Packet, utils::forward_map_chan, SendPacket, }; +use crate::{errors::TransportError, sid_generator::Sid}; #[derive(Debug, Clone, PartialEq)] pub(crate) enum ConnectionType { @@ -54,10 +54,10 @@ impl From for SocketReq { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum DisconnectReason { TransportClose, - TransportError, + TransportError(TransportError), HeartbeatTimeout, } From 7084ce7e07782bfd8aad3026247356f8fa4609ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 20 Aug 2023 12:18:19 +0200 Subject: [PATCH 11/22] Merge branch 'main' into ft-disconnect-handler --- .github/FUNDING.yml | 13 + .github/workflows/engineio-ci.yml | 101 ++++++ .github/workflows/socketio-ci.yml | 2 +- CONTRIBUTING.md | 268 ++++++++++++++ Cargo.lock | 28 +- Readme.md => README.md | 5 +- e2e/Cargo.toml | 11 +- engineioxide/Cargo.toml | 28 +- engineioxide/Readme.md | 20 + engineioxide/src/config.rs | 56 +++ engineioxide/src/engine.rs | 180 ++++++--- engineioxide/src/errors.rs | 11 +- engineioxide/src/futures.rs | 17 +- engineioxide/src/lib.rs | 4 + engineioxide/src/packet.rs | 64 ++-- engineioxide/src/payload/buf.rs | 151 ++++++++ engineioxide/src/payload/decoder.rs | 542 ++++++++++++++++++++++++++++ engineioxide/src/payload/encoder.rs | 214 +++++++++++ engineioxide/src/payload/mod.rs | 88 +++++ engineioxide/src/service.rs | 195 ++++++++-- engineioxide/src/socket.rs | 111 +++++- examples/Cargo.toml | 2 +- socketioxide/Cargo.toml | 4 +- socketioxide/src/client.rs | 35 +- socketioxide/src/config.rs | 36 +- socketioxide/src/errors.rs | 51 ++- socketioxide/src/handler.rs | 22 +- socketioxide/src/lib.rs | 9 +- socketioxide/src/operators.rs | 8 +- socketioxide/src/packet.rs | 2 +- socketioxide/src/retryer.rs | 124 ------- socketioxide/src/service.rs | 2 +- socketioxide/src/socket.rs | 241 +++++++++---- 33 files changed, 2245 insertions(+), 400 deletions(-) create mode 100644 .github/FUNDING.yml create mode 100644 .github/workflows/engineio-ci.yml create mode 100644 CONTRIBUTING.md rename Readme.md => README.md (97%) create mode 100644 engineioxide/src/payload/buf.rs create mode 100644 engineioxide/src/payload/decoder.rs create mode 100644 engineioxide/src/payload/encoder.rs create mode 100644 engineioxide/src/payload/mod.rs delete mode 100644 socketioxide/src/retryer.rs diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 00000000..5dd73b20 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,13 @@ +# These are supported funding model platforms + +github: totodore # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +patreon: # Replace with a single Patreon username +open_collective: # Replace with a single Open Collective username +ko_fi: # Replace with a single Ko-fi username +tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel +community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry +liberapay: # Replace with a single Liberapay username +issuehunt: # Replace with a single IssueHunt username +otechie: # Replace with a single Otechie username +lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry +custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2'] diff --git a/.github/workflows/engineio-ci.yml b/.github/workflows/engineio-ci.yml new file mode 100644 index 00000000..4b312168 --- /dev/null +++ b/.github/workflows/engineio-ci.yml @@ -0,0 +1,101 @@ +name: EngineIO CI + +on: + push: + tags: + - v* + pull_request: + branches: + - main + +jobs: + e2e_v3: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: actions/checkout@v3 + with: + repository: socketio/engine.io-protocol + path: engine.io-protocol + ref: v3 + - uses: actions/setup-node@v3 + with: + node-version: 16 + - name: Install deps & run tests + run: | + cd engine.io-protocol/test-suite && npm install && cd ../.. + cargo build --bin engineioxide-e2e --release --features engineio-v3 --no-default-features + cargo run --bin engineioxide-e2e --release --features engineio-v3 --no-default-features > server.txt & npm --prefix engine.io-protocol/test-suite test > client.txt + - name: Server output + if: always() + run: cat server.txt + - name: Client output + if: always() + run: cat client.txt + e2e_v4: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: actions/checkout@v3 + with: + repository: socketio/engine.io-protocol + path: engine.io-protocol + ref: main + - uses: actions/setup-node@v3 + with: + node-version: 16 + - name: Install deps & run tests + run: | + cd engine.io-protocol/test-suite && npm install && cd ../.. + cargo build --bin engineioxide-e2e --release --features engineio-v4 --no-default-features + cargo run --bin engineioxide-e2e --release --features engineio-v4 --no-default-features > server.txt & npm --prefix engine.io-protocol/test-suite test > client.txt + - name: Server output + if: always() + run: cat server.txt + - name: Client output + if: always() + run: cat client.txt + e2e_full: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: actions/checkout@v3 + with: + repository: socketio/engine.io-protocol + path: engine.io-protocol-v3 + ref: v3 + - uses: actions/checkout@v3 + with: + repository: socketio/engine.io-protocol + path: engine.io-protocol-v4 + ref: main + - uses: actions/setup-node@v3 + with: + node-version: 16 + - name: Install deps & run tests + run: | + cd engine.io-protocol-v3/test-suite && npm install && cd ../.. + cd engine.io-protocol-v4/test-suite && npm install && cd ../.. + cargo build --bin engineioxide-e2e --release --features engineio-v3,engineio-v4 --no-default-features + cargo run --bin engineioxide-e2e --release --features engineio-v3,engineio-v4 --no-default-features > server.txt & npm --prefix engine.io-protocol-v4/test-suite test > client_v4.txt & npm --prefix engine.io-protocol-v3/test-suite test > client_v3.txt + - name: Server output + if: always() + run: cat server.txt + - name: Client v3 output + if: always() + run: cat client_v3.txt + - name: Client v4 output + if: always() + run: cat client_v4.txt \ No newline at end of file diff --git a/.github/workflows/socketio-ci.yml b/.github/workflows/socketio-ci.yml index 2900da8a..7ef0dccf 100644 --- a/.github/workflows/socketio-ci.yml +++ b/.github/workflows/socketio-ci.yml @@ -27,7 +27,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: stable - - run: cargo test + - run: cargo test --all-features e2e: runs-on: ubuntu-latest diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..e0e6af92 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,268 @@ +# Contributing to socketioxide + +We would love for you to contribute to socketioxide and help make it even better than it is +today! As a contributor, here are the guidelines we would like you to follow: + + + +- [Question or Problem?](#question) +- [Issues and Bugs](#issue) +- [Feature Requests](#feature) +- [Submission Guidelines](#submit) +- [Development Setup](#development) +- [Coding Rules](#rules) +- [Commit Message Guidelines](#commit) + + +## Got a Question or Problem? + +For now, you can open an issue with the `question` label. We will try to answer it as soon as possible. +We may later create a separate forum for this purpose and convert all questions to it. + +## Found a Bug? + +If you find a bug in the source code, you can help us by +[submitting an issue](#submit-issue) to our [GitHub Repository][github]. Even better, you can +[submit a Pull Request](#submit-pr) with a fix. + +## Missing a Feature? + +You can _request_ a new feature by [submitting an issue](#submit-issue) to our GitHub +Repository. If you would like to _implement_ a new feature, please submit an issue with +a proposal for your work first, to be sure that we can use it. +Please consider what kind of change it is: + +- For a **Major Feature**, first open an issue and outline your proposal so that it can be + discussed. This will also allow us to better coordinate our efforts, prevent duplication of work, + and help you to craft the change so that it is successfully accepted into the project. For your issue name, please prefix your proposal with `[discussion]`, for example "[discussion]: your feature idea". +- **Small Features** can be crafted and directly [submitted as a Pull Request](#submit-pr). + +## Submission Guidelines + +### Submitting an Issue + +Before you submit an issue, please search the issue tracker, maybe an issue for your problem already exists and the discussion might inform you of workarounds readily available. + +We want to fix all the issues as soon as possible, but before fixing a bug we need to reproduce and confirm it. In order to reproduce bugs we will systematically ask you to provide a minimal reproduction scenario using a repository or [Gist](https://gist.github.com/). Having a live, reproducible scenario gives us wealth of important information without going back & forth to you with additional questions like: + +- version of socketioxide used +- version of socket.io protocol used +- and most importantly - a use-case that fails + +Unfortunately, we are not able to investigate / fix bugs without a minimal reproduction, so if we don't hear back from you we are going to close an issue that doesn't have enough info to be reproduced. + +You can file new issues by filling out our [new issue form][new_issue]. + +### Submitting a Pull Request (PR) + +Before you submit your Pull Request (PR) consider the following guidelines: + +1. Search [GitHub Pull Requests][gh_prs] for an open or closed PR + that relates to your submission. You don't want to duplicate effort. +1. Fork this repository. +1. Make your changes in a new git branch: + + ```shell + git checkout -b my-fix-branch master + ``` + +1. Create your patch, **including appropriate test cases**. +1. Follow our [Coding Rules](#rules). +1. Run the full socketioxide test suite (see [common scripts](#common-scripts)), + and ensure that all tests pass. +1. Commit your changes using a descriptive commit message that follows our + [commit message conventions](#commit). Adherence to these conventions + is necessary because release notes are automatically generated from these messages. + + ```shell + git commit -a + ``` + + Note: the optional commit `-a` command line option will automatically "add" and "rm" edited files. + +1. Push your branch to GitHub: + + ```shell + git push origin my-fix-branch + ``` + +1. In GitHub, send a pull request to `socketioxide:main`. + +- If we suggest changes then: + + - Make the required updates. + - Re-run the socketioxide test suites to ensure tests are still passing. + - Rebase your branch and force push to your GitHub repository (this will update your Pull Request): + + ```shell + git rebase master -i + git push -f + ``` + +That's it! Thank you for your contribution! + +#### After your pull request is merged + +After your pull request is merged, you can safely delete your branch and pull the changes +from the main (upstream) repository: + +- Delete the remote branch on GitHub either through the GitHub web UI or your local shell as follows: + + ```shell + git push origin --delete my-fix-branch + ``` + +- Check out the master branch: + + ```shell + git checkout master -f + ``` + +- Delete the local branch: + + ```shell + git branch -D my-fix-branch + ``` + +- Update your master with the latest upstream version: + + ```shell + git pull --ff upstream master + ``` + +## Development Setup + +You will need [rustc and cargo](www.rust-lang.org/tools/install). + +1. Clone the socketioxide repository: + + ```shell + git clone https://github.com/totodore/socketioxide + ``` + +2. Depending on what you want to change, clone the [socketio/engine.io-protocol](https://github.com/socketio/engine.io-protocol) repo or the [socketio/socket.io-protocol](https://github.com/socketio/socket.io-protocol) repo or both + ```shell + git clone https://github.com/socketio/engine.io-protocol + ``` + +3. Install dependencies for the two test-suite repos with npm install (or yarn install) + ```shell + cd engine.io-protocol/test-suite + npm install + cd ../../socket.io-protocol/test-suite + npm install + ``` +4. Run the e2e test server you want (in socketioxide/e2e) + ```shell + cargo run --bin engineioxide-e2e --features v3,v4 + ``` + ```shell + cargo run --bin socketioxide-e2e --features v3,v4 + ``` +5. Run the corresponding test suite (in socketio/engine.io-protocol/test-suites or socketio/socket.io-protocol/test-suites) + ```shell + cd engine.io-protocol/test-suite && npm test + ``` + ```shell + cd socket.io-protocol/test-suite && npm test + ``` + + +## Coding Rules + +To ensure consistency throughout the source code, keep these rules in mind as you are working: + +- All features or bug fixes **must be tested** by one or more unit-test. +- The code must adhere to the rustfmt style guide. Run `cargo fmt` to check your code (all PRs with bad-formated code won't pass CI). + +## Commit Message Guidelines + +We have very precise rules over how our git commit messages can be formatted. This leads to **more +readable messages** that are easy to follow when looking through the **project history**. But also, +we use the git commit messages to **generate the socketioxide change log**. + +### Commit Message Format + +Each commit message consists of a **header**, a **body** and a **footer**. The header has a special +format that includes a **type**, a **scope** and a **subject**: + +``` +(): + + + +