From 613debdbe0e092d8fd0e8380833cd24ec3c7a962 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Wed, 1 Sep 2021 14:24:28 -0400 Subject: [PATCH 01/13] Added test cases for creating auxiliary batch and for pruning values outside of the version window --- src/state/chain_state.rs | 223 +++++++++++++++++++++++++++++++++++---- src/state/mod.rs | 49 +++++++-- src/store/mod.rs | 50 +++++++-- 3 files changed, 286 insertions(+), 36 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 2859075..3ccc6e7 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -4,10 +4,13 @@ /// and RocksDB backend. /// use crate::db::{IterOrder, KVBatch, KValue, MerkleDB}; +use crate::store::Prefix; use merk::tree::{Tree, NULL_HASH}; use ruc::*; +use std::str; const HEIGHT_KEY: &[u8; 6] = b"Height"; +const SPLIT_BGN: &str = "_"; /// Concrete ChainState struct containing a reference to an instance of MerkleDB, a name and /// current tree height. @@ -16,6 +19,7 @@ where D: MerkleDB, { name: String, + ver_window: u64, db: D, } @@ -30,7 +34,7 @@ where /// MerkleDB trait is assigned. /// /// Returns the implicit struct - pub fn new(db: D, name: String) -> Self { + pub fn new(db: D, name: String, ver_window: u64) -> Self { let mut db_name = "chain-state"; if !name.is_empty() { db_name = name.as_str(); @@ -38,6 +42,7 @@ where ChainState { name: db_name.to_string(), + ver_window, db, } } @@ -126,6 +131,103 @@ where } } + /// Queries the Aux DB for existence of a key. + /// + /// Returns a bool wrapped in a result as the query involves DB access. + pub fn exists_aux(&self, key: &[u8]) -> Result { + match self.get_aux(key).c(d!())? { + Some(_) => Ok(true), + None => Ok(false), + } + } + + /// Deletes the auxiliary keys stored with a prefix < ( height - ver_window ), + /// if the ver_window == 0 then the function returns without deleting any keys. + /// + /// The main purpose is to save memory on the disk + fn prune_aux_batch(&self, height: u64, batch: &mut KVBatch) -> Result<()> { + if self.ver_window == 0 || height < self.ver_window + 1 { + return Ok(()); + } + //Build range keys for window limits + let window_start_height = (height - self.ver_window).to_string(); + let pruning_height = (height - self.ver_window - 1).to_string(); + + let new_window_limit = Prefix::new(window_start_height.as_bytes()); + let old_window_limit = Prefix::new(pruning_height.as_bytes()); + + //Range all auxiliary keys at pruning height + self.iterate_aux( + &old_window_limit.begin(), + &old_window_limit.end(), + IterOrder::Asc, + &mut |(k, v)| -> bool { + let key: Vec<_> = str::from_utf8(&k) + .unwrap_or("") + .split(SPLIT_BGN) + .collect(); + if key.len() < 2 { + return false; + } + println!("KEY: {:?}", key); + //If the key doesn't already exist in the window start height, need to add it + if !self + .exists_aux(new_window_limit.push(key[1].as_bytes()).as_ref()) + .unwrap_or(false) + { + // Add the key to new window limit height + batch.push(( + new_window_limit + .clone() + .push(key[1].as_ref()) + .as_ref() + .to_vec(), + Some(v), + )); + } + //Delete the key from the batch + batch.push(( + old_window_limit + .clone() + .push(key[1].as_ref()) + .as_ref() + .to_vec(), + None, + )); + false + }, + ); + Ok(()) + } + + /// Builds a new batch which is a copy of the original commit with the current height + /// prefixed to each key. + /// + /// This is to keep a versioned history of KV pairs. + fn build_aux_batch(&self, height: u64, batch: &mut KVBatch) -> Result { + let height_str = height.to_string(); + let prefix = Prefix::new(height_str.as_bytes()); + + // Copy keys from batch to aux batch while prefixing them with the current height + let mut aux_batch: KVBatch = batch + .iter() + .map(|(k, v)| { + ( + prefix.clone().push(k.clone().as_slice()).as_ref().to_vec(), + v.clone(), + ) + }) + .collect(); + + // Store the current height in auxiliary batch + aux_batch.push((HEIGHT_KEY.to_vec(), Some(height_str.as_bytes().to_vec()))); + + // Prune Aux data in the db + self.prune_aux_batch(height, &mut aux_batch)?; + + Ok(aux_batch) + } + /// Commits a key value batch to the MerkleDB. /// /// The current height is updated in the ChainState as well as in the auxiliary data of the DB. @@ -141,17 +243,11 @@ where height: u64, flush: bool, ) -> Result<(Vec, u64)> { - // Update height value in batch - let height_str = height.to_string(); - batch.sort(); + let aux = self.build_aux_batch(height, &mut batch).c(d!())?; + self.db.put_batch(batch).c(d!())?; - self.db - .commit( - vec![(HEIGHT_KEY.to_vec(), Some(height_str.as_bytes().to_vec()))], - flush, - ) - .c(d!())?; + self.db.commit(aux, flush).c(d!())?; Ok((self.root_hash(), height)) } @@ -190,9 +286,10 @@ where #[cfg(test)] mod tests { - use crate::db::{IterOrder, KValue, MerkleDB, TempFinDB}; + use crate::db::{IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; use crate::state::chain_state; use std::thread; + const VER_WINDOW: u64 = 100; #[test] fn test_new_chain_state() { @@ -201,7 +298,7 @@ mod tests { let fdb = TempFinDB::open(path).expect("failed to open db"); //Create new Chain State with new database - let _cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let _cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); } #[test] @@ -221,7 +318,7 @@ mod tests { .unwrap(); //Create new Chain State with new database - let cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); assert_eq!(cs.get(&b"k10".to_vec()).unwrap(), Some(b"v10".to_vec())); assert_eq!(cs.get(&b"k20".to_vec()).unwrap(), Some(b"v20".to_vec())); @@ -259,7 +356,7 @@ mod tests { .unwrap(); //Create new Chain State with new database - let cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); let mut func_iter = |entry: KValue| { println!("Key: {:?}, Value: {:?}", entry.0, entry.1); //Assert Keys are equal @@ -304,7 +401,7 @@ mod tests { fdb.commit(batch, false).unwrap(); //Create new Chain State with new database - let cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); let mut func_iter = |entry: KValue| { println!("Key: {:?}, Value: {:?}", entry.0, entry.1); //Assert Keys are equal @@ -340,7 +437,7 @@ mod tests { .unwrap(); //Create new Chain State with new database - let cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); assert_eq!(cs.exists(&b"k10".to_vec()).unwrap(), true); assert_eq!(cs.exists(&b"k20".to_vec()).unwrap(), true); @@ -366,7 +463,7 @@ mod tests { let batch_clone = batch.clone(); //Create new Chain State with new database - let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); // Commit batch to db, in production the flush would be true let result = cs.commit(batch, 55, false).unwrap(); @@ -405,7 +502,7 @@ mod tests { fdb.commit(vec![(b"height".to_vec(), Some(b"25".to_vec()))], false) .unwrap(); - let cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); let height_aux = cs.get_aux(&b"height".to_vec()).unwrap(); let height = cs.get(&b"height".to_vec()); @@ -428,7 +525,7 @@ mod tests { ]; //Create new Chain State with new database - let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); let (root_hash1, _) = cs.commit(batch, 32, false).unwrap(); let batch2 = vec![ @@ -456,7 +553,7 @@ mod tests { ]; //Create new Chain State with new database - let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string()); + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); assert_eq!(cs.height().unwrap(), 0u64); @@ -473,4 +570,90 @@ mod tests { let (_, _) = cs.commit(batch, 34, false).unwrap(); assert_eq!(cs.height().unwrap(), 34); } + + #[test] + fn test_build_aux_batch() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = TempFinDB::open(path).expect("failed to open db"); + let number_of_batches = 21; + let batch_size = 7; + //Create new Chain State with new database + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), 10); + + //Create Several batches (More than Window size) with different keys and values + for i in 1..number_of_batches { + let mut batch: KVBatch = KVBatch::new(); + for j in 0..batch_size { + let key = format!("key-{}", j); + let val = format!("val-{}", i); + batch.push((Vec::from(key), Some(Vec::from(val)))); + } + + //Commit the new batch + let _ = cs.commit(batch, i as u64, false); + + //After each commit verify the values by using get_aux + for k in 0..batch_size { + let key = format!("{}_key-{}", i, k); + let value = format!("val-{}", i); + assert_eq!( + cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), + value.as_bytes() + ) + } + } + } + + #[test] + fn test_prune_aux_batch() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = TempFinDB::open(path).expect("failed to open db"); + let number_of_batches = 21; + let batch_size = 7; + //Create new Chain State with new database + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), 10); + + //Create Several batches (More than Window size) with different keys and values + for i in 1..number_of_batches { + let mut batch: KVBatch = KVBatch::new(); + for j in 0..batch_size { + let key = format!("key-{}", j); + let val = format!("val-{}", i); + batch.push((Vec::from(key), Some(Vec::from(val)))); + } + + //Add a KV to the batch at a random height, 5 in this case + if i == 5 { + batch.push((b"random-key".to_vec(), Some(b"random-value".to_vec()))); + } + + //Commit the new batch + let _ = cs.commit(batch, i as u64, false); + + //After each commit verify the values by using get_aux + for k in 0..batch_size { + let key = format!("{}_key-{}", i, k); + let value = format!("val-{}", i); + assert_eq!( + cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), + value.as_bytes() + ) + } + } + + //Make sure random key is found within the current window. + //This will be current height - window size = 10 in this case. + assert_eq!( + cs.get_aux(b"10_random-key").unwrap(), + Some(b"random-value".to_vec()) + ); + + //Query aux values that are older than the window size to confirm batches were pruned + for i in 1..10 { + for k in 0..batch_size { + let key = format!("{}_key-{}", i, k); + assert_eq!(cs.get_aux(key.as_bytes()).unwrap(), None) + } + } + } } diff --git a/src/state/mod.rs b/src/state/mod.rs index 6a79ace..25a495e 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -171,13 +171,18 @@ mod tests { use super::*; use crate::db::{KValue, TempFinDB}; use std::thread; + const VER_WINDOW: u64 = 100; #[test] fn test_get() { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs.clone()); //Set some kv pairs @@ -211,7 +216,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -235,7 +244,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -259,7 +272,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -321,7 +338,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -341,7 +362,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -375,7 +400,11 @@ mod tests { //Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); //Set some kv pairs @@ -405,7 +434,11 @@ mod tests { fn test_iterate() { let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); let mut count = 0; diff --git a/src/store/mod.rs b/src/store/mod.rs index 84a1f75..fc54700 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -51,6 +51,8 @@ mod tests { use std::sync::Arc; use std::{thread, time}; + const VER_WINDOW: u64 = 100; + // a example store struct StakeStore<'a, D: MerkleDB> { pfx: Prefix, @@ -170,7 +172,11 @@ mod tests { // create store let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); let mut store = PrefixedStore::new("my_store", &mut state); let hash0 = store.state().root_hash(); @@ -207,7 +213,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut check = State::new(cs); let mut store = StakeStore::new("stake", &mut check); @@ -248,7 +258,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut check = State::new(cs); let mut store = StakeStore::new("stake", &mut check); @@ -278,7 +292,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut check = State::new(cs); let mut store = StakeStore::new("stake", &mut check); @@ -319,7 +337,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut check = State::new(cs); let mut store = StakeStore::new("stake", &mut check); @@ -383,7 +405,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut check = State::new(cs); let mut store = StakeStore::new("stake", &mut check); @@ -424,7 +450,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs.clone()); let mut store = StakeStore::new("stake", &mut state); @@ -586,7 +616,11 @@ mod tests { // create State let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "findora_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "findora_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs.clone()); let mut store = PrefixedStore::new("testStore", &mut state); From f5dbfc579c0091ae9ebdbdd2c7bdb4fa3d588562 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Fri, 3 Sep 2021 14:56:11 -0400 Subject: [PATCH 02/13] Added Feature to clean Aux DB in case the window size has decreased after a node restart. --- src/state/chain_state.rs | 129 ++++++++++++++++++++++++++++++++------- 1 file changed, 106 insertions(+), 23 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 3ccc6e7..6bf9e92 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -35,16 +35,19 @@ where /// /// Returns the implicit struct pub fn new(db: D, name: String, ver_window: u64) -> Self { - let mut db_name = "chain-state"; + let mut db_name = String::from("chain-state"); if !name.is_empty() { - db_name = name.as_str(); + db_name = name; } - ChainState { - name: db_name.to_string(), + let mut cs = ChainState { + name: db_name, ver_window, db, - } + }; + cs.clean_aux_db(); + + cs } /// Gets a value for the given key from the primary data section in RocksDB @@ -153,8 +156,8 @@ where let window_start_height = (height - self.ver_window).to_string(); let pruning_height = (height - self.ver_window - 1).to_string(); - let new_window_limit = Prefix::new(window_start_height.as_bytes()); - let old_window_limit = Prefix::new(pruning_height.as_bytes()); + let new_window_limit = Prefix::new("VER".as_bytes()).push(window_start_height.as_bytes()); + let old_window_limit = Prefix::new("VER".as_bytes()).push(pruning_height.as_bytes()); //Range all auxiliary keys at pruning height self.iterate_aux( @@ -162,24 +165,22 @@ where &old_window_limit.end(), IterOrder::Asc, &mut |(k, v)| -> bool { - let key: Vec<_> = str::from_utf8(&k) - .unwrap_or("") - .split(SPLIT_BGN) - .collect(); - if key.len() < 2 { + let key: Vec<_> = str::from_utf8(&k).unwrap_or("").split(SPLIT_BGN).collect(); + if key.len() < 3 { return false; } - println!("KEY: {:?}", key); + let raw_key = key[2..].join(SPLIT_BGN); + //If the key doesn't already exist in the window start height, need to add it if !self - .exists_aux(new_window_limit.push(key[1].as_bytes()).as_ref()) + .exists_aux(new_window_limit.push(raw_key.as_bytes()).as_ref()) .unwrap_or(false) { // Add the key to new window limit height batch.push(( new_window_limit .clone() - .push(key[1].as_ref()) + .push(raw_key.as_ref()) .as_ref() .to_vec(), Some(v), @@ -189,7 +190,7 @@ where batch.push(( old_window_limit .clone() - .push(key[1].as_ref()) + .push(raw_key.as_ref()) .as_ref() .to_vec(), None, @@ -206,7 +207,7 @@ where /// This is to keep a versioned history of KV pairs. fn build_aux_batch(&self, height: u64, batch: &mut KVBatch) -> Result { let height_str = height.to_string(); - let prefix = Prefix::new(height_str.as_bytes()); + let prefix = Prefix::new("VER".as_bytes()).push(height_str.as_bytes()); // Copy keys from batch to aux batch while prefixing them with the current height let mut aux_batch: KVBatch = batch @@ -282,11 +283,47 @@ where pub fn prune_tree() { unimplemented!() } + + /// When creating a new chain-state instance, any residual aux data outside the current window + /// needs to be cleared as to not waste memory or disrupt the versioning behaviour. + fn clean_aux_db(&mut self) { + //Get current height + let current_height = self.height().unwrap_or(0); + if current_height == 0 { + return; + } + if current_height < self.ver_window + 1 { + return; + } + + //Define upper and lower bounds for iteration + let upper = Prefix::new("VER".as_bytes()) + .push((current_height - self.ver_window).to_string().as_bytes()); + let lower = Prefix::new("VER".as_bytes()).push(1.to_string().as_bytes()); + + //Create an empty batch + let mut batch = KVBatch::new(); + + //Iterate aux data and delete keys within bounds + self.iterate_aux( + lower.as_ref(), + upper.as_ref(), + IterOrder::Desc, + &mut |(k, _v)| -> bool { + //Delete the key from aux db + batch.push((k, None)); + false + }, + ); + + //commit aux batch + let _ = self.db.commit(batch, true); + } } #[cfg(test)] mod tests { - use crate::db::{IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; + use crate::db::{FinDB, IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; use crate::state::chain_state; use std::thread; const VER_WINDOW: u64 = 100; @@ -594,7 +631,7 @@ mod tests { //After each commit verify the values by using get_aux for k in 0..batch_size { - let key = format!("{}_key-{}", i, k); + let key = format!("VER_{}_key-{}", i, k); let value = format!("val-{}", i); assert_eq!( cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), @@ -624,7 +661,7 @@ mod tests { //Add a KV to the batch at a random height, 5 in this case if i == 5 { - batch.push((b"random-key".to_vec(), Some(b"random-value".to_vec()))); + batch.push((b"random_key".to_vec(), Some(b"random-value".to_vec()))); } //Commit the new batch @@ -632,7 +669,7 @@ mod tests { //After each commit verify the values by using get_aux for k in 0..batch_size { - let key = format!("{}_key-{}", i, k); + let key = format!("VER_{}_key-{}", i, k); let value = format!("val-{}", i); assert_eq!( cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), @@ -644,16 +681,62 @@ mod tests { //Make sure random key is found within the current window. //This will be current height - window size = 10 in this case. assert_eq!( - cs.get_aux(b"10_random-key").unwrap(), + cs.get_aux(b"VER_10_random_key").unwrap(), Some(b"random-value".to_vec()) ); //Query aux values that are older than the window size to confirm batches were pruned for i in 1..10 { for k in 0..batch_size { - let key = format!("{}_key-{}", i, k); + let key = format!("VER_{}_key-{}", i, k); assert_eq!(cs.get_aux(key.as_bytes()).unwrap(), None) } } } + + #[test] + fn test_clean_aux_db() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = FinDB::open(path.clone()).expect("failed to open db"); + let number_of_batches = 21; + let batch_size = 7; + //Create new Chain State with new database + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), 10); + + //Create Several batches (More than Window size) with different keys and values + for i in 1..number_of_batches { + let mut batch: KVBatch = KVBatch::new(); + for j in 0..batch_size { + let key = format!("key-{}", j); + let val = format!("val-{}", i); + batch.push((Vec::from(key), Some(Vec::from(val)))); + } + + //Commit the new batch + let _ = cs.commit(batch, i as u64, false); + } + + //Open db with new chain-state - window half the size of the previous + //Simulate Node restart + std::mem::drop(cs); + let new_window_size = 5; + let fdb_new = TempFinDB::open(path.clone()).expect("failed to open db"); + let cs_new = chain_state::ChainState::new(fdb_new, "test_db".to_string(), new_window_size); + + //Confirm keys older than new window size have been deleted + for i in 1..(number_of_batches - new_window_size - 1) { + for k in 0..batch_size { + let key = format!("VER_{}_key-{}", i, k); + assert_eq!(cs_new.get_aux(key.as_bytes()).unwrap(), None) + } + } + + //Confirm keys within new window size still exist + for i in (number_of_batches - new_window_size)..number_of_batches { + for k in 0..batch_size { + let key = format!("VER_{}_key-{}", i, k); + assert!(cs_new.exists_aux(key.as_bytes()).unwrap()) + } + } + } } From 874d31baf5b6f3d696ed92de10e47aaacf91b31d Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Fri, 3 Sep 2021 15:12:33 -0400 Subject: [PATCH 03/13] Fixed merge errors --- src/state/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/state/mod.rs b/src/state/mod.rs index c781e44..780f0f1 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -280,7 +280,7 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); let mut state = State::new(cs); // Set maximum valid key and value @@ -304,7 +304,7 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); let mut state = State::new(cs); // Set maximum valid key and value @@ -327,7 +327,7 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); + let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); let mut state = State::new(cs); // Set a big value From 9d3a91228e028d63bf59c5d02ea72cd79e04bacc Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Tue, 7 Sep 2021 20:41:53 -0400 Subject: [PATCH 04/13] Fixed key design for version iteration. --- src/state/chain_state.rs | 88 +++++++++++++++++++++++++++------------- src/state/mod.rs | 18 ++++++-- 2 files changed, 74 insertions(+), 32 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 6bf9e92..8a7f0fd 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -11,6 +11,7 @@ use std::str; const HEIGHT_KEY: &[u8; 6] = b"Height"; const SPLIT_BGN: &str = "_"; +const TOMBSTONE: [u8; 1] = [206u8]; /// Concrete ChainState struct containing a reference to an instance of MerkleDB, a name and /// current tree height. @@ -153,8 +154,8 @@ where return Ok(()); } //Build range keys for window limits - let window_start_height = (height - self.ver_window).to_string(); - let pruning_height = (height - self.ver_window - 1).to_string(); + let window_start_height = Self::height_str(height - self.ver_window); + let pruning_height = Self::height_str(height - self.ver_window - 1); let new_window_limit = Prefix::new("VER".as_bytes()).push(window_start_height.as_bytes()); let old_window_limit = Prefix::new("VER".as_bytes()).push(pruning_height.as_bytes()); @@ -187,13 +188,14 @@ where )); } //Delete the key from the batch + //TODO: Need to set the value to designated tombstone instead of None batch.push(( old_window_limit .clone() .push(raw_key.as_ref()) .as_ref() .to_vec(), - None, + Some(TOMBSTONE.to_vec()), )); false }, @@ -206,22 +208,14 @@ where /// /// This is to keep a versioned history of KV pairs. fn build_aux_batch(&self, height: u64, batch: &mut KVBatch) -> Result { - let height_str = height.to_string(); - let prefix = Prefix::new("VER".as_bytes()).push(height_str.as_bytes()); - // Copy keys from batch to aux batch while prefixing them with the current height let mut aux_batch: KVBatch = batch .iter() - .map(|(k, v)| { - ( - prefix.clone().push(k.clone().as_slice()).as_ref().to_vec(), - v.clone(), - ) - }) + .map(|(k, v)| (Self::versioned_key(k, height), v.clone())) .collect(); // Store the current height in auxiliary batch - aux_batch.push((HEIGHT_KEY.to_vec(), Some(height_str.as_bytes().to_vec()))); + aux_batch.push((HEIGHT_KEY.to_vec(), Some(height.to_string().into_bytes()))); // Prune Aux data in the db self.prune_aux_batch(height, &mut aux_batch)?; @@ -274,6 +268,18 @@ where Ok(0u64) } + pub fn versioned_key(key: &[u8], height: u64) -> Vec { + Prefix::new("VER".as_bytes()) + .push(Self::height_str(height).as_bytes()) + .push(key) + .as_ref() + .to_vec() + } + + fn height_str(height: u64) -> String { + format!("{:020}", height) + } + /// Returns the Name of the ChainState pub fn name(&self) -> &str { self.name.as_str() @@ -284,6 +290,16 @@ where unimplemented!() } + pub fn build_state(&self, _height: u64) -> KVBatch { + //loop through height 1 to + //Iterate aux for prefixed height + //Use session cache data structure to apply each KV + //If a tombstone for a value is found, remove the key from the cache + //Convert cache to batch and return it + + KVBatch::new() + } + /// When creating a new chain-state instance, any residual aux data outside the current window /// needs to be cleared as to not waste memory or disrupt the versioning behaviour. fn clean_aux_db(&mut self) { @@ -296,19 +312,22 @@ where return; } + //Get batch for state at H = current_height - ver_window + //Commit this batch at base height H + //Define upper and lower bounds for iteration let upper = Prefix::new("VER".as_bytes()) - .push((current_height - self.ver_window).to_string().as_bytes()); - let lower = Prefix::new("VER".as_bytes()).push(1.to_string().as_bytes()); + .push(Self::height_str(current_height - self.ver_window).as_bytes()); + let lower = Prefix::new("VER".as_bytes()); //Create an empty batch let mut batch = KVBatch::new(); //Iterate aux data and delete keys within bounds self.iterate_aux( - lower.as_ref(), + lower.begin().as_ref(), upper.as_ref(), - IterOrder::Desc, + IterOrder::Asc, &mut |(k, _v)| -> bool { //Delete the key from aux db batch.push((k, None)); @@ -324,8 +343,10 @@ where #[cfg(test)] mod tests { use crate::db::{FinDB, IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; - use crate::state::chain_state; + use crate::state::chain_state::TOMBSTONE; + use crate::state::{chain_state, ChainState}; use std::thread; + const VER_WINDOW: u64 = 100; #[test] @@ -631,10 +652,11 @@ mod tests { //After each commit verify the values by using get_aux for k in 0..batch_size { - let key = format!("VER_{}_key-{}", i, k); + let key = + ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); let value = format!("val-{}", i); assert_eq!( - cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), + cs.get_aux(key.as_slice()).unwrap().unwrap(), value.as_bytes() ) } @@ -669,10 +691,11 @@ mod tests { //After each commit verify the values by using get_aux for k in 0..batch_size { - let key = format!("VER_{}_key-{}", i, k); + let key = + ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); let value = format!("val-{}", i); assert_eq!( - cs.get_aux(key.as_bytes()).unwrap().unwrap().as_slice(), + cs.get_aux(key.as_slice()).unwrap().unwrap().as_slice(), value.as_bytes() ) } @@ -681,15 +704,20 @@ mod tests { //Make sure random key is found within the current window. //This will be current height - window size = 10 in this case. assert_eq!( - cs.get_aux(b"VER_10_random_key").unwrap(), + cs.get_aux(ChainState::::versioned_key(b"random_key", 10).as_slice()) + .unwrap(), Some(b"random-value".to_vec()) ); //Query aux values that are older than the window size to confirm batches were pruned for i in 1..10 { for k in 0..batch_size { - let key = format!("VER_{}_key-{}", i, k); - assert_eq!(cs.get_aux(key.as_bytes()).unwrap(), None) + let key = + ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); + assert_eq!( + cs.get_aux(key.as_slice()).unwrap(), + Some(TOMBSTONE.to_vec()) + ) } } } @@ -726,16 +754,18 @@ mod tests { //Confirm keys older than new window size have been deleted for i in 1..(number_of_batches - new_window_size - 1) { for k in 0..batch_size { - let key = format!("VER_{}_key-{}", i, k); - assert_eq!(cs_new.get_aux(key.as_bytes()).unwrap(), None) + let key = + ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); + assert_eq!(cs_new.get_aux(key.as_slice()).unwrap(), None) } } //Confirm keys within new window size still exist for i in (number_of_batches - new_window_size)..number_of_batches { for k in 0..batch_size { - let key = format!("VER_{}_key-{}", i, k); - assert!(cs_new.exists_aux(key.as_bytes()).unwrap()) + let key = + ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); + assert!(cs_new.exists_aux(key.as_slice()).unwrap()) } } } diff --git a/src/state/mod.rs b/src/state/mod.rs index 780f0f1..4c5dfa8 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -280,7 +280,11 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); // Set maximum valid key and value @@ -304,7 +308,11 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); // Set maximum valid key and value @@ -327,7 +335,11 @@ mod tests { // Setup let path = thread::current().name().unwrap().to_owned(); let fdb = TempFinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); + let cs = Arc::new(RwLock::new(ChainState::new( + fdb, + "test_db".to_string(), + VER_WINDOW, + ))); let mut state = State::new(cs); // Set a big value From c98f60286468234d70afa466cd91f2177f782b70 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Wed, 8 Sep 2021 18:36:46 -0400 Subject: [PATCH 05/13] Added function to build state from height 1 Added some helper functions --- src/state/chain_state.rs | 124 +++++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 13 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 8a7f0fd..57fd7ab 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -4,6 +4,7 @@ /// and RocksDB backend. /// use crate::db::{IterOrder, KVBatch, KValue, MerkleDB}; +use crate::state::cache::KVMap; use crate::store::Prefix; use merk::tree::{Tree, NULL_HASH}; use ruc::*; @@ -166,12 +167,10 @@ where &old_window_limit.end(), IterOrder::Asc, &mut |(k, v)| -> bool { - let key: Vec<_> = str::from_utf8(&k).unwrap_or("").split(SPLIT_BGN).collect(); - if key.len() < 3 { + let raw_key = Self::get_raw_versioned_key(&k).unwrap_or_default(); + if raw_key.is_empty() { return false; } - let raw_key = key[2..].join(SPLIT_BGN); - //If the key doesn't already exist in the window start height, need to add it if !self .exists_aux(new_window_limit.push(raw_key.as_bytes()).as_ref()) @@ -188,7 +187,6 @@ where )); } //Delete the key from the batch - //TODO: Need to set the value to designated tombstone instead of None batch.push(( old_window_limit .clone() @@ -280,6 +278,17 @@ where format!("{:020}", height) } + fn get_raw_versioned_key(key: &[u8]) -> Result { + let key: Vec<_> = str::from_utf8(key) + .c(d!("key parse error"))? + .split(SPLIT_BGN) + .collect(); + if key.len() < 3 { + return Err(eg!("invalid key pattern")); + } + Ok(key[2..].join(SPLIT_BGN)) + } + /// Returns the Name of the ChainState pub fn name(&self) -> &str { self.name.as_str() @@ -290,14 +299,38 @@ where unimplemented!() } - pub fn build_state(&self, _height: u64) -> KVBatch { - //loop through height 1 to - //Iterate aux for prefixed height - //Use session cache data structure to apply each KV - //If a tombstone for a value is found, remove the key from the cache - //Convert cache to batch and return it + pub fn build_state(&self, height: u64) -> KVBatch { + //New map to store KV pairs + let mut map = KVMap::new(); + + let lower = Prefix::new("VER".as_bytes()); + let upper = Prefix::new("VER".as_bytes()).push(Self::height_str(height + 1).as_bytes()); - KVBatch::new() + self.iterate_aux( + lower.begin().as_ref(), + upper.as_ref(), + IterOrder::Asc, + &mut |(k, v)| -> bool { + let raw_key = Self::get_raw_versioned_key(&k).unwrap_or_default(); + if raw_key.is_empty() { + return false; + } + //If value was deleted in the version history, delete it in the map + if v.eq(TOMBSTONE.to_vec().as_slice()) { + map.remove(raw_key.as_bytes()); + } else { + //update map with current KV + map.insert(raw_key.as_bytes().to_vec(), Some(v)); + } + false + }, + ); + + let kvs: Vec<_> = map + .iter() + .map(|(k, v)| (Self::versioned_key(k, height), v.clone())) + .collect(); + kvs } /// When creating a new chain-state instance, any residual aux data outside the current window @@ -313,12 +346,17 @@ where } //Get batch for state at H = current_height - ver_window + let batch = self.build_state(current_height - self.ver_window); //Commit this batch at base height H + if self.db.commit(batch, true).is_err() { + println!("error building base chain state"); + return; + } //Define upper and lower bounds for iteration + let lower = Prefix::new("VER".as_bytes()); let upper = Prefix::new("VER".as_bytes()) .push(Self::height_str(current_height - self.ver_window).as_bytes()); - let lower = Prefix::new("VER".as_bytes()); //Create an empty batch let mut batch = KVBatch::new(); @@ -345,6 +383,7 @@ mod tests { use crate::db::{FinDB, IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; use crate::state::chain_state::TOMBSTONE; use crate::state::{chain_state, ChainState}; + use rand::Rng; use std::thread; const VER_WINDOW: u64 = 100; @@ -722,6 +761,65 @@ mod tests { } } + #[test] + fn test_build_state() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = TempFinDB::open(path).expect("failed to open db"); + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); + + let mut rng = rand::thread_rng(); + let mut keys = Vec::with_capacity(10); + for i in 0..10 { + keys.push(format!("key_{}", i)); + } + + //Apply several batches with select few keys at random + for h in 1..21 { + let mut batch = KVBatch::new(); + + //Build a random batch + for _ in 0..5 { + let rnd_key_idx = rng.gen_range(0..10); + let rnd_val = format!("val-{}", rng.gen_range(0..10)); + batch.push(( + keys[rnd_key_idx].clone().into_bytes(), + Some(rnd_val.into_bytes()), + )); + } + + let _ = cs.commit(batch, h, false); + } + + //Confirm the build_state function produces the same keys and values as the latest state. + let mut cs_batch = KVBatch::new(); + let bound = crate::store::Prefix::new("key".as_bytes()); + cs.iterate( + &bound.begin(), + &bound.end(), + IterOrder::Asc, + &mut |(k, v)| -> bool { + //Delete the key from aux db + cs_batch.push((k, Some(v))); + false + }, + ); + + let built_batch: Vec<_> = cs + .build_state(20) + .iter() + .map(|(k, v)| { + ( + ChainState::::get_raw_versioned_key(k) + .unwrap() + .into_bytes(), + v.clone(), + ) + }) + .collect(); + + assert!(cs_batch.eq(&built_batch)) + } + #[test] fn test_clean_aux_db() { let path = thread::current().name().unwrap().to_owned(); From a140646a77e435722f70a17b80653c3c76746d7d Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Thu, 9 Sep 2021 14:33:40 -0400 Subject: [PATCH 06/13] Added Get key by version interface Added unit test to confirm functionality --- src/state/chain_state.rs | 78 ++++++++++++++++++++++++++++++++++++---- src/state/mod.rs | 4 +++ 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 57fd7ab..c97c9d6 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -193,7 +193,7 @@ where .push(raw_key.as_ref()) .as_ref() .to_vec(), - Some(TOMBSTONE.to_vec()), + None, )); false }, @@ -209,7 +209,12 @@ where // Copy keys from batch to aux batch while prefixing them with the current height let mut aux_batch: KVBatch = batch .iter() - .map(|(k, v)| (Self::versioned_key(k, height), v.clone())) + .map(|(k, v)| { + ( + Self::versioned_key(k, height), + v.clone().map_or(Some(TOMBSTONE.to_vec()), Some), + ) + }) .collect(); // Store the current height in auxiliary batch @@ -333,6 +338,28 @@ where kvs } + pub fn get_ver(&self, key: &[u8], height: u64) -> Option> { + //Set bounds for iteration + let upper = Self::versioned_key(key, height + 1); + let lower = Self::versioned_key(key, 1); + + //Iterate in descending order from upper bound until a value is found + let mut result = None; + self.iterate_aux( + lower.as_ref(), + upper.as_ref(), + IterOrder::Desc, + &mut |(_k, v)| -> bool { + if v.eq(&TOMBSTONE) { + return true; + } + result = Some(v); + true + }, + ); + result + } + /// When creating a new chain-state instance, any residual aux data outside the current window /// needs to be cleared as to not waste memory or disrupt the versioning behaviour. fn clean_aux_db(&mut self) { @@ -381,7 +408,6 @@ where #[cfg(test)] mod tests { use crate::db::{FinDB, IterOrder, KVBatch, KValue, MerkleDB, TempFinDB}; - use crate::state::chain_state::TOMBSTONE; use crate::state::{chain_state, ChainState}; use rand::Rng; use std::thread; @@ -753,10 +779,7 @@ mod tests { for k in 0..batch_size { let key = ChainState::::versioned_key(format!("key-{}", k).as_bytes(), i); - assert_eq!( - cs.get_aux(key.as_slice()).unwrap(), - Some(TOMBSTONE.to_vec()) - ) + assert_eq!(cs.get_aux(key.as_slice()).unwrap(), None,) } } } @@ -867,4 +890,45 @@ mod tests { } } } + + #[test] + fn test_get_ver() { + let path = thread::current().name().unwrap().to_owned(); + let fdb = TempFinDB::open(path.clone()).expect("failed to open db"); + //Create new Chain State with new database + let mut cs = chain_state::ChainState::new(fdb, "test_db".to_string(), VER_WINDOW); + + //Commit a single key at different heights and values + for height in 1..21 { + let mut batch = KVBatch::new(); + if height == 3 { + batch.push((b"test_key".to_vec(), Some(b"test-val1".to_vec()))); + } + if height == 7 { + //Deleted key at height 7 + batch.push((b"test_key".to_vec(), None)); + } + if height == 15 { + batch.push((b"test_key".to_vec(), Some(b"test-val2".to_vec()))); + } + + let _ = cs.commit(batch, height, false); + } + + //Query the key at each version it was updated + assert_eq!(cs.get_ver(b"test_key", 3), Some(b"test-val1".to_vec())); + assert_eq!(cs.get_ver(b"test_key", 7), None); + assert_eq!(cs.get_ver(b"test_key", 15), Some(b"test-val2".to_vec())); + + //Query the key between update versions + assert_eq!(cs.get_ver(b"test_key", 5), Some(b"test-val1".to_vec())); + assert_eq!(cs.get_ver(b"test_key", 17), Some(b"test-val2".to_vec())); + assert_eq!(cs.get_ver(b"test_key", 10), None); + + //Query the key at a version it didn't exist + assert_eq!(cs.get_ver(b"test_key", 2), None); + + //Query the key after it's been deleted + assert_eq!(cs.get_ver(b"test_key", 8), None); + } } diff --git a/src/state/mod.rs b/src/state/mod.rs index 4c5dfa8..04a480e 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -68,6 +68,10 @@ where cs.get(key) } + pub fn get_ver(&self, key: &[u8], height: u64) -> Option> { + self.chain_state.read().get_ver(key, height) + } + /// Queries whether a key exists in the current state. /// /// First Checks the cache, returns true if found otherwise queries the chainState. From 842e843fe9578b60839ba0b0c30ef4afc1543035 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 11:24:59 -0400 Subject: [PATCH 07/13] Added some comments to describe new functions Refactored Build aux batch to ignore versioning if Ver_window == 0 --- src/state/chain_state.rs | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index c97c9d6..cc529d2 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -206,23 +206,26 @@ where /// /// This is to keep a versioned history of KV pairs. fn build_aux_batch(&self, height: u64, batch: &mut KVBatch) -> Result { - // Copy keys from batch to aux batch while prefixing them with the current height - let mut aux_batch: KVBatch = batch - .iter() - .map(|(k, v)| { - ( - Self::versioned_key(k, height), - v.clone().map_or(Some(TOMBSTONE.to_vec()), Some), - ) - }) - .collect(); + let mut aux_batch = KVBatch::new(); + if self.ver_window != 0 { + // Copy keys from batch to aux batch while prefixing them with the current height + aux_batch = batch + .iter() + .map(|(k, v)| { + ( + Self::versioned_key(k, height), + v.clone().map_or(Some(TOMBSTONE.to_vec()), Some), + ) + }) + .collect(); + + // Prune Aux data in the db + self.prune_aux_batch(height, &mut aux_batch)?; + } // Store the current height in auxiliary batch aux_batch.push((HEIGHT_KEY.to_vec(), Some(height.to_string().into_bytes()))); - // Prune Aux data in the db - self.prune_aux_batch(height, &mut aux_batch)?; - Ok(aux_batch) } @@ -271,6 +274,7 @@ where Ok(0u64) } + /// Build key Prefixed with Version height for Auxiliary data pub fn versioned_key(key: &[u8], height: u64) -> Vec { Prefix::new("VER".as_bytes()) .push(Self::height_str(height).as_bytes()) @@ -279,10 +283,12 @@ where .to_vec() } + /// Build a height string for versioning history fn height_str(height: u64) -> String { format!("{:020}", height) } + /// Deconstruct versioned key and return parsed raw key fn get_raw_versioned_key(key: &[u8]) -> Result { let key: Vec<_> = str::from_utf8(key) .c(d!("key parse error"))? @@ -304,6 +310,9 @@ where unimplemented!() } + /// Build the chain-state from height 1 to height H + /// + /// Returns a batch with KV pairs valid at height H pub fn build_state(&self, height: u64) -> KVBatch { //New map to store KV pairs let mut map = KVMap::new(); @@ -338,6 +347,10 @@ where kvs } + /// Get the value of a key at a given height + /// + /// Returns the value of the given key at a particular height + /// Returns None if the key was deleted or invalid at height H pub fn get_ver(&self, key: &[u8], height: u64) -> Option> { //Set bounds for iteration let upper = Self::versioned_key(key, height + 1); From 10164dacd44a2f36743d36ac83b3a57d2cf10d6a Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 12:29:03 -0400 Subject: [PATCH 08/13] Exposed get value by version to the store traits --- src/store/traits.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/store/traits.rs b/src/store/traits.rs index 586ab3b..562ef27 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -77,6 +77,11 @@ where self.state().get(key) } + /// get value by version. + fn get_v(&self, key: &[u8], height: u64) -> Option> { + self.state().get_ver(key, height) + } + /// iterate db only fn iter_db(&self, prefix: Prefix, asc: bool, func: &mut dyn FnMut(KValue) -> bool) -> bool { let mut iter_order = IterOrder::Desc; @@ -194,6 +199,11 @@ pub trait StatelessStore { state.get(key) } + /// get value by version. + fn get_v(state: &State, key: &[u8], height: u64) -> Option> { + state.get_ver(key, height) + } + /// iterate db only fn iter_db( state: &State, From 400ad08b04a6fa3b5c85fa9d80e002a5c5f4f949 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 13:31:00 -0400 Subject: [PATCH 09/13] Updated get value by version implementation --- src/state/chain_state.rs | 55 ++++++++++++++++++++++------------------ src/state/mod.rs | 2 +- src/store/traits.rs | 4 +-- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index cc529d2..26e333d 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -351,26 +351,19 @@ where /// /// Returns the value of the given key at a particular height /// Returns None if the key was deleted or invalid at height H - pub fn get_ver(&self, key: &[u8], height: u64) -> Option> { - //Set bounds for iteration - let upper = Self::versioned_key(key, height + 1); - let lower = Self::versioned_key(key, 1); - + pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { //Iterate in descending order from upper bound until a value is found let mut result = None; - self.iterate_aux( - lower.as_ref(), - upper.as_ref(), - IterOrder::Desc, - &mut |(_k, v)| -> bool { - if v.eq(&TOMBSTONE) { - return true; + for h in (1..height + 1).rev() { + let key = Self::versioned_key(key, h); + if let Some(val) = self.get_aux(&key).c(d!("error reading aux value"))? { + if val.eq(&TOMBSTONE) { + break; } - result = Some(v); - true - }, - ); - result + result = Some(val); + } + } + Ok(result) } /// When creating a new chain-state instance, any residual aux data outside the current window @@ -929,19 +922,31 @@ mod tests { } //Query the key at each version it was updated - assert_eq!(cs.get_ver(b"test_key", 3), Some(b"test-val1".to_vec())); - assert_eq!(cs.get_ver(b"test_key", 7), None); - assert_eq!(cs.get_ver(b"test_key", 15), Some(b"test-val2".to_vec())); + assert_eq!( + cs.get_ver(b"test_key", 3).unwrap(), + Some(b"test-val1".to_vec()) + ); + assert_eq!(cs.get_ver(b"test_key", 7).unwrap(), None); + assert_eq!( + cs.get_ver(b"test_key", 15).unwrap(), + Some(b"test-val2".to_vec()) + ); //Query the key between update versions - assert_eq!(cs.get_ver(b"test_key", 5), Some(b"test-val1".to_vec())); - assert_eq!(cs.get_ver(b"test_key", 17), Some(b"test-val2".to_vec())); - assert_eq!(cs.get_ver(b"test_key", 10), None); + assert_eq!( + cs.get_ver(b"test_key", 5).unwrap(), + Some(b"test-val1".to_vec()) + ); + assert_eq!( + cs.get_ver(b"test_key", 17).unwrap(), + Some(b"test-val2".to_vec()) + ); + assert_eq!(cs.get_ver(b"test_key", 10).unwrap(), None); //Query the key at a version it didn't exist - assert_eq!(cs.get_ver(b"test_key", 2), None); + assert_eq!(cs.get_ver(b"test_key", 2).unwrap(), None); //Query the key after it's been deleted - assert_eq!(cs.get_ver(b"test_key", 8), None); + assert_eq!(cs.get_ver(b"test_key", 8).unwrap(), None); } } diff --git a/src/state/mod.rs b/src/state/mod.rs index 04a480e..2cda7df 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -68,7 +68,7 @@ where cs.get(key) } - pub fn get_ver(&self, key: &[u8], height: u64) -> Option> { + pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { self.chain_state.read().get_ver(key, height) } diff --git a/src/store/traits.rs b/src/store/traits.rs index 562ef27..7f44272 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -78,7 +78,7 @@ where } /// get value by version. - fn get_v(&self, key: &[u8], height: u64) -> Option> { + fn get_v(&self, key: &[u8], height: u64) -> Result>> { self.state().get_ver(key, height) } @@ -200,7 +200,7 @@ pub trait StatelessStore { } /// get value by version. - fn get_v(state: &State, key: &[u8], height: u64) -> Option> { + fn get_v(state: &State, key: &[u8], height: u64) -> Result>> { state.get_ver(key, height) } From af15ab44a732734695bbf5ab4dead736a9962ebe Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 13:44:43 -0400 Subject: [PATCH 10/13] Updated Readme with versioning info. --- README.md | 92 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index d3316e7..355f945 100644 --- a/README.md +++ b/README.md @@ -9,50 +9,74 @@ The crate was designed to be the blockchain state database. It provides persiste - Key-value updates(in a block) applied to storage can be reverted if block failes to commit. - Prefix-based key-value iteration on commited state. - Prefix-based key-value iteration on latest state. +- Versioned key-value pairs available for queries. +- Entire chain-state available as a batch of key-value pairs at any height within version window. - Root hash calculation. **Example:** +```toml +[dependencies] +storage = { git = "ssh://git@github.com/FindoraNetwork/storage.git", branch = "master" } +parking_lot = "0.11.1" +``` ```rust extern crate storage; -use storage::state::ChainState; +use storage::state::{ChainState, State}; use storage::db::FinDB; use storage::store::PrefixedStore; use parking_lot::RwLock; use std::sync::Arc; +use std::thread; +use storage::store::traits::{Stated, Store}; + +// This window is used to determine how many versions of each KV pair are to be kept in the +// auxiliary db. +const VER_WINDOW:u64 = 100; + +fn main() { + println!("Testing Prefixed Store!"); + prefixed_store(); +} fn prefixed_store() { - // create store - let path = thread::current().name().unwrap().to_owned(); - let fdb = FinDB::open(path).expect("failed to open db"); - let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string()))); - let mut state = State::new(cs); - let mut store = PrefixedStore::new("my_store", &mut state); - let hash0 = store.state().root_hash(); - - // set kv pairs and commit - store.set(b"k10", b"v10".to_vec()); - store.set(b"k20", b"v20".to_vec()); - let (hash1, _height) = store.state_mut().commit(1).unwrap(); - - // verify - assert_eq!(store.get(b"k10").unwrap(), Some(b"v10".to_vec())); - assert_eq!(store.get(b"k20").unwrap(), Some(b"v20".to_vec())); - assert_ne!(hash0, hash1); - - // add, delete and update - store.set(b"k10", b"v15".to_vec()); - store.delete(b"k20").unwrap(); - store.set(b"k30", b"v30".to_vec()); - - // verify - assert_eq!(store.get(b"k10").unwrap(), Some(b"v15".to_vec())); - assert_eq!(store.get(b"k20").unwrap(), None); - assert_eq!(store.get(b"k30").unwrap(), Some(b"v30".to_vec())); - - // rollback and verify - store.state_mut().discard_session(); - assert_eq!(store.get(b"k10").unwrap(), Some(b"v10".to_vec())); - assert_eq!(store.get(b"k20").unwrap(), Some(b"v20".to_vec())); - assert_eq!(store.get(b"k30").unwrap(), None); + // create store + let path = thread::current().name().unwrap().to_owned(); + let fdb = FinDB::open(path).expect("failed to open db"); + let cs = Arc::new(RwLock::new(ChainState::new(fdb, "test_db".to_string(), VER_WINDOW))); + let mut state = State::new(cs); + let mut store = PrefixedStore::new("my_store", &mut state); + let hash0 = store.state().root_hash(); + + // set kv pairs and commit + store.set(b"k10", b"v10".to_vec()); + store.set(b"k20", b"v20".to_vec()); + let (hash1, _height) = store.state_mut().commit(1).unwrap(); + + // verify + assert_eq!(store.get(b"k10").unwrap(), Some(b"v10".to_vec())); + assert_eq!(store.get(b"k20").unwrap(), Some(b"v20".to_vec())); + assert_ne!(hash0, hash1); + + // add, delete and update + store.set(b"k10", b"v15".to_vec()); + store.delete(b"k20").unwrap(); + store.set(b"k30", b"v30".to_vec()); + + // verify + assert_eq!(store.get(b"k10").unwrap(), Some(b"v15".to_vec())); + assert_eq!(store.get(b"k20").unwrap(), None); + assert_eq!(store.get(b"k30").unwrap(), Some(b"v30".to_vec())); + + // rollback and verify + store.state_mut().discard_session(); + assert_eq!(store.get(b"k10").unwrap(), Some(b"v10".to_vec())); + assert_eq!(store.get(b"k20").unwrap(), Some(b"v20".to_vec())); + assert_eq!(store.get(b"k30").unwrap(), None); + + // get previous version of a key value pair + store.set(b"k10", b"v25".to_vec()); + let _ = store.state_mut().commit(2); + assert_eq!(store.get(b"k10").unwrap(), Some(b"v25".to_vec())); + assert_eq!(store.get_v(b"k10", 1).unwrap(), Some(b"v10".to_vec())); } ``` From bee1dde71f1c310d8edc294ec6d0dbee18e48a0f Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 13:57:19 -0400 Subject: [PATCH 11/13] fix spelling errors. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 355f945..2d9876b 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,8 @@ The crate was designed to be the blockchain state database. It provides persiste ### FEATURES: - Key-value updates(in a transaction) applied to storage can be reverted if transaction fails. -- Key-value updates(in a block) applied to storage can be reverted if block failes to commit. -- Prefix-based key-value iteration on commited state. +- Key-value updates(in a block) applied to storage can be reverted if block fails to commit. +- Prefix-based key-value iteration on committed state. - Prefix-based key-value iteration on latest state. - Versioned key-value pairs available for queries. - Entire chain-state available as a batch of key-value pairs at any height within version window. From 27ce6c03109fa06709c2471d2bc8acee5249ecd7 Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Mon, 13 Sep 2021 19:18:46 -0400 Subject: [PATCH 12/13] Set lower bound on get value by version. --- src/state/chain_state.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 26e333d..9c00cf9 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -352,9 +352,14 @@ where /// Returns the value of the given key at a particular height /// Returns None if the key was deleted or invalid at height H pub fn get_ver(&self, key: &[u8], height: u64) -> Result>> { + //Need to set lower bound as the height can get very large + let mut lower_bound = 1; + if height > self.ver_window { + lower_bound = height - self.ver_window; + } //Iterate in descending order from upper bound until a value is found let mut result = None; - for h in (1..height + 1).rev() { + for h in (lower_bound..height + 1).rev() { let key = Self::versioned_key(key, h); if let Some(val) = self.get_aux(&key).c(d!("error reading aux value"))? { if val.eq(&TOMBSTONE) { From 74662d5748f03c97941d398fa2051143b2161fce Mon Sep 17 00:00:00 2001 From: kevinssgh Date: Wed, 15 Sep 2021 15:00:06 -0400 Subject: [PATCH 13/13] Don't need to roll up keys from previous window if the value is a TOMBSTONE --- src/state/chain_state.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/state/chain_state.rs b/src/state/chain_state.rs index 9c00cf9..66552ee 100644 --- a/src/state/chain_state.rs +++ b/src/state/chain_state.rs @@ -172,9 +172,11 @@ where return false; } //If the key doesn't already exist in the window start height, need to add it + //If the value of this key is a TOMBSTONE then we don't need to add it if !self .exists_aux(new_window_limit.push(raw_key.as_bytes()).as_ref()) .unwrap_or(false) + && v.ne(&TOMBSTONE) { // Add the key to new window limit height batch.push((