Skip to content

Commit

Permalink
hotfix: Dont stall the event loop if udp blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
feschber committed Dec 22, 2023
1 parent fed8e02 commit 1cefa38
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,26 @@ impl Server {

pub async fn run(&mut self) -> anyhow::Result<()> {
loop {
tokio::select! {
// safety: cancellation safe
udp_event = receive_event(&self.socket) => {
match udp_event {
Ok(e) => self.handle_udp_rx(e).await,
Err(e) => log::error!("error reading event: {e}"),
}
}
log::trace!("polling...");
tokio::select! { biased;
// safety: cancellation safe
res = self.producer.next() => {
match res {
Some(Ok((client, event))) => {
self.handle_producer_event(client,event).await;
},
Some(Err(e)) => log::error!("error reading from event producer: {e}"),
Some(Err(e)) => return Err(e.into()),
_ => break,
}
}
// safety: cancellation safe
udp_event = receive_event(&self.socket) => {
match udp_event {
Ok(e) => self.handle_udp_rx(e).await,
Err(e) => log::error!("error reading event: {e}"),
}
}
// safety: cancellation safe
stream = self.frontend.accept() => {
match stream {
Ok(s) => self.handle_frontend_stream(s).await,
Expand Down Expand Up @@ -311,14 +312,14 @@ impl Server {
match (event, addr) {
(Event::Pong(), _) => { /* ignore pong events */ }
(Event::Ping(), addr) => {
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
if let Err(e) = send_event(&self.socket, Event::Pong(), addr) {
log::error!("udp send: {}", e);
}
}
(event, addr) => {
// tell clients that we are ready to receive events
if let Event::Enter() = event {
if let Err(e) = send_event(&self.socket, Event::Leave(), addr).await {
if let Err(e) = send_event(&self.socket, Event::Leave(), addr) {
log::error!("udp send: {}", e);
}
}
Expand Down Expand Up @@ -363,7 +364,7 @@ impl Server {
&& state.last_replied.unwrap().elapsed() > Duration::from_secs(1)
{
state.last_replied = Some(Instant::now());
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
if let Err(e) = send_event(&self.socket, Event::Pong(), addr) {
log::error!("udp send: {}", e);
}
}
Expand Down Expand Up @@ -411,7 +412,7 @@ impl Server {
if let State::Receiving | State::AwaitingLeave = self.state {
self.state = State::AwaitingLeave;
if let Some(addr) = state.client.active_addr {
if let Err(e) = send_event(&self.socket, Event::Enter(), addr).await {
if let Err(e) = send_event(&self.socket, Event::Enter(), addr) {
log::error!("udp send: {}", e);
}
}
Expand All @@ -420,7 +421,7 @@ impl Server {
// otherwise we should have an address to
// transmit events to the corrensponding client
if let Some(addr) = state.client.active_addr {
if let Err(e) = send_event(&self.socket, e, addr).await {
if let Err(e) = send_event(&self.socket, e, addr) {
log::error!("udp send: {}", e);
}
}
Expand Down Expand Up @@ -461,7 +462,7 @@ impl Server {
state.last_ping = Some(Instant::now());
for addr in state.client.addrs.iter() {
log::debug!("pinging {addr}");
if let Err(e) = send_event(&self.socket, Event::Ping(), *addr).await {
if let Err(e) = send_event(&self.socket, Event::Ping(), *addr) {
if e.kind() != ErrorKind::WouldBlock {
log::error!("udp send: {}", e);
}
Expand Down Expand Up @@ -598,10 +599,11 @@ async fn receive_event(
}
}

async fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
fn send_event(sock: &UdpSocket, e: Event, addr: SocketAddr) -> Result<usize> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let data: Vec<u8> = (&e).into();
// We are currently abusing a blocking send to get the lowest possible latency.
// It may be better to set the socket to non-blocking and only send when ready.
sock.send_to(&data[..], addr).await
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the event
// producer.
sock.try_send_to(&data, addr)
}

0 comments on commit 1cefa38

Please sign in to comment.