15 changes: 15 additions & 0 deletions crates/core/src/contract/executor/runtime.rs
@@ -306,6 +306,21 @@ impl ContractExecutor for Executor<Runtime> {
.await
.map_err(ExecutorError::other)?;

// Notify local subscribers of the state change.
// This is critical for cross-node subscriptions: when a BroadcastTo
// update arrives from another node, local WebSocket clients that
// subscribed to this contract need to receive the update notification.
if let Err(err) = self
.send_update_notification(&key, &params, &updated_state)
.await
{
tracing::error!(
"Failed while sending notifications for contract {}: {}",
key,
err
);
}

// todo: forward delta like we are doing with puts
tracing::warn!("Delta updates are not yet supported");
Ok(UpsertResult::Updated(updated_state))
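The hunk above adds a local-subscriber notification after a remotely-originated update is applied. A minimal, self-contained sketch of that pattern, using a hypothetical `SubscriberRegistry` over `tokio::sync::broadcast` rather than the crate's actual `send_update_notification` machinery:

```rust
use std::collections::HashMap;
use tokio::sync::broadcast;

struct SubscriberRegistry {
    // One broadcast channel per contract key; hypothetical layout.
    channels: HashMap<String, broadcast::Sender<Vec<u8>>>,
}

impl SubscriberRegistry {
    fn notify(&self, key: &str, updated_state: &[u8]) {
        if let Some(tx) = self.channels.get(key) {
            // A failed send only means there are no live receivers; treat it
            // as non-fatal and log, mirroring the hunk's error handling.
            if let Err(err) = tx.send(updated_state.to_vec()) {
                eprintln!("failed to notify subscribers of {key}: {err}");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);
    let mut channels = HashMap::new();
    channels.insert("contract-key".to_string(), tx);
    let registry = SubscriberRegistry { channels };

    // Simulate a BroadcastTo update arriving from another node: apply it,
    // then fan the new state out to local WebSocket subscribers.
    registry.notify("contract-key", b"updated-state");
    assert_eq!(rx.recv().await.unwrap(), b"updated-state".to_vec());
}
```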
10 changes: 8 additions & 2 deletions crates/core/src/contract/handler.rs
@@ -327,6 +327,9 @@ impl ContractHandlerChannel<SenderHalve> {
.map_err(|_| ContractError::NoEvHandlerResponse)?;

// Route to session actor if session adapter is installed
// IMPORTANT: Use send().await (not try_send) to ensure registration completes
// before the operation starts. try_send can fail under backpressure, causing
// race conditions where responses arrive before registration, leading to timeouts.
if let Some(session_tx) = &self.session_adapter_tx {
// Register all Transaction variants with the session actor
if let WaitingTransaction::Transaction(tx) = waiting_tx {
@@ -335,7 +338,7 @@ impl ContractHandlerChannel<SenderHalve> {
client_id,
request_id,
};
if let Err(e) = session_tx.try_send(msg) {
if let Err(e) = session_tx.send(msg).await {
tracing::warn!("Failed to notify session actor: {}", e);
} else {
tracing::debug!(
@@ -370,13 +373,16 @@ impl ContractHandlerChannel<SenderHalve> {
.await
.map_err(|_| ContractError::NoEvHandlerResponse)?;

// IMPORTANT: Use send().await (not try_send) to ensure registration completes
// before the operation starts. try_send can fail under backpressure, causing
// race conditions where responses arrive before registration, leading to timeouts.
if let Some(session_tx) = &self.session_adapter_tx {
let msg = SessionMessage::RegisterTransaction {
tx,
client_id,
request_id,
};
if let Err(e) = session_tx.try_send(msg) {
if let Err(e) = session_tx.send(msg).await {
tracing::warn!("Failed to notify session actor: {}", e);
} else {
tracing::debug!(
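Both hunks in this file replace `try_send` with `send().await` for session-actor registration. The distinction matters under backpressure: `try_send` on a full bounded channel fails immediately and the registration message is dropped, while `send().await` waits for capacity. A small runnable demonstration of that semantic difference with a plain `tokio::sync::mpsc` channel:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel of capacity 1 to force backpressure.
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // channel is now full

    // try_send fails immediately when the buffer is full -- this is the
    // failure mode the diff removes: a silently dropped registration.
    assert!(tx.try_send(2).is_err());

    // send().await instead waits for capacity, so the registration is
    // guaranteed to be enqueued before the operation proceeds.
    let tx2 = tx.clone();
    let producer = tokio::spawn(async move { tx2.send(2).await.unwrap() });
    assert_eq!(rx.recv().await, Some(1)); // frees a slot for the producer
    producer.await.unwrap();
    assert_eq!(rx.recv().await, Some(2));
}
```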
5 changes: 4 additions & 1 deletion crates/core/src/operations/mod.rs
@@ -166,8 +166,11 @@ where
tracing::debug!(%id, "operation in progress");
if let Some(target) = target_addr {
tracing::debug!(%id, ?target, "sending updated op state");
network_bridge.send(target, msg).await?;
// CRITICAL: Push state BEFORE sending the message to prevent race condition.
// The response can arrive before send() returns if the network is fast,
// so the operation must already be stored when the response handler runs.
op_manager.push(id, updated_state).await?;
network_bridge.send(target, msg).await?;
} else {
tracing::debug!(%id, "queueing op state for local processing");
debug_assert!(
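The reordering above guarantees the operation state is stored before the message leaves the node. A hedged sketch of why the old order races, using a hypothetical `OpManager` backed by a shared map in place of the real one:

```rust
use std::{collections::HashMap, sync::{Arc, Mutex}};
use tokio::sync::mpsc;

type OpId = u64;

#[derive(Clone)]
struct OpManager {
    ops: Arc<Mutex<HashMap<OpId, &'static str>>>,
}

#[tokio::main]
async fn main() {
    let manager = OpManager { ops: Arc::new(Mutex::new(HashMap::new())) };
    let (net_tx, mut net_rx) = mpsc::unbounded_channel::<OpId>();

    // "Response handler": may run as soon as the message is on the wire.
    let handler = {
        let manager = manager.clone();
        tokio::spawn(async move {
            while let Some(id) = net_rx.recv().await {
                // Push-before-send guarantees the op is already stored here;
                // with the old order this lookup could miss and time out.
                assert!(manager.ops.lock().unwrap().contains_key(&id));
            }
        })
    };

    // Push the op state BEFORE sending, as the reordered diff does.
    manager.ops.lock().unwrap().insert(42, "awaiting-response");
    net_tx.send(42).unwrap();

    drop(net_tx); // close the channel so the handler loop exits
    handler.await.unwrap();
}
```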
49 changes: 35 additions & 14 deletions crates/core/src/operations/subscribe.rs
@@ -579,7 +579,16 @@ impl Operation for SubscribeOp {
.expect("forward target must have socket address");
skip.insert(forward_target_addr);

new_state = self.state;
// Transition to AwaitingResponse state so we can handle the ReturnSub.
// CRITICAL: Without this, when ReturnSub arrives, the state is still
// ReceivedRequest which doesn't match the expected AwaitingResponse pattern,
// causing invalid_transition errors.
new_state = Some(SubscribeState::AwaitingResponse {
skip_list: skip.clone(),
retries: 0,
current_hop: op_manager.ring.max_hops_to_live.max(1),
upstream_subscriber: Some(subscriber.clone()),
});
return_msg = Some(SubscribeMsg::SeekNode {
id: *id,
key: *key,
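The hunk above replaces the state pass-through (`new_state = self.state`) with an explicit transition to `AwaitingResponse` before forwarding. A toy state machine (simplified variants, not the real `SubscribeState`) showing the `invalid_transition` failure the change prevents:

```rust
#[derive(Debug)]
enum SubscribeState {
    ReceivedRequest,
    AwaitingResponse { retries: usize },
    Completed,
}

fn handle_return_sub(state: SubscribeState) -> Result<SubscribeState, &'static str> {
    match state {
        // ReturnSub is only valid once we have forwarded the request and
        // are awaiting a response; any other state is an invalid transition.
        SubscribeState::AwaitingResponse { .. } => Ok(SubscribeState::Completed),
        _ => Err("invalid_transition"),
    }
}

fn main() {
    // Old behavior: state left as ReceivedRequest after forwarding -> error.
    assert!(handle_return_sub(SubscribeState::ReceivedRequest).is_err());
    // Fixed behavior: transition to AwaitingResponse before forwarding.
    assert!(handle_return_sub(SubscribeState::AwaitingResponse { retries: 0 }).is_ok());
}
```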
@@ -617,7 +626,6 @@ impl Operation for SubscribeOp {

let ring_max_htl = op_manager.ring.max_hops_to_live.max(1);
let htl = (*htl).min(ring_max_htl);
let this_peer = op_manager.ring.connection_manager.own_location();
let return_not_subbed = || -> OperationResult {
let return_msg = SubscribeMsg::ReturnSub {
key: *key,
@@ -732,7 +740,10 @@ impl Operation for SubscribeOp {
skip = ?new_skip_list,
"Forwarding seek to next candidate"
);
// Retry seek node when the contract to subscribe has not been found in this node
// Retry seek node when the contract to subscribe has not been found in this node.
// IMPORTANT: Preserve the original subscriber's identity (pub_key) so
// the final contract holder registers the correct peer as the subscriber.
// The recipient will fill in our address from the packet source.
return build_op_result(
*id,
Some(SubscribeState::AwaitingResponse {
Expand All @@ -741,14 +752,15 @@ impl Operation for SubscribeOp {
current_hop: new_htl,
upstream_subscriber: Some(subscriber.clone()),
}),
// Use PeerAddr::Unknown - the subscriber doesn't know their own
// external address (especially behind NAT). The recipient will
// fill this in from the packet source address.
(SubscribeMsg::SeekNode {
id: *id,
key: *key,
// Keep the original subscriber's pub_key but reset the address
// to Unknown - the next hop will fill it from source_addr.
// This ensures the chain of intermediate peers each forward
// the original subscriber's identity toward the contract holder.
subscriber: PeerKeyLocation::with_unknown_addr(
this_peer.pub_key().clone(),
subscriber.pub_key().clone(),
),
target: new_target,
skip_list: new_skip_list,
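This hunk changes the forwarded `subscriber` field to carry the original subscriber's `pub_key` rather than the forwarding peer's (`this_peer`). A simplified illustration of the fix, with stand-in types for `PeerKeyLocation` and `PeerAddr::Unknown`:

```rust
use std::net::SocketAddr;

#[derive(Clone, Debug, PartialEq)]
struct PubKey(String);

#[derive(Clone, Debug)]
enum PeerAddr {
    Known(SocketAddr),
    // Unknown: the sender may be behind NAT and not know its external
    // address; the recipient fills it in from the packet source.
    Unknown,
}

#[derive(Clone, Debug)]
struct PeerKeyLocation {
    pub_key: PubKey,
    addr: PeerAddr,
}

fn forward_seek(subscriber: &PeerKeyLocation) -> PeerKeyLocation {
    PeerKeyLocation {
        // The bug fixed by the diff: this used to be this_peer's key, which
        // made the final contract holder register the intermediate hop as
        // the subscriber instead of the originator.
        pub_key: subscriber.pub_key.clone(),
        addr: PeerAddr::Unknown,
    }
}

fn main() {
    let origin = PeerKeyLocation {
        pub_key: PubKey("origin".into()),
        addr: PeerAddr::Unknown,
    };
    let forwarded = forward_seek(&origin);
    assert_eq!(forwarded.pub_key, origin.pub_key);
}
```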
@@ -851,13 +863,22 @@ impl Operation for SubscribeOp {
.ring
.k_closest_potentially_caching(key, &skip_list, 3);
if let Some(target) = candidates.first() {
// Use PeerAddr::Unknown - the subscriber doesn't know their own
// external address (especially behind NAT). The recipient will
// fill this in from the packet source address.
let own_loc = op_manager.ring.connection_manager.own_location();
let subscriber = PeerKeyLocation::with_unknown_addr(
own_loc.pub_key().clone(),
);
// Preserve the original subscriber's identity when retrying.
// If we have an upstream_subscriber (we're an intermediate node),
// use their pub_key. Otherwise, we're the originating node.
let subscriber_pub_key =
if let Some(ref upstream) = upstream_subscriber {
upstream.pub_key().clone()
} else {
op_manager
.ring
.connection_manager
.own_location()
.pub_key()
.clone()
};
let subscriber =
PeerKeyLocation::with_unknown_addr(subscriber_pub_key);
return_msg = Some(SubscribeMsg::SeekNode {
id: *id,
key: *key,
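The retry path above now chooses whose identity to stamp on the retried `SeekNode`: the upstream subscriber's key when this node is an intermediate hop, or its own key when it originated the subscription. The decision reduces to a small fallback, sketched here with a stand-in `PubKey` type:

```rust
#[derive(Clone, Debug, PartialEq)]
struct PubKey(String);

fn retry_subscriber_key(upstream_subscriber: Option<&PubKey>, own_key: &PubKey) -> PubKey {
    // Intermediate node: preserve the originator's identity.
    // Originating node (no upstream): fall back to our own key.
    upstream_subscriber.cloned().unwrap_or_else(|| own_key.clone())
}

fn main() {
    let own = PubKey("intermediate".into());
    let upstream = PubKey("origin".into());
    assert_eq!(retry_subscriber_key(Some(&upstream), &own), upstream);
    assert_eq!(retry_subscriber_key(None, &own), own);
}
```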
15 changes: 7 additions & 8 deletions crates/core/tests/operations.rs
@@ -636,14 +636,13 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
Ok(())
}

// This test is disabled due to race conditions in subscription propagation logic.
// The test expects multiple clients across different nodes to receive subscription updates,
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
// Re-enabled after recent fixes to subscription logic - previously exhibited race conditions.
// If this test becomes flaky again, see issue #1798 for historical context.
// Ignored again due to recurring flakiness - fails intermittently with timeout waiting for
// cross-node subscription notifications (Client 3 timeout). See issue #1798.
#[ignore]
// This test validates cross-node subscription propagation: when a client on node-b subscribes
// to a contract PUT on node-a, UPDATE notifications should reach node-b's clients.
// Issue #2220 fixed the root cause where intermediate nodes replaced the original subscriber's
// pub_key with their own when forwarding SeekNode messages.
// Note: This test exhibits timing-related flakiness in CI due to multi-node startup and connection
// timing. When the test infrastructure succeeds in establishing connections, cross-node
// subscriptions work correctly. Consider using --test-threads=1 if flaky in CI.
#[freenet_test(
nodes = ["gateway", "node-a", "node-b"],
auto_connect_peers = true,
Expand Down