Skip to content

Commit

Permalink
Merge pull request #907 from iduartgomez/186548078-op-logic-fixes
Browse files Browse the repository at this point in the history
186548078 - Multiple op fixes
  • Loading branch information
iduartgomez committed Dec 14, 2023
2 parents 8ef468f + 5f77a3a commit 6ef6048
Show file tree
Hide file tree
Showing 54 changed files with 2,883 additions and 2,402 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/bin/freenet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn run_local(config: PeerCliConfig) -> Result<(), DynError> {
let port = config.port;
let ip = config.address;
freenet::config::Config::set_op_mode(OperationMode::Local);
let executor = Executor::from_config(config).await?;
let executor = Executor::from_config(config, None).await?;
let socket: SocketAddr = (ip, port).into();
freenet::server::local_node::run_local_node(executor, socket).await
}
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Clients events related logic and type definitions. For example, receival of client events from applications throught the HTTP gateway.

use freenet_stdlib::client_api::ClientRequest;
use freenet_stdlib::client_api::{ClientError, HostResponse};
use futures::future::BoxFuture;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async fn process_client_request(
Err(err) => {
let result_error = bincode::serialize(&Err::<HostResponse, ClientError>(
ErrorKind::DeserializationError {
cause: format!("{err}"),
cause: format!("{err}").into(),
}
.into(),
))
Expand Down
79 changes: 32 additions & 47 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use crate::runtime::ContractError as ContractRtError;
//! Handling of contracts and delegates, including storage, execution, caching, etc.
//!
//! Internally uses the wasm_runtime module to execute contract and/or delegate instructions.

use either::Either;
use freenet_stdlib::prelude::*;

mod executor;
mod handler;
mod in_memory;
pub mod storages;

pub(crate) use executor::{
executor_channel, ExecutorToEventLoopChannel, NetworkEventListenerHalve,
executor_channel, mock_runtime::MockRuntime, Callback, ExecutorToEventLoopChannel,
NetworkEventListenerHalve,
};
pub(crate) use handler::{
contract_handler_channel, ClientResponses, ClientResponsesSender, ContractHandler,
ContractHandlerChannel, ContractHandlerEvent, EventId, NetworkContractHandler, SenderHalve,
StoreResponse,
client_responses_channel, contract_handler_channel, in_memory::MemoryContractHandler,
ClientResponsesReceiver, ClientResponsesSender, ContractHandler, ContractHandlerChannel,
ContractHandlerEvent, NetworkContractHandler, SenderHalve, StoreResponse, WaitingResolution,
};
pub(crate) use in_memory::{MemoryContractHandler, MockRuntime};

pub use executor::{Executor, ExecutorError, OperationMode};

Expand Down Expand Up @@ -68,7 +70,7 @@ where
id,
ContractHandlerEvent::GetResponse {
key,
response: Err(err.into()),
response: Err(err),
},
)
.await
Expand All @@ -79,61 +81,28 @@ where
}
}
}
ContractHandlerEvent::Cache(contract) => {
let key = contract.key();
match contract_handler
.executor()
.store_contract(contract)
.instrument(tracing::info_span!("store_contract", %key))
.await
{
Ok(_) => {
contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::CacheResult(Ok(())))
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
Err(err) => {
tracing::error!("Error while caching: {err}");
let err = ContractError::ContractRuntimeError(err);
contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::CacheResult(Err(err)))
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
}
}
ContractHandlerEvent::PutQuery {
key,
state,
related_contracts,
parameters,
contract,
} => {
let put_result = contract_handler
.executor()
.upsert_contract_state(
key.clone(),
Either::Left(state),
related_contracts,
parameters,
contract,
)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await
.map_err(Into::into);
.await;
contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::PutResponse {
new_value: put_result,
new_value: put_result.map_err(Into::into),
},
)
.await
Expand All @@ -142,6 +111,24 @@ where
error
})?;
}
ContractHandlerEvent::Subscribe { key } => {
let response = contract_handler
.executor()
.subscribe_to_contract(key.clone())
.await;
contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::SubscribeResponse { key, response },
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
todo!()
}
_ => unreachable!(),
}
}
Expand All @@ -153,8 +140,6 @@ pub(crate) enum ContractError {
ChannelDropped(Box<ContractHandlerEvent>),
#[error("contract {0} not found in storage")]
ContractNotFound(ContractKey),
#[error(transparent)]
ContractRuntimeError(ContractRtError),
#[error("{0}")]
IOError(#[from] std::io::Error),
#[error("no response received from handler")]
Expand Down
Loading

0 comments on commit 6ef6048

Please sign in to comment.