Skip to content

Commit

Permalink
fix(system-services): refactor deployer + fix [NET-586] (#1859)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Oct 30, 2023
1 parent c7e5426 commit 78b0a46
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 97 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/kademlia/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![feature(stmt_expr_attributes)]
#![feature(extract_if)]
#![feature(stmt_expr_attributes)]
/*
* Copyright 2020 Fluence Labs Limited
*
Expand Down
1 change: 1 addition & 0 deletions crates/nox-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ control-macro = { path = "../control-macro" }
json-utils = { path = "../json-utils" }
system-services = { workspace = true }
subnet-resolver = { workspace = true }
fs-utils = { workspace = true }

log-utils = { workspace = true }
fluence-spell-dtos = { workspace = true }
Expand Down
110 changes: 110 additions & 0 deletions crates/nox-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ use std::collections::HashMap;
use std::time::Duration;

use eyre::Context;
use fluence_keypair::KeyPair;
use maplit::hashmap;
use serde_json::{json, Value as JValue};

use connected_client::ConnectedClient;
use created_swarm::system_services_config::{DeciderConfig, SystemServicesConfig};
use created_swarm::{make_swarms, make_swarms_with_cfg};
use fluence_spell_dtos::trigger_config::{ClockConfig, TriggerConfig};
use fs_utils::make_tmp_dir_peer_id;
use service_modules::load_module;
use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC};
use test_utils::{create_service, create_service_worker};
Expand Down Expand Up @@ -2079,3 +2082,110 @@ async fn set_alias_by_worker_creator() {
assert_eq!(*resolved, tetraplets_service.id);
}
}

#[tokio::test]
async fn test_decider_api_endpoint_rewrite() {
let expected_endpoint = "test1".to_string();
let swarm_keypair = KeyPair::generate_ed25519();
let swarm_dir = make_tmp_dir_peer_id(swarm_keypair.get_peer_id().to_string());
let swarms = make_swarms_with_cfg(1, |mut cfg| {
cfg.keypair = swarm_keypair.clone();
cfg.tmp_dir = Some(swarm_dir.clone());
cfg.enabled_system_services = vec!["decider".to_string()];
cfg.override_system_services_config = Some(SystemServicesConfig {
enable: vec![],
aqua_ipfs: Default::default(),
decider: DeciderConfig {
network_api_endpoint: expected_endpoint.clone(),
..Default::default()
},
registry: Default::default(),
connector: Default::default(),
});
cfg
})
.await;

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

client.send_particle(
r#"(seq
(call relay ("decider" "get_string") ["chain"] chain_info_str)
(seq
(call relay ("json" "parse") [chain_info_str.$.str] chain_info)
(call client ("return" "") [chain_info.$.api_endpoint])
)
)"#,
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
);

if let [JValue::String(endpoint)] = client
.receive_args()
.await
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(*endpoint, expected_endpoint);
}

// stop swarm
swarms
.into_iter()
.map(|s| s.exit_outlet.send(()))
.for_each(drop);

let another_endpoint = "another_endpoint_test".to_string();
let swarms = make_swarms_with_cfg(1, |mut cfg| {
cfg.keypair = swarm_keypair.clone();
cfg.tmp_dir = Some(swarm_dir.clone());
cfg.enabled_system_services = vec!["decider".to_string()];
cfg.override_system_services_config = Some(SystemServicesConfig {
enable: vec![],
aqua_ipfs: Default::default(),
decider: DeciderConfig {
network_api_endpoint: another_endpoint.clone(),
..Default::default()
},
registry: Default::default(),
connector: Default::default(),
});
cfg
})
.await;

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

client.send_particle(
r#"(seq
(call relay ("decider" "get_string") ["chain"] chain_info_str)
(seq
(call relay ("json" "parse") [chain_info_str.$.str] chain_info)
(call client ("return" "") [chain_info.$.api_endpoint])
)
)"#,
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
},
);

if let [JValue::String(endpoint)] = client
.receive_args()
.await
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(*endpoint, another_endpoint);
}
}
1 change: 1 addition & 0 deletions crates/system-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ particle-args = { workspace = true }
now-millis = { workspace = true }
server-config = { workspace = true }
service-modules = { workspace = true }
uuid-utils = { workspace = true }
129 changes: 33 additions & 96 deletions crates/system-services/src/deployer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@ use crate::distro::*;
use crate::CallService;
use crate::{DeploymentStatus, PackageDistro, ServiceDistro, ServiceStatus, SpellDistro};
use eyre::eyre;
use fluence_spell_dtos::trigger_config::TriggerConfig;
use libp2p::PeerId;
use particle_execution::FunctionOutcome;
use particle_modules::{AddBlueprint, ModuleRepository};
use particle_services::{ParticleAppServices, ServiceError, ServiceType};
use serde_json::{json, Value as JValue};
use sorcerer::{get_spell_info, install_spell, remove_spell};
use spell_event_bus::api::SpellEventBusApi;
use sorcerer::{install_spell, remove_spell};
use spell_event_bus::api::{SpellEventBusApi, SpellId};
use spell_service_api::{CallParams, SpellServiceApi};
use spell_storage::SpellStorage;
use std::collections::HashMap;
use std::time::Duration;
use uuid_utils::uuid;

const DEPLOYER_TTL: Duration = Duration::from_millis(60_000);

const DEPLOYER_PARTICLE_ID: &str = "system-services-deployment";

fn get_deployer_particle_id() -> String {
format!("{}_{}", DEPLOYER_PARTICLE_ID, uuid())
}

// Status of the service or spell before deployment
#[derive(Clone, Debug)]
enum ServiceUpdateStatus {
Expand Down Expand Up @@ -115,7 +119,7 @@ impl Deployer {
async fn deploy_system_spell(&self, spell_distro: SpellDistro) -> eyre::Result<ServiceStatus> {
let spell_name = spell_distro.name.clone();
match self.find_same_spell(&spell_distro) {
ServiceUpdateStatus::NeedUpdate(spell_id) => {
Some(spell_id) => {
tracing::debug!(
spell_name,
spell_id,
Expand All @@ -128,7 +132,8 @@ impl Deployer {
spell_name,
"can't update a spell (will redeploy it): {err}"
);
self.clean_old_spell(&spell_name, spell_id).await;
self.remove_old_spell(&spell_name, &spell_id).await?;

let spell_id = self.deploy_spell_common(spell_distro).await?;
tracing::info!(spell_name, spell_id, "redeployed a system spell",);
Ok(ServiceStatus::Created(spell_id))
Expand All @@ -139,15 +144,8 @@ impl Deployer {
}
}
}
ServiceUpdateStatus::NoUpdate(spell_id) => {
tracing::debug!(
spell_name,
spell_id,
"found existing spell that doesn't need to be updated; will not update",
);
Ok(ServiceStatus::Existing(spell_id))
}
ServiceUpdateStatus::NotFound => {

None => {
let spell_id = self.deploy_spell_common(spell_distro).await?;
tracing::info!(spell_name, spell_id, "deployed a system spell",);
Ok(ServiceStatus::Created(spell_id))
Expand Down Expand Up @@ -177,10 +175,10 @@ impl Deployer {
// update trigger config
let config = spell_distro.trigger_config.clone();
self.spells_api.set_trigger_config(params.clone(), config)?;
// update spell
// update spell script
let air = spell_distro.air.to_string();
self.spells_api.set_script(params.clone(), air)?;
// Let's save old_counter
// update init_data without affecting other keys
self.spells_api.update_kv(params, json!(spell_distro.kv))?;

// resubscribe spell
Expand All @@ -196,41 +194,20 @@ impl Deployer {
Ok(())
}

async fn clean_old_spell(&self, spell_name: &str, spell_id: String) {
let result = remove_spell(
DEPLOYER_PARTICLE_ID,
async fn remove_old_spell(&self, spell_name: &str, spell_id: &str) -> eyre::Result<()> {
remove_spell(
&get_deployer_particle_id(),
&self.spell_storage,
&self.services,
&self.spell_event_bus_api,
&spell_id,
self.root_worker_id,
)
.await;
if let Err(err) = result {
tracing::error!(
spell_name,
spell_id,
"Failed to remove old spell (trying to stop it): {err}",
);

let empty_config = TriggerConfig::default();
// Stop old spell
let result: eyre::Result<_> = try {
// Stop the spell to avoid re-subscription
let params = CallParams::local(spell_id.clone(), self.root_worker_id, DEPLOYER_TTL);
self.spells_api.set_trigger_config(params, empty_config)?;

// Unsubscribe spell from execution
self.spell_event_bus_api.unsubscribe(spell_id.clone()).await
};
if let Err(err) = result {
tracing::error!(
spell_name,
spell_id,
"couldn't stop the old spell (will install new spell nevertheless): {err}",
);
}
}
.await
.map_err(|err| {
tracing::error!(spell_name, spell_id, "couldn't remove the old spell: {err}",);
eyre!(err)
})
}

async fn deploy_spell_common(&self, spell_distro: SpellDistro) -> eyre::Result<String> {
Expand All @@ -240,7 +217,7 @@ impl Deployer {
&self.spell_event_bus_api,
&self.spells_api,
self.root_worker_id,
DEPLOYER_PARTICLE_ID.to_string(),
get_deployer_particle_id(),
DEPLOYER_TTL,
spell_distro.trigger_config,
spell_distro.air.to_string(),
Expand All @@ -257,73 +234,33 @@ impl Deployer {
Ok(spell_id)
}

// Two spells are the same if
// - they have the same alias
//
// Need to redeploy (stop the old one, create a new one) a spell if
// - the script is different
// - the trigger config is different
fn find_same_spell(&self, new_spell: &SpellDistro) -> ServiceUpdateStatus {
// Two spells are the same if they have the same alias
fn find_same_spell(&self, new_spell: &SpellDistro) -> Option<SpellId> {
let existing_spell =
self.services
.get_service_info("", self.root_worker_id, new_spell.name.to_string());
let spell = match existing_spell {
Ok(spell) => spell,
match existing_spell {
Err(ServiceError::NoSuchService(_)) => {
log::debug!("no existing spell found for {}", new_spell.name);
return ServiceUpdateStatus::NotFound;
None
}
Err(err) => {
log::error!(
"can't obtain details on a spell `{}` (will create a new one): {err}",
new_spell.name
);
return ServiceUpdateStatus::NotFound;
None
}
};
if spell.service_type != ServiceType::Spell {
log::warn!(
Ok(spell) if spell.service_type != ServiceType::Spell => {
log::warn!(
"alias `{}` already used for a service [{}]; it will be used for a spell, the service won't be removed",
new_spell.name,
spell.id
);
return ServiceUpdateStatus::NotFound;
}

// Request a script and a trigger config from the spell
let spell_info = get_spell_info(
&self.spells_api,
self.root_worker_id,
DEPLOYER_TTL,
spell.id.clone(),
);
let spell_info = match spell_info {
Err(err) => {
log::error!(
"can't obtain details on existing spell {} (will try to update nevertheless): {err}",
new_spell.name
);
return ServiceUpdateStatus::NeedUpdate(spell.id);
None
}
Ok(s) => s,
};

if spell_info.script != new_spell.air {
log::debug!(
"found old {} spell but with a different script; updating the spell",
new_spell.name
);
return ServiceUpdateStatus::NeedUpdate(spell.id);
}
if spell_info.trigger_config != new_spell.trigger_config {
log::debug!(
"found old {} spell but with a different trigger config; updating the spell",
new_spell.name
);
return ServiceUpdateStatus::NeedUpdate(spell.id);
Ok(spell) => Some(spell.id),
}

ServiceUpdateStatus::NoUpdate(spell.id)
}

fn deploy_service_common(&self, service_distro: ServiceDistro) -> eyre::Result<ServiceStatus> {
Expand All @@ -334,7 +271,7 @@ impl Deployer {
ServiceUpdateStatus::NeedUpdate(service_id) => {
tracing::debug!(service_name, service_id, "found existing service that needs to be updated; will remove the old service and deploy a new one");
let result = self.services.remove_service(
DEPLOYER_PARTICLE_ID,
&get_deployer_particle_id(),
self.root_worker_id,
&service_id,
self.root_worker_id,
Expand Down

0 comments on commit 78b0a46

Please sign in to comment.