Skip to content

Commit

Permalink
feat(decider): new decider (#1764)
Browse files Browse the repository at this point in the history
* feat(decider): new decider

* feat(decider): integrate new decider-distro

* feat(decider): https://polygon-mumbai.gateway.tenderly.co

* feat(decider): extend ENV variables for decider config

* chore: wip

* chore: integrate new decider distro

* feat(rpc): change default chain rpc to https://endpoints.omniatech.io/v1/matic/mumbai/public

* fix(rpc): use correct matcher address

* chore: update decider-distro

* chore: update decider-distro

* fix(log): do not display args in Debug

* chore: update decider

* chore: update decider

* chore: update decider

* fix: use aquamrine::log target because RUST_LOG works like shit

* fix: always log function args

* fix: add more logs

* chore: cleanup

* feat(logs): move logs for particle execution and service calls to debug & trace

* feat(logs): log decider's run-console as INFO

* fix: compilation error

* check with latest decider

* set default wallet key

* test with flox matching

* Revert "test with flox matching"

This reverts commit cb84e03.

* update decider distro

---------

Co-authored-by: Alexey Proshutinskiy <alexey.prosh@fluence.one>
  • Loading branch information
folex and justprosh committed Sep 5, 2023
1 parent fe8eab2 commit cd49dd2
Show file tree
Hide file tree
Showing 18 changed files with 164 additions and 74 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 23 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ server:
run-console=trace,\
wasmtime_cranelift=off,\
wasmtime_jit=off,\
particle_reap=off" \
particle_reap=trace" \
cargo run --release -p nox

local-env:
Expand All @@ -44,22 +44,37 @@ local-env:
local-env-logs:
docker compose -f docker-compose.yml logs -f

# mumbai 0x93A2897deDcC5478a9581808F5EC25F4FadbC312
# local 0x0f68c702dC151D07038fA40ab3Ed1f9b8BAC2981

local-nox:
FLUENCE_ENV_AQUA_IPFS_EXTERNAL_API_MULTIADDR="/ip4/127.0.0.1/tcp/5001" \
FLUENCE_ENV_CONNECTOR_API_ENDPOINT=http://127.0.0.1:8545 \
FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS="0xea6777e8c011E7968605fd012A9Dd49401ec386C" \
FLUENCE_ENV_CONNECTOR_API_ENDPOINT="http://127.0.0.1:8545" \
FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS="0x0f68c702dC151D07038fA40ab3Ed1f9b8BAC2981" \
FLUENCE_ENV_CONNECTOR_WALLET_KEY="0xfdc4ba94809c7930fe4676b7d845cbf8fa5c1beae8744d959530e5073004cf3f" \
FLUENCE_ENV_CONNECTOR_FROM_BLOCK=earliest \
FLUENCE_ENV_AQUA_IPFS_LOCAL_API_MULTIADDR="/ip4/127.0.0.1/tcp/5001" \
FLUENCE_SYSTEM_SERVICES__DECIDER__DECIDER_PERIOD_SEC=60 \
FLUENCE_MAX_SPELL_PARTICLE_TTL="55s" \
FLUENCE_SYSTEM_SERVICES__DECIDER__NETWORK_ID=31337 \
FLUENCE_SYSTEM_SERVICES__AQUA_IPFS__IPFS_BINARY_PATH="$(shell which ipfs)" \
FLUENCE_SYSTEM_SERVICES__ENABLE="aqua-ipfs" \
FLUENCE_SYSTEM_SERVICES__ENABLE="aqua-ipfs,decider" \
WASM_LOG="trace" \
RUST_LOG="debug,\
aquamarine::aqua_runtime=error,\
execution=trace,\
expired=info,\
effectors=info,\
dispatcher=info,\
aquamarine::plumber=info,\
aquamarine::partice_functions=debug,\
aquamarine::log=debug,\
aquamarine=info,\
aquamarine::aqua_runtime=warn,\
aquamarine::particle_executor=warn,\
ipfs_effector=off,\
ipfs_pure=off,\
system_services=debug,\
marine_core::module::marine_module=info,\
aquamarine=warn,\
tokio_threadpool=info,\
tokio_reactor=info,\
mio=info,\
Expand All @@ -83,7 +98,7 @@ local-nox:
libp2p_swarm=off,\
particle_protocol::libp2p_protocol::upgrade=info,\
libp2p_mplex=off,\
particle_reap=off" \
cargo run -p nox -- --secret-key "74c9Fl8I+XFwlTRnLAyYlSML+Jk6zIkZgtQoo5deuGk="
particle_reap=debug" \
cargo run --release -p nox -- -k "2WijTVdhVRzyZamWjqPx4V4iNMrajegNMwNa2PmvPSZV6RRpo5M2fsPWdQr22HVRubuJhhSw8BrWiGt6FPhFAuXy" --aqua-pool-size 3 --ws-port 9992

.PHONY: server server-debug test release build deploy local-nox local-env local-env-logs
13 changes: 9 additions & 4 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ impl AquaRuntime for AVM<DataStoreError> {
match parse_outcome(outcome) {
Ok((new_data, peers, calls)) if !peers.is_empty() || !calls.is_empty() => {
#[rustfmt::skip]
tracing::debug!(particle_id, "Particle executed: {} call requests, {} next peers", calls.len(), peers.len());
tracing::debug!(
target: "execution",
particle_id,
"Particle executed: {} call requests, {} next peers", calls.len(), peers.len()
);

ParticleEffects {
next_peers: peers,
Expand All @@ -128,7 +132,8 @@ impl AquaRuntime for AVM<DataStoreError> {
}
}
Ok((data, ..)) => {
tracing::info!(
tracing::debug!(
target: "execution",
particle_id,
"Executed particle, next_peer_pks is empty, no call requests. Nothing to do.",
);
Expand All @@ -139,11 +144,11 @@ impl AquaRuntime for AVM<DataStoreError> {
ParticleEffects::empty()
}
Err(ExecutionError::AquamarineError(err)) => {
tracing::warn!(particle_id, "Error executing particle: {}", err);
tracing::warn!(target: "execution", particle_id, "Error executing particle: {}", err);
ParticleEffects::empty()
}
Err(err @ ExecutionError::InvalidResultField { .. }) => {
tracing::warn!(particle_id, "Error parsing outcome for particle: {}", err);
tracing::warn!(target: "execution", particle_id, "Error parsing outcome for particle: {}", err);
ParticleEffects::empty()
}
}
Expand Down
24 changes: 14 additions & 10 deletions aquamarine/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
use humantime::FormattedDuration;

/// Truncate string to be at max 500 graphemes
fn truncate(s: &str) -> &str {
match s.char_indices().nth(500) {
None => s,
Some((idx, _)) => &s[..idx],
}
}

/// Function that logs for different builtin namespaces
pub fn builtin_log_fn(
service: &str,
args: String,
elapsed: FormattedDuration,
particle_id: String,
) {
pub fn builtin_log_fn(service: &str, args: &str, elapsed: FormattedDuration, particle_id: String) {
let args = truncate(args);
match service {
"array" | "cmp" | "debug" | "math" | "op" | "getDataSrv" | "run-console" => {
"array" | "cmp" | "debug" | "math" | "op" | "getDataSrv" | "run-console" | "json" => {
tracing::event!(
tracing::Level::DEBUG,
tracing::Level::TRACE,
"Executed host call {} ({}) [{}]",
args,
elapsed,
particle_id
)
}
"peer" | "script" | "stat" | "sig" | "srv" | "dist" | "kad" => tracing::event!(
tracing::Level::INFO,
tracing::Level::DEBUG,
"Executed host call {} ({}) [{}]",
args,
elapsed,
particle_id
),
_ => tracing::event!(
tracing::Level::INFO,
tracing::Level::DEBUG,
"Executed host call {} ({}) [{}]",
args,
elapsed,
Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl DataStore for ParticleDataStore {
}

fn store_data(&mut self, data: &[u8], particle_id: &str, current_peer_id: &str) -> Result<()> {
tracing::debug!(target: "particle_reap", particle_id = particle_id, "Storing data for particle");
tracing::trace!(target: "particle_reap", particle_id = particle_id, "Storing data for particle");
let data_path = self.data_file(particle_id, current_peer_id);
std::fs::write(&data_path, data).map_err(|err| StoreData(err, data_path))?;

Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
.spawn_blocking(move || {
span.in_scope(move || {
let now = Instant::now();
tracing::info!(particle_id = particle.id, "Executing particle");
tracing::trace!(target: "execution", particle_id = particle.id, "Executing particle");

let particle_params = ParticleParameters {
current_peer_id: Cow::Owned(current_peer_id.to_string()),
Expand Down
19 changes: 7 additions & 12 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use humantime::format_duration as pretty;
use log::Level;
use serde_json::json;
use serde_json::Value as JValue;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -149,16 +148,12 @@ impl<F: ParticleFunctionStatic> Functions<F> {
}
};

let log_args = format!("{:?} {:?} {}", args.service_id, args.function_name, {
if log::max_level() >= Level::Debug {
args.function_args
.first()
.map(|v| v.to_string())
.unwrap_or(String::new())
} else {
String::new()
}
});
let log_args = format!(
"{:?} {:?} {}",
args.service_id,
args.function_name,
json!(&args.function_args)
);
let service_id = args.service_id.clone();
let start = Instant::now();

Expand Down Expand Up @@ -220,7 +215,7 @@ impl<F: ParticleFunctionStatic> Functions<F> {
err
)
} else {
builtin_log_fn(&service_id, log_args, pretty(elapsed), particle_id);
builtin_log_fn(&service_id, &log_args, pretty(elapsed), particle_id);
};

let stats = SingleCallStat {
Expand Down
10 changes: 5 additions & 5 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {

let deadline = Deadline::from(&particle);
if deadline.is_expired(now_ms()) {
tracing::info!(particle_id = particle.id, "Particle is expired, ignoring");
tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired");
self.events
.push_back(Err(AquamarineApiError::ParticleExpired {
particle_id: particle.id,
Expand Down Expand Up @@ -214,7 +214,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
if let Some((vm_id, mut vm)) = self.vm_pool.get_vm() {
let now = now_ms();

self.actors.retain(|(particle_id, peer_id), actor| {
self.actors.retain(|(particle_id, worker_id), actor| {
// if actor hasn't yet expired or is still executing, keep it
// TODO: if actor is expired, cancel execution and return VM back to pool
// https://github.com/fluencelabs/fluence/issues/1212
Expand All @@ -223,12 +223,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {
}
tracing::debug!(
target: "particle_reap",
particle_id = particle_id,
"Reaping particle's actor peer_id={peer_id})"
particle_id = particle_id, worker_id = worker_id.to_string(),
"Reaping particle's actor"
);
// cleanup files and dirs after particle processing (vault & prev_data)
// TODO: do not pass vm https://github.com/fluencelabs/fluence/issues/1216
if let Err(err) = actor.cleanup(particle_id, &peer_id.to_string(), &mut vm) {
if let Err(err) = actor.cleanup(particle_id, &worker_id.to_string(), &mut vm) {
tracing::warn!(
particle_id = particle_id,
"Error cleaning up after particle {:?}",
Expand Down
6 changes: 5 additions & 1 deletion connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
err
)
} else {
tracing::trace!(target: "execution",particle_id = particle_id, "Sent particle to execution");
tracing::trace!(
target: "execution",
particle_id = particle_id,
"Sent particle to execution"
);
}
} else {
break;
Expand Down
20 changes: 15 additions & 5 deletions crates/server-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,32 @@ pub fn default_registry_replicate_spell_period_sec() -> u32 {
3600
}

pub fn default_deal_network_api_endpoint() -> String {
"https://testnet.aurora.dev".to_string()
pub fn default_decider_network_api_endpoint() -> String {
"https://endpoints.omniatech.io/v1/matic/mumbai/public".to_string()
}

pub fn default_deal_contract_address_hex() -> String {
"0xb497e025D3095A197E30Ca84DEc36a637E649868".to_string()
pub fn default_matcher_address() -> String {
// on mumbai
"0x93A2897deDcC5478a9581808F5EC25F4FadbC312".to_string()
}

pub fn default_deal_contract_block_hex() -> String {
pub fn default_decider_start_block_hex() -> String {
"latest".to_string()
}

pub fn default_decider_worker_gas() -> u64 {
210_000
}

pub fn default_ipfs_binary_path() -> String {
"/usr/bin/ipfs".to_string()
}

pub fn default_curl_binary_path() -> String {
"/usr/bin/curl".to_string()
}

pub fn default_decider_network_id() -> u64 {
// 80001 = polygon mumbai
80001
}
29 changes: 27 additions & 2 deletions crates/server-config/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,41 @@ impl UnresolvedNodeConfig {
}

if let Ok(decider_contract_addr) = std::env::var("FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS") {
self.system_services.decider.contract_address_hex = decider_contract_addr;
self.system_services.decider.matcher_address = decider_contract_addr;
log::warn!(
"Override configuration of decider system spell (contract address) from ENV"
);
}

if let Ok(decider_from_block) = std::env::var("FLUENCE_ENV_CONNECTOR_FROM_BLOCK") {
self.system_services.decider.contract_block_hex = decider_from_block;
self.system_services.decider.start_block = decider_from_block;
log::warn!("Override configuration of decider system spell (from block) from ENV");
}

if let Ok(decider_wallet_key) = std::env::var("FLUENCE_ENV_CONNECTOR_WALLET_KEY") {
self.system_services.decider.wallet_key = Some(decider_wallet_key);
log::warn!("Override configuration of decider system spell (wallet key) from ENV");
}

if let Ok(worker_gas) = std::env::var("FLUENCE_ENV_CONNECTOR_WORKER_GAS") {
match worker_gas.parse() {
Ok(worker_gas) => {
self.system_services.decider.worker_gas = worker_gas;
log::warn!(
"Override configuration of decider system spell (worker gas) from ENV"
);
}
Err(err) => log::warn!(
"Unable to override worker gas, value is not a valid u64: {}",
err
),
}
}

if let Ok(decider_wallet_key) = std::env::var("FLUENCE_ENV_CONNECTOR_WALLET_KEY") {
self.system_services.decider.wallet_key = Some(decider_wallet_key);
log::warn!("Override configuration of decider system spell (wallet key) from ENV");
}
}
}

Expand Down
25 changes: 17 additions & 8 deletions crates/server-config/src/system_services_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,18 @@ pub struct DeciderConfig {
pub worker_period_sec: u32,
#[serde(default = "default_ipfs_multiaddr")]
pub worker_ipfs_multiaddr: String,
#[serde(default = "default_deal_network_api_endpoint")]
#[serde(default = "default_decider_network_api_endpoint")]
pub network_api_endpoint: String,
#[serde(default = "default_deal_contract_address_hex")]
pub contract_address_hex: String,
#[serde(default = "default_deal_contract_block_hex")]
pub contract_block_hex: String,
#[serde(default = "default_decider_network_id")]
pub network_id: u64,
#[serde(default = "default_matcher_address")]
pub matcher_address: String,
#[serde(default = "default_decider_start_block_hex")]
pub start_block: String,
#[serde(default = "default_decider_worker_gas")]
pub worker_gas: u64,
#[serde(default)]
pub wallet_key: Option<String>,
}

impl Default for DeciderConfig {
Expand All @@ -131,9 +137,12 @@ impl Default for DeciderConfig {
decider_period_sec: default_decider_spell_period_sec(),
worker_period_sec: default_worker_spell_period_sec(),
worker_ipfs_multiaddr: default_ipfs_multiaddr(),
network_api_endpoint: default_deal_network_api_endpoint(),
contract_address_hex: default_deal_contract_address_hex(),
contract_block_hex: default_deal_contract_block_hex(),
network_api_endpoint: default_decider_network_api_endpoint(),
network_id: default_decider_network_id(),
matcher_address: default_matcher_address(),
start_block: default_decider_start_block_hex(),
worker_gas: default_decider_worker_gas(),
wallet_key: None,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/system-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
aqua-ipfs-distro = "=0.5.17"
decider-distro = "=0.4.17"
decider-distro = "=0.5.0"
registry-distro = "=0.8.7"
trust-graph-distro = "=0.4.7"

Expand Down
Loading

0 comments on commit cd49dd2

Please sign in to comment.