Skip to content
Merged
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,28 @@ impl Executor<Runtime> {
Ok(())
}

/// Delivers update notifications to LOCAL client subscriptions via websocket channels.
///
/// # Architecture: Local vs Network Subscriptions
///
/// Freenet uses two distinct subscription delivery mechanisms:
///
/// 1. **Local subscriptions** (handled here): Clients connected to this node's websocket
/// API register via `register_contract_notifier()`. Updates are delivered directly
/// through executor notification channels stored in `self.update_notifications`.
/// This path is triggered by `LocalSubscribeComplete` events (see `message.rs`).
///
/// 2. **Network subscriptions** (handled in `operations/update.rs`): Remote peers
/// subscribe via `ring.seeding_manager.subscribers`. Updates propagate through
/// the network via `get_broadcast_targets_update()` which filters network peers
/// to receive UPDATE messages.
///
/// This separation allows local clients to receive updates immediately without
/// network round-trips, while network subscriptions follow the peer-to-peer protocol.
///
/// See also:
/// - `operations/subscribe.rs::complete_local_subscription()` - triggers LocalSubscribeComplete
/// - `operations/update.rs::get_broadcast_targets_update()` - network subscription routing
async fn send_update_notification(
&mut self,
key: &ContractKey,
Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,18 @@ pub(crate) enum NodeEvent {
TransactionTimedOut(Transaction),
/// Transaction completed successfully - cleanup client subscription
TransactionCompleted(Transaction),
/// Local subscription completed - deliver SubscribeResponse to client via result router
/// Local subscription completed - deliver SubscribeResponse to client via result router.
///
/// **Architecture Note (Issue #2075):**
/// This event is part of the decoupled subscription architecture. Local client subscriptions
/// are handled separately from network peer subscriptions:
/// - This event notifies the client layer that a subscription request has been processed
/// - Subsequent contract updates are delivered via the executor's `update_notifications`
/// channels (see `send_update_notification` in runtime.rs)
/// - Network peer subscriptions use the `seeding_manager.subscribers` for UPDATE propagation
///
/// This separation keeps the ops/ module (network operations) independent from the
/// client_events/ module (local WebSocket client handling).
LocalSubscribeComplete {
tx: Transaction,
key: ContractKey,
Expand Down
46 changes: 21 additions & 25 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,36 +277,32 @@ pub(crate) async fn request_subscribe(
Ok(())
}

/// Complete a local subscription by notifying the client layer.
///
/// **Architecture Note (Issue #2075):**
/// Local client subscriptions are deliberately kept separate from network subscriptions:
/// - **Network subscriptions** are stored in `ring.seeding_manager.subscribers` and are used
/// for peer-to-peer UPDATE propagation between nodes
/// - **Local subscriptions** are managed by the contract executor via `update_notifications`
/// channels, which deliver `UpdateNotification` directly to WebSocket clients
///
/// This separation eliminates the need for workarounds like the previous `allow_self` hack
/// in `get_broadcast_targets_update()`, and ensures clean architectural boundaries between
/// the network layer (ops/) and the client layer (client_events/).
async fn complete_local_subscription(
op_manager: &OpManager,
id: Transaction,
key: ContractKey,
) -> Result<(), OpError> {
let subscriber = op_manager.ring.connection_manager.own_location();
let subscriber_addr = subscriber
.socket_addr()
.expect("own location must have socket address");
// Local subscription - no upstream NAT address
if let Err(err) = op_manager
.ring
.add_subscriber(&key, subscriber.clone(), None)
{
tracing::warn!(
%key,
tx = %id,
subscriber = %subscriber_addr,
error = ?err,
"Failed to register local subscriber"
);
} else {
tracing::debug!(
%key,
tx = %id,
subscriber = %subscriber_addr,
"Registered local subscriber"
);
}

tracing::debug!(
%key,
tx = %id,
"Local subscription completed - client will receive updates via executor notification channel"
);

// Notify client layer that subscription is complete.
// The actual update delivery happens through the executor's update_notifications
// when contract state changes, not through network broadcast targets.
op_manager
.notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
tx: id,
Expand Down
24 changes: 11 additions & 13 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,15 @@ async fn try_to_broadcast(
}

impl OpManager {
/// Get the list of network subscribers to broadcast an UPDATE to.
///
/// **Architecture Note (Issue #2075):**
/// This function returns only **network peer** subscribers, not local client subscriptions.
/// Local clients receive updates through a separate path via the contract executor's
/// `update_notifications` channels (see `send_update_notification` in runtime.rs).
///
/// This clean separation eliminates the previous `allow_self` workaround that was needed
/// when local subscriptions were mixed with network subscriptions.
pub(crate) fn get_broadcast_targets_update(
&self,
key: &ContractKey,
Expand All @@ -750,21 +759,10 @@ impl OpManager {
.ring
.subscribers_of(key)
.map(|subs| {
let self_addr = self.ring.connection_manager.get_own_addr();
let allow_self = self_addr.as_ref().map(|me| me == sender).unwrap_or(false);
subs.value()
.iter()
.filter(|pk| {
// Allow the sender (or ourselves) to stay in the broadcast list when we're
// originating the UPDATE so local auto-subscribes still receive events.
let is_sender = pk.socket_addr().as_ref() == Some(sender);
let is_self = self_addr.as_ref() == pk.socket_addr().as_ref();
if is_sender || is_self {
allow_self
} else {
true
}
})
// Filter out the sender to avoid sending the update back to where it came from
.filter(|pk| pk.socket_addr().as_ref() != Some(sender))
.cloned()
.collect::<Vec<_>>()
})
Expand Down
74 changes: 74 additions & 0 deletions crates/core/src/ring/seeding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,78 @@ mod tests {

assert!(seeding_manager.is_seeding_contract(&key));
}

/// Test that validates the broadcast target filtering logic used by
/// `get_broadcast_targets_update` in update.rs.
///
/// **Architecture Note (Issue #2075):**
/// After decoupling local from network subscriptions, `get_broadcast_targets_update`
/// simply filters out the sender from the subscriber list. This test validates
/// that the seeding_manager correctly stores and retrieves network subscribers,
/// which is the foundation for UPDATE broadcast targeting.
#[test]
fn test_subscribers_for_broadcast_targeting() {
let seeding_manager = SeedingManager::new();
let contract_key = ContractKey::from(ContractInstanceId::new([3u8; 32]));

// Create network peers (not local clients)
let peer1 = test_peer_id(1);
let peer2 = test_peer_id(2);
let peer3 = test_peer_id(3);

let peer_loc1 = PeerKeyLocation::new(peer1.pub_key.clone(), peer1.addr);
let peer_loc2 = PeerKeyLocation::new(peer2.pub_key.clone(), peer2.addr);
let peer_loc3 = PeerKeyLocation::new(peer3.pub_key.clone(), peer3.addr);

// Register network subscribers
seeding_manager
.add_subscriber(&contract_key, peer_loc1.clone(), None)
.expect("should add peer1");
seeding_manager
.add_subscriber(&contract_key, peer_loc2.clone(), None)
.expect("should add peer2");
seeding_manager
.add_subscriber(&contract_key, peer_loc3.clone(), None)
.expect("should add peer3");

// Retrieve subscribers (as get_broadcast_targets_update would)
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();

// All network peers should be in the list
assert_eq!(subs.len(), 3, "Should have 3 network subscribers");

// Simulate filtering out the sender (as get_broadcast_targets_update does)
// If peer1 is the sender of an UPDATE, it should be filtered out
let sender_addr = peer1.addr;
let broadcast_targets: Vec<_> = subs
.iter()
.filter(|pk| pk.socket_addr().as_ref() != Some(&sender_addr))
.cloned()
.collect();

// Only peer2 and peer3 should receive the broadcast
assert_eq!(
broadcast_targets.len(),
2,
"Should exclude sender from broadcast targets"
);
assert!(
broadcast_targets
.iter()
.any(|p| p.socket_addr() == Some(peer2.addr)),
"peer2 should be in broadcast targets"
);
assert!(
broadcast_targets
.iter()
.any(|p| p.socket_addr() == Some(peer3.addr)),
"peer3 should be in broadcast targets"
);
assert!(
!broadcast_targets
.iter()
.any(|p| p.socket_addr() == Some(peer1.addr)),
"sender (peer1) should NOT be in broadcast targets"
);
}
}
58 changes: 50 additions & 8 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,22 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
Ok(())
}

// This test is disabled due to race conditions in subscription propagation logic.
// This test validates the REMOTE subscription path (local subscribe → remote update):
//
// Architecture being tested:
// - Client on Node A subscribes to a contract
// - Contract is updated on Node B (or gateway)
// - UPDATE propagates through the network to Node A
// - Node A's upsert_contract_state() is called with the new state
// - This triggers send_update_notification() which delivers to the local subscriber
//
// Combined with test_get_with_subscribe_flag (which tests local subscribe → local update),
// this provides full coverage of the executor notification delivery paths.
//
// NOTE: This test is disabled due to race conditions in subscription propagation logic.
// The test expects multiple clients across different nodes to receive subscription updates,
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
// Re-enabled after recent fixes to subscription logic - previously exhibited race conditions.
// If this test becomes flaky again, see issue #1798 for historical context.
// Ignored again due to recurring flakiness - fails intermittently with timeout waiting for
// Ignored due to recurring flakiness - fails intermittently with timeout waiting for
// cross-node subscription notifications (Client 3 timeout). See issue #1798.
#[ignore]
#[freenet_test(
Expand Down Expand Up @@ -1222,6 +1232,19 @@ async fn test_multiple_clients_subscription(ctx: &mut TestContext) -> TestResult
tokio_worker_threads = 4
)]
async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
// This test validates the LOCAL subscription path (Issue #2075 decoupling):
//
// Architecture being tested:
// - Both clients connect to the SAME node (node-a) via websocket
// - Client 2 subscribes via GET with subscribe=true
// - The subscription is registered LOCALLY via register_contract_notifier()
// in the executor, NOT via network peer registration in ring.seeding_manager
// - When Client 1 updates the contract, the update notification is delivered
// directly through the executor's notification channels (send_update_notification)
//
// This test confirms that local subscriptions work independently of network
// subscription propagation - no remote peer registration is required.

// Load test contract
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
Expand Down Expand Up @@ -1306,6 +1329,15 @@ async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
}
}

// At this point, Client 2's subscription is registered LOCALLY in the executor
// via register_contract_notifier(). The subscription is NOT registered in the
// network's ring.seeding_manager.subscribers - that's only for remote peer subscriptions.
// This validates the decoupled architecture from Issue #2075.
tracing::info!(
"Client 2: Local subscription registered via GET with subscribe=true - \
notification delivery will use executor channels, not network broadcast"
);

// Create a new to-do list by deserializing the current state, adding a task, and serializing it back
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
Expand Down Expand Up @@ -1431,11 +1463,21 @@ async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult {
tokio::time::sleep(Duration::from_millis(100)).await;
}

// Assert that client 1 received the notification (proving auto-subscribe worked)
// Assert that Client 2 received the notification, proving that:
// 1. Local subscription via GET with subscribe=true works correctly
// 2. The executor's send_update_notification() delivers to local subscribers
// 3. Network peer registration is NOT required for same-node subscriptions
assert!(
client2_node_a_received_notification,
"Client 2 did not receive update notification within timeout period (auto-subscribe via GET failed)"
);
client2_node_a_received_notification,
"Client 2 did not receive update notification - local subscription path failed. \
This validates that executor notification channels work independently of network \
subscription propagation (Issue #2075 decoupling)."
);

tracing::info!(
"SUCCESS: Local subscription delivered update via executor channels - \
no network registration was required"
);

Ok(())
}
Expand Down
Loading