Skip to content

Commit

Permalink
add recover rollup command (#594)
Browse files Browse the repository at this point in the history
* add recover rollup command

* combine ut to increase test stability
  • Loading branch information
jingchen2222 committed Jul 31, 2023
1 parent dddbf4c commit 518b627
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 57 deletions.
104 changes: 96 additions & 8 deletions src/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//

use crate::indexer_impl::IndexerNodeImpl;
use crate::recover::{Recover, RecoverConfig};
use crate::recover::{Recover, RecoverConfig, RecoverType};
use crate::rollup_executor::RollupExecutorConfig;
use crate::storage_node_light_impl::{StorageNodeV2Config, StorageNodeV2Impl};
use crate::system_impl::SystemImpl;
Expand All @@ -35,7 +35,7 @@ use db3_storage::db_store_v2::{DBStoreV2, DBStoreV2Config};
use db3_storage::doc_store::DocStoreConfig;
use db3_storage::key_store::KeyStore;
use db3_storage::key_store::KeyStoreConfig;
use db3_storage::mutation_store::MutationStoreConfig;
use db3_storage::mutation_store::{MutationStore, MutationStoreConfig};
use db3_storage::state_store::{StateStore, StateStoreConfig};
use db3_storage::system_store::{SystemRole, SystemStore, SystemStoreConfig};
use ethers::prelude::LocalWallet;
Expand Down Expand Up @@ -183,6 +183,35 @@ pub enum RecoverCommand {
verbose: bool,
},
// TODO: support recover rollup
#[clap(name = "rollup")]
Rollup {
/// The database path for mutation
#[clap(long, default_value = "./mutation_db")]
mutation_db_path: String,
/// The database path for state
#[clap(long, default_value = "./state_db")]
state_db_path: String,
/// The database path for doc db
#[clap(long, default_value = "./doc_db")]
doc_db_path: String,
#[clap(short, long, default_value = "./rollup_meta_db")]
meta_db_path: String,
#[clap(short, long, default_value = "./keys")]
key_root_path: String,
#[clap(short, long, default_value = "./recover_rollup_temp")]
recover_temp_path: String,
#[clap(
short,
long,
default_value = "0x0000000000000000000000000000000000000000"
)]
admin_addr: String,
/// this is just for upgrade the node
#[clap(long, default_value = "100000")]
doc_id_start: i64,
#[clap(short, long)]
verbose: bool,
},
}
impl DB3Command {
fn build_wallet(key_root_path: &str) -> std::result::Result<LocalWallet, DB3Error> {
Expand Down Expand Up @@ -382,6 +411,41 @@ impl DB3Command {
info!("exit standalone indexer")
}
DB3Command::Recover { cmd } => match cmd {
RecoverCommand::Rollup {
mutation_db_path,
state_db_path,
doc_db_path,
meta_db_path,
key_root_path,
recover_temp_path,
admin_addr,
doc_id_start,
verbose,
} => {
let log_level = if verbose {
LevelFilter::DEBUG
} else {
LevelFilter::INFO
};

tracing_subscriber::fmt().with_max_level(log_level).init();
info!("{ABOUT}");
let recover = Self::create_recover(
mutation_db_path,
meta_db_path,
state_db_path,
doc_db_path,
key_root_path,
recover_temp_path,
admin_addr,
doc_id_start,
RecoverType::Rollup,
)
.await;
info!("start recovering index node");
recover.recover_stat().unwrap();
recover.recover_from_ar().await.unwrap();
}
RecoverCommand::Index {
meta_db_path,
state_db_path,
Expand All @@ -401,14 +465,15 @@ impl DB3Command {
tracing_subscriber::fmt().with_max_level(log_level).init();
info!("{ABOUT}");
let recover = Self::create_recover(
"".to_string(),
meta_db_path,
state_db_path,
doc_db_path,
key_root_path,
recover_temp_path,
admin_addr,
doc_id_start,
SystemRole::DataIndexNode,
RecoverType::Index,
)
.await;
info!("start recovering index node");
Expand All @@ -418,14 +483,15 @@ impl DB3Command {
}
}
async fn create_recover(
mutation_db_path: String,
meta_db_path: String,
state_db_path: String,
doc_db_path: String,
key_root_path: String,
recover_temp_path: String,
_admin_addr: String,
doc_id_start: i64,
role: SystemRole,
recover_type: RecoverType,
) -> Recover {
let system_store_config = SystemStoreConfig {
key_root_path: key_root_path.to_string(),
Expand All @@ -445,6 +511,10 @@ impl DB3Command {
in_memory_db_handle_limit: 16,
};

let enable_doc_store = match recover_type {
RecoverType::Index => true,
RecoverType::Rollup => false,
};
let db_store_config = DBStoreV2Config {
db_path: meta_db_path.to_string(),
db_store_cf_name: "db_store_cf".to_string(),
Expand All @@ -454,20 +524,38 @@ impl DB3Command {
doc_owner_store_cf_name: "doc_owner_store_cf".to_string(),
db_owner_store_cf_name: "db_owner_cf".to_string(),
scan_max_limit: 1000,
enable_doc_store: true,
enable_doc_store,
doc_store_conf,
doc_start_id: doc_id_start,
};

let db_store = DBStoreV2::new(db_store_config.clone()).unwrap();

let storage = match recover_type {
RecoverType::Rollup => {
let mutation_store_config = MutationStoreConfig {
db_path: mutation_db_path.to_string(),
block_store_cf_name: "block_store_cf".to_string(),
tx_store_cf_name: "tx_store_cf".to_string(),
rollup_store_cf_name: "rollup_store_cf".to_string(),
gc_cf_name: "gc_store_cf".to_string(),
message_max_buffer: 4 * 1024,
scan_max_limit: 50,
block_state_cf_name: "block_state_cf".to_string(),
};
let store = MutationStore::new(mutation_store_config).unwrap();
Some(Arc::new(store))
}
RecoverType::Index => None,
};

std::fs::create_dir_all(recover_temp_path.as_str()).unwrap();
let recover_config = RecoverConfig {
key_root_path: key_root_path.to_string(),
temp_data_path: recover_temp_path.to_string(),
enable_mutation_recover: false,
role,
recover_type,
};
Recover::new(recover_config, db_store, system_store)
Recover::new(recover_config, db_store, system_store, storage)
.await
.unwrap()
}
Expand Down
9 changes: 4 additions & 5 deletions src/node/src/node_test_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#[cfg(test)]
pub mod tests {
use crate::recover::{Recover, RecoverConfig};
use crate::recover::{Recover, RecoverConfig, RecoverType};
use crate::rollup_executor::{RollupExecutor, RollupExecutorConfig};
use db3_crypto::db3_address::DB3Address;
use db3_error::Result;
Expand Down Expand Up @@ -106,16 +106,14 @@ pub mod tests {
let recover_index_config = RecoverConfig {
key_root_path: key_root_path.to_string(),
temp_data_path: format!("{real_path}/recover_index_temp_data"),
enable_mutation_recover: true,
role: SystemRole::DataIndexNode,
recover_type: RecoverType::Index,
};
if let Err(_e) = std::fs::create_dir_all(recover_index_config.temp_data_path.as_str()) {
}
let recover_rollup_config = RecoverConfig {
key_root_path: key_root_path.to_string(),
temp_data_path: format!("{real_path}/recover_rollup_temp_data"),
enable_mutation_recover: true,
role: SystemRole::DataRollupNode,
recover_type: RecoverType::Rollup,
};
if let Err(_e) = std::fs::create_dir_all(recover_rollup_config.temp_data_path.as_str())
{
Expand Down Expand Up @@ -193,6 +191,7 @@ pub mod tests {
recover_rollup_config,
db_store.clone(),
system_store.clone(),
None,
)
.await?;
Ok((rollup_executor, rollup_recover))
Expand Down
73 changes: 50 additions & 23 deletions src/node/src/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,38 @@

use crate::ar_toolbox::ArToolBox;
use crate::mutation_utils::MutationUtil;
use bytes::BytesMut;
use db3_crypto::id::TxId;
use db3_error::{DB3Error, Result};
use db3_proto::db3_mutation_v2_proto::MutationAction;
use db3_proto::db3_mutation_v2_proto::{MutationAction, MutationBody, MutationHeader};
use db3_storage::ar_fs::{ArFileSystem, ArFileSystemConfig};
use db3_storage::db_store_v2::DBStoreV2;
use db3_storage::meta_store_client::MetaStoreClient;
use db3_storage::mutation_store::MutationStore;
use db3_storage::system_store::{SystemRole, SystemStore};
use ethers::prelude::Signer;
use prost::Message;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, info};

#[derive(Clone)]
pub enum RecoverType {
Index,
Rollup,
}
#[derive(Clone)]
pub struct RecoverConfig {
pub key_root_path: String,
pub temp_data_path: String,
pub enable_mutation_recover: bool,
pub role: SystemRole,
pub recover_type: RecoverType,
}
pub struct Recover {
pub config: RecoverConfig,
pub ar_toolbox: Arc<ArToolBox>,
pub meta_store: Arc<MetaStoreClient>,
pub db_store: Arc<DBStoreV2>,
pub storage: Option<Arc<MutationStore>>,
network_id: Arc<AtomicU64>,
}

Expand All @@ -48,8 +57,13 @@ impl Recover {
config: RecoverConfig,
db_store: DBStoreV2,
system_store: Arc<SystemStore>,
storage: Option<Arc<MutationStore>>,
) -> Result<Self> {
let system_config = match system_store.get_config(&config.role) {
let role = match config.recover_type {
RecoverType::Index => SystemRole::DataIndexNode,
RecoverType::Rollup => SystemRole::DataRollupNode,
};
let system_config = match system_store.get_config(&role) {
Ok(Some(system_config)) => system_config,
Ok(None) => {
return Err(DB3Error::StoreEventError(
Expand Down Expand Up @@ -87,6 +101,7 @@ impl Recover {
ar_toolbox,
meta_store,
db_store: Arc::new(db_store),
storage,
network_id,
})
}
Expand All @@ -95,6 +110,14 @@ impl Recover {
Ok(())
}

pub fn recover_stat(&self) -> Result<()> {
self.db_store.recover_db_state()?;
if let Some(s) = &self.storage {
s.recover()?;
}
Ok(())
}

pub async fn recover_from_ar(&self) -> Result<()> {
info!("start recover from arweave");
let last_block = self.db_store.recover_block_state()?;
Expand Down Expand Up @@ -134,6 +157,13 @@ impl Recover {
Ok(from_block)
}

pub fn is_recover_rollup(&self) -> bool {
match self.config.recover_type {
RecoverType::Rollup => true,
_ => false,
}
}

/// recover from arweave tx
async fn recover_from_arweave_tx(&self, tx: &str, version: Option<String>) -> Result<()> {
debug!("recover_from_arweave_tx: {}, version {:?}", tx, version);
Expand All @@ -160,6 +190,22 @@ impl Recover {
order.clone(),
&doc_ids_map,
)?;

if self.is_recover_rollup() {
if let Some(s) = &self.storage {
s.update_mutation_stat(
&body.payload,
body.signature.as_str(),
doc_ids.as_str(),
&address,
nonce,
*block,
*order,
self.network_id.load(Ordering::Relaxed),
action,
)?;
}
}
}
}

Expand Down Expand Up @@ -206,25 +252,6 @@ mod tests {
use std::thread::sleep;
use tempdir::TempDir;

#[tokio::test]
async fn test_get_latest_arweave_tx() {
sleep(std::time::Duration::from_secs(1));
let tmp_dir_path = TempDir::new("test_get_latest_arweave_tx").expect("create temp dir");
match NodeTestBase::setup_for_smoke_test(&tmp_dir_path).await {
Ok((rollup_executor, recover)) => {
let result = rollup_executor.process().await;
assert_eq!(true, result.is_ok(), "{:?}", result);
let result = recover.get_latest_arweave_tx().await;
assert_eq!(true, result.is_ok(), "{:?}", result);
let tx = result.unwrap();
assert!(!tx.is_empty());
}
Err(e) => {
assert!(false, "{e}");
}
}
}

#[tokio::test]
async fn test_fetch_arware_tx_from_block() {
sleep(std::time::Duration::from_secs(3));
Expand Down

0 comments on commit 518b627

Please sign in to comment.