Skip to content

Commit

Permalink
feat(spells): support empty trigger configs [NET-316] (#1412)
Browse files Browse the repository at this point in the history
* Allow use of empty trigger config

An empty trigger config is used either to unsubscribe a spell from triggers
or to not subscribe at all if it was used on installation.

Also, remove updating interface inside the spell-event-bus, since
it's not needed.
  • Loading branch information
kmd-fl committed Jan 18, 2023
1 parent d4d4718 commit 46d8fd5
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 213 deletions.
286 changes: 194 additions & 92 deletions crates/particle-node-tests/tests/spells.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#![feature(assert_matches)]
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
Expand All @@ -26,7 +27,7 @@ use connected_client::ConnectedClient;
use created_swarm::make_swarms;
use fluence_spell_dtos::trigger_config::TriggerConfig;
use service_modules::load_module;
use spell_event_bus::api::MAX_PERIOD_SEC;
use spell_event_bus::api::{TriggerInfo, TriggerInfoAqua, MAX_PERIOD_SEC};
use test_utils::create_service;

type SpellId = String;
Expand Down Expand Up @@ -280,7 +281,7 @@ fn spell_run_oneshot() {
// The config considered empty if start_sec is 0. In this case we don't schedule a spell.
// Script installation will fail because no triggers configured.
#[test]
fn spell_install_fail_empty_config() {
fn spell_install_ok_empty_config() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
Expand All @@ -291,32 +292,59 @@ fn spell_install_fail_empty_config() {

// Note that when period is 0, the spell is executed only once
let config = TriggerConfig::default();
let (spell_id, _) = create_spell(&mut client, script, config, empty);

let data = hashmap! {
"script" => json!(script.to_string()),
"config" => json!(config),
"client" => json!(client.peer_id.to_string()),
"relay" => json!(client.node.to_string()),
"data" => json!(json!(empty).to_string()),
};
// The spell should be installed, but should not be subscribed to any triggers
// We cannot truly check that the spell isn't subscribed to anything right now, but we can check that
// it's counter is zero on different occasions:

// 1. Check that the spell wasn't executed immediately after installation (the case of `start_sec` <= now)
client.send_particle(
r#"
(xor
(call relay ("spell" "install") [script data config] spell_id)
(call client ("return" "") [%last_error%.$.message])
(seq
(call relay (spell_id "get_u32") ["counter"] counter)
(call %init_peer_id% ("return" "") [counter])
)"#,
data,
hashmap! {
"relay" => json!(client.node.to_string()),
"spell_id" => json!(spell_id),
},
);
let response = client
.receive_args()
.wrap_err("receive counter first try")
.unwrap();
if response[0]["success"].as_bool().unwrap() {
let counter = response[0]["num"].as_u64().unwrap();
assert_eq!(counter, 0);
}
// 2. Connect and disconnect a client to the same node. The spell should not be executed
let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();
drop(connected);

if let [JValue::String(error_msg)] = client
client.send_particle(
r#"
(seq
(call relay (spell_id "get_u32") ["counter"] counter)
(call %init_peer_id% ("return" "") [counter])
)"#,
hashmap! {
"relay" => json!(client.node.to_string()),
"spell_id" => json!(spell_id),
},
);
let response = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
let msg = "Local service error, ret_code is 1, error message is '\"Error: config is empty, nothing to do\"'";
assert_eq!(msg, error_msg);
.wrap_err("receive counter second try")
.unwrap();
if response[0]["success"].as_bool().unwrap() {
let counter = response[0]["num"].as_u64().unwrap();
assert_eq!(counter, 0);
}

// 3. We cannot check that it's not scheduled to run in the future, but it's ok for now.
}

#[test]
Expand Down Expand Up @@ -608,11 +636,6 @@ fn spell_remove_service_as_spell() {
load_module("tests/file_share/artifacts", "file_share").expect("load module"),
);

let script = r#"(call %init_peer_id% ("peer" "identify") [] x)"#;
let mut config = TriggerConfig::default();
config.clock.start_sec = 1;
let _spell_id = create_spell(&mut client, script, config, hashmap! {});

let data = hashmap! {
"service_id" => json!(service.id),
"relay" => json!(client.node.to_string()),
Expand Down Expand Up @@ -712,72 +735,6 @@ fn spell_trigger_connection_pool() {
);
}

#[test]
fn spell_update_config() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();

let script = format!(r#"(call "{}" ("return" "") ["called"])"#, client.peer_id);
let mut config = TriggerConfig::default();
config.connections.connect = true;
let (spell_id, _) = create_spell(&mut client, &script, config, hashmap! {});

let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone()).unwrap();

if let [JValue::String(x)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x, "called", "spell must be triggered");
}

let mut config = TriggerConfig::default();
config.connections.disconnect = true;
let data = hashmap! {
"spell_id" => json!(spell_id),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
"config" => json!(config),
};
client.send_particle(
r#"(seq
(call relay ("spell" "update_trigger_config") [spell_id config])
;;(call relay ("op" "noop") [])
(call %init_peer_id% ("return" "") ["done"])
)"#,
data,
);
let result = client.receive_args().wrap_err("receive").unwrap().pop();
let result = match result {
Some(JValue::String(result)) => result,
None => panic!("no results from update_trigger_config particle"),
other => panic!(
"expected JSON String from update_trigger_config particle, got {:?}",
other
),
};
assert_eq!(result, "done", "spell must be updated");

drop(connected);
let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone()).unwrap();
drop(connected);
let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone()).unwrap();
drop(connected);

if let [JValue::String(x)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x, "called", "spell must be triggered after config update");
}
}

#[test]
fn spell_timer_trigger_mailbox_test() {
let swarms = make_swarms(1);
Expand Down Expand Up @@ -946,3 +903,148 @@ fn spell_peer_id_test() {

assert_eq!(result, scope_peer_id);
}
#[test]
fn spell_update_config() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();

let script = format!(
r#"(seq
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(call %init_peer_id% (spell_id "list_pop_string") ["trigger_mailbox"] result)
)
(call "{}" ("return" "") [result])
)"#,
client.peer_id
);
let mut config = TriggerConfig::default();
config.connections.connect = true;
let (spell_id, _) = create_spell(&mut client, &script, config, hashmap! {});
let connected = ConnectedClient::connect_to(swarms[0].multiaddr.clone()).unwrap();

if let [JValue::Object(x)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let info: TriggerInfoAqua = serde_json::from_str(x["str"].as_str().unwrap()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(info, TriggerInfo::Peer(p) if p.connected, "spell must be triggered by the `connected` event");
} else {
panic!("wrong result from spell, expected trigger info with the `connected` event");
}

let mut config = TriggerConfig::default();
config.connections.disconnect = true;
let data = hashmap! {
"spell_id" => json!(spell_id),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
"config" => json!(config),
};
client.send_particle(
r#"(seq
(call relay ("spell" "update_trigger_config") [spell_id config])
(call %init_peer_id% ("return" "") ["done"])
)"#,
data,
);
let result = client.receive_args().wrap_err("receive").unwrap().pop();
let result = match result {
Some(JValue::String(result)) => result,
None => panic!("no results from update_trigger_config particle"),
other => panic!(
"expected JSON String from update_trigger_config particle, got {:?}",
other
),
};
assert_eq!(result, "done", "spell must be updated");

drop(connected);

if let [JValue::Object(x)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let info: TriggerInfoAqua = serde_json::from_str(x["str"].as_str().unwrap()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(info, TriggerInfo::Peer(p) if !p.connected, "spell must be triggered by the `disconnected` event");
} else {
panic!("wrong result from spell, expect trigger info with the `disconnected` event");
}
}

#[test]
fn spell_update_config_stopped_spell() {
let swarms = make_swarms(1);
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.wrap_err("connect client")
.unwrap();

let script = format!(
r#"(seq
(seq
(call %init_peer_id% ("getDataSrv" "spell_id") [] spell_id)
(call %init_peer_id% (spell_id "list_pop_string") ["trigger_mailbox"] result)
)
(call "{}" ("return" "") [result])
)"#,
client.peer_id
);
// create periodic spell
let config = TriggerConfig::default();
let (spell_id, _) = create_spell(&mut client, &script, config, hashmap! {});

// Update trigger config to do something.
let mut config = TriggerConfig::default();
config.clock.start_sec = 1;
let data = hashmap! {
"spell_id" => json!(spell_id),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
"config" => json!(config),
};
client.send_particle(
r#"(seq
(call relay ("spell" "update_trigger_config") [spell_id config])
(call %init_peer_id% ("return" "") ["done"])
)"#,
data,
);
let result = client.receive_args().wrap_err("receive").unwrap().pop();
let result = match result {
Some(JValue::String(result)) => result,
None => panic!("no results from update_trigger_config particle"),
other => panic!(
"expected JSON String from update_trigger_config particle, got {:?}",
other
),
};
assert_eq!(result, "done", "spell must be updated");

if let [JValue::Object(x)] = client
.receive_args()
.wrap_err("receive")
.unwrap()
.as_slice()
{
assert_eq!(x["absent"], JValue::Bool(false), "spell must be triggered");
let info: TriggerInfoAqua = serde_json::from_str(x["str"].as_str().unwrap()).unwrap();
let info: TriggerInfo = info.into();
assert_matches!(
info,
TriggerInfo::Timer(_),
"spell must be triggered by the timer event"
);
} else {
panic!("wrong result from spell, expect trigger info with the timer event");
}
}
Loading

0 comments on commit 46d8fd5

Please sign in to comment.