Skip to content

Commit

Permalink
Merge pull request #20 from kaikai1024/rewrite-restore
Browse files Browse the repository at this point in the history
Rewrite the restore and add unit test.
  • Loading branch information
kaikai1024 committed Aug 26, 2019
2 parents 7a6f070 + 22f5689 commit 87b515b
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ Cargo.lock
**/*.rs.bk

# dev test data
rocksdb
rocksdb_test
133 changes: 95 additions & 38 deletions src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use rocksdb::{
BlockBasedOptions, ColumnFamily, DBCompactionStyle, DBIterator, IteratorMode, Options,
ReadOptions, WriteBatch, WriteOptions, DB,
};
use std::fs::{remove_dir_all, rename};
use std::io::ErrorKind;
use std::fs::{metadata, remove_dir_all, rename};

// The backup db path.
const BACKUP_PATH: &str = "backup_old_db";

// For the future: Add more info about db.
#[derive(Debug)]
Expand Down Expand Up @@ -93,41 +95,37 @@ impl RocksDB {
}

/// Restore the database from a copy at given path.
pub fn restore(&mut self, new_db: &str) -> Result<()> {
pub fn restore(&mut self, new_db_path: &str) -> Result<()> {
// Close it first
// https://github.com/facebook/rocksdb/wiki/Basic-Operations#closing-a-database
self.close();

let backup_db = Path::new("backup_db");
// Backup if the backup_path does not exist.
let backup = !path_exists(&BACKUP_PATH);

let existed = match rename(&self.path, &backup_db) {
Ok(_) => true,
Err(e) => {
if let ErrorKind::NotFound = e.kind() {
false
} else {
return Err(DatabaseError::Internal(e.to_string()));
}
}
};
// Backup the old db
if backup {
rename(&self.path, &BACKUP_PATH)?;
}

match rename(&new_db, &self.path) {
// Restore the new db.
match rename(&new_db_path, &self.path) {
Ok(_) => {
// Clean up the backup.
if existed {
remove_dir_all(&backup_db)?;
// Clean up the backup db.
if backup {
remove_dir_all(&BACKUP_PATH)?;
}
}
Err(e) => {
// Restore the backup.
if existed {
rename(&backup_db, &self.path)?;
// Restore the backup db.
if backup {
rename(&BACKUP_PATH, &self.path)?;
}
return Err(DatabaseError::Internal(e.to_string()));
}
}

// Reopen the database
// Reopen the database.
let new_db = Self::open(&self.path, &self.config).unwrap().db_info;
*Arc::get_mut(&mut self.db_info).unwrap() = Arc::try_unwrap(new_db).unwrap();
Ok(())
Expand Down Expand Up @@ -167,6 +165,13 @@ impl RocksDB {
}
}
}

#[cfg(test)]
fn clean_db(&self) {
if path_exists(&self.path) {
remove_dir_all(&self.path).unwrap();
}
}
}

impl Database for RocksDB {
Expand Down Expand Up @@ -315,60 +320,71 @@ impl Database for RocksDB {
}
}

// Get the column from the data category.
fn get_column(db: &DB, category: DataCategory) -> Result<ColumnFamily> {
db.cf_handle(map_columns(category))
.ok_or(DatabaseError::NotFound)
}

// Check the path exists.
fn path_exists(path: &str) -> bool {
metadata(Path::new(path)).is_ok()
}

#[cfg(test)]
mod tests {
use super::{Config, RocksDB};
use crate::database::{DataCategory, Database};
use crate::error::DatabaseError;
use crate::rocksdb::{path_exists, BACKUP_PATH};
use crate::test::{batch_op, insert_get_contains_remove};
use std::fs::{create_dir, remove_dir_all};

#[test]
fn test_insert_get_contains_remove_with_category() {
let cfg = Config::with_category_num(Some(1));
let db = RocksDB::open(
"rocksdb/test_get_insert_contains_remove_with_category",
"rocksdb_test/get_insert_contains_remove_with_category",
&cfg,
)
.unwrap();

insert_get_contains_remove(&db, Some(DataCategory::State));

db.clean_cf();
db.clean_db();
}

#[test]
fn test_insert_get_contains_remove() {
let db = RocksDB::open_default("rocksdb/test_get_insert_contains_remove").unwrap();
let db = RocksDB::open_default("rocksdb_test/get_insert_contains_remove").unwrap();

insert_get_contains_remove(&db, None);
db.clean_db();
}

#[test]
fn test_batch_op_with_category() {
let cfg = Config::with_category_num(Some(1));
let db = RocksDB::open("rocksdb/test_batch_op_with_category", &cfg).unwrap();
let db = RocksDB::open("rocksdb_test/batch_op_with_category", &cfg).unwrap();

batch_op(&db, Some(DataCategory::State));

db.clean_cf();
db.clean_db();
}

#[test]
fn test_batch_op() {
let db = RocksDB::open_default("rocksdb/test_batch_op").unwrap();
let db = RocksDB::open_default("rocksdb_test/batch_op").unwrap();

batch_op(&db, None);
db.clean_db();
}

#[test]
fn test_insert_batch_error_with_category() {
let cfg = Config::with_category_num(Some(1));
let db = RocksDB::open("rocksdb/test_insert_batch_error_with_category", &cfg).unwrap();
let db = RocksDB::open("rocksdb_test/insert_batch_error_with_category", &cfg).unwrap();

let data = b"test".to_vec();

Expand All @@ -378,24 +394,26 @@ mod tests {
}

db.clean_cf();
db.clean_db();
}

#[test]
fn test_insert_batch_error() {
let db = RocksDB::open_default("rocksdb/test_insert_batch_error").unwrap();
let db = RocksDB::open_default("rocksdb_test/insert_batch_error").unwrap();

let data = b"test".to_vec();

match db.insert_batch(None, vec![data], vec![]) {
Err(DatabaseError::InvalidData) => (), // pass
_ => panic!("should return error DatabaseError::InvalidData"),
}
db.clean_db();
}

#[test]
fn test_iterator_with_category() {
let cfg = Config::with_category_num(Some(1));
let db = RocksDB::open("rocksdb/test_iterator_with_category", &cfg).unwrap();
let db = RocksDB::open("rocksdb_test/iterator_with_category", &cfg).unwrap();

let data1 = b"test1".to_vec();
let data2 = b"test2".to_vec();
Expand All @@ -412,19 +430,20 @@ mod tests {
.into_iter()
.flat_map(|inner| inner)
.collect();
println!("contents: {:?}", contents);

assert_eq!(contents.len(), 2);
assert_eq!(&*contents[0].0, &*data1);
assert_eq!(&*contents[0].1, &*data1);
assert_eq!(&*contents[1].0, &*data2);
assert_eq!(&*contents[1].1, &*data2);

db.clean_cf();
db.clean_db();
}

#[test]
fn test_iterator() {
let db = RocksDB::open_default("rocksdb/test_iterator").unwrap();
let db = RocksDB::open_default("rocksdb_test/iterator").unwrap();

let data1 = b"test1".to_vec();
let data2 = b"test2".to_vec();
Expand All @@ -446,18 +465,19 @@ mod tests {
assert_eq!(&*contents[0].1, &*data1);
assert_eq!(&*contents[1].0, &*data2);
assert_eq!(&*contents[1].1, &*data2);
db.clean_db();
}

#[test]
fn test_close_with_category() {
let cfg = Config::with_category_num(Some(1));
let mut db = RocksDB::open("rocksdb/test_close_with_category", &cfg).unwrap();
let mut db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
let data = b"test".to_vec();
db.insert(Some(DataCategory::State), data.clone(), data.clone())
.unwrap();
assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
// Can not open it again
match RocksDB::open_default("rocksdb/test_close_with_category") {
match RocksDB::open_default("rocksdb_test/close_with_category") {
// "IO error: lock : rocksdb/test_close/LOCK: No locks available"
Err(DatabaseError::Internal(_)) => (), // pass
_ => panic!("should return error DatabaseError::Intrnal"),
Expand All @@ -468,18 +488,19 @@ mod tests {

// Can open it again and query
let cfg = Config::with_category_num(Some(1));
let db = RocksDB::open("rocksdb/test_close_with_category", &cfg).unwrap();
let db = RocksDB::open("rocksdb_test/close_with_category", &cfg).unwrap();
assert_eq!(db.contains(Some(DataCategory::State), &data), Ok(true));
db.clean_db();
}

#[test]
fn test_close() {
let mut db = RocksDB::open_default("rocksdb/test_close").unwrap();
let mut db = RocksDB::open_default("rocksdb_test/close").unwrap();
let data = b"test".to_vec();
db.insert(None, data.clone(), data.clone()).unwrap();
assert_eq!(db.contains(None, &data), Ok(true));
// Can not open it again
match RocksDB::open_default("rocksdb/test_close") {
match RocksDB::open_default("rocksdb_test/close") {
// "IO error: lock : rocksdb/test_close/LOCK: No locks available"
Err(DatabaseError::Internal(_)) => (), // pass
_ => panic!("should return error DatabaseError::Intrnal"),
Expand All @@ -489,8 +510,44 @@ mod tests {
assert_eq!(db.contains(None, &data), Ok(false));

// Can open it again and query
let db = RocksDB::open_default("rocksdb/test_close").unwrap();
let db = RocksDB::open_default("rocksdb_test/close").unwrap();
assert_eq!(db.contains(None, &data), Ok(true));
db.clean_db();
}

#[test]
fn test_restore() {
// No backup
if path_exists(BACKUP_PATH) {
remove_dir_all(BACKUP_PATH).unwrap();
}
let mut db = RocksDB::open_default("rocksdb_test/restore_backup").unwrap();
let new_path_with_backup = "rocksdb_test/restore_new_db_with_backup";
let new_db = RocksDB::open_default(new_path_with_backup).unwrap();
let data = b"test_no_backup".to_vec();
new_db.insert(None, data.clone(), data.clone()).unwrap();
assert_eq!(db.contains(None, &data), Ok(false));
assert_eq!(db.restore(new_path_with_backup), Ok(()));
assert_eq!(db.contains(None, &data), Ok(true));
// Clean the data
db.clean_db();
new_db.clean_db();

// Backup
let new_path_no_backup = "rocksdb_test/restore_new_db_no_backup";
let mut db = RocksDB::open_default("rocksdb_test/restore_no_backup").unwrap();
if !path_exists(BACKUP_PATH) {
create_dir(BACKUP_PATH).unwrap();
}

match db.restore(new_path_no_backup) {
//`Err(Internal("Directory not empty (os error 39)"))`
Err(DatabaseError::Internal(_)) => (), // pass
_ => panic!("should return error DatabaseError::Intrnal"),
}

// Clean the data
remove_dir_all(BACKUP_PATH).unwrap();
db.clean_db();
}
}

0 comments on commit 87b515b

Please sign in to comment.