Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow selecting player and map in LAN games. #766

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,50 @@ pub static NETWORK_ENDPOINT: Lazy<quinn::Endpoint> = Lazy::new(|| {
endpoint
});

/// Trait implemented by sockets that may be used for a [`GgrsSessionRunner`].
pub trait GgrsSocket: ggrs::NonBlockingSocket<usize> {}
/// Resource containing the network socket while there is a connection to a LAN or online game.
#[derive(Resource, Deref, DerefMut)]
pub struct NetworkMatchSocket(pub Box<dyn NetworkSocket>);

/// A boxed [`ggrs::NonBlockingSocket`] implementation.
#[derive(Deref, DerefMut)]
pub struct BoxedNonBlockingSocket(Box<dyn ggrs::NonBlockingSocket<usize> + 'static>);

impl ggrs::NonBlockingSocket<usize> for BoxedNonBlockingSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
self.0.send_to(msg, addr)
}

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
self.0.receive_all_messages()
}
}

/// Trait implemented by network match sockets.
pub trait NetworkSocket: Sync + Send {
/// Get a GGRS socket from this network socket.
fn ggrs_socket(&self) -> BoxedNonBlockingSocket;
/// Send a reliable message to the given [`SocketTarget`].
fn send_reliable(&self, target: SocketTarget, message: &[u8]);
/// Receive reliable messages from other players. The `usize` is the index of the player that
/// sent the message.
fn recv_reliable(&self) -> Vec<(usize, Vec<u8>)>;
/// Close the connection.
fn close(&self);
/// Get the player index of the local player.
fn player_idx(&self) -> usize;
/// Return, for every player index, whether the player is a local player.
fn player_is_local(&self) -> [bool; MAX_PLAYERS];
/// Get the player count for this network match.
fn player_count(&self) -> usize;
}

/// The target for a reliable network message.
pub enum SocketTarget {
/// Send to a specific player.
Player(usize),
/// Broadcast to all players.
All,
}

/// [`SessionRunner`] implementation that uses [`ggrs`] for network play.
pub struct GgrsSessionRunner {
Expand All @@ -79,9 +121,8 @@ pub struct GgrsSessionRunner {
}

/// The info required to create a [`GgrsSessionRunner`].
#[derive(Debug)]
pub struct GgrsSessionRunnerInfo {
pub socket: LanSocket,
pub socket: BoxedNonBlockingSocket,
pub player_is_local: [bool; MAX_PLAYERS],
pub player_count: usize,
}
Expand Down
146 changes: 136 additions & 10 deletions src/networking/lan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async fn lan_matchmaker(
// Send the connections to the game so that it can start the network match.
matchmaker_channel
.try_send(LanMatchmakerResponse::GameStarting {
lan_socket: LanSocket::new(connections),
lan_socket: LanSocket::new(0, connections),
player_idx: 0,
player_count,
})
Expand Down Expand Up @@ -236,7 +236,7 @@ async fn lan_matchmaker(
peer_connections[i] = Some(conn);
}

let lan_socket = LanSocket::new(peer_connections);
let lan_socket = LanSocket::new(player_idx, peer_connections);
info!("Connections established.");

matchmaker_channel
Expand Down Expand Up @@ -272,24 +272,33 @@ pub enum LanMatchmakerResponse {
},
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct LanSocket {
pub connections: [Option<quinn::Connection>; MAX_PLAYERS],
pub ggrs_message_channel: async_channel::Receiver<(usize, ggrs::Message)>,
pub ggrs_receiver: async_channel::Receiver<(usize, ggrs::Message)>,
pub reliable_receiver: async_channel::Receiver<(usize, Vec<u8>)>,
pub player_idx: usize,
pub player_count: usize,
}

impl LanSocket {
pub fn new(connections: [Option<quinn::Connection>; MAX_PLAYERS]) -> Self {
let (sender, receiver) = async_channel::unbounded();
pub fn new(player_idx: usize, connections: [Option<quinn::Connection>; MAX_PLAYERS]) -> Self {
let (ggrs_sender, ggrs_receiver) = async_channel::unbounded();
let (reliable_sender, reliable_receiver) = async_channel::unbounded();

let pool = bevy::tasks::IoTaskPool::get();

// Spawn tasks to receive network messages from each peer
#[allow(clippy::needless_range_loop)]
for i in 0..MAX_PLAYERS {
if let Some(conn) = connections[i].clone() {
let sender = sender.clone();
let ggrs_sender = ggrs_sender.clone();

// Unreliable message receiver
let conn_ = conn.clone();
pool.spawn(async move {
let conn = conn_;

#[cfg(feature = "debug-network-slowdown")]
use turborand::prelude::*;
#[cfg(feature = "debug-network-slowdown")]
Expand Down Expand Up @@ -323,7 +332,56 @@ impl LanSocket {
)
.await;
}
if sender.send((i, message)).await.is_err() {
if ggrs_sender.send((i, message)).await.is_err() {
break;
}
}
Err(e) => {
warn!("Connection error: {e}");
}
},
}
}
})
.detach();

// Reliable message receiver
let reliable_sender = reliable_sender.clone();
pool.spawn(async move {
#[cfg(feature = "debug-network-slowdown")]
use turborand::prelude::*;
#[cfg(feature = "debug-network-slowdown")]
let rng = AtomicRng::new();

loop {
let event =
future::or(async { either::Left(conn.closed().await) }, async {
either::Right(conn.accept_uni().await)
})
.await;

match event {
either::Either::Left(closed) => {
warn!("Connection error: {closed}");
break;
}
either::Either::Right(result) => match result {
Ok(stream) => {
let data =
stream.read_to_end(4096).await.expect("Network read error");

// Debugging code to introduce artificial latency
#[cfg(feature = "debug-network-slowdown")]
{
use async_timer::Oneshot;
async_timer::oneshot::Timer::new(
std::time::Duration::from_millis(
(rng.f32_normalized() * 100.0) as u64 + 1,
),
)
.await;
}
if reliable_sender.send((i, data)).await.is_err() {
break;
}
}
Expand All @@ -339,8 +397,11 @@ impl LanSocket {
}

Self {
player_idx,
player_count: connections.iter().flatten().count() + 1,
connections,
ggrs_message_channel: receiver,
ggrs_receiver,
reliable_receiver,
}
}
}
Expand All @@ -357,9 +418,74 @@ impl ggrs::NonBlockingSocket<usize> for LanSocket {

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_message_channel.try_recv() {
while let Ok(message) = self.ggrs_receiver.try_recv() {
messages.push(message);
}
messages
}
}

impl NetworkSocket for LanSocket {
fn send_reliable(&self, target: SocketTarget, message: &[u8]) {
let task_pool = IoTaskPool::get();
let message = Bytes::copy_from_slice(message);

match target {
SocketTarget::Player(i) => {
let conn = self.connections[i].as_ref().unwrap().clone();

task_pool
.spawn(async move {
let mut stream = conn.open_uni().await.unwrap();
stream.write_chunk(message).await.unwrap();
stream.finish().await.unwrap();
})
.detach();
}
SocketTarget::All => {
for conn in &self.connections {
if let Some(conn) = conn.clone() {
let message = message.clone();
task_pool
.spawn(async move {
let mut stream = conn.open_uni().await.unwrap();
stream.write_chunk(message).await.unwrap();
stream.finish().await.unwrap();
})
.detach();
}
}
}
}
}

fn recv_reliable(&self) -> Vec<(usize, Vec<u8>)> {
let mut messages = Vec::new();
while let Ok(message) = self.reliable_receiver.try_recv() {
messages.push(message);
}
messages
}

fn ggrs_socket(&self) -> BoxedNonBlockingSocket {
BoxedNonBlockingSocket(Box::new(self.clone()))
}

fn close(&self) {
for conn in self.connections.iter().flatten() {
conn.close(0u8.into(), &[]);
}
}

fn player_idx(&self) -> usize {
self.player_idx
}

fn player_count(&self) -> usize {
self.player_count
}

fn player_is_local(&self) -> [bool; MAX_PLAYERS] {
std::array::from_fn(|i| self.connections[i].is_none() && i < self.player_count)
}
}
2 changes: 1 addition & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl<'w, 's> SessionManager<'w, 's> {
}

#[cfg(not(target_arch = "wasm32"))]
pub fn start_lan(
pub fn start_network(
&mut self,
core_info: CoreSessionInfo,
lan_info: crate::networking::GgrsSessionRunnerInfo,
Expand Down
Loading