diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 3b280ef5b..2f5ed4736 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -171,6 +171,11 @@ impl ExpectedInboundTracker { fn contains(&self, addr: SocketAddr) -> bool { self.entries.contains_key(&addr) } + + #[cfg(test)] + fn transactions_for(&self, addr: SocketAddr) -> Option>> { + self.entries.get(&addr).map(|entry| vec![entry.transaction]) + } } async fn run_driver( @@ -324,6 +329,10 @@ mod tests { tracker.register(peer.clone(), None, false); let new_tx = Transaction::new::(); tracker.register(peer.clone(), Some(new_tx), true); + let transactions = tracker + .transactions_for(peer.addr) + .expect("entry should exist"); + assert_eq!(transactions, vec![Some(new_tx)]); let entry = tracker .consume(peer.addr) diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index d1f4fcec3..b3464d370 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -16,6 +16,7 @@ use std::{ use dashmap::{DashMap, DashSet}; use either::Either; use freenet_stdlib::prelude::ContractKey; +use parking_lot::RwLock; use tokio::sync::mpsc; use tracing::Instrument; @@ -26,7 +27,8 @@ use crate::{ message::{MessageStats, NetMessage, NodeEvent, Transaction, TransactionType}, node::PeerId, operations::{ - get::GetOp, put::PutOp, subscribe::SubscribeOp, update::UpdateOp, OpEnum, OpError, + connect::ConnectForwardEstimator, get::GetOp, put::PutOp, subscribe::SubscribeOp, + update::UpdateOp, OpEnum, OpError, }, ring::{ConnectionManager, LiveTransactionTracker, Ring}, }; @@ -203,6 +205,7 @@ pub(crate) struct OpManager { pub ch_outbound: Arc>, new_transactions: tokio::sync::mpsc::Sender, pub result_router_tx: mpsc::Sender<(Transaction, HostResult)>, + pub(crate) connect_forward_estimator: Arc>, /// Indicates whether the peer is ready to process client operations. 
/// For gateways: always true (peer_id is set from config) /// For regular peers: true only after first successful network handshake sets peer_id @@ -222,6 +225,7 @@ impl Clone for OpManager { ch_outbound: self.ch_outbound.clone(), new_transactions: self.new_transactions.clone(), result_router_tx: self.result_router_tx.clone(), + connect_forward_estimator: self.connect_forward_estimator.clone(), peer_ready: self.peer_ready.clone(), is_gateway: self.is_gateway, sub_op_tracker: self.sub_op_tracker.clone(), @@ -255,6 +259,7 @@ impl OpManager { tracing::info_span!(parent: current_span, "garbage_cleanup_task") }; let sub_op_tracker = SubOperationTracker::new(); + let connect_forward_estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new())); GlobalExecutor::spawn( garbage_cleanup_task( @@ -287,6 +292,7 @@ impl OpManager { ch_outbound: Arc::new(ch_outbound), new_transactions, result_router_tx, + connect_forward_estimator, peer_ready, is_gateway, sub_op_tracker, diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index 84abbcbf1..b3a0802ee 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -165,6 +165,7 @@ impl NodeP2P { ideal_location, ttl, target_connections, + self.op_manager.connect_forward_estimator.clone(), ); tracing::debug!( diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 42f568e6b..02535fc42 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use futures::{stream::FuturesUnordered, StreamExt}; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::task::{self, JoinHandle}; @@ -20,9 +21,14 @@ use crate::message::{InnerMessage, NetMessage, NetMessageV1, NodeEvent, Transact use crate::node::{IsOperationCompleted, NetworkBridge, OpManager, PeerId}; use crate::operations::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::ring::PeerKeyLocation; +use crate::router::{EstimatorType, IsotonicEstimator, IsotonicEvent}; +use crate::transport::TransportKeypair; use crate::util::{Backoff, Contains, IterExt}; use freenet_stdlib::client_api::HostResponse; +const FORWARD_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(20); +const RECENCY_COOLDOWN: Duration = Duration::from_secs(30); + /// Top-level message envelope used by the new connect handshake. #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) enum ConnectMsg { @@ -180,6 +186,7 @@ pub(crate) trait RelayContext { desired_location: Location, visited: &[PeerKeyLocation], recency: &HashMap, + estimator: &ConnectForwardEstimator, ) -> Option; } @@ -192,12 +199,76 @@ pub(crate) struct RelayActions { pub observed_address: Option<(PeerKeyLocation, SocketAddr)>, } +#[derive(Debug, Clone)] +pub(crate) struct ForwardAttempt { + peer: PeerKeyLocation, + desired: Location, + sent_at: Instant, +} + +#[derive(Debug, Clone)] +pub(crate) struct ConnectForwardEstimator { + estimator: IsotonicEstimator, +} + +impl ConnectForwardEstimator { + pub(crate) fn new() -> Self { + // Seed with neutral points to allow construction without history. This estimator + // learns, per-node, how often downstream peers accept/complete forwarded Connect + // requests so we can bias forwarding toward peers likely to have capacity. 
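For readers skimming the diff, here is a minimal, self-contained sketch of the idea behind `ConnectForwardEstimator`. It is illustrative only: `ToyForwardEstimator` and its fields are not part of this patch, and the real type delegates to `IsotonicEstimator`, which also conditions on ring distance to the desired location rather than tracking a single per-peer rate.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Toy stand-in for per-peer acceptance tracking: a Bernoulli success rate
/// with a small neutral prior, so unseen peers start at 0.5 instead of 0.
struct ToyForwardEstimator {
    // (successes, attempts) per downstream peer.
    outcomes: HashMap<SocketAddr, (f64, f64)>,
}

impl ToyForwardEstimator {
    fn new() -> Self {
        Self { outcomes: HashMap::new() }
    }

    /// Record one forwarded Connect attempt as a 0/1 outcome.
    fn record(&mut self, peer: SocketAddr, success: bool) {
        let entry = self.outcomes.entry(peer).or_insert((0.0, 0.0));
        entry.0 += if success { 1.0 } else { 0.0 };
        entry.1 += 1.0;
    }

    /// Estimated acceptance probability, clamped to [0, 1].
    fn estimate(&self, peer: SocketAddr) -> f64 {
        let (succ, total) = self.outcomes.get(&peer).copied().unwrap_or((0.0, 0.0));
        ((succ + 0.5) / (total + 1.0)).clamp(0.0, 1.0)
    }
}

fn main() {
    let mut est = ToyForwardEstimator::new();
    let peer: SocketAddr = "127.0.0.1:4100".parse().unwrap();
    est.record(peer, true);
    est.record(peer, false);
    est.record(peer, true);
    // 0.625 with the prior folded in; a peer with no history would score 0.5.
    println!("estimated acceptance: {:.3}", est.estimate(peer));
}
```

The real seeding above serves the same purpose as the prior here: it lets the estimator be constructed and queried before any forward outcomes have been observed.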
+ let key = TransportKeypair::new(); + let dummy_peer = PeerKeyLocation { + peer: PeerId::new("127.0.0.1:0".parse().unwrap(), key.public().clone()), + location: Some(Location::new(0.0)), + }; + let seed_events = [ + IsotonicEvent { + peer: dummy_peer.clone(), + contract_location: Location::new(0.0), + result: 0.5, + }, + IsotonicEvent { + peer: dummy_peer, + contract_location: Location::new(0.5), + result: 0.5, + }, + ]; + + Self { + estimator: IsotonicEstimator::new(seed_events, EstimatorType::Negative), + } + } + + fn record(&mut self, peer: &PeerKeyLocation, desired: Location, success: bool) { + if peer.location.is_none() { + return; + } + // Treat each downstream forward attempt as a Bernoulli outcome for this peer and + // desired ring location; these accumulate to bias future routing choices. + let event = IsotonicEvent { + peer: peer.clone(), + contract_location: desired, + result: if success { 1.0 } else { 0.0 }, + }; + self.estimator.add_event(event); + } + + fn estimate(&self, peer: &PeerKeyLocation, desired: Location) -> Option { + peer.location?; + self.estimator + .estimate_retrieval_time(peer, desired) + .ok() + .map(|p| p.clamp(0.0, 1.0)) + } +} impl RelayState { pub(crate) fn handle_request( &mut self, ctx: &C, observed_remote: &PeerKeyLocation, recency: &HashMap, + forward_attempts: &mut HashMap, + estimator: &ConnectForwardEstimator, ) -> RelayActions { let mut actions = RelayActions::default(); push_unique_peer(&mut self.request.visited, observed_remote.clone()); @@ -240,6 +311,7 @@ impl RelayState { self.request.desired_location, &self.request.visited, recency, + estimator, ) { Some(next) => { let dist = ring_distance(next.location, Some(self.request.desired_location)); @@ -257,6 +329,14 @@ impl RelayState { let forward_snapshot = forward_req.clone(); self.forwarded_to = Some(next.clone()); self.request = forward_req; + forward_attempts.insert( + next.peer.clone(), + ForwardAttempt { + peer: next.clone(), + desired: self.request.desired_location, + sent_at: Instant::now(), + }, + ); actions.forward = Some((next, forward_snapshot)); } None => { @@ -309,6 +389,7 @@ impl RelayContext for RelayEnv<'_> { desired_location: Location, visited: &[PeerKeyLocation], recency: &HashMap, + estimator: &ConnectForwardEstimator, ) -> Option { let skip = VisitedPeerIds { peers: visited }; let router = self.op_manager.ring.router.read(); @@ -318,33 +399,51 @@ impl RelayContext for RelayEnv<'_> { skip, ); - // Prefer least recently forwarded peers. Missing recency wins; otherwise pick the oldest - // recency bucket, then let the router choose among that bucket. This keeps routing bias - // toward the target while avoiding hammering a single neighbor. - let mut best_key: Option> = None; - let mut best: Vec = Vec::new(); + let now = Instant::now(); + let mut scored: Vec<(f64, PeerKeyLocation)> = Vec::new(); + let mut eligible: Vec = Vec::new(); + for cand in candidates { - let key = recency.get(&cand.peer).cloned(); - match best_key { - None => { - best_key = Some(key); - best = vec![cand.clone()]; + if let Some(ts) = recency.get(&cand.peer) { + if now.duration_since(*ts) < RECENCY_COOLDOWN { + continue; } - Some(k) => { - if key < k { - best_key = Some(key); - best = vec![cand.clone()]; - } else if key == k { - best.push(cand.clone()); - } + } + + if cand.location.is_some() { + if let Some(score) = estimator.estimate(&cand, desired_location) { + scored.push((score, cand.clone())); + continue; } + // Keep candidates without estimates for fallback. 
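The selection policy this hunk introduces can be hard to read inline in the diff, so here is a compact standalone sketch of the same shape, assuming simplified types: drop candidates still inside the recency cooldown, prefer the best-scoring peers, and fall back to candidates the estimator cannot score. The `Candidate` struct and peer names are illustrative; the real code works on `PeerKeyLocation`s and hands the top-scoring bucket back to the router for the final, location-biased pick.

```rust
use std::time::{Duration, Instant};

const RECENCY_COOLDOWN: Duration = Duration::from_secs(30);

struct Candidate {
    name: &'static str,
    last_forwarded: Option<Instant>,
    score: Option<f64>, // estimator output, present only when the peer has a location
}

/// Cooldown filter, then best-score bucket, then unscored fallback.
fn pick<'a>(candidates: &'a [Candidate], now: Instant) -> Option<&'a Candidate> {
    let eligible: Vec<&Candidate> = candidates
        .iter()
        .filter(|c| match c.last_forwarded {
            Some(ts) => now.duration_since(ts) >= RECENCY_COOLDOWN,
            None => true,
        })
        .collect();

    let best_score = eligible
        .iter()
        .filter_map(|c| c.score)
        .fold(f64::NEG_INFINITY, f64::max);

    if best_score.is_finite() {
        // Keep only the top bucket; the real code then lets the router
        // break ties toward the desired ring location.
        eligible
            .into_iter()
            .find(|c| c.score.map_or(false, |s| (s - best_score).abs() < f64::EPSILON))
    } else {
        // No usable estimates: fall back to any eligible candidate.
        eligible.into_iter().next()
    }
}

fn main() {
    let now = Instant::now();
    let candidates = [
        Candidate { name: "a", last_forwarded: Some(now), score: Some(0.9) }, // cooling down
        Candidate { name: "b", last_forwarded: None, score: Some(0.7) },
        Candidate { name: "c", last_forwarded: None, score: None },
    ];
    println!("picked: {:?}", pick(&candidates, now).map(|c| c.name));
}
```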
+ eligible.push(cand.clone()); + continue; + } + eligible.push(cand.clone()); + } + + if !scored.is_empty() { + let best_score = scored + .iter() + .map(|(s, _)| *s) + .max_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap_or(0.0); + let best: Vec<_> = scored + .into_iter() + .filter(|(s, _)| (*s - best_score).abs() < f64::EPSILON) + .map(|(_, c)| c) + .collect(); + if !best.is_empty() { + return router.select_peer(best.iter(), desired_location).cloned(); } } - if best.is_empty() { + if eligible.is_empty() { None } else { - router.select_peer(best.iter(), desired_location).cloned() + router + .select_peer(eligible.iter(), desired_location) + .cloned() } } } @@ -399,9 +498,32 @@ pub(crate) struct ConnectOp { /// neighbors when no acceptors are available. Peers without an entry are treated as /// immediately eligible. recency: HashMap, + forward_attempts: HashMap, + connect_forward_estimator: Arc>, } impl ConnectOp { + fn record_forward_outcome(&mut self, peer: &PeerKeyLocation, desired: Location, success: bool) { + self.forward_attempts.remove(&peer.peer); + self.connect_forward_estimator + .write() + .record(peer, desired, success); + } + + fn expire_forward_attempts(&mut self, now: Instant) { + let mut expired = Vec::new(); + for (peer, attempt) in self.forward_attempts.iter() { + if now.duration_since(attempt.sent_at) >= FORWARD_ATTEMPT_TIMEOUT { + expired.push((peer.clone(), attempt.desired)); + } + } + for (peer, desired) in expired { + if let Some(attempt) = self.forward_attempts.remove(&peer) { + self.record_forward_outcome(&attempt.peer, desired, false); + } + } + } + #[allow(clippy::too_many_arguments)] pub(crate) fn new_joiner( id: Transaction, @@ -410,6 +532,7 @@ impl ConnectOp { observed_address: Option, gateway: Option, backoff: Option, + connect_forward_estimator: Arc>, ) -> Self { let state = ConnectState::WaitingForResponses(JoinerState { target_connections, @@ -424,6 +547,8 @@ impl ConnectOp { backoff, desired_location: Some(desired_location), recency: HashMap::new(), + forward_attempts: HashMap::new(), + connect_forward_estimator, } } @@ -431,6 +556,7 @@ impl ConnectOp { id: Transaction, upstream: PeerKeyLocation, request: ConnectRequest, + connect_forward_estimator: Arc>, ) -> Self { let state = ConnectState::Relaying(Box::new(RelayState { upstream, @@ -446,6 +572,8 @@ impl ConnectOp { backoff: None, desired_location: None, recency: HashMap::new(), + forward_attempts: HashMap::new(), + connect_forward_estimator, } } @@ -487,6 +615,7 @@ impl ConnectOp { desired_location: Location, ttl: u8, target_connections: usize, + connect_forward_estimator: Arc>, ) -> (Transaction, Self, ConnectMsg) { let mut visited = vec![own.clone()]; push_unique_peer(&mut visited, target.clone()); @@ -506,6 +635,7 @@ impl ConnectOp { Some(own.peer.addr), Some(target.clone()), None, + connect_forward_estimator, ); let msg = ConnectMsg::Request { @@ -554,7 +684,9 @@ impl ConnectOp { ctx: &C, upstream: PeerKeyLocation, request: ConnectRequest, + estimator: &ConnectForwardEstimator, ) -> RelayActions { + self.expire_forward_attempts(Instant::now()); if !matches!(self.state, Some(ConnectState::Relaying(_))) { self.state = Some(ConnectState::Relaying(Box::new(RelayState { upstream: upstream.clone(), @@ -570,7 +702,13 @@ impl ConnectOp { state.upstream = upstream; state.request = request; let upstream_snapshot = state.upstream.clone(); - state.handle_request(ctx, &upstream_snapshot, &self.recency) + state.handle_request( + ctx, + &upstream_snapshot, + &self.recency, + &mut self.forward_attempts, + estimator, 
+ ) } _ => RelayActions::default(), } @@ -607,9 +745,12 @@ impl Operation for ConnectOp { } Ok(None) => { let op = match msg { - ConnectMsg::Request { from, payload, .. } => { - ConnectOp::new_relay(tx, from.clone(), payload.clone()) - } + ConnectMsg::Request { from, payload, .. } => ConnectOp::new_relay( + tx, + from.clone(), + payload.clone(), + op_manager.connect_forward_estimator.clone(), + ), _ => { tracing::debug!(%tx, "connect received message without existing state"); return Err(OpError::OpNotPresent(tx)); @@ -633,7 +774,12 @@ impl Operation for ConnectOp { match msg { ConnectMsg::Request { from, payload, .. } => { let env = RelayEnv::new(op_manager); - let actions = self.handle_request(&env, from.clone(), payload.clone()); + let estimator = { + let estimator_guard = self.connect_forward_estimator.read(); + estimator_guard.clone() + }; + let actions = + self.handle_request(&env, from.clone(), payload.clone(), &estimator); if let Some((target, address)) = actions.observed_address { let msg = ConnectMsg::ObservedAddress { @@ -747,7 +893,17 @@ impl Operation for ConnectOp { Ok(store_operation_state(&mut self)) } else if let Some(ConnectState::Relaying(state)) = self.state.as_mut() { - let upstream = state.upstream.clone(); + let (forwarded, desired, upstream) = { + let st = state; + ( + st.forwarded_to.clone(), + st.request.desired_location, + st.upstream.clone(), + ) + }; + if let Some(fwd) = forwarded { + self.record_forward_outcome(&fwd, desired, true); + } tracing::debug!( %upstream.peer, acceptor = %sender.peer, @@ -818,6 +974,8 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option) - backoff: op.backoff.clone(), desired_location: op.desired_location, recency: op.recency.clone(), + forward_attempts: op.forward_attempts.clone(), + connect_forward_estimator: op.connect_forward_estimator.clone(), })) }), } @@ -842,15 +1000,6 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; - tracing::debug!( - peer = %gateway.peer, - reserved_connections = op_manager - .ring - .connection_manager - .get_reserved_connections(), - "join_ring_request: evaluating gateway connection attempt" - ); - if !op_manager .ring .connection_manager @@ -890,6 +1039,7 @@ pub(crate) async fn join_ring_request( location, ttl, target_connections, + op_manager.connect_forward_estimator.clone(), ); op.gateway = Some(Box::new(gateway.clone())); @@ -935,71 +1085,56 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); - let mut in_flight_gateways = HashSet::new(); - loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); - let available_gateways: Vec<_> = unconnected_gateways - .into_iter() - .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) - .collect(); tracing::debug!( - open_connections = open_conns, - inflight_gateway_dials = in_flight_gateways.len(), - available_gateways = available_gateways.len(), - "Connection status before join attempt" + "Connection status: open_connections = {}, unconnected_gateways = {}", + open_conns, + unconnected_gateways.len() ); - let available_count = available_gateways.len(); + let unconnected_count = unconnected_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { tracing::info!( "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways", open_conns, BOOTSTRAP_THRESHOLD, - 
number_of_parallel_connections.min(available_count) + number_of_parallel_connections.min(unconnected_count) ); - let mut select_all = FuturesUnordered::new(); - for gateway in available_gateways + let select_all = FuturesUnordered::new(); + for gateway in unconnected_gateways .into_iter() .shuffle() .take(number_of_parallel_connections) { tracing::info!(%gateway, "Attempting connection to gateway"); - in_flight_gateways.insert(gateway.peer.clone()); let op_manager = op_manager.clone(); - let gateway_clone = gateway.clone(); select_all.push(async move { - ( - join_ring_request(None, &gateway_clone, &op_manager).await, - gateway_clone, - ) + (join_ring_request(None, gateway, &op_manager).await, gateway) }); } - while let Some((res, gateway)) = select_all.next().await { - if let Err(error) = res { - if !matches!( - error, - OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) - ) { - tracing::error!( - %gateway, - %error, - "Failed while attempting connection to gateway" - ); + select_all + .for_each(|(res, gateway)| async move { + if let Err(error) = res { + if !matches!( + error, + OpError::ConnError( + crate::node::ConnectionError::UnwantedConnection + ) + ) { + tracing::error!( + %gateway, + %error, + "Failed while attempting connection to gateway" + ); + } } - } - in_flight_gateways.remove(&gateway.peer); - } - } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 { - tracing::debug!( - open_connections = open_conns, - inflight = in_flight_gateways.len(), - "Below threshold but all gateways are already connected or in-flight" - ); + }) + .await; } else if open_conns >= BOOTSTRAP_THRESHOLD { tracing::trace!( "Have {} connections (>= threshold of {}), not attempting gateway connections", @@ -1082,6 +1217,7 @@ mod tests { _desired_location: Location, _visited: &[PeerKeyLocation], _recency: &HashMap, + _estimator: &ConnectForwardEstimator, ) -> Option { self.next_hop.clone() } @@ -1096,6 +1232,41 @@ mod tests { } } + #[test] + fn forward_estimator_handles_missing_location() { + let mut estimator = ConnectForwardEstimator::new(); + let key = TransportKeypair::new(); + let peer = PeerKeyLocation { + peer: PeerId::new("127.0.0.1:1111".parse().unwrap(), key.public().clone()), + location: None, + }; + estimator.record(&peer, Location::new(0.25), true); + } + + #[test] + fn expired_forward_attempts_are_cleared() { + let mut op = ConnectOp::new_joiner( + Transaction::new::(), + Location::new(0.1), + 1, + None, + None, + None, + Arc::new(RwLock::new(ConnectForwardEstimator::new())), + ); + let peer = make_peer(2000); + op.forward_attempts.insert( + peer.peer.clone(), + ForwardAttempt { + peer: peer.clone(), + desired: Location::new(0.2), + sent_at: Instant::now() - FORWARD_ATTEMPT_TIMEOUT - Duration::from_secs(1), + }, + ); + op.expire_forward_attempts(Instant::now()); + assert!(op.forward_attempts.is_empty()); + } + #[test] fn relay_accepts_when_policy_allows() { let self_loc = make_peer(4000); @@ -1116,7 +1287,10 @@ mod tests { let ctx = TestRelayContext::new(self_loc.clone()); let recency = HashMap::new(); - let actions = state.handle_request(&ctx, &joiner, &recency); + let mut forward_attempts = HashMap::new(); + let estimator = ConnectForwardEstimator::new(); + let actions = + state.handle_request(&ctx, &joiner, &recency, &mut forward_attempts, &estimator); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); @@ -1147,7 +1321,10 @@ mod tests { .accept(false) .next_hop(Some(next_hop.clone())); let 
recency = HashMap::new(); - let actions = state.handle_request(&ctx, &joiner, &recency); + let mut forward_attempts = HashMap::new(); + let estimator = ConnectForwardEstimator::new(); + let actions = + state.handle_request(&ctx, &joiner, &recency, &mut forward_attempts, &estimator); assert!(actions.accept_response.is_none()); let (forward_to, request) = actions.forward.expect("expected forward"); @@ -1180,7 +1357,10 @@ mod tests { let ctx = TestRelayContext::new(self_loc); let recency = HashMap::new(); - let actions = state.handle_request(&ctx, &joiner, &recency); + let mut forward_attempts = HashMap::new(); + let estimator = ConnectForwardEstimator::new(); + let actions = + state.handle_request(&ctx, &joiner, &recency, &mut forward_attempts, &estimator); let (target, addr) = actions .observed_address @@ -1215,8 +1395,14 @@ mod tests { let desired = Location::random(); let ttl = 5; let own = make_peer(7300); - let (_tx, op, msg) = - ConnectOp::initiate_join_request(own.clone(), target.clone(), desired, ttl, 2); + let (_tx, op, msg) = ConnectOp::initiate_join_request( + own.clone(), + target.clone(), + desired, + ttl, + 2, + Arc::new(RwLock::new(ConnectForwardEstimator::new())), + ); match msg { ConnectMsg::Request { @@ -1255,11 +1441,17 @@ mod tests { }; let tx = Transaction::new::(); - let mut relay_op = ConnectOp::new_relay(tx, joiner.clone(), request.clone()); + let mut relay_op = ConnectOp::new_relay( + tx, + joiner.clone(), + request.clone(), + Arc::new(RwLock::new(ConnectForwardEstimator::new())), + ); let ctx = TestRelayContext::new(relay_a.clone()) .accept(false) .next_hop(Some(relay_b.clone())); - let actions = relay_op.handle_request(&ctx, joiner.clone(), request.clone()); + let estimator = ConnectForwardEstimator::new(); + let actions = relay_op.handle_request(&ctx, joiner.clone(), request.clone(), &estimator); let (forward_target, forward_request) = actions .forward @@ -1275,11 +1467,20 @@ mod tests { ); // Second hop should accept and notify the joiner. - let mut accepting_relay = - ConnectOp::new_relay(tx, relay_a.clone(), forward_request.clone()); + let mut accepting_relay = ConnectOp::new_relay( + tx, + relay_a.clone(), + forward_request.clone(), + Arc::new(RwLock::new(ConnectForwardEstimator::new())), + ); let ctx_accept = TestRelayContext::new(relay_b.clone()); - let accept_actions = - accepting_relay.handle_request(&ctx_accept, relay_a.clone(), forward_request); + let estimator = ConnectForwardEstimator::new(); + let accept_actions = accepting_relay.handle_request( + &ctx_accept, + relay_a.clone(), + forward_request, + &estimator, + ); let response = accept_actions .accept_response diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index b23bf8ebe..e11321b90 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -659,6 +659,7 @@ impl Ring { ideal_location, ttl, target_connections, + op_manager.connect_forward_estimator.clone(), ); live_tx_tracker.add_transaction(query_target.peer.clone(), tx); diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs index a02cb8034..696c5ef98 100644 --- a/crates/core/src/router/isotonic_estimator.rs +++ b/crates/core/src/router/isotonic_estimator.rs @@ -14,13 +14,14 @@ const MIN_POINTS_FOR_REGRESSION: usize = 5; /// outcome of the peer's previous requests. 
#[derive(Debug, Clone, Serialize)] -pub(super) struct IsotonicEstimator { +pub(crate) struct IsotonicEstimator { pub global_regression: IsotonicRegression, pub peer_adjustments: HashMap, } impl IsotonicEstimator { - // Define a constant for the adjustment prior size. + // Minimum sample size before we apply per-peer adjustments; keeps peer curves from being + // dominated by sparse/noisy data. const ADJUSTMENT_PRIOR_SIZE: u64 = 10; /// Creates a new `PeerOutcomeEstimator` from a list of historical events. @@ -48,9 +49,9 @@ impl IsotonicEstimator { } .expect("Failed to create isotonic regression"); - let adjustment_prior_size = 20; + let adjustment_prior_size = Self::ADJUSTMENT_PRIOR_SIZE; let global_regression_big_enough_to_estimate_peer_adjustments = - global_regression.len() >= adjustment_prior_size; + global_regression.len() >= adjustment_prior_size as usize; let mut peer_adjustments: HashMap = HashMap::new(); @@ -95,9 +96,9 @@ impl IsotonicEstimator { self.global_regression.add_points(&[point]); - let adjustment_prior_size = 20; + let adjustment_prior_size = Self::ADJUSTMENT_PRIOR_SIZE; let global_regression_big_enough_to_estimate_peer_adjustments = - self.global_regression.len() >= adjustment_prior_size; + self.global_regression.len() >= adjustment_prior_size as usize; if global_regression_big_enough_to_estimate_peer_adjustments { let adjustment = event.result @@ -126,9 +127,13 @@ impl IsotonicEstimator { return Err(EstimationError::InsufficientData); } - let distance: f64 = contract_location.distance(peer.location.unwrap()).as_f64(); + let peer_location = peer.location.ok_or(EstimationError::InsufficientData)?; + let distance: f64 = contract_location.distance(peer_location).as_f64(); - let global_estimate = self.global_regression.interpolate(distance).unwrap(); + let global_estimate = self + .global_regression + .interpolate(distance) + .ok_or(EstimationError::InsufficientData)?; // Regression can sometimes produce negative estimates let global_estimate = global_estimate.max(0.0); @@ -153,7 +158,7 @@ impl IsotonicEstimator { } } -pub(super) enum EstimatorType { +pub(crate) enum EstimatorType { /// Where the estimated value is expected to increase as distance increases Positive, /// Where the estimated value is expected to decrease as distance increases @@ -161,7 +166,7 @@ pub(super) enum EstimatorType { } #[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub(super) enum EstimationError { +pub(crate) enum EstimationError { #[error("Insufficient data for estimation")] InsufficientData, } @@ -169,7 +174,7 @@ pub(super) enum EstimationError { /// A routing event is a single request to a peer for a contract, and some value indicating /// the result of the request, such as the time it took to retrieve the contract. 
#[derive(Debug, Clone)] -pub(super) struct IsotonicEvent { +pub(crate) struct IsotonicEvent { pub peer: PeerKeyLocation, pub contract_location: Location, /// The result of the routing event, which is used to train the estimator, typically the time @@ -180,12 +185,17 @@ pub(super) struct IsotonicEvent { impl IsotonicEvent { fn route_distance(&self) -> Distance { - self.contract_location.distance(self.peer.location.unwrap()) + let peer_location = self + .peer + .location + .ok_or(EstimationError::InsufficientData) + .expect("IsotonicEvent should always carry a peer location"); + self.contract_location.distance(peer_location) } } #[derive(Debug, Clone, Serialize)] -pub(super) struct Adjustment { +pub(crate) struct Adjustment { sum: f64, count: u64, } diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs index f5749154b..ff3b640e1 100644 --- a/crates/core/src/router/mod.rs +++ b/crates/core/src/router/mod.rs @@ -2,7 +2,7 @@ mod isotonic_estimator; mod util; use crate::ring::{Distance, Location, PeerKeyLocation}; -use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent}; +pub(crate) use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent}; use serde::{Deserialize, Serialize}; use std::time::Duration; use util::{Mean, TransferSpeed}; diff --git a/crates/core/tests/gateway_inbound_identity.rs b/crates/core/tests/gateway_inbound_identity.rs index a91626f31..d48f7b6a7 100644 --- a/crates/core/tests/gateway_inbound_identity.rs +++ b/crates/core/tests/gateway_inbound_identity.rs @@ -35,13 +35,8 @@ async fn gateway_records_real_peer_ids_on_inbound() -> anyhow::Result<()> { ); let gateway = gateways[0]; - let peers_connected_to_gateway: Vec<_> = snapshots - .iter() - .filter(|p| !p.is_gateway && p.connections.iter().any(|id| id == &gateway.id)) - .collect(); - assert!( - !gateway.connections.is_empty() || !peers_connected_to_gateway.is_empty(), + !gateway.connections.is_empty(), "gateway should report at least one peer connection, found none" ); assert!( diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs index 53bf5eb46..7020c4e23 100644 --- a/crates/core/tests/large_network.rs +++ b/crates/core/tests/large_network.rs @@ -307,7 +307,8 @@ impl RiverSession { let stderr = String::from_utf8_lossy(&output.stderr).to_string(); let retriable = stderr.contains("Timeout waiting for") || stderr.contains("connection refused") - || stderr.contains("HTTP request failed"); + || stderr.contains("HTTP request failed") + || stderr.contains("missing contract"); if attempt == MAX_RETRIES || !retriable { bail!("riverctl failed (user {:?}): {}", user, stderr); } diff --git a/crates/core/tests/river_smoke.rs b/crates/core/tests/river_smoke.rs new file mode 100644 index 000000000..8b31bac81 --- /dev/null +++ b/crates/core/tests/river_smoke.rs @@ -0,0 +1,168 @@ +//! Minimal riverctl propagation smoke test to reproduce intermittent "missing contract" errors. +//! +//! This intentionally runs outside CI (ignored) so it can be executed manually when debugging +//! contract propagation races: +//! ```text +//! cargo test -p freenet --test river_smoke -- --ignored --nocapture +//! 
``` + +use anyhow::{anyhow, bail, Context, Result}; +use freenet_test_network::{BuildProfile, FreenetBinary, TestNetwork}; +use regex::Regex; +use std::path::PathBuf; +use tokio::time::{sleep, Duration}; +use which::which; + +const ITERATIONS: usize = 5; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore = "manual-only: reproduces riverctl missing contract race"] +async fn river_missing_contract_smoke() -> Result<()> { + let riverctl = which("riverctl").context("riverctl not found in PATH")?; + + for iter in 1..=ITERATIONS { + println!("=== iteration {iter}/{ITERATIONS} ==="); + let network = TestNetwork::builder() + .gateways(1) + .peers(1) + .preserve_temp_dirs_on_failure(true) + .preserve_temp_dirs_on_success(true) + .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug)) + .build() + .await + .context("start test network")?; + + let gw_url = format!("{}?encodingProtocol=native", network.gateway(0).ws_url()); + let peer_url = format!("{}?encodingProtocol=native", network.peer(0).ws_url()); + + let mut session = RiverSession::initialize(riverctl.clone(), gw_url, peer_url).await?; + match session.send_message("smoke test").await { + Ok(_) => { + println!( + "iteration {iter} succeeded (run_root={})", + network.run_root().display() + ); + } + Err(err) => { + println!( + "iteration {iter} failed: {err} (run_root={})", + network.run_root().display() + ); + bail!(err); + } + } + + // Give the OS a moment to free ports between iterations. + sleep(Duration::from_secs(2)).await; + } + + Ok(()) +} + +struct RiverSession { + riverctl: PathBuf, + room_key: String, + invite_regex: Regex, + room_regex: Regex, + alice_url: String, + bob_url: String, +} + +impl RiverSession { + async fn initialize(riverctl: PathBuf, alice_url: String, bob_url: String) -> Result { + let mut session = Self { + riverctl, + room_key: String::new(), + invite_regex: Regex::new(r"[A-Za-z0-9+/=]{40,}")?, + room_regex: Regex::new(r"[A-Za-z0-9]{40,}")?, + alice_url, + bob_url, + }; + session.setup_room().await?; + Ok(session) + } + + async fn setup_room(&mut self) -> Result<()> { + // No retries here—we want to catch propagation races. 
+ let create_output = self + .run_riverctl( + RiverUser::Alice, + &[ + "room", + "create", + "--name", + "river-smoke", + "--nickname", + "Alice", + ], + ) + .await?; + self.room_key = self + .room_regex + .find(&create_output) + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse room owner key from riverctl output"))?; + + let invite_output = self + .run_riverctl( + RiverUser::Alice, + &["invite", "create", self.room_key.as_str()], + ) + .await?; + let invitation_code = self + .invite_regex + .find_iter(&invite_output) + .filter(|m| m.as_str() != self.room_key) + .last() + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse invitation code from riverctl output"))?; + + self.run_riverctl( + RiverUser::Bob, + &["invite", "accept", &invitation_code, "--nickname", "Bob"], + ) + .await?; + + Ok(()) + } + + async fn send_message(&mut self, body: &str) -> Result<()> { + self.run_riverctl( + RiverUser::Alice, + &["message", "send", self.room_key.as_str(), body], + ) + .await + .map(|_| ()) + } + + async fn run_riverctl(&self, user: RiverUser, args: &[&str]) -> Result { + let url = match user { + RiverUser::Alice => &self.alice_url, + RiverUser::Bob => &self.bob_url, + }; + + let output = tokio::process::Command::new(&self.riverctl) + .arg("--node-url") + .arg(url) + .args(args) + .output() + .await + .context("failed to execute riverctl command")?; + + if output.status.success() { + return Ok(String::from_utf8_lossy(&output.stdout).to_string()); + } + + Err(anyhow!( + "riverctl {:?} failed: {}", + args, + String::from_utf8_lossy(&output.stderr) + )) + } +} + +#[derive(Clone, Copy, Debug)] +enum RiverUser { + Alice, + Bob, +}
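To round out the connect changes, the following is a standalone sketch of the forward-attempt lifecycle this patch adds to `ConnectOp`: an attempt is registered when a Connect request is forwarded, resolved as a success when an acceptance flows back through the relay, and swept as a failure once `FORWARD_ATTEMPT_TIMEOUT` elapses. The `PendingForwards` type and string peer keys are illustrative assumptions; the real code keys on `PeerId`, stores a `ForwardAttempt`, and records expired entries into the estimator as failed forwards.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const FORWARD_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(20);

/// Pending forwards keyed by peer; the value is when the forward was sent.
struct PendingForwards {
    attempts: HashMap<&'static str, Instant>,
}

impl PendingForwards {
    fn new() -> Self {
        Self { attempts: HashMap::new() }
    }

    fn forwarded(&mut self, peer: &'static str, sent_at: Instant) {
        self.attempts.insert(peer, sent_at);
    }

    /// Called when an acceptance comes back through this relay; the attempt is
    /// resolved and would be recorded as a success in the estimator.
    fn resolved(&mut self, peer: &'static str) -> bool {
        self.attempts.remove(peer).is_some()
    }

    /// Sweep attempts older than the timeout; each expired peer would be
    /// recorded as a failed forward so the estimator deprioritizes it.
    fn expire(&mut self, now: Instant) -> Vec<&'static str> {
        let expired: Vec<&'static str> = self
            .attempts
            .iter()
            .filter(|(_, sent)| now.duration_since(**sent) >= FORWARD_ATTEMPT_TIMEOUT)
            .map(|(peer, _)| *peer)
            .collect();
        for peer in &expired {
            self.attempts.remove(peer);
        }
        expired
    }
}

fn main() {
    let now = Instant::now();
    let mut pending = PendingForwards::new();
    pending.forwarded("fast-peer", now);
    pending.forwarded("slow-peer", now - FORWARD_ATTEMPT_TIMEOUT - Duration::from_secs(1));
    assert!(pending.resolved("fast-peer"));
    let failed = pending.expire(now);
    assert_eq!(failed, vec!["slow-peer"]);
    println!("expired forwards: {failed:?}");
}
```

This mirrors the `expired_forward_attempts_are_cleared` test in the patch, which constructs an attempt whose `sent_at` predates the timeout window and asserts that the sweep removes it.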