Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 30 additions & 47 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,33 +889,29 @@ async fn process_open_request(
return Ok(None);
};

// Use RequestRouter for deduplication if in actor mode, otherwise direct operation
if let Some(router) = &request_router {
// SUBSCRIBE: Skip router deduplication due to instant-completion race conditions
// When contracts are local, Subscribe completes instantly which breaks deduplication:
// - Client 1 subscribes → operation completes → result delivered → TX removed
// - Client 2 subscribes → tries to reuse TX → but TX already gone
// Solution: Each client gets their own Subscribe operation (they're lightweight)
if let Some(_router) = &request_router {
tracing::debug!(
peer_id = %peer_id,
key = %key,
"Routing SUBSCRIBE request through deduplication layer (actor mode)",
"Processing SUBSCRIBE without deduplication (actor mode - instant-completion safe)",
);

let request = crate::node::DeduplicatedRequest::Subscribe {
key,
client_id,
request_id,
};

let (_transaction_id, should_start_operation) =
router.route_request(request).await.map_err(|e| {
Error::Node(format!("Request routing failed: {}", e))
})?;
// Create operation with new transaction ID
let tx = crate::message::Transaction::new::<
crate::operations::subscribe::SubscribeMsg,
>();

// Register this client for the subscription result with proper WaitingTransaction type
// CRITICAL: Register BEFORE starting operation to avoid race with instant-completion
use crate::contract::WaitingTransaction;
op_manager
.ch_outbound
.waiting_for_transaction_result(
WaitingTransaction::Subscription {
contract_key: *key.id(),
},
WaitingTransaction::Transaction(tx),
client_id,
request_id,
)
Expand All @@ -927,37 +923,24 @@ async fn process_open_request(
);
})?;

// Only start new network operation if this is a new operation
if should_start_operation {
tracing::debug!(
peer_id = %peer_id,
key = %key,
"Starting new SUBSCRIBE network operation via RequestRouter",
);

let op_id = crate::node::subscribe(
op_manager.clone(),
key,
Some(client_id),
)
.await
.inspect_err(|err| {
tracing::error!("Subscribe error: {}", err);
})?;
// Start dedicated operation for this client AFTER registration
let _result_tx = crate::node::subscribe_with_id(
op_manager.clone(),
key,
None, // No legacy registration
Some(tx),
)
.await
.inspect_err(|err| {
tracing::error!("Subscribe error: {}", err);
})?;

tracing::debug!(
request_id = %request_id,
transaction_id = %op_id,
operation = "subscribe",
"Request-Transaction correlation"
);
} else {
tracing::debug!(
peer_id = %peer_id,
key = %key,
"Reusing existing SUBSCRIBE operation via RequestRouter - client registered for result",
);
}
tracing::debug!(
request_id = %request_id,
transaction_id = %tx,
operation = "subscribe",
"SUBSCRIBE operation started with dedicated transaction for this client"
);
} else {
tracing::debug!(
peer_id = %peer_id,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/client_events/result_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ impl ResultRouter {
/// Main routing loop
pub async fn run(mut self) {
while let Some((tx, host_result)) = self.network_results.recv().await {
tracing::info!("ResultRouter received result for transaction: {}", tx);
let msg = SessionMessage::DeliverHostResponse {
tx,
response: std::sync::Arc::new(host_result),
};
tracing::info!(
"ResultRouter sending result to SessionActor for transaction: {}",
tx
);
if let Err(e) = self.session_actor_tx.send(msg).await {
// TODO: Add metric for router send failures
// metrics::ROUTER_SEND_FAILURES.increment();
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/client_events/session_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl SessionActor {
// Track RequestId correlation
self.client_request_ids.insert((tx, client_id), request_id);

debug!(
tracing::info!(
"Registered transaction {} for client {} (request {}), total clients: {}",
tx,
client_id,
Expand Down Expand Up @@ -103,9 +103,10 @@ impl SessionActor {
tx: Transaction,
result: std::sync::Arc<crate::client_events::HostResult>,
) {
tracing::info!("Session actor attempting to deliver result for transaction {}, registered transactions: {}", tx, self.client_transactions.len());
if let Some(waiting_clients) = self.client_transactions.remove(&tx) {
let client_count = waiting_clients.len();
tracing::debug!(
tracing::info!(
"Delivering result for transaction {} to {} clients",
tx,
client_count
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ impl ContractHandlerChannel<SenderHalve> {

// Route to session actor if session adapter is installed
if let Some(session_tx) = &self.session_adapter_tx {
// Only mirror Transaction variants, handle Subscription separately later
// Register all Transaction variants with the session actor
// Note: Subscription variant is legacy and should not be used in actor mode
if let WaitingTransaction::Transaction(tx) = waiting_tx {
let msg = SessionMessage::RegisterTransaction {
tx,
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ pub(crate) enum NodeEvent {
callback: tokio::sync::mpsc::Sender<QueryResult>,
},
TransactionTimedOut(Transaction),
/// Local subscription completed - deliver SubscribeResponse to client via result router
LocalSubscribeComplete {
tx: Transaction,
key: ContractKey,
subscribed: bool,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -389,6 +395,16 @@ impl Display for NodeEvent {
NodeEvent::TransactionTimedOut(transaction) => {
write!(f, "Transaction timed out ({transaction})")
}
NodeEvent::LocalSubscribeComplete {
tx,
key,
subscribed,
} => {
write!(
f,
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
)
}
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,20 @@ pub async fn subscribe(
key: ContractKey,
client_id: Option<ClientId>,
) -> Result<Transaction, OpError> {
let op = subscribe::start_op(key);
subscribe_with_id(op_manager, key, client_id, None).await
}

/// Attempts to subscribe to a contract with a specific transaction ID (for deduplication)
pub async fn subscribe_with_id(
op_manager: Arc<OpManager>,
key: ContractKey,
client_id: Option<ClientId>,
transaction_id: Option<Transaction>,
) -> Result<Transaction, OpError> {
let op = match transaction_id {
Some(id) => subscribe::start_op_with_id(key, id),
None => subscribe::start_op(key),
};
let id = op.id;
if let Some(client_id) = client_id {
use crate::client_events::RequestId;
Expand Down
26 changes: 26 additions & 0 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,32 @@ impl P2pConnManager {
))?;
}
}
NodeEvent::LocalSubscribeComplete {
tx,
key,
subscribed,
} => {
tracing::info!("Received LocalSubscribeComplete event for transaction: {tx}, contract: {key}");

// Deliver SubscribeResponse directly to result router (actor mode)
// Following Nacho's suggestion to tap into result router without state transitions
if let Some(result_router) = &op_manager.result_router_tx {
tracing::info!("Sending SubscribeResponse to result router for transaction: {tx}");
use freenet_stdlib::client_api::{
ContractResponse, HostResponse,
};
let response = Ok(HostResponse::ContractResponse(
ContractResponse::SubscribeResponse { key, subscribed },
));

match result_router.send((tx, response)).await {
Ok(()) => tracing::info!("Successfully sent SubscribeResponse to result router for transaction: {tx}"),
Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e),
}
} else {
tracing::warn!("No result router available for local subscribe completion (legacy mode)");
}
}
NodeEvent::Disconnect { cause } => {
tracing::info!(
"Disconnecting from network{}",
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/node/request_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ pub enum DeduplicatedRequest {
client_id: ClientId,
request_id: RequestId,
},
/// Note: Currently unused - Subscribe operations bypass deduplication to avoid
/// race conditions with instant-completion. Kept for potential future use.
#[allow(dead_code)]
Subscribe {
key: ContractKey,
client_id: ClientId,
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,9 @@ where
NodeEvent::TransactionTimedOut(_) => {
unimplemented!()
}
NodeEvent::LocalSubscribeComplete { .. } => {
unimplemented!()
}
NodeEvent::QuerySubscriptions { .. } => {
unimplemented!()
}
Expand Down
44 changes: 40 additions & 4 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ pub(crate) fn start_op(key: ContractKey) -> SubscribeOp {
SubscribeOp { id, state }
}

/// Create a Subscribe operation with a specific transaction ID (for operation deduplication)
pub(crate) fn start_op_with_id(key: ContractKey, id: Transaction) -> SubscribeOp {
let state = Some(SubscribeState::PrepareRequest { id, key });
SubscribeOp { id, state }
}

/// Request to subscribe to value changes from a contract.
pub(crate) async fn request_subscribe(
op_manager: &OpManager,
Expand All @@ -68,14 +74,44 @@ pub(crate) async fn request_subscribe(
if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state {
// Use k_closest_potentially_caching to try multiple candidates
const EMPTY: &[PeerId] = &[];
let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3); // Try up to 3 candidates
// Try up to 3 candidates
let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3);

let target = match candidates.first() {
Some(peer) => peer.clone(),
None => {
// No remote peers available - this may happen when node is isolated
tracing::warn!(%key, "No remote peers available for subscription - node may be isolated");
return Err(RingError::NoCachingPeers(*key).into());
// No remote peers available - check if we have the contract locally
tracing::debug!(%key, "No remote peers available for subscription, checking locally");

if super::has_contract(op_manager, *key).await? {
// We have the contract locally - complete operation immediately
// Following Nacho's suggestion: use notify_node_event to tap into
// result router directly without state transitions
tracing::info!(%key, tx = %id, "Contract available locally, completing subscription via result router");

// Use notify_node_event to deliver SubscribeResponse directly to client
// This avoids the problem with notify_op_change overwriting the operation
match op_manager
.notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
tx: *id,
key: *key,
subscribed: true,
})
.await
{
Ok(()) => {
tracing::info!(%key, tx = %id, "Successfully sent LocalSubscribeComplete event")
}
Err(e) => {
tracing::error!(%key, tx = %id, error = %e, "Failed to send LocalSubscribeComplete event")
}
}

return Ok(());
} else {
tracing::debug!(%key, "Contract not available locally and no remote peers");
return Err(RingError::NoCachingPeers(*key).into());
}
}
};

Expand Down
Loading
Loading