diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 9447a1ac8..bbfbbfc4c 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -192,9 +192,11 @@ impl Operation for ConnectOp { skip_forwards = ?skip_forwards, "Got queried for new connections from joiner", ); + // Use the full skip_connections set to avoid recommending peers + // that the joiner is already connected to (including the gateway itself) if let Some(desirable_peer) = op_manager.ring.closest_to_location( *ideal_location, - HashSet::from([joiner.peer.clone()]), + skip_connections.iter().cloned().collect(), ) { tracing::debug!( tx = %id, diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index 71f92fa5f..63f342d9d 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -421,8 +421,369 @@ async fn test_basic_gateway_connectivity() -> TestResult { } } -// test_three_node_network_connectivity has been removed - see issue #1889 -// This test revealed a pre-existing bug in the topology manager where adjust_topology() -// requests duplicate connections to the same peer instead of diversifying connections. -// The test will be re-added once issue #1889 is resolved. -// Issue: https://github.com/freenet/freenet-core/issues/1889 +/// Test three-node network connectivity: +/// 1. Start a gateway and two peers +/// 2. Wait for full mesh connectivity (each node connected to the other two) +/// 3. Perform PUT from peer1 and GET from peer2 to verify network functionality +/// +/// This test verifies that peers connect to each other, not just to the gateway. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_three_node_network_connectivity() -> TestResult { + use freenet_stdlib::client_api::{NodeQuery, QueryResponse}; + use std::collections::HashSet; + + freenet::config::set_logger(Some(LevelFilter::INFO), None); + + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + let initial_state = test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Create network sockets + let gateway_network_socket = TcpListener::bind("127.0.0.1:0")?; + let gateway_ws_socket = TcpListener::bind("127.0.0.1:0")?; + let peer1_ws_socket = TcpListener::bind("127.0.0.1:0")?; + let peer2_ws_socket = TcpListener::bind("127.0.0.1:0")?; + + // Gateway configuration + let temp_dir_gw = tempfile::tempdir()?; + let gateway_key = TransportKeypair::new(); + let gateway_transport_keypair = temp_dir_gw.path().join("private.pem"); + gateway_key.save(&gateway_transport_keypair)?; + gateway_key + .public() + .save(temp_dir_gw.path().join("public.pem"))?; + + let gateway_port = gateway_network_socket.local_addr()?.port(); + let gateway_ws_port = gateway_ws_socket.local_addr()?.port(); + let peer1_ws_port = peer1_ws_socket.local_addr()?.port(); + let peer2_ws_port = peer2_ws_socket.local_addr()?.port(); + + let gateway_config = ConfigArgs { + ws_api: WebsocketApiArgs { + address: Some(Ipv4Addr::LOCALHOST.into()), + ws_api_port: Some(gateway_ws_port), + }, + network_api: NetworkArgs { + public_address: Some(Ipv4Addr::LOCALHOST.into()), + public_port: Some(gateway_port), + is_gateway: true, + skip_load_from_network: true, + gateways: Some(vec![]), + location: Some(RNG.lock().unwrap().random()), + ignore_protocol_checking: true, + address: Some(Ipv4Addr::LOCALHOST.into()), + network_port: Some(gateway_port), + bandwidth_limit: None, + blocked_addresses: None, + }, + config_paths: freenet::config::ConfigPathsArgs { + config_dir: Some(temp_dir_gw.path().to_path_buf()), + data_dir: Some(temp_dir_gw.path().to_path_buf()), + }, + secrets: SecretArgs { + transport_keypair: Some(gateway_transport_keypair), + ..Default::default() + }, + ..Default::default() + }; + + // Gateway info for peers + let gateway_info = InlineGwConfig { + address: (Ipv4Addr::LOCALHOST, gateway_port).into(), + location: Some(RNG.lock().unwrap().random()), + public_key_path: temp_dir_gw.path().join("public.pem"), + }; + + // First peer configuration + let temp_dir_peer1 = tempfile::tempdir()?; + let peer1_key = TransportKeypair::new(); + let peer1_transport_keypair = temp_dir_peer1.path().join("private.pem"); + peer1_key.save(&peer1_transport_keypair)?; + + let peer1_config = ConfigArgs { + ws_api: WebsocketApiArgs { + address: Some(Ipv4Addr::LOCALHOST.into()), + ws_api_port: Some(peer1_ws_port), + }, + network_api: NetworkArgs { + public_address: Some(Ipv4Addr::LOCALHOST.into()), + public_port: None, + is_gateway: false, + skip_load_from_network: true, + gateways: Some(vec![serde_json::to_string(&gateway_info)?]), + location: Some(RNG.lock().unwrap().random()), + ignore_protocol_checking: true, + address: Some(Ipv4Addr::LOCALHOST.into()), + network_port: None, + bandwidth_limit: None, + blocked_addresses: None, + }, + config_paths: freenet::config::ConfigPathsArgs { + config_dir: Some(temp_dir_peer1.path().to_path_buf()), + data_dir: Some(temp_dir_peer1.path().to_path_buf()), + }, + secrets: SecretArgs { + transport_keypair: Some(peer1_transport_keypair), + ..Default::default() + }, + ..Default::default() + }; + + // Second peer configuration + let temp_dir_peer2 = tempfile::tempdir()?; + let peer2_key = TransportKeypair::new(); + let peer2_transport_keypair = temp_dir_peer2.path().join("private.pem"); + peer2_key.save(&peer2_transport_keypair)?; + + let peer2_config = ConfigArgs { + ws_api: WebsocketApiArgs { + address: Some(Ipv4Addr::LOCALHOST.into()), + ws_api_port: Some(peer2_ws_port), + }, + network_api: NetworkArgs { + public_address: Some(Ipv4Addr::LOCALHOST.into()), + public_port: None, + is_gateway: false, + skip_load_from_network: true, + gateways: Some(vec![serde_json::to_string(&gateway_info)?]), + location: Some(RNG.lock().unwrap().random()), + ignore_protocol_checking: true, + address: Some(Ipv4Addr::LOCALHOST.into()), + network_port: None, + bandwidth_limit: None, + blocked_addresses: None, + }, + config_paths: freenet::config::ConfigPathsArgs { + config_dir: Some(temp_dir_peer2.path().to_path_buf()), + data_dir: Some(temp_dir_peer2.path().to_path_buf()), + }, + secrets: SecretArgs { + transport_keypair: Some(peer2_transport_keypair), + ..Default::default() + }, + ..Default::default() + }; + + // Free the sockets before starting nodes + std::mem::drop(gateway_network_socket); + std::mem::drop(gateway_ws_socket); + std::mem::drop(peer1_ws_socket); + std::mem::drop(peer2_ws_socket); + + // Start gateway node + let gateway = async { + let config = gateway_config.build().await?; + let mut node_config = NodeConfig::new(config.clone()).await?; + // Configure realistic connection limits for 3-node network + // Default is 25 min connections, but max possible here is 2 + node_config.min_number_of_connections(2); + node_config.max_number_of_connections(2); + let node = node_config + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Gateway starting"); + node.run().await + } + .boxed_local(); + + // Start first peer node + // Wait 12s to ensure gateway completes aggressive_initial_connections() (10s) + let peer1 = async move { + tokio::time::sleep(Duration::from_secs(12)).await; + let config = peer1_config.build().await?; + let mut node_config = NodeConfig::new(config.clone()).await?; + node_config.min_number_of_connections(2); + node_config.max_number_of_connections(2); + let node = node_config + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Peer 1 starting"); + node.run().await + } + .boxed_local(); + + // Start second peer node + // Wait 17s (12s gateway + 5s peer1 aggressive phase buffer) + let peer2 = async move { + tokio::time::sleep(Duration::from_secs(17)).await; + let config = peer2_config.build().await?; + let mut node_config = NodeConfig::new(config.clone()).await?; + node_config.min_number_of_connections(2); + node_config.max_number_of_connections(2); + let node = node_config + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Peer 2 starting"); + node.run().await + } + .boxed_local(); + + // Main test logic + let test = tokio::time::timeout(Duration::from_secs(180), async move { + // Wait for all nodes to start and complete aggressive_initial_connections() + // Gateway: 0-10s, Peer1: 12-22s, Peer2: 17-27s + // Wait 30s to ensure all aggressive phases complete + buffer + tracing::info!("Waiting for nodes to start and establish connections..."); + tokio::time::sleep(Duration::from_secs(30)).await; + + // Connect to websockets + let uri_gw = + format!("ws://127.0.0.1:{gateway_ws_port}/v1/contract/command?encodingProtocol=native"); + let (stream_gw, _) = connect_async(&uri_gw).await?; + let mut client_gw = WebApi::start(stream_gw); + + let uri1 = + format!("ws://127.0.0.1:{peer1_ws_port}/v1/contract/command?encodingProtocol=native"); + let (stream1, _) = connect_async(&uri1).await?; + let mut client1 = WebApi::start(stream1); + + let uri2 = + format!("ws://127.0.0.1:{peer2_ws_port}/v1/contract/command?encodingProtocol=native"); + let (stream2, _) = connect_async(&uri2).await?; + let mut client2 = WebApi::start(stream2); + + // Retry loop to wait for full mesh connectivity + const MAX_RETRIES: usize = 30; + const RETRY_DELAY: Duration = Duration::from_secs(2); + let mut retry_count = 0; + + loop { + retry_count += 1; + if retry_count > MAX_RETRIES { + bail!( + "Failed to establish full mesh connectivity after {} seconds", + MAX_RETRIES * 2 + ); + } + + tracing::info!( + "Attempt {}/{}: Querying all nodes for connected peers...", + retry_count, + MAX_RETRIES + ); + + // Query each node for connections + client_gw + .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers)) + .await?; + let gw_resp = tokio::time::timeout(Duration::from_secs(10), client_gw.recv()).await?; + let gw_peers = match gw_resp { + Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers })) => peers, + Ok(other) => bail!("Unexpected response from gateway: {:?}", other), + Err(e) => bail!("Error receiving gateway response: {}", e), + }; + + client1 + .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers)) + .await?; + let peer1_resp = tokio::time::timeout(Duration::from_secs(10), client1.recv()).await?; + let peer1_peers = match peer1_resp { + Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers })) => peers, + Ok(other) => bail!("Unexpected response from peer1: {:?}", other), + Err(e) => bail!("Error receiving peer1 response: {}", e), + }; + + client2 + .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers)) + .await?; + let peer2_resp = tokio::time::timeout(Duration::from_secs(10), client2.recv()).await?; + let peer2_peers = match peer2_resp { + Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers })) => peers, + Ok(other) => bail!("Unexpected response from peer2: {:?}", other), + Err(e) => bail!("Error receiving peer2 response: {}", e), + }; + + tracing::info!(" - Gateway has {} connections", gw_peers.len()); + tracing::info!(" - Peer1 has {} connections", peer1_peers.len()); + tracing::info!(" - Peer2 has {} connections", peer2_peers.len()); + + // Check for full mesh (each node connected to the other two) + if gw_peers.len() >= 2 && peer1_peers.len() >= 2 && peer2_peers.len() >= 2 { + let gw_peer_addrs: HashSet<_> = gw_peers.iter().map(|p| p.1).collect(); + let peer1_peer_addrs: HashSet<_> = peer1_peers.iter().map(|p| p.1).collect(); + let peer2_peer_addrs: HashSet<_> = peer2_peers.iter().map(|p| p.1).collect(); + + let fully_connected = gw_peer_addrs.len() == 2 + && peer1_peer_addrs.len() == 2 + && peer2_peer_addrs.len() == 2; + + if fully_connected { + tracing::info!("✅ Full mesh connectivity established!"); + break; + } + } + + tracing::info!("Network not fully connected yet, waiting..."); + tokio::time::sleep(RETRY_DELAY).await; + } + + // Verify functionality with PUT/GET + tracing::info!("Verifying network functionality with PUT/GET operations"); + + make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?; + let resp = tokio::time::timeout(Duration::from_secs(60), client1.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + assert_eq!(key, contract_key); + tracing::info!("Peer1 successfully performed PUT"); + } + Ok(Ok(other)) => bail!("Unexpected PUT response: {:?}", other), + Ok(Err(e)) => bail!("Error receiving PUT response: {}", e), + Err(_) => bail!("Timeout waiting for PUT response"), + } + + make_get(&mut client2, contract_key, true, false).await?; + let get_response = tokio::time::timeout(Duration::from_secs(60), client2.recv()).await; + match get_response { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + contract: recv_contract, + state: recv_state, + .. + }))) => { + assert_eq!(recv_contract.as_ref().unwrap().key(), contract_key); + assert_eq!(recv_state, wrapped_state); + tracing::info!("✅ Peer2 successfully retrieved data from network"); + } + Ok(Ok(other)) => bail!("Unexpected GET response: {:?}", other), + Ok(Err(e)) => bail!("Error receiving GET response: {}", e), + Err(_) => bail!("Timeout waiting for GET response"), + } + + // Clean disconnect + client_gw + .send(ClientRequest::Disconnect { cause: None }) + .await?; + client1 + .send(ClientRequest::Disconnect { cause: None }) + .await?; + client2 + .send(ClientRequest::Disconnect { cause: None }) + .await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + Ok::<_, anyhow::Error>(()) + }); + + select! { + g = gateway => { + g.map_err(|e| anyhow!("Gateway error: {}", e))?; + Ok(()) + } + p1 = peer1 => { + p1.map_err(|e| anyhow!("Peer1 error: {}", e))?; + Ok(()) + } + p2 = peer2 => { + p2.map_err(|e| anyhow!("Peer2 error: {}", e))?; + Ok(()) + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + Ok(()) + } + } +}