diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index 197c72667..402a04e01 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -404,6 +404,20 @@ async fn process_open_request( related_contracts, subscribe, } => { + // For non-gateway peers: check if peer is ready (peer_id has been set via handshake) + // For gateways: always ready (peer_id set from config) + if !op_manager.is_gateway + && !op_manager + .peer_ready + .load(std::sync::atomic::Ordering::SeqCst) + { + tracing::warn!( + "Client attempted PUT operation before peer initialization complete. \ + Peer must complete initial network handshake before processing client operations." + ); + return Err(Error::Disconnected); + } + let Some(peer_id) = op_manager.ring.connection_manager.get_peer_key() else { tracing::error!("peer id not found at put op, it should be set"); diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index d364b17a7..522498995 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -3,7 +3,7 @@ use parking_lot::RwLock; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, }; use tokio::time::{timeout, Duration}; use tracing::{instrument, Instrument}; @@ -228,6 +228,10 @@ pub(super) struct HandshakeHandler { /// Whether this node is a gateway is_gateway: bool, + + /// Indicates when peer is ready to process client operations (peer_id has been set). + /// Only used for non-gateway peers - set to Some(flag) for regular peers, None for gateways + peer_ready: Option>, } impl HandshakeHandler { @@ -238,6 +242,7 @@ impl HandshakeHandler { router: Arc>, this_location: Option, is_gateway: bool, + peer_ready: Option>, ) -> (Self, HanshakeHandlerMsg, OutboundMessage) { let (pending_msg_tx, pending_msg_rx) = tokio::sync::mpsc::channel(100); let (establish_connection_tx, establish_connection_rx) = tokio::sync::mpsc::channel(100); @@ -255,6 +260,7 @@ impl HandshakeHandler { router, this_location, is_gateway, + peer_ready, }; ( connector, @@ -294,6 +300,13 @@ impl HandshakeHandler { if let Some(addr) = connection.my_address() { tracing::debug!(%addr, "Attempting setting own peer key"); self.connection_manager.try_set_peer_key(addr); + + // For non-gateway peers: mark as ready to accept client operations + if let Some(ref peer_ready) = self.peer_ready { + peer_ready.store(true, std::sync::atomic::Ordering::SeqCst); + tracing::info!("Peer initialization complete: peer_ready set to true, client operations now enabled"); + } + if self.this_location.is_none() { // in the case trust locations is set to true, this peer already had its location set self.connection_manager.update_location(Some(Location::from_address(&addr))); @@ -1513,6 +1526,7 @@ mod tests { Arc::new(RwLock::new(router)), None, is_gateway, + None, // test code doesn't need peer_ready ); ( handler, diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 7bbb96be5..3cdd27182 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -196,6 +196,14 @@ impl P2pConnManager { ) .await?; + // For non-gateway peers, pass the peer_ready flag so it can be set after first handshake + // For gateways, pass None (they're always ready) + let peer_ready = if !self.is_gateway { + Some(self.bridge.op_manager.peer_ready.clone()) + } else { + None + }; + let (mut handshake_handler, handshake_handler_msg, outbound_message) = HandshakeHandler::new( inbound_conn_handler, @@ -204,6 +212,7 @@ impl P2pConnManager { self.bridge.op_manager.ring.router.clone(), self.this_location, self.is_gateway, + peer_ready, ); loop { diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index 69d3cdc3a..45c0fc691 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -6,7 +6,12 @@ //! //! See [`../../architecture.md`](../../architecture.md) for details on its role and interaction with other components. -use std::{cmp::Reverse, collections::BTreeSet, sync::Arc, time::Duration}; +use std::{ + cmp::Reverse, + collections::BTreeSet, + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; use dashmap::{DashMap, DashSet}; use either::Either; @@ -67,6 +72,12 @@ pub(crate) struct OpManager { new_transactions: tokio::sync::mpsc::Sender, pub result_router_tx: Option>, pub actor_clients: bool, + /// Indicates whether the peer is ready to process client operations. + /// For gateways: always true (peer_id is set from config) + /// For regular peers: true only after first successful network handshake sets peer_id + pub peer_ready: Arc, + /// Whether this node is a gateway + pub is_gateway: bool, } impl OpManager { @@ -105,6 +116,17 @@ impl OpManager { .instrument(garbage_span), ); + // Gateways are ready immediately (peer_id set from config) + // Regular peers become ready after first handshake + let is_gateway = config.is_gateway; + let peer_ready = Arc::new(AtomicBool::new(is_gateway)); + + if is_gateway { + tracing::debug!("Gateway node: peer_ready set to true immediately"); + } else { + tracing::debug!("Regular peer node: peer_ready will be set after first handshake"); + } + Ok(Self { ring, ops, @@ -113,6 +135,8 @@ impl OpManager { new_transactions, result_router_tx, actor_clients: config.config.actor_clients, + peer_ready, + is_gateway, }) }