Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Naive DB + Rocksdb implemenation #125

Merged
merged 23 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"
network = { path = "network" }
ferret-libp2p = { path = "ferret-libp2p"}
utils = { path = "utils" }
db = { path = "db" }

libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "776d13ef046358964c7d64cda3295a3a3cb24743" }
tokio = "0.1.22"
Expand Down
8 changes: 8 additions & 0 deletions node/db/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "db"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
rocksdb = "0.13.0"
21 changes: 21 additions & 0 deletions node/db/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use rocksdb;
use std::fmt;

#[derive(Debug, PartialEq)]
pub struct Error {
msg: String,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Database error: {}", self.msg)
}
}

impl From<rocksdb::Error> for Error {
fn from(e: rocksdb::Error) -> Error {
Error {
msg: String::from(e),
}
}
}
33 changes: 33 additions & 0 deletions node/db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
pub mod errors;
pub mod rocks;

use errors::Error;

pub trait Write {
fn write<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>;
fn delete<K>(&self, key: K) -> Result<(), Error>
where
K: AsRef<[u8]>;
fn bulk_write<K, V>(&self, keys: &[K], values: &[V]) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>;
fn bulk_delete<K>(&self, keys: &[K]) -> Result<(), Error>
where
K: AsRef<[u8]>;
}

pub trait Read {
fn read<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>;
fn exists<K>(&self, key: K) -> Result<bool, Error>
where
K: AsRef<[u8]>;
fn bulk_read<K>(&self, keys: &[K]) -> Result<Vec<Option<Vec<u8>>>, Error>
where
K: AsRef<[u8]>;
}
90 changes: 90 additions & 0 deletions node/db/src/rocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use super::errors::Error;
use super::{Read, Write};
use rocksdb::{Options, WriteBatch, DB};
use std::path::Path;

#[derive(Debug)]
pub struct RocksDb {
db: DB,
}

impl RocksDb {
pub fn open(path: &Path) -> Result<Self, Error> {
let mut db_opts = Options::default();
db_opts.create_if_missing(true);
let db = DB::open(&db_opts, path)?;
Ok(Self { db })
}
}

impl Write for RocksDb {
fn write<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
Ok(self.db.put(key, value)?)
}

fn delete<K>(&self, key: K) -> Result<(), Error>
where
K: AsRef<[u8]>,
{
Ok(self.db.delete(key)?)
}

fn bulk_write<K, V>(&self, keys: &[K], values: &[V]) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let mut batch = WriteBatch::default();
for (k, v) in keys.iter().zip(values.iter()) {
batch.put(k, v)?;
}
Ok(self.db.write(batch)?)
}

fn bulk_delete<K>(&self, keys: &[K]) -> Result<(), Error>
where
K: AsRef<[u8]>,
{
for k in keys.iter() {
self.db.delete(k)?;
}
Ok(())
}
}

impl Read for RocksDb {
fn read<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
{
self.db.get(key).map_err(Error::from)
}

fn exists<K>(&self, key: K) -> Result<bool, Error>
where
K: AsRef<[u8]>,
{
self.db
.get_pinned(key)
.map(|v| v.is_some())
.map_err(Error::from)
}

fn bulk_read<K>(&self, keys: &[K]) -> Result<Vec<Option<Vec<u8>>>, Error>
where
K: AsRef<[u8]>,
{
let mut v = Vec::with_capacity(keys.len());
for k in keys.iter() {
match self.db.get(k) {
Ok(val) => v.push(val),
Err(e) => return Err(Error::from(e)),
}
}
Ok(v)
}
}
41 changes: 41 additions & 0 deletions node/db/tests/db_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Taken from
// https://github.com/rust-rocksdb/rust-rocksdb/blob/master/tests/util/mod.rs
use rocksdb::{Options, DB};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

GregTheGreek marked this conversation as resolved.
Show resolved Hide resolved
/// Ensures that DB::Destroy is called for this database when DBPath is dropped.
pub struct DBPath {
pub path: PathBuf,
}

impl DBPath {
/// Suffixes the given `prefix` with a timestamp to ensure that subsequent test runs don't reuse
/// an old database in case of panics prior to Drop being called.
pub fn new(prefix: &str) -> DBPath {
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let path = format!(
"{}.{}.{}",
prefix,
current_time.as_secs(),
current_time.subsec_nanos()
);

DBPath {
path: PathBuf::from(path),
}
}
}

impl Drop for DBPath {
fn drop(&mut self) {
let opts = Options::default();
DB::destroy(&opts, &self.path).unwrap();
}
}

impl AsRef<Path> for DBPath {
fn as_ref(&self) -> &Path {
&self.path
}
}
108 changes: 108 additions & 0 deletions node/db/tests/rocks_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
mod db_utils;

use db::{rocks::RocksDb, Read, Write};
use db_utils::DBPath;

#[test]
fn start() {
let path = DBPath::new("start_rocks_test");
RocksDb::open(path.as_ref()).unwrap();
}

#[test]
fn write() {
let path = DBPath::new("write_rocks_test");
let key = [1];
let value = [1];

let db: RocksDb = RocksDb::open(path.as_ref()).unwrap();
db.write(key, value).unwrap();
}

#[test]
fn read() {
let path = DBPath::new("read_rocks_test");
let key = [0];
let value = [1];
let db = RocksDb::open(path.as_ref()).unwrap();
db.write(key.clone(), value.clone()).unwrap();
let res = db.read(key).unwrap().unwrap();
assert_eq!(value.to_vec(), res);
}

#[test]
fn exists() {
let path = DBPath::new("exists_rocks_test");
let key = [0];
let value = [1];
let db = RocksDb::open(path.as_ref()).unwrap();
db.write(key.clone(), value.clone()).unwrap();
let res = db.exists(key).unwrap();
assert_eq!(res, true);
}

#[test]
fn does_not_exist() {
let path = DBPath::new("does_not_exists_rocks_test");
let key = [0];
let db = RocksDb::open(path.as_ref()).unwrap();
let res = db.exists(key).unwrap();
assert_eq!(res, false);
}

#[test]
fn delete() {
let path = DBPath::new("delete_rocks_test");
let key = [0];
let value = [1];
let db = RocksDb::open(path.as_ref()).unwrap();
db.write(key.clone(), value.clone()).unwrap();
let res = db.exists(key.clone()).unwrap();
assert_eq!(res, true);
db.delete(key.clone()).unwrap();
let res = db.exists(key.clone()).unwrap();
assert_eq!(res, false);
}

#[test]
fn bulk_write() {
let path = DBPath::new("bulk_write_rocks_test");
let keys = [[0], [1], [2]];
let values = [[0], [1], [2]];
let db = RocksDb::open(path.as_ref()).unwrap();
db.bulk_write(&keys, &values).unwrap();
for k in keys.iter() {
let res = db.exists(k.clone()).unwrap();
assert_eq!(res, true);
}
}

#[test]
fn bulk_read() {
let path = DBPath::new("bulk_read_rocks_test");
let keys = [[0], [1], [2]];
let values = [[0], [1], [2]];
let db = RocksDb::open(path.as_ref()).unwrap();
db.bulk_write(&keys, &values).unwrap();
let results = db.bulk_read(&keys).unwrap();
for (result, value) in results.iter().zip(values.iter()) {
match result {
Some(v) => assert_eq!(v, value),
None => panic!("No values found!"),
}
}
}

#[test]
fn bulk_delete() {
let path = DBPath::new("bulk_delete_rocks_test");
let keys = [[0], [1], [2]];
let values = [[0], [1], [2]];
let db = RocksDb::open(path.as_ref()).unwrap();
db.bulk_write(&keys, &values).unwrap();
db.bulk_delete(&keys).unwrap();
for k in keys.iter() {
let res = db.exists(k.clone()).unwrap();
assert_eq!(res, false);
}
}