Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,18 @@ pub(crate) enum NodeEvent {
TransactionTimedOut(Transaction),
/// Transaction completed successfully - cleanup client subscription
TransactionCompleted(Transaction),
/// Local subscription completed - deliver SubscribeResponse to client via result router
/// Local subscription completed - deliver SubscribeResponse to client via result router.
///
/// **Architecture Note (Issue #2075):**
/// This event is part of the decoupled subscription architecture. Local client subscriptions
/// are handled separately from network peer subscriptions:
/// - This event notifies the client layer that a subscription request has been processed
/// - Subsequent contract updates are delivered via the executor's `update_notifications`
/// channels (see `send_update_notification` in runtime.rs)
/// - Network peer subscriptions use the `seeding_manager.subscribers` for UPDATE propagation
///
/// This separation keeps the ops/ module (network operations) independent from the
/// client_events/ module (local WebSocket client handling).
LocalSubscribeComplete {
tx: Transaction,
key: ContractKey,
Expand Down
46 changes: 21 additions & 25 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,36 +277,32 @@ pub(crate) async fn request_subscribe(
Ok(())
}

/// Complete a local subscription by notifying the client layer.
///
/// **Architecture Note (Issue #2075):**
/// Local client subscriptions are deliberately kept separate from network subscriptions:
/// - **Network subscriptions** are stored in `ring.seeding_manager.subscribers` and are used
/// for peer-to-peer UPDATE propagation between nodes
/// - **Local subscriptions** are managed by the contract executor via `update_notifications`
/// channels, which deliver `UpdateNotification` directly to WebSocket clients
///
/// This separation eliminates the need for workarounds like the previous `allow_self` hack
/// in `get_broadcast_targets_update()`, and ensures clean architectural boundaries between
/// the network layer (ops/) and the client layer (client_events/).
async fn complete_local_subscription(
op_manager: &OpManager,
id: Transaction,
key: ContractKey,
) -> Result<(), OpError> {
let subscriber = op_manager.ring.connection_manager.own_location();
let subscriber_addr = subscriber
.socket_addr()
.expect("own location must have socket address");
// Local subscription - no upstream NAT address
if let Err(err) = op_manager
.ring
.add_subscriber(&key, subscriber.clone(), None)
{
tracing::warn!(
%key,
tx = %id,
subscriber = %subscriber_addr,
error = ?err,
"Failed to register local subscriber"
);
} else {
tracing::debug!(
%key,
tx = %id,
subscriber = %subscriber_addr,
"Registered local subscriber"
);
}

tracing::debug!(
%key,
tx = %id,
"Local subscription completed - client will receive updates via executor notification channel"
);

// Notify client layer that subscription is complete.
// The actual update delivery happens through the executor's update_notifications
// when contract state changes, not through network broadcast targets.
op_manager
.notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
tx: id,
Expand Down
24 changes: 11 additions & 13 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,15 @@ async fn try_to_broadcast(
}

impl OpManager {
/// Get the list of network subscribers to broadcast an UPDATE to.
///
/// **Architecture Note (Issue #2075):**
/// This function returns only **network peer** subscribers, not local client subscriptions.
/// Local clients receive updates through a separate path via the contract executor's
/// `update_notifications` channels (see `send_update_notification` in runtime.rs).
///
/// This clean separation eliminates the previous `allow_self` workaround that was needed
/// when local subscriptions were mixed with network subscriptions.
pub(crate) fn get_broadcast_targets_update(
&self,
key: &ContractKey,
Expand All @@ -750,21 +759,10 @@ impl OpManager {
.ring
.subscribers_of(key)
.map(|subs| {
let self_addr = self.ring.connection_manager.get_own_addr();
let allow_self = self_addr.as_ref().map(|me| me == sender).unwrap_or(false);
subs.value()
.iter()
.filter(|pk| {
// Allow the sender (or ourselves) to stay in the broadcast list when we're
// originating the UPDATE so local auto-subscribes still receive events.
let is_sender = pk.socket_addr().as_ref() == Some(sender);
let is_self = self_addr.as_ref() == pk.socket_addr().as_ref();
if is_sender || is_self {
allow_self
} else {
true
}
})
// Filter out the sender to avoid sending the update back to where it came from
.filter(|pk| pk.socket_addr().as_ref() != Some(sender))
.cloned()
.collect::<Vec<_>>()
})
Expand Down
82 changes: 82 additions & 0 deletions crates/core/src/ring/seeding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,16 @@ impl SeedingManager {
self.seeding_contract.contains_key(key)
}

/// Add a **network peer** subscriber for contract update propagation.
///
/// Returns an error if the maximum number of subscribers has already been reached.
///
/// **Architecture Note (Issue #2075):**
/// This method is exclusively for **network peer** subscriptions used in peer-to-peer
/// UPDATE message propagation. Local client subscriptions are handled separately by the
/// contract executor's `update_notifications` channels, which deliver updates directly
/// to WebSocket clients without going through the network broadcast path.
///
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
/// message was received. This is used instead of the address embedded in `subscriber`
/// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages.
Expand Down Expand Up @@ -329,4 +337,78 @@ mod tests {
// Should not panic when removing from non-existent contract
seeding_manager.remove_subscriber_by_peer(&contract_key, &peer);
}

/// Checks the subscriber bookkeeping that `get_broadcast_targets_update` (update.rs)
/// builds on when choosing UPDATE broadcast targets.
///
/// **Architecture Note (Issue #2075):**
/// With local client subscriptions decoupled from network subscriptions,
/// `get_broadcast_targets_update` only has to drop the sender from the stored
/// subscriber list. This test registers three network peers, confirms that all of
/// them come back from `subscribers_of`, and then replays the same sender-exclusion
/// filter to confirm that only the two non-sender peers remain.
#[test]
fn test_subscribers_for_broadcast_targeting() {
    let seeding_manager = SeedingManager::new();
    let contract_key = ContractKey::from(ContractInstanceId::new([3u8; 32]));

    // Three distinct network peers (no local clients involved).
    let peer1 = test_peer_id(1);
    let peer2 = test_peer_id(2);
    let peer3 = test_peer_id(3);

    let peer_loc1 = PeerKeyLocation::new(peer1.pub_key.clone(), peer1.addr);
    let peer_loc2 = PeerKeyLocation::new(peer2.pub_key.clone(), peer2.addr);
    let peer_loc3 = PeerKeyLocation::new(peer3.pub_key.clone(), peer3.addr);

    // Register every peer as a network subscriber, without an upstream address.
    for (location, failure_msg) in [
        (peer_loc1, "should add peer1"),
        (peer_loc2, "should add peer2"),
        (peer_loc3, "should add peer3"),
    ] {
        seeding_manager
            .add_subscriber(&contract_key, location, None)
            .expect(failure_msg);
    }

    // Read the subscriber list back the way get_broadcast_targets_update would.
    let subs = seeding_manager.subscribers_of(&contract_key).unwrap();

    // Nothing has been filtered yet, so all three peers must be present.
    assert_eq!(subs.len(), 3, "Should have 3 network subscribers");

    // Treat peer1 as the originator of an UPDATE and apply the same sender-exclusion
    // filter that get_broadcast_targets_update uses.
    let sender_addr = peer1.addr;
    let broadcast_targets: Vec<_> = subs
        .iter()
        .filter(|candidate| candidate.socket_addr().as_ref() != Some(&sender_addr))
        .cloned()
        .collect();

    // Only peer2 and peer3 are left once the sender is removed.
    assert_eq!(
        broadcast_targets.len(),
        2,
        "Should exclude sender from broadcast targets"
    );

    // Collect the surviving addresses once so each membership check is a plain lookup.
    let target_addrs: Vec<_> = broadcast_targets
        .iter()
        .map(|target| target.socket_addr())
        .collect();

    assert!(
        target_addrs.contains(&Some(peer2.addr)),
        "peer2 should be in broadcast targets"
    );
    assert!(
        target_addrs.contains(&Some(peer3.addr)),
        "peer3 should be in broadcast targets"
    );
    assert!(
        !target_addrs.contains(&Some(peer1.addr)),
        "sender (peer1) should NOT be in broadcast targets"
    );
}
}
12 changes: 5 additions & 7 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,13 +636,11 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
Ok(())
}

// This test is disabled due to race conditions in subscription propagation logic.
// The test expects multiple clients across different nodes to receive subscription updates,
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
// Re-enabled after recent fixes to subscription logic - previously exhibited race conditions.
// If this test becomes flaky again, see issue #1798 for historical context.
// Ignored again due to recurring flakiness - fails intermittently with timeout waiting for
// cross-node subscription notifications (Client 3 timeout). See issue #1798.
// This test validates cross-node subscription propagation.
// Issue #2075 decoupled local client subscriptions from network subscriptions, which
// should improve reliability. However, the test still exhibits flakiness due to
// network timing/transaction timeout issues unrelated to the subscription logic.
// See issue #1798 for historical context.
#[ignore]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have an agent work on re-enabling this test

#[freenet_test(
nodes = ["gateway", "node-a", "node-b"],
Expand Down
Loading