Skip to content

Commit

Permalink
feat(networking): Support restarting network session (#384)
Browse files Browse the repository at this point in the history
These are changes in prep for map transition. Session runner now has a
restart function. When game session is reset, network session runner can
be restarted by re-initializing it and using existing socket with
increment match id.

Sockets now wrap ggrs messages with a match_id that allows filtering in
flight messages from previous matches out to avoid issues when
re-creating ggrs session.

Note that GGRS takes a boxed socket - incrementing match id on our
NetworkMatchSocket or other cloned socket will not impact current ggrs
socket, which is fine as we are re-creating ggrs session anyway. It may
only be effectively updated by cloning socket, incrementing, and
re-creating ggrs session. This also avoids issues with interior
mutability of NetworkMatchSocket (arc wrapping socket), so we operate by
cloning / saving boxed socket + updating this.

Additionally added resource `NetworkInfo` which is updated by net
session runner before each step. Contains current + last confirmed
frame, which gameplay code may use to confirm events (such as map
transition) have been synchronized + confirmed by all players.

These changes + session commands PR should be everything we need on the
bones side to get map transitions in.
  • Loading branch information
MaxCWhitehead committed Apr 14, 2024
1 parent 8be1965 commit b7f59fc
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 34 deletions.
84 changes: 76 additions & 8 deletions framework_crates/bones_framework/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl From<ggrs::InputStatus> for NetworkInputStatus {

/// Module prelude.
pub mod prelude {
pub use super::{certs, debug::prelude::*, input, lan, online, proto};
pub use super::{certs, debug::prelude::*, input, lan, online, proto, NetworkInfo};
}

/// Muliplier for framerate that will be used when playing an online match.
Expand Down Expand Up @@ -138,7 +138,27 @@ pub struct NetworkMatchSocket(Arc<dyn NetworkSocket>);
/// A type-erased [`ggrs::NonBlockingSocket`]
/// implementation.
#[derive(Deref, DerefMut)]
pub struct BoxedNonBlockingSocket(Box<dyn ggrs::NonBlockingSocket<usize> + 'static>);
pub struct BoxedNonBlockingSocket(Box<dyn GgrsSocket>);

impl Clone for BoxedNonBlockingSocket {
fn clone(&self) -> Self {
self.ggrs_socket()
}
}

/// Wraps [`ggrs::Message`] with included `match_id`, used to determine if message received
/// from current match.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GameMessage {
/// Socket match id
pub match_id: u8,
/// Wrapped message
pub message: ggrs::Message,
}

/// Automatically implemented for [`NetworkSocket`] + [`ggrs::NonBlockingSocket<usize>`].
pub trait GgrsSocket: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
impl<T> GgrsSocket for T where T: NetworkSocket + ggrs::NonBlockingSocket<usize> {}

impl ggrs::NonBlockingSocket<usize> for BoxedNonBlockingSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
Expand Down Expand Up @@ -170,6 +190,10 @@ pub trait NetworkSocket: Sync + Send {
fn player_is_local(&self) -> [bool; MAX_PLAYERS];
/// Get the player count for this network match.
fn player_count(&self) -> usize;

/// Increment match id so messages from previous match that are still in flight
/// will be filtered out. Used when starting new session with existing socket.
fn increment_match_id(&mut self);
}

/// The destination for a reliable network message.
Expand All @@ -180,6 +204,17 @@ pub enum SocketTarget {
All,
}

/// Resource updated each frame exposing current frame and last confirmed of online session.
#[derive(HasSchema, Copy, Clone, Default)]
pub struct NetworkInfo {
/// Current frame of simulation step
pub current_frame: i32,

/// Last confirmed frame by all clients.
/// Anything that occurred on this frame is agreed upon by all clients.
pub last_confirmed_frame: i32,
}

/// [`SessionRunner`] implementation that uses [`ggrs`] for network play.
///
/// This is where the whole `ggrs` integration is implemented.
Expand Down Expand Up @@ -207,11 +242,18 @@ pub struct GgrsSessionRunner<'a, InputTypes: NetworkInputConfig<'a>> {

/// Session runner's input collector.
pub input_collector: InputTypes::InputCollector,

/// Store copy of socket to be able to restart session runner with existing socket.
socket: BoxedNonBlockingSocket,

/// Local input delay ggrs session was initialized with
local_input_delay: usize,
}

/// The info required to create a [`GgrsSessionRunner`].
#[derive(Clone)]
pub struct GgrsSessionRunnerInfo {
/// The GGRS socket implementation to use.
/// The socket that will be converted into GGRS socket implementation.
pub socket: BoxedNonBlockingSocket,
/// The list of local players.
pub player_is_local: [bool; MAX_PLAYERS],
Expand All @@ -233,14 +275,16 @@ pub struct GgrsSessionRunnerInfo {
impl GgrsSessionRunnerInfo {
/// See [`GgrsSessionRunnerInfo`] fields for info on arguments.
pub fn new(
socket: &dyn NetworkSocket,
socket: BoxedNonBlockingSocket,
max_prediction_window: Option<usize>,
local_input_delay: Option<usize>,
) -> Self {
let player_is_local = socket.0.player_is_local();
let player_count = socket.0.player_count();
Self {
socket: socket.ggrs_socket(),
player_is_local: socket.player_is_local(),
player_count: socket.player_count(),
socket,
player_is_local,
player_count,
max_prediction_window,
local_input_delay,
}
Expand Down Expand Up @@ -299,7 +343,7 @@ where
let local_player_idx =
local_player_idx.expect("Networking player_is_local array has no local players.");

let session = builder.start_p2p_session(info.socket).unwrap();
let session = builder.start_p2p_session(info.socket.clone()).unwrap();

Self {
last_player_input: InputTypes::Dense::default(),
Expand All @@ -310,6 +354,8 @@ where
last_run: None,
network_fps: network_fps as f64,
input_collector: InputTypes::InputCollector::default(),
socket: info.socket.clone(),
local_input_delay,
}
}
}
Expand Down Expand Up @@ -458,6 +504,11 @@ where
// Input has been consumed, signal that we are in new input frame
self.input_collector.advance_frame();

world.insert_resource(NetworkInfo {
current_frame: self.session.current_frame(),
last_confirmed_frame: self.session.confirmed_frame(),
});

{
world
.resource_mut::<Time>()
Expand Down Expand Up @@ -525,4 +576,21 @@ where
.unwrap();
}
}

fn restart_session(&mut self) {
// Rebuild session info from runner + create new runner

// Increment match id so messages from previous match that are still in flight
// will be filtered out.
self.socket.0.increment_match_id();

let runner_info = GgrsSessionRunnerInfo {
socket: self.socket.clone(),
player_is_local: self.player_is_local,
player_count: self.session.num_players(),
max_prediction_window: Some(self.session.max_prediction()),
local_input_delay: Some(self.local_input_delay),
};
*self = GgrsSessionRunner::new(self.network_fps as f32, runner_info);
}
}
58 changes: 37 additions & 21 deletions framework_crates/bones_framework/src/networking/lan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,12 @@ pub enum LanMatchmakerResponse {
pub struct LanSocket {
///
pub connections: [Option<quinn::Connection>; MAX_PLAYERS],
pub ggrs_receiver: async_channel::Receiver<(usize, ggrs::Message)>,
pub ggrs_receiver: async_channel::Receiver<(usize, GameMessage)>,
pub reliable_receiver: async_channel::Receiver<(usize, Vec<u8>)>,
pub player_idx: usize,
pub player_count: usize,
/// ID for current match, messages received that do not match ID are dropped.
pub match_id: u8,
}

impl LanSocket {
Expand Down Expand Up @@ -578,7 +580,7 @@ impl LanSocket {
}
either::Either::Right(datagram_result) => match datagram_result {
Ok(data) => {
let message: ggrs::Message = postcard::from_bytes(&data)
let message: GameMessage = postcard::from_bytes(&data)
.expect("Could not deserialize net message");

// Debugging code to introduce artificial latency
Expand Down Expand Up @@ -662,29 +664,11 @@ impl LanSocket {
connections,
ggrs_receiver,
reliable_receiver,
match_id: 0,
}
}
}

impl ggrs::NonBlockingSocket<usize> for LanSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let conn = self.connections[*addr].as_ref().unwrap();

// TODO: determine a reasonable size for this buffer.
let msg_bytes = postcard::to_allocvec(msg).unwrap();
conn.send_datagram(Bytes::copy_from_slice(&msg_bytes[..]))
.ok();
}

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
messages.push(message);
}
messages
}
}

impl NetworkSocket for LanSocket {
fn send_reliable(&self, target: SocketTarget, message: &[u8]) {
let task_pool = IoTaskPool::get();
Expand Down Expand Up @@ -748,6 +732,38 @@ impl NetworkSocket for LanSocket {
fn player_is_local(&self) -> [bool; MAX_PLAYERS] {
std::array::from_fn(|i| self.connections[i].is_none() && i < self.player_count)
}

fn increment_match_id(&mut self) {
self.match_id = self.match_id.wrapping_add(1);
}
}

impl ggrs::NonBlockingSocket<usize> for LanSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let msg = GameMessage {
// Consider a way we can send message by reference and avoid clone?
message: msg.clone(),
match_id: self.match_id,
};
let conn = self.connections[*addr].as_ref().unwrap();
let message = bones_matchmaker_proto::SendProxyMessage {
target_client: bones_matchmaker_proto::TargetClient::One(*addr as u8),
message: postcard::to_allocvec(&msg).unwrap(),
};
let msg_bytes = postcard::to_allocvec(&message).unwrap();
conn.send_datagram(Bytes::copy_from_slice(&msg_bytes[..]))
.ok();
}

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
if message.1.match_id == self.match_id {
messages.push((message.0, message.1.message));
}
}
messages
}
}

fn pinger(server: BiChannelServer<PingerRequest, PingerResponse>) {
Expand Down
25 changes: 20 additions & 5 deletions framework_crates/bones_framework/src/networking/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use tracing::{info, warn};

use crate::{networking::NetworkMatchSocket, prelude::*};

use super::{BoxedNonBlockingSocket, NetworkSocket, SocketTarget, MAX_PLAYERS, NETWORK_ENDPOINT};
use super::{
BoxedNonBlockingSocket, GameMessage, NetworkSocket, SocketTarget, MAX_PLAYERS, NETWORK_ENDPOINT,
};

#[derive(Default, PartialEq, Eq, Clone, Copy)]
pub enum SearchState {
Expand Down Expand Up @@ -198,10 +200,12 @@ fn resolve_addr_blocking(addr: &str) -> anyhow::Result<SocketAddr> {
#[derive(Debug, Clone)]
pub struct OnlineSocket {
pub conn: Connection,
pub ggrs_receiver: async_channel::Receiver<(usize, ggrs::Message)>,
pub ggrs_receiver: async_channel::Receiver<(usize, GameMessage)>,
pub reliable_receiver: async_channel::Receiver<(usize, Vec<u8>)>,
pub player_idx: usize,
pub player_count: usize,
/// ID for current match, messages received that do not match ID are dropped.
pub match_id: u8,
}

impl OnlineSocket {
Expand Down Expand Up @@ -292,11 +296,11 @@ impl OnlineSocket {
reliable_receiver,
player_idx,
player_count,
match_id: 0,
}
}
}

// TODO see zig's PR
impl NetworkSocket for OnlineSocket {
fn ggrs_socket(&self) -> BoxedNonBlockingSocket {
BoxedNonBlockingSocket(Box::new(self.clone()))
Expand Down Expand Up @@ -349,13 +353,22 @@ impl NetworkSocket for OnlineSocket {
fn player_count(&self) -> usize {
self.player_count
}

fn increment_match_id(&mut self) {
// This is wrapping addition
self.match_id = self.match_id.wrapping_add(1);
}
}

impl ggrs::NonBlockingSocket<usize> for OnlineSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let msg = GameMessage {
message: msg.clone(),
match_id: self.match_id,
};
let message = bones_matchmaker_proto::SendProxyMessage {
target_client: bones_matchmaker_proto::TargetClient::One(*addr as u8),
message: postcard::to_allocvec(msg).unwrap(),
message: postcard::to_allocvec(&msg).unwrap(),
};
let msg_bytes = postcard::to_allocvec(&message).unwrap();
self.conn
Expand All @@ -366,7 +379,9 @@ impl ggrs::NonBlockingSocket<usize> for OnlineSocket {
fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
messages.push(message);
if message.1.match_id == self.match_id {
messages.push((message.0, message.1.message));
}
}
messages
}
Expand Down
13 changes: 13 additions & 0 deletions framework_crates/bones_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,16 @@ pub trait SessionRunner: Sync + Send + 'static {
/// world.resource_mut::<Time>().update_with_instant(now);
/// stages.run(world);
/// }
/// fn restart_session(&mut self) {}
/// # }
/// ```
fn step(&mut self, now: Instant, world: &mut World, stages: &mut SystemStages);

/// Restart Session Runner. This should reset accumulated time, inputs, etc.
///
/// The expectation is that current players using it may continue to, so something like a network
/// socket or player info should persist.
fn restart_session(&mut self);
}

/// The default [`SessionRunner`], which just runs the systems once every time it is run.
Expand All @@ -152,6 +159,12 @@ impl SessionRunner for DefaultSessionRunner {
world.resource_mut::<Time>().update_with_instant(now);
stages.run(world)
}

// This is a no-op as no state, but implemented this way in case that changes later.
#[allow(clippy::default_constructed_unit_structs)]
fn restart_session(&mut self) {
*self = DefaultSessionRunner::default();
}
}

/// The [`Game`] encompasses a complete bones game's logic, independent of the renderer and IO
Expand Down

0 comments on commit b7f59fc

Please sign in to comment.