diff --git a/AGENTS.md b/AGENTS.md index 81c864d48..2cd2fa452 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,5 +1,7 @@ # Freenet Core – Agent Guide +@~/.claude/freenet-local.md + ## Project Overview Freenet Core is the peer-to-peer runtime that underpins applications in the Freenet ecosystem. The crates in this workspace implement the networking stack, contract execution environment, and developer tooling used by higher-level projects such as River. @@ -8,12 +10,12 @@ Freenet Core is the peer-to-peer runtime that underpins applications in the Free **BEFORE STARTING ANY WORK, verify your working directory:** ```bash -pwd # Must be ~/code/freenet/freenet-core/ - # NOT ~/code/freenet/freenet-core/main +pwd # Must be / + # NOT /main git branch --show-current # Should be your feature branch, NOT 'main' ``` -### ❌ DO NOT work in ~/code/freenet/freenet-core/main +### ❌ DO NOT work in the main worktree directory The main worktree should **always** remain on the `main` branch. Multiple agents working in main will conflict and corrupt branches. @@ -21,7 +23,7 @@ The main worktree should **always** remain on the `main` branch. Multiple agents 1. Create a worktree as a sibling directory: ```bash - cd ~/code/freenet/freenet-core/main + cd /main # Your main checkout directory git worktree add ../fix- cd ../fix- ``` @@ -77,27 +79,22 @@ claude --permission-mode bypassPermissions ### Agent Spawning Template +**Note:** This template assumes you have terminal multiplexer tooling set up. Adapt paths and commands to your local environment. + ```bash # 1. Create/verify worktree -cd ~/code/freenet/freenet-core/main +cd /main git worktree add ../fix-ISSUE-NUMBER branch-name -# 2. Create zellij tab (NO --layout flag!) -zellij action new-tab --name codex-iISSUE-description - -# 3. Switch to mcp tab to avoid input corruption -zellij action go-to-tab-name mcp - -# 4. Start agent with correct params -~/code/mcp/skills/zellij-agent-manager/scripts/send-to-agent.sh \ - codex-iISSUE-description \ - "cd ~/code/freenet/freenet-core/fix-ISSUE-NUMBER && codex -s danger-full-access -a never" +# 2. Start agent in new terminal/tab +# Navigate to worktree and launch with correct parameters +cd /fix-ISSUE-NUMBER +codex -s danger-full-access -a never +# OR: claude --permission-mode bypassPermissions -# 5. Wait for agent to start, then send task -sleep 3 -~/code/mcp/skills/zellij-agent-manager/scripts/send-to-agent.sh \ - codex-iISSUE-description \ - "Your task description here [AI-assisted - Claude]" +# 3. Send task to agent +# (Use your terminal automation/multiplexer tooling) +# Example task: "Fix issue #XXXX - [description]. [AI-assisted - Claude]" ``` ### Common Pitfalls @@ -140,8 +137,8 @@ Run these in any worktree before pushing a branch or opening a PR. - Never remove or ignore failing tests without understanding the root cause. ### Integration Testing with `freenet-test-network` -- Use the `freenet-test-network` crate located at `~/code/freenet/freenet-test-network` to spin up gateways and peers for integration tests. -- Add it as a dev-dependency in your worktree (`freenet-test-network = { path = "../freenet-test-network" }`) and construct networks with the builder API. +- Use the `freenet-test-network` crate from https://github.com/freenet/freenet-test-network to spin up gateways and peers for integration tests. +- Add it as a dev-dependency using either a path (if cloned locally) or git dependency, and construct networks with the builder API. - Sample pattern: ```rust use freenet_test_network::TestNetwork; diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 699f59c73..f6c45093d 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -337,8 +337,21 @@ impl ContractHandlerChannel { }; if let Err(e) = session_tx.try_send(msg) { tracing::warn!("Failed to notify session actor: {}", e); + } else { + tracing::debug!( + %tx, + %client_id, + %request_id, + "Session adapter registered transaction with session actor" + ); } } + } else { + tracing::warn!( + %client_id, + %request_id, + "Session adapter not installed; session actor will not track transaction" + ); } Ok(()) @@ -365,7 +378,22 @@ impl ContractHandlerChannel { }; if let Err(e) = session_tx.try_send(msg) { tracing::warn!("Failed to notify session actor: {}", e); + } else { + tracing::debug!( + %tx, + %client_id, + %request_id, + contract = %contract_key, + "Session adapter registered subscription transaction with session actor" + ); } + } else { + tracing::warn!( + %client_id, + %request_id, + contract = %contract_key, + "Session adapter not installed; subscription transaction not registered with session actor" + ); } Ok(()) diff --git a/crates/core/src/node/message_processor.rs b/crates/core/src/node/message_processor.rs index 81786333c..50ac6db8b 100644 --- a/crates/core/src/node/message_processor.rs +++ b/crates/core/src/node/message_processor.rs @@ -4,11 +4,12 @@ //! network message processing from client notification logic, enabling //! pure network processing when the actor system is enabled. +use crate::client_events::HostResult; use crate::contract::SessionMessage; use crate::message::Transaction; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::{debug, error}; +use tracing::debug; /// Errors that can occur during message processing #[derive(Debug, thiserror::Error)] @@ -35,50 +36,43 @@ impl MessageProcessor { pub async fn handle_network_result( &self, tx: Transaction, - op_result: Result, crate::node::OpError>, + host_result: Option, ) -> Result<(), ProcessingError> { // Pure result forwarding to SessionActor - self.route_to_session_actor(tx, op_result).await + self.route_to_session_actor(tx, host_result).await } /// Route network result to SessionActor - no client parameters needed async fn route_to_session_actor( &self, tx: Transaction, - op_result: Result, crate::node::OpError>, + host_result: Option, ) -> Result<(), ProcessingError> { - // Convert operation result to host result - let host_result = match op_result { - Ok(Some(op_res)) => { - debug!("Converting network result for transaction {}", tx); - Arc::new(op_res.to_host_result()) - } - Ok(None) => { - debug!("No result to forward for transaction {}", tx); - return Ok(()); // No result to forward - } - Err(e) => { - error!("Network operation error for transaction {}: {}", tx, e); - // Create a generic client error for operation failures - use freenet_stdlib::client_api::{ClientError, ErrorKind}; - Arc::new(Err(ClientError::from(ErrorKind::OperationError { - cause: e.to_string().into(), - }))) - } + let status = match &host_result { + Some(Ok(_)) => "op_result", + Some(Err(_)) => "error", + None => "no_result", + }; + tracing::debug!(%tx, status, "Routing network result to SessionActor"); + + let Some(host_result) = host_result else { + debug!("No result to forward for transaction {}", tx); + return Ok(()); }; // Create session message for pure actor routing // The SessionActor will handle all client correlation internally let session_msg = SessionMessage::DeliverHostResponse { tx, - response: host_result, + response: Arc::new(host_result), }; // Send to SessionActor - it handles all client concerns if let Err(e) = self.result_tx.send(session_msg).await { - error!( + tracing::error!( "Failed to send result to SessionActor for transaction {}: {}", - tx, e + tx, + e ); return Err(ProcessingError::ActorCommunication(e)); } @@ -94,7 +88,8 @@ impl MessageProcessor { #[cfg(test)] mod tests { use super::*; - use crate::operations::{get, OpEnum}; + use crate::operations::get; + use crate::operations::OpEnum; use freenet_stdlib::prelude::ContractKey; use tokio::sync::mpsc; @@ -102,29 +97,22 @@ mod tests { Transaction::new::() } - fn create_success_op_result() -> Result, crate::node::OpError> { - // Create a GetOp using the proper constructor + fn create_success_host_result() -> Option { use freenet_stdlib::prelude::ContractInstanceId; let key = ContractKey::from(ContractInstanceId::new([1u8; 32])); let get_op = get::start_op(key, false, false); - Ok(Some(OpEnum::Get(get_op))) + Some(OpEnum::Get(get_op).to_host_result()) } - fn create_none_op_result() -> Result, crate::node::OpError> { - Ok(None) + fn create_none_host_result() -> Option { + None } - fn create_error_op_result() -> Result, crate::node::OpError> { - let tx = create_test_transaction(); - Err(crate::node::OpError::InvalidStateTransition { - tx, - #[cfg(debug_assertions)] - state: Some( - Box::new("test_state".to_string()) as Box - ), - #[cfg(debug_assertions)] - trace: std::backtrace::Backtrace::capture(), - }) + fn create_error_host_result() -> Option { + use freenet_stdlib::client_api::{ClientError, ErrorKind}; + Some(Err(ClientError::from(ErrorKind::OperationError { + cause: "test_state".into(), + }))) } #[tokio::test] @@ -143,7 +131,7 @@ mod tests { let tx = create_test_transaction(); let result = processor - .handle_network_result(tx, create_success_op_result()) + .handle_network_result(tx, create_success_host_result()) .await; assert!(result.is_ok()); @@ -168,7 +156,7 @@ mod tests { let tx = create_test_transaction(); let result = processor - .handle_network_result(tx, create_none_op_result()) + .handle_network_result(tx, create_none_host_result()) .await; assert!(result.is_ok()); @@ -191,7 +179,7 @@ mod tests { let tx = create_test_transaction(); let result = processor - .handle_network_result(tx, create_error_op_result()) + .handle_network_result(tx, create_error_host_result()) .await; assert!(result.is_ok()); @@ -221,7 +209,7 @@ mod tests { drop(session_rx); let result = processor - .handle_network_result(tx, create_success_op_result()) + .handle_network_result(tx, create_success_host_result()) .await; assert!(result.is_err()); diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index e5446027b..61b6f4eba 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -619,7 +619,7 @@ pub(crate) async fn process_message_decoupled( msg: NetMessage, op_manager: Arc, conn_manager: CB, - event_listener: Box, + mut event_listener: Box, executor_callback: Option>, message_processor: std::sync::Arc, pending_op_result: Option>, @@ -633,14 +633,37 @@ pub(crate) async fn process_message_decoupled( msg, op_manager.clone(), conn_manager, - event_listener, - executor_callback, + event_listener.as_mut(), pending_op_result, ) .await; + // Prepare host result for session actor routing + let host_result = match &op_result { + Ok(Some(op_res)) => Some(op_res.to_host_result()), + Ok(None) => None, + Err(e) => Some(Err(freenet_stdlib::client_api::ClientError::from( + freenet_stdlib::client_api::ErrorKind::OperationError { + cause: e.to_string().into(), + }, + ))), + }; + + // Report operation completion for telemetry/result routing + report_result( + Some(tx), + op_result, + &op_manager, + executor_callback, + &mut *event_listener, + ) + .await; + // Delegate to MessageProcessor - it handles all client concerns internally - if let Err(e) = message_processor.handle_network_result(tx, op_result).await { + if let Err(e) = message_processor + .handle_network_result(tx, host_result) + .await + { tracing::error!( "Failed to handle network result for transaction {}: {}", tx, @@ -655,8 +678,7 @@ async fn handle_pure_network_message( msg: NetMessage, op_manager: Arc, conn_manager: CB, - event_listener: Box, - executor_callback: Option>, + event_listener: &mut dyn NetEventRegister, pending_op_result: Option>, ) -> Result, crate::node::OpError> where @@ -669,7 +691,6 @@ where op_manager, conn_manager, event_listener, - executor_callback, pending_op_result, ) .await @@ -838,8 +859,7 @@ async fn handle_pure_network_message_v1( msg: NetMessageV1, op_manager: Arc, mut conn_manager: CB, - mut event_listener: Box, - executor_callback: Option>, + event_listener: &mut dyn NetEventRegister, pending_op_result: Option>, ) -> Result, crate::node::OpError> where @@ -887,7 +907,6 @@ where tx, op_result, &op_manager, - executor_callback, &mut *event_listener, ) .await; @@ -934,7 +953,6 @@ where tx, op_result, &op_manager, - executor_callback, &mut *event_listener, ) .await; @@ -972,7 +990,6 @@ where tx, op_result, &op_manager, - executor_callback, &mut *event_listener, ) .await; @@ -1000,7 +1017,6 @@ where tx, op_result, &op_manager, - executor_callback, &mut *event_listener, ) .await; @@ -1031,7 +1047,6 @@ where tx, op_result, &op_manager, - executor_callback, &mut *event_listener, ) .await; @@ -1060,7 +1075,6 @@ async fn handle_pure_network_result( tx: Option, op_result: Result, OpError>, _op_manager: &Arc, - _executor_callback: Option>, _event_listener: &mut dyn NetEventRegister, ) -> Result, crate::node::OpError> { tracing::debug!("Pure network result handling for transaction: {:?}", tx); @@ -1080,10 +1094,6 @@ async fn handle_pure_network_result( } // TODO: Handle executor callbacks (network concern) - // Executor callback functionality needs to be restored with proper types - if let Some(_callback) = _executor_callback { - tracing::debug!("Executor callback available for transaction {:?} but not implemented in pure network processing", tx); - } } Ok(None) => { tracing::debug!("Network operation returned no result"); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index f0d055715..99b57fa27 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -780,6 +780,15 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; + tracing::debug!( + peer = %gateway.peer, + reserved_connections = op_manager + .ring + .connection_manager + .get_reserved_connections(), + "join_ring_request: evaluating gateway connection attempt" + ); + if !op_manager .ring .connection_manager @@ -864,56 +873,71 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); + let mut in_flight_gateways = HashSet::new(); + loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); + let available_gateways: Vec<_> = unconnected_gateways + .into_iter() + .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) + .collect(); tracing::debug!( - "Connection status: open_connections = {}, unconnected_gateways = {}", - open_conns, - unconnected_gateways.len() + open_connections = open_conns, + inflight_gateway_dials = in_flight_gateways.len(), + available_gateways = available_gateways.len(), + "Connection status before join attempt" ); - let unconnected_count = unconnected_gateways.len(); + let available_count = available_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 { tracing::info!( "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways", open_conns, BOOTSTRAP_THRESHOLD, - number_of_parallel_connections.min(unconnected_count) + number_of_parallel_connections.min(available_count) ); - let select_all = FuturesUnordered::new(); - for gateway in unconnected_gateways + let mut select_all = FuturesUnordered::new(); + for gateway in available_gateways .into_iter() .shuffle() .take(number_of_parallel_connections) { tracing::info!(%gateway, "Attempting connection to gateway"); + in_flight_gateways.insert(gateway.peer.clone()); let op_manager = op_manager.clone(); + let gateway_clone = gateway.clone(); select_all.push(async move { - (join_ring_request(None, gateway, &op_manager).await, gateway) + ( + join_ring_request(None, &gateway_clone, &op_manager).await, + gateway_clone, + ) }); } - select_all - .for_each(|(res, gateway)| async move { - if let Err(error) = res { - if !matches!( - error, - OpError::ConnError( - crate::node::ConnectionError::UnwantedConnection - ) - ) { - tracing::error!( - %gateway, - %error, - "Failed while attempting connection to gateway" - ); - } + while let Some((res, gateway)) = select_all.next().await { + if let Err(error) = res { + if !matches!( + error, + OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) + ) { + tracing::error!( + %gateway, + %error, + "Failed while attempting connection to gateway" + ); } - }) - .await; + } + in_flight_gateways.remove(&gateway.peer); + } + } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 { + tracing::debug!( + open_connections = open_conns, + inflight = in_flight_gateways.len(), + "Below threshold but all gateways are already connected or in-flight" + ); } else if open_conns >= BOOTSTRAP_THRESHOLD { tracing::trace!( "Have {} connections (>= threshold of {}), not attempting gateway connections", diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index adce1165d..b64e44d88 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -356,9 +356,21 @@ fn start_subscription_request( op_manager: &OpManager, parent_tx: Transaction, key: ContractKey, +) -> Transaction { + start_subscription_request_internal(op_manager, parent_tx, key, true) +} + +/// Starts a subscription request while allowing callers to opt out of parent tracking. +fn start_subscription_request_internal( + op_manager: &OpManager, + parent_tx: Transaction, + key: ContractKey, + track_parent: bool, ) -> Transaction { let child_tx = Transaction::new_child_of::(&parent_tx); - op_manager.expect_and_register_sub_operation(parent_tx, child_tx); + if track_parent { + op_manager.expect_and_register_sub_operation(parent_tx, child_tx); + } tracing::debug!( %parent_tx, diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 8b2d55c92..27df6ebeb 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -436,7 +436,8 @@ impl Operation for PutOp { skip_list.insert(target.peer.clone()); } - let child_tx = super::start_subscription_request(op_manager, *id, key); + let child_tx = + super::start_subscription_request_internal(op_manager, *id, key, false); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); op_manager.ring.seed_contract(key); @@ -698,8 +699,9 @@ impl Operation for PutOp { %key, "starting child subscription for PUT operation" ); - let child_tx = - super::start_subscription_request(op_manager, *id, key); + let child_tx = super::start_subscription_request_internal( + op_manager, *id, key, false, + ); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); } else { tracing::warn!( @@ -843,7 +845,9 @@ impl Operation for PutOp { // Start subscription and handle dropped contracts let (dropped_contract, old_subscribers) = { - let child_tx = super::start_subscription_request(op_manager, *id, key); + let child_tx = super::start_subscription_request_internal( + op_manager, *id, key, false, + ); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); op_manager.ring.seed_contract(key) }; diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 5e3f19240..d5cf79582 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -162,6 +162,16 @@ impl ConnectionManager { "should_accept: evaluating direct acceptance guard" ); + if self.location_for_peer.read().get(peer_id).is_some() { + tracing::debug!( + %peer_id, + open, + reserved_before, + "Peer already pending/connected; rejecting duplicate reservation" + ); + return false; + } + if self.is_gateway && (open > 0 || reserved_before > 0) { tracing::info!( %peer_id, @@ -218,11 +228,6 @@ impl ConnectionManager { } }; - if open == 0 { - tracing::debug!(%peer_id, "should_accept: first connection -> accepting"); - return true; - } - const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2; if self.is_gateway { let direct_total = open + reserved_before; @@ -243,11 +248,19 @@ impl ConnectionManager { if self.location_for_peer.read().get(peer_id).is_some() { // We've already accepted this peer (pending or active); treat as a no-op acceptance. - tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + tracing::debug!( + %peer_id, + "Peer already pending/connected after reservation; reverting reservation" + ); + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); return true; } - let accepted = if total_conn < self.min_connections { + let accepted = if open == 0 { + tracing::debug!(%peer_id, "should_accept: first connection -> accepting"); + true + } else if total_conn < self.min_connections { tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); true } else if total_conn >= self.max_connections { @@ -558,6 +571,11 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } + pub(crate) fn get_reserved_connections(&self) -> usize { + self.reserved_connections + .load(std::sync::atomic::Ordering::SeqCst) + } + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } @@ -604,3 +622,55 @@ impl ConnectionManager { read.keys().cloned().collect::>().into_iter() } } + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::Duration; + + use crate::transport::TransportKeypair; + + #[test] + fn should_accept_does_not_leak_reservations_for_duplicate_peer() { + let keypair = TransportKeypair::new(); + let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20_000); + let peer_id = PeerId::new(peer_addr, keypair.public().clone()); + let location = Location::from_address(&peer_addr); + + let manager = ConnectionManager::init( + Rate::new_per_second(1_000_000.0), + Rate::new_per_second(1_000_000.0), + Ring::DEFAULT_MIN_CONNECTIONS, + Ring::DEFAULT_MAX_CONNECTIONS, + Ring::DEFAULT_RAND_WALK_ABOVE_HTL, + (keypair.public().clone(), None, AtomicU64::new(0)), + false, + 32, + Duration::from_secs(30), + ); + + assert!(manager.should_accept(location, &peer_id)); + let after_first = manager.reserved_connections.load(Ordering::SeqCst); + assert_eq!(after_first, 1); + { + let known = manager.location_for_peer.read().contains_key(&peer_id); + assert!( + known, + "pending connection should be registered after initial acceptance" + ); + } + + // Second attempt for the same peer should be rejected immediately. + assert!( + !manager.should_accept(location, &peer_id), + "duplicate peer should be rejected by should_accept" + ); + assert_eq!( + manager.reserved_connections.load(Ordering::SeqCst), + after_first, + "repeat should_accept calls should not leak reservations for an existing peer" + ); + } +}