From d970e885d2551bae6a478c26e7f788ea25c092ae Mon Sep 17 00:00:00 2001 From: Zichao Zhang Date: Mon, 25 Oct 2021 10:07:51 +0800 Subject: [PATCH] Add backup module of graph store (#901) --- interactive_engine/executor/store/Cargo.toml | 8 +- .../executor/store/src/db/api/graph.rs | 49 +++++++++- .../executor/store/src/db/api/mod.rs | 1 + .../executor/store/src/db/graph/store.rs | 56 ++++++++++- .../store/src/db/graph/tests/backup.rs | 59 +++++++++++ .../executor/store/src/db/graph/tests/mod.rs | 1 + .../executor/store/src/db/storage/mod.rs | 13 ++- .../executor/store/src/db/storage/rocksdb.rs | 97 +++++++++++++++++-- 8 files changed, 272 insertions(+), 12 deletions(-) create mode 100644 interactive_engine/executor/store/src/db/graph/tests/backup.rs diff --git a/interactive_engine/executor/store/Cargo.toml b/interactive_engine/executor/store/Cargo.toml index da8516726bf9..f4d840fd0e7a 100644 --- a/interactive_engine/executor/store/Cargo.toml +++ b/interactive_engine/executor/store/Cargo.toml @@ -21,9 +21,15 @@ rust-ini = "0.13" libc = "0.2" log4rs = "0.8.0" grpcio = "=0.4.1" -rocksdb = "0.17.0" +#rocksdb = "0.17.0" maxgraph-common = { path = "../../rust-common" } +# For temporal use to support restoring from a specified backup. +# Change to a stable version of rust-rocksdb later. +[dependencies.rocksdb] +git = "https://github.com/GoldenLeaves/rust-rocksdb.git" +rev = "082dfcfbb4b51e61d1c01672951b84cb464601e2" + [build-dependencies] protoc-grpcio = "0.3.0" diff --git a/interactive_engine/executor/store/src/db/api/graph.rs b/interactive_engine/executor/store/src/db/api/graph.rs index 1b5f0bf7588a..5d97f9a08ed1 100644 --- a/interactive_engine/executor/store/src/db/api/graph.rs +++ b/interactive_engine/executor/store/src/db/api/graph.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::marker::PhantomData; use super::schema::*; -use super::{VertexId, SnapshotId, LabelId, GraphResult, EdgeId, +use super::{VertexId, SnapshotId, LabelId, BackupId, GraphResult, EdgeId, Vertex, Edge, EdgeKind, PropId, PropertyMap, Condition, PropertiesRef, ValueRef}; use crate::db::api::DataLoadTarget; @@ -163,6 +163,53 @@ pub trait GraphStorage { fn prepare_data_load(&self, si: SnapshotId, schema_version: i64, target: &DataLoadTarget, table_id: i64) -> GraphResult; fn commit_data_load(&self, si: SnapshotId, schema_version: i64, target: &DataLoadTarget, table_id: i64) -> GraphResult; + + /// Open a backup engine of graph storage that implements GraphBackup trait. + fn open_backup_engine(&self, backup_path: &str) -> GraphResult>; +} + +pub trait GraphBackup { + /// Create a new backup of graph store. This interface is thread safe. + /// + /// Returns the new created backup id if successful, `GraphError` otherwise. + fn create_new_backup(&mut self) -> GraphResult; + + /// Delete a backup of `backup_id`. This interface is thread safe. + /// + /// If `backup_id` is not available, something error when deleting or other errors, + /// `GraphError` will be returned. + fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()>; + + /// Purge old backups from all existed backups, keep latest `num_backups_to_keep` backups. + /// This interface is thread safe. + /// + /// If `num_backups_to_keep` is illegal, something error when purging or other errors, + /// `GraphError` will be returned. + fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()>; + + /// Restore the graph store from `backup_id` at `restore_path`. This interface is thread safe. + /// + /// If `restore_path` is not available,`backup_id` is not available, something error when + /// restoring or other errors, `GraphError` will be returned. + fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()>; + + /// Restore the graph store from the latest backup at `restore_path`. + /// This interface is thread safe. + /// + /// If `restore_path` is not available,no backup available, something error when restoring or + /// other errors, `GraphError` will be returned. + fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()>; + + /// Verify the backup of `backup_id`. This interface is thread safe. + /// + /// If `backup_id` is not available, backup files are broken, backup checksum mismatch or + /// other errors, `GraphError` will be returned. + fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()>; + + /// Get the current available backup id list. This interface is thread safe. + /// + /// Returns the available backup id vector(may be empty)。 + fn get_backup_list(&self) -> Vec; } pub trait VertexResultIter { diff --git a/interactive_engine/executor/store/src/db/api/mod.rs b/interactive_engine/executor/store/src/db/api/mod.rs index 8b73616a571a..aa4b4138f90f 100644 --- a/interactive_engine/executor/store/src/db/api/mod.rs +++ b/interactive_engine/executor/store/src/db/api/mod.rs @@ -23,6 +23,7 @@ pub type SnapshotId = i64; pub type VertexId = i64; pub type LabelId = i32; pub type PropId = i32; +pub type BackupId = i32; pub type GraphResult = Result; pub const MAX_SI: SnapshotId = SnapshotId::max_value() - 1; diff --git a/interactive_engine/executor/store/src/db/graph/store.rs b/interactive_engine/executor/store/src/db/graph/store.rs index 95694704ffdc..13f7a89770d5 100644 --- a/interactive_engine/executor/store/src/db/graph/store.rs +++ b/interactive_engine/executor/store/src/db/graph/store.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicIsize, Ordering}; use crate::db::api::*; -use crate::db::storage::ExternalStorage; -use crate::db::storage::rocksdb::RocksDB; +use crate::db::storage::{ExternalStorage, ExternalStorageBackup}; +use crate::db::storage::rocksdb::{RocksDB}; use crate::db::util::lock::GraphMutexLock; use super::vertex::*; use super::edge::*; @@ -28,6 +28,10 @@ pub struct GraphStore { lock: GraphMutexLock<()>, } +pub struct GraphBackupEngine { + engine: Box, +} + impl GraphStorage for GraphStore { type V = VertexImpl; type E = EdgeImpl; @@ -328,6 +332,44 @@ impl GraphStorage for GraphStore { } Ok(true) } + + fn open_backup_engine(&self, backup_path: &str) -> GraphResult> { + let engine = res_unwrap!(self.storage.open_backup_engine(backup_path), open_backup_engine, backup_path)?; + let ret = GraphBackupEngine { + engine + }; + Ok(Box::from(ret)) + } +} + +impl GraphBackup for GraphBackupEngine { + fn create_new_backup(&mut self) -> GraphResult { + self.engine.create_new_backup() + } + + fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()> { + self.engine.delete_backup(backup_id) + } + + fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()> { + self.engine.purge_old_backups(num_backups_to_keep) + } + + fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()> { + self.engine.restore_from_backup(restore_path, backup_id) + } + + fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()> { + self.engine.restore_from_latest_backup(restore_path) + } + + fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()> { + self.engine.verify_backup(backup_id) + } + + fn get_backup_list(&self) -> Vec { + self.engine.get_backup_list() + } } impl GraphStore { @@ -634,6 +676,16 @@ mod tests { do_test(path, |graph| tests::graph::test_si_guard(graph)); } + #[test] + fn test_backup_engine() { + let test_dir = "test_backup_engine"; + fs::rmr(&test_dir).unwrap(); + let store_path = format!("store_test/{}/store", test_dir); + let graph = create_empty_graph(&store_path); + tests::backup::test_backup_engine(graph, test_dir); + fs::rmr(&test_dir).unwrap(); + } + fn do_test(path: &str, func: F) { let path = format!("store_test/{}", path); fs::rmr(&path).unwrap(); diff --git a/interactive_engine/executor/store/src/db/graph/tests/backup.rs b/interactive_engine/executor/store/src/db/graph/tests/backup.rs new file mode 100644 index 000000000000..ba79f48b574b --- /dev/null +++ b/interactive_engine/executor/store/src/db/graph/tests/backup.rs @@ -0,0 +1,59 @@ +use crate::db::api::*; +use crate::db::graph::store::GraphStore; +use super::types; +use std::collections::HashMap; + +pub fn test_backup_engine(graph: G, test_dir: &str) { + let backup_path = format!("store_test/{}/backup", test_dir); + let mut backup_engine = graph.open_backup_engine(&backup_path).unwrap(); + + // insert data + let mut schema_version = 1; + graph.create_vertex_type(10, schema_version, 1, &types::create_test_type_def(1)).unwrap(); + let properties: HashMap = HashMap::new(); + graph.insert_overwrite_vertex(11, 1, 1, &properties).unwrap(); + graph.insert_overwrite_vertex(12, 2, 1, &properties).unwrap(); + + // create the first backup + let backup_1_id = backup_engine.create_new_backup().unwrap(); + // delete vertex id '2' and create the second backup + graph.delete_vertex(13, 2, 1).unwrap(); + let backup_2_id = backup_engine.create_new_backup().unwrap(); + assert!(backup_2_id > backup_1_id); + + // verify backups + let backup_list = backup_engine.get_backup_list(); + assert_eq!(backup_list.len(), 2); + backup_list.iter().for_each(|i| { + assert!(backup_engine.verify_backup(*i).is_ok()); + }); + + // restore the first backup + let restore_path_1 = format!("store_test/{}/restore_1", test_dir); + backup_engine.restore_from_backup(&restore_path_1, backup_1_id).unwrap(); + // test backup + let restore_store_1 = open_graph(&restore_path_1); + assert_eq!(restore_store_1.get_vertex(15, 1, Some(1)).unwrap().unwrap().get_id(), 1); + assert_eq!(restore_store_1.get_vertex(16, 2, Some(1)).unwrap().unwrap().get_id(), 2); + + // restore the latest(second) backup + let restore_path_2 = format!("store_test/{}/restore_2", test_dir); + backup_engine.restore_from_latest_backup(&restore_path_2).unwrap(); + // test backup + let restore_store_2 = open_graph(&restore_path_2); + assert_eq!(restore_store_2.get_vertex(17, 1, Some(1)).unwrap().unwrap().get_id(), 1); + assert!(restore_store_2.get_vertex(18, 2, Some(1)).unwrap().is_none()); + + // purge one old backup + backup_engine.purge_old_backups(1).unwrap(); + let backup_list = backup_engine.get_backup_list(); + assert_eq!(backup_list.len(), 1); + assert_eq!(backup_list.get(0).unwrap(), backup_2_id); +} + +fn open_graph(path: &str) -> GraphStore { + let mut builder = GraphConfigBuilder::new(); + builder.set_storage_engine("rocksdb"); + let config = builder.build(); + GraphStore::open(&config, path).unwrap() +} \ No newline at end of file diff --git a/interactive_engine/executor/store/src/db/graph/tests/mod.rs b/interactive_engine/executor/store/src/db/graph/tests/mod.rs index d7ba5275c6a1..98a4354170c1 100644 --- a/interactive_engine/executor/store/src/db/graph/tests/mod.rs +++ b/interactive_engine/executor/store/src/db/graph/tests/mod.rs @@ -4,4 +4,5 @@ pub mod data; pub mod vertex; pub mod edge; pub mod graph; +pub mod backup; diff --git a/interactive_engine/executor/store/src/db/storage/mod.rs b/interactive_engine/executor/store/src/db/storage/mod.rs index 03fae039f9cf..0eb421d33404 100644 --- a/interactive_engine/executor/store/src/db/storage/mod.rs +++ b/interactive_engine/executor/store/src/db/storage/mod.rs @@ -1,4 +1,4 @@ -use crate::db::api::GraphResult; +use crate::db::api::{GraphResult, BackupId}; pub mod rocksdb; use self::rocksdb::RocksDBIter; @@ -13,6 +13,17 @@ pub trait ExternalStorage { fn scan_range(&self, start: &[u8], end: &[u8]) -> GraphResult; fn delete_range(&self, start: &[u8], end: &[u8]) -> GraphResult<()>; fn load(&self, files: &[&str]) -> GraphResult<()>; + fn open_backup_engine(&self, backup_path: &str) -> GraphResult>; +} + +pub trait ExternalStorageBackup { + fn create_new_backup(&mut self) -> GraphResult; + fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()>; + fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()>; + fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()>; + fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()>; + fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()>; + fn get_backup_list(&self) -> Vec; } pub enum StorageRes { diff --git a/interactive_engine/executor/store/src/db/storage/rocksdb.rs b/interactive_engine/executor/store/src/db/storage/rocksdb.rs index 12c171fd0c9a..990b6afaa0ae 100644 --- a/interactive_engine/executor/store/src/db/storage/rocksdb.rs +++ b/interactive_engine/executor/store/src/db/storage/rocksdb.rs @@ -1,12 +1,18 @@ use ::rocksdb::{DB, Options, ReadOptions, DBRawIterator, IngestExternalFileOptions}; +use ::rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions}; use std::collections::HashMap; -use std::path::Path; +use std::sync::Arc; use crate::db::api::*; -use super::{StorageIter, StorageRes, ExternalStorage}; +use super::{StorageIter, StorageRes, ExternalStorage, ExternalStorageBackup}; pub struct RocksDB { - db: DB, + db: Arc, +} + +pub struct RocksDBBackupEngine { + db: Arc, + backup_engine: BackupEngine, } impl RocksDB { @@ -17,7 +23,7 @@ impl RocksDB { gen_graph_err!(GraphErrorCode::ExternalStorageError, msg, open, options, path) })?; let ret = RocksDB { - db, + db: Arc::new(db), }; Ok(ret) } @@ -88,12 +94,89 @@ impl ExternalStorage for RocksDB { fn load(&self, files: &[&str]) -> GraphResult<()> { let mut options = IngestExternalFileOptions::default(); options.set_move_files(true); - let paths : Vec<&Path> = files.to_vec().into_iter().map(|f| Path::new(f)).collect(); - self.db.ingest_external_file_opts(&options, paths).map_err(|e| { + self.db.ingest_external_file_opts(&options, files.to_vec()).map_err(|e| { let msg = format!("rocksdb.ingest_sst failed because {}", e.into_string()); gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) }) } + + fn open_backup_engine(&self, backup_path: &str) -> GraphResult> { + let backup_opts = BackupEngineOptions::default(); + let backup_engine = BackupEngine::open(&backup_opts, backup_path).map_err(|e| { + let msg = format!("open rocksdb backup engine at {} failed, because {}", backup_path.to_string(), e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + let ret = RocksDBBackupEngine { + db: self.db.clone(), + backup_engine, + }; + Ok(Box::from(ret)) + } +} + +impl ExternalStorageBackup for RocksDBBackupEngine { + /// Optimize this method after a new rust-rocksdb version. + fn create_new_backup(&mut self) -> GraphResult { + let before = self.get_backup_list(); + self.backup_engine.create_new_backup(&self.db).map_err(|e| { + let msg = format!("create new rocksdb backup failed, because {}", e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + let after = self.get_backup_list(); + if after.len() != before.len() + 1 { + let msg = "get new created rocksdb backup id failed".to_string(); + return Err(gen_graph_err!(GraphErrorCode::ExternalStorageError, msg)); + } + let new_backup_id = *after.iter().max().unwrap(); + Ok(new_backup_id) + } + + /// Do nothing now. + /// Implement this method after a new rust-rocksdb version. + #[allow(unused_variables)] + fn delete_backup(&mut self, backup_id: BackupId) -> GraphResult<()> { + Ok(()) + } + + fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> GraphResult<()> { + self.backup_engine.purge_old_backups(num_backups_to_keep).map_err(|e| { + let msg = format!("purge old rocksdb backups failed, because {}", e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + Ok(()) + } + + fn restore_from_backup(&mut self, restore_path: &str, backup_id: BackupId) -> GraphResult<()> { + let mut restore_option = RestoreOptions::default(); + restore_option.set_keep_log_files(false); + self.backup_engine.restore_from_backup(restore_path, restore_path, &restore_option, backup_id as u32).map_err(|e| { + let msg = format!("restore from rocksdb backup {} failed, because {}", backup_id, e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + Ok(()) + } + + fn restore_from_latest_backup(&mut self, restore_path: &str) -> GraphResult<()> { + let mut restore_option = RestoreOptions::default(); + restore_option.set_keep_log_files(false); + self.backup_engine.restore_from_latest_backup(restore_path, restore_path, &restore_option).map_err(|e| { + let msg = format!("restore from latest rocksdb backup failed, because {}", e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + Ok(()) + } + + fn verify_backup(&self, backup_id: BackupId) -> GraphResult<()> { + self.backup_engine.verify_backup(backup_id as u32).map_err(|e| { + let msg = format!("rocksdb backup {} verify failed, because {}", backup_id, e.into_string()); + gen_graph_err!(GraphErrorCode::ExternalStorageError, msg) + })?; + Ok(()) + } + + fn get_backup_list(&self) -> Vec { + self.backup_engine.get_backup_info().into_iter().map(|info| info.backup_id as BackupId).collect() + } } #[allow(unused_variables)] @@ -143,7 +226,7 @@ impl<'a> RocksDBIter<'a> { fn bytes_upper_bound(bytes: &[u8]) -> Option> { for i in (0..bytes.len()).rev() { - if bytes[i] != u8::max_value() { + if bytes[i] != u8::MAX { let mut ret = bytes.to_vec(); ret[i] += 1; for j in i+1..bytes.len() {