Skip to content

Commit

Permalink
Add backup module of graph store (#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
goldenleaves committed Oct 25, 2021
1 parent 8004e22 commit d970e88
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 12 deletions.
8 changes: 7 additions & 1 deletion interactive_engine/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ rust-ini = "0.13"
libc = "0.2"
log4rs = "0.8.0"
grpcio = "=0.4.1"
rocksdb = "0.17.0"
#rocksdb = "0.17.0"

maxgraph-common = { path = "../../rust-common" }

# For temporal use to support restoring from a specified backup.
# Change to a stable version of rust-rocksdb later.
[dependencies.rocksdb]
git = "https://github.com/GoldenLeaves/rust-rocksdb.git"
rev = "082dfcfbb4b51e61d1c01672951b84cb464601e2"

[build-dependencies]
protoc-grpcio = "0.3.0"
49 changes: 48 additions & 1 deletion interactive_engine/executor/store/src/db/api/graph.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::marker::PhantomData;
use super::schema::*;
use super::{VertexId, SnapshotId, LabelId, GraphResult, EdgeId,
use super::{VertexId, SnapshotId, LabelId, BackupId, GraphResult, EdgeId,
Vertex, Edge, EdgeKind, PropId, PropertyMap, Condition,
PropertiesRef, ValueRef};
use crate::db::api::DataLoadTarget;
Expand Down Expand Up @@ -163,6 +163,53 @@ pub trait GraphStorage {
fn prepare_data_load(&self, si: SnapshotId, schema_version: i64, target: &DataLoadTarget, table_id: i64) -> GraphResult<bool>;

fn commit_data_load(&self, si: SnapshotId, schema_version: i64, target: &DataLoadTarget, table_id: i64) -> GraphResult<bool>;

/// Open a backup engine of graph storage that implements GraphBackup trait.
fn open_backup_engine(&self, backup_path: &str) -> GraphResult<Box<dyn GraphBackup>>;
}

pub trait GraphBackup {
/// Create a new backup of graph store. This interface is thread safe.
///
/// Returns the new created backup id if successful, `GraphError` otherwise.
fn create_new_backup(&mut self) -> GraphResult<BackupId>;

/// Delete a backup of `backup_id`. This interface is thread safe.
///
/// If `backup_id` is not available, something error when deleting or other errors,
/// `GraphError` will be returned.
fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()>;

/// Purge old backups from all existed backups, keep latest `num_backups_to_keep` backups.
/// This interface is thread safe.
///
/// If `num_backups_to_keep` is illegal, something error when purging or other errors,
/// `GraphError` will be returned.
fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()>;

/// Restore the graph store from `backup_id` at `restore_path`. This interface is thread safe.
///
/// If `restore_path` is not available,`backup_id` is not available, something error when
/// restoring or other errors, `GraphError` will be returned.
fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()>;

/// Restore the graph store from the latest backup at `restore_path`.
/// This interface is thread safe.
///
/// If `restore_path` is not available,no backup available, something error when restoring or
/// other errors, `GraphError` will be returned.
fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()>;

/// Verify the backup of `backup_id`. This interface is thread safe.
///
/// If `backup_id` is not available, backup files are broken, backup checksum mismatch or
/// other errors, `GraphError` will be returned.
fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()>;

/// Get the current available backup id list. This interface is thread safe.
///
/// Returns the available backup id vector(may be empty)。
fn get_backup_list(&self) -> Vec<BackupId>;
}

pub trait VertexResultIter {
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/store/src/db/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub type SnapshotId = i64;
pub type VertexId = i64;
pub type LabelId = i32;
pub type PropId = i32;
pub type BackupId = i32;
pub type GraphResult<T> = Result<T, GraphError>;

pub const MAX_SI: SnapshotId = SnapshotId::max_value() - 1;
Expand Down
56 changes: 54 additions & 2 deletions interactive_engine/executor/store/src/db/graph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicIsize, Ordering};
use crate::db::api::*;
use crate::db::storage::ExternalStorage;
use crate::db::storage::rocksdb::RocksDB;
use crate::db::storage::{ExternalStorage, ExternalStorageBackup};
use crate::db::storage::rocksdb::{RocksDB};
use crate::db::util::lock::GraphMutexLock;
use super::vertex::*;
use super::edge::*;
Expand All @@ -28,6 +28,10 @@ pub struct GraphStore {
lock: GraphMutexLock<()>,
}

pub struct GraphBackupEngine {
engine: Box<dyn ExternalStorageBackup>,
}

impl GraphStorage for GraphStore {
type V = VertexImpl;
type E = EdgeImpl;
Expand Down Expand Up @@ -328,6 +332,44 @@ impl GraphStorage for GraphStore {
}
Ok(true)
}

fn open_backup_engine(&self, backup_path: &str) -> GraphResult<Box<dyn GraphBackup>> {
let engine = res_unwrap!(self.storage.open_backup_engine(backup_path), open_backup_engine, backup_path)?;
let ret = GraphBackupEngine {
engine
};
Ok(Box::from(ret))
}
}

impl GraphBackup for GraphBackupEngine {
fn create_new_backup(&mut self) -> GraphResult<BackupId> {
self.engine.create_new_backup()
}

fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()> {
self.engine.delete_backup(backup_id)
}

fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()> {
self.engine.purge_old_backups(num_backups_to_keep)
}

fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()> {
self.engine.restore_from_backup(restore_path, backup_id)
}

fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()> {
self.engine.restore_from_latest_backup(restore_path)
}

fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()> {
self.engine.verify_backup(backup_id)
}

fn get_backup_list(&self) -> Vec<BackupId> {
self.engine.get_backup_list()
}
}

impl GraphStore {
Expand Down Expand Up @@ -634,6 +676,16 @@ mod tests {
do_test(path, |graph| tests::graph::test_si_guard(graph));
}

#[test]
fn test_backup_engine() {
let test_dir = "test_backup_engine";
fs::rmr(&test_dir).unwrap();
let store_path = format!("store_test/{}/store", test_dir);
let graph = create_empty_graph(&store_path);
tests::backup::test_backup_engine(graph, test_dir);
fs::rmr(&test_dir).unwrap();
}

fn do_test<F: Fn(GraphStore)>(path: &str, func: F) {
let path = format!("store_test/{}", path);
fs::rmr(&path).unwrap();
Expand Down
59 changes: 59 additions & 0 deletions interactive_engine/executor/store/src/db/graph/tests/backup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::db::api::*;
use crate::db::graph::store::GraphStore;
use super::types;
use std::collections::HashMap;

pub fn test_backup_engine<G: GraphStorage>(graph: G, test_dir: &str) {
let backup_path = format!("store_test/{}/backup", test_dir);
let mut backup_engine = graph.open_backup_engine(&backup_path).unwrap();

// insert data
let mut schema_version = 1;
graph.create_vertex_type(10, schema_version, 1, &types::create_test_type_def(1)).unwrap();
let properties: HashMap<PropId, Value> = HashMap::new();
graph.insert_overwrite_vertex(11, 1, 1, &properties).unwrap();
graph.insert_overwrite_vertex(12, 2, 1, &properties).unwrap();

// create the first backup
let backup_1_id = backup_engine.create_new_backup().unwrap();
// delete vertex id '2' and create the second backup
graph.delete_vertex(13, 2, 1).unwrap();
let backup_2_id = backup_engine.create_new_backup().unwrap();
assert!(backup_2_id > backup_1_id);

// verify backups
let backup_list = backup_engine.get_backup_list();
assert_eq!(backup_list.len(), 2);
backup_list.iter().for_each(|i| {
assert!(backup_engine.verify_backup(*i).is_ok());
});

// restore the first backup
let restore_path_1 = format!("store_test/{}/restore_1", test_dir);
backup_engine.restore_from_backup(&restore_path_1, backup_1_id).unwrap();
// test backup
let restore_store_1 = open_graph(&restore_path_1);
assert_eq!(restore_store_1.get_vertex(15, 1, Some(1)).unwrap().unwrap().get_id(), 1);
assert_eq!(restore_store_1.get_vertex(16, 2, Some(1)).unwrap().unwrap().get_id(), 2);

// restore the latest(second) backup
let restore_path_2 = format!("store_test/{}/restore_2", test_dir);
backup_engine.restore_from_latest_backup(&restore_path_2).unwrap();
// test backup
let restore_store_2 = open_graph(&restore_path_2);
assert_eq!(restore_store_2.get_vertex(17, 1, Some(1)).unwrap().unwrap().get_id(), 1);
assert!(restore_store_2.get_vertex(18, 2, Some(1)).unwrap().is_none());

// purge one old backup
backup_engine.purge_old_backups(1).unwrap();
let backup_list = backup_engine.get_backup_list();
assert_eq!(backup_list.len(), 1);
assert_eq!(backup_list.get(0).unwrap(), backup_2_id);
}

fn open_graph(path: &str) -> GraphStore {
let mut builder = GraphConfigBuilder::new();
builder.set_storage_engine("rocksdb");
let config = builder.build();
GraphStore::open(&config, path).unwrap()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod data;
pub mod vertex;
pub mod edge;
pub mod graph;
pub mod backup;

13 changes: 12 additions & 1 deletion interactive_engine/executor/store/src/db/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db::api::GraphResult;
use crate::db::api::{GraphResult, BackupId};

pub mod rocksdb;
use self::rocksdb::RocksDBIter;
Expand All @@ -13,6 +13,17 @@ pub trait ExternalStorage {
fn scan_range(&self, start: &[u8], end: &[u8]) -> GraphResult<StorageIter>;
fn delete_range(&self, start: &[u8], end: &[u8]) -> GraphResult<()>;
fn load(&self, files: &[&str]) -> GraphResult<()>;
fn open_backup_engine(&self, backup_path: &str) -> GraphResult<Box<dyn ExternalStorageBackup>>;
}

pub trait ExternalStorageBackup {
fn create_new_backup(&mut self) -> GraphResult<BackupId>;
fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()>;
fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()>;
fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()>;
fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()>;
fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()>;
fn get_backup_list(&self) -> Vec<BackupId>;
}

pub enum StorageRes {
Expand Down
97 changes: 90 additions & 7 deletions interactive_engine/executor/store/src/db/storage/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use ::rocksdb::{DB, Options, ReadOptions, DBRawIterator, IngestExternalFileOptions};
use ::rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

use crate::db::api::*;
use super::{StorageIter, StorageRes, ExternalStorage};
use super::{StorageIter, StorageRes, ExternalStorage, ExternalStorageBackup};

pub struct RocksDB {
db: DB,
db: Arc<DB>,
}

pub struct RocksDBBackupEngine {
db: Arc<DB>,
backup_engine: BackupEngine,
}

impl RocksDB {
Expand All @@ -17,7 +23,7 @@ impl RocksDB {
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg, open, options, path)
})?;
let ret = RocksDB {
db,
db: Arc::new(db),
};
Ok(ret)
}
Expand Down Expand Up @@ -88,12 +94,89 @@ impl ExternalStorage for RocksDB {
fn load(&self, files: &[&str]) -> GraphResult<()> {
let mut options = IngestExternalFileOptions::default();
options.set_move_files(true);
let paths : Vec<&Path> = files.to_vec().into_iter().map(|f| Path::new(f)).collect();
self.db.ingest_external_file_opts(&options, paths).map_err(|e| {
self.db.ingest_external_file_opts(&options, files.to_vec()).map_err(|e| {
let msg = format!("rocksdb.ingest_sst failed because {}", e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})
}

fn open_backup_engine(&self, backup_path: &str) -> GraphResult<Box<dyn ExternalStorageBackup>> {
let backup_opts = BackupEngineOptions::default();
let backup_engine = BackupEngine::open(&backup_opts, backup_path).map_err(|e| {
let msg = format!("open rocksdb backup engine at {} failed, because {}", backup_path.to_string(), e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
let ret = RocksDBBackupEngine {
db: self.db.clone(),
backup_engine,
};
Ok(Box::from(ret))
}
}

impl ExternalStorageBackup for RocksDBBackupEngine {
/// Optimize this method after a new rust-rocksdb version.
fn create_new_backup(&mut self) -> GraphResult<BackupId> {
let before = self.get_backup_list();
self.backup_engine.create_new_backup(&self.db).map_err(|e| {
let msg = format!("create new rocksdb backup failed, because {}", e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
let after = self.get_backup_list();
if after.len() != before.len() + 1 {
let msg = "get new created rocksdb backup id failed".to_string();
return Err(gen_graph_err!(GraphErrorCode::ExternalStorageError, msg));
}
let new_backup_id = *after.iter().max().unwrap();
Ok(new_backup_id)
}

/// Do nothing now.
/// Implement this method after a new rust-rocksdb version.
#[allow(unused_variables)]
fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()> {
Ok(())
}

fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()> {
self.backup_engine.purge_old_backups(num_backups_to_keep).map_err(|e| {
let msg = format!("purge old rocksdb backups failed, because {}", e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
Ok(())
}

fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()> {
let mut restore_option = RestoreOptions::default();
restore_option.set_keep_log_files(false);
self.backup_engine.restore_from_backup(restore_path, restore_path, &restore_option, backup_id as u32).map_err(|e| {
let msg = format!("restore from rocksdb backup {} failed, because {}", backup_id, e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
Ok(())
}

fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()> {
let mut restore_option = RestoreOptions::default();
restore_option.set_keep_log_files(false);
self.backup_engine.restore_from_latest_backup(restore_path, restore_path, &restore_option).map_err(|e| {
let msg = format!("restore from latest rocksdb backup failed, because {}", e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
Ok(())
}

fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()> {
self.backup_engine.verify_backup(backup_id as u32).map_err(|e| {
let msg = format!("rocksdb backup {} verify failed, because {}", backup_id, e.into_string());
gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)
})?;
Ok(())
}

fn get_backup_list(&self) -> Vec<BackupId> {
self.backup_engine.get_backup_info().into_iter().map(|info| info.backup_id as BackupId).collect()
}
}

#[allow(unused_variables)]
Expand Down Expand Up @@ -143,7 +226,7 @@ impl<'a> RocksDBIter<'a> {

fn bytes_upper_bound(bytes: &[u8]) -> Option<Vec<u8>> {
for i in (0..bytes.len()).rev() {
if bytes[i] != u8::max_value() {
if bytes[i] != u8::MAX {
let mut ret = bytes.to_vec();
ret[i] += 1;
for j in i+1..bytes.len() {
Expand Down

0 comments on commit d970e88

Please sign in to comment.