fix(workers): add io+timer drivers to tokio runtimes [fixes NET-795] (#…
gurinderu committed Mar 7, 2024
1 parent 7e0d1ef commit fb251c9
Showing 12 changed files with 101 additions and 19 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/e2e.yml
@@ -82,23 +82,21 @@ jobs:
cli:
needs:
- nox-snapshot
uses: fluencelabs/cli/.github/workflows/tests.yml@renovate/fluencelabs-js-client-0.x
uses: fluencelabs/cli/.github/workflows/tests.yml@main
with:
ref: renovate/fluencelabs-js-client-0.x
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"

js-client:
needs:
- nox-snapshot
uses: fluencelabs/js-client/.github/workflows/tests.yml@master
uses: fluencelabs/js-client/.github/workflows/tests.yml@main
with:
ref: js-client-v0.9.0
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"

aqua:
needs:
- nox-snapshot
uses: fluencelabs/aqua/.github/workflows/tests.yml@renovate/fluencelabs-js-client-0.x
uses: fluencelabs/aqua/.github/workflows/tests.yml@main
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
ref: renovate/fluencelabs-js-client-0.x
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion crates/created-swarm/src/swarm.rs
@@ -46,7 +46,7 @@ use server_config::{
UnresolvedConfig,
};
use tempfile::TempDir;
use test_constants::{EXECUTION_TIMEOUT, TRANSPORT_TIMEOUT};
use test_constants::{EXECUTION_TIMEOUT, IDLE_CONNECTION_TIMEOUT, TRANSPORT_TIMEOUT};
use tokio::sync::oneshot;
use toy_vms::EasyVM;
use tracing::{Instrument, Span};
@@ -403,6 +403,7 @@ pub async fn create_swarm_with_runtime<RT: AquaRuntime>(

resolved.node_config.aquavm_pool_size = config.pool_size.unwrap_or(1);
resolved.node_config.particle_execution_timeout = EXECUTION_TIMEOUT;
resolved.node_config.transport_config.connection_idle_timeout = IDLE_CONNECTION_TIMEOUT;

let allowed_effectors = config.allowed_effectors.iter().map(|(cid, binaries)| {
(Hash::from_string(cid).unwrap(), binaries.clone())
54 changes: 52 additions & 2 deletions crates/nox-tests/tests/workers.rs
@@ -2,11 +2,12 @@ use connected_client::ConnectedClient;
use created_swarm::make_swarms;
use eyre::Context;
use hex::FromHex;
use log_utils::enable_logs;
use maplit::hashmap;
use serde_json::json;
use serde_json::{json, Value};
use workers::CUID;

async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String {
pub(crate) async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String {
let init_id_1 =
<CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
.unwrap();
@@ -116,3 +117,52 @@ async fn test_worker_different_deal_ids() {
assert!(is_worker_active(&mut client, deal_id_lowercase_prefix).await);
assert!(is_worker_active(&mut client, deal_id_mixed_prefix).await);
}

#[tokio::test]
async fn test_resolve_subnet_on_worker() {
let deal_id = "0x9DcaFca9B88f49d91c38a32E7d9A86a7d9a37B04";

enable_logs();
let script = tokio::fs::read("./tests/workers/test_subnet_resolve_on_worker.air")
.await
.wrap_err("read test data")
.unwrap();
let script = String::from_utf8(script)
.wrap_err("decode test data")
.unwrap();

let swarms = make_swarms(1).await;

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

let worker_id = create_worker(&mut client, deal_id).await;

let data = hashmap! {
"-relay-" => json!(swarms[0].peer_id.to_string()),
"-worker_id-" => json!(worker_id),
"-deal_id-" => json!(deal_id),
};

let result = client
.execute_particle(script.clone(), data.clone())
.await
.wrap_err("execute particle")
.unwrap();

let expected = {
let error = Value::Array(
vec![Value::String("error sending jsonrpc request: 'Networking or low-level protocol error: Server returned an error status code: 429'".to_string())]
);
let mut object_map = serde_json::Map::new();
object_map.insert("error".to_string(), error);
object_map.insert("success".to_string(), Value::Bool(false));
object_map.insert("workers".to_string(), Value::Array(vec![]));

vec![Value::Object(object_map)]
};

assert_eq!(result, expected)
}
13 changes: 13 additions & 0 deletions crates/nox-tests/tests/workers/test_subnet_resolve_on_worker.air
@@ -0,0 +1,13 @@
(xor
(seq
(call -relay- ("op" "noop") [])
(seq
(call -worker_id- ("op" "noop") [])
(seq
(call -worker_id- ("subnet" "resolve") [-deal_id-] subnet)
(call %init_peer_id% ("op" "return") [subnet])
)
)
)
(call %init_peer_id% ("op" "return") [%last_error%.$.instruction])
)
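
The script above hops from the client to the relay, then to the worker, calls the worker-scoped `subnet.resolve` builtin with the deal id, and returns the result to the initiator; if any step fails, the `xor` branch returns the failing instruction taken from `%last_error%`.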
3 changes: 2 additions & 1 deletion crates/server-config/src/defaults.rs
@@ -49,7 +49,8 @@ pub fn default_socket_timeout() -> Duration {
}

pub fn default_connection_idle_timeout() -> Duration {
Duration::from_secs(10)
// 180 seconds: the default particle TTL is 120 seconds, and it is wasteful for hosts to reconnect while a particle is still in flight
Duration::from_secs(180)
}

pub fn default_max_established_per_peer_limit() -> Option<u32> {
7 changes: 2 additions & 5 deletions crates/subnet-resolver/src/resolve.rs
@@ -10,7 +10,6 @@ use chain_data::{next_opt, parse_peer_id, ChainDataError};
use hex_utils::decode_hex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::runtime::Handle;

/// Parse data from chain. Accepts data with and without "0x" prefix.
pub fn parse_chain_data(data: &str) -> Result<Vec<Token>, ChainDataError> {
@@ -99,7 +98,7 @@ pub fn validate_deal_id(deal_id: String) -> Result<String, ResolveSubnetError> {
}
}

pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
pub async fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
let res: Result<_, ResolveSubnetError> = try {
let deal_id = validate_deal_id(deal_id)?;
// Description of the `getComputeUnits` function from the `chain.workers` smart contract on chain
@@ -115,9 +114,7 @@ pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
let input = format!("0x{}", hex::encode(input));
let client = HttpClientBuilder::default().build(api_endpoint)?;
let params = rpc_params![json!({ "data": input, "to": deal_id }), json!("latest")];
let response: Result<String, _> = tokio::task::block_in_place(move || {
Handle::current().block_on(async move { client.request("eth_call", params).await })
});
let response = client.request("eth_call", params).await;

let pats = response?;

2 changes: 1 addition & 1 deletion crates/test-constants/src/lib.rs
@@ -35,6 +35,6 @@ pub static TIMEOUT: Duration = Duration::from_secs(15);

pub static SHORT_TIMEOUT: Duration = Duration::from_millis(300);
pub static TRANSPORT_TIMEOUT: Duration = Duration::from_millis(500);
pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
pub static IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
pub static EXECUTION_TIMEOUT: Duration = Duration::from_millis(5000);
pub static PARTICLE_TTL: u32 = 20000;
5 changes: 4 additions & 1 deletion crates/workers/src/workers.rs
@@ -8,7 +8,7 @@ use core_manager::types::{AcquireRequest, WorkType};
use core_manager::CUID;
use fluence_libp2p::PeerId;
use parking_lot::RwLock;
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::{Handle, Runtime, UnhandledPanic};
use types::peer_scope::WorkerId;
use types::DealId;

@@ -140,9 +140,12 @@ impl Workers {
.worker_threads(threads_count)
// Configuring blocking threads for handling I/O
.max_blocking_threads(threads_count)
.enable_time()
.enable_io()
.on_thread_start(move || {
assignment.pin_current_thread();
})
.unhandled_panic(UnhandledPanic::Ignore) // TODO: try to log panics after fix https://github.com/tokio-rs/tokio/issues/4516
.build()
.map_err(|err| WorkersError::CreateRuntime { worker_id, err })?;
Ok(runtime)
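
The added `enable_time()`/`enable_io()` calls are the core of the fix: a runtime assembled manually through `tokio::runtime::Builder` starts without timer and io drivers, so any `tokio::time` or socket operation scheduled on it fails at runtime. A standalone sketch of that failure mode (illustrative code, not part of the repository):

```rust
use std::time::Duration;

fn main() {
    // Without enable_io()/enable_time() this runtime has no reactor or timer driver.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        // .enable_io()   // required for sockets, e.g. the jsonrpc eth_call above
        // .enable_time() // required for tokio::time timers
        .build()
        .expect("build runtime");

    // Panics with a "timers are disabled" error until enable_time() is added;
    // network I/O fails the same way without enable_io().
    rt.block_on(async {
        tokio::time::sleep(Duration::from_millis(10)).await;
    });
}
```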
1 change: 1 addition & 0 deletions nox/Cargo.toml
@@ -72,6 +72,7 @@ once_cell = { workspace = true }
config = "0.13.4"
tonic = "0.9.2"
jsonrpsee = { workspace = true, features = ["ws-client", "macros"] }
tracing-panic = "0.1.1"

[dev-dependencies]
parking_lot = { workspace = true }
7 changes: 7 additions & 0 deletions nox/src/main.rs
@@ -43,6 +43,7 @@ use core_manager::manager::{CoreManager, CoreManagerFunctions, PersistentCoreMan
use fs_utils::to_abs_path;
use nox::{env_filter, log_layer, tokio_console_layer, tracing_layer, Node};
use server_config::{load_config, ConfigData, ResolvedConfig};
use tracing_panic::panic_hook;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
@@ -61,6 +62,12 @@ fn main() -> eyre::Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
panic_hook(panic_info);
prev_hook(panic_info);
}));

let version = format!("{}; AIR version {}", VERSION, air_interpreter_wasm::VERSION);
let authors = format!("by {AUTHORS}");
let config_data = ConfigData {
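
The hook chaining above routes every panic through `tracing-panic` before the previously installed (default) hook prints it to stderr, so panics on worker threads also land in the structured logs. A hedged usage sketch, assuming a `tracing-subscriber` fmt subscriber; the panic message is made up for illustration:

```rust
fn main() {
    // Assumed for the sketch: a fmt subscriber so the panic event is visible.
    tracing_subscriber::fmt().init();

    let prev_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        tracing_panic::panic_hook(panic_info);
        prev_hook(panic_info);
    }));

    // The panic below is first recorded as a tracing event and then reported
    // by the original default hook as usual.
    let _ = std::thread::spawn(|| panic!("worker thread panicked")).join();
}
```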
6 changes: 3 additions & 3 deletions particle-builtins/src/builtins.rs
@@ -281,7 +281,7 @@
("vault", "put") => wrap(self.vault_put(args, particle)),
("vault", "cat") => wrap(self.vault_cat(args, particle)),

("subnet", "resolve") => wrap(self.subnet_resolve(args)),
("subnet", "resolve") => wrap(self.subnet_resolve(args).await),
("run-console", "print") => {
let function_args = args.function_args.iter();
let decider = function_args.filter_map(JValue::as_str).any(|s| s.contains("decider"));
@@ -1037,10 +1037,10 @@
.map_err(|_| JError::new(format!("Error reading vault file `{path}`")))
}

fn subnet_resolve(&self, args: Args) -> Result<JValue, JError> {
async fn subnet_resolve(&self, args: Args) -> Result<JValue, JError> {
let mut args = args.function_args.into_iter();
let deal_id: String = Args::next("deal_id", &mut args)?;
let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint);
let result = subnet_resolver::resolve_subnet(deal_id, &self.connector_api_endpoint).await;
Ok(json!(result))
}
}
