diff --git a/Cargo.toml b/Cargo.toml
index ddb8b29..9b052c5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "colink"
-version = "0.1.21"
+version = "0.1.22"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
diff --git a/README.md b/README.md
index 3a6b6b0..354fbec 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
-colink = "0.1.21"
+colink = "0.1.22"
```
## Getting Started
@@ -38,6 +38,9 @@ cargo run --example user_run_task http://localhost:8080 $(sed -n "1,2p" ~/.colin
cargo run --example host_import_user
# is optional
```
```
+cargo run --example host_import_users # is optional
+```
+```
cargo run --example host_import_users_and_exchange_guest_jwts # is optional
```
```
@@ -85,6 +88,9 @@ cargo run --example user_start_protocol_operator
```
+```
+cargo run --example user_wait_task
+```
### Protocol
```
diff --git a/examples/auto_confirm.rs b/examples/auto_confirm.rs
index cdb9eaa..ae3c80e 100644
--- a/examples/auto_confirm.rs
+++ b/examples/auto_confirm.rs
@@ -1,4 +1,7 @@
-use colink::{CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage, Task};
+use colink::{
+ utils::get_path_timestamp, CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage,
+ Task,
+};
use prost::Message;
use std::env;
use tracing::debug;
@@ -37,11 +40,11 @@ async fn main() -> Result<(), Box
let list_entry = &res[0];
let list: CoLinkInternalTaskIdList = Message::decode(&*list_entry.payload).unwrap();
if list.task_ids_with_key_paths.is_empty() {
- get_timestamp(&list_entry.key_path)
+ get_path_timestamp(&list_entry.key_path)
} else {
list.task_ids_with_key_paths
.iter()
- .map(|x| get_timestamp(&x.key_path))
+ .map(|x| get_path_timestamp(&x.key_path))
.min()
.unwrap_or(i64::MAX)
}
@@ -75,8 +78,3 @@ async fn main() -> Result<(), Box
}
}
}
-
-fn get_timestamp(key_path: &str) -> i64 {
- let pos = key_path.rfind('@').unwrap();
- key_path[pos + 1..].parse().unwrap()
-}
diff --git a/examples/user_wait_task.rs b/examples/user_wait_task.rs
new file mode 100644
index 0000000..aa842fb
--- /dev/null
+++ b/examples/user_wait_task.rs
@@ -0,0 +1,37 @@
+use colink::{
+ decode_jwt_without_validation, extensions::registry::UserRecord, CoLink, Participant,
+};
+use prost::Message;
+use std::env;
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ let args = env::args().skip(1).collect::>();
+ let addr = &args[0];
+ let jwt = &args[1];
+ let target_user = &args[2];
+ let user_id = decode_jwt_without_validation(jwt).unwrap().user_id;
+
+ let user = UserRecord {
+ user_id: target_user.to_string(),
+ ..Default::default()
+ };
+ let mut payload = vec![];
+ user.encode(&mut payload).unwrap();
+ let cl = CoLink::new(addr, jwt);
+ let participants = vec![Participant {
+ user_id: user_id.to_string(),
+ role: "query_from_registries".to_string(),
+ }];
+ let task_id = cl
+ .run_task("registry", target_user.as_bytes(), &participants, false)
+ .await?;
+ println!(
+ "Task {} has been created, waiting for it to finish...",
+ task_id
+ );
+ cl.wait_task(&task_id).await?;
+ println!("Task {} finished", task_id);
+
+ Ok(())
+}
diff --git a/src/extensions/policy_module.rs b/src/extensions/policy_module.rs
index 80f5468..65801c8 100644
--- a/src/extensions/policy_module.rs
+++ b/src/extensions/policy_module.rs
@@ -1,4 +1,4 @@
-use crate::colink_proto::*;
+use crate::{colink_proto::*, utils::get_path_timestamp};
pub use colink_policy_module_proto::*;
use prost::Message;
mod colink_policy_module_proto {
@@ -19,7 +19,7 @@ impl crate::application::CoLink {
{
Ok(res) => (
prost::Message::decode(&*res[0].payload)?,
- get_timestamp(&res[0].key_path),
+ get_path_timestamp(&res[0].key_path),
),
Err(_) => (Default::default(), 0),
};
@@ -30,7 +30,7 @@ impl crate::application::CoLink {
settings.enable = true;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
- let timestamp = get_timestamp(
+ let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
@@ -58,7 +58,7 @@ impl crate::application::CoLink {
settings.enable = false;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
- let timestamp = get_timestamp(
+ let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
@@ -88,7 +88,7 @@ impl crate::application::CoLink {
settings.rules.push(rule);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
- let timestamp = get_timestamp(
+ let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
@@ -109,7 +109,7 @@ impl crate::application::CoLink {
settings.rules.retain(|x| x.rule_id != rule_id);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
- let timestamp = get_timestamp(
+ let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
@@ -136,7 +136,7 @@ impl crate::application::CoLink {
if applied_settings_timestamp >= timestamp {
return Ok(());
}
- get_timestamp(&res[0].key_path) + 1
+ get_path_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
@@ -157,8 +157,3 @@ impl crate::application::CoLink {
Ok(())
}
}
-
-fn get_timestamp(key_path: &str) -> i64 {
- let pos = key_path.rfind('@').unwrap();
- key_path[pos + 1..].parse().unwrap()
-}
diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs
index f8553ac..35c3572 100644
--- a/src/extensions/registry.rs
+++ b/src/extensions/registry.rs
@@ -1,5 +1,5 @@
use crate::colink_proto::*;
-pub use colink_registry_proto::{Registries, Registry};
+pub use colink_registry_proto::{Registries, Registry, UserRecord};
use prost::Message;
mod colink_registry_proto {
include!(concat!(env!("OUT_DIR"), "/colink_registry.rs"));
diff --git a/src/extensions/wait_task.rs b/src/extensions/wait_task.rs
index d7a78d4..61fef12 100644
--- a/src/extensions/wait_task.rs
+++ b/src/extensions/wait_task.rs
@@ -1,4 +1,4 @@
-use crate::colink_proto::*;
+use crate::{colink_proto::*, utils::get_path_timestamp};
use prost::Message;
use tracing::debug;
@@ -19,7 +19,7 @@ impl crate::application::CoLink {
if task.status == "finished" {
return Ok(());
}
- get_timestamp(&res[0].key_path) + 1
+ get_path_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
@@ -40,8 +40,3 @@ impl crate::application::CoLink {
Ok(())
}
}
-
-fn get_timestamp(key_path: &str) -> i64 {
- let pos = key_path.rfind('@').unwrap();
- key_path[pos + 1..].parse().unwrap()
-}
diff --git a/src/lib.rs b/src/lib.rs
index dcd5f3a..f597140 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -12,3 +12,4 @@ pub use protocol::{
CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait,
};
pub mod extensions;
+pub mod utils;
diff --git a/src/protocol.rs b/src/protocol.rs
index e1013fb..4324908 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -1,4 +1,4 @@
-use crate::application::*;
+use crate::{application::*, utils::get_path_timestamp};
pub use async_trait::async_trait;
use futures_lite::stream::StreamExt;
use lapin::{
@@ -7,7 +7,11 @@ use lapin::{
Connection, ConnectionProperties,
};
use prost::Message;
-use std::{collections::HashMap, sync::mpsc::channel, thread};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::{mpsc::channel, Arc, Mutex},
+ thread,
+};
use structopt::StructOpt;
use tracing::{debug, error};
@@ -71,11 +75,11 @@ impl CoLinkProtocol {
let list: CoLinkInternalTaskIdList =
Message::decode(&*list_entry.payload).unwrap();
if list.task_ids_with_key_paths.is_empty() {
- get_timestamp(&list_entry.key_path)
+ get_path_timestamp(&list_entry.key_path)
} else {
list.task_ids_with_key_paths
.iter()
- .map(|x| get_timestamp(&x.key_path))
+ .map(|x| get_path_timestamp(&x.key_path))
.min()
.unwrap_or(i64::MAX)
}
@@ -151,19 +155,16 @@ impl CoLinkProtocol {
}
}
-fn get_timestamp(key_path: &str) -> i64 {
- let pos = key_path.rfind('@').unwrap();
- key_path[pos + 1..].parse().unwrap()
-}
-
pub fn _protocol_start(
cl: CoLink,
user_funcs: HashMap>,
) -> Result<(), Error> {
let mut operator_funcs: HashMap> = HashMap::new();
- let mut protocols = vec![];
+ let mut protocols = HashSet::new();
+ let failed_protocols = Arc::new(Mutex::new(HashSet::new()));
for (protocol_and_role, user_func) in user_funcs {
let cl = cl.clone();
+ let failed_protocols = failed_protocols.clone();
if protocol_and_role.ends_with(":@init") {
let protocol_name = protocol_and_role[..protocol_and_role.len() - 6].to_string();
tokio::runtime::Builder::new_multi_thread()
@@ -181,19 +182,27 @@ pub fn _protocol_start(
.start(cl_clone, Default::default(), Default::default())
.await
{
- Ok(_) => {}
- Err(e) => error!("{}: {}.", protocol_and_role, e),
+ Ok(_) => {
+ cl.update_entry(&is_initialized_key, &[1]).await?;
+ }
+ Err(e) => {
+ error!("{}: {}.", protocol_and_role, e);
+ failed_protocols.lock().unwrap().insert(protocol_name);
+ }
}
- cl.update_entry(&is_initialized_key, &[1]).await?;
}
cl.unlock(lock).await?;
Ok::<(), Box>(())
})?;
} else {
- protocols.push(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string());
+ protocols
+ .insert(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string());
operator_funcs.insert(protocol_and_role, user_func);
}
}
+ for failed_protocol in &*failed_protocols.lock().unwrap() {
+ protocols.remove(failed_protocol);
+ }
let cl_clone = cl.clone();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -259,6 +268,7 @@ pub struct CommandLineArgs {
}
pub fn _colink_parse_args() -> CoLink {
+ tracing_subscriber::fmt::init();
let CommandLineArgs {
addr,
jwt,
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 0000000..5beb6f0
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,4 @@
+pub fn get_path_timestamp(key_path: &str) -> i64 {
+ let pos = key_path.rfind('@').unwrap();
+ key_path[pos + 1..].parse().unwrap()
+}