Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ opentelemetry_sdk = { optional = true, version = "0.31", features = ["rt-tokio"]
# internal deps
freenet-stdlib = { features = ["net"], workspace = true }
console-subscriber = { version = "0.4.1", optional = true }
tokio-stream = "0.1.17"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["sysinfoapi"] }
Expand Down
1,734 changes: 863 additions & 871 deletions crates/core/src/node/network_bridge/handshake.rs

Large diffs are not rendered by default.

443 changes: 312 additions & 131 deletions crates/core/src/node/network_bridge/p2p_protoc.rs

Large diffs are not rendered by default.

2,264 changes: 2,116 additions & 148 deletions crates/core/src/node/network_bridge/priority_select.rs

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ impl OpManager {
}

pub async fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> {
// Check if operation is already completed - don't push back to HashMap
if self.ops.completed.contains(&id) {
tracing::debug!(
tx = %id,
"OpManager: Ignoring push for already completed operation"
);
return Ok(());
}

if let Some(tx) = self.ops.under_progress.remove(&id) {
if tx.timed_out() {
self.ops.completed.insert(tx);
Expand Down
42 changes: 26 additions & 16 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl Operation for PutOp {
match input {
PutMsg::RequestPut {
id,
sender,
contract,
related_contracts,
value,
Expand All @@ -171,7 +172,7 @@ impl Operation for PutOp {
} => {
// Get the contract key and own location
let key = contract.key();
let sender = op_manager.ring.connection_manager.own_location();
let own_location = op_manager.ring.connection_manager.own_location();

tracing::info!(
"Requesting put for contract {} from {} to {}",
Expand Down Expand Up @@ -268,34 +269,41 @@ impl Operation for PutOp {
// Create a SeekNode message to forward to the next hop
return_msg = Some(PutMsg::SeekNode {
id: *id,
sender,
sender: sender.clone(),
target: forward_target,
value: modified_value.clone(),
contract: contract.clone(),
related_contracts: related_contracts.clone(),
htl: *htl,
});

// Transition to AwaitingResponse state to handle future SuccessfulPut messages
new_state = Some(PutState::AwaitingResponse {
key,
upstream: Some(sender.clone()),
contract: contract.clone(),
state: modified_value,
subscribe: false,
});
} else {
// No other peers to forward to - we're the final destination
tracing::debug!(
tx = %id,
%key,
"No peers to forward to - handling PUT completion locally"
"No peers to forward to - handling PUT completion locally, sending SuccessfulPut back to sender"
);
return_msg = None;
}

// Transition to AwaitingResponse state to handle future SuccessfulPut messages
new_state = Some(PutState::AwaitingResponse {
key,
upstream: match &self.state {
Some(PutState::ReceivedRequest) => None,
_ => None,
},
contract: contract.clone(),
state: modified_value,
subscribe: false,
});
// Send SuccessfulPut back to the sender (upstream node)
return_msg = Some(PutMsg::SuccessfulPut {
id: *id,
target: sender.clone(),
key,
sender: own_location,
});

// Mark operation as finished
new_state = Some(PutState::Finished { key });
}
}
PutMsg::SeekNode {
id,
Expand Down Expand Up @@ -1110,6 +1118,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
// Create RequestPut message and forward to target peer
let msg = PutMsg::RequestPut {
id,
sender: own_location,
contract,
related_contracts,
value,
Expand Down Expand Up @@ -1272,6 +1281,7 @@ mod messages {
/// Internal node instruction to find a route to the target node.
RequestPut {
id: Transaction,
sender: PeerKeyLocation,
contract: ContractContainer,
#[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
related_contracts: RelatedContracts<'static>,
Expand Down
Loading
Loading