Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 2 additions & 66 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,53 +603,6 @@ impl P2pConnManager {
}
}
}
ConnEvent::OutboundMessageWithTarget { target_addr, msg } => {
// This variant uses an explicit target address from OperationResult.target_addr,
// which is critical for NAT scenarios where the address in the message
// differs from the actual transport address we should send to.
tracing::info!(
tx = %msg.id(),
msg_type = %msg,
target_addr = %target_addr,
msg_target = ?msg.target().map(|t| t.addr()),
"Sending outbound message with explicit target address (NAT routing)"
);

// Look up the connection using the explicit target address
let peer_connection = ctx.connections.get(&target_addr);

match peer_connection {
Some(peer_connection) => {
if let Err(e) =
peer_connection.sender.send(Left(msg.clone())).await
{
tracing::error!(
tx = %msg.id(),
target_addr = %target_addr,
"Failed to send message to peer: {}", e
);
} else {
tracing::info!(
tx = %msg.id(),
target_addr = %target_addr,
"Message successfully sent to peer connection via explicit address"
);
}
}
None => {
// No existing connection - this is unexpected for NAT scenarios
// since we should have the connection from the original request
tracing::error!(
tx = %msg.id(),
target_addr = %target_addr,
msg_target = ?msg.target().map(|t| t.addr()),
connections = ?ctx.connections.keys().collect::<Vec<_>>(),
"No connection found for explicit target address - NAT routing failed"
);
ctx.bridge.op_manager.completed(*msg.id());
}
}
}
ConnEvent::TransportClosed { remote_addr, error } => {
tracing::debug!(
remote = %remote_addr,
Expand Down Expand Up @@ -2313,19 +2266,8 @@ impl P2pConnManager {

fn handle_bridge_msg(&self, msg: Option<P2pBridgeEvent>) -> EventResult {
match msg {
Some(Left((target, msg))) => {
// Use OutboundMessageWithTarget to preserve the target address from
// OperationResult.target_addr. This is critical for NAT scenarios where
// the address in the message differs from the actual transport address.
// The PeerId.addr contains the address that was used to look up the peer
// in P2pBridge::send(), which is the correct transport address.
EventResult::Event(
ConnEvent::OutboundMessageWithTarget {
target_addr: target.addr,
msg: *msg,
}
.into(),
)
Some(Left((_target, msg))) => {
EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
}
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
Expand Down Expand Up @@ -2456,12 +2398,6 @@ enum EventResult {
pub(super) enum ConnEvent {
InboundMessage(IncomingMessage),
OutboundMessage(NetMessage),
/// Outbound message with explicit target address from OperationResult.target_addr.
/// Used when the target address differs from what's in the message (NAT scenarios).
OutboundMessageWithTarget {
target_addr: SocketAddr,
msg: NetMessage,
},
NodeAction(NodeEvent),
ClosedChannel(ChannelCloseReason),
TransportClosed {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ where
self.op_manager.ring.seed_contract(key);
}
if let Some(subscribers) = contract_subscribers.get(&key) {
// add contract subscribers
// add contract subscribers (test setup - no upstream_addr)
for subscriber in subscribers {
if self
.op_manager
.ring
.add_subscriber(&key, subscriber.clone())
.add_subscriber(&key, subscriber.clone(), None)
.is_err()
{
tracing::warn!("Max subscribers for contract {} reached", key);
Expand Down
18 changes: 10 additions & 8 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ impl RelayState {
// Use the joiner with updated observed address for response routing
actions.response_target = Some(self.request.joiner.clone());
tracing::info!(
acceptor_key = %acceptor.pub_key(),
joiner_key = %self.request.joiner.pub_key(),
acceptor_pub_key = %acceptor.pub_key(),
joiner_pub_key = %self.request.joiner.pub_key(),
acceptor_loc = ?acceptor.location,
joiner_loc = ?self.request.joiner.location,
ring_distance = ?dist,
Expand Down Expand Up @@ -690,7 +690,7 @@ impl ConnectOp {
match self.state.as_mut() {
Some(ConnectState::WaitingForResponses(state)) => {
tracing::info!(
acceptor_key = %response.acceptor.pub_key(),
acceptor_pub_key = %response.acceptor.pub_key(),
acceptor_loc = ?response.acceptor.location,
"connect: joiner received ConnectResponse"
);
Expand Down Expand Up @@ -829,7 +829,8 @@ impl Operation for ConnectOp {
address,
};
// Route through upstream (where the request came from) since we may
// not have a direct connection to the target
// not have a direct connection to the target.
// Note: upstream_addr is already validated from source_addr at the start of this match arm.
network_bridge
.send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg)))
.await?;
Expand Down Expand Up @@ -873,7 +874,8 @@ impl Operation for ConnectOp {
payload: response,
};
// Route the response through upstream (where the request came from)
// since we may not have a direct connection to the joiner
// since we may not have a direct connection to the joiner.
// Note: upstream_addr is already validated from source_addr at the start of this match arm.
network_bridge
.send(
upstream_addr,
Expand Down Expand Up @@ -966,14 +968,14 @@ impl Operation for ConnectOp {
let mut updated_payload = payload.clone();
updated_payload.acceptor.peer_addr = PeerAddr::Known(acceptor_addr);
tracing::debug!(
acceptor = %updated_payload.acceptor.peer(),
acceptor_pub_key = %updated_payload.acceptor.pub_key(),
acceptor_addr = %acceptor_addr,
"connect: filled acceptor address from source_addr"
);
updated_payload
} else {
tracing::warn!(
acceptor_key = %payload.acceptor.pub_key(),
acceptor_pub_key = %payload.acceptor.pub_key(),
"connect: response received without source_addr, cannot fill acceptor address"
);
payload.clone()
Expand All @@ -984,7 +986,7 @@ impl Operation for ConnectOp {

tracing::debug!(
upstream_addr = %upstream_addr,
acceptor_key = %forward_payload.acceptor.pub_key(),
acceptor_pub_key = %forward_payload.acceptor.pub_key(),
"connect: forwarding response towards joiner"
);
// Forward response toward the joiner via upstream
Expand Down
60 changes: 34 additions & 26 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,14 @@ impl Operation for GetOp {
);

// Use sender_from_addr (looked up from source_addr) instead of message field
let sender = sender_from_addr.clone().expect(
"RequestGet requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: RequestGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

// Check if operation is already completed
if matches!(self.state, Some(GetState::Finished { .. })) {
Expand Down Expand Up @@ -702,9 +707,14 @@ impl Operation for GetOp {
let this_peer = target.clone();

// Use sender_from_addr (looked up from source_addr) instead of message field
let sender = sender_from_addr.clone().expect(
"SeekNode requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: SeekNode without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

if htl == 0 {
let sender_display = sender.peer().to_string();
Expand Down Expand Up @@ -854,23 +864,21 @@ impl Operation for GetOp {
let id = *id;
let key = *key;

// Handle case where sender lookup failed (e.g., peer disconnected)
// Use sender_from_addr for logging
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
source = ?source_addr,
"GET: ReturnGet (empty) received but sender lookup failed - cannot process"
"GET: ReturnGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

// Use pub_key for logging to avoid panics on Unknown addresses
tracing::info!(
tx = %id,
%key,
from = %sender.pub_key(),
to = %target.pub_key(),
from = %sender.peer(),
to = %target.peer(),
skip = ?skip_list,
"GET: ReturnGet received with empty value"
);
Expand All @@ -882,7 +890,7 @@ impl Operation for GetOp {
%this_peer,
"Neither contract or contract value for contract found at peer {}, \
retrying with other peers",
sender.pub_key()
sender.peer()
);

match self.state {
Expand All @@ -901,10 +909,8 @@ impl Operation for GetOp {
}) => {
// todo: register in the stats for the outcome of the op that failed to get a response from this peer

// Add the failed peer to tried list (only if address is known)
if let Some(addr) = sender.socket_addr() {
tried_peers.insert(PeerId::new(addr, sender.pub_key().clone()));
}
// Add the failed peer to tried list
tried_peers.insert(sender.peer().clone());

// First, check if we have alternatives at this hop level
if !alternatives.is_empty() && attempts_at_hop < DEFAULT_MAX_BREADTH {
Expand All @@ -914,7 +920,7 @@ impl Operation for GetOp {
tracing::info!(
tx = %id,
%key,
next_peer = %next_target.pub_key(),
next_peer = %next_target.peer(),
fetch_contract,
attempts_at_hop = attempts_at_hop + 1,
max_attempts = DEFAULT_MAX_BREADTH,
Expand All @@ -932,11 +938,8 @@ impl Operation for GetOp {
skip_list: tried_peers.clone(),
});

// Update state with the new alternative being tried (only if address is known)
if let Some(addr) = next_target.socket_addr() {
tried_peers
.insert(PeerId::new(addr, next_target.pub_key().clone()));
}
// Update state with the new alternative being tried
tried_peers.insert(next_target.peer().clone());
let updated_tried_peers = tried_peers.clone();
new_state = Some(GetState::AwaitingResponse {
retries,
Expand Down Expand Up @@ -1125,9 +1128,14 @@ impl Operation for GetOp {
let key = *key;

// Use sender_from_addr for logging
let sender = sender_from_addr.clone().expect(
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: ReturnGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap());

Expand Down
Loading
Loading