Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(snownet): only send relay candidates if hole-punching fails #4268

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 103 additions & 70 deletions rust/connlib/snownet/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const HANDSHAKE_RATE_LIMIT: u64 = 100;
/// How long we will at most wait for a candidate from the remote.
const CANDIDATE_TIMEOUT: Duration = Duration::from_secs(10);

/// How long we will wait for holepunching to succeed before we start trying relays.
const HOLEPUNCH_TIMEOUT: Duration = Duration::from_secs(2);

/// How long we will at most wait for an [`Answer`] from the remote.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);

Expand Down Expand Up @@ -143,6 +146,10 @@ where
for allocation in self.allocations.values_mut() {
allocation.refresh(now);
}

for connection in self.connections.established.values_mut() {
connection.created_at = now;
}
Comment on lines +150 to +152
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of critical. Upon re-connect, we need to reset this timestamp to ensure we have a new grace-period and don't immediately emit all (new) relay candidates. Once #4585 is merged, I'll try to add a unit-test for this.

}

pub fn public_key(&self) -> PublicKey {
Expand Down Expand Up @@ -393,7 +400,13 @@ where
self.bindings_and_allocations_drain_events();

for (id, connection) in self.connections.iter_established_mut() {
connection.handle_timeout(id, now, &mut self.allocations, &mut self.buffered_transmits);
connection.handle_timeout(
id,
now,
&mut self.allocations,
&mut self.buffered_transmits,
&mut self.pending_events,
);
}

for (id, connection) in self.connections.initial.iter_mut() {
Expand Down Expand Up @@ -444,6 +457,7 @@ where
#[allow(clippy::too_many_arguments)]
fn init_connection(
&mut self,
id: TId,
mut agent: IceAgent,
remote: PublicKey,
key: [u8; 32],
Expand All @@ -457,7 +471,7 @@ where
/// Without such a timeout, using a tunnel after the REKEY_TIMEOUT requires handshaking a new session which delays the new application packet by 1 RTT.
const WG_KEEP_ALIVE: Option<u16> = Some(10);

Connection {
let mut connection = Connection {
agent,
tunnel: Tunn::new(
self.private_key.clone(),
Expand All @@ -474,9 +488,20 @@ where
buffer: Box::new([0u8; MAX_UDP_SIZE]),
intent_sent_at,
is_failed: false,
signalling_completed_at: now,
created_at: now,
remote_pub_key: remote,
}
fallback_relay_candidates: Default::default(),
};

connection.seed_agent_with_local_candidates(
id,
self.host_candidates.clone().into_iter(),
&mut self.bindings,
&mut self.allocations,
&mut self.pending_events,
);

connection
}

/// Attempt to add the `local` address as a host candidate.
Expand Down Expand Up @@ -659,11 +684,9 @@ where
for event in binding_events.chain(allocation_events) {
match event {
CandidateEvent::New(candidate) => {
add_local_candidate_to_all(
candidate,
&mut self.connections,
&mut self.pending_events,
);
for conn in self.connections.established.values_mut() {
conn.add_local_candidate(candidate.clone())
}
}
CandidateEvent::Invalid(candidate) => {
for (id, agent) in self.connections.agents_mut() {
Expand Down Expand Up @@ -756,9 +779,8 @@ where
pass: answer.credentials.password,
});

self.seed_agent_with_local_candidates(id, &mut agent);

let connection = self.init_connection(
id,
agent,
remote,
*initial.session_key.expose_secret(),
Expand Down Expand Up @@ -819,9 +841,8 @@ where
},
};

self.seed_agent_with_local_candidates(id, &mut agent);

let connection = self.init_connection(
id,
agent,
remote,
*offer.session_key.expose_secret(),
Expand Down Expand Up @@ -881,38 +902,6 @@ where
tracing::info!(address = %server, "Added new TURN server");
}
}

/// Feeds every local candidate this node currently knows about into `agent`.
///
/// Candidates are visited in three passes: the node's host candidates, then the
/// candidate (if any) of each entry in `self.bindings`, then the current candidates
/// of each entry in `self.allocations`. Every candidate is handed to the free function
/// `add_local_candidate` together with `connection` and `self.pending_events`.
fn seed_agent_with_local_candidates(&mut self, connection: TId, agent: &mut IceAgent) {
// Pass 1: host candidates (cloned out of `self.host_candidates`).
for candidate in self.host_candidates.iter().cloned() {
add_local_candidate(connection, agent, candidate, &mut self.pending_events);
}

// Pass 2: bindings; `filter_map` skips bindings whose `candidate()` is `None`.
for candidate in self
.bindings
.values()
.filter_map(|binding| binding.candidate())
{
add_local_candidate(
connection,
agent,
candidate.clone(),
&mut self.pending_events,
);
}

// Pass 3: allocations; each allocation may contribute several candidates.
for candidate in self
.allocations
.values()
.flat_map(|allocation| allocation.current_candidates())
{
add_local_candidate(
connection,
agent,
candidate.clone(),
&mut self.pending_events,
);
}
}
}

struct Connections<TId> {
Expand Down Expand Up @@ -1023,29 +1012,6 @@ enum EncodeError {
NoChannel,
}

/// Offers `candidate` to the ICE agent of every connection, initial ones first
/// and established ones afterwards.
///
/// Each agent receives its own clone of `candidate` via the free function
/// `add_local_candidate`, which is also given `pending_events`. The call for each
/// connection runs inside an info-level `connection` span carrying that connection's id.
fn add_local_candidate_to_all<TId>(
    candidate: Candidate,
    connections: &mut Connections<TId>,
    pending_events: &mut VecDeque<Event<TId>>,
) where
    TId: Copy + fmt::Display,
{
    let not_yet_established = connections
        .initial
        .iter_mut()
        .map(|(conn_id, conn)| (*conn_id, &mut conn.agent));
    let already_established = connections
        .established
        .iter_mut()
        .map(|(conn_id, conn)| (*conn_id, &mut conn.agent));

    not_yet_established
        .chain(already_established)
        .for_each(|(id, agent)| {
            let _span = info_span!("connection", %id).entered();

            add_local_candidate(id, agent, candidate.clone(), pending_events);
        });
}

fn add_local_candidate<TId>(
id: TId,
agent: &mut IceAgent,
Expand Down Expand Up @@ -1176,7 +1142,16 @@ struct Connection {

is_failed: bool,

signalling_completed_at: Instant,
/// When this [`Connection`] was created or re-connected.
///
/// Created in this case refers to the creation of this data structure.
/// That also reflects the point in time when we started ICE.
///
/// This timestamp is reset upon [`Node::reconnect`] to allow functionality like trickling of relay candidates to work.
created_at: Instant,

/// Relay candidates we will trickle to the remote if holepunching doesn't succeed within [`HOLEPUNCH_TIMEOUT`].
fallback_relay_candidates: Vec<Candidate>,
}

/// The socket of the peer we are connected to.
Expand Down Expand Up @@ -1225,6 +1200,39 @@ impl Connection {
now.duration_since(self.intent_sent_at)
}

/// Seeds this connection with the local candidates known at the time of the call.
///
/// Candidates are visited in a fixed order: `host_candidates`, then the candidate
/// (if any) of each binding, then the current candidates of each allocation.
/// Relayed candidates are not given to the agent here; they are parked in
/// `fallback_relay_candidates`. Every other candidate is passed straight to the
/// free function `add_local_candidate` along with `id` and `pending_events`.
fn seed_agent_with_local_candidates<TId>(
    &mut self,
    id: TId,
    host_candidates: impl Iterator<Item = Candidate>,
    bindings: &mut HashMap<SocketAddr, StunBinding>,
    allocations: &mut HashMap<SocketAddr, Allocation>,
    pending_events: &mut VecDeque<Event<TId>>,
) where
    TId: fmt::Display + Copy,
{
    let all_local_candidates = host_candidates
        .chain(bindings.values().flat_map(|binding| binding.candidate()))
        .chain(
            allocations
                .values()
                .flat_map(|allocation| allocation.current_candidates()),
        );

    for local in all_local_candidates {
        if local.kind() != CandidateKind::Relayed {
            add_local_candidate(id, &mut self.agent, local, pending_events);
        } else {
            // Held back; not handed to the agent by this method.
            self.fallback_relay_candidates.push(local);
        }
    }
}

/// Appends `candidate` to `fallback_relay_candidates`.
///
/// The candidate is only stored; this method neither hands it to the ICE agent
/// nor pushes any event.
///
/// NOTE(review): the candidate's kind is not checked here, so a non-relayed
/// candidate passed in would be held back in the relay fallback list as well.
/// Confirm that callers only pass relayed candidates, or filter by kind.
fn add_local_candidate(&mut self, candidate: Candidate) {
self.fallback_relay_candidates.push(candidate)
}

fn set_remote_from_wg_activity(
&mut self,
local: SocketAddr,
Expand Down Expand Up @@ -1264,7 +1272,7 @@ impl Connection {
return None;
}

Some(self.signalling_completed_at + CANDIDATE_TIMEOUT)
Some(self.created_at + CANDIDATE_TIMEOUT)
}

#[tracing::instrument(level = "info", skip_all, fields(%id))]
Expand All @@ -1274,6 +1282,7 @@ impl Connection {
now: Instant,
allocations: &mut HashMap<SocketAddr, Allocation>,
transmits: &mut VecDeque<Transmit<'static>>,
pending_events: &mut VecDeque<Event<TId>>,
) where
TId: fmt::Display + Copy,
{
Expand All @@ -1288,6 +1297,28 @@ impl Connection {
return;
}

let duration_since_created = now.duration_since(self.created_at);

if duration_since_created > HOLEPUNCH_TIMEOUT
&& !self.agent.state().is_connected()
&& !self.fallback_relay_candidates.is_empty()
{
tracing::info!("Hole-punch did not succeed after {duration_since_created:?}, sending relay candidates");

let _span = info_span!("connection", %id).entered();

for candidate in self.fallback_relay_candidates.drain(..) {
let is_new = self.agent.add_local_candidate(candidate.clone());

if is_new {
pending_events.push_back(Event::SignalIceCandidate {
connection: id,
candidate: candidate.to_sdp_string(),
});
}
}
}

// TODO: `boringtun` is impure because it calls `Instant::now`.

if now >= self.next_timer_update {
Expand Down Expand Up @@ -1337,6 +1368,8 @@ impl Connection {
source,
..
} => {
self.fallback_relay_candidates.clear();

let candidate = self
.local_candidate(source)
.expect("to only nominate existing candidates");
Expand Down
91 changes: 90 additions & 1 deletion rust/connlib/snownet/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
time::{Duration, Instant, SystemTime},
vec,
};
use str0m::{net::Protocol, Candidate};
use str0m::{net::Protocol, Candidate, CandidateKind};
use tracing::{debug_span, info_span, Span};
use tracing_subscriber::util::SubscriberInitExt;

Expand Down Expand Up @@ -103,6 +103,95 @@ fn reconnect_discovers_new_interface() {
assert_eq!(bob.failed_connections().count(), 0);
}

/// With an unrestricted firewall, drives Alice and Bob until each reports being
/// connected to the other, then checks that neither of them signalled a candidate
/// of kind `Relayed`.
#[test]
fn does_not_emit_relay_candidates_if_direct_succeeds() {
    let _guard = setup_tracing();

    let (alice, bob) = alice_and_bob();

    let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger"));
    let mut alice: TestNode = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80");
    let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80");
    let firewall = Firewall::default();
    let mut clock = Clock::new();

    let mut relays = [relay];

    handshake(&mut alice, &mut bob, &relays, &clock);

    // Keep stepping the simulation until both sides consider themselves connected.
    while !(alice.is_connected_to(&bob) && bob.is_connected_to(&alice)) {
        progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
    }

    let alice_signalled_relay = alice
        .signalled_candidates()
        .any(|(_, c, _)| c.kind() == CandidateKind::Relayed);
    assert!(!alice_signalled_relay);

    let bob_signalled_relay = bob
        .signalled_candidates()
        .any(|(_, c, _)| c.kind() == CandidateKind::Relayed);
    assert!(!bob_signalled_relay);
}

/// With direct traffic between Alice and Bob blocked in both directions, drives
/// both nodes until they report being connected and then checks, for each peer
/// separately, that every relayed candidate it signalled came at least
/// `HOLEPUNCH_TIMEOUT` after every non-relayed candidate it signalled.
#[test]
fn relay_candidates_are_emitted_with_a_delay() {
    // Mirrors the 2 second `HOLEPUNCH_TIMEOUT` constant in `snownet/src/node.rs`.
    const HOLEPUNCH_TIMEOUT: Duration = Duration::from_secs(2);

    let _guard = setup_tracing();

    let (alice, bob) = alice_and_bob();

    let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger"));
    let mut alice: TestNode = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80");
    let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80");
    // Block the direct path in both directions.
    let firewall = Firewall::default()
        .with_block_rule("1.1.1.1:80", "2.2.2.2:80")
        .with_block_rule("2.2.2.2:80", "1.1.1.1:80");
    let mut clock = Clock::new();

    let mut relays = [relay];

    handshake(&mut alice, &mut bob, &relays, &clock);

    loop {
        if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) {
            break;
        }

        progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
    }

    // Assert Alice honors hole-punch timeout
    {
        let (relay_candidates, other_candidates) = alice
            .signalled_candidates()
            .partition::<Vec<_>, _>(|(_, c, _)| c.kind() == CandidateKind::Relayed);

        for (_, _, other) in other_candidates {
            assert!(relay_candidates
                .iter()
                .map(|(_, _, time)| time)
                .all(|relay| relay.duration_since(other) >= HOLEPUNCH_TIMEOUT))
        }
    }

    // Assert Bob honors hole-punch timeout
    {
        // Fixed: this block previously re-read Alice's candidates, so Bob's
        // behaviour was never actually verified.
        let (relay_candidates, other_candidates) = bob
            .signalled_candidates()
            .partition::<Vec<_>, _>(|(_, c, _)| c.kind() == CandidateKind::Relayed);

        for (_, _, other) in other_candidates {
            assert!(relay_candidates
                .iter()
                .map(|(_, _, time)| time)
                .all(|relay| relay.duration_since(other) >= HOLEPUNCH_TIMEOUT))
        }
    }
}

#[test]
fn connection_times_out_after_20_seconds() {
let (mut alice, _) = alice_and_bob();
Expand Down
Loading