Skip to content

Commit

Permalink
Log duration since intent throughout establishing new connection
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Mar 8, 2024
1 parent 253d460 commit e509715
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 51 deletions.
75 changes: 57 additions & 18 deletions rust/connlib/snownet/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,10 @@ where
if conn.peer_socket != Some(remote_socket) {
let is_first_connection = conn.peer_socket.is_none();

tracing::info!(old = ?conn.peer_socket, new = ?remote_socket, "Updating remote socket");
tracing::info!(old = ?conn.peer_socket, new = ?remote_socket, duration_since_intent = ?conn.duration_since_intent(self.last_now), "Updating remote socket");
conn.peer_socket = Some(remote_socket);

conn.invalidate_candiates();

tracing::info!(%id, "Sending wireguard handshake");
conn.force_handshake(&mut self.allocations, self.last_now);

if is_first_connection {
Expand Down Expand Up @@ -549,6 +547,7 @@ where
key: [u8; 32],
allowed_stun_servers: HashSet<SocketAddr>,
allowed_turn_servers: HashSet<SocketAddr>,
intent_sent_at: Instant,
) -> Connection {
agent.handle_timeout(self.last_now);

Expand All @@ -575,6 +574,7 @@ where
stats: Default::default(),
buffered_transmits: Default::default(),
buffer: Box::new([0u8; MAX_UDP_SIZE]),
intent_sent_at,
}
}

Expand Down Expand Up @@ -710,6 +710,8 @@ where
continue;
}

let handshake_complete_before_decapsulate = conn.wg_handshake_complete();

let control_flow = conn.decapsulate(
from,
local,
Expand All @@ -720,6 +722,13 @@ where
now,
);

let handshake_complete_after_decapsulate = conn.wg_handshake_complete();

// I can't think of a better way to detect this ...
if !handshake_complete_before_decapsulate && handshake_complete_after_decapsulate {
tracing::info!(%id, duration_since_intent = ?conn.duration_since_intent(now), "Completed wireguard handshake");
}

return match control_flow {
ControlFlow::Continue(c) => ControlFlow::Continue((id, c)),
ControlFlow::Break(b) => ControlFlow::Break(b),
Expand Down Expand Up @@ -747,6 +756,8 @@ where
id: TId,
allowed_stun_servers: HashSet<SocketAddr>,
allowed_turn_servers: HashSet<(SocketAddr, String, String, String)>,
intent_sent_at: Instant,
now: Instant,
) -> Offer {
if self.connections.initial.remove(&id).is_some() {
tracing::info!("Replacing existing initial connection");
Expand Down Expand Up @@ -779,27 +790,27 @@ where
},
};

let existing = self.connections.initial.insert(
id,
InitialConnection {
agent,
session_key,
stun_servers: allowed_stun_servers,
turn_servers: allowed_turn_servers,
created_at: self.last_now,
},
);
let initial_connection = InitialConnection {
agent,
session_key,
stun_servers: allowed_stun_servers,
turn_servers: allowed_turn_servers,
created_at: now,
intent_sent_at,
};
let duration_since_intent = initial_connection.duration_since_intent(now);

let existing = self.connections.initial.insert(id, initial_connection);
debug_assert!(existing.is_none());

tracing::info!("Establishing new connection");
tracing::info!(?duration_since_intent, "Establishing new connection");

params
}

/// Accept an [`Answer`] from the remote for a connection previously created via [`Node::new_connection`].
#[tracing::instrument(level = "info", skip_all, fields(%id))]
pub fn accept_answer(&mut self, id: TId, remote: PublicKey, answer: Answer) {
pub fn accept_answer(&mut self, id: TId, remote: PublicKey, answer: Answer, now: Instant) {
let Some(initial) = self.connections.initial.remove(&id) else {
tracing::debug!("No initial connection state, ignoring answer"); // This can happen if the connection setup timed out.
return;
Expand All @@ -824,11 +835,13 @@ where
*initial.session_key.expose_secret(),
initial.stun_servers,
initial.turn_servers,
initial.intent_sent_at,
);
let duration_since_intent = connection.duration_since_intent(now);

let existing = self.connections.established.insert(id, connection);

tracing::info!(remote = %hex::encode(remote.as_bytes()), "Signalling protocol completed");
tracing::info!(?duration_since_intent, remote = %hex::encode(remote.as_bytes()), "Signalling protocol completed");

debug_assert!(existing.is_none());
}
Expand All @@ -851,6 +864,7 @@ where
remote: PublicKey,
allowed_stun_servers: HashSet<SocketAddr>,
allowed_turn_servers: HashSet<(SocketAddr, String, String, String)>,
now: Instant,
) -> Answer {
debug_assert!(
!self.connections.initial.contains_key(&id),
Expand Down Expand Up @@ -896,6 +910,7 @@ where
*offer.session_key.expose_secret(),
allowed_stun_servers,
allowed_turn_servers,
now, // Technically, this isn't fully correct because gateways don't send intents so we just use the current time.
);
let existing = self.connections.established.insert(id, connection);

Expand Down Expand Up @@ -1221,6 +1236,13 @@ struct InitialConnection {
turn_servers: HashSet<SocketAddr>,

created_at: Instant,
intent_sent_at: Instant,
}

impl InitialConnection {
fn duration_since_intent(&self, now: Instant) -> Duration {
now.duration_since(self.intent_sent_at)
}
}

struct Connection {
Expand All @@ -1240,7 +1262,9 @@ struct Connection {
turn_servers: HashSet<SocketAddr>,

stats: ConnectionStats,

buffer: Box<[u8; MAX_UDP_SIZE]>,
intent_sent_at: Instant,
}

/// The socket of the peer we are connected to.
Expand Down Expand Up @@ -1281,6 +1305,14 @@ impl Connection {
from_connected_remote || from_possible_remote
}

fn wg_handshake_complete(&self) -> bool {
self.tunnel.time_since_last_handshake().is_some()
}

fn duration_since_intent(&self, now: Instant) -> Duration {
now.duration_since(self.intent_sent_at)
}

fn set_remote_from_wg_activity(
&mut self,
local: SocketAddr,
Expand Down Expand Up @@ -1391,7 +1423,12 @@ impl Connection {
TunnResult::Done => {}
TunnResult::Err(e) => return Err(e),
TunnResult::WriteToNetwork(b) => {
make_owned_transmit(peer_socket, b, allocations, now);
self.buffered_transmits.extend(make_owned_transmit(
peer_socket,
b,
allocations,
now,
));
}
_ => panic!("Unexpected result from update_timers"),
};
Expand Down Expand Up @@ -1503,7 +1540,8 @@ impl Connection {
.peer_socket
.expect("cannot force handshake without socket");

make_owned_transmit(socket, bytes, allocations, now);
self.buffered_transmits
.extend(make_owned_transmit(socket, bytes, allocations, now));
}

/// Invalidates all local candidates with a lower or equal priority compared to the nominated one.
Expand Down Expand Up @@ -1543,6 +1581,7 @@ impl Connection {
}
}

#[must_use]
fn make_owned_transmit(
socket: PeerSocket,
message: &[u8],
Expand Down
77 changes: 59 additions & 18 deletions rust/connlib/snownet/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,57 @@ use str0m::{net::Protocol, Candidate};

#[test]
fn connection_times_out_after_10_seconds() {
let start = Instant::now();
let mut alice = ClientNode::<u64>::new(
StaticSecret::random_from_rng(rand::thread_rng()),
Instant::now(),
);

let mut alice =
ClientNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()), start);
let created_at = Instant::now();

let _ = alice.new_connection(1, HashSet::new(), HashSet::new());
alice.handle_timeout(start + Duration::from_secs(20));
let _ = alice.new_connection(
1,
HashSet::new(),
HashSet::new(),
Instant::now(),
created_at,
);
alice.handle_timeout(created_at + Duration::from_secs(20));

assert_eq!(alice.poll_event().unwrap(), Event::ConnectionFailed(1));
}

#[test]
fn answer_after_stale_connection_does_not_panic() {
let start = Instant::now();
let mut alice = ClientNode::<u64>::new(
StaticSecret::random_from_rng(rand::thread_rng()),
Instant::now(),
);
let mut bob = ServerNode::<u64>::new(
StaticSecret::random_from_rng(rand::thread_rng()),
Instant::now(),
);

let mut alice =
ClientNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()), start);
let mut bob = ServerNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()), start);
let created_at = Instant::now();

let offer = alice.new_connection(1, HashSet::new(), HashSet::new());
let answer =
bob.accept_connection(1, offer, alice.public_key(), HashSet::new(), HashSet::new());
let offer = alice.new_connection(
1,
HashSet::new(),
HashSet::new(),
Instant::now(),
created_at,
);
let answer = bob.accept_connection(
1,
offer,
alice.public_key(),
HashSet::new(),
HashSet::new(),
created_at,
);

alice.handle_timeout(start + Duration::from_secs(10));
alice.handle_timeout(created_at + Duration::from_secs(10));

alice.accept_answer(1, bob.public_key(), answer);
alice.accept_answer(1, bob.public_key(), answer, Instant::now());
}

#[test]
Expand All @@ -54,18 +79,30 @@ fn only_generate_candidate_event_after_answer() {
Instant::now(),
);

let offer = alice.new_connection(1, HashSet::new(), HashSet::new());
let offer = alice.new_connection(
1,
HashSet::new(),
HashSet::new(),
Instant::now(),
Instant::now(),
);

assert_eq!(
alice.poll_event(),
None,
"no event to be emitted before accepting the answer"
);

let answer =
bob.accept_connection(1, offer, alice.public_key(), HashSet::new(), HashSet::new());
let answer = bob.accept_connection(
1,
offer,
alice.public_key(),
HashSet::new(),
HashSet::new(),
Instant::now(),
);

alice.accept_answer(1, bob.public_key(), answer);
alice.accept_answer(1, bob.public_key(), answer, Instant::now());

assert!(iter::from_fn(|| alice.poll_event()).any(|ev| ev
== Event::SignalIceCandidate {
Expand All @@ -87,6 +124,8 @@ fn second_connection_with_same_relay_reuses_allocation() {
1,
HashSet::new(),
HashSet::from([relay("user1", "pass1", "realm1")]),
Instant::now(),
Instant::now(),
);

let transmit = alice.poll_transmit().unwrap();
Expand All @@ -97,6 +136,8 @@ fn second_connection_with_same_relay_reuses_allocation() {
2,
HashSet::new(),
HashSet::from([relay("user1", "pass1", "realm1")]),
Instant::now(),
Instant::now(),
);

assert!(alice.poll_transmit().is_none());
Expand Down
18 changes: 8 additions & 10 deletions rust/connlib/tunnel/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ pub struct ClientState {
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct AwaitingConnectionDetails {
domain: Option<Dname>,
pub(crate) struct AwaitingConnectionDetails {
pub domain: Option<Dname>,
gateways: HashSet<GatewayId>,
last_intent_sent_at: Instant,
pub last_intent_sent_at: Instant,
}

impl ClientState {
Expand Down Expand Up @@ -375,15 +375,13 @@ impl ClientState {
}
}

pub(crate) fn get_awaiting_connection_domain(
pub(crate) fn get_awaiting_connection(
&self,
resource: &ResourceId,
) -> Result<&Option<Dname>, ConnlibError> {
Ok(&self
.awaiting_connection
) -> Result<&AwaitingConnectionDetails, ConnlibError> {
self.awaiting_connection
.get(resource)
.ok_or(Error::UnexpectedConnectionDetails)?
.domain)
.ok_or(Error::UnexpectedConnectionDetails)
}

pub(crate) fn attempt_to_reuse_connection(
Expand All @@ -396,7 +394,7 @@ impl ClientState {
.get(&resource)
.ok_or(Error::UnknownResource)?;

let domain = self.get_awaiting_connection_domain(&resource)?.clone();
let domain = self.get_awaiting_connection(&resource)?.domain.clone();

if self.is_connected_to(resource, &domain) {
return Err(Error::UnexpectedConnectionDetails);
Expand Down
Loading

0 comments on commit e509715

Please sign in to comment.