feat(connlib): decrease connection setup latency (#4022)

`snownet` is built in a sans-IO style, which means it has no internal
timers and performs no IO. It is up to the upper layer to check
`poll_timeout` and call `handle_timeout` as soon as that deadline expires.
_When_ we want to be called again (i.e. the result of `poll_timeout`) may
change every time `snownet`'s internal state changes. This is especially
critical during the initial setup of a connection.
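
To make the sans-IO contract concrete, here is a minimal sketch of a component that exposes the same two calls. The `Keepalive` type, its fields, and the 5 second interval are hypothetical and exist only for illustration; this is not `snownet`'s code.

```rust
use std::time::{Duration, Instant};

/// Hypothetical sans-IO component: it owns no timer and performs no IO.
struct Keepalive {
    interval: Duration,
    next_due: Instant,
    sent: u32,
}

impl Keepalive {
    fn new(now: Instant, interval: Duration) -> Self {
        Self { interval, next_due: now + interval, sent: 0 }
    }

    /// When does this component next want `handle_timeout` to be called?
    fn poll_timeout(&self) -> Option<Instant> {
        Some(self.next_due)
    }

    /// Advance internal state up to `now`; the caller supplies the time.
    fn handle_timeout(&mut self, now: Instant) {
        while now >= self.next_due {
            self.sent += 1; // stands in for "queue a keepalive packet"
            self.next_due += self.interval;
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut keepalive = Keepalive::new(start, Duration::from_secs(5));

    // The upper layer owns the clock: it asks for the deadline ...
    let deadline = keepalive.poll_timeout().unwrap();
    assert_eq!(deadline, start + Duration::from_secs(5));

    // ... calling too early is harmless ...
    keepalive.handle_timeout(start + Duration::from_secs(1));
    assert_eq!(keepalive.sent, 0);

    // ... and calling at the deadline advances the state machine.
    keepalive.handle_timeout(deadline);
    assert_eq!(keepalive.sent, 1);
    assert_eq!(keepalive.poll_timeout(), Some(start + Duration::from_secs(10)));
}
```

Because the component never reads the clock itself, the caller can drive it with simulated time, which is what the assertions above do.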

As we learn about our own candidates and candidates from the other
party, we form new pairs. To actually detect whether the pair is a
viable network path, we need to send a STUN request. When to send STUN
requests is controlled by time. A newly formed pair should send a STUN
request as soon as possible to minimize latency.
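
The following sketch illustrates why learning a new candidate changes the result of `poll_timeout`. The `Agent` and `Pair` types, the `RETRY` interval, and the addresses are assumptions made up for this example; the real pairing and scheduling logic lives in `snownet` and `str0m` and is more involved.

```rust
use std::time::{Duration, Instant};

/// Hypothetical retry interval between STUN requests on the same pair.
const RETRY: Duration = Duration::from_millis(250);

/// Hypothetical candidate pair; `next_check` is when its next STUN request is due.
struct Pair {
    local: &'static str,
    remote: &'static str,
    next_check: Instant,
}

#[derive(Default)]
struct Agent {
    local: Vec<&'static str>,
    remote: Vec<&'static str>,
    pairs: Vec<Pair>,
}

impl Agent {
    /// A new local candidate is paired with every known remote candidate.
    fn add_local(&mut self, candidate: &'static str, now: Instant) {
        for &remote in &self.remote {
            self.pairs.push(Pair { local: candidate, remote, next_check: now });
        }
        self.local.push(candidate);
    }

    /// A new remote candidate is paired with every known local candidate.
    fn add_remote(&mut self, candidate: &'static str, now: Instant) {
        for &local in &self.local {
            self.pairs.push(Pair { local, remote: candidate, next_check: now });
        }
        self.remote.push(candidate);
    }

    /// The earliest instant at which any pair wants to send a STUN request.
    fn poll_timeout(&self) -> Option<Instant> {
        self.pairs.iter().map(|pair| pair.next_check).min()
    }

    /// Returns the pairs whose STUN request is due and reschedules them.
    fn handle_timeout(&mut self, now: Instant) -> Vec<(&'static str, &'static str)> {
        let mut due = Vec::new();
        for pair in &mut self.pairs {
            if pair.next_check <= now {
                due.push((pair.local, pair.remote));
                pair.next_check = now + RETRY;
            }
        }
        due
    }
}

fn main() {
    let t0 = Instant::now();
    let mut agent = Agent::default();

    agent.add_local("192.168.0.2:5000", t0);
    assert_eq!(agent.poll_timeout(), None); // no pairs yet, nothing to wait for

    // Learning a remote candidate forms a pair that is due immediately.
    let t1 = t0 + Duration::from_millis(10);
    agent.add_remote("203.0.113.7:6000", t1);
    assert_eq!(agent.poll_timeout(), Some(t1));

    let due = agent.handle_timeout(t1);
    assert_eq!(due, vec![("192.168.0.2:5000", "203.0.113.7:6000")]);
    assert_eq!(agent.poll_timeout(), Some(t1 + RETRY));
}
```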

Previously, we did not update the timer that "wakes" `snownet` via
`handle_timeout` when `snownet`'s state changed. As such, we waited
unnecessarily long before sending STUN requests to newly formed pairs.
With this patch, we check `poll_timeout` at the end of the `Tunnel`'s
`poll` function and immediately call `handle_timeout` if the timeout has
already expired.
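
A simulated-time sketch of the difference between the two behaviours: a driver that keeps its originally armed deadline versus one that re-reads `poll_timeout` after a state change. The `Node` type here, the 1 second `IDLE` interval, and the 10ms arrival time are hypothetical stand-ins, not the real `snownet::Node` or measured values.

```rust
use std::time::{Duration, Instant};

/// Hypothetical interval at which the idle node wants to be woken.
const IDLE: Duration = Duration::from_secs(1);

/// Hypothetical stand-in for a sans-IO node.
struct Node {
    next_due: Instant,
    pending_pair: bool,
    first_check_sent_at: Option<Instant>,
}

impl Node {
    fn new(now: Instant) -> Self {
        Self { next_due: now + IDLE, pending_pair: false, first_check_sent_at: None }
    }

    /// State change: a new pair wants its first STUN request right away.
    fn add_pair(&mut self, now: Instant) {
        self.pending_pair = true;
        self.next_due = now;
    }

    fn poll_timeout(&self) -> Option<Instant> {
        Some(self.next_due)
    }

    fn handle_timeout(&mut self, now: Instant) {
        if now < self.next_due {
            return;
        }
        if self.pending_pair {
            self.pending_pair = false;
            self.first_check_sent_at = Some(now);
        }
        self.next_due = now + IDLE;
    }
}

/// Delay between a pair being added and its first STUN request being sent.
fn check_delay(refresh_after_state_change: bool) -> Duration {
    let start = Instant::now();
    let mut node = Node::new(start);
    let mut armed = node.poll_timeout(); // the driver's single timer

    let pair_added_at = start + Duration::from_millis(10);
    node.add_pair(pair_added_at);
    if refresh_after_state_change {
        // Re-read the deadline after the state change and re-arm the timer.
        armed = node.poll_timeout();
    }

    // Simulate the timer firing at whatever deadline the driver has armed.
    let fired_at = armed.expect("timer is armed");
    node.handle_timeout(fired_at);

    node.first_check_sent_at.expect("check was sent") - pair_added_at
}

fn main() {
    // Stale timer: the new pair waits for the old deadline.
    assert_eq!(check_delay(false), Duration::from_millis(990));
    // Refreshed timer: the new pair is checked immediately.
    assert_eq!(check_delay(true), Duration::ZERO);
}
```

The numbers depend entirely on the made-up constants; the point is only that the stale driver's delay is bounded by the old deadline rather than by the new one.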

Currently, `str0m` throttles updates to `handle_timeout` in 50ms blocks,
which still creates some delay. With that throttling commented out, I
observed improvements of ~0.7s for establishing new connections. Most of
the time, the 2nd ping already goes through!
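
As a rough illustration of how a 50ms throttle adds latency, the toy model below ignores any `handle_timeout` call that arrives less than 50ms after the last one it acted on. This is an assumption about what "throttling in 50ms blocks" means, written for this example only; `ThrottledAgent` is hypothetical and is not `str0m`'s implementation.

```rust
use std::time::{Duration, Instant};

/// Assumed throttle window, taken from the 50ms figure in the text.
const THROTTLE: Duration = Duration::from_millis(50);

/// Toy model of an agent that throttles its timeout handling.
#[derive(Default)]
struct ThrottledAgent {
    last_run: Option<Instant>,
}

impl ThrottledAgent {
    /// Returns `true` if the call did any work, `false` if it was throttled.
    fn handle_timeout(&mut self, now: Instant) -> bool {
        if let Some(last) = self.last_run {
            if now < last + THROTTLE {
                return false;
            }
        }
        self.last_run = Some(now);
        true
    }

    /// Earliest instant at which `handle_timeout` will do work again.
    fn poll_timeout(&self) -> Option<Instant> {
        self.last_run.map(|last| last + THROTTLE)
    }
}

fn main() {
    let t0 = Instant::now();
    let mut agent = ThrottledAgent::default();
    assert!(agent.handle_timeout(t0));

    // A pair formed 10ms later cannot be serviced yet ...
    let pair_formed_at = t0 + Duration::from_millis(10);
    assert!(!agent.handle_timeout(pair_formed_at));

    // ... it has to wait until the throttle window has elapsed.
    let next = agent.poll_timeout().unwrap();
    assert_eq!(next - pair_formed_at, Duration::from_millis(40));
    assert!(agent.handle_timeout(next));
}
```

In this model every newly formed pair can wait up to one full window before its first request, which adds up when several round trips are needed.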
thomaseizinger committed Mar 9, 2024
1 parent ea53ae7 commit a2f289f
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions rust/connlib/tunnel/src/lib.rs
@@ -9,7 +9,7 @@ use connlib_shared::{
     CallbackErrorFacade, Callbacks, Error, Result,
 };
 use device_channel::Device;
-use futures_util::{future::BoxFuture, task::AtomicWaker, FutureExt};
+use futures_util::{task::AtomicWaker, FutureExt};
 use peer::PacketTransform;
 use peer_store::PeerStore;
 use snownet::{Node, Server};
@@ -19,6 +19,7 @@ use std::{
     fmt,
     hash::Hash,
     io,
+    pin::Pin,
     task::{ready, Context, Poll},
     time::{Duration, Instant},
 };
@@ -122,6 +123,11 @@ where
             Poll::Pending => {}
         }

+        // After any state change, check what the new timeout is and reset it if necessary.
+        if self.connections_state.poll_timeout(cx).is_ready() {
+            cx.waker().wake_by_ref()
+        }
+
         Poll::Pending
     }
 }
@@ -180,6 +186,11 @@ where
             }
         }

+        // After any state change, check what the new timeout is and reset it if necessary.
+        if self.connections_state.poll_timeout(cx).is_ready() {
+            cx.waker().wake_by_ref()
+        }
+
         Poll::Pending
     }
 }
@@ -229,7 +240,7 @@ where
 struct ConnectionState<TRole, TId> {
     pub node: Node<TRole, TId>,
     write_buf: Box<[u8; MAX_UDP_SIZE]>,
-    connection_pool_timeout: BoxFuture<'static, std::time::Instant>,
+    timeout: Option<Pin<Box<tokio::time::Sleep>>>,
     stats_timer: tokio::time::Interval,
     sockets: Sockets,
 }
@@ -242,9 +253,9 @@ where
         Ok(ConnectionState {
             node: Node::new(private_key, std::time::Instant::now()),
             write_buf: Box::new([0; MAX_UDP_SIZE]),
-            connection_pool_timeout: sleep_until(std::time::Instant::now()).boxed(),
             sockets: Sockets::new()?,
             stats_timer: tokio::time::interval(Duration::from_secs(60)),
+            timeout: None,
         })
     }

@@ -335,17 +346,6 @@ where
     }

     fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<Event<TId>> {
-        if let Poll::Ready(prev_timeout) = self.connection_pool_timeout.poll_unpin(cx) {
-            self.node.handle_timeout(prev_timeout);
-            if let Some(new_timeout) = self.node.poll_timeout() {
-                debug_assert_ne!(prev_timeout, new_timeout, "Timer busy loop!");
-
-                self.connection_pool_timeout = sleep_until(new_timeout).boxed();
-            }
-
-            cx.waker().wake_by_ref();
-        }
-
         if self.stats_timer.poll_tick(cx).is_ready() {
             let (node_stats, conn_stats) = self.node.stats();

@@ -386,6 +386,33 @@ where

         Poll::Pending
     }
+
+    fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        if let Some(timeout) = self.node.poll_timeout() {
+            let timeout = tokio::time::Instant::from_std(timeout);
+
+            match self.timeout.as_mut() {
+                Some(existing_timeout) if existing_timeout.deadline() != timeout => {
+                    existing_timeout.as_mut().reset(timeout)
+                }
+                Some(_) => {}
+                None => self.timeout = Some(Box::pin(tokio::time::sleep_until(timeout))),
+            }
+        }
+
+        if let Some(timeout) = self.timeout.as_mut() {
+            ready!(timeout.poll_unpin(cx));
+            self.node.handle_timeout(timeout.deadline().into());
+
+            return Poll::Ready(());
+        }
+
+        // Technically, we should set a waker here because we don't have a timer.
+        // But the only place where we set a timer is a few lines up.
+        // That is the same path that will re-poll it so there is no point in using a waker.
+        // We might want to consider making a `MaybeSleep` type that encapsulates a waker so we don't need to think about it as hard.
+        Poll::Pending
+    }
 }

 pub enum Event<TId> {
@@ -403,9 +430,3 @@ pub enum Event<TId> {
     SendPacket(IpPacket<'static>),
     StopPeer(TId),
 }
-
-async fn sleep_until(deadline: Instant) -> Instant {
-    tokio::time::sleep_until(deadline.into()).await;
-
-    deadline
-}
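
The comment in `poll_timeout` mentions a possible `MaybeSleep` type that encapsulates a waker. One way such a type could behave is sketched below using only the standard library. `MaybeDeadline` and `CountingWaker` are hypothetical names, and the caller passes `now` explicitly instead of wrapping a real `tokio::time::Sleep`, so this shows only the waker bookkeeping, not an actual timer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Counts wake-ups so the example can observe them without an async runtime.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical `MaybeSleep`-like type: an optional deadline plus the waker
/// of whoever polled it last.
#[derive(Default)]
struct MaybeDeadline {
    deadline: Option<Instant>,
    waker: Option<Waker>,
}

impl MaybeDeadline {
    /// Setting a deadline wakes the task that previously saw `Pending`,
    /// so callers do not have to remember to re-poll by hand.
    fn set(&mut self, deadline: Instant) {
        self.deadline = Some(deadline);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Ready once a deadline is set and has passed; otherwise stores the waker.
    fn poll(&mut self, now: Instant, cx: &mut Context<'_>) -> Poll<Instant> {
        match self.deadline {
            Some(deadline) if deadline <= now => {
                self.deadline = None;
                Poll::Ready(deadline)
            }
            _ => {
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let now = Instant::now();
    let mut timer = MaybeDeadline::default();

    // No deadline yet: the poll is pending and the waker is remembered.
    assert!(timer.poll(now, &mut cx).is_pending());
    assert_eq!(counter.0.load(Ordering::SeqCst), 0);

    // Setting a deadline notifies the task that polled earlier.
    let later = now + Duration::from_millis(5);
    timer.set(later);
    assert_eq!(counter.0.load(Ordering::SeqCst), 1);

    // Once the deadline has passed, the poll resolves with it.
    assert_eq!(timer.poll(later, &mut cx), Poll::Ready(later));
}
```

A real version would additionally need the underlying timer to wake the task when the deadline elapses; that part is omitted here.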
