Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
6c9be70
feat: Implement Phase 4 - Proximity-based update forwarding (#1848)
sanity Sep 24, 2025
2e9ba4a
fix: Address [Codex] review comments from PR1851
sanity Sep 24, 2025
3d72e88
fix: Address clippy errors for CI
sanity Sep 24, 2025
3e57d48
feat: Implement cache state sync on new peer connections
sanity Sep 24, 2025
6bf9262
feat: Implement periodic batch announcements for proximity cache
sanity Sep 24, 2025
65c039e
fix: Resolve all three critical issues in proximity-based update forw…
sanity Sep 25, 2025
0992b2e
Remove Phase X implementation artifacts from comments
sanity Sep 25, 2025
5451ac4
Address code review feedback from @iduartgomez
sanity Sep 28, 2025
bc7122e
Fix proximity cache announcements and test issues
sanity Sep 30, 2025
d84367f
Add debug logging to diagnose CI test timeout
sanity Sep 30, 2025
630cff1
Increase network stabilization time for CI environment
sanity Sep 30, 2025
daf948f
Increase cache announcement propagation delay for CI
sanity Sep 30, 2025
26f9ce1
Remove debug logging from test
sanity Sep 30, 2025
621be78
refactor: consolidate comments for easier review
sanity Sep 30, 2025
d6a3cac
fix: increase network stabilization delay for CI
sanity Sep 30, 2025
3734ac8
test: add port release delay to prevent connection failures
sanity Oct 1, 2025
17cbc4c
add proximity cache support to OpManager and related components
netsirius Oct 4, 2025
4df12c7
fix: Remove unnecessary async from get_broadcast_targets_update
sanity Oct 5, 2025
764a427
fix: Resolve stack overflow and improve proximity cache broadcasting
sanity Oct 7, 2025
36dfd7c
fix: resolve transport-layer retransmission flooding
sanity Oct 8, 2025
97e97a8
Merge branch 'main' into fix/1848-phase4-proximity-implementation
sanity Oct 8, 2025
bceb641
fix: reduce exponential backoff cap and increase test timeouts
sanity Oct 8, 2025
d02826f
fix: increase test_three_node_network_connectivity timeout to 300s
sanity Oct 9, 2025
84892e4
fix: increase proximity test response timeouts to 120s
sanity Oct 9, 2025
4d2bf81
fix: increase proximity test overall timeout from 300s to 500s
sanity Oct 9, 2025
a4e0a2c
fix: increase network stabilization delay from 45s to 120s
sanity Oct 9, 2025
1c18a14
Merge branch 'main' into fix/1848-phase4-proximity-implementation
sanity Oct 9, 2025
4643550
fix: increase test_three_node_network_connectivity operation timeouts…
sanity Oct 9, 2025
442dda7
fix: prevent orphaned callbacks in handle_connect_peer causing channe…
sanity Oct 9, 2025
93fa53e
refactor: reduce test timeouts after fixing orphaned callback bug
sanity Oct 9, 2025
d024921
refactor: use ContractInstanceId directly in proximity cache instead …
sanity Oct 18, 2025
954a734
Merge branch 'main' into pr-1853-clean-restart
sanity Oct 18, 2025
9ed4ff4
fix: make test_multiple_clients_subscription robust to CI timing
sanity Oct 18, 2025
bea1200
fix: poll for network readiness in test_three_node_network_connectivity
sanity Oct 18, 2025
75e31c2
fix: poll for network readiness in test_proximity_based_update_forwar…
sanity Oct 18, 2025
374cea1
fix: increase retry limits and reduce timeout for test_multiple_clien…
sanity Oct 18, 2025
09ce89e
Merge main to update pr-1853-clean-restart
sanity Oct 18, 2025
c39aa3e
Fix flaky test by setting realistic min_connections for small networks
sanity Oct 18, 2025
2861405
Add WebSocket connection retry logic for test reliability
sanity Oct 19, 2025
f3044f8
fix: reconnect WebSocket clients during test polling
sanity Oct 19, 2025
440bbb8
fix: address proximity cache telemetry and test issues
sanity Oct 19, 2025
17312b2
fix: restore 2-node proximity broadcast workaround with improved docs
sanity Oct 19, 2025
951b22c
Revert "fix: restore 2-node proximity broadcast workaround with impro…
sanity Oct 19, 2025
e52b3c5
fix: Use temp directories in config test to avoid permission errors
sanity Oct 19, 2025
dd5a0fb
fix: set min_connections=1 for small test networks
sanity Oct 20, 2025
15d4cf1
Merge branch 'main' into pr-1853-clean-restart
sanity Oct 20, 2025
1f1f0e0
Merge main to pick up topology manager fix from PR #1963
sanity Oct 20, 2025
c7dd8d2
fix: revert connectivity test debugging changes to match main
sanity Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 104 additions & 4 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ pub async fn client_event_handling<ClientEv>(
mut client_events: ClientEv,
mut client_responses: ClientResponsesReceiver,
node_controller: tokio::sync::mpsc::Sender<NodeEvent>,
proximity_cache: Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> anyhow::Result<Infallible>
where
ClientEv: ClientEventsProxy + Send + 'static,
Expand Down Expand Up @@ -245,7 +246,7 @@ where
}
};
let cli_id = req.client_id;
let res = process_open_request(req, op_manager.clone(), request_router.clone()).await;
let res = process_open_request(req, op_manager.clone(), request_router.clone(), proximity_cache.clone()).await;
results.push(async move {
match res.await {
Ok(Some(Either::Left(res))) => (cli_id, Ok(Some(res))),
Expand Down Expand Up @@ -320,6 +321,9 @@ where
QueryResult::NodeDiagnostics(response) => {
Ok(HostResponse::QueryResponse(QueryResponse::NodeDiagnostics(response)))
}
QueryResult::ProximityCache(proximity_info) => {
Ok(HostResponse::QueryResponse(QueryResponse::ProximityCache(proximity_info)))
}
};
if let Ok(result) = &res {
tracing::debug!(%result, "sending client operation response");
Expand Down Expand Up @@ -356,10 +360,97 @@ enum Error {
}

#[inline]
async fn handle_proximity_cache_info_query(
proximity_cache: &Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> freenet_stdlib::client_api::ProximityCacheInfo {
let (my_cache_contract_ids, neighbor_cache_data) =
proximity_cache.get_introspection_data().await;
let stats = proximity_cache.get_stats().await;

let my_cache = my_cache_contract_ids
.into_iter()
.map(|contract_id| {
use freenet_stdlib::prelude::ContractKey;
let contract_key = ContractKey::from(contract_id);
// Use first 4 bytes for backwards-compatible cache_hash field
let bytes = contract_id.as_bytes();
let cache_hash = u32::from_le_bytes([
bytes.first().copied().unwrap_or(0),
bytes.get(1).copied().unwrap_or(0),
bytes.get(2).copied().unwrap_or(0),
bytes.get(3).copied().unwrap_or(0),
]);

freenet_stdlib::client_api::ContractCacheEntry {
contract_key: contract_key.to_string(),
cache_hash,
cached_since: std::time::SystemTime::now()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cached_since and neighbor.last_update are being set to SystemTime::now() at query time, so every CLI call reports fresh timestamps even if nothing changed. That makes the introspection data look useful but it is effectively random. Could we either carry real timestamps from the cache manager or drop these fields for now so we do not mislead operators?

.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}
})
.collect();

let neighbor_caches: Vec<_> = neighbor_cache_data
.into_iter()
.map(|(peer_id, (contract_ids, last_update_time))| {
// Convert ContractInstanceIds to u32 hashes for backwards compatibility
let known_contracts = contract_ids
.into_iter()
.map(|contract_id| {
let bytes = contract_id.as_bytes();
u32::from_le_bytes([
bytes.first().copied().unwrap_or(0),
bytes.get(1).copied().unwrap_or(0),
bytes.get(2).copied().unwrap_or(0),
bytes.get(3).copied().unwrap_or(0),
])
})
.collect();

freenet_stdlib::client_api::NeighborCacheInfo {
peer_id,
known_contracts,
last_update: last_update_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
update_count: 1,
}
})
.collect();

let total_neighbors = neighbor_caches.len();
let total_contracts: usize = neighbor_caches
.iter()
.map(|n| n.known_contracts.len())
.sum();
let avg_cache_size = if total_neighbors > 0 {
total_contracts as f32 / total_neighbors as f32
} else {
0.0
};

freenet_stdlib::client_api::ProximityCacheInfo {
my_cache,
neighbor_caches,
stats: freenet_stdlib::client_api::ProximityStats {
cache_announces_sent: stats.cache_announces_sent,
cache_announces_received: stats.cache_announces_received,
updates_via_proximity: stats.updates_via_proximity,
updates_via_subscription: stats.updates_via_subscription,
false_positive_forwards: stats.false_positive_forwards,
avg_neighbor_cache_size: avg_cache_size,
},
}
}

async fn process_open_request(
mut request: OpenRequest<'static>,
op_manager: Arc<OpManager>,
request_router: Option<Arc<crate::node::RequestRouter>>,
proximity_cache: Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> BoxFuture<'static, Result<Option<Either<QueryResult, mpsc::Receiver<QueryResult>>>, Error>> {
let (callback_tx, callback_rx) = if matches!(
&*request.request,
Expand Down Expand Up @@ -1239,6 +1330,17 @@ async fn process_open_request(
ClientRequest::NodeQueries(query) => {
tracing::debug!("Received node queries from user event: {:?}", query);

if matches!(
query,
freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo
) {
let proximity_info = handle_proximity_cache_info_query(&proximity_cache).await;
return Ok(Some(Either::Left(QueryResult::ProximityCache(
proximity_info,
))));
}

// For other queries, we need to use the callback_tx
let Some(tx) = callback_tx else {
tracing::error!("callback_tx not available for NodeQueries");
unreachable!("callback_tx should always be Some for NodeQueries based on initialization logic");
Expand All @@ -1258,9 +1360,7 @@ async fn process_open_request(
}
}
freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo => {
// TODO: Implement proximity cache info query
tracing::warn!("ProximityCacheInfo query not yet implemented");
return Ok(None);
unreachable!("ProximityCacheInfo handled above")
}
};

Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,18 @@ mod tests {

#[tokio::test]
async fn test_serde_config_args() {
let temp_dir = tempfile::tempdir().unwrap();
let config_dir = temp_dir.path().join("config");
let data_dir = temp_dir.path().join("data");
std::fs::create_dir_all(&config_dir).unwrap();
std::fs::create_dir_all(&data_dir).unwrap();

let args = ConfigArgs {
mode: Some(OperationMode::Local),
config_paths: ConfigPathsArgs {
config_dir: Some(config_dir),
data_dir: Some(data_dir),
},
..Default::default()
};
let cfg = args.build().await.unwrap();
Expand Down
25 changes: 24 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use crate::{
client_events::{ClientId, HostResult},
node::PeerId,
node::{proximity_cache::ProximityCacheMessage, PeerId},
operations::{
connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg,
},
Expand Down Expand Up @@ -255,6 +255,10 @@ pub(crate) enum NetMessageV1 {
},
Update(UpdateMsg),
Aborted(Transaction),
ProximityCache {
from: PeerId,
message: ProximityCacheMessage,
},
}

trait Versioned {
Expand All @@ -279,6 +283,7 @@ impl Versioned for NetMessageV1 {
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
}
}
}
Expand Down Expand Up @@ -334,6 +339,11 @@ pub(crate) enum NodeEvent {
key: ContractKey,
subscribed: bool,
},
/// Broadcast a ProximityCache message to all connected peers
BroadcastProximityCache {
from: PeerId,
message: crate::node::proximity_cache::ProximityCacheMessage,
},
/// Send a message to a peer over the network
SendMessage {
target: PeerId,
Expand Down Expand Up @@ -373,6 +383,7 @@ pub(crate) enum QueryResult {
},
NetworkDebug(NetworkDebugInfo),
NodeDiagnostics(freenet_stdlib::client_api::NodeDiagnosticsResponse),
ProximityCache(freenet_stdlib::client_api::ProximityCacheInfo),
}

impl Display for NodeEvent {
Expand Down Expand Up @@ -415,6 +426,9 @@ impl Display for NodeEvent {
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
)
}
NodeEvent::BroadcastProximityCache { from, .. } => {
write!(f, "BroadcastProximityCache (from {from})")
}
NodeEvent::SendMessage { target, msg } => {
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
}
Expand Down Expand Up @@ -452,6 +466,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.id(),
NetMessageV1::Aborted(tx) => tx,
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
}
}

Expand All @@ -464,6 +479,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}

Expand All @@ -476,6 +492,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.requested_location(),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}
}
Expand All @@ -495,6 +512,12 @@ impl Display for NetMessage {
Unsubscribed { key, from, .. } => {
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
}
ProximityCache { from, message } => {
write!(
f,
"ProximityCache {{ from: {from}, message: {message:?} }}"
)?;
}
},
};
write!(f, "}}")
Expand Down
52 changes: 52 additions & 0 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ mod message_processor;
mod network_bridge;
mod op_state_manager;
mod p2p_impl;
pub(crate) mod proximity_cache;
mod request_router;
pub(crate) mod testing_impl;

Expand Down Expand Up @@ -369,6 +370,13 @@ impl NodeConfig {
Ok(gateways)
}
}

/// Set minimum number of connections for this node.
/// Useful for tests with small networks where the default (25) is unrealistic.
pub fn with_min_connections(mut self, min_connections: usize) -> Self {
self.min_number_conn = Some(min_connections);
self
}
}

/// Gateway node to use for joining the network.
Expand Down Expand Up @@ -808,6 +816,28 @@ async fn process_message_v1<CB>(
}
break;
}
NetMessageV1::ProximityCache { from, message } => {
// Handle proximity cache messages
if let Some(proximity_cache) = &op_manager.proximity_cache {
if let Some(response) =
proximity_cache.handle_message(from.clone(), message).await
{
// Send response back to the peer
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.own_location().peer,
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(
"Failed to send proximity cache response to {}: {}",
from,
err
);
}
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down Expand Up @@ -1024,6 +1054,28 @@ where
}
break;
}
NetMessageV1::ProximityCache { from, message } => {
// Handle proximity cache messages
if let Some(proximity_cache) = &op_manager.proximity_cache {
if let Some(response) =
proximity_cache.handle_message(from.clone(), message).await
{
// Send response back to the peer
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.own_location().peer,
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(
"Failed to send proximity cache response to {}: {}",
from,
err
);
}
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down
Loading
Loading