Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ crates/core/tmp_work/
.claude-flow/
.swarm/
test-results/

# Git worktrees
worktrees/
.worktrees/

# Development logs
logs/*
1 change: 0 additions & 1 deletion .worktrees/pr1859
Submodule pr1859 deleted from 362724
1 change: 0 additions & 1 deletion .worktrees/pr1861
Submodule pr1861 deleted from a9e276
180 changes: 116 additions & 64 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,99 +398,151 @@ async fn process_open_request(

let contract_key = contract.key();

// Use RequestRouter for deduplication if in actor mode, otherwise direct operation
if let Some(router) = &request_router {
// Check if this will be a local-only PUT (no network peers available)
// This prevents race condition where PUT completes instantly and TX is removed
// before a second client can reuse it (issue #1886)
let own_location = op_manager.ring.connection_manager.own_location();
let has_remote_peers = op_manager
.ring
.closest_potentially_caching(&contract_key, &[own_location.peer][..])
.is_some();

if !has_remote_peers {
// Local-only PUT - bypass router to avoid race condition
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Routing PUT request through deduplication layer (actor mode)",
"PUT will complete locally (no remote peers), starting direct local PUT operation"
);

let request = crate::node::DeduplicatedRequest::Put {
key: contract_key,
contract: contract.clone(),
related_contracts: related_contracts.clone(),
state: state.clone(),
// Start a local PUT operation without going through the router
// This avoids the race condition while still providing proper result delivery
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
client_id,
request_id,
};

let (transaction_id, should_start_operation) =
router.route_request(request).await.map_err(|e| {
Error::Node(format!("Request routing failed: {}", e))
})?;
);
let op_id = op.id;

// Always register this client for the result
// Register client for transaction result
op_manager
.ch_outbound
.waiting_for_transaction_result(
transaction_id,
client_id,
request_id,
)
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
);
tracing::error!("Error waiting for transaction result: {}", err)
})?;

// Only start new network operation if this is a new operation
if should_start_operation {
// Execute the PUT operation
// Since there are no remote peers, this will complete locally
if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Local PUT request error: {}", err);
}

// Note: We bypass the router for local-only PUTs to avoid the race
// condition where the transaction completes instantly and is removed
// before other clients can join. The operation will complete locally
// and deliver results through the normal transaction mechanism.
} else {
// Has remote peers - use RequestRouter for deduplication if in actor mode, otherwise direct operation
if let Some(router) = &request_router {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting new PUT network operation via RequestRouter",
"Routing PUT request through deduplication layer (actor mode)",
);

let op = put::start_op_with_id(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
let request = crate::node::DeduplicatedRequest::Put {
key: contract_key,
contract: contract.clone(),
related_contracts: related_contracts.clone(),
state: state.clone(),
subscribe,
transaction_id,
);
client_id,
request_id,
};

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
let (transaction_id, should_start_operation) =
router.route_request(request).await.map_err(|e| {
Error::Node(format!("Request routing failed: {}", e))
})?;

// Always register this client for the result
op_manager
.ch_outbound
.waiting_for_transaction_result(
transaction_id,
client_id,
request_id,
)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
);
})?;

// Only start new network operation if this is a new operation
if should_start_operation {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting new PUT network operation via RequestRouter",
);

let op = put::start_op_with_id(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
transaction_id,
);

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Reusing existing PUT operation via RequestRouter - client registered for result",
);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Reusing existing PUT operation via RequestRouter - client registered for result",
"Starting direct PUT operation (legacy mode)",
);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting direct PUT operation (legacy mode)",
);

// Legacy mode: direct operation without deduplication
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
);
let op_id = op.id;
// Legacy mode: direct operation without deduplication
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
);
let op_id = op.id;

op_manager
.ch_outbound
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!("Error waiting for transaction result: {}", err)
})?;
op_manager
.ch_outbound
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
)
})?;

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}
}
}

Expand Down
Loading
Loading