Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ Run these in any worktree before pushing a branch or opening a PR.
```
- Tests can share the static network and access `NETWORK.gateway(0).ws_url()` to communicate via `freenet_stdlib::client_api::WebApi`.
- Run the crate’s suite with `cargo test -p freenet-test-network`. When `preserve_temp_dirs_on_failure(true)` is set, failing startups keep logs under `/tmp/freenet-test-network-<timestamp>/` for inspection.
- A larger soak test lives in `crates/core/tests/large_network.rs`. It is `#[ignore]` by default—run it manually with `cargo test -p freenet --test large_network -- --ignored --nocapture` once you have `riverctl` installed. The test writes diagnostics snapshots to the network’s `run_root()` directory for later analysis.

## Pull Requests & Reviews

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ xz2 = { version = "0.1" }
reqwest = { version = "0.12", features = ["json"] }
rsa = { version = "0.9", features = ["serde", "pem"] }
pkcs8 = { version = "0.10", features = ["std", "pem"] }
sha2 = "0.10"

# Tracing deps
opentelemetry = "0.31"
Expand Down
83 changes: 39 additions & 44 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ pub(crate) enum Event {
transaction: Option<Transaction>,
peer: Option<PeerId>,
connection: PeerConnection,
transient: bool,
courtesy: bool,
},
/// An outbound connection attempt succeeded.
OutboundEstablished {
transaction: Transaction,
peer: PeerId,
connection: PeerConnection,
transient: bool,
courtesy: bool,
},
/// An outbound connection attempt failed.
OutboundFailed {
transaction: Transaction,
peer: PeerId,
error: ConnectionError,
transient: bool,
courtesy: bool,
},
}

Expand All @@ -56,13 +56,13 @@ pub(crate) enum Command {
Connect {
peer: PeerId,
transaction: Transaction,
transient: bool,
courtesy: bool,
},
/// Register expectation for an inbound connection from `peer`.
ExpectInbound {
peer: PeerId,
transaction: Option<Transaction>,
transient: bool,
courtesy: bool,
},
/// Remove state associated with `peer`.
DropConnection { peer: PeerId },
Expand Down Expand Up @@ -122,69 +122,64 @@ impl Stream for HandshakeHandler {
struct ExpectedInbound {
peer: PeerId,
transaction: Option<Transaction>,
transient: bool, // TODO: rename to transient in protocol once we migrate terminology
courtesy: bool,
}

#[derive(Default)]
struct ExpectedInboundTracker {
// Keyed by remote IP to tolerate port changes; multiple expectations per IP
// are tracked and deduped by port.
entries: HashMap<IpAddr, Vec<ExpectedInbound>>,
}

impl ExpectedInboundTracker {
fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, transient: bool) {
fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, courtesy: bool) {
tracing::debug!(
remote = %peer.addr,
transient,
courtesy,
tx = ?transaction,
"ExpectInbound: registering expectation"
);
let list = self.entries.entry(peer.addr.ip()).or_default();
// Replace any existing expectation for the same peer/port to ensure the newest registration wins.
// Replace any existing expectation for the same peer/port so the newest wins.
list.retain(|entry| entry.peer.addr.port() != peer.addr.port());
list.push(ExpectedInbound {
peer,
transaction,
transient,
courtesy,
});
}

fn drop_peer(&mut self, peer: &PeerId) {
if let Some(list) = self.entries.get_mut(&peer.addr.ip()) {
list.retain(|entry| entry.peer != *peer);
list.retain(|entry| entry.peer.addr.port() != peer.addr.port());
if list.is_empty() {
self.entries.remove(&peer.addr.ip());
}
}
}

fn consume(&mut self, addr: SocketAddr) -> Option<ExpectedInbound> {
let ip = addr.ip();
let list = self.entries.get_mut(&ip)?;
if let Some(pos) = list
let list = self.entries.get_mut(&addr.ip())?;
let pos = list
.iter()
.position(|entry| entry.peer.addr.port() == addr.port())
{
let entry = list.remove(pos);
if list.is_empty() {
self.entries.remove(&ip);
}
tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by exact port");
return Some(entry);
}
let entry = list.pop();
.position(|entry| entry.peer.addr.port() == addr.port())?;
let entry = list.swap_remove(pos);
if list.is_empty() {
self.entries.remove(&ip);
self.entries.remove(&addr.ip());
}
if let Some(entry) = entry {
tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by IP fallback");
return Some(entry);
}
None
Some(entry)
}

#[cfg(test)]
fn contains(&self, addr: SocketAddr) -> bool {
self.entries.contains_key(&addr.ip())
self.entries
.get(&addr.ip())
.map(|list| {
list.iter()
.any(|entry| entry.peer.addr.port() == addr.port())
})
.unwrap_or(false)
}
}

Expand All @@ -202,12 +197,12 @@ async fn run_driver(
loop {
select! {
command = commands_rx.recv() => match command {
Some(Command::Connect { peer, transaction, transient }) => {
spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, transient, peer_ready.clone());
Some(Command::Connect { peer, transaction, courtesy }) => {
spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, courtesy, peer_ready.clone());
}
Some(Command::ExpectInbound { peer, transaction, courtesy }) => {
expected_inbound.register(peer, transaction, courtesy);
}
Some(Command::ExpectInbound { peer, transaction, transient }) => {
expected_inbound.register(peer, transaction, transient /* transient */);
}
Some(Command::DropConnection { peer }) => {
expected_inbound.drop_peer(&peer);
}
Expand All @@ -222,8 +217,8 @@ async fn run_driver(

let remote_addr = conn.remote_addr();
let entry = expected_inbound.consume(remote_addr);
let (peer, transaction, transient) = if let Some(entry) = entry {
(Some(entry.peer), entry.transaction, entry.transient)
let (peer, transaction, courtesy) = if let Some(entry) = entry {
(Some(entry.peer), entry.transaction, entry.courtesy)
} else {
(None, None, false)
};
Expand All @@ -232,7 +227,7 @@ async fn run_driver(
transaction,
peer,
connection: conn,
transient,
courtesy,
}).await.is_err() {
break;
}
Expand All @@ -249,7 +244,7 @@ fn spawn_outbound(
events_tx: mpsc::Sender<Event>,
peer: PeerId,
transaction: Transaction,
transient: bool,
courtesy: bool,
peer_ready: Option<Arc<std::sync::atomic::AtomicBool>>,
) {
tokio::spawn(async move {
Expand All @@ -273,13 +268,13 @@ fn spawn_outbound(
transaction,
peer: peer.clone(),
connection,
transient,
courtesy,
},
Err(error) => Event::OutboundFailed {
transaction,
peer: peer.clone(),
error,
transient,
courtesy,
},
};

Expand Down Expand Up @@ -312,7 +307,7 @@ mod tests {
.expect("expected registered inbound entry");
assert_eq!(entry.peer, peer);
assert_eq!(entry.transaction, Some(tx));
assert!(entry.transient);
assert!(entry.courtesy);
assert!(tracker.consume(peer.addr).is_none());
}

Expand Down Expand Up @@ -340,6 +335,6 @@ mod tests {
.consume(peer.addr)
.expect("entry should be present after overwrite");
assert_eq!(entry.transaction, Some(new_tx));
assert!(entry.transient);
assert!(entry.courtesy);
}
}
Loading
Loading