Skip to content

Commit

Permalink
Change subscriptions to subscribe to anybody who has the contract
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Dec 19, 2023
1 parent 788013f commit 2c5dfd3
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
27 changes: 22 additions & 5 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::backtrace::Backtrace as StdTrace;
use std::{pin::Pin, time::Duration};

use freenet_stdlib::prelude::ContractKey;
use futures::{future::BoxFuture, Future};
use tokio::sync::mpsc::error::SendError;

Expand Down Expand Up @@ -300,11 +301,7 @@ impl<T> From<SendError<T>> for OpError {
}

/// If the contract is not found, it will try to get it first if the `try_get` parameter is set.
async fn start_subscription_request(
op_manager: &OpManager,
key: freenet_stdlib::prelude::ContractKey,
try_get: bool,
) {
async fn start_subscription_request(op_manager: &OpManager, key: ContractKey, try_get: bool) {
let sub_op = subscribe::start_op(key.clone());
if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
if !try_get {
Expand All @@ -322,3 +319,23 @@ async fn start_subscription_request(
}
}
}

async fn has_contract(op_manager: &OpManager, key: ContractKey) -> Result<bool, OpError> {
match op_manager
.notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
key,
fetch_contract: false,
})
.await?
{
crate::contract::ContractHandlerEvent::GetResponse {
response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
..
} => Ok(true),
crate::contract::ContractHandlerEvent::GetResponse {
response: Ok(crate::contract::StoreResponse { state: None, .. }),
..
} => Ok(false),
_ => Err(OpError::UnexpectedOpState),
}
}
26 changes: 5 additions & 21 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,10 @@ pub(crate) async fn request_subscribe(
sub_op: SubscribeOp,
) -> Result<(), OpError> {
let (target, _id) = if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state {
match op_manager
.notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery {
key: key.clone(),
fetch_contract: false,
})
.await?
{
crate::contract::ContractHandlerEvent::GetResponse {
response: Ok(crate::contract::StoreResponse { state: Some(_), .. }),
..
} => {}
crate::contract::ContractHandlerEvent::GetResponse {
key,
response: Ok(crate::contract::StoreResponse { state: None, .. }),
} => {
return Err(OpError::ContractError(ContractError::ContractNotFound(
key.clone(),
)));
}
_ => return Err(OpError::UnexpectedOpState),
if !super::has_contract(op_manager, key.clone()).await? {
return Err(OpError::ContractError(ContractError::ContractNotFound(
key.clone(),
)));
}
const EMPTY: &[PeerId] = &[];
(
Expand Down Expand Up @@ -251,7 +235,7 @@ impl Operation for SubscribeOp {
}
};

if !op_manager.ring.is_subscribed_to_contract(key) {
if !super::has_contract(op_manager, key.clone()).await? {
tracing::debug!(tx = %id, %key, "Contract not found, trying other peer");

let Some(new_target) = op_manager
Expand Down

0 comments on commit 2c5dfd3

Please sign in to comment.