Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
edd79da
ci: trigger workflow
sanity Nov 29, 2025
1b6e365
ci: trigger workflow
sanity Nov 29, 2025
fba6604
ci: trigger workflow
sanity Nov 29, 2025
ed5586f
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
104a03f
ci: trigger workflow
sanity Nov 29, 2025
3f9a6f3
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
522a964
refactor: migrate PeerKeyLocation field accesses to use new methods
sanity Nov 27, 2025
3d473ed
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
4aae58d
refactor: use ObservedAddr newtype for source_addr throughout
sanity Nov 29, 2025
2e57361
ci: trigger workflow
sanity Nov 29, 2025
0ef40bc
fix: resolve post-rebase compilation errors
sanity Nov 29, 2025
76d75c6
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
7f6ad28
fix: use upstream_addr for subscribe operation NAT routing
sanity Nov 29, 2025
31a6d9a
fix: route connect responses through upstream_addr
sanity Nov 29, 2025
74f4e77
fix: handle Unknown addresses in get.rs to avoid panics
sanity Nov 30, 2025
d1bf045
fix: resolve compilation errors after rebase
sanity Dec 1, 2025
83ce650
ci: trigger workflow
sanity Nov 29, 2025
132e808
ci: trigger workflow
sanity Nov 29, 2025
241ca2f
ci: trigger workflow
sanity Nov 29, 2025
bf7f8e6
ci: trigger workflow
sanity Nov 29, 2025
be780a5
ci: trigger workflow
sanity Nov 29, 2025
e492dba
ci: trigger workflow
sanity Nov 29, 2025
8b6fb72
ci: trigger workflow
sanity Nov 29, 2025
2f1c113
ci: trigger workflow
sanity Nov 29, 2025
a2c8cd0
ci: trigger workflow
sanity Nov 29, 2025
4230fb0
ci: trigger workflow
sanity Nov 29, 2025
8e93d31
ci: trigger workflow
sanity Nov 29, 2025
b56cbfc
ci: trigger workflow
sanity Nov 29, 2025
f2d9b18
ci: trigger workflow
sanity Nov 29, 2025
2e66fb6
ci: trigger workflow
sanity Nov 29, 2025
5de8a58
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
6c6f82d
ci: trigger workflow
sanity Nov 29, 2025
e84c85f
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
bd30d98
ci: trigger workflow
sanity Nov 29, 2025
cecf6e1
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
de134e5
ci: trigger workflow
sanity Nov 29, 2025
4764ae9
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
a9c4e8d
fix: use ObservedAddr newtype for NAT routing in seeding
sanity Nov 29, 2025
86be6f3
fix: change implementation-detail logs from info to debug
sanity Nov 29, 2025
56893c9
fix: correct add_subscriber upstream_addr usage per Claude review
sanity Nov 30, 2025
379ccee
ci: retrigger CI
sanity Nov 30, 2025
741ad39
ci: retrigger CI
sanity Nov 30, 2025
8614d58
fix: resolve compilation errors after rebase
sanity Dec 1, 2025
9db2219
fix: use pub_key() instead of peer() in tracing for acceptors
sanity Dec 1, 2025
8bd39fc
fix: address code review feedback from PR #2172
sanity Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ where
self.op_manager.ring.seed_contract(key);
}
if let Some(subscribers) = contract_subscribers.get(&key) {
// add contract subscribers
// add contract subscribers (test setup - no upstream_addr)
for subscriber in subscribers {
if self
.op_manager
.ring
.add_subscriber(&key, subscriber.clone())
.add_subscriber(&key, subscriber.clone(), None)
.is_err()
{
tracing::warn!("Max subscribers for contract {} reached", key);
Expand Down
50 changes: 33 additions & 17 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ impl ConnectOp {
match self.state.as_mut() {
Some(ConnectState::WaitingForResponses(state)) => {
tracing::info!(
acceptor = %response.acceptor.peer(),
acceptor_pub_key = %response.acceptor.pub_key(),
acceptor_loc = ?response.acceptor.location,
"connect: joiner received ConnectResponse"
);
Expand Down Expand Up @@ -830,11 +830,21 @@ impl Operation for ConnectOp {
};
// Route through upstream (where the request came from) since we may
// not have a direct connection to the target
if let Some(upstream) = source_addr {
network_bridge
.send(upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
.await?;
}
let Some(upstream) = source_addr else {
tracing::warn!(
tx = %self.id,
"ObservedAddress message has no upstream - was this locally initiated?"
);
// No upstream to route through - this shouldn't happen for relayed connections
return Ok(OperationResult {
return_msg: None,
target_addr: None,
state: Some(OpEnum::Connect(Box::new(self))),
});
};
network_bridge
.send(upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
.await?;
}

if let Some(peer) = actions.expect_connection_from {
Expand Down Expand Up @@ -876,14 +886,20 @@ impl Operation for ConnectOp {
};
// Route the response through upstream (where the request came from)
// since we may not have a direct connection to the joiner
if let Some(upstream) = source_addr {
network_bridge
.send(
upstream,
NetMessage::V1(NetMessageV1::Connect(response_msg)),
)
.await?;
}
let Some(upstream) = source_addr else {
tracing::warn!(
tx = %self.id,
"ConnectResponse has no upstream - was this locally initiated?"
);
// No upstream to route through - this shouldn't happen for relayed connections
return Ok(store_operation_state(&mut self));
};
network_bridge
.send(
upstream,
NetMessage::V1(NetMessageV1::Connect(response_msg)),
)
.await?;
return Ok(store_operation_state(&mut self));
}

Expand Down Expand Up @@ -970,14 +986,14 @@ impl Operation for ConnectOp {
let mut updated_payload = payload.clone();
updated_payload.acceptor.peer_addr = PeerAddr::Known(acceptor_addr);
tracing::debug!(
acceptor = %updated_payload.acceptor.peer(),
acceptor_pub_key = %updated_payload.acceptor.pub_key(),
acceptor_addr = %acceptor_addr,
"connect: filled acceptor address from source_addr"
);
updated_payload
} else {
tracing::warn!(
acceptor = %payload.acceptor.peer(),
acceptor_pub_key = %payload.acceptor.pub_key(),
"connect: response received without source_addr, cannot fill acceptor address"
);
payload.clone()
Expand All @@ -988,7 +1004,7 @@ impl Operation for ConnectOp {

tracing::debug!(
upstream_addr = %upstream_addr,
acceptor = %forward_payload.acceptor.peer(),
acceptor_pub_key = %forward_payload.acceptor.pub_key(),
"connect: forwarding response towards joiner"
);
// Forward response toward the joiner via upstream
Expand Down
44 changes: 32 additions & 12 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,14 @@ impl Operation for GetOp {
);

// Use sender_from_addr (looked up from source_addr) instead of message field
let sender = sender_from_addr.clone().expect(
"RequestGet requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: RequestGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

// Check if operation is already completed
if matches!(self.state, Some(GetState::Finished { .. })) {
Expand Down Expand Up @@ -702,9 +707,14 @@ impl Operation for GetOp {
let this_peer = target.clone();

// Use sender_from_addr (looked up from source_addr) instead of message field
let sender = sender_from_addr.clone().expect(
"SeekNode requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: SeekNode without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

if htl == 0 {
let sender_display = sender.peer().to_string();
Expand Down Expand Up @@ -855,9 +865,14 @@ impl Operation for GetOp {
let key = *key;

// Use sender_from_addr for logging
let sender = sender_from_addr.clone().expect(
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: ReturnGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

tracing::info!(
tx = %id,
Expand Down Expand Up @@ -1113,9 +1128,14 @@ impl Operation for GetOp {
let key = *key;

// Use sender_from_addr for logging
let sender = sender_from_addr.clone().expect(
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
);
let Some(sender) = sender_from_addr.clone() else {
tracing::warn!(
tx = %id,
%key,
"GET: ReturnGet without sender lookup - cannot process"
);
return Err(OpError::invalid_transition(self.id));
};

tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap());

Expand Down
89 changes: 65 additions & 24 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
message::{InnerMessage, NetMessage, Transaction},
node::{NetworkBridge, OpManager, PeerId},
ring::{Location, PeerKeyLocation, RingError},
transport::ObservedAddr,
};
use freenet_stdlib::{
client_api::{ContractResponse, ErrorKind, HostResponse},
Expand Down Expand Up @@ -274,7 +275,11 @@ async fn complete_local_subscription(
key: ContractKey,
) -> Result<(), OpError> {
let subscriber = op_manager.ring.connection_manager.own_location();
if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) {
// Local subscription - no upstream_addr needed since it's our own peer
if let Err(err) = op_manager
.ring
.add_subscriber(&key, subscriber.clone(), None)
{
tracing::warn!(
%key,
tx = %id,
Expand Down Expand Up @@ -305,7 +310,7 @@ pub(crate) struct SubscribeOp {
state: Option<SubscribeState>,
/// The address we received this operation's message from.
/// Used for connection-based routing: responses are sent back to this address.
upstream_addr: Option<std::net::SocketAddr>,
upstream_addr: Option<ObservedAddr>,
}

impl SubscribeOp {
Expand Down Expand Up @@ -359,11 +364,16 @@ impl Operation for SubscribeOp {
}
Ok(None) => {
// new request to subscribe to a contract, initialize the machine
tracing::debug!(
tx = %id,
?source_addr,
"subscribe: load_or_init creating new op with source_addr as upstream_addr"
);
Ok(OpInitialization {
op: Self {
state: Some(SubscribeState::ReceivedRequest),
id,
upstream_addr: source_addr, // Connection-based routing: store who sent us this request
upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request
},
source_addr,
})
Expand Down Expand Up @@ -403,28 +413,29 @@ impl Operation for SubscribeOp {
target: _,
subscriber,
} => {
// Fill in subscriber's external address from transport layer if unknown.
// This is the key step where the first recipient (gateway) determines the
// subscriber's external address from the actual packet source address.
// ALWAYS use the transport-level source address when available.
// This is critical for NAT peers: they may embed a "known" but wrong address
// (e.g., 127.0.0.1:31337 for loopback). The transport address is the only
// reliable way to route responses back through the NAT.
let mut subscriber = subscriber.clone();
if subscriber.peer_addr.is_unknown() {
if let Some(addr) = source_addr {
subscriber.set_addr(addr);
tracing::debug!(
tx = %id,
%key,
subscriber_addr = %addr,
"subscribe: filled subscriber address from source_addr"
);
}
}

tracing::debug!(
tx = %id,
%key,
subscriber = %subscriber.peer(),
subscriber_orig = %subscriber.peer(),
source_addr = ?source_addr,
"subscribe: processing RequestSub"
);

if let Some(addr) = source_addr {
subscriber.set_addr(addr);
tracing::debug!(
tx = %id,
%key,
subscriber_updated = %subscriber.peer(),
"subscribe: updated subscriber address from transport source"
);
}
let own_loc = op_manager.ring.connection_manager.own_location();

if !matches!(
Expand All @@ -451,9 +462,10 @@ impl Operation for SubscribeOp {
"subscribe: handling RequestSub locally (contract available)"
);

// Use upstream_addr for NAT routing - subscriber may embed wrong address
if op_manager
.ring
.add_subscriber(key, subscriber.clone())
.add_subscriber(key, subscriber.clone(), self.upstream_addr)
.is_err()
Comment on lines +465 to 469
Copy link

Copilot AI Nov 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment and usage of self.upstream_addr are misleading. When handling a RequestSub, this is always the first hop (Gateway receiving from original subscriber). The subscriber's address was already corrected from source_addr at line 422, so it no longer embeds a "wrong address". Additionally, self.upstream_addr was just set to the same source_addr at line 376 when the operation was initialized.

For better code clarity, this should be:

// Subscriber address already updated from source_addr at line 422
if op_manager
    .ring
    .add_subscriber(key, subscriber.clone(), None)
    .is_err()

Copilot uses AI. Check for mistakes.
{
tracing::warn!(
Expand Down Expand Up @@ -520,6 +532,13 @@ impl Operation for SubscribeOp {
subscribed: true,
};

tracing::debug!(
tx = %id,
%key,
upstream_addr = ?self.upstream_addr,
"subscribe: creating ReturnSub with upstream_addr"
);

return build_op_result(
self.id,
None,
Expand Down Expand Up @@ -722,9 +741,13 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_direct,
"subscribe: attempting to register direct subscriber"
);
// Pass None: subscriber address was already corrected by Gateway at the
// start of the subscribe flow. Using self.upstream_addr here would
// incorrectly overwrite with the forwarder's address instead of the
// original subscriber's Gateway-corrected address.
if op_manager
.ring
.add_subscriber(key, subscriber.clone())
.add_subscriber(key, subscriber.clone(), None)
.is_err()
{
tracing::warn!(
Expand Down Expand Up @@ -872,9 +895,10 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_upstream,
"subscribe: attempting to register upstream link"
);
// upstream_subscriber was stored in op state, no transport address available
if op_manager
.ring
.add_subscriber(key, upstream_subscriber.clone())
.add_subscriber(key, upstream_subscriber.clone(), None)
.is_err()
{
tracing::warn!(
Expand Down Expand Up @@ -904,7 +928,15 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_provider,
"subscribe: registering provider/subscription source"
);
if op_manager.ring.add_subscriber(key, sender.clone()).is_err() {
// Pass None: sender was already looked up from source_addr (line ~866),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really confussing, whenever you add a subscriber, yolu should know its external address. this shouldn't be optional, and that address should be clearly identified and embedded in the message.

// so it has the correct transport address. Using self.upstream_addr
// would incorrectly use the original requester's address instead of
// the provider's address.
if op_manager
.ring
.add_subscriber(key, sender.clone(), None)
.is_err()
{
// concurrently it reached max number of subscribers for this contract
tracing::debug!(
tx = %id,
Expand Down Expand Up @@ -961,17 +993,26 @@ fn build_op_result(
id: Transaction,
state: Option<SubscribeState>,
msg: Option<SubscribeMsg>,
upstream_addr: Option<std::net::SocketAddr>,
upstream_addr: Option<ObservedAddr>,
) -> Result<OperationResult, OpError> {
// For response messages (ReturnSub), use upstream_addr directly for routing.
// This is more reliable than extracting from the message's target field, which
// may have been looked up from connection_manager (subject to race conditions).
// For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target.
let target_addr = match &msg {
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr,
// Convert ObservedAddr to SocketAddr at the transport boundary
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr.map(|a| a.socket_addr()),
_ => msg.as_ref().and_then(|m| m.target_addr()),
};

tracing::debug!(
tx = %id,
msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)),
?upstream_addr,
?target_addr,
"build_op_result: computed target_addr"
);

let output_op = state.map(|state| SubscribeOp {
id,
state: Some(state),
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,10 +1005,10 @@ pub(crate) async fn request_update(
.closest_potentially_caching(&key, [sender.peer().clone()].as_slice());

if let Some(target) = remote_target {
// Subscribe to the contract
// Subscribe on behalf of the requesting peer (no upstream_addr - direct registration)
op_manager
.ring
.add_subscriber(&key, sender.clone())
.add_subscriber(&key, sender.clone(), None)
.map_err(|_| RingError::NoCachingPeers(key))?;

target
Expand Down
Loading
Loading