Skip to content

Commit

Permalink
bulk: fix a few bugs
Browse files Browse the repository at this point in the history
feat(debug_buffers): Debugs ingoing and outgoing traffic with a feature
  • Loading branch information
john-bv committed Feb 6, 2024
1 parent 5d5eb96 commit 5ac30f5
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 30 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ repository = "https://github.com/NetrexMC/RakNet"
rustdoc-args = ["--html-in-header", "./resources/header.html"]

[features]
default = [ "async_std" ]
default = [ "async_tokio" ]
# default = ["async_tokio" ]
mcpe = []
debug = []
debug_all = []
debug_buffers = []
async_std = [ "async-std" ]
async_tokio = [ "tokio" ]

Expand Down
2 changes: 1 addition & 1 deletion examples/async-std/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ edition = "2021"

[dependencies]
async-std = { version = "1.12.0", features = [ "unstable", "attributes" ] }
binary-util = "0.3.0"
binary-util = "0.3.4"
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "async-std" ]}
11 changes: 11 additions & 0 deletions examples/tokio/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "client"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rak-rs = { path = "../../../", features = [ "async_tokio", "debug_buffers" ]}
binary-util = "0.3.4"
tokio = { version= "1.36.0", features = [ "full" ] }
28 changes: 28 additions & 0 deletions examples/tokio/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use rak_rs::{
client::{Client, DEFAULT_MTU},
protocol::reliability::Reliability,
};

#[tokio::main]
async fn main() {
let mut client = Client::new(10, DEFAULT_MTU);

if let Err(e) = client.connect("zeqa.net:19132").await {
println!("Failed to connect: {}", e);
return;
}

client
.send_ord(&[254, 6, 193, 1, 0, 0, 2, 118], 0)
.await
.unwrap();

loop {
let packet = client.recv().await.unwrap();
if packet[0] == 0xfe {
println!("Received (Game): {:?}", packet);
} else {
println!("Received: {:?}", packet);
}
}
}
3 changes: 3 additions & 0 deletions src/client/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ macro_rules! match_ids {
Ok(l) => len = l
};

crate::rakrs_debug_buffers!(true, "[annon]: {:?}", &recv_buf[..len]);

// rakrs_debug!(true, "[CLIENT] Received packet from server: {:x?}", &recv_buf[..len]);

if ids.contains(&recv_buf[0]) {
Expand Down Expand Up @@ -112,6 +114,7 @@ macro_rules! expect_reply {
};

// rakrs_debug!(true, "[CLIENT] Received packet from server: {:x?}", &recv_buf[..len]);
crate::rakrs_debug_buffers!(true, "[annon]: {:?}", &recv_buf[..len]);

let mut reader = ByteReader::from(&recv_buf[1..len]);
if let Ok(packet) = <$reply>::read(&mut reader) {
Expand Down
80 changes: 57 additions & 23 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ pub struct Client {
/// The receive queue is used internally to receive packets from the server.
/// This is read from before sending
recv_queue: Arc<Mutex<RecvQueue>>,
/// The network receive channel is used to receive raw packets from the server.
network_recv: Option<Arc<Mutex<Receiver<Vec<u8>>>>>,
/// The internal channel that is used to dispatch packets to a higher level.
internal_recv: Receiver<Vec<u8>>,
internal_send: Sender<Vec<u8>>,
Expand Down Expand Up @@ -207,7 +205,6 @@ impl Client {
state: Arc::new(Mutex::new(ConnectionState::Offline)),
send_queue: None,
recv_queue: Arc::new(Mutex::new(RecvQueue::new())),
network_recv: None,
mtu,
version,
tasks: Arc::new(Mutex::new(Vec::new())),
Expand Down Expand Up @@ -289,8 +286,6 @@ impl Client {
self.send_queue = Some(send_queue.clone());
let (net_send, net_recv) = bounded::<Vec<u8>>(10);

self.network_recv = Some(Arc::new(Mutex::new(net_recv)));

let closer = self.close_notifier.clone();

Self::ping(socket.clone()).await?;
Expand Down Expand Up @@ -369,7 +364,7 @@ impl Client {
}
});

let recv_task = self.init_recv_task();
let recv_task = self.init_recv_task(net_recv);
let tisk_task = self.init_connect_tick(send_queue.clone());

if let Err(e) = recv_task {
Expand Down Expand Up @@ -482,6 +477,25 @@ impl Client {
}
}

/// Sends a packet immediately, bypassing the send queue.
/// This is useful for things like player login and responses, however is discouraged
/// for general use, due to congestion control.
///
/// # Example
/// ```rust ignore
/// use rak_rs::client::Client;
///
/// #[async_std::main]
/// async fn main() {
/// let mut client = Client::new(10, 1400);
/// if let Err(e) = client.connect("my_server.net:19132").await {
/// println!("Failed to connect! {}", e);
/// return;
/// }
/// // Sent immediately.
/// client.send_immediate(&[0, 1, 2, 3], Reliability::Reliable, 0).await.unwrap();
/// }
/// ```
pub async fn send_immediate(
&self,
buffer: &[u8],
Expand All @@ -503,6 +517,9 @@ impl Client {
}
}

/// Attempts to flush the acknowledgement queue, this can be useful if you notice that
/// the client stops responding to packets, or you are handling large chunks of data
/// that is time sensitive.
pub async fn flush_ack(&self) {
let mut send_q = self.send_queue.as_ref().unwrap().write().await;
let mut recv_q = self.recv_queue.lock().await;
Expand Down Expand Up @@ -541,6 +558,25 @@ impl Client {
}
}

/// Pings a server and returns the latency via [`UnconnectedPong`].
///
/// [`UnconnectedPong`]: crate::protocol::packet::offline::UnconnectedPong
///
/// # Example
/// ```rust ignore
/// use rak_rs::client::Client;
/// use std::sync::Arc;
/// use async_std::net::UdpSocket;
///
/// #[async_std::main]
/// async fn main() {
/// let mut socket = UdpSocket::bind("my_cool_server.net:19193").unwrap();
/// let socket_arc = Arc::new(socket);
/// if let Ok(pong) = Client::ping(socket).await {
/// println!("Latency: {}ms", pong.pong_time - pong.ping_time);
/// }
/// }
/// ```
pub async fn ping(socket: Arc<UdpSocket>) -> Result<UnconnectedPong, ClientError> {
let mut buf: [u8; 2048] = [0; 2048];
let unconnected_ping = UnconnectedPing {
Expand Down Expand Up @@ -593,15 +629,16 @@ impl Client {
}
}

fn init_recv_task(&self) -> Result<JoinHandle<()>, ClientError> {
let net_recv = match self.network_recv {
Some(ref n) => n.clone(),
None => {
rakrs_debug!("[CLIENT] (recv_task) Network recv channel is not initialized");
return Err(ClientError::Killed);
}
};

/// Internal, this is the task that is responsible for receiving packets and dispatching
/// them to the user.
///
/// This is responsible for the user api on [`Client::recv()`].
///
/// [`Client::recv()`]: crate::client::Client::recv
fn init_recv_task(
&self,
mut net_recv: Receiver<Vec<u8>>,
) -> Result<JoinHandle<()>, ClientError> {
let send_queue = match self.send_queue {
Some(ref s) => s.clone(),
None => {
Expand All @@ -618,10 +655,7 @@ impl Client {

return Ok(task::spawn(async move {
'task_loop: loop {
#[cfg(feature = "async_std")]
let net_dispatch = net_recv.lock().await;
#[cfg(feature = "async_tokio")]
let mut net_dispatch = net_recv.lock().await;
let closed_dispatch = closed.clone();

macro_rules! recv_body {
Expand All @@ -640,8 +674,6 @@ impl Client {

recv_time.store(current_epoch(), std::sync::atomic::Ordering::Relaxed);

rakrs_debug!(true, "[CLIENT] (recv_task) Received packet!");

let mut client_state = state.lock().await;

if *client_state == ConnectionState::TimingOut {
Expand Down Expand Up @@ -795,7 +827,7 @@ impl Client {
break;
}
}
pk_recv = net_dispatch.recv().fuse() => {
pk_recv = net_recv.recv().fuse() => {
recv_body!(pk_recv);
}
}
Expand All @@ -808,7 +840,7 @@ impl Client {
break;
}
}
pk_recv = net_dispatch.recv() => {
pk_recv = net_recv.recv() => {
recv_body!(pk_recv);
}
}
Expand All @@ -817,7 +849,9 @@ impl Client {
}

/// This is an internal function that initializes the client connection.
/// This is called by `Client::connect()`.
/// This is called by [`Client::connect()`].
///
/// [`Client::connect()`]: crate::client::Client::connect
fn init_connect_tick(
&self,
send_queue: Arc<RwLock<SendQueue>>,
Expand Down
3 changes: 2 additions & 1 deletion src/client/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::protocol::packet::RakPacket;
use crate::rakrs_debug;
use crate::{rakrs_debug, rakrs_debug_buffers};
#[cfg(feature = "async_std")]
use async_std::net::UdpSocket;
use binary_util::interfaces::Writer;
Expand All @@ -16,6 +16,7 @@ pub async fn send_packet(socket: &Arc<UdpSocket>, packet: RakPacket) -> bool {
rakrs_debug!("[CLIENT] Failed sending payload to server! {}", e);
return false;
} else {
rakrs_debug_buffers!(false, "[annon]\n{:?}", buf.as_slice());
return true;
}
} else {
Expand Down
9 changes: 8 additions & 1 deletion src/connection/queue/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
use crate::protocol::frame::{Frame, FramePacket};
use crate::protocol::reliability::Reliability;
use crate::protocol::MAX_FRAGS;
use crate::rakrs_debug;
use crate::server::current_epoch;
use crate::{rakrs_debug, rakrs_debug_buffers};

use super::{FragmentQueue, OrderedQueue};

Expand Down Expand Up @@ -101,6 +101,13 @@ impl RecvQueue {
return;
}

rakrs_debug_buffers!(
true,
"RecvQueue: {}\n{:?}\n",
frame.body.len(),
frame.body.clone()
);

match frame.reliability {
Reliability::Unreliable => {
self.ready.push(frame.body.clone());
Expand Down
4 changes: 3 additions & 1 deletion src/connection/queue/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::protocol::frame::{Frame, FramePacket};
use crate::protocol::packet::RakPacket;
use crate::protocol::reliability::Reliability;
use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
use crate::rakrs_debug;
use crate::util::{to_address_token, SafeGenerator};
use crate::{rakrs_debug, rakrs_debug_buffers};

use super::{FragmentQueue, FragmentQueueError, NetQueue, RecoveryQueue};

Expand Down Expand Up @@ -250,6 +250,8 @@ impl SendQueue {
}

pub(crate) async fn send_stream(&mut self, packet: &[u8]) {
rakrs_debug_buffers!(false, "SendQ: {}\n{:?}\n", packet.len(), packet);

if let Err(e) = self.socket.send_to(packet, &self.address).await {
// we couldn't sent the packet!
rakrs_debug!(
Expand Down
17 changes: 15 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
pub mod event;

use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::{net::SocketAddr, sync::Arc};

#[cfg(feature = "async_std")]
Expand Down Expand Up @@ -72,8 +73,20 @@ impl PossiblySocketAddr<'_> {
Some(addr.parse::<SocketAddr>().unwrap())
}
PossiblySocketAddr::String(addr) => {
// same as above, except less elegant >_<
Some(addr.clone().as_str().parse::<SocketAddr>().unwrap())
if let Ok(addr) = addr.parse::<SocketAddr>() {
Some(addr.clone())
} else {
// try to parse it as a socket addr then a string
if let Ok(mut addr) = addr.to_socket_addrs() {
if let Some(v) = addr.next() {
Some(v)
} else {
None
}
} else {
None
}
}
}
_ => None,
}
Expand Down
10 changes: 10 additions & 0 deletions src/util/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ macro_rules! rakrs_debug {
}
};
}

#[macro_export]
macro_rules! rakrs_debug_buffers {
($server: literal, $($t: tt)*) => {
if cfg!(feature="debug_buffers") {
let x = if $server == true { "S -> C" } else { "C -> S" };
println!("[rakrs] DBG [{}]: {}", x, format!($($t)*));
}
};
}

0 comments on commit 5ac30f5

Please sign in to comment.