Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions src/kv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{get_blob, Message, PackageId, Request};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::marker::PhantomData;
use thiserror::Error;

/// Actions are sent to a specific key value database, "db" is the name,
Expand Down Expand Up @@ -54,22 +55,29 @@ pub enum KvError {
/// Opening or creating a kv will give you a Result<Kv>.
/// You can call it's impl functions to interact with it.
#[derive(Debug, Serialize, Deserialize)]
pub struct Kv {
pub struct Kv<K, V> {
pub package_id: PackageId,
pub db: String,
pub timeout: u64,
_marker: PhantomData<(K, V)>,
}

impl Kv {
impl<K, V> Kv<K, V>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
/// Get a value.
pub fn get(&self, key: Vec<u8>) -> anyhow::Result<Vec<u8>> {
pub fn get(&self, key: &K) -> anyhow::Result<V> {
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get { key },
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -81,7 +89,9 @@ impl Kv {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
Ok(bytes)
let value = serde_json::from_slice::<V>(&bytes)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
Ok(value)
}
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
Expand All @@ -92,7 +102,10 @@ impl Kv {
}

/// Set a value, optionally in a transaction.
pub fn set(&self, key: Vec<u8>, value: Vec<u8>, tx_id: Option<u64>) -> anyhow::Result<()> {
pub fn set(&self, key: &K, value: &V, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
let value = serde_json::to_vec(value)?;

let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
Expand All @@ -101,7 +114,7 @@ impl Kv {
action: KvAction::Set { key, tx_id },
})?)
.blob_bytes(value)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -118,15 +131,16 @@ impl Kv {
}

/// Delete a value, optionally in a transaction.
pub fn delete(&self, key: Vec<u8>, tx_id: Option<u64>) -> anyhow::Result<()> {
pub fn delete(&self, key: &K, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete { key, tx_id },
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -151,7 +165,7 @@ impl Kv {
db: self.db.clone(),
action: KvAction::BeginTx,
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -176,7 +190,7 @@ impl Kv {
db: self.db.clone(),
action: KvAction::Commit { tx_id },
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -194,15 +208,21 @@ impl Kv {
}

/// Opens or creates a kv db.
pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
pub fn open<K, V>(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Kv<K, V>>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
let timeout = timeout.unwrap_or(5);

let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: package_id.clone(),
db: db.to_string(),
action: KvAction::Open,
})?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -212,6 +232,8 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
KvResponse::Ok => Ok(Kv {
package_id,
db: db.to_string(),
timeout,
_marker: PhantomData,
}),
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
Expand All @@ -222,15 +244,17 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
}

/// Removes and deletes a kv db.
pub fn remove_db(package_id: PackageId, db: &str) -> anyhow::Result<()> {
pub fn remove_db(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<()> {
let timeout = timeout.unwrap_or(5);

let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: package_id.clone(),
db: db.to_string(),
action: KvAction::RemoveDb,
})?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand Down
22 changes: 14 additions & 8 deletions src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum SqliteError {
pub struct Sqlite {
pub package_id: PackageId,
pub db: String,
pub timeout: u64,
}

impl Sqlite {
Expand All @@ -96,7 +97,7 @@ impl Sqlite {
action: SqliteAction::Read { query },
})?)
.blob_bytes(serde_json::to_vec(&params)?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand Down Expand Up @@ -141,7 +142,7 @@ impl Sqlite {
action: SqliteAction::Write { statement, tx_id },
})?)
.blob_bytes(serde_json::to_vec(&params)?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand Down Expand Up @@ -169,7 +170,7 @@ impl Sqlite {
db: self.db.clone(),
action: SqliteAction::BeginTx,
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand Down Expand Up @@ -197,7 +198,7 @@ impl Sqlite {
db: self.db.clone(),
action: SqliteAction::Commit { tx_id },
})?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -218,15 +219,17 @@ impl Sqlite {
}

/// Open or create sqlite database.
pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
pub fn open(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Sqlite> {
let timeout = timeout.unwrap_or(5);

let res = Request::new()
.target(("our", "sqlite", "distro", "sys"))
.body(serde_json::to_vec(&SqliteRequest {
package_id: package_id.clone(),
db: db.to_string(),
action: SqliteAction::Open,
})?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand All @@ -236,6 +239,7 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
SqliteResponse::Ok => Ok(Sqlite {
package_id,
db: db.to_string(),
timeout,
}),
SqliteResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!(
Expand All @@ -249,15 +253,17 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
}

/// Remove and delete sqlite database.
pub fn remove_db(package_id: PackageId, db: &str) -> anyhow::Result<()> {
pub fn remove_db(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<()> {
let timeout = timeout.unwrap_or(5);

let res = Request::new()
.target(("our", "sqlite", "distro", "sys"))
.body(serde_json::to_vec(&SqliteRequest {
package_id: package_id.clone(),
db: db.to_string(),
action: SqliteAction::RemoveDb,
})?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
Expand Down
16 changes: 11 additions & 5 deletions src/vfs/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{Message, Request};
/// You can call it's impl functions to interact with it.
pub struct Directory {
pub path: String,
pub timeout: u64,
}

impl Directory {
Expand All @@ -19,7 +20,7 @@ impl Directory {
let message = Request::new()
.target(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&request)?)
.send_and_await_response(5)?;
.send_and_await_response(self.timeout)?;

match message {
Ok(Message::Response { body, .. }) => {
Expand All @@ -37,10 +38,12 @@ impl Directory {

/// Opens or creates a directory at path.
/// If trying to create an existing directory, will just give you the path.
pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
pub fn open_dir(path: &str, create: bool, timeout: Option<u64>) -> anyhow::Result<Directory> {
let timeout = timeout.unwrap_or(5);
if !create {
return Ok(Directory {
path: path.to_string(),
timeout,
});
}
let request = VfsRequest {
Expand All @@ -51,14 +54,15 @@ pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
let message = Request::new()
.target(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&request)?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match message {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<VfsResponse>(&body)?;
match response {
VfsResponse::Ok => Ok(Directory {
path: path.to_string(),
timeout,
}),
VfsResponse::Err(e) => Err(e.into()),
_ => Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response)),
Expand All @@ -69,7 +73,9 @@ pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
}

/// Removes a dir at path, errors if path not found or path is not a directory.
pub fn remove_dir(path: &str) -> anyhow::Result<()> {
pub fn remove_dir(path: &str, timeout: Option<u64>) -> anyhow::Result<()> {
let timeout = timeout.unwrap_or(5);

let request = VfsRequest {
path: path.to_string(),
action: VfsAction::RemoveDir,
Expand All @@ -78,7 +84,7 @@ pub fn remove_dir(path: &str) -> anyhow::Result<()> {
let message = Request::new()
.target(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&request)?)
.send_and_await_response(5)?;
.send_and_await_response(timeout)?;

match message {
Ok(Message::Response { body, .. }) => {
Expand Down
Loading