diff --git a/.gitignore b/.gitignore index 6423068bf..ea8fd1d2f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .env trace-*.json +.temp diff --git a/CHANGELOG.md b/CHANGELOG.md index b62778892..63a4fbf0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,17 @@ List of changes for this repo, including `atomic-cli`, `atomic-server` and `atomic-lib`. By far most changes relate to `atomic-server`, so if not specified, assume the changes are relevant only for the server. -## [v0.30.4] - 2021-01-15 +## [v0.31.0] - 2022-01-25 + +- Huge performance increase for queries! Added sortable index, big refactor #114 +- Added `store.query()` function with better query options, such as `starts_at` and `limit`. Under the hood, this powers `Collection`s, +- `Resource.save` returns a `CommitResponse`. +- Refactor `Commit.apply_opts`, structure options. +- Remove the potentially confusing `commit.apply` method. +- `store.tpf` now takes a `Value` instead of `String`. +- Improved sorting logic. Still has some problems. + +## [v0.30.4] - 2022-01-15 Run with `--rebuild-index` the first time, if you use an existing database. Note that due to an issue in actix, I'm unable to publish the `atomic-server` crate at this moment. diff --git a/Cargo.lock b/Cargo.lock index 97a34be5b..3d39f3d83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -526,7 +526,7 @@ dependencies = [ [[package]] name = "atomic-cli" -version = "0.30.0" +version = "0.31.0" dependencies = [ "assert_cmd", "atomic_lib", @@ -540,7 +540,7 @@ dependencies = [ [[package]] name = "atomic-server" -version = "0.30.4" +version = "0.31.0" dependencies = [ "acme-lib", "actix", @@ -588,7 +588,7 @@ checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" [[package]] name = "atomic_lib" -version = "0.30.4" +version = "0.31.0" dependencies = [ "base64", "bincode", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 27c30d369..980688e09 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -6,10 +6,10 @@ license = "MIT" name = "atomic-cli" readme = "README.md" repository = "https://github.com/joepio/atomic-data-rust" -version = "0.30.0" +version = "0.31.0" [dependencies] -atomic_lib = {version = "0.30.0", path = "../lib", features = ["config", "rdf"]} +atomic_lib = {version = "0.31.0", path = "../lib", features = ["config", "rdf"]} clap = "2.33.3" colored = "2.0.0" dirs = "3.0.1" diff --git a/cli/src/main.rs b/cli/src/main.rs index 42d5c9392..1d5033930 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -37,7 +37,7 @@ impl Context<'_> { self.store.set_default_agent(Agent { subject: write_ctx.agent.clone(), private_key: Some(write_ctx.private_key.clone()), - created_at: atomic_lib::datetime_helpers::now(), + created_at: atomic_lib::utils::now(), name: None, public_key: generate_public_key(&write_ctx.private_key).public, }); diff --git a/lib/.DS_Store b/lib/.DS_Store index b2851af22..e4ed883f3 100644 Binary files a/lib/.DS_Store and b/lib/.DS_Store differ diff --git a/lib/.tmp/db/conf b/lib/.tmp/db/conf deleted file mode 100644 index 4154d7c45..000000000 --- a/lib/.tmp/db/conf +++ /dev/null @@ -1,4 +0,0 @@ -segment_size: 524288 -use_compression: false -version: 0.34 -vQÁ \ No newline at end of file diff --git a/lib/.tmp/db/db b/lib/.tmp/db/db deleted file mode 100644 index b47bf9f0d..000000000 Binary files a/lib/.tmp/db/db and /dev/null differ diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 8420d79fc..ff0ac42b3 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -6,7 +6,7 @@ license = "MIT" name = "atomic_lib" readme = 
"README.md" repository = "https://github.com/joepio/atomic-data-rust" -version = "0.30.4" +version = "0.31.0" [dependencies] base64 = "0.13.0" diff --git a/lib/src/agents.rs b/lib/src/agents.rs index 786bb5797..8695b6870 100644 --- a/lib/src/agents.rs +++ b/lib/src/agents.rs @@ -2,7 +2,7 @@ //! Agents are actors (such as users) that can edit content. //! https://docs.atomicdata.dev/commits/concepts.html -use crate::{datetime_helpers, errors::AtomicResult, urls, Resource, Storelike}; +use crate::{errors::AtomicResult, urls, Resource, Storelike}; #[derive(Clone, Debug)] pub struct Agent { @@ -60,7 +60,7 @@ impl Agent { public_key: keypair.public.clone(), subject: format!("{}/agents/{}", store.get_server_url(), keypair.public), name: name.map(|x| x.to_owned()), - created_at: datetime_helpers::now(), + created_at: crate::utils::now(), } } @@ -72,7 +72,7 @@ impl Agent { public_key: public_key.into(), subject: format!("{}/agents/{}", store.get_server_url(), public_key), name: None, - created_at: datetime_helpers::now(), + created_at: crate::utils::now(), }) } } diff --git a/lib/src/client.rs b/lib/src/client.rs index 9a48ad8f1..fb966b29c 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -25,7 +25,7 @@ pub fn fetch_resource( /// Returns the various x-atomic authentication headers, includign agent signature pub fn get_authentication_headers(url: &str, agent: &Agent) -> AtomicResult> { let mut headers = Vec::new(); - let now = crate::datetime_helpers::now().to_string(); + let now = crate::utils::now().to_string(); let message = format!("{} {}", url, now); let signature = sign_message( &message, @@ -97,7 +97,7 @@ pub fn fetch_tpf( /// Posts a Commit to the endpoint of the Subject from the Commit pub fn post_commit(commit: &crate::Commit, store: &impl Storelike) -> AtomicResult<()> { - let server_url = crate::url_helpers::server_url(commit.get_subject())?; + let server_url = crate::utils::server_url(commit.get_subject())?; // Default Commit endpoint is `https://example.com/commit` let endpoint = format!("{}commit", server_url); post_commit_custom_endpoint(&endpoint, commit, store) diff --git a/lib/src/collections.rs b/lib/src/collections.rs index 3b6ebdb6c..9d8192346 100644 --- a/lib/src/collections.rs +++ b/lib/src/collections.rs @@ -1,6 +1,10 @@ //! Collections are dynamic resources that refer to multiple resources. //! They are constructed using a TPF query -use crate::{errors::AtomicResult, storelike::ResourceCollection, urls, Resource, Storelike}; +use crate::{ + errors::AtomicResult, + storelike::{Query, ResourceCollection}, + urls, Resource, Storelike, Value, +}; #[derive(Debug)] pub struct TpfQuery { @@ -154,7 +158,7 @@ pub struct Collection { /// Sorts a vector or resources by some property. #[tracing::instrument] -fn sort_resources( +pub fn sort_resources( mut resources: ResourceCollection, sort_by: &str, sort_desc: bool, @@ -163,8 +167,8 @@ fn sort_resources( let val_a = a.get(sort_by); let val_b = b.get(sort_by); if val_a.is_err() || val_b.is_err() { - return std::cmp::Ordering::Equal; - } + return std::cmp::Ordering::Greater; + }; if val_b.unwrap().to_string() > val_a.unwrap().to_string() { if sort_desc { std::cmp::Ordering::Greater @@ -193,103 +197,43 @@ impl Collection { if collection_builder.page_size < 1 { return Err("Page size must be greater than 0".into()); } - // Execute the TPF query, get all the subjects. - // Note that these are not yet authorized. 
- let atoms = store.tpf( - None, - collection_builder.property.as_deref(), - collection_builder.value.as_deref(), - collection_builder.include_external, - )?; - // Remove duplicate subjects - let mut subjects_deduplicated: Vec = atoms - .iter() - .map(|atom| atom.subject.clone()) - .collect::>() - .into_iter() - .collect(); - // Sort by subject, better than no sorting - subjects_deduplicated.sort(); - - // WARNING: Entering expensive loop! - // This is needed for sorting, authorization and including nested resources. - // It could be skipped if there is no authorization and sorting requirement. - let mut resources = Vec::new(); - for subject in subjects_deduplicated.iter() { - // These nested resources are not fully calculated - they will be presented as -is - match store.get_resource_extended(subject, true, for_agent) { - Ok(resource) => { - resources.push(resource); - } - Err(e) => match e.error_type { - crate::AtomicErrorType::NotFoundError => {} - crate::AtomicErrorType::UnauthorizedError => {} - crate::AtomicErrorType::OtherError => { - return Err( - format!("Error when getting resource in collection: {}", e).into() - ) - } - }, - } - } - if let Some(sort) = &collection_builder.sort_by { - resources = sort_resources(resources, sort, collection_builder.sort_desc); - } - let mut subjects = Vec::new(); - for r in resources.iter() { - subjects.push(r.get_subject().clone()) - } - let mut all_pages: Vec> = Vec::new(); - let mut all_pages_nested: Vec> = Vec::new(); - let mut page: Vec = Vec::new(); - let mut page_nested: Vec = Vec::new(); - let current_page = collection_builder.current_page; - for (i, subject) in subjects.iter().enumerate() { - page.push(subject.into()); - if collection_builder.include_nested { - page_nested.push(resources[i].clone()); - } - if page.len() >= collection_builder.page_size { - all_pages.push(page); - all_pages_nested.push(page_nested); - page = Vec::new(); - page_nested = Vec::new(); - // No need to calculte more than necessary - if all_pages.len() > current_page { - break; - } - } - // Add the last page when handling the last subject - if i == subjects.len() - 1 { - all_pages.push(page); - all_pages_nested.push(page_nested); - break; - } - } - if all_pages.is_empty() { - all_pages.push(Vec::new()); - all_pages_nested.push(Vec::new()); - } - // Maybe I should default to last page, if current_page is too high? - let members = all_pages - .get(current_page) - .ok_or(format!("Page number {} is too high", current_page))? - .clone(); - let total_items = subjects.len(); - // Construct the pages (TODO), use pageSize - let total_pages = - (total_items + collection_builder.page_size - 1) / collection_builder.page_size; - let members_nested = if collection_builder.include_nested { - Some( - all_pages_nested - .get(current_page) - .ok_or(format!("Page number {} is too high", current_page))? - .clone(), - ) - } else { - None + // Warning: this _assumes_ that the Value is a string. + // This will work for most datatypes, but not for things like resource arrays! + // We could improve this by taking the datatype of the `property`, and parsing the string. 
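// A possible refinement of the comment above (illustrative sketch, not part of this changeset):
// look up the datatype of `property` and parse the value with it, instead of always wrapping it
// in a Value::String. This assumes the store exposes the usual `get_property` helper and the
// `Value::new(&str, &DataType)` constructor used elsewhere in this crate:
//
//     let value_filter = match (&collection_builder.property, &collection_builder.value) {
//         (Some(prop_url), Some(val)) => {
//             let datatype = store.get_property(prop_url)?.data_type;
//             Some(Value::new(val, &datatype)?)
//         }
//         (None, Some(val)) => Some(Value::String(val.clone())),
//         _ => None,
//     };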
+ let value_filter = collection_builder + .value + .as_ref() + .map(|val| Value::String(val.clone())); + + let q = Query { + property: collection_builder.property.clone(), + value: value_filter, + limit: Some(collection_builder.page_size), + start_val: None, + end_val: None, + offset: collection_builder.page_size * collection_builder.current_page, + sort_by: collection_builder.sort_by.clone(), + sort_desc: collection_builder.sort_desc, + include_external: collection_builder.include_external, + include_nested: collection_builder.include_nested, + for_agent: for_agent.map(|a| a.to_string()), }; + + let query_result = store.query(&q)?; + let members = query_result.subjects; + let members_nested = Some(query_result.resources); + let total_items = query_result.count; + let pages_fraction = total_items as f64 / collection_builder.page_size as f64; + let total_pages = pages_fraction.ceil() as usize; + if collection_builder.current_page > total_pages { + return Err(format!( + "Page number out of bounds, got {}, max {}", + collection_builder.current_page, total_pages + ) + .into()); + } + let collection = Collection { total_pages, members, @@ -651,4 +595,29 @@ mod test { == "2" ); } + + #[test] + fn sorting_resources() { + let prop = urls::DESCRIPTION.to_string(); + let mut a = Resource::new("first".into()); + a.set_propval_unsafe(prop.clone(), Value::Markdown("1".into())); + let mut b = Resource::new("second".into()); + b.set_propval_unsafe(prop.clone(), Value::Markdown("2".into())); + let mut c = Resource::new("third_missing_property".into()); + + let asc = vec![a.clone(), b.clone(), c.clone()]; + let sorted = sort_resources(asc.clone(), &prop, false); + assert_eq!(a.get_subject(), sorted[0].get_subject()); + assert_eq!(b.get_subject(), sorted[1].get_subject()); + assert_eq!(c.get_subject(), sorted[2].get_subject()); + + let sorted_desc = sort_resources(asc.clone(), &prop, true); + assert_eq!(b.get_subject(), sorted_desc[0].get_subject()); + assert_eq!(a.get_subject(), sorted_desc[1].get_subject()); + assert_eq!( + c.get_subject(), + sorted_desc[2].get_subject(), + "c is missing the sorted property - it should _alway_ be last" + ); + } } diff --git a/lib/src/commit.rs b/lib/src/commit.rs index 110253453..71a83b470 100644 --- a/lib/src/commit.rs +++ b/lib/src/commit.rs @@ -5,8 +5,8 @@ use std::collections::{HashMap, HashSet}; use urls::{SET, SIGNER}; use crate::{ - datatype::DataType, datetime_helpers, errors::AtomicResult, hierarchy, resources::PropVals, - urls, Atom, Resource, Storelike, Value, + datatype::DataType, errors::AtomicResult, hierarchy, resources::PropVals, urls, Atom, Resource, + Storelike, Value, }; /// Contains two resources. The first is the Resource representation of the applied Commits. @@ -20,6 +20,15 @@ pub struct CommitResponse { pub commit_struct: Commit, } +#[derive(Clone, Debug)] +pub struct CommitOpts { + pub validate_schema: bool, + pub validate_signature: bool, + pub validate_timestamp: bool, + pub validate_rights: bool, + pub update_index: bool, +} + /// A Commit is a set of changes to a Resource. /// Use CommitBuilder if you're programmatically constructing a Delta. #[derive(Clone, Debug, Serialize)] @@ -51,29 +60,15 @@ pub struct Commit { } impl Commit { - /// Apply a single signed Commit to the store. - /// Creates, edits or destroys a resource. - /// Checks if the signature is created by the Agent, and validates the data shape. - /// Does not check if the correct rights are present. 
- /// If you need more control over which checks to perform, use apply_opts - pub fn apply(&self, store: &impl Storelike) -> AtomicResult { - self.apply_opts(store, true, true, false, false, true) - } - /// Apply a single signed Commit to the store. /// Creates, edits or destroys a resource. /// Allows for control over which validations should be performed. /// Returns the generated Commit, the old Resource and the new Resource. - /// TODO: Should check if the Agent has the correct rights. #[tracing::instrument(skip(store))] pub fn apply_opts( &self, store: &impl Storelike, - validate_schema: bool, - validate_signature: bool, - validate_timestamp: bool, - validate_rights: bool, - update_index: bool, + opts: &CommitOpts, ) -> AtomicResult { let subject_url = url::Url::parse(&self.subject) .map_err(|e| format!("Subject '{}' is not a URL. {}", &self.subject, e))?; @@ -82,7 +77,7 @@ impl Commit { return Err("Subject URL cannot have query parameters".into()); } - if validate_signature { + if opts.validate_signature { let signature = match self.signature.as_ref() { Some(sig) => sig, None => return Err("No signature set".into()), @@ -107,7 +102,7 @@ impl Commit { })?; } // Check if the created_at lies in the past - if validate_timestamp { + if opts.validate_timestamp { check_timestamp(self.created_at)?; } let commit_resource: Resource = self.clone().into_resource(store)?; @@ -123,7 +118,7 @@ impl Commit { let resource_new = self.apply_changes(resource_old.clone(), store, false)?; - if validate_rights { + if opts.validate_rights { if is_new { hierarchy::check_write(store, &resource_new, &self.signer)?; } else { @@ -143,7 +138,7 @@ impl Commit { } }; // Check if all required props are there - if validate_schema { + if opts.validate_schema { resource_new.check_required_props(store)?; } @@ -170,7 +165,7 @@ impl Commit { if destroy { // Note: the value index is updated before this action, in resource.apply_changes() store.remove_resource(&self.subject)?; - store.add_resource_opts(&commit_resource, false, update_index, false)?; + store.add_resource_opts(&commit_resource, false, opts.update_index, false)?; return Ok(CommitResponse { resource_new: None, resource_old, @@ -179,10 +174,10 @@ impl Commit { }); } } - self.apply_changes(resource_old.clone(), store, update_index)?; + self.apply_changes(resource_old.clone(), store, opts.update_index)?; // Save the Commit to the Store. We can skip the required props checking, but we need to make sure the commit hasn't been applied before. - store.add_resource_opts(&commit_resource, false, update_index, false)?; + store.add_resource_opts(&commit_resource, false, opts.update_index, false)?; // Save the resource, but skip updating the index - that has been done in a previous step. store.add_resource_opts(&resource_new, false, false, true)?; Ok(CommitResponse { @@ -195,6 +190,7 @@ impl Commit { /// Updates the values in the Resource according to the `set`, `remove` and `destroy` attributes in the Commit. /// Optionally also updates the index in the Store. 
+ /// The Old Resource is only needed when `update_index` is true, and is used for checking #[tracing::instrument(skip(store))] pub fn apply_changes( &self, @@ -202,33 +198,38 @@ impl Commit { store: &impl Storelike, update_index: bool, ) -> AtomicResult { + let resource_unedited = resource.clone(); if let Some(set) = self.set.clone() { - for (prop, val) in set.iter() { + for (prop, new_val) in set.iter() { + resource.set_propval(prop.into(), new_val.to_owned(), store)?; + if update_index { - let atom = Atom::new(resource.get_subject().clone(), prop.into(), val.clone()); - if let Ok(_v) = resource.get(prop) { - store.remove_atom_from_index(&atom)?; + let new_atom = + Atom::new(resource.get_subject().clone(), prop.into(), new_val.clone()); + if let Ok(old_val) = resource_unedited.get(prop) { + let old_atom = + Atom::new(resource.get_subject().clone(), prop.into(), old_val.clone()); + store.remove_atom_from_index(&old_atom, &resource_unedited)?; } - store.add_atom_to_index(&atom)?; + store.add_atom_to_index(&new_atom, &resource)?; } - resource.set_propval(prop.into(), val.to_owned(), store)?; } } if let Some(remove) = self.remove.clone() { for prop in remove.iter() { + resource.remove_propval(prop); if update_index { - let val = resource.get(prop)?; + let val = resource_unedited.get(prop)?; let atom = Atom::new(resource.get_subject().clone(), prop.into(), val.clone()); - store.remove_atom_from_index(&atom)?; + store.remove_atom_from_index(&atom, &resource_unedited)?; } - resource.remove_propval(prop); } } // Remove all atoms from index if destroy if let Some(destroy) = self.destroy { if destroy { for atom in resource.to_atoms()?.iter() { - store.remove_atom_from_index(atom)?; + store.remove_atom_from_index(atom, &resource_unedited)?; } } } @@ -238,7 +239,14 @@ impl Commit { /// Applies a commit without performing authorization / signature / schema checks. /// Does not update the index. pub fn apply_unsafe(&self, store: &impl Storelike) -> AtomicResult { - self.apply_opts(store, false, false, false, false, false) + let opts = CommitOpts { + validate_schema: false, + validate_signature: false, + validate_timestamp: false, + validate_rights: false, + update_index: false, + }; + self.apply_opts(store, &opts) } /// Converts a Resource of a Commit into a Commit @@ -282,7 +290,7 @@ impl Commit { let commit_subject = match self.signature.as_ref() { Some(sig) => format!("{}/commits/{}", store.get_server_url(), sig), None => { - let now = crate::datetime_helpers::now(); + let now = crate::utils::now(); format!("{}/commitsUnsigned/{}", store.get_server_url(), now) } }; @@ -387,7 +395,7 @@ impl CommitBuilder { agent: &crate::agents::Agent, store: &impl Storelike, ) -> AtomicResult { - let now = crate::datetime_helpers::now(); + let now = crate::utils::now(); sign_at(self, agent, now, store) } @@ -477,7 +485,7 @@ pub fn sign_message(message: &str, private_key: &str, public_key: &str) -> Atomi const ACCEPTABLE_TIME_DIFFERENCE: i64 = 10000; pub fn check_timestamp(timestamp: i64) -> AtomicResult<()> { - let now = datetime_helpers::now(); + let now = crate::utils::now(); if timestamp > now + ACCEPTABLE_TIME_DIFFERENCE { return Err(format!( "Commit CreatedAt timestamp must lie in the past. Check your clock. Timestamp now: {} CreatedAt is: {}", @@ -491,6 +499,16 @@ pub fn check_timestamp(timestamp: i64) -> AtomicResult<()> { #[cfg(test)] mod test { + lazy_static::lazy_static! 
{ + pub static ref OPTS: CommitOpts = CommitOpts { + validate_schema: true, + validate_signature: true, + validate_timestamp: true, + validate_rights: false, + update_index: true, + }; + } + use super::*; use crate::{agents::Agent, Storelike}; @@ -509,7 +527,7 @@ mod test { commitbuiler.set(property2.into(), value2); let commit = commitbuiler.sign(&agent, &store).unwrap(); let commit_subject = commit.get_subject().to_string(); - let _created_resource = commit.apply(&store).unwrap(); + let _created_resource = commit.apply_opts(&store, &OPTS).unwrap(); let resource = store.get_resource(subject).unwrap(); assert!(resource.get(property1).unwrap().to_string() == value1.to_string()); @@ -605,13 +623,13 @@ mod test { let subject = "https://invalid.com?q=invalid"; let commitbuiler = crate::commit::CommitBuilder::new(subject.into()); let commit = commitbuiler.sign(&agent, &store).unwrap(); - commit.apply(&store).unwrap_err(); + commit.apply_opts(&store, &OPTS).unwrap_err(); } { let subject = "https://valid.com/valid"; let commitbuiler = crate::commit::CommitBuilder::new(subject.into()); let commit = commitbuiler.sign(&agent, &store).unwrap(); - commit.apply(&store).unwrap(); + commit.apply_opts(&store, &OPTS).unwrap(); } } } diff --git a/lib/src/datetime_helpers.rs b/lib/src/datetime_helpers.rs deleted file mode 100644 index a50586d2b..000000000 --- a/lib/src/datetime_helpers.rs +++ /dev/null @@ -1,7 +0,0 @@ -/// Returns the current timestamp in milliseconds since UNIX epoch -pub fn now() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("You're a time traveler") - .as_millis() as i64 -} diff --git a/lib/src/db.rs b/lib/src/db.rs index 7ed027e1e..b6ae69508 100644 --- a/lib/src/db.rs +++ b/lib/src/db.rs @@ -6,31 +6,57 @@ use std::{ sync::{Arc, Mutex}, }; +use tracing::{instrument, trace}; + use crate::{ endpoints::{default_endpoints, Endpoint}, errors::{AtomicError, AtomicResult}, resources::PropVals, - storelike::{ResourceCollection, Storelike}, + storelike::{Query, QueryResult, ResourceCollection, Storelike}, Atom, Resource, Value, }; -/// Inside the index_vals, each value is mapped to this type. +use self::query_index::{ + atom_to_indexable_atoms, check_if_atom_matches_watched_query_filters, query_indexed, + update_indexed_member, watch_collection, IndexAtom, QueryFilter, END_CHAR, +}; + +mod query_index; +#[cfg(test)] +pub mod test; + +/// Inside the reference_index, each value is mapped to this type. /// The String on the left represents a Property URL, and the second one is the set of subjects. pub type PropSubjectMap = HashMap>; /// The Db is a persistent on-disk Atomic Data store. -/// It's an implementation of Storelike. +/// It's an implementation of [Storelike]. +/// It uses [sled::Tree]s as Key Value stores. +/// It builds a value index for performant [Query]s. +/// It stores [Resource]s as [PropVals]s by their subject as key. +/// +/// ## Resources +/// +/// ## Value Index +/// +/// The Value index stores #[derive(Clone)] pub struct Db { - // The Key-Value store that contains all data. - // Resources can be found using their Subject. - // Try not to use this directly, but use the Trees. + /// The Key-Value store that contains all data. + /// Resources can be found using their Subject. + /// Try not to use this directly, but use the Trees. db: sled::Db, default_agent: Arc>>, - // Stores all resources. The Key is the Subject as a string, the value a PropVals. Both must be serialized using bincode. + /// Stores all resources. 
The Key is the Subject as a string, the value a [PropVals]. Both must be serialized using bincode. resources: sled::Tree, - // Stores all Atoms. The key is the atom.value, the value a vector of Atoms. - index_vals: sled::Tree, + /// Index for all AtommicURLs, indexed by their Value. Used to speed up TPF queries. See [key_for_reference_index] + reference_index: sled::Tree, + /// Stores the members of Collections, easily sortable. + /// See [collections_index] + members_index: sled::Tree, + /// A list of all the Collections currently being used. Is used to update `members_index`. + /// See [collections_index] + watched_queries: sled::Tree, /// The address where the db will be hosted, e.g. http://localhost/ server_url: String, /// Endpoints are checked whenever a resource is requested. They calculate (some properties of) the resource and return it. @@ -44,13 +70,17 @@ impl Db { pub fn init(path: &std::path::Path, server_url: String) -> AtomicResult { let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?; let resources = db.open_tree("resources").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?; - let index_vals = db.open_tree("index_vals")?; + let reference_index = db.open_tree("reference_index")?; + let members_index = db.open_tree("members_index")?; + let watched_queries = db.open_tree("watched_queries")?; let store = Db { db, default_agent: Arc::new(Mutex::new(None)), resources, - index_vals, + reference_index, + members_index, server_url, + watched_queries, endpoints: default_endpoints(), }; crate::populate::populate_base_models(&store) @@ -59,7 +89,7 @@ impl Db { } /// Internal method for fetching Resource data. - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn set_propvals(&self, subject: &str, propvals: &PropVals) -> AtomicResult<()> { let resource_bin = bincode::serialize(propvals)?; let subject_bin = bincode::serialize(subject)?; @@ -69,7 +99,7 @@ impl Db { /// Finds resource by Subject, return PropVals HashMap /// Deals with the binary API of Sled - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn get_propvals(&self, subject: &str) -> AtomicResult { let subject_binary = bincode::serialize(subject) .map_err(|e| format!("Can't serialize {}: {}", subject, e))?; @@ -97,17 +127,20 @@ impl Db { /// Returns true if the index has been built. pub fn has_index(&self) -> bool { - !self.index_vals.is_empty() + !self.reference_index.is_empty() } - /// Removes all values from the index. + /// Removes all values from the indexes. pub fn clear_index(&self) -> AtomicResult<()> { - self.index_vals.clear()?; + self.reference_index.clear()?; + self.members_index.clear()?; + self.watched_queries.clear()?; Ok(()) } } impl Storelike for Db { + #[instrument(skip(self))] fn add_atoms(&self, atoms: Vec) -> AtomicResult<()> { // Start with a nested HashMap, containing only strings. let mut map: HashMap = HashMap::new(); @@ -136,28 +169,21 @@ impl Storelike for Db { Ok(()) } - // This only adds ResourceArrays and AtomicURLs at this moment, which means that many values cannot be accessed in the TPF query (thus, collections) - #[tracing::instrument(skip(self))] - fn add_atom_to_index(&self, atom: &Atom) -> AtomicResult<()> { - let values_vec = match &atom.value { - // This results in wrong indexing, as some subjects will be numbers. 
- Value::ResourceArray(_v) => atom.values_to_subjects()?, - Value::AtomicUrl(v) => vec![v.into()], - _other => return Ok(()), - }; - - for val_subject in values_vec { - // https://github.com/joepio/atomic-data-rust/issues/282 - // The key is a newline delimited string, with the following format: - let key = key_for_value_index(&val_subject, &atom.property, &atom.subject); - + #[instrument(skip(self))] + fn add_atom_to_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> { + for index_atom in atom_to_indexable_atoms(atom)? { // It's OK if this overwrites a value - let _existing = self.index_vals.insert(key.as_bytes(), b"")?; + add_atom_to_reference_index(&index_atom, self)?; + // Also update the query index to keep collections performant + check_if_atom_matches_watched_query_filters(self, &index_atom, atom, false, resource) + .map_err(|e| { + format!("Failed to check_if_atom_matches_watched_collections. {}", e) + })?; } Ok(()) } - #[tracing::instrument(skip(self, resource), fields(sub = %resource.get_subject()))] + #[instrument(skip(self, resource), fields(sub = %resource.get_subject()))] fn add_resource_opts( &self, resource: &Resource, @@ -184,27 +210,27 @@ impl Storelike for Db { for (prop, val) in pv.iter() { // Possible performance hit - these clones can be replaced by modifying remove_atom_from_index let remove_atom = crate::Atom::new(subject.into(), prop.into(), val.clone()); - self.remove_atom_from_index(&remove_atom)?; + self.remove_atom_from_index(&remove_atom, resource) + .map_err(|e| { + format!("Failed to remove atom from index {}. {}", remove_atom, e) + })?; } } for a in resource.to_atoms()? { - self.add_atom_to_index(&a)?; + self.add_atom_to_index(&a, resource) + .map_err(|e| format!("Failed to add atom to index {}. {}", a, e))?; } } self.set_propvals(resource.get_subject(), resource.get_propvals()) } - #[tracing::instrument(skip(self))] - fn remove_atom_from_index(&self, atom: &Atom) -> AtomicResult<()> { - let vec = match atom.value.to_owned() { - Value::ResourceArray(_v) => atom.values_to_subjects()?, - Value::AtomicUrl(subject) => vec![subject], - _other => return Ok(()), - }; + #[instrument(skip(self))] + fn remove_atom_from_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> { + for index_atom in atom_to_indexable_atoms(atom)? 
{ + delete_atom_from_reference_index(&index_atom, self)?; - for val in vec { - let key = key_for_value_index(&val, &atom.property, &atom.subject); - self.index_vals.remove(&key.as_bytes())?; + check_if_atom_matches_watched_query_filters(self, &index_atom, atom, true, resource) + .map_err(|e| format!("Checking atom went wrong: {}", e))?; } Ok(()) } @@ -226,7 +252,7 @@ impl Storelike for Db { } } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn get_resource(&self, subject: &str) -> AtomicResult { let propvals = self.get_propvals(subject); @@ -239,14 +265,14 @@ impl Storelike for Db { } } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn get_resource_extended( &self, subject: &str, skip_dynamic: bool, for_agent: Option<&str>, ) -> AtomicResult { - tracing::trace!("get_resource_extended: {}", subject); + trace!("get_resource_extended: {}", subject); // This might add a trailing slash let mut url = url::Url::parse(subject)?; let clone = url.clone(); @@ -284,7 +310,7 @@ impl Storelike for Db { resource.set_subject(subject.into()); if let Some(agent) = for_agent { - crate::hierarchy::check_read(self, &resource, agent)?; + let _explanation = crate::hierarchy::check_read(self, &resource, agent)?; } // Whether the resource has dynamic properties @@ -335,7 +361,95 @@ impl Storelike for Db { Ok(resource) } - #[tracing::instrument(skip(self))] + /// Search the Store, returns the matching subjects. + /// The second returned vector should be filled if query.include_resources is true. + /// Tries `query_cache`, which you should implement yourself. + #[instrument(skip(self))] + fn query(&self, q: &Query) -> AtomicResult { + if let Ok(res) = query_indexed(self, q) { + if res.count > 0 { + // Yay, we have a cache hit! + // We don't have to perform a (more expansive) TPF query + sorting + return Ok(res); + } + } + + // No cache hit, perform the query + let mut atoms = self.tpf( + None, + q.property.as_deref(), + q.value.as_ref(), + q.include_external, + )?; + let count = atoms.len(); + + let mut subjects = Vec::new(); + let mut resources = Vec::new(); + for atom in atoms.iter() { + // These nested resources are not fully calculated - they will be presented as -is + subjects.push(atom.subject.clone()); + // We need the Resources if we want to sort by a non-subject value + if q.include_nested || q.sort_by.is_some() { + // We skip checking for Agent, because we don't return these results directly anyway + match self.get_resource_extended(&atom.subject, true, None) { + Ok(resource) => { + resources.push(resource); + } + Err(e) => match e.error_type { + crate::AtomicErrorType::NotFoundError => {} + crate::AtomicErrorType::UnauthorizedError => {} + crate::AtomicErrorType::OtherError => { + return Err( + format!("Error when getting resource in collection: {}", e).into() + ) + } + }, + } + } + } + + if atoms.is_empty() { + return Ok(QueryResult { + subjects: vec![], + resources: vec![], + count, + }); + } + + // If there is a sort value, we need to change the items to contain that sorted value, instead of the one matched in the TPF query + if let Some(sort) = &q.sort_by { + // We don't use the existing array, we clear it. + atoms = Vec::new(); + for r in &resources { + // Users _can_ sort by optional properties! 
So we need a fallback defauil + let fallback_default = crate::Value::String(END_CHAR.into()); + let sorted_val = r.get(sort).unwrap_or(&fallback_default); + let atom = Atom { + subject: r.get_subject().to_string(), + property: sort.to_string(), + value: sorted_val.to_owned(), + }; + atoms.push(atom) + } + // Now we sort by the value that the user wants to sort by + atoms.sort_by(|a, b| a.value.to_string().cmp(&b.value.to_string())); + } + + let q_filter: QueryFilter = q.into(); + + // Maybe make this optional? + watch_collection(self, &q_filter)?; + + // Add the atoms to the query_index + for atom in atoms { + update_indexed_member(self, &q_filter, &atom, false)?; + } + + // Retry the same query! + query_indexed(self, q) + } + + #[instrument(skip(self))] fn all_resources(&self, include_external: bool) -> ResourceCollection { let mut resources: ResourceCollection = Vec::new(); let self_url = self @@ -370,14 +484,15 @@ impl Storelike for Db { Ok(()) } - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn remove_resource(&self, subject: &str) -> AtomicResult<()> { if let Ok(found) = self.get_propvals(subject) { - for (prop, val) in found { - let remove_atom = crate::Atom::new(subject.into(), prop, val); - self.remove_atom_from_index(&remove_atom)?; + let resource = Resource::from_propvals(found, subject.to_string()); + for (prop, val) in resource.get_propvals() { + let remove_atom = crate::Atom::new(subject.into(), prop.clone(), val.clone()); + self.remove_atom_from_index(&remove_atom, &resource)?; } - let binary_subject = bincode::serialize(subject).unwrap(); + let binary_subject = bincode::serialize(subject)?; let _found = self.resources.remove(&binary_subject)?; } else { return Err(format!( @@ -394,16 +509,16 @@ impl Storelike for Db { } // TPF implementation that used the index_value cache, far more performant than the StoreLike implementation - #[tracing::instrument(skip(self))] + #[instrument(skip(self))] fn tpf( &self, q_subject: Option<&str>, q_property: Option<&str>, - q_value: Option<&str>, + q_value: Option<&Value>, // Whether resources from outside the store should be searched through include_external: bool, ) -> AtomicResult> { - tracing::trace!("tpf"); + trace!("tpf"); let mut vec: Vec = Vec::new(); let hassub = q_subject.is_some(); @@ -426,11 +541,11 @@ impl Storelike for Db { // If the value is a resourcearray, check if it is inside let val_equals = |val: &str| { - let q = q_value.unwrap(); + let q = q_value.unwrap().to_sortable_string(); val == q || { if val.starts_with('[') { match crate::parse::parse_json_array(val) { - Ok(vec) => return vec.contains(&q.into()), + Ok(vec) => return vec.contains(&q), Err(_) => return val == q, } } @@ -477,9 +592,10 @@ impl Storelike for Db { } else { format!("{}\n", q_value.unwrap()) }; - for item in self.index_vals.scan_prefix(key_prefix) { + for item in self.reference_index.scan_prefix(key_prefix) { let (k, _v) = item?; let key_string = String::from_utf8(k.to_vec())?; + // WARNING: Converts all Atoms to Strings, the datatype is lost here let atom = key_to_atom(&key_string)?; if include_external || atom.subject.starts_with(self.get_server_url()) { vec.push(atom) @@ -497,11 +613,28 @@ impl Storelike for Db { } } -fn key_for_value_index(val: &str, prop: &str, subject: &str) -> String { - format!("{}\n{}\n{}", val, prop, subject) +#[instrument(skip(store))] +fn add_atom_to_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { + let _existing = store + .reference_index + 
.insert(key_for_reference_index(index_atom).as_bytes(), b"")?; + Ok(()) } -/// Parses a Value index key string, converts it into an atom. Note that the Value of the atom will allways be a string here. +#[instrument(skip(store))] +fn delete_atom_from_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> { + store + .reference_index + .remove(&key_for_reference_index(index_atom).as_bytes())?; + Ok(()) +} + +/// Constructs the Key for the index_value cache. +fn key_for_reference_index(atom: &IndexAtom) -> String { + format!("{}\n{}\n{}", atom.value, atom.property, atom.subject) +} + +/// Parses a Value index key string, converts it into an atom. Note that the Value of the atom will allways be a single AtomicURL here. fn key_to_atom(key: &str) -> AtomicResult { let mut parts = key.split('\n'); let val = parts.next().ok_or("Invalid key for value index")?; @@ -519,250 +652,3 @@ fn corrupt_db_message(subject: &str) -> String { } const DB_CORRUPT_MSG: &str = "Could not deserialize item from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export / serialize your data and import your data again."; - -#[cfg(test)] -pub mod test { - use crate::urls; - - use super::*; - use ntest::timeout; - - /// Creates new temporary database, populates it, removes previous one. - /// Can only be run one thread at a time, because it requires a lock on the DB file. - fn init(id: &str) -> Db { - let tmp_dir_path = format!("tmp/db/{}", id); - let _try_remove_existing = std::fs::remove_dir_all(&tmp_dir_path); - let store = Db::init( - std::path::Path::new(&tmp_dir_path), - "https://localhost".into(), - ) - .unwrap(); - let agent = store.create_agent(None).unwrap(); - store.set_default_agent(agent); - store.populate().unwrap(); - store - } - - /// Share the Db instance between tests. Otherwise, all tests try to init the same location on disk and throw errors. - /// Note that not all behavior can be properly tested with a shared database. - /// If you need a clean one, juts call init("someId"). - use lazy_static::lazy_static; // 1.4.0 - use std::sync::Mutex; - lazy_static! { - pub static ref DB: Mutex = Mutex::new(init("shared")); - } - - #[test] - #[timeout(30000)] - fn basic() { - let store = DB.lock().unwrap().clone(); - // We can create a new Resource, linked to the store. - // Note that since this store only exists in memory, it's data cannot be accessed from the internet. - // Let's make a new Property instance! - let mut new_resource = - crate::Resource::new_instance("https://atomicdata.dev/classes/Property", &store) - .unwrap(); - // And add a description for that Property - new_resource - .set_propval_shortname("description", "the age of a person", &store) - .unwrap(); - new_resource - .set_propval_shortname("shortname", "age", &store) - .unwrap(); - new_resource - .set_propval_shortname("datatype", crate::urls::INTEGER, &store) - .unwrap(); - // Changes are only applied to the store after saving them explicitly. - new_resource.save_locally(&store).unwrap(); - // The modified resource is saved to the store after this - - // A subject URL has been created automatically. 
- let subject = new_resource.get_subject(); - let fetched_new_resource = store.get_resource(subject).unwrap(); - let description_val = fetched_new_resource - .get_shortname("description", &store) - .unwrap() - .to_string(); - assert!(description_val == "the age of a person"); - - // Try removing something - store.get_resource(crate::urls::CLASS).unwrap(); - store.remove_resource(crate::urls::CLASS).unwrap(); - // Should throw an error, because can't remove non-existent resource - store.remove_resource(crate::urls::CLASS).unwrap_err(); - // Should throw an error, because resource is deleted - store.get_propvals(crate::urls::CLASS).unwrap_err(); - - assert!(store.all_resources(false).len() < store.all_resources(true).len()); - } - - #[test] - fn populate_collections() { - let store = DB.lock().unwrap().clone(); - let subjects: Vec = store - .all_resources(false) - .into_iter() - .map(|r| r.get_subject().into()) - .collect(); - println!("{:?}", subjects); - let collections_collection_url = format!("{}/collections", store.get_server_url()); - let collections_resource = store - .get_resource_extended(&collections_collection_url, false, None) - .unwrap(); - let member_count = collections_resource - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - assert!(member_count > 11); - let nested = collections_resource - .get(crate::urls::COLLECTION_INCLUDE_NESTED) - .unwrap() - .to_bool() - .unwrap(); - assert!(nested); - } - - #[test] - /// Check if the cache is working - fn add_atom_to_index() { - let store = DB.lock().unwrap().clone(); - let subject = urls::CLASS.into(); - let property: String = urls::PARENT.into(); - let val_string = urls::AGENT; - let value = Value::new(val_string, &crate::datatype::DataType::AtomicUrl).unwrap(); - // This atom should normally not exist - Agent is not the parent of Class. - let atom = Atom::new(subject, property.clone(), value); - store.add_atom_to_index(&atom).unwrap(); - let found_no_external = store - .tpf(None, Some(&property), Some(val_string), false) - .unwrap(); - // Don't find the atom if no_external is true. - assert_eq!( - found_no_external.len(), - 0, - "found items - should ignore external items" - ); - let found_external = store - .tpf(None, Some(&property), Some(val_string), true) - .unwrap(); - // If we see the atom, it's in the index. - assert_eq!(found_external.len(), 1); - } - - #[test] - /// Check if a resource is properly removed from the DB after a delete command. - /// Also counts commits. - fn destroy_resource_and_check_collection_and_commits() { - let store = init("counter"); - let agents_url = format!("{}/agents", store.get_server_url()); - let agents_collection_1 = store - .get_resource_extended(&agents_url, false, None) - .unwrap(); - let agents_collection_count_1 = agents_collection_1 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - assert_eq!( - agents_collection_count_1, 1, - "The Agents collection is not one (we assume there is one agent already present from init)" - ); - - // We will count the commits, and check if they've incremented later on. 
- let commits_url = format!("{}/commits", store.get_server_url()); - let commits_collection_1 = store - .get_resource_extended(&commits_url, false, None) - .unwrap(); - let commits_collection_count_1 = commits_collection_1 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - println!("Commits collection count 1: {}", commits_collection_count_1); - - let mut resource = crate::agents::Agent::new(None, &store) - .unwrap() - .to_resource(&store) - .unwrap(); - resource.save_locally(&store).unwrap(); - let agents_collection_2 = store - .get_resource_extended(&agents_url, false, None) - .unwrap(); - let agents_collection_count_2 = agents_collection_2 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - assert_eq!( - agents_collection_count_2, 2, - "The Resource was not found in the collection." - ); - - let commits_collection_2 = store - .get_resource_extended(&commits_url, false, None) - .unwrap(); - let commits_collection_count_2 = commits_collection_2 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - println!("Commits collection count 2: {}", commits_collection_count_2); - assert_eq!( - commits_collection_count_2, - commits_collection_count_1 + 1, - "The commits collection did not increase after saving the resource." - ); - - resource.destroy(&store).unwrap(); - let agents_collection_3 = store - .get_resource_extended(&agents_url, false, None) - .unwrap(); - let agents_collection_count_3 = agents_collection_3 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - assert_eq!( - agents_collection_count_3, 1, - "The collection count did not decrease after destroying the resource." - ); - - let commits_collection_3 = store - .get_resource_extended(&commits_url, false, None) - .unwrap(); - let commits_collection_count_3 = commits_collection_3 - .get(crate::urls::COLLECTION_MEMBER_COUNT) - .unwrap() - .to_int() - .unwrap(); - println!("Commits collection count 3: {}", commits_collection_count_3); - assert_eq!( - commits_collection_count_3, - commits_collection_count_2 + 1, - "The commits collection did not increase after destroying the resource." - ); - } - - #[test] - fn get_extended_resource_pagination() { - let store = DB.lock().unwrap().clone(); - let subject = format!("{}/commits?current_page=2", store.get_server_url()); - // Should throw, because page 2 is out of bounds for default page size - let _wrong_resource = store - .get_resource_extended(&subject, false, None) - .unwrap_err(); - // let subject = "https://atomicdata.dev/classes?current_page=2&page_size=1"; - let subject_with_page_size = format!("{}&page_size=1", subject); - let resource = store - .get_resource_extended(&subject_with_page_size, false, None) - .unwrap(); - let cur_page = resource - .get(urls::COLLECTION_CURRENT_PAGE) - .unwrap() - .to_int() - .unwrap(); - assert_eq!(cur_page, 2); - assert_eq!(resource.get_subject(), &subject_with_page_size); - } -} diff --git a/lib/src/db/query_index.rs b/lib/src/db/query_index.rs new file mode 100644 index 000000000..3893ae73c --- /dev/null +++ b/lib/src/db/query_index.rs @@ -0,0 +1,557 @@ +//! The Collections Cache is used to speed up queries. +//! It sorts Members by their Value, so we can quickly paginate and sort. +//! It relies on lexicographic ordering of keys, which Sled utilizes using `scan_prefix` queries. 
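// Key layout sketch (illustrative, not part of this changeset):
//
//     [bincode(QueryFilter)] 0xff [value, lowercased and truncated to MAX_LEN] 0xff [subject]
//
// Every member of one collection shares the same QueryFilter prefix, so a sled range scan over
// that prefix returns the members already ordered by their value. For example:
//
//     let filter = QueryFilter { property: Some(urls::IS_A.into()), value: None, sort_by: None };
//     let key = create_query_index_key(&filter, Some(&Value::String("a".into())), Some("https://example.com/x"))?;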
+ +use crate::{ + errors::AtomicResult, + storelike::{Query, QueryResult}, + values::query_value_compare, + Atom, Db, Resource, Storelike, Value, +}; +use serde::{Deserialize, Serialize}; + +/// A subset of a full [Query]. +/// Represents a sorted filter on the Store. +/// A Value in the `watched_collections`. +/// These are used to check whether collections have to be updated when values have changed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QueryFilter { + /// Filtering by property URL + pub property: Option, + /// Filtering by value + pub value: Option, + /// The property by which the collection is sorted + pub sort_by: Option, +} + +impl From<&Query> for QueryFilter { + fn from(q: &Query) -> Self { + QueryFilter { + property: q.property.clone(), + value: q.value.clone(), + sort_by: q.sort_by.clone(), + } + } +} + +/// Differs from a Regular Atom, since the value here is always a string, +/// and in the case of ResourceArrays, only a _single_ subject is used for each atom. +/// One IndexAtom for every member of the ResourceArray is created. +#[derive(Debug, Clone)] +pub struct IndexAtom { + pub subject: String, + pub property: String, + pub value: String, +} + +/// Last character in lexicographic ordering +pub const FIRST_CHAR: &str = "\u{0000}"; +pub const END_CHAR: &str = "\u{ffff}"; + +#[tracing::instrument(skip(store))] +/// Performs a query on the `members_index` Tree, which is a lexicographic sorted list of all hits for QueryFilters. +pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult { + let start = if let Some(val) = &q.start_val { + val.clone() + } else { + Value::String(FIRST_CHAR.into()) + }; + let end = if let Some(val) = &q.end_val { + val.clone() + } else { + Value::String(END_CHAR.into()) + }; + let start_key = create_query_index_key(&q.into(), Some(&start), None)?; + let end_key = create_query_index_key(&q.into(), Some(&end), None)?; + + let iter: Box>> = + if q.sort_desc { + Box::new(store.members_index.range(start_key..end_key).rev()) + } else { + Box::new(store.members_index.range(start_key..end_key)) + }; + + let mut subjects: Vec = vec![]; + let mut resources = Vec::new(); + let mut count = 0; + + for (i, kv) in iter.enumerate() { + if let Some(limit) = q.limit { + if subjects.len() < limit && i >= q.offset { + let (k, _v) = kv.map_err(|_e| "Unable to parse query_cached")?; + let (_q_filter, _val, subject) = parse_collection_members_key(&k)?; + + // When an agent is defined, we must perform authorization checks + // WARNING: EXPENSIVE! + // TODO: Make async + if q.include_nested || q.for_agent.is_some() { + match store.get_resource_extended(subject, true, q.for_agent.as_deref()) { + Ok(resource) => { + resources.push(resource); + subjects.push(subject.into()) + } + Err(e) => match e.error_type { + crate::AtomicErrorType::NotFoundError => {} + crate::AtomicErrorType::UnauthorizedError => {} + crate::AtomicErrorType::OtherError => { + return Err(format!( + "Error when getting resource in collection: {}", + e + ) + .into()) + } + }, + } + } else { + // If there is no need for nested resources, and no auth checks, we can skip the expensive part! 
+ subjects.push(subject.into()) + } + } + count = i + 1; + } + } + + Ok(QueryResult { + count, + resources, + subjects, + }) +} + +#[tracing::instrument(skip(store))] +/// Adds a QueryFilter to the `watched_queries` +pub fn watch_collection(store: &Db, q_filter: &QueryFilter) -> AtomicResult<()> { + store + .watched_queries + .insert(bincode::serialize(q_filter)?, b"")?; + Ok(()) +} + +/// Initialize the index for Collections +// TODO: This is probably no the most reliable way of finding the collections to watch. +// I suppose we should add these dynamically when a Collection is being requested. +#[tracing::instrument(skip(store))] +pub fn create_watched_collections(store: &Db) -> AtomicResult<()> { + let collections_url = format!("{}/collections", store.server_url); + let collections_resource = store.get_resource_extended(&collections_url, false, None)?; + for member_subject in collections_resource + .get(crate::urls::COLLECTION_MEMBERS)? + .to_subjects(None)? + { + let collection = store.get_resource_extended(&member_subject, false, None)?; + let value = if let Ok(val) = collection.get(crate::urls::COLLECTION_VALUE) { + // TODO: check the datatype. Now we assume it's a string + Some(val.clone()) + } else { + None + }; + let property = if let Ok(val) = collection.get(crate::urls::COLLECTION_PROPERTY) { + Some(val.to_string()) + } else { + None + }; + let sort_by = if let Ok(val) = collection.get(crate::urls::COLLECTION_SORT_BY) { + Some(val.to_string()) + } else { + None + }; + let q_filter = QueryFilter { + property, + value, + sort_by, + }; + watch_collection(store, &q_filter)?; + } + Ok(()) +} + +/// Checks if the resource will match with a QueryFilter. +/// Does any value or property or sort value match? +/// Returns the matching property, if found. +/// E.g. if a Resource +fn check_resource_query_filter_property( + resource: &Resource, + q_filter: &QueryFilter, +) -> Option { + if let Some(property) = &q_filter.property { + if let Ok(matched_propval) = resource.get(property) { + if let Some(filter_val) = &q_filter.value { + if matched_propval.to_string() == filter_val.to_string() { + return Some(property.to_string()); + } + } else { + return Some(property.to_string()); + } + } + } else if let Some(filter_val) = &q_filter.value { + for (prop, val) in resource.get_propvals() { + if query_value_compare(val, filter_val) { + return Some(prop.to_string()); + } + } + return None; + } + None +} + +// This is probably the most complex function in the whole repo. +// If things go wrong when making changes, add a test and fix stuff in the logic below. 
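// Worked example (illustrative): for an IndexAtom { property: urls::IS_A, value: urls::AGENT, .. }
// taken from a Resource that really is an Agent, each of these filters requires an index update:
//
//     QueryFilter { property: Some(IS_A), value: None,        sort_by: None }
//     QueryFilter { property: Some(IS_A), value: Some(AGENT), sort_by: None }
//     QueryFilter { property: None,       value: Some(AGENT), sort_by: None }
//
// If the same IndexAtom is checked against a Resource of a different class, the filters that
// match on the value no longer apply and no update is needed (see `should_update_or_not` below).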
+pub fn should_update(q_filter: &QueryFilter, index_atom: &IndexAtom, resource: &Resource) -> bool { + let resource_check = check_resource_query_filter_property(resource, q_filter); + let matching_prop = if let Some(p) = resource_check { + p + } else { + return false; + }; + + match (&q_filter.property, &q_filter.value, &q_filter.sort_by) { + // Whenever the atom matches with either the sorted or the filtered prop, we have to update + (Some(filterprop), Some(filter_val), Some(sortprop)) => { + if sortprop == &index_atom.property { + return true; + } + if filterprop == &index_atom.property && index_atom.value == filter_val.to_string() { + return true; + } + // If either one of these match + let relevant_prop = + filterprop == &index_atom.property || sortprop == &index_atom.property; + // And the value matches, we have to update + relevant_prop && filter_val.to_string() == index_atom.value + } + (Some(filter_prop), Some(_filter_val), None) => filter_prop == &index_atom.property, + (Some(filter_prop), None, Some(sort_by)) => { + filter_prop == &index_atom.property || sort_by == &index_atom.property + } + (Some(filter_prop), None, None) => filter_prop == &index_atom.property, + (None, Some(filter_val), None) => { + filter_val.to_string() == index_atom.value || matching_prop == index_atom.property + } + (None, Some(filter_val), Some(sort_by)) => { + filter_val.to_string() == index_atom.value + || matching_prop == index_atom.property + || &matching_prop == sort_by + || &index_atom.property == sort_by + } + // We should not create indexes for Collections that iterate over _all_ resources. + _ => todo!(), + } +} + +/// This is called when an atom is added or deleted. +/// Check whether the Atom will be hit by a TPF query matching the [QueryFilter]. +/// Updates the index accordingly. +/// We need both the `index_atom` and the full `atom`. +#[tracing::instrument(skip_all)] +pub fn check_if_atom_matches_watched_query_filters( + store: &Db, + index_atom: &IndexAtom, + atom: &Atom, + delete: bool, + resource: &Resource, +) -> AtomicResult<()> { + for item in store.watched_queries.iter() { + if let Ok((k, _v)) = item { + let q_filter = bincode::deserialize::(&k) + .map_err(|e| format!("Could not deserialize QueryFilter: {}", e))?; + let should_update = should_update(&q_filter, index_atom, resource); + + if should_update { + update_indexed_member(store, &q_filter, atom, delete)?; + } + } else { + return Err(format!("Can't deserialize collection index: {:?}", item).into()); + } + } + Ok(()) +} + +/// Adds or removes a single item (IndexAtom) to the index_members cache. +#[tracing::instrument(skip(store))] +pub fn update_indexed_member( + store: &Db, + collection: &QueryFilter, + atom: &Atom, + delete: bool, +) -> AtomicResult<()> { + let key = create_query_index_key( + collection, + // Maybe here we should serialize the value a bit different - as a sortable string, where Arrays are sorted by their length. + Some(&atom.value), + Some(&atom.subject), + )?; + if delete { + store.members_index.remove(key)?; + } else { + store.members_index.insert(key, b"")?; + } + Ok(()) +} + +/// We can only store one bytearray as a key in Sled. +/// We separate the various items in it using this bit that's illegal in UTF-8. +const SEPARATION_BIT: u8 = 0xff; + +/// Maximum string length for values in the members_index. Should be long enough to contain pretty long URLs, but not very long documents. +pub const MAX_LEN: usize = 120; + +/// Creates a key for a collection + value combination. 
+/// These are designed to be lexicographically sortable. +#[tracing::instrument()] +pub fn create_query_index_key( + query_filter: &QueryFilter, + value: Option<&Value>, + subject: Option<&str>, +) -> AtomicResult> { + let mut q_filter_bytes: Vec = bincode::serialize(query_filter)?; + q_filter_bytes.push(SEPARATION_BIT); + + let mut value_bytes: Vec = if let Some(val) = value { + let val_string = val.to_sortable_string(); + let shorter = if val_string.len() > MAX_LEN { + &val_string[0..MAX_LEN] + } else { + &val_string + }; + let lowercase = shorter.to_lowercase(); + lowercase.as_bytes().to_vec() + } else { + vec![0] + }; + value_bytes.push(SEPARATION_BIT); + + let subject_bytes = if let Some(sub) = subject { + sub.as_bytes().to_vec() + } else { + vec![0] + }; + + let bytesvec: Vec = [q_filter_bytes, value_bytes, subject_bytes].concat(); + Ok(bytesvec) +} + +/// Creates a key for a collection + value combination. +/// These are designed to be lexicographically sortable. +#[tracing::instrument()] +pub fn parse_collection_members_key(bytes: &[u8]) -> AtomicResult<(QueryFilter, &str, &str)> { + let mut iter = bytes.split(|b| b == &SEPARATION_BIT); + let q_filter_bytes = iter.next().ok_or("No q_filter_bytes")?; + let value_bytes = iter.next().ok_or("No value_bytes")?; + let subject_bytes = iter.next().ok_or("No value_bytes")?; + + let q_filter: QueryFilter = bincode::deserialize(q_filter_bytes)?; + let value = if !value_bytes.is_empty() { + std::str::from_utf8(value_bytes) + .map_err(|e| format!("Can't parse value in members_key: {}", e))? + } else { + return Err("Can't parse value in members_key".into()); + }; + let subject = if !subject_bytes.is_empty() { + std::str::from_utf8(subject_bytes) + .map_err(|e| format!("Can't parse subject in members_key: {}", e))? + } else { + return Err("Can't parse subject in members_key".into()); + }; + Ok((q_filter, value, subject)) +} + +/// Converts one Value to a bunch of indexable items. +/// Returns None for unsupported types. +pub fn value_to_reference_index_string(value: &Value) -> Option> { + let vals = match value { + // This results in wrong indexing, as some subjects will be numbers. + Value::ResourceArray(_v) => value.to_subjects(None).unwrap_or_else(|_| vec![]), + Value::AtomicUrl(v) => vec![v.into()], + // We don't index nested resources for now + Value::NestedResource(_r) => return None, + // This might result in unnecassarily long strings, sometimes. We may want to shorten them later. + val => vec![val.to_string()], + }; + Some(vals) +} + +/// Converts one Atom to a series of stringified values that can be indexed. 
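// For example (illustrative): an Atom whose value is a ResourceArray with two member subjects
// yields two IndexAtoms (same subject and property, one per member), while an Atom holding a
// NestedResource yields none, since nested resources are not indexed here.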
+#[tracing::instrument(skip(atom))] +pub fn atom_to_indexable_atoms(atom: &Atom) -> AtomicResult> { + let index_atoms = match value_to_reference_index_string(&atom.value) { + Some(v) => v, + None => return Ok(vec![]), + }; + let index_atoms = index_atoms + .into_iter() + .map(|v| IndexAtom { + value: v, + subject: atom.subject.clone(), + property: atom.property.clone(), + }) + .collect(); + Ok(index_atoms) +} + +#[cfg(test)] +pub mod test { + use crate::{db::test::init_db, urls}; + + use super::*; + + #[test] + fn create_and_parse_key() { + round_trip_same(Value::String("\n".into())); + round_trip_same(Value::String("short".into())); + round_trip_same(Value::Float(1.142)); + round_trip_same(Value::Float(-1.142)); + round_trip( + &Value::String("UPPERCASE".into()), + &Value::String("uppercase".into()), + ); + round_trip(&Value::String("29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB29NA(E*Tn3028nt87n_#T&*NF_AE*&#N@_T*&!#B_&*TN&*AEBT&*#B&TB@#!#@BB".into()), &Value::String("29na(e*tn3028nt87n_#t&*nf_ae*&#n@_t*&!#b_&*tn&*aebt&*#b&tb@#!#@bb29na(e*tn3028nt87n_#t&*nf_ae*&#n@_t*&!#b_&*tn&*aebt&*#b".into())); + + fn round_trip_same(val: Value) { + round_trip(&val, &val) + } + + fn round_trip(val: &Value, val_check: &Value) { + let collection = QueryFilter { + property: Some("http://example.org/prop".to_string()), + value: Some(Value::AtomicUrl("http://example.org/value".to_string())), + sort_by: None, + }; + let subject = "https://example.com/subject"; + let key = create_query_index_key(&collection, Some(val), Some(subject)).unwrap(); + let (col, val_out, sub_out) = parse_collection_members_key(&key).unwrap(); + assert_eq!(col.property, collection.property); + assert_eq!(val_check.to_string(), val_out); + assert_eq!(sub_out, subject); + } + } + + #[test] + fn lexicographic_partial() { + let q = QueryFilter { + property: Some("http://example.org/prop".to_string()), + value: Some(Value::AtomicUrl("http://example.org/value".to_string())), + sort_by: None, + }; + + let start_none = create_query_index_key(&q, None, None).unwrap(); + let num_1 = create_query_index_key(&q, Some(&Value::Float(1.0)), None).unwrap(); + let num_2 = create_query_index_key(&q, Some(&Value::Float(2.0)), None).unwrap(); + // let num_10 = create_query_index_key(&q, Some(&Value::Float(10.0)), None).unwrap(); + let num_1000 = create_query_index_key(&q, Some(&Value::Float(1000.0)), None).unwrap(); + let start_str = create_query_index_key(&q, Some(&Value::String("1".into())), None).unwrap(); + let a_downcase = + create_query_index_key(&q, Some(&Value::String("a".into())), None).unwrap(); + let b_upcase = create_query_index_key(&q, Some(&Value::String("B".into())), None).unwrap(); + let mid3 = + create_query_index_key(&q, Some(&Value::String("hi there".into())), None).unwrap(); + let end = create_query_index_key(&q, Some(&Value::String(END_CHAR.into())), None).unwrap(); + + assert!(start_none < num_1); + assert!(num_1 < num_2); + // TODO: Fix sorting numbers + // https://github.com/joepio/atomic-data-rust/issues/287 + // assert!(num_2 < num_10); + // assert!(num_10 < num_1000); + assert!(num_1000 < a_downcase); + assert!(a_downcase < 
b_upcase); + assert!(b_upcase < mid3); + assert!(mid3 < end); + + let mut sorted = vec![&end, &start_str, &a_downcase, &b_upcase, &start_none]; + sorted.sort(); + + let expected = vec![&start_none, &start_str, &a_downcase, &b_upcase, &end]; + + assert_eq!(sorted, expected); + } + + #[test] + fn should_update_or_not() { + let store = &init_db("should_update_or_not"); + + let prop = urls::IS_A.to_string(); + let class = urls::AGENT; + + let qf_prop_val = QueryFilter { + property: Some(prop.clone()), + value: Some(Value::AtomicUrl(class.to_string())), + sort_by: None, + }; + + let qf_prop = QueryFilter { + property: Some(prop.clone()), + value: None, + sort_by: None, + }; + + let qf_val = QueryFilter { + property: None, + value: Some(Value::AtomicUrl(class.to_string())), + sort_by: None, + }; + + let resource_correct_class = Resource::new_instance(class, store).unwrap(); + + let index_atom = IndexAtom { + subject: "https://example.com/someAgent".into(), + property: prop.clone(), + value: class.to_string(), + }; + + // We should be able to find the resource by propval, val, and / or prop. + assert!(should_update(&qf_val, &index_atom, &resource_correct_class)); + assert!(should_update( + &qf_prop_val, + &index_atom, + &resource_correct_class + )); + assert!(should_update( + &qf_prop, + &index_atom, + &resource_correct_class + )); + + // Test when a different value is passed + let resource_wrong_class = Resource::new_instance(urls::PARAGRAPH, store).unwrap(); + assert!(should_update(&qf_prop, &index_atom, &resource_wrong_class)); + assert!(!should_update(&qf_val, &index_atom, &resource_wrong_class)); + assert!(!should_update( + &qf_prop_val, + &index_atom, + &resource_wrong_class + )); + + let qf_prop_val_sort = QueryFilter { + property: Some(prop.clone()), + value: Some(Value::AtomicUrl(class.to_string())), + sort_by: Some(urls::DESCRIPTION.to_string()), + }; + let qf_prop_sort = QueryFilter { + property: Some(prop.clone()), + value: None, + sort_by: Some(urls::DESCRIPTION.to_string()), + }; + let qf_val_sort = QueryFilter { + property: Some(prop.clone()), + value: Some(Value::AtomicUrl(class.to_string())), + sort_by: Some(urls::DESCRIPTION.to_string()), + }; + + // We should update with a sort_by attribute + assert!(should_update( + &qf_prop_val_sort, + &index_atom, + &resource_correct_class + )); + assert!(should_update( + &qf_prop_sort, + &index_atom, + &resource_correct_class + )); + assert!(should_update( + &qf_val_sort, + &index_atom, + &resource_correct_class + )); + } +} diff --git a/lib/src/db/test.rs b/lib/src/db/test.rs new file mode 100644 index 000000000..4f74dc664 --- /dev/null +++ b/lib/src/db/test.rs @@ -0,0 +1,476 @@ +use crate::urls; + +use super::*; +use ntest::timeout; + +/// Creates a new temporary database, populates it, and removes the previous one. +/// Can only be run by one thread at a time, because it requires a lock on the DB file. +pub fn init_db(id: &str) -> Db { + let tmp_dir_path = format!(".temp/db/{}", id); + let _try_remove_existing = std::fs::remove_dir_all(&tmp_dir_path); + let store = Db::init( + std::path::Path::new(&tmp_dir_path), + "https://localhost".into(), + ) + .unwrap(); + let agent = store.create_agent(None).unwrap(); + store.set_default_agent(agent); + store.populate().unwrap(); + store +} + +/// Share the Db instance between tests. Otherwise, all tests try to init the same location on disk and throw errors. +/// Note that not all behavior can be properly tested with a shared database. +/// If you need a clean one, just call init_db("someId").
+use lazy_static::lazy_static; // 1.4.0 +use std::sync::Mutex; +lazy_static! { + pub static ref DB: Mutex<Db> = Mutex::new(init_db("shared")); +} + +#[test] +#[timeout(30000)] +fn basic() { + let store = DB.lock().unwrap().clone(); + // We can create a new Resource, linked to the store. + // Note that since this store only exists locally, its data cannot be accessed from the internet. + // Let's make a new Property instance! + let mut new_resource = + crate::Resource::new_instance("https://atomicdata.dev/classes/Property", &store).unwrap(); + // And add a description for that Property + new_resource + .set_propval_shortname("description", "the age of a person", &store) + .unwrap(); + new_resource + .set_propval_shortname("shortname", "age", &store) + .unwrap(); + new_resource + .set_propval_shortname("datatype", crate::urls::INTEGER, &store) + .unwrap(); + // Changes are only applied to the store after saving them explicitly. + new_resource.save_locally(&store).unwrap(); + // The modified resource is saved to the store after this + + // A subject URL has been created automatically. + let subject = new_resource.get_subject(); + let fetched_new_resource = store.get_resource(subject).unwrap(); + let description_val = fetched_new_resource + .get_shortname("description", &store) + .unwrap() + .to_string(); + assert!(description_val == "the age of a person"); + + // Try removing something + store.get_resource(crate::urls::CLASS).unwrap(); + store.remove_resource(crate::urls::CLASS).unwrap(); + // Should throw an error, because we can't remove a non-existent resource + store.remove_resource(crate::urls::CLASS).unwrap_err(); + // Should throw an error, because the resource is deleted + store.get_propvals(crate::urls::CLASS).unwrap_err(); + + assert!(store.all_resources(false).len() < store.all_resources(true).len()); +} + +#[test] +fn populate_collections() { + let store = DB.lock().unwrap().clone(); + let subjects: Vec<String> = store + .all_resources(false) + .into_iter() + .map(|r| r.get_subject().into()) + .collect(); + println!("{:?}", subjects); + let collections_collection_url = format!("{}/collections", store.get_server_url()); + let collections_resource = store + .get_resource_extended(&collections_collection_url, false, None) + .unwrap(); + let member_count = collections_resource + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + assert!(member_count > 11); + let nested = collections_resource + .get(crate::urls::COLLECTION_INCLUDE_NESTED) + .unwrap() + .to_bool() + .unwrap(); + assert!(nested); +} + +#[test] +/// Check if the cache is working +fn add_atom_to_index() { + let store = DB.lock().unwrap().clone(); + let subject = urls::CLASS.into(); + let property: String = urls::PARENT.into(); + let value = Value::AtomicUrl(urls::AGENT.into()); + // This atom should normally not exist - Agent is not the parent of Class. + let atom = Atom::new(subject, property.clone(), value.clone()); + store + .add_atom_to_index(&atom, &Resource::new("ds".into())) + .unwrap(); + let found_no_external = store + .tpf(None, Some(&property), Some(&value), false) + .unwrap(); + // Don't find the atom if no_external is true. + assert_eq!( + found_no_external.len(), + 0, + "found items - should ignore external items" + ); + let found_external = store + .tpf(None, Some(&property), Some(&value), true) + .unwrap(); + // If we see the atom, it's in the index. + assert_eq!(found_external.len(), 1); +} + +#[test] +/// Check if a resource is properly removed from the DB after a delete command.
+/// Also counts commits. +fn destroy_resource_and_check_collection_and_commits() { + let store = init_db("counter"); + let agents_url = format!("{}/agents", store.get_server_url()); + let agents_collection_1 = store + .get_resource_extended(&agents_url, false, None) + .unwrap(); + let agents_collection_count_1 = agents_collection_1 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + assert_eq!( + agents_collection_count_1, 1, + "The Agents collection is not one (we assume there is one agent already present from init)" + ); + + // We will count the commits, and check if they've incremented later on. + let commits_url = format!("{}/commits", store.get_server_url()); + let commits_collection_1 = store + .get_resource_extended(&commits_url, false, None) + .unwrap(); + let commits_collection_count_1 = commits_collection_1 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + println!("Commits collection count 1: {}", commits_collection_count_1); + + // Create a new agent, check if it is added to the new Agents collection as a Member. + let mut resource = crate::agents::Agent::new(None, &store) + .unwrap() + .to_resource(&store) + .unwrap(); + let _res = resource.save_locally(&store).unwrap(); + let agents_collection_2 = store + .get_resource_extended(&agents_url, false, None) + .unwrap(); + let agents_collection_count_2 = agents_collection_2 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + assert_eq!( + agents_collection_count_2, 2, + "The new Agent resource did not increase the collection member count from 1 to 2." + ); + + let commits_collection_2 = store + .get_resource_extended(&commits_url, false, None) + .unwrap(); + let commits_collection_count_2 = commits_collection_2 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + println!("Commits collection count 2: {}", commits_collection_count_2); + assert_eq!( + commits_collection_count_2, + commits_collection_count_1 + 1, + "The commits collection did not increase after saving the resource." + ); + + resource.destroy(&store).unwrap(); + let agents_collection_3 = store + .get_resource_extended(&agents_url, false, None) + .unwrap(); + let agents_collection_count_3 = agents_collection_3 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + assert_eq!( + agents_collection_count_3, 1, + "The collection count did not decrease after destroying the resource." + ); + + let commits_collection_3 = store + .get_resource_extended(&commits_url, false, None) + .unwrap(); + let commits_collection_count_3 = commits_collection_3 + .get(crate::urls::COLLECTION_MEMBER_COUNT) + .unwrap() + .to_int() + .unwrap(); + println!("Commits collection count 3: {}", commits_collection_count_3); + assert_eq!( + commits_collection_count_3, + commits_collection_count_2 + 1, + "The commits collection did not increase after destroying the resource." 
+ ); +} + +#[test] +fn get_extended_resource_pagination() { + let store = DB.lock().unwrap().clone(); + let subject = format!("{}/commits?current_page=2", store.get_server_url()); + // Should throw, because page 2 is out of bounds for default page size + let _wrong_resource = store + .get_resource_extended(&subject, false, None) + .unwrap_err(); + // let subject = "https://atomicdata.dev/classes?current_page=2&page_size=1"; + let subject_with_page_size = format!("{}&page_size=1", subject); + let resource = store + .get_resource_extended(&subject_with_page_size, false, None) + .unwrap(); + let cur_page = resource + .get(urls::COLLECTION_CURRENT_PAGE) + .unwrap() + .to_int() + .unwrap(); + assert_eq!(cur_page, 2); + assert_eq!(resource.get_subject(), &subject_with_page_size); +} + +/// Generate a bunch of resources, query them. +/// Checks if cache is properly invalidated on modifying or deleting resources. +#[test] +fn queries() { + let store = &DB.lock().unwrap().clone(); + + let demo_val = Value::Slug("myval".to_string()); + let demo_reference = Value::AtomicUrl(urls::PARAGRAPH.into()); + + let count = 10; + let limit = 5; + assert!( + count > limit, + "following tests might not make sense if count is less than limit" + ); + + let sort_by = urls::DESCRIPTION; + + for _x in 0..count { + let mut demo_resource = Resource::new_generate_subject(store); + // We make one resource public + if _x == 1 { + demo_resource + .set_propval(urls::READ.into(), vec![urls::PUBLIC_AGENT].into(), store) + .unwrap(); + } + demo_resource + .set_propval(urls::DESTINATION.into(), demo_reference.clone(), store) + .unwrap(); + demo_resource + .set_propval(urls::SHORTNAME.into(), demo_val.clone(), store) + .unwrap(); + demo_resource + .set_propval( + sort_by.into(), + Value::Markdown(crate::utils::random_string()), + store, + ) + .unwrap(); + demo_resource.save(store).unwrap(); + } + + let mut q = Query { + property: Some(urls::DESTINATION.into()), + value: Some(demo_reference), + limit: Some(limit), + start_val: None, + end_val: None, + offset: 0, + sort_by: None, + sort_desc: false, + include_external: true, + include_nested: false, + for_agent: None, + }; + let res = store.query(&q).unwrap(); + assert_eq!( + res.count, count, + "number of references without property filter" + ); + assert_eq!(limit, res.subjects.len(), "limit"); + + q.property = None; + q.value = Some(demo_val); + let res = store.query(&q).unwrap(); + assert_eq!(res.count, count, "literal value"); + + q.offset = 9; + let res = store.query(&q).unwrap(); + assert_eq!(res.subjects.len(), count - q.offset, "offset"); + assert_eq!(res.resources.len(), 0, "no nested resources"); + + q.offset = 0; + q.include_nested = true; + let res = store.query(&q).unwrap(); + assert_eq!(res.resources.len(), limit, "nested resources"); + + q.sort_by = Some(sort_by.into()); + let mut res = store.query(&q).unwrap(); + let mut prev_resource = res.resources[0].clone(); + // For one resource, we will change the order by changing its value + let mut resource_changed_order_opt = None; + for (i, r) in res.resources.iter_mut().enumerate() { + let previous = prev_resource.get(sort_by).unwrap().to_string(); + let current = r.get(sort_by).unwrap().to_string(); + assert!( + previous <= current, + "should be ascending: {} - {}", + previous, + current + ); + // We change the order! 
+ if i == 4 { + r.set_propval(sort_by.into(), Value::Markdown("!first".into()), store) + .unwrap(); + r.save(store).unwrap(); + resource_changed_order_opt = Some(r.clone()); + } + prev_resource = r.clone(); + } + + let mut resource_changed_order = resource_changed_order_opt.unwrap(); + + assert_eq!(res.count, count, "count changed after updating one value"); + + q.sort_by = Some(sort_by.into()); + let res = store.query(&q).unwrap(); + assert_eq!( + res.resources[0].get_subject(), + resource_changed_order.get_subject(), + "order did not change after updating resource" + ); + + resource_changed_order.destroy(store).unwrap(); + let res = store.query(&q).unwrap(); + assert!( + res.resources[0].get_subject() != resource_changed_order.get_subject(), + "deleted resource still in results" + ); + + q.sort_desc = true; + let res = store.query(&q).unwrap(); + let first = res.resources[0].get(sort_by).unwrap().to_string(); + let later = res.resources[limit - 1].get(sort_by).unwrap().to_string(); + assert!(first > later, "sort by desc"); + + q.for_agent = Some(urls::PUBLIC_AGENT.into()); + let res = store.query(&q).unwrap(); + assert_eq!(res.subjects.len(), 1, "authorized subjects"); + assert_eq!(res.resources.len(), 1, "authorized resources"); + // TODO: Ideally, the count is authorized too, but doing that could be hard (or expensive). + // https://github.com/joepio/atomic-data-rust/issues/286 + // assert_eq!(res.count, 1, "authorized count"); +} + +#[test] +/// Check that changing these values correctly updates the index. +fn index_invalidate_cache() { + let store = &init_db("invalidate_cache"); + + // Make sure to use Properties that are not in the default store + + // Do strings work? + test_collection_update_value( + store, + urls::FILENAME, + Value::String("old_val".into()), + Value::String("1".into()), + ); + // Do booleans work? + test_collection_update_value( + store, + urls::IS_LOCKED, + Value::Boolean(true), + Value::Boolean(false), + ); + // Do ResourceArrays work? + test_collection_update_value( + store, + urls::ATTACHMENTS, + Value::ResourceArray(vec![ + "http://example.com/1".into(), + "http://example.com/2".into(), + "http://example.com/3".into(), + ]), + Value::ResourceArray(vec!["http://example.com/1".into()]), + ); +} + +/// Generates a bunch of resources, changes the value for one of them, checks if the order has changed correctly. +/// new_val should be lexicographically _smaller_ than old_val.
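+/// After the update, the changed resource should come back as the first member of the sorted query results.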
+fn test_collection_update_value(store: &Db, property_url: &str, old_val: Value, new_val: Value) { + println!("cache_invalidation test for {}", property_url); + let count = 10; + let limit = 5; + assert!( + count > limit, + "the following tests might not make sense if count is less than limit" + ); + + for _x in 0..count { + let mut demo_resource = Resource::new_generate_subject(store); + demo_resource + .set_propval(property_url.into(), old_val.clone(), store) + .unwrap(); + demo_resource.save(store).unwrap(); + } + + let q = Query { + property: Some(property_url.into()), + value: None, + limit: Some(limit), + start_val: None, + end_val: None, + offset: 0, + sort_by: Some(property_url.into()), + sort_desc: false, + include_external: true, + include_nested: true, + for_agent: None, + }; + let mut res = store.query(&q).unwrap(); + assert_eq!( + res.count, count, + "Not the right amount of members in this collection" + ); + + // For one resource, we will change the order by changing its value + let mut resource_changed_order_opt = None; + for (i, r) in res.resources.iter_mut().enumerate() { + // We change the order! + if i == 4 { + r.set_propval(property_url.into(), new_val.clone(), store) + .unwrap(); + r.save(store).unwrap(); + resource_changed_order_opt = Some(r.clone()); + } + } + + let resource_changed_order = + resource_changed_order_opt.expect("not enough resources in collection"); + + let res = store.query(&q).expect("No first result "); + assert_eq!(res.count, count, "count changed after updating one value"); + + assert_eq!( + res.subjects.first().unwrap(), + resource_changed_order.get_subject(), + "Updated resource is not the first Result of the new query" + ); +} diff --git a/lib/src/hierarchy.rs b/lib/src/hierarchy.rs index 5d29887ea..80c5d15e3 100644 --- a/lib/src/hierarchy.rs +++ b/lib/src/hierarchy.rs @@ -3,7 +3,7 @@ use core::fmt; -use crate::{errors::AtomicResult, urls, Resource, Storelike}; +use crate::{errors::AtomicResult, urls, Resource, Storelike, Value}; #[derive(Debug)] pub enum Right { @@ -26,7 +26,7 @@ pub fn add_children(store: &impl Storelike, resource: &mut Resource) -> AtomicRe let atoms = store.tpf( None, Some(urls::PARENT), - Some(resource.get_subject()), + Some(&Value::AtomicUrl(resource.get_subject().into())), false, )?; let mut children: Vec = Vec::new(); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 360b226de..8651d2ae4 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -49,7 +49,6 @@ pub mod commit; #[cfg(feature = "config")] pub mod config; pub mod datatype; -pub mod datetime_helpers; #[cfg(feature = "db")] pub mod db; #[cfg(feature = "db")] @@ -68,8 +67,8 @@ pub mod store; pub mod storelike; #[cfg(test)] mod test_utils; -mod url_helpers; pub mod urls; +pub mod utils; pub mod validate; pub mod values; diff --git a/lib/src/plugins/invite.rs b/lib/src/plugins/invite.rs index 968215067..3377a1489 100644 --- a/lib/src/plugins/invite.rs +++ b/lib/src/plugins/invite.rs @@ -1,6 +1,5 @@ use crate::{ - agents::Agent, errors::AtomicResult, url_helpers::check_valid_url, urls, Resource, Storelike, - Value, + agents::Agent, errors::AtomicResult, urls, utils::check_valid_url, Resource, Storelike, Value, }; /// If there is a valid Agent in the correct query param, and the invite is valid, update the rights and respond with a redirect to the target resource @@ -74,7 +73,7 @@ pub fn construct_invite_redirect( } if let Ok(expires) = invite_resource.get(urls::EXPIRES_AT) { - if expires.to_int()? > crate::datetime_helpers::now() { + if expires.to_int()? 
> crate::utils::now() { return Err("Invite is no longer valid".into()); } } diff --git a/lib/src/plugins/versioning.rs b/lib/src/plugins/versioning.rs index 0fa243a79..fe0c13bd9 100644 --- a/lib/src/plugins/versioning.rs +++ b/lib/src/plugins/versioning.rs @@ -1,6 +1,6 @@ use crate::{ collections::CollectionBuilder, endpoints::Endpoint, errors::AtomicResult, urls, AtomicError, - Commit, Resource, Storelike, + Commit, Resource, Storelike, Value, }; pub fn version_endpoint() -> Endpoint { @@ -89,7 +89,12 @@ fn handle_all_versions_request( /// Searches the local store for all commits with this subject, returns sorted from old to new. #[tracing::instrument(skip(store))] fn get_commits_for_resource(subject: &str, store: &impl Storelike) -> AtomicResult> { - let commit_atoms = store.tpf(None, Some(urls::SUBJECT), Some(subject), false)?; + let commit_atoms = store.tpf( + None, + Some(urls::SUBJECT), + Some(&Value::AtomicUrl(subject.into())), + false, + )?; let mut commit_resources = Vec::new(); for atom in commit_atoms { // TODO: This will fail if a resource simply uses the SUBJECT url without being a valid Commit. @@ -192,13 +197,13 @@ mod test { resource .set_propval_string(crate::urls::DESCRIPTION.into(), first_val, &store) .unwrap(); - let first_commit = resource.save_locally(&store).unwrap(); + let first_commit = resource.save_locally(&store).unwrap().commit_resource; let second_val = "Hello universe"; resource .set_propval_string(crate::urls::DESCRIPTION.into(), second_val, &store) .unwrap(); - let second_commit = resource.save_locally(&store).unwrap(); + let second_commit = resource.save_locally(&store).unwrap().commit_resource; let commits = get_commits_for_resource(subject, &store).unwrap(); assert_eq!(commits.len(), 2); diff --git a/lib/src/populate.rs b/lib/src/populate.rs index 956cf09ed..7e72a2fb1 100644 --- a/lib/src/populate.rs +++ b/lib/src/populate.rs @@ -129,20 +129,20 @@ pub fn populate_base_models(store: &impl Storelike) -> AtomicResult<()> { ]; for p in properties { - let mut resource = p.to_resource()?; + let mut resource = p.to_resource(); resource.set_propval_unsafe( urls::PARENT.into(), Value::AtomicUrl("https://atomicdata.dev/properties".into()), - )?; + ); store.add_resource_opts(&resource, false, false, true)?; } for c in classes { - let mut resource = c.to_resource()?; + let mut resource = c.to_resource(); resource.set_propval_unsafe( urls::PARENT.into(), Value::AtomicUrl("https://atomicdata.dev/classes".into()), - )?; + ); store.add_resource_opts(&resource, false, false, true)?; } @@ -203,7 +203,9 @@ pub fn populate_collections(store: &impl Storelike) -> AtomicResult<()> { let classes_atoms = store.tpf( None, Some("https://atomicdata.dev/properties/isA"), - Some("https://atomicdata.dev/classes/Class"), + Some(&Value::AtomicUrl( + "https://atomicdata.dev/classes/Class".into(), + )), true, )?; diff --git a/lib/src/property.rs b/lib/src/property.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/src/resources.rs b/lib/src/resources.rs index 89765b3d3..bf1f6a55e 100644 --- a/lib/src/resources.rs +++ b/lib/src/resources.rs @@ -1,5 +1,7 @@ //! 
A resource is a set of Atoms that share a URL +use crate::commit::{CommitOpts, CommitResponse}; +use crate::utils::random_string; use crate::values::Value; use crate::{commit::CommitBuilder, errors::AtomicResult}; use crate::{ @@ -155,23 +157,24 @@ impl Resource { } } + /// Create a new resource with a generated Subject + pub fn new_generate_subject(store: &impl Storelike) -> Resource { + let generated = format!("{}/{}", store.get_server_url(), random_string()); + + Resource::new(generated) + } + /// Create a new instance of some Class. /// The subject is generated, but can be changed. /// Does not save the resource to the store. pub fn new_instance(class_url: &str, store: &impl Storelike) -> AtomicResult { let propvals: PropVals = HashMap::new(); let class = store.get_class(class_url)?; - use rand::Rng; - let random_string: String = rand::thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .take(7) - .map(char::from) - .collect(); let subject = format!( "{}/{}/{}", store.get_server_url(), &class.shortname, - random_string + random_string() ); let mut resource = Resource { propvals, @@ -260,7 +263,14 @@ impl Resource { crate::client::post_commit(&commit, store)?; } // If that succeeds, save it locally; - let commit_response = commit.apply(store)?; + let opts = CommitOpts { + validate_schema: true, + validate_signature: false, + validate_timestamp: false, + validate_rights: false, + update_index: true, + }; + let commit_response = commit.apply_opts(store, &opts)?; // then, reset the internal CommitBuiler. self.reset_commit_builder(); Ok(commit_response) @@ -268,17 +278,23 @@ impl Resource { /// Saves the resource (with all the changes) to the store by creating a Commit. /// Uses default Agent to sign the Commit. - /// Returns the generated Commit. + /// Returns the generated Commit and the new Resource. /// Does not validate rights / hierarchy. /// Does not store these changes on the server of the Subject - the Commit will be lost, unless you handle it manually. - pub fn save_locally(&mut self, store: &impl Storelike) -> AtomicResult { + pub fn save_locally(&mut self, store: &impl Storelike) -> AtomicResult { let agent = store.get_default_agent()?; let commitbuilder = self.get_commit_builder().clone(); let commit = commitbuilder.sign(&agent, store)?; - commit.apply(store)?; + let opts = CommitOpts { + validate_schema: true, + validate_signature: false, + validate_timestamp: false, + validate_rights: false, + update_index: true, + }; + let res = commit.apply_opts(store, &opts)?; self.reset_commit_builder(); - let resource = commit.into_resource(store)?; - Ok(resource) + Ok(res) } /// Insert a Property/Value combination. @@ -299,7 +315,7 @@ impl Resource { ) })?; let val = Value::new(value, &fullprop.data_type)?; - self.set_propval_unsafe(property_url, val)?; + self.set_propval_unsafe(property_url, val); Ok(()) } @@ -324,7 +340,8 @@ impl Resource { } } if full_prop.data_type == value.datatype() { - self.set_propval_unsafe(property, value) + self.set_propval_unsafe(property, value); + Ok(()) } else { Err(format!("Datatype for subject '{}', property '{}', value '{}' did not match. Wanted '{}', got '{}'", self.get_subject(), @@ -340,10 +357,9 @@ impl Resource { /// Inserts a Property/Value combination. /// Overwrites existing. /// Adds it to the CommitBuilder. 
- pub fn set_propval_unsafe(&mut self, property: String, value: Value) -> AtomicResult<()> { + pub fn set_propval_unsafe(&mut self, property: String, value: Value) { self.propvals.insert(property.clone(), value.clone()); self.commit.set(property, value); - Ok(()) } /// Sets a property / value combination. @@ -357,7 +373,7 @@ impl Resource { ) -> AtomicResult<()> { let fullprop = self.resolve_shortname_to_property(property, store)?; let fullval = Value::new(value, &fullprop.data_type)?; - self.set_propval_unsafe(fullprop.subject, fullval)?; + self.set_propval_unsafe(fullprop.subject, fullval); Ok(()) } @@ -540,7 +556,18 @@ mod test { .clone() .sign(&agent, &store) .unwrap(); - commit.apply(&store).unwrap(); + commit + .apply_opts( + &store, + &CommitOpts { + validate_schema: true, + validate_signature: true, + validate_timestamp: true, + validate_rights: false, + update_index: true, + }, + ) + .unwrap(); assert!( new_resource .get_shortname("shortname", &store) diff --git a/lib/src/schema.rs b/lib/src/schema.rs index caa461127..6ca28e546 100644 --- a/lib/src/schema.rs +++ b/lib/src/schema.rs @@ -51,30 +51,30 @@ impl Property { }) } - /// Convert to resource - pub fn to_resource(&self) -> AtomicResult { + /// Convert to resource. + pub fn to_resource(&self) -> Resource { let mut resource = Resource::new(self.subject.clone()); resource.set_propval_unsafe( urls::IS_A.into(), Value::ResourceArray(vec![urls::PROPERTY.into()]), - )?; - resource.set_propval_unsafe(urls::SHORTNAME.into(), Value::Slug(self.shortname.clone()))?; + ); + resource.set_propval_unsafe(urls::SHORTNAME.into(), Value::Slug(self.shortname.clone())); resource.set_propval_unsafe( urls::DESCRIPTION.into(), Value::String(self.description.clone()), - )?; + ); resource.set_propval_unsafe( urls::DATATYPE_PROP.into(), Value::AtomicUrl(self.data_type.to_string()), - )?; + ); if let Some(classtype) = &self.class_type { resource.set_propval_unsafe( urls::CLASSTYPE_PROP.into(), Value::AtomicUrl(classtype.clone()), - )?; + ); } - Ok(resource) + resource } } @@ -118,27 +118,26 @@ impl Class { } /// Converts Class to a Resource - pub fn to_resource(&self) -> AtomicResult { + pub fn to_resource(&self) -> Resource { let mut resource = Resource::new(self.subject.clone()); resource.set_propval_unsafe( urls::IS_A.into(), Value::ResourceArray(vec![urls::CLASS.into()]), - )?; - resource.set_propval_unsafe(urls::SHORTNAME.into(), Value::Slug(self.shortname.clone()))?; + ); + resource.set_propval_unsafe(urls::SHORTNAME.into(), Value::Slug(self.shortname.clone())); resource.set_propval_unsafe( urls::DESCRIPTION.into(), Value::String(self.description.clone()), - )?; + ); if !self.requires.is_empty() { - resource - .set_propval_unsafe(urls::REQUIRES.into(), Value::from(self.requires.clone()))?; + resource.set_propval_unsafe(urls::REQUIRES.into(), Value::from(self.requires.clone())); } if !self.requires.is_empty() { resource.set_propval_unsafe( urls::RECOMMENDS.into(), Value::from(self.recommends.clone()), - )?; + ); } - Ok(resource) + resource } } diff --git a/lib/src/store.rs b/lib/src/store.rs index 14c22b533..9e795e6e7 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -127,7 +127,7 @@ impl Storelike for Store { #[cfg(test)] mod test { use super::*; - use crate::urls; + use crate::{urls, Value}; fn init_store() -> Store { let store = Store::init().unwrap(); @@ -174,6 +174,8 @@ mod test { #[test] fn tpf() { let store = init_store(); + let val = &Value::Slug("class".into()); + let val_url = &Value::AtomicUrl(urls::CLASS.into()); // All atoms let 
atoms = store.tpf(None, None, None, true).unwrap(); assert!(atoms.len() > 10); @@ -181,20 +183,21 @@ mod test { let atoms = store.tpf(Some(urls::CLASS), None, None, true).unwrap(); assert_eq!(atoms.len(), 6); // Find by value - let atoms = store.tpf(None, None, Some("class"), true).unwrap(); + let atoms = store.tpf(None, None, Some(val), true).unwrap(); assert_eq!(atoms[0].subject, urls::CLASS); assert_eq!(atoms.len(), 1); // Find by property and value let atoms = store - .tpf(None, Some(urls::SHORTNAME), Some("class"), true) + .tpf(None, Some(urls::SHORTNAME), Some(val), true) .unwrap(); assert!(atoms[0].subject == urls::CLASS); - assert!(atoms.len() == 1); + assert_eq!(atoms.len(), 1); // Find item in array let atoms = store - .tpf(None, Some(urls::IS_A), Some(urls::CLASS), true) + .tpf(None, Some(urls::IS_A), Some(val_url), true) .unwrap(); - assert!(atoms.len() > 3); + println!("{:?}", atoms); + assert!(atoms.len() > 3, "Find item in array"); } #[test] diff --git a/lib/src/storelike.rs b/lib/src/storelike.rs index c3b5bc59e..2d8a31ca8 100644 --- a/lib/src/storelike.rs +++ b/lib/src/storelike.rs @@ -5,6 +5,7 @@ use crate::{ errors::AtomicError, hierarchy, schema::{Class, Property}, + values::query_value_compare, }; use crate::{errors::AtomicResult, parse::parse_json_ad_array}; use crate::{mapping::Mapping, values::Value, Atom, Resource}; @@ -33,7 +34,7 @@ pub trait Storelike: Sized { /// Adds an Atom to the PropSubjectMap. Overwrites if already present. /// The default implementation for this does not do anything, so overwrite it if your store needs indexing. - fn add_atom_to_index(&self, _atom: &Atom) -> AtomicResult<()> { + fn add_atom_to_index(&self, _atom: &Atom, _resource: &Resource) -> AtomicResult<()> { Ok(()) } @@ -67,7 +68,8 @@ pub trait Storelike: Sized { for r in self.all_resources(include_external) { let atoms = r.to_atoms()?; for atom in atoms { - self.add_atom_to_index(&atom)?; + self.add_atom_to_index(&atom, &r) + .map_err(|e| format!("Failed to add atom to index {}. 
{}", atom, e))?; } } Ok(()) @@ -218,7 +220,7 @@ pub trait Storelike: Sized { /// let atoms = store.tpf( /// None, /// Some("https://atomicdata.dev/properties/isA"), - /// Some("https://atomicdata.dev/classes/Class"), + /// Some(&atomic_lib::Value::AtomicUrl("https://atomicdata.dev/classes/Class".into())), /// true /// ).unwrap(); /// assert!(atoms.len() > 11) @@ -229,7 +231,7 @@ pub trait Storelike: Sized { &self, q_subject: Option<&str>, q_property: Option<&str>, - q_value: Option<&str>, + q_value: Option<&Value>, // Whether resources from outside the store should be searched through include_external: bool, ) -> AtomicResult> { @@ -253,27 +255,13 @@ pub trait Storelike: Sized { return Ok(vec); } - // If the value is a resourcearray, check if it is inside - let val_equals = |val: &str| { - let q = q_value.unwrap(); - val == q || { - if val.starts_with('[') { - match crate::parse::parse_json_array(val) { - Ok(vec) => return vec.contains(&q.into()), - Err(_) => return val == q, - } - } - false - } - }; - // Find atoms matching the TPF query in a single resource let mut find_in_resource = |resource: &Resource| { let subj = resource.get_subject(); for (prop, val) in resource.get_propvals().iter() { if hasprop && q_property.as_ref().unwrap() == prop { if hasval { - if val_equals(&val.to_string()) { + if query_value_compare(val, q_value.unwrap()) { vec.push(Atom::new(subj.into(), prop.into(), val.clone())) } break; @@ -281,7 +269,7 @@ pub trait Storelike: Sized { vec.push(Atom::new(subj.into(), prop.into(), val.clone())) } break; - } else if hasval && !hasprop && val_equals(&val.to_string()) { + } else if hasval && !hasprop && query_value_compare(val, q_value.unwrap()) { vec.push(Atom::new(subj.into(), prop.into(), val.clone())) } } @@ -407,8 +395,67 @@ pub trait Storelike: Sized { crate::populate::populate_default_store(self) } + /// Search the Store, returns the matching subjects. + /// The second returned vector should be filled if query.include_resources is true. + /// Tries `query_cache`, which you should implement yourself. + fn query(&self, q: &Query) -> AtomicResult { + let atoms = self.tpf( + None, + q.property.as_deref(), + q.value.as_ref(), + q.include_external, + )?; + + // Remove duplicate subjects + let mut subjects_deduplicated: Vec = atoms + .iter() + .map(|atom| atom.subject.clone()) + .collect::>() + .into_iter() + .collect(); + + // Sort by subject, better than no sorting + subjects_deduplicated.sort(); + + // WARNING: Entering expensive loop! + // This is needed for sorting, authorization and including nested resources. + // It could be skipped if there is no authorization and sorting requirement. 
+ let mut resources = Vec::new(); + for subject in subjects_deduplicated.iter() { + // These nested resources are not fully calculated - they will be presented as -is + match self.get_resource_extended(subject, true, q.for_agent.as_deref()) { + Ok(resource) => { + resources.push(resource); + } + Err(e) => match e.error_type { + crate::AtomicErrorType::NotFoundError => {} + crate::AtomicErrorType::UnauthorizedError => {} + crate::AtomicErrorType::OtherError => { + return Err( + format!("Error when getting resource in collection: {}", e).into() + ) + } + }, + } + } + + if let Some(sort) = &q.sort_by { + resources = crate::collections::sort_resources(resources, sort, q.sort_desc); + } + let mut subjects = Vec::new(); + for r in resources.iter() { + subjects.push(r.get_subject().clone()) + } + + Ok(QueryResult { + count: atoms.len(), + subjects, + resources, + }) + } + /// Removes an Atom from the PropSubjectMap. - fn remove_atom_from_index(&self, _atom: &Atom) -> AtomicResult<()> { + fn remove_atom_from_index(&self, _atom: &Atom, _resource: &Resource) -> AtomicResult<()> { Ok(()) } @@ -420,3 +467,37 @@ pub trait Storelike: Sized { crate::validate::validate_store(self, false) } } + +/// Use this to construct a list of Resources +#[derive(Debug)] +pub struct Query { + /// Filter by Property + pub property: Option, + /// Filter by Value + pub value: Option, + /// Maximum of items to return + pub limit: Option, + /// Value at which to begin lexicographically sorting things. + pub start_val: Option, + /// Value at which to stop lexicographically sorting things. + pub end_val: Option, + /// How many items to skip from the first one + pub offset: usize, + /// The Property URL that is used to sort the results + pub sort_by: Option, + /// Sort descending instead of ascending. + pub sort_desc: bool, + /// Whether to include non-server resources + pub include_external: bool, + /// Whether to include full Resources in the result, if not, will add empty vector here. + pub include_nested: bool, + /// For which Agent the query is executed. Pass `None`if you want to skip permission checks. + pub for_agent: Option, +} + +pub struct QueryResult { + pub subjects: Vec, + pub resources: Vec, + /// The amount of hits that were found, including the ones that were out of bounds or not authorized. + pub count: usize, +} diff --git a/lib/src/urls.rs b/lib/src/urls.rs index cd59c1c18..91a683bb2 100644 --- a/lib/src/urls.rs +++ b/lib/src/urls.rs @@ -90,6 +90,9 @@ pub const MIMETYPE: &str = "https://atomicdata.dev/properties/mimetype"; pub const INTERNAL_ID: &str = "https://atomicdata.dev/properties/internalId"; pub const DOWNLOAD_URL: &str = "https://atomicdata.dev/properties/downloadURL"; pub const ATTACHMENTS: &str = "https://atomicdata.dev/properties/attachments"; +// ... 
for Documents and Elements +pub const PARAGRAPH: &str = "https://atomicdata.dev/classes/elements/Paragraph"; + // Datatypes pub const STRING: &str = "https://atomicdata.dev/datatypes/string"; pub const MARKDOWN: &str = "https://atomicdata.dev/datatypes/markdown"; diff --git a/lib/src/url_helpers.rs b/lib/src/utils.rs similarity index 51% rename from lib/src/url_helpers.rs rename to lib/src/utils.rs index 7a0690310..ded86dc8b 100644 --- a/lib/src/url_helpers.rs +++ b/lib/src/utils.rs @@ -3,6 +3,7 @@ use crate::errors::AtomicResult; use url::Url; +/// Removes the path and query from a String, returns the base server URL pub fn server_url(url: &str) -> AtomicResult { let mut parsed: Url = Url::parse(url)?; @@ -25,3 +26,22 @@ pub fn check_valid_url(url: &str) -> AtomicResult<()> { } Ok(()) } + +/// Returns the current timestamp in milliseconds since UNIX epoch +pub fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("You're a time traveler") + .as_millis() as i64 +} + +/// Generates a relatively short random string +pub fn random_string() -> String { + use rand::Rng; + let random_string: String = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(7) + .map(char::from) + .collect(); + random_string.to_lowercase() +} diff --git a/lib/src/values.rs b/lib/src/values.rs index e5899fa0e..8e2138bf5 100644 --- a/lib/src/values.rs +++ b/lib/src/values.rs @@ -2,7 +2,7 @@ use crate::{ datatype::match_datatype, datatype::DataType, errors::AtomicResult, resources::PropVals, - url_helpers::check_valid_url, Resource, + utils::check_valid_url, Resource, }; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -214,6 +214,26 @@ impl Value { } Err(format!("Value {} is not a Nested Resource", self).into()) } + + /// Returns a Lexicographically sortable string representation of the value + pub fn to_sortable_string(&self) -> String { + match self { + Value::ResourceArray(arr) => arr.len().to_string(), + other => other.to_string(), + } + } +} + +/// Check if the value `q_val` is present in `val` +pub fn query_value_compare(val: &Value, q_val: &Value) -> bool { + let query_value = q_val.to_string(); + match val { + Value::ResourceArray(_vec) => { + let subs = val.to_subjects(None).unwrap_or_default(); + subs.iter().any(|v| v == &query_value) + } + other => other.to_string() == query_value, + } } impl From for Value { diff --git a/server/Cargo.toml b/server/Cargo.toml index e4a4f8344..8421521a3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" license = "MIT" name = "atomic-server" repository = "https://github.com/joepio/atomic-data-rust" -version = "0.30.4" +version = "0.31.0" [[bin]] name = "atomic-server" path = "src/bin.rs" @@ -49,7 +49,7 @@ version = "4.0.0-beta.18" [dependencies.atomic_lib] features = ["config", "db", "rdf"] path = "../lib" -version = "0.30.4" +version = "0.31.0" [dependencies.clap] features = ["derive", "env", "cargo"] diff --git a/server/src/commit_monitor.rs b/server/src/commit_monitor.rs index c45743ec4..f6ab64457 100644 --- a/server/src/commit_monitor.rs +++ b/server/src/commit_monitor.rs @@ -43,27 +43,37 @@ impl Handler for CommitMonitor { )] fn handle(&mut self, msg: Subscribe, _ctx: &mut Context) { // check if the agent has the rights to subscribe to this resource - if let Ok(resource) = self.store.get_resource(&msg.subject) { - match atomic_lib::hierarchy::check_read(&self.store, &resource, &msg.agent) { - Ok(_explanation) => { - let mut set = if let Some(set) = 
self.subscriptions.get(&msg.subject) { - set.clone() - } else { - HashSet::new() - }; - set.insert(msg.addr); - tracing::debug!("handle subscribe {} ", msg.subject); - self.subscriptions.insert(msg.subject.clone(), set); - } - Err(unauthorized_err) => { - tracing::debug!( - "Not allowed {} to subscribe to {}: {}", - &msg.agent, - &msg.subject, - unauthorized_err - ); + match self.store.get_resource(&msg.subject) { + Ok(resource) => { + match atomic_lib::hierarchy::check_read(&self.store, &resource, &msg.agent) { + Ok(_explanation) => { + let mut set = if let Some(set) = self.subscriptions.get(&msg.subject) { + set.clone() + } else { + HashSet::new() + }; + set.insert(msg.addr); + tracing::debug!("handle subscribe {} ", msg.subject); + self.subscriptions.insert(msg.subject.clone(), set); + } + Err(unauthorized_err) => { + tracing::debug!( + "Not allowed {} to subscribe to {}: {}", + &msg.agent, + &msg.subject, + unauthorized_err + ); + } + } + } + Err(e) => { + tracing::debug!( + "Subscribe failed for {} by {}: {}", + &msg.subject, + msg.agent, + e + ); + } + } + } +} diff --git a/server/src/handlers/commit.rs b/server/src/handlers/commit.rs index 7c6e5c2d9..0ec610a9e 100644 --- a/server/src/handlers/commit.rs +++ b/server/src/handlers/commit.rs @@ -1,6 +1,6 @@ use crate::{appstate::AppState, errors::AtomicServerResult}; use actix_web::{web, HttpResponse}; -use atomic_lib::{parse::parse_json_ad_commit_resource, Commit, Storelike}; +use atomic_lib::{commit::CommitOpts, parse::parse_json_ad_commit_resource, Commit, Storelike}; use std::sync::Mutex; /// Send and process a Commit. @@ -25,7 +25,14 @@ pub async fn post_commit( return Err("Subject of commit should be sent to other domain - this store can not own this resource.".into()); } // We don't update the index, because that's a job for the CommitMonitor. That means it can be done async in a different thread, making this commit response way faster.
- let commit_response = incoming_commit.apply_opts(store, true, true, true, true, false)?; + let opts = CommitOpts { + validate_schema: true, + validate_signature: true, + validate_timestamp: true, + validate_rights: true, + update_index: false, + }; + let commit_response = incoming_commit.apply_opts(store, &opts)?; let message = commit_response.commit_resource.to_json_ad()?; diff --git a/server/src/handlers/tpf.rs b/server/src/handlers/tpf.rs index 710806170..970066ba9 100644 --- a/server/src/handlers/tpf.rs +++ b/server/src/handlers/tpf.rs @@ -1,7 +1,7 @@ use crate::{appstate::AppState, content_types::get_accept}; use crate::{content_types::ContentType, errors::AtomicServerResult, helpers::empty_to_nothing}; use actix_web::{web, HttpResponse}; -use atomic_lib::Storelike; +use atomic_lib::{Storelike, Value}; use serde::Deserialize; use std::collections::HashSet; use std::sync::Mutex; @@ -32,11 +32,11 @@ pub async fn tpf( let content_type = get_accept(req.headers()); let subject = empty_to_nothing(query.subject.clone()); let property = empty_to_nothing(query.property.clone()); - let value = empty_to_nothing(query.value.clone()); + let value = query.value.clone().map(Value::String); let atoms = store.tpf( subject.as_deref(), property.as_deref(), - value.as_deref(), + value.as_ref(), true, )?; tracing::info!("TPF query: {:?}", query); diff --git a/server/src/handlers/upload.rs b/server/src/handlers/upload.rs index 42cfa1c6f..1dcb2ddb3 100644 --- a/server/src/handlers/upload.rs +++ b/server/src/handlers/upload.rs @@ -4,8 +4,8 @@ use actix_multipart::Multipart; use actix_web::{web, HttpResponse}; use async_std::prelude::*; use atomic_lib::{ - commit::CommitResponse, datetime_helpers::now, hierarchy::check_write, urls, AtomicError, - Resource, Storelike, Value, + commit::CommitResponse, hierarchy::check_write, urls, utils::now, AtomicError, Resource, + Storelike, Value, }; use futures::{StreamExt, TryStreamExt}; use serde::Deserialize; diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 4dc0c0552..b3df0001a 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -520,7 +520,7 @@ dependencies = [ [[package]] name = "atomic-server" -version = "0.30.3" +version = "0.31.0" dependencies = [ "actix", "actix-cors", @@ -557,7 +557,7 @@ dependencies = [ [[package]] name = "atomic-server-tauri" -version = "0.30.3" +version = "0.31.0" dependencies = [ "actix-rt", "atomic-server", @@ -575,7 +575,7 @@ checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" [[package]] name = "atomic_lib" -version = "0.30.3" +version = "0.31.0" dependencies = [ "base64", "bincode", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 40ed05576..79139fe3d 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" license = "MIT" name = "atomic-server-tauri" repository = "https://github.com/joepio/atomic-data-rust" -version = "0.30.4" +version = "0.31.0" [build-dependencies] [build-dependencies.tauri-build] @@ -20,7 +20,7 @@ serde_json = "1.0" # We don't need HTTPS for desktop usage default-features = false path = "../server" -version = "0.30.4" +version = "0.31.0" [dependencies.serde] features = ["derive"]