From fa5cc9c6c51ae924eae436fd8f6cbb85a039ba3c Mon Sep 17 00:00:00 2001 From: stneng Date: Tue, 25 Oct 2022 21:21:56 +0000 Subject: [PATCH 1/2] resolve issues --- Cargo.toml | 2 +- README.md | 8 ++++++- examples/auto_confirm.rs | 14 ++++++------- examples/user_wait_task.rs | 37 +++++++++++++++++++++++++++++++++ src/extensions/policy_module.rs | 19 +++++++---------- src/extensions/registry.rs | 2 +- src/extensions/wait_task.rs | 9 ++------ src/lib.rs | 1 + src/protocol.rs | 22 ++++++++++---------- src/utils.rs | 4 ++++ 10 files changed, 77 insertions(+), 41 deletions(-) create mode 100644 examples/user_wait_task.rs create mode 100644 src/utils.rs diff --git a/Cargo.toml b/Cargo.toml index ddb8b29..9b052c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "colink" -version = "0.1.21" +version = "0.1.22" edition = "2021" description = "CoLink Rust SDK" license = "MIT" diff --git a/README.md b/README.md index 3a6b6b0..354fbec 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional Add this to your Cargo.toml: ```toml [dependencies] -colink = "0.1.21" +colink = "0.1.22" ``` ## Getting Started @@ -38,6 +38,9 @@ cargo run --example user_run_task http://localhost:8080 $(sed -n "1,2p" ~/.colin cargo run --example host_import_user
# is optional ``` ``` +cargo run --example host_import_users
# is optional +``` +``` cargo run --example host_import_users_and_exchange_guest_jwts
# is optional ``` ``` @@ -85,6 +88,9 @@ cargo run --example user_start_protocol_operator
``` +``` +cargo run --example user_wait_task
+``` ### Protocol ``` diff --git a/examples/auto_confirm.rs b/examples/auto_confirm.rs index cdb9eaa..ae3c80e 100644 --- a/examples/auto_confirm.rs +++ b/examples/auto_confirm.rs @@ -1,4 +1,7 @@ -use colink::{CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage, Task}; +use colink::{ + utils::get_path_timestamp, CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage, + Task, +}; use prost::Message; use std::env; use tracing::debug; @@ -37,11 +40,11 @@ async fn main() -> Result<(), Box let list_entry = &res[0]; let list: CoLinkInternalTaskIdList = Message::decode(&*list_entry.payload).unwrap(); if list.task_ids_with_key_paths.is_empty() { - get_timestamp(&list_entry.key_path) + get_path_timestamp(&list_entry.key_path) } else { list.task_ids_with_key_paths .iter() - .map(|x| get_timestamp(&x.key_path)) + .map(|x| get_path_timestamp(&x.key_path)) .min() .unwrap_or(i64::MAX) } @@ -75,8 +78,3 @@ async fn main() -> Result<(), Box } } } - -fn get_timestamp(key_path: &str) -> i64 { - let pos = key_path.rfind('@').unwrap(); - key_path[pos + 1..].parse().unwrap() -} diff --git a/examples/user_wait_task.rs b/examples/user_wait_task.rs new file mode 100644 index 0000000..aa842fb --- /dev/null +++ b/examples/user_wait_task.rs @@ -0,0 +1,37 @@ +use colink::{ + decode_jwt_without_validation, extensions::registry::UserRecord, CoLink, Participant, +}; +use prost::Message; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = env::args().skip(1).collect::>(); + let addr = &args[0]; + let jwt = &args[1]; + let target_user = &args[2]; + let user_id = decode_jwt_without_validation(jwt).unwrap().user_id; + + let user = UserRecord { + user_id: target_user.to_string(), + ..Default::default() + }; + let mut payload = vec![]; + user.encode(&mut payload).unwrap(); + let cl = CoLink::new(addr, jwt); + let participants = vec![Participant { + user_id: user_id.to_string(), + role: "query_from_registries".to_string(), + }]; + let task_id = cl + .run_task("registry", target_user.as_bytes(), &participants, false) + .await?; + println!( + "Task {} has been created, waiting for it to finish...", + task_id + ); + cl.wait_task(&task_id).await?; + println!("Task {} finished", task_id); + + Ok(()) +} diff --git a/src/extensions/policy_module.rs b/src/extensions/policy_module.rs index 80f5468..65801c8 100644 --- a/src/extensions/policy_module.rs +++ b/src/extensions/policy_module.rs @@ -1,4 +1,4 @@ -use crate::colink_proto::*; +use crate::{colink_proto::*, utils::get_path_timestamp}; pub use colink_policy_module_proto::*; use prost::Message; mod colink_policy_module_proto { @@ -19,7 +19,7 @@ impl crate::application::CoLink { { Ok(res) => ( prost::Message::decode(&*res[0].payload)?, - get_timestamp(&res[0].key_path), + get_path_timestamp(&res[0].key_path), ), Err(_) => (Default::default(), 0), }; @@ -30,7 +30,7 @@ impl crate::application::CoLink { settings.enable = true; let mut payload = vec![]; settings.encode(&mut payload).unwrap(); - let timestamp = get_timestamp( + let timestamp = get_path_timestamp( &self .update_entry("_policy_module:settings", &payload) .await?, @@ -58,7 +58,7 @@ impl crate::application::CoLink { settings.enable = false; let mut payload = vec![]; settings.encode(&mut payload).unwrap(); - let timestamp = get_timestamp( + let timestamp = get_path_timestamp( &self .update_entry("_policy_module:settings", &payload) .await?, @@ -88,7 +88,7 @@ impl crate::application::CoLink { settings.rules.push(rule); let mut payload = vec![]; settings.encode(&mut payload).unwrap(); - let timestamp = get_timestamp( + let timestamp = get_path_timestamp( &self .update_entry("_policy_module:settings", &payload) .await?, @@ -109,7 +109,7 @@ impl crate::application::CoLink { settings.rules.retain(|x| x.rule_id != rule_id); let mut payload = vec![]; settings.encode(&mut payload).unwrap(); - let timestamp = get_timestamp( + let timestamp = get_path_timestamp( &self .update_entry("_policy_module:settings", &payload) .await?, @@ -136,7 +136,7 @@ impl crate::application::CoLink { if applied_settings_timestamp >= timestamp { return Ok(()); } - get_timestamp(&res[0].key_path) + 1 + get_path_timestamp(&res[0].key_path) + 1 } Err(_) => 0, }; @@ -157,8 +157,3 @@ impl crate::application::CoLink { Ok(()) } } - -fn get_timestamp(key_path: &str) -> i64 { - let pos = key_path.rfind('@').unwrap(); - key_path[pos + 1..].parse().unwrap() -} diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs index f8553ac..35c3572 100644 --- a/src/extensions/registry.rs +++ b/src/extensions/registry.rs @@ -1,5 +1,5 @@ use crate::colink_proto::*; -pub use colink_registry_proto::{Registries, Registry}; +pub use colink_registry_proto::{Registries, Registry, UserRecord}; use prost::Message; mod colink_registry_proto { include!(concat!(env!("OUT_DIR"), "/colink_registry.rs")); diff --git a/src/extensions/wait_task.rs b/src/extensions/wait_task.rs index d7a78d4..61fef12 100644 --- a/src/extensions/wait_task.rs +++ b/src/extensions/wait_task.rs @@ -1,4 +1,4 @@ -use crate::colink_proto::*; +use crate::{colink_proto::*, utils::get_path_timestamp}; use prost::Message; use tracing::debug; @@ -19,7 +19,7 @@ impl crate::application::CoLink { if task.status == "finished" { return Ok(()); } - get_timestamp(&res[0].key_path) + 1 + get_path_timestamp(&res[0].key_path) + 1 } Err(_) => 0, }; @@ -40,8 +40,3 @@ impl crate::application::CoLink { Ok(()) } } - -fn get_timestamp(key_path: &str) -> i64 { - let pos = key_path.rfind('@').unwrap(); - key_path[pos + 1..].parse().unwrap() -} diff --git a/src/lib.rs b/src/lib.rs index dcd5f3a..f597140 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,3 +12,4 @@ pub use protocol::{ CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait, }; pub mod extensions; +pub mod utils; diff --git a/src/protocol.rs b/src/protocol.rs index e1013fb..49ba7d5 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,4 +1,4 @@ -use crate::application::*; +use crate::{application::*, utils::get_path_timestamp}; pub use async_trait::async_trait; use futures_lite::stream::StreamExt; use lapin::{ @@ -7,7 +7,11 @@ use lapin::{ Connection, ConnectionProperties, }; use prost::Message; -use std::{collections::HashMap, sync::mpsc::channel, thread}; +use std::{ + collections::{HashMap, HashSet}, + sync::mpsc::channel, + thread, +}; use structopt::StructOpt; use tracing::{debug, error}; @@ -71,11 +75,11 @@ impl CoLinkProtocol { let list: CoLinkInternalTaskIdList = Message::decode(&*list_entry.payload).unwrap(); if list.task_ids_with_key_paths.is_empty() { - get_timestamp(&list_entry.key_path) + get_path_timestamp(&list_entry.key_path) } else { list.task_ids_with_key_paths .iter() - .map(|x| get_timestamp(&x.key_path)) + .map(|x| get_path_timestamp(&x.key_path)) .min() .unwrap_or(i64::MAX) } @@ -151,17 +155,12 @@ impl CoLinkProtocol { } } -fn get_timestamp(key_path: &str) -> i64 { - let pos = key_path.rfind('@').unwrap(); - key_path[pos + 1..].parse().unwrap() -} - pub fn _protocol_start( cl: CoLink, user_funcs: HashMap>, ) -> Result<(), Error> { let mut operator_funcs: HashMap> = HashMap::new(); - let mut protocols = vec![]; + let mut protocols = HashSet::new(); for (protocol_and_role, user_func) in user_funcs { let cl = cl.clone(); if protocol_and_role.ends_with(":@init") { @@ -190,7 +189,8 @@ pub fn _protocol_start( Ok::<(), Box>(()) })?; } else { - protocols.push(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string()); + protocols + .insert(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string()); operator_funcs.insert(protocol_and_role, user_func); } } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..5beb6f0 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,4 @@ +pub fn get_path_timestamp(key_path: &str) -> i64 { + let pos = key_path.rfind('@').unwrap(); + key_path[pos + 1..].parse().unwrap() +} From 9ad5f325a65d1ff0f3c4b2b4f3329b1369dd51b9 Mon Sep 17 00:00:00 2001 From: stneng Date: Tue, 25 Oct 2022 21:41:50 +0000 Subject: [PATCH 2/2] issue #20 --- src/protocol.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/protocol.rs b/src/protocol.rs index 49ba7d5..4324908 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -9,7 +9,7 @@ use lapin::{ use prost::Message; use std::{ collections::{HashMap, HashSet}, - sync::mpsc::channel, + sync::{mpsc::channel, Arc, Mutex}, thread, }; use structopt::StructOpt; @@ -161,8 +161,10 @@ pub fn _protocol_start( ) -> Result<(), Error> { let mut operator_funcs: HashMap> = HashMap::new(); let mut protocols = HashSet::new(); + let failed_protocols = Arc::new(Mutex::new(HashSet::new())); for (protocol_and_role, user_func) in user_funcs { let cl = cl.clone(); + let failed_protocols = failed_protocols.clone(); if protocol_and_role.ends_with(":@init") { let protocol_name = protocol_and_role[..protocol_and_role.len() - 6].to_string(); tokio::runtime::Builder::new_multi_thread() @@ -180,10 +182,14 @@ pub fn _protocol_start( .start(cl_clone, Default::default(), Default::default()) .await { - Ok(_) => {} - Err(e) => error!("{}: {}.", protocol_and_role, e), + Ok(_) => { + cl.update_entry(&is_initialized_key, &[1]).await?; + } + Err(e) => { + error!("{}: {}.", protocol_and_role, e); + failed_protocols.lock().unwrap().insert(protocol_name); + } } - cl.update_entry(&is_initialized_key, &[1]).await?; } cl.unlock(lock).await?; Ok::<(), Box>(()) @@ -194,6 +200,9 @@ pub fn _protocol_start( operator_funcs.insert(protocol_and_role, user_func); } } + for failed_protocol in &*failed_protocols.lock().unwrap() { + protocols.remove(failed_protocol); + } let cl_clone = cl.clone(); tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -259,6 +268,7 @@ pub struct CommandLineArgs { } pub fn _colink_parse_args() -> CoLink { + tracing_subscriber::fmt::init(); let CommandLineArgs { addr, jwt,