Skip to content
Merged
9 changes: 9 additions & 0 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl ExpectedInboundTracker {
fn contains(&self, addr: SocketAddr) -> bool {
self.entries.contains_key(&addr)
}

#[cfg(test)]
fn transactions_for(&self, addr: SocketAddr) -> Option<Vec<Option<Transaction>>> {
self.entries.get(&addr).map(|entry| vec![entry.transaction])
}
}

async fn run_driver(
Expand Down Expand Up @@ -324,6 +329,10 @@ mod tests {
tracker.register(peer.clone(), None, false);
let new_tx = Transaction::new::<ConnectMsg>();
tracker.register(peer.clone(), Some(new_tx), true);
let transactions = tracker
.transactions_for(peer.addr)
.expect("entry should exist");
assert_eq!(transactions, vec![Some(new_tx)]);

let entry = tracker
.consume(peer.addr)
Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
use dashmap::{DashMap, DashSet};
use either::Either;
use freenet_stdlib::prelude::ContractKey;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tracing::Instrument;

Expand All @@ -26,7 +27,8 @@ use crate::{
message::{MessageStats, NetMessage, NodeEvent, Transaction, TransactionType},
node::PeerId,
operations::{
get::GetOp, put::PutOp, subscribe::SubscribeOp, update::UpdateOp, OpEnum, OpError,
connect::ConnectForwardEstimator, get::GetOp, put::PutOp, subscribe::SubscribeOp,
update::UpdateOp, OpEnum, OpError,
},
ring::{ConnectionManager, LiveTransactionTracker, Ring},
};
Expand Down Expand Up @@ -203,6 +205,7 @@ pub(crate) struct OpManager {
pub ch_outbound: Arc<ContractHandlerChannel<SenderHalve>>,
new_transactions: tokio::sync::mpsc::Sender<Transaction>,
pub result_router_tx: mpsc::Sender<(Transaction, HostResult)>,
pub(crate) connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
/// Indicates whether the peer is ready to process client operations.
/// For gateways: always true (peer_id is set from config)
/// For regular peers: true only after first successful network handshake sets peer_id
Expand All @@ -222,6 +225,7 @@ impl Clone for OpManager {
ch_outbound: self.ch_outbound.clone(),
new_transactions: self.new_transactions.clone(),
result_router_tx: self.result_router_tx.clone(),
connect_forward_estimator: self.connect_forward_estimator.clone(),
peer_ready: self.peer_ready.clone(),
is_gateway: self.is_gateway,
sub_op_tracker: self.sub_op_tracker.clone(),
Expand Down Expand Up @@ -255,6 +259,7 @@ impl OpManager {
tracing::info_span!(parent: current_span, "garbage_cleanup_task")
};
let sub_op_tracker = SubOperationTracker::new();
let connect_forward_estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));

GlobalExecutor::spawn(
garbage_cleanup_task(
Expand Down Expand Up @@ -287,6 +292,7 @@ impl OpManager {
ch_outbound: Arc::new(ch_outbound),
new_transactions,
result_router_tx,
connect_forward_estimator,
peer_ready,
is_gateway,
sub_op_tracker,
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/node/p2p_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl NodeP2P {
ideal_location,
ttl,
target_connections,
self.op_manager.connect_forward_estimator.clone(),
);

tracing::debug!(
Expand Down
Loading
Loading