Skip to content

Commit

Permalink
Merge pull request #6 from FindoraNetwork/feat-non-merkle-store
Browse files Browse the repository at this point in the history
initiated non-merkle sessioned store
  • Loading branch information
ws4charlie committed Sep 3, 2021
2 parents 73e2ad4 + f4ed18c commit d6317a2
Show file tree
Hide file tree
Showing 10 changed files with 1,702 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ edition = "2018"
[dependencies]
ruc = { git = "https://github.com/FindoraNetwork/RUC.git", branch = "master" }
merk = { git = "ssh://git@github.com/FindoraNetwork/merk", branch = "findora" }
rocksdb = "0.15.0"
parking_lot = "0.11.1"
serde = "1.0.124"
serde_json = "1.0.64"
rand = "0.8.3"
num_cpus = "1.13.0"
2 changes: 1 addition & 1 deletion src/db/merk_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl MerkleDB for FinDB {
}

/// Converts KVEntry to BatchEntry
fn to_batch<I: IntoIterator<Item = KVEntry>>(items: I) -> Vec<BatchEntry> {
pub fn to_batch<I: IntoIterator<Item = KVEntry>>(items: I) -> Vec<BatchEntry> {
let mut batch = Vec::new();
for (key, val) in items {
match val {
Expand Down
6 changes: 5 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
mod merk_db;
mod rocks_db;
mod temp_db;
mod temp_db_rocks;

pub use merk_db::{FinDB, IterOrder, KVBatch, KValue, MerkleDB, StoreKey};
pub use merk_db::{FinDB, IterOrder, KVBatch, KVEntry, KValue, MerkleDB, StoreKey};
pub use rocks_db::{IRocksDB, RocksDB};
pub use temp_db::TempFinDB;
pub use temp_db_rocks::TempRocksDB;
128 changes: 128 additions & 0 deletions src/db/rocks_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use merk::Op;
use ruc::*;
use std::path::{Path, PathBuf};

use crate::db::merk_db::{to_batch, DBIter};
use crate::db::{IterOrder, KVBatch};

const CF_STATE: &str = "state";

/// RocksDB KV store interface
pub trait IRocksDB {
/// Gets a value for the given key.
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;

/// Gets range iterator
fn iter(&self, lower: &[u8], upper: &[u8], order: IterOrder) -> DBIter;

/// Commits changes.
fn commit(&mut self, kvs: KVBatch) -> Result<()>;
}

/// Rocks db
pub struct RocksDB {
db: rocksdb::DB,
path: PathBuf,
}

impl RocksDB {
/// Opens a store with the specified file path. If no store exists at that
/// path, one will be created.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let db_opts = Self::default_db_opts();
Self::open_opt(path, db_opts)
}

/// Closes the store and deletes all data from disk.
pub fn destroy(self) -> Result<()> {
let opts = Self::default_db_opts();
let path = self.path.clone();
drop(self);
rocksdb::DB::destroy(&opts, path).c(d!())?;
Ok(())
}

/// Opens a store with the specified file path and the given options. If no
/// store exists at that path, one will be created.
fn open_opt<P>(path: P, db_opts: rocksdb::Options) -> Result<Self>
where
P: AsRef<Path>,
{
let mut path_buf = PathBuf::new();
path_buf.push(path);
let cfs = vec![rocksdb::ColumnFamilyDescriptor::new(
CF_STATE,
Self::default_db_opts(),
)];
let db = rocksdb::DB::open_cf_descriptors(&db_opts, &path_buf, cfs).c(d!())?;

Ok(Self { db, path: path_buf })
}

fn default_db_opts() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_allow_mmap_writes(true);
opts.set_allow_mmap_reads(true);
opts.create_missing_column_families(true);
opts.set_atomic_flush(true);
opts
}

fn iter_opt(
&self,
mode: rocksdb::IteratorMode,
readopts: rocksdb::ReadOptions,
) -> rocksdb::DBIterator {
let aux_cf = self.db.cf_handle(CF_STATE).unwrap();
self.db.iterator_cf_opt(aux_cf, readopts, mode)
}
}

impl IRocksDB for RocksDB {
/// Gets a value for the given key. If the key is not found, `None` is returned.
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
if let Some(cf) = self.db.cf_handle(CF_STATE) {
Ok(self.db.get_cf(cf, key).c(d!("get data failed"))?)
} else {
Ok(None)
}
}

/// Gets range iterator
fn iter(&self, lower: &[u8], upper: &[u8], order: IterOrder) -> DBIter {
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(lower.to_vec());
readopts.set_iterate_upper_bound(upper.to_vec());
match order {
IterOrder::Asc => self.iter_opt(rocksdb::IteratorMode::Start, readopts),
IterOrder::Desc => self.iter_opt(rocksdb::IteratorMode::End, readopts),
}
}

/// Commits changes.
fn commit(&mut self, kvs: KVBatch) -> Result<()> {
// update cf in batch
let batch_kvs = to_batch(kvs);
let state_cf = self.db.cf_handle(CF_STATE).unwrap();
let mut batch = rocksdb::WriteBatch::default();
for (key, value) in batch_kvs {
match value {
Op::Put(value) => batch.put_cf(state_cf, key, value),
Op::Delete => batch.delete_cf(state_cf, key),
};
}

// write to db
let mut opts = rocksdb::WriteOptions::default();
opts.set_sync(false);
self.db.write_opt(batch, &opts).c(d!())?;

// flush
self.db
.flush()
.map_err(|_| eg!("Failed to flush memtables"))?;

Ok(())
}
}
220 changes: 220 additions & 0 deletions src/db/temp_db_rocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use crate::db::{IRocksDB, KVBatch, RocksDB};
use ruc::*;
use std::env::temp_dir;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::time::SystemTime;

/// Wraps a RocksDB instance and deletes it from disk it once it goes out of scope.
pub struct TempRocksDB {
inner: Option<RocksDB>,
}

impl TempRocksDB {
pub fn open<P: AsRef<Path>>(path: P) -> Result<TempRocksDB> {
let inner = Some(RocksDB::open(path)?);
Ok(TempRocksDB { inner })
}

/// Opens a `TempRocksDB` at an autogenerated, temporary file path.
pub fn new() -> Result<TempRocksDB> {
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos();
let mut path = temp_dir();
path.push(format!("temp-rocksdb–{}", time));
TempRocksDB::open(path)
}

/// Closes db and deletes all data from disk.
fn destroy(&mut self) -> Result<()> {
self.inner.take().unwrap().destroy()
}
}

impl IRocksDB for TempRocksDB {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.deref().get(key)
}

fn iter(&self, lower: &[u8], upper: &[u8], order: super::IterOrder) -> super::merk_db::DBIter {
self.deref().iter(lower, upper, order)
}

fn commit(&mut self, kvs: KVBatch) -> Result<()> {
self.deref_mut().commit(kvs)
}
}

impl Deref for TempRocksDB {
type Target = RocksDB;
fn deref(&self) -> &RocksDB {
self.inner.as_ref().unwrap()
}
}

impl DerefMut for TempRocksDB {
fn deref_mut(&mut self) -> &mut RocksDB {
self.inner.as_mut().unwrap()
}
}

impl Drop for TempRocksDB {
fn drop(&mut self) {
self.destroy().expect("failed to delete db");
}
}

#[cfg(test)]
mod tests {
use super::TempRocksDB;
use crate::db::{IRocksDB, IterOrder};
use std::thread;

#[test]
fn db_put_n_get() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![
(b"k10".to_vec(), Some(b"v10".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
])
.unwrap();

// get and compare
assert_eq!(db.get(b"k10").unwrap().unwrap(), b"v10".to_vec());
assert_eq!(db.get(b"k20").unwrap().unwrap(), b"v20".to_vec());
}

#[test]
fn db_del_n_get() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![
(b"k10".to_vec(), Some(b"v10".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
])
.unwrap();

// del data at height 101
db.commit(vec![(b"k10".to_vec(), None), (b"k20".to_vec(), None)])
.unwrap();

// get and compare
assert_eq!(db.get(b"k10").unwrap(), None);
assert_eq!(db.get(b"k20").unwrap(), None);
}

#[test]
fn db_put_n_update() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![(b"k10".to_vec(), Some(b"v10".to_vec()))])
.unwrap();

// update data at height
db.commit(vec![
(b"k10".to_vec(), Some(b"v12".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
])
.unwrap();

// get and compare
assert_eq!(db.get(b"k10").unwrap(), Some(b"v12".to_vec()));
assert_eq!(db.get(b"k20").unwrap(), Some(b"v20".to_vec()));
}

#[test]
fn del_n_iter_range() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![
(b"k10".to_vec(), Some(b"v10".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
(b"k30".to_vec(), Some(b"v30".to_vec())),
(b"k40".to_vec(), Some(b"v40".to_vec())),
(b"k50".to_vec(), Some(b"v50".to_vec())),
])
.unwrap();

// del data at height 101
db.commit(vec![(b"k20".to_vec(), None), (b"k40".to_vec(), None)])
.unwrap();

// iterate data on range ["k10", "k50")
let iter = db.iter(b"k10", b"k50", IterOrder::Asc);
let expected = vec![
(b"k10".to_vec(), b"v10".to_vec()),
(b"k30".to_vec(), b"v30".to_vec()),
];
let actual = iter
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect::<Vec<_>>();
assert_eq!(expected, actual);
}

#[test]
fn iter_range_inc() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![
(b"k10".to_vec(), Some(b"v10".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
(b"k30".to_vec(), Some(b"v30".to_vec())),
(b"k40".to_vec(), Some(b"v40".to_vec())),
(b"k50".to_vec(), Some(b"v50".to_vec())),
])
.unwrap();

// iterate data on range ["k20", "k50")
let iter = db.iter(b"k20", b"k50", IterOrder::Asc);
let expected = vec![
(b"k20".to_vec(), b"v20".to_vec()),
(b"k30".to_vec(), b"v30".to_vec()),
(b"k40".to_vec(), b"v40".to_vec()),
];
let actual = iter
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect::<Vec<_>>();
assert_eq!(expected, actual);
}

#[test]
fn iter_range_desc() {
let path = thread::current().name().unwrap().to_owned();
let mut db = TempRocksDB::open(path).expect("failed to open db");

// commit data
db.commit(vec![
(b"k10".to_vec(), Some(b"v10".to_vec())),
(b"k20".to_vec(), Some(b"v20".to_vec())),
(b"k30".to_vec(), Some(b"v30".to_vec())),
(b"k40".to_vec(), Some(b"v40".to_vec())),
(b"k50".to_vec(), Some(b"v50".to_vec())),
])
.unwrap();

// iterate data on range ["k20", "k50")
let iter = db.iter(b"k20", b"k50", IterOrder::Desc);
let expected = vec![
(b"k40".to_vec(), b"v40".to_vec()),
(b"k30".to_vec(), b"v30".to_vec()),
(b"k20".to_vec(), b"v20".to_vec()),
];
let actual = iter
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect::<Vec<_>>();
assert_eq!(expected, actual);
}
}

0 comments on commit d6317a2

Please sign in to comment.