Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.3.4"
version = "0.3.5"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application and protocol developers access the functionali
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.3.4"
colink = "0.3.5"
```

## Getting Started
Expand Down
11 changes: 11 additions & 0 deletions src/extensions/storage_macro.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod append;
mod chunk;
mod fs;
mod redis;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -39,6 +40,10 @@ impl crate::application::CoLink {
self._create_entry_redis(&string_before, &string_after, payload)
.await
}
"fs" => {
self._create_entry_fs(&string_before, &string_after, payload)
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -52,6 +57,7 @@ impl crate::application::CoLink {
match macro_type.as_str() {
"chunk" => self._read_entry_chunk(&string_before).await,
"redis" => self._read_entry_redis(&string_before, &string_after).await,
"fs" => self._read_entry_fs(&string_before, &string_after).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -72,6 +78,10 @@ impl crate::application::CoLink {
self._update_entry_redis(&string_before, &string_after, payload)
.await
}
"fs" => {
self._update_entry_fs(&string_before, &string_after, payload)
.await
}
"append" => self._update_entry_append(&string_before, payload).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
Expand All @@ -89,6 +99,7 @@ impl crate::application::CoLink {
self._delete_entry_redis(&string_before, &string_after)
.await
}
"fs" => self._delete_entry_fs(&string_before, &string_after).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand Down
5 changes: 5 additions & 0 deletions src/extensions/storage_macro/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ impl crate::application::CoLink {
"chunk" => {
return self._append_entry_chunk(&string_before, payload).await;
}
"fs" => {
return self
._append_entry_fs(&string_before, &string_after, payload)
.await;
}
_ => {}
}
}
Expand Down
100 changes: 100 additions & 0 deletions src/extensions/storage_macro/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use async_recursion::async_recursion;
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
async fn _sm_fs_get_path(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<PathBuf, Error> {
let path_key = format!("{}:path", path_key_name);
let mut path = String::from_utf8(self.read_entry(&path_key).await?)?;
if !path_suffix.is_empty() {
let path_suffix = path_suffix.replace(':', "/");
path += "/";
path += &path_suffix;
}
println!("path: {}", path);
let path = PathBuf::from(path);
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
Ok(path)
}

#[async_recursion]
pub(crate) async fn _create_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(path)
.await?;
file.write_all(payload).await?;
Ok("ok".to_string())
}

#[async_recursion]
pub(crate) async fn _read_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<Vec<u8>, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let data = tokio::fs::read(path).await?;
Ok(data)
}

#[async_recursion]
pub(crate) async fn _update_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let mut file = tokio::fs::File::create(path).await?;
file.write_all(payload).await?;
Ok("ok".to_string())
}

#[async_recursion]
pub(crate) async fn _append_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let lock_token = self.lock(&path.to_string_lossy()).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
let mut file = tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?;
file.write_all(payload).await?;
Ok::<String, Error>("ok".to_string())
}
.await;
self.unlock(lock_token).await?;
res
}

#[async_recursion]
pub(crate) async fn _delete_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
tokio::fs::remove_file(path).await?;
Ok("ok".to_string())
}
}
38 changes: 38 additions & 0 deletions tests/test_storage_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ async fn test_storage_macro_redis() -> Result<(), Box<dyn std::error::Error + Se
Ok(())
}

#[tokio::test]
async fn test_storage_macro_fs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (_ir, _is, cl) = set_up_test_env_single_user().await?;

cl.create_entry("test_storage_macro_fs:path", b"/tmp/colink-sm-fs-test/test")
.await?;
let key_name = "test_storage_macro_fs:$fs";
test_crud(&cl, key_name).await?;

cl.create_entry(
"test_storage_macro_fs_dir:path",
b"/tmp/colink-sm-fs-test/test-dir",
)
.await?;
let key_name = "test_storage_macro_fs_dir:$fs:test-file";
test_crud(&cl, key_name).await?;
let key_name = "test_storage_macro_fs_dir:$fs:test-dir:test-file";
test_crud(&cl, key_name).await?;

Ok(())
}

async fn test_crud(
cl: &CoLink,
key_name: &str,
Expand Down Expand Up @@ -114,6 +136,22 @@ async fn test_storage_macro_chunk_append(
Ok(())
}

#[tokio::test]
async fn test_storage_macro_fs_append(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (_ir, _is, cl) = set_up_test_env_single_user().await?;

cl.create_entry(
"test_storage_macro_fs_append:path",
b"/tmp/colink-sm-fs-test/append-test",
)
.await?;
let key_name = "test_storage_macro_fs_append:$fs";
test_append(&cl, key_name, 5e6 as usize).await?;

Ok(())
}

#[ignore]
#[tokio::test]
async fn test_storage_macro_redis_chunk(
Expand Down