Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,20 @@ async fn process_open_request(
related_contracts,
subscribe,
} => {
// For non-gateway peers: check if peer is ready (peer_id has been set via handshake)
// For gateways: always ready (peer_id set from config)
if !op_manager.is_gateway
&& !op_manager
.peer_ready
.load(std::sync::atomic::Ordering::SeqCst)
{
tracing::warn!(
"Client attempted PUT operation before peer initialization complete. \
Peer must complete initial network handshake before processing client operations."
);
return Err(Error::Disconnected);
}

let Some(peer_id) = op_manager.ring.connection_manager.get_peer_key()
else {
tracing::error!("peer id not found at put op, it should be set");
Expand Down
16 changes: 15 additions & 1 deletion crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use parking_lot::RwLock;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::Arc,
sync::{atomic::AtomicBool, Arc},
};
use tokio::time::{timeout, Duration};
use tracing::{instrument, Instrument};
Expand Down Expand Up @@ -228,6 +228,10 @@ pub(super) struct HandshakeHandler {

/// Whether this node is a gateway
is_gateway: bool,

/// Indicates when peer is ready to process client operations (peer_id has been set).
/// Only used for non-gateway peers - set to Some(flag) for regular peers, None for gateways
peer_ready: Option<Arc<AtomicBool>>,
}

impl HandshakeHandler {
Expand All @@ -238,6 +242,7 @@ impl HandshakeHandler {
router: Arc<RwLock<Router>>,
this_location: Option<Location>,
is_gateway: bool,
peer_ready: Option<Arc<AtomicBool>>,
) -> (Self, HanshakeHandlerMsg, OutboundMessage) {
let (pending_msg_tx, pending_msg_rx) = tokio::sync::mpsc::channel(100);
let (establish_connection_tx, establish_connection_rx) = tokio::sync::mpsc::channel(100);
Expand All @@ -255,6 +260,7 @@ impl HandshakeHandler {
router,
this_location,
is_gateway,
peer_ready,
};
(
connector,
Expand Down Expand Up @@ -294,6 +300,13 @@ impl HandshakeHandler {
if let Some(addr) = connection.my_address() {
tracing::debug!(%addr, "Attempting setting own peer key");
self.connection_manager.try_set_peer_key(addr);

// For non-gateway peers: mark as ready to accept client operations
if let Some(ref peer_ready) = self.peer_ready {
peer_ready.store(true, std::sync::atomic::Ordering::SeqCst);
tracing::info!("Peer initialization complete: peer_ready set to true, client operations now enabled");
}

if self.this_location.is_none() {
// in the case trust locations is set to true, this peer already had its location set
self.connection_manager.update_location(Some(Location::from_address(&addr)));
Expand Down Expand Up @@ -1513,6 +1526,7 @@ mod tests {
Arc::new(RwLock::new(router)),
None,
is_gateway,
None, // test code doesn't need peer_ready
);
(
handler,
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ impl P2pConnManager {
)
.await?;

// For non-gateway peers, pass the peer_ready flag so it can be set after first handshake
// For gateways, pass None (they're always ready)
let peer_ready = if !self.is_gateway {
Some(self.bridge.op_manager.peer_ready.clone())
} else {
None
};

let (mut handshake_handler, handshake_handler_msg, outbound_message) =
HandshakeHandler::new(
inbound_conn_handler,
Expand All @@ -204,6 +212,7 @@ impl P2pConnManager {
self.bridge.op_manager.ring.router.clone(),
self.this_location,
self.is_gateway,
peer_ready,
);

loop {
Expand Down
26 changes: 25 additions & 1 deletion crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
//!
//! See [`../../architecture.md`](../../architecture.md) for details on its role and interaction with other components.

use std::{cmp::Reverse, collections::BTreeSet, sync::Arc, time::Duration};
use std::{
cmp::Reverse,
collections::BTreeSet,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};

use dashmap::{DashMap, DashSet};
use either::Either;
Expand Down Expand Up @@ -67,6 +72,12 @@ pub(crate) struct OpManager {
new_transactions: tokio::sync::mpsc::Sender<Transaction>,
pub result_router_tx: Option<mpsc::Sender<(Transaction, HostResult)>>,
pub actor_clients: bool,
/// Indicates whether the peer is ready to process client operations.
/// For gateways: always true (peer_id is set from config)
/// For regular peers: true only after first successful network handshake sets peer_id
pub peer_ready: Arc<AtomicBool>,
/// Whether this node is a gateway
pub is_gateway: bool,
}

impl OpManager {
Expand Down Expand Up @@ -105,6 +116,17 @@ impl OpManager {
.instrument(garbage_span),
);

// Gateways are ready immediately (peer_id set from config)
// Regular peers become ready after first handshake
let is_gateway = config.is_gateway;
let peer_ready = Arc::new(AtomicBool::new(is_gateway));

if is_gateway {
tracing::debug!("Gateway node: peer_ready set to true immediately");
} else {
tracing::debug!("Regular peer node: peer_ready will be set after first handshake");
}

Ok(Self {
ring,
ops,
Expand All @@ -113,6 +135,8 @@ impl OpManager {
new_transactions,
result_router_tx,
actor_clients: config.config.actor_clients,
peer_ready,
is_gateway,
})
}

Expand Down
Loading