Skip to content

Commit

Permalink
http_server: spawn one thread per connected client
Browse files Browse the repository at this point in the history
Drop frames if the client cannot accept them fast enough
  • Loading branch information
emilk committed Sep 17, 2021
1 parent 07b63c6 commit 1676329
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions puffin_http/CHANGELOG.md
Expand Up @@ -4,6 +4,7 @@ All notable changes to `puffin_http` will be documented in this file.


## Unreleased
* Better handle slow clients, especially when there are multiple clients.


## 0.5.1 - 2021-09-16
Expand Down
1 change: 0 additions & 1 deletion puffin_http/Cargo.toml
Expand Up @@ -16,7 +16,6 @@ include = [ "**/*.rs", "Cargo.toml"]
anyhow = "1"
log = "0.4"
puffin = { version = "0.8.0", path = "../puffin", features = ["serialization"] }
retain_mut = "0.1.3"
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
Expand Down
71 changes: 51 additions & 20 deletions puffin_http/src/server.rs
Expand Up @@ -3,9 +3,12 @@ use puffin::GlobalProfiler;
use std::{
io::Write,
net::{SocketAddr, TcpListener, TcpStream},
sync::Arc,
sync::{mpsc, Arc},
};

/// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough.
const MAX_FRAMES_IN_QUEUE: usize = 30;

/// Listens for incoming connections
/// and streams them puffin profiler data.
///
Expand All @@ -22,8 +25,7 @@ impl Server {
.set_nonblocking(true)
.context("TCP set_nonblocking")?;

let (tx, rx): (std::sync::mpsc::Sender<Arc<puffin::FrameData>>, _) =
std::sync::mpsc::channel();
let (tx, rx): (mpsc::Sender<Arc<puffin::FrameData>>, _) = mpsc::channel();

std::thread::Builder::new()
.name("puffin-server".to_owned())
Expand Down Expand Up @@ -58,24 +60,34 @@ impl Drop for Server {
}
}

type Packet = Arc<[u8]>;

/// Listens for incoming connections
/// and streams them puffin profiler data.
struct PuffinServerImpl {
tcp_listener: TcpListener,
clients: Vec<(SocketAddr, TcpStream)>,
clients: Vec<(SocketAddr, mpsc::SyncSender<Packet>)>,
}

impl PuffinServerImpl {
fn accept_new_clients(&mut self) -> anyhow::Result<()> {
loop {
match self.tcp_listener.accept() {
Ok((stream, client_addr)) => {
stream
Ok((tcp_stream, client_addr)) => {
tcp_stream
.set_nonblocking(false)
.context("stream.set_nonblocking")?;

log::info!("{} connected", client_addr);
self.clients.push((client_addr, stream));

let (packet_tx, packet_rx) = mpsc::sync_channel(MAX_FRAMES_IN_QUEUE);

std::thread::Builder::new()
.name("puffin-server-client".to_owned())
.spawn(move || client_loop(packet_rx, client_addr, tcp_stream))
.context("Couldn't spawn thread")?;

self.clients.push((client_addr, packet_tx));
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break; // Nothing to do for now.
Expand All @@ -94,29 +106,48 @@ impl PuffinServerImpl {
}
puffin::profile_function!();

let mut message = vec![];
message
let mut packet = vec![];
packet
.write_all(&crate::PROTOCOL_VERSION.to_le_bytes())
.unwrap();
frame
.write_into(&mut message)
.write_into(&mut packet)
.context("Encode puffin frame")?;

use retain_mut::RetainMut as _;
self.clients
.retain_mut(|(addr, stream)| match stream.write_all(&message) {
let packet: Packet = packet.into();

self.clients.retain(
|(client_addr, packet_tx)| match packet_tx.try_send(packet.clone()) {
Ok(()) => true,
Err(err) => {
Err(mpsc::TrySendError::Disconnected(_)) => false,
Err(mpsc::TrySendError::Full(_)) => {
log::info!(
"puffin server failed sending to {}: {} (kind: {:?})",
addr,
err,
err.kind()
"puffin client {} is not accepting data fast enough; dropping a frame",
client_addr
);
false
true
}
});
},
);

Ok(())
}
}

fn client_loop(
packet_rx: mpsc::Receiver<Packet>,
client_addr: SocketAddr,
mut tcp_stream: TcpStream,
) {
while let Ok(packet) = packet_rx.recv() {
if let Err(err) = tcp_stream.write_all(&packet) {
log::info!(
"puffin server failed sending to {}: {} (kind: {:?})",
client_addr,
err,
err.kind()
);
break;
}
}
}

0 comments on commit 1676329

Please sign in to comment.