Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.1.21"
version = "0.1.22"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.1.21"
colink = "0.1.22"
```

## Getting Started
Expand Down Expand Up @@ -38,6 +38,9 @@ cargo run --example user_run_task http://localhost:8080 $(sed -n "1,2p" ~/.colin
cargo run --example host_import_user <address> <host_jwt> <expiration_timestamp> # <expiration_timestamp> is optional
```
```
cargo run --example host_import_users <address> <host_jwt> <number> <expiration_timestamp> # <expiration_timestamp> is optional
```
```
cargo run --example host_import_users_and_exchange_guest_jwts <address> <host_jwt> <number> <expiration_timestamp> # <expiration_timestamp> is optional
```
```
Expand Down Expand Up @@ -85,6 +88,9 @@ cargo run --example user_start_protocol_operator <address> <user_jwt> <protocol_
```
cargo run --example user_stop_protocol_operator <address> <user_jwt> <instance_id>
```
```
cargo run --example user_wait_task <address> <user_jwt> <target_user_id>
```

### Protocol
```
Expand Down
14 changes: 6 additions & 8 deletions examples/auto_confirm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use colink::{CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage, Task};
use colink::{
utils::get_path_timestamp, CoLink, CoLinkInternalTaskIdList, StorageEntry, SubscriptionMessage,
Task,
};
use prost::Message;
use std::env;
use tracing::debug;
Expand Down Expand Up @@ -37,11 +40,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
let list_entry = &res[0];
let list: CoLinkInternalTaskIdList = Message::decode(&*list_entry.payload).unwrap();
if list.task_ids_with_key_paths.is_empty() {
get_timestamp(&list_entry.key_path)
get_path_timestamp(&list_entry.key_path)
} else {
list.task_ids_with_key_paths
.iter()
.map(|x| get_timestamp(&x.key_path))
.map(|x| get_path_timestamp(&x.key_path))
.min()
.unwrap_or(i64::MAX)
}
Expand Down Expand Up @@ -75,8 +78,3 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
}
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}
37 changes: 37 additions & 0 deletions examples/user_wait_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use colink::{
decode_jwt_without_validation, extensions::registry::UserRecord, CoLink, Participant,
};
use prost::Message;
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let args = env::args().skip(1).collect::<Vec<_>>();
let addr = &args[0];
let jwt = &args[1];
let target_user = &args[2];
let user_id = decode_jwt_without_validation(jwt).unwrap().user_id;

let user = UserRecord {
user_id: target_user.to_string(),
..Default::default()
};
let mut payload = vec![];
user.encode(&mut payload).unwrap();
let cl = CoLink::new(addr, jwt);
let participants = vec![Participant {
user_id: user_id.to_string(),
role: "query_from_registries".to_string(),
}];
let task_id = cl
.run_task("registry", target_user.as_bytes(), &participants, false)
.await?;
println!(
"Task {} has been created, waiting for it to finish...",
task_id
);
cl.wait_task(&task_id).await?;
println!("Task {} finished", task_id);

Ok(())
}
19 changes: 7 additions & 12 deletions src/extensions/policy_module.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::colink_proto::*;
use crate::{colink_proto::*, utils::get_path_timestamp};
pub use colink_policy_module_proto::*;
use prost::Message;
mod colink_policy_module_proto {
Expand All @@ -19,7 +19,7 @@ impl crate::application::CoLink {
{
Ok(res) => (
prost::Message::decode(&*res[0].payload)?,
get_timestamp(&res[0].key_path),
get_path_timestamp(&res[0].key_path),
),
Err(_) => (Default::default(), 0),
};
Expand All @@ -30,7 +30,7 @@ impl crate::application::CoLink {
settings.enable = true;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
Expand Down Expand Up @@ -58,7 +58,7 @@ impl crate::application::CoLink {
settings.enable = false;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
Expand Down Expand Up @@ -88,7 +88,7 @@ impl crate::application::CoLink {
settings.rules.push(rule);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
Expand All @@ -109,7 +109,7 @@ impl crate::application::CoLink {
settings.rules.retain(|x| x.rule_id != rule_id);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
let timestamp = get_path_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
Expand All @@ -136,7 +136,7 @@ impl crate::application::CoLink {
if applied_settings_timestamp >= timestamp {
return Ok(());
}
get_timestamp(&res[0].key_path) + 1
get_path_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
Expand All @@ -157,8 +157,3 @@ impl crate::application::CoLink {
Ok(())
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}
2 changes: 1 addition & 1 deletion src/extensions/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::colink_proto::*;
pub use colink_registry_proto::{Registries, Registry};
pub use colink_registry_proto::{Registries, Registry, UserRecord};
use prost::Message;
mod colink_registry_proto {
include!(concat!(env!("OUT_DIR"), "/colink_registry.rs"));
Expand Down
9 changes: 2 additions & 7 deletions src/extensions/wait_task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::colink_proto::*;
use crate::{colink_proto::*, utils::get_path_timestamp};
use prost::Message;
use tracing::debug;

Expand All @@ -19,7 +19,7 @@ impl crate::application::CoLink {
if task.status == "finished" {
return Ok(());
}
get_timestamp(&res[0].key_path) + 1
get_path_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
Expand All @@ -40,8 +40,3 @@ impl crate::application::CoLink {
Ok(())
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ pub use protocol::{
CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait,
};
pub mod extensions;
pub mod utils;
38 changes: 24 additions & 14 deletions src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::application::*;
use crate::{application::*, utils::get_path_timestamp};
pub use async_trait::async_trait;
use futures_lite::stream::StreamExt;
use lapin::{
Expand All @@ -7,7 +7,11 @@ use lapin::{
Connection, ConnectionProperties,
};
use prost::Message;
use std::{collections::HashMap, sync::mpsc::channel, thread};
use std::{
collections::{HashMap, HashSet},
sync::{mpsc::channel, Arc, Mutex},
thread,
};
use structopt::StructOpt;
use tracing::{debug, error};

Expand Down Expand Up @@ -71,11 +75,11 @@ impl CoLinkProtocol {
let list: CoLinkInternalTaskIdList =
Message::decode(&*list_entry.payload).unwrap();
if list.task_ids_with_key_paths.is_empty() {
get_timestamp(&list_entry.key_path)
get_path_timestamp(&list_entry.key_path)
} else {
list.task_ids_with_key_paths
.iter()
.map(|x| get_timestamp(&x.key_path))
.map(|x| get_path_timestamp(&x.key_path))
.min()
.unwrap_or(i64::MAX)
}
Expand Down Expand Up @@ -151,19 +155,16 @@ impl CoLinkProtocol {
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}

pub fn _protocol_start(
cl: CoLink,
user_funcs: HashMap<String, Box<dyn ProtocolEntry + Send + Sync>>,
) -> Result<(), Error> {
let mut operator_funcs: HashMap<String, Box<dyn ProtocolEntry + Send + Sync>> = HashMap::new();
let mut protocols = vec![];
let mut protocols = HashSet::new();
let failed_protocols = Arc::new(Mutex::new(HashSet::new()));
for (protocol_and_role, user_func) in user_funcs {
let cl = cl.clone();
let failed_protocols = failed_protocols.clone();
if protocol_and_role.ends_with(":@init") {
let protocol_name = protocol_and_role[..protocol_and_role.len() - 6].to_string();
tokio::runtime::Builder::new_multi_thread()
Expand All @@ -181,19 +182,27 @@ pub fn _protocol_start(
.start(cl_clone, Default::default(), Default::default())
.await
{
Ok(_) => {}
Err(e) => error!("{}: {}.", protocol_and_role, e),
Ok(_) => {
cl.update_entry(&is_initialized_key, &[1]).await?;
}
Err(e) => {
error!("{}: {}.", protocol_and_role, e);
failed_protocols.lock().unwrap().insert(protocol_name);
}
}
cl.update_entry(&is_initialized_key, &[1]).await?;
}
cl.unlock(lock).await?;
Ok::<(), Box<dyn std::error::Error + Send + Sync + 'static>>(())
})?;
} else {
protocols.push(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string());
protocols
.insert(protocol_and_role[..protocol_and_role.rfind(':').unwrap()].to_string());
operator_funcs.insert(protocol_and_role, user_func);
}
}
for failed_protocol in &*failed_protocols.lock().unwrap() {
protocols.remove(failed_protocol);
}
let cl_clone = cl.clone();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand Down Expand Up @@ -259,6 +268,7 @@ pub struct CommandLineArgs {
}

pub fn _colink_parse_args() -> CoLink {
tracing_subscriber::fmt::init();
let CommandLineArgs {
addr,
jwt,
Expand Down
4 changes: 4 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub fn get_path_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}