diff --git a/Cargo.lock b/Cargo.lock index d82160db21..59115a2de8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1060,6 +1060,7 @@ dependencies = [ "ethkey", "futures 0.1.29", "futures 0.3.15", + "group 0.8.0", "gstuff", "hex 0.4.2", "http 0.2.7", diff --git a/mm2src/coins/Cargo.toml b/mm2src/coins/Cargo.toml index 25553f7a78..1ab063bf1f 100644 --- a/mm2src/coins/Cargo.toml +++ b/mm2src/coins/Cargo.toml @@ -41,6 +41,7 @@ ethkey = { git = "https://github.com/artemii235/parity-ethereum.git" } futures01 = { version = "0.1", package = "futures" } # using select macro requires the crate to be named futures, compilation failed with futures03 name futures = { version = "0.3", package = "futures", features = ["compat", "async-await"] } +group = "0.8.0" gstuff = { version = "0.7", features = ["nightly"] } hex = "0.4.2" http = "0.2" diff --git a/mm2src/coins/utxo/rpc_clients.rs b/mm2src/coins/utxo/rpc_clients.rs index 41a09e6272..66a23f36a3 100644 --- a/mm2src/coins/utxo/rpc_clients.rs +++ b/mm2src/coins/utxo/rpc_clients.rs @@ -511,14 +511,14 @@ pub enum EstimateFeeMethod { SmartFee, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] #[serde(untagged)] pub enum BlockNonce { String(String), U64(u64), } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct VerboseBlock { /// Block hash pub hash: H256Json, @@ -1124,7 +1124,7 @@ impl NativeClientImpl { /// https://developer.bitcoin.org/reference/rpc/getblockheader.html pub fn get_block_header_bytes(&self, block_hash: H256Json) -> RpcRes { - let verbose = 0; + let verbose = false; rpc_func!(self, "getblockheader", block_hash, verbose) } } diff --git a/mm2src/coins/z_coin.rs b/mm2src/coins/z_coin.rs index 87d2b1dc5f..853e179c25 100644 --- a/mm2src/coins/z_coin.rs +++ b/mm2src/coins/z_coin.rs @@ -31,13 +31,13 @@ use futures::compat::Future01CompatExt; use futures::lock::Mutex as AsyncMutex; use futures::{FutureExt, TryFutureExt}; use futures01::Future; -use http::Uri; use keys::hash::H256; use keys::{KeyPair, Message, Public}; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; use mm2_number::{BigDecimal, MmNumber}; #[cfg(test)] use mocktopus::macros::*; +use parking_lot::Mutex; use primitives::bytes::Bytes; use rpc::v1::types::{Bytes as BytesJson, Transaction as RpcTransaction, H256 as H256Json}; use script::{Builder as ScriptBuilder, Opcode, Script, TransactionInputSigner}; @@ -45,7 +45,6 @@ use serde_json::Value as Json; use serialization::CoinVariant; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use zcash_client_backend::data_api::WalletRead; use zcash_client_backend::encoding::{decode_payment_address, encode_extended_spending_key, encode_payment_address}; @@ -70,9 +69,10 @@ use z_htlc::{z_p2sh_spend, z_send_dex_fee, z_send_htlc}; mod z_rpc; pub use z_rpc::SyncStatus; -use z_rpc::{init_light_client, SaplingSyncConnector, SaplingSyncGuard, WalletDbShared}; +use z_rpc::{init_light_client, init_native_client, SaplingSyncConnector, SaplingSyncGuard, WalletDbShared}; mod z_coin_errors; +use crate::z_coin::z_rpc::{create_wallet_db, BlockDb}; pub use z_coin_errors::*; #[cfg(all(test, feature = "zhtlc-native-tests"))] @@ -766,29 +766,39 @@ impl<'a> UtxoCoinWithIguanaPrivKeyBuilder for ZCoinBuilder<'a> { ); let evk = ExtendedFullViewingKey::from(&self.z_spending_key); + let cache_db_path = self.db_dir_path.join(format!("{}_cache.db", self.ticker)); + let wallet_db_path = self.db_dir_path.join(format!("{}_wallet.db", self.ticker)); + let blocks_db = + async_blocking(|| BlockDb::for_path(cache_db_path).map_to_mm(ZcoinClientInitError::BlocksDbInitFailure)) + .await?; + let wallet_db = create_wallet_db( + wallet_db_path, + self.protocol_info.consensus_params.clone(), + self.protocol_info.check_point_block.clone(), + evk, + ) + .await?; + let wallet_db = Arc::new(Mutex::new(wallet_db)); let (sync_state_connector, light_wallet_db) = match &self.z_coin_params.mode { ZcoinRpcMode::Native => { - return MmError::err(ZCoinBuildError::NativeModeIsNotSupportedYet); + let native_client = self.native_client()?; + init_native_client( + native_client, + blocks_db, + wallet_db, + self.protocol_info.consensus_params.clone(), + ) + .await? }, ZcoinRpcMode::Light { light_wallet_d_servers, .. } => { - let cache_db_path = self.db_dir_path.join(format!("{}_light_cache.db", self.ticker)); - let wallet_db_path = self.db_dir_path.join(format!("{}_light_wallet.db", self.ticker)); // TODO multi lightwalletd servers support will be added on the next iteration - let uri = Uri::from_str( - light_wallet_d_servers - .first() - .or_mm_err(|| ZCoinBuildError::EmptyLightwalletdUris)?, - )?; - init_light_client( - uri, - cache_db_path, - wallet_db_path, + light_wallet_d_servers.clone(), + blocks_db, + wallet_db, self.protocol_info.consensus_params.clone(), - self.protocol_info.check_point_block, - evk, ) .await? }, diff --git a/mm2src/coins/z_coin/z_coin_errors.rs b/mm2src/coins/z_coin/z_coin_errors.rs index add11fc1ba..f05106cfe8 100644 --- a/mm2src/coins/z_coin/z_coin_errors.rs +++ b/mm2src/coins/z_coin/z_coin_errors.rs @@ -3,6 +3,7 @@ use crate::utxo::rpc_clients::UtxoRpcError; use crate::utxo::utxo_builder::UtxoCoinBuildError; use crate::WithdrawError; use crate::{NumConversError, PrivKeyNotAllowed}; +use common::jsonrpc_client::JsonRpcError; use db_common::sqlite::rusqlite::Error as SqliteError; use db_common::sqlite::rusqlite::Error as SqlError; use derive_more::Display; @@ -19,6 +20,9 @@ pub enum UpdateBlocksCacheErr { GrpcError(tonic::Status), BlocksDbError(SqliteError), ZcashSqliteError(ZcashClientError), + UtxoRpcError(UtxoRpcError), + InternalError(String), + JsonRpcError(JsonRpcError), } impl From for UpdateBlocksCacheErr { @@ -33,18 +37,32 @@ impl From for UpdateBlocksCacheErr { fn from(err: ZcashClientError) -> Self { UpdateBlocksCacheErr::ZcashSqliteError(err) } } +impl From for UpdateBlocksCacheErr { + fn from(err: UtxoRpcError) -> Self { UpdateBlocksCacheErr::UtxoRpcError(err) } +} + +impl From for UpdateBlocksCacheErr { + fn from(err: JsonRpcError) -> Self { UpdateBlocksCacheErr::JsonRpcError(err) } +} + #[derive(Debug, Display)] #[non_exhaustive] -pub enum ZcoinLightClientInitError { +pub enum ZcoinClientInitError { TlsConfigFailure(tonic::transport::Error), ConnectionFailure(tonic::transport::Error), BlocksDbInitFailure(SqliteError), WalletDbInitFailure(SqliteError), ZcashSqliteError(ZcashClientError), + EmptyLightwalletdUris, + InvalidUri(InvalidUri), } -impl From for ZcoinLightClientInitError { - fn from(err: ZcashClientError) -> Self { ZcoinLightClientInitError::ZcashSqliteError(err) } +impl From for ZcoinClientInitError { + fn from(err: ZcashClientError) -> Self { ZcoinClientInitError::ZcashSqliteError(err) } +} + +impl From for ZcoinClientInitError { + fn from(err: InvalidUri) -> Self { ZcoinClientInitError::InvalidUri(err) } } #[derive(Debug, Display)] @@ -182,10 +200,7 @@ pub enum ZCoinBuildError { path: String, }, Io(std::io::Error), - EmptyLightwalletdUris, - NativeModeIsNotSupportedYet, - InvalidLightwalletdUri(InvalidUri), - LightClientInitErr(ZcoinLightClientInitError), + RpcClientInitErr(ZcoinClientInitError), ZCashParamsNotFound, } @@ -205,12 +220,8 @@ impl From for ZCoinBuildError { fn from(err: std::io::Error) -> ZCoinBuildError { ZCoinBuildError::Io(err) } } -impl From for ZCoinBuildError { - fn from(err: InvalidUri) -> Self { ZCoinBuildError::InvalidLightwalletdUri(err) } -} - -impl From for ZCoinBuildError { - fn from(err: ZcoinLightClientInitError) -> Self { ZCoinBuildError::LightClientInitErr(err) } +impl From for ZCoinBuildError { + fn from(err: ZcoinClientInitError) -> Self { ZCoinBuildError::RpcClientInitErr(err) } } pub(super) enum SqlTxHistoryError { diff --git a/mm2src/coins/z_coin/z_rpc.rs b/mm2src/coins/z_coin/z_rpc.rs index b0495cf6e3..0c8023b728 100644 --- a/mm2src/coins/z_coin/z_rpc.rs +++ b/mm2src/coins/z_coin/z_rpc.rs @@ -1,20 +1,24 @@ use super::{z_coin_errors::*, CheckPointBlockInfo, ZcoinConsensusParams}; -use crate::utxo::rpc_clients::NO_TX_ERROR_CODE; +use crate::utxo::rpc_clients::{NativeClient, UtxoRpcClientOps, NO_TX_ERROR_CODE}; +use async_trait::async_trait; use common::executor::Timer; use common::log::{debug, error, info, LogOnError}; -use common::{async_blocking, spawn_abortable, AbortOnDropHandle}; +use common::{async_blocking, spawn_abortable, AbortOnDropHandle, Future01CompatExt}; use db_common::sqlite::rusqlite::{params, Connection, Error as SqliteError, NO_PARAMS}; use db_common::sqlite::{query_single_row, run_optimization_pragmas}; use futures::channel::mpsc::{channel, Receiver as AsyncReceiver, Sender as AsyncSender}; use futures::channel::oneshot::{channel as oneshot_channel, Sender as OneshotSender}; use futures::lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use futures::StreamExt; +use group::GroupEncoding; use http::Uri; use mm2_err_handle::prelude::*; use parking_lot::Mutex; use prost::Message; use protobuf::Message as ProtobufMessage; +use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::str::FromStr; use std::sync::Arc; use tokio::task::block_in_place; use tonic::transport::{Channel, ClientTlsConfig}; @@ -33,8 +37,12 @@ use zcash_primitives::zip32::ExtendedFullViewingKey; mod z_coin_grpc { tonic::include_proto!("cash.z.wallet.sdk.rpc"); } +use crate::ZTransaction; +use rpc::v1::types::H256 as H256Json; use z_coin_grpc::compact_tx_streamer_client::CompactTxStreamerClient; -use z_coin_grpc::{BlockId, BlockRange, ChainSpec, TxFilter}; +use z_coin_grpc::{BlockId, BlockRange, ChainSpec, CompactBlock as TonicCompactBlock, + CompactOutput as TonicCompactOutput, CompactSpend as TonicCompactSpend, CompactTx as TonicCompactTx, + TxFilter}; pub type WalletDbShared = Arc>>; @@ -43,6 +51,187 @@ struct CompactBlockRow { data: Vec, } +pub type OnCompactBlockFn<'a> = + dyn FnMut(TonicCompactBlock) -> Result<(), MmError> + Send + Sync + 'a; + +#[async_trait] +pub trait ZRpcOps { + async fn get_block_height(&mut self) -> Result>; + + async fn scan_blocks( + &mut self, + start_block: u64, + last_block: u64, + on_block: &mut OnCompactBlockFn, + ) -> Result<(), MmError>; + + async fn check_tx_existence(&mut self, tx_id: TxId) -> bool; +} + +#[async_trait] +impl ZRpcOps for CompactTxStreamerClient { + async fn get_block_height(&mut self) -> Result> { + let request = tonic::Request::new(ChainSpec {}); + let block = self.get_latest_block(request).await?; + let res = block.into_inner().height; + Ok(res) + } + + async fn scan_blocks( + &mut self, + start_block: u64, + last_block: u64, + on_block: &mut OnCompactBlockFn, + ) -> Result<(), MmError> { + let request = tonic::Request::new(BlockRange { + start: Some(BlockId { + height: start_block, + hash: Vec::new(), + }), + end: Some(BlockId { + height: last_block, + hash: Vec::new(), + }), + }); + let mut response = self.get_block_range(request).await?; + while let Some(block) = response.get_mut().message().await? { + debug!("Got block {:?}", block); + on_block(block)?; + } + Ok(()) + } + + async fn check_tx_existence(&mut self, tx_id: TxId) -> bool { + let mut attempts = 0; + loop { + let filter = TxFilter { + block: None, + index: 0, + hash: tx_id.0.into(), + }; + let request = tonic::Request::new(filter); + match self.get_transaction(request).await { + Ok(_) => break, + Err(e) => { + error!("Error on getting tx {}", tx_id); + if e.message().contains(NO_TX_ERROR_CODE) { + if attempts >= 3 { + return false; + } + attempts += 1; + } + Timer::sleep(30.).await; + }, + } + } + true + } +} + +#[async_trait] +impl ZRpcOps for NativeClient { + async fn get_block_height(&mut self) -> Result> { + Ok(self.get_block_count().compat().await?) + } + + async fn scan_blocks( + &mut self, + start_block: u64, + last_block: u64, + on_block: &mut OnCompactBlockFn, + ) -> Result<(), MmError> { + for height in start_block..=last_block { + let block = self.get_block_by_height(height).await?; + debug!("Got block {:?}", block); + let mut compact_txs = Vec::new(); + // By default, CompactBlocks only contain CompactTxs for transactions that contain Sapling spends or outputs. + // Create and push compact_tx during iteration. + for (tx_id, hash_tx) in block.tx.iter().enumerate() { + let tx_bytes = self.get_transaction_bytes(hash_tx).compat().await?; + let tx = ZTransaction::read(tx_bytes.as_slice()).unwrap(); + let mut spends = Vec::new(); + let mut outputs = Vec::new(); + if !tx.shielded_spends.is_empty() || !tx.shielded_outputs.is_empty() { + // Create and push spends with outs for compact_tx during iterations. + for spend in &tx.shielded_spends { + let compact_spend = TonicCompactSpend { + nf: spend.nullifier.to_vec(), + }; + spends.push(compact_spend); + } + for out in &tx.shielded_outputs { + let compact_out = TonicCompactOutput { + cmu: out.cmu.to_bytes().to_vec(), + epk: out.ephemeral_key.to_bytes().to_vec(), + // https://zips.z.cash/zip-0307#output-compression + // The first 52 bytes of the ciphertext contain the contents and opening of the note commitment, + // which is all of the data needed to spend the note and to verify that the note is spendable. + ciphertext: out.enc_ciphertext[0..52].to_vec(), + }; + outputs.push(compact_out); + } + // Shadowing mut variables as immutable. No longer need to update them. + drop_mutability!(spends); + drop_mutability!(outputs); + let mut hash_tx_vec = hash_tx.0.to_vec(); + hash_tx_vec.reverse(); + + let compact_tx = TonicCompactTx { + index: tx_id as u64, + hash: hash_tx_vec, + fee: 0, + spends, + outputs, + }; + compact_txs.push(compact_tx); + } + } + let mut hash = block.hash.0.to_vec(); + hash.reverse(); + // Set 0 in vector in the case of genesis block. + let mut prev_hash = block.previousblockhash.unwrap_or_default().0.to_vec(); + prev_hash.reverse(); + // Shadowing mut variables as immutable. + drop_mutability!(hash); + drop_mutability!(prev_hash); + drop_mutability!(compact_txs); + + let compact_block = TonicCompactBlock { + proto_version: 0, + height, + hash, + prev_hash, + time: block.time, + // (hash, prevHash, and time) OR (full header) + header: Vec::new(), + vtx: compact_txs, + }; + on_block(compact_block)?; + } + Ok(()) + } + + async fn check_tx_existence(&mut self, tx_id: TxId) -> bool { + let mut attempts = 0; + loop { + match self.get_raw_transaction_bytes(&H256Json::from(tx_id.0)).compat().await { + Ok(_) => break, + Err(e) => { + error!("Error on getting tx {}", tx_id); + if e.to_string().contains(NO_TX_ERROR_CODE) { + if attempts >= 3 { + return false; + } + attempts += 1; + } + Timer::sleep(30.).await; + }, + } + } + true + } +} + /// A wrapper for the SQLite connection to the block cache database. pub struct BlockDb(Connection); @@ -108,7 +297,7 @@ impl BlockDb { Ok(query_single_row( &self.0, "SELECT height FROM compactblocks ORDER BY height DESC LIMIT 1", - db_common::sqlite::rusqlite::NO_PARAMS, + NO_PARAMS, |row| row.get(0), )? .unwrap_or(0)) @@ -143,25 +332,18 @@ impl BlockSource for BlockDb { } } -pub(super) async fn init_light_client( - lightwalletd_url: Uri, - cache_db_path: PathBuf, +pub async fn create_wallet_db( wallet_db_path: PathBuf, consensus_params: ZcoinConsensusParams, check_point_block: Option, evk: ExtendedFullViewingKey, -) -> Result<(AsyncMutex, WalletDbShared), MmError> { - let blocks_db = - async_blocking(|| BlockDb::for_path(cache_db_path).map_to_mm(ZcoinLightClientInitError::BlocksDbInitFailure)) - .await?; - - let wallet_db = async_blocking({ - let consensus_params = consensus_params.clone(); - move || -> Result<_, MmError> { +) -> Result, MmError> { + async_blocking({ + move || -> Result, MmError> { let db = WalletDb::for_path(wallet_db_path, consensus_params) - .map_to_mm(ZcoinLightClientInitError::WalletDbInitFailure)?; - run_optimization_pragmas(db.sql_conn()).map_to_mm(ZcoinLightClientInitError::WalletDbInitFailure)?; - init_wallet_db(&db).map_to_mm(ZcoinLightClientInitError::WalletDbInitFailure)?; + .map_to_mm(ZcoinClientInitError::WalletDbInitFailure)?; + run_optimization_pragmas(db.sql_conn()).map_to_mm(ZcoinClientInitError::WalletDbInitFailure)?; + init_wallet_db(&db).map_to_mm(ZcoinClientInitError::WalletDbInitFailure)?; if db.get_extended_full_viewing_keys()?.is_empty() { init_accounts_table(&db, &[evk])?; if let Some(check_point) = check_point_block { @@ -177,31 +359,68 @@ pub(super) async fn init_light_client( Ok(db) } }) - .await?; + .await +} + +pub(super) async fn init_light_client( + lightwalletd_urls: Vec, + blocks_db: BlockDb, + wallet_db: WalletDbShared, + consensus_params: ZcoinConsensusParams, +) -> Result<(AsyncMutex, WalletDbShared), MmError> { + let (sync_status_notifier, sync_watcher) = channel(1); + let (on_tx_gen_notifier, on_tx_gen_watcher) = channel(1); + + let lightwalletd_url = Uri::from_str( + lightwalletd_urls + .first() + .or_mm_err(|| ZcoinClientInitError::EmptyLightwalletdUris)?, + )?; + + let sync_handle = SaplingSyncLoopHandle { + current_block: BlockHeight::from_u32(0), + blocks_db: Mutex::new(blocks_db), + wallet_db: wallet_db.clone(), + consensus_params, + sync_status_notifier, + on_tx_gen_watcher, + watch_for_tx: None, + }; let tonic_channel = Channel::builder(lightwalletd_url) .tls_config(ClientTlsConfig::new()) - .map_to_mm(ZcoinLightClientInitError::TlsConfigFailure)? + .map_to_mm(ZcoinClientInitError::TlsConfigFailure)? .connect() .await - .map_to_mm(ZcoinLightClientInitError::ConnectionFailure)?; - let grpc_client = CompactTxStreamerClient::new(tonic_channel); + .map_to_mm(ZcoinClientInitError::ConnectionFailure)?; + let rpc_copy = CompactTxStreamerClient::new(tonic_channel); + let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(rpc_copy))); + Ok(( + SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle), + wallet_db, + )) +} + +pub(super) async fn init_native_client( + native_client: NativeClient, + blocks_db: BlockDb, + wallet_db: WalletDbShared, + consensus_params: ZcoinConsensusParams, +) -> Result<(AsyncMutex, WalletDbShared), MmError> { let (sync_status_notifier, sync_watcher) = channel(1); let (on_tx_gen_notifier, on_tx_gen_watcher) = channel(1); - let wallet_db = Arc::new(Mutex::new(wallet_db)); let sync_handle = SaplingSyncLoopHandle { current_block: BlockHeight::from_u32(0), - grpc_client, - blocks_db, + blocks_db: Mutex::new(blocks_db), wallet_db: wallet_db.clone(), consensus_params, sync_status_notifier, on_tx_gen_watcher, watch_for_tx: None, }; - let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle)); + let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(native_client))); Ok(( SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle), @@ -218,14 +437,14 @@ fn is_tx_imported(conn: &Connection, tx_id: TxId) -> bool { } pub struct SaplingSyncRespawnGuard { - pub(super) sync_handle: Option, + pub(super) sync_handle: Option<(SaplingSyncLoopHandle, Box)>, pub(super) abort_handle: Arc>, } impl Drop for SaplingSyncRespawnGuard { fn drop(&mut self) { - if let Some(handle) = self.sync_handle.take() { - *self.abort_handle.lock() = spawn_abortable(light_wallet_db_sync_loop(handle)); + if let Some((handle, rpc)) = self.sync_handle.take() { + *self.abort_handle.lock() = spawn_abortable(light_wallet_db_sync_loop(handle, rpc)); } } } @@ -233,12 +452,14 @@ impl Drop for SaplingSyncRespawnGuard { impl SaplingSyncRespawnGuard { pub(super) fn watch_for_tx(&mut self, tx_id: TxId) { if let Some(ref mut handle) = self.sync_handle { - handle.watch_for_tx = Some(tx_id); + handle.0.watch_for_tx = Some(tx_id); } } #[inline] - pub(super) fn current_block(&self) -> BlockHeight { self.sync_handle.as_ref().expect("always Some").current_block } + pub(super) fn current_block(&self) -> BlockHeight { + self.sync_handle.as_ref().expect("always Some").0.current_block + } } pub enum SyncStatus { @@ -258,15 +479,14 @@ pub enum SyncStatus { pub struct SaplingSyncLoopHandle { current_block: BlockHeight, - grpc_client: CompactTxStreamerClient, - blocks_db: BlockDb, + blocks_db: Mutex, wallet_db: WalletDbShared, consensus_params: ZcoinConsensusParams, /// Notifies about sync status without stopping the loop, e.g. on coin activation sync_status_notifier: AsyncSender, /// If new tx is required to be generated, we stop the sync and respawn it after tx is sent /// This watcher waits for such notification - on_tx_gen_watcher: AsyncReceiver>, + on_tx_gen_watcher: AsyncReceiver)>>, watch_for_tx: Option, } @@ -303,12 +523,13 @@ impl SaplingSyncLoopHandle { .debug_log_with_msg("No one seems interested in SyncStatus"); } - async fn update_blocks_cache(&mut self) -> Result<(), MmError> { - let request = tonic::Request::new(ChainSpec {}); - let current_blockchain_block = self.grpc_client.get_latest_block(request).await?; - let current_block_in_db = block_in_place(|| self.blocks_db.get_latest_block())?; + async fn update_blocks_cache( + &mut self, + rpc: &mut (dyn ZRpcOps + Send), + ) -> Result<(), MmError> { + let current_block = rpc.get_block_height().await?; + let current_block_in_db = block_in_place(|| self.blocks_db.lock().get_latest_block())?; let extrema = block_in_place(|| self.wallet_db.lock().block_height_extrema())?; - let mut from_block = self .consensus_params .sapling_activation_height @@ -317,30 +538,18 @@ impl SaplingSyncLoopHandle { if let Some((_, max_in_wallet)) = extrema { from_block = from_block.max(max_in_wallet.into()); } - - let current_block: u64 = current_blockchain_block.into_inner().height; - if current_block >= from_block { - let request = tonic::Request::new(BlockRange { - start: Some(BlockId { - height: from_block, - hash: Vec::new(), - }), - end: Some(BlockId { - height: current_block, - hash: Vec::new(), - }), - }); - - let mut response = self.grpc_client.get_block_range(request).await?; - - while let Some(block) = response.get_mut().message().await? { - debug!("Got block {:?}", block); - block_in_place(|| self.blocks_db.insert_block(block.height as u32, block.encode_to_vec()))?; + rpc.scan_blocks(from_block, current_block, &mut |block: TonicCompactBlock| { + block_in_place(|| { + self.blocks_db + .lock() + .insert_block(block.height as u32, block.encode_to_vec()) + })?; self.notify_blocks_cache_status(block.height, current_block); - } + Ok(()) + }) + .await?; } - self.current_block = BlockHeight::from_u32(current_block as u32); Ok(()) } @@ -355,7 +564,7 @@ impl SaplingSyncLoopHandle { if let Err(e) = validate_chain( &self.consensus_params, - &self.blocks_db, + self.blocks_db.lock().deref(), wallet_ops.get_max_height_hash()?, ) { match e { @@ -366,13 +575,13 @@ impl SaplingSyncLoopHandle { BlockHeight::from_u32(0) }; wallet_ops.rewind_to_height(rewind_height)?; - self.blocks_db.rewind_to_height(rewind_height.into())?; + self.blocks_db.lock().rewind_to_height(rewind_height.into())?; }, e => return MmError::err(e), } } - let current_block = BlockHeight::from_u32(self.blocks_db.get_latest_block()?); + let current_block = BlockHeight::from_u32(self.blocks_db.lock().get_latest_block()?); loop { match wallet_ops.block_height_extrema()? { Some((_, max_in_wallet)) => { @@ -384,35 +593,20 @@ impl SaplingSyncLoopHandle { }, None => self.notify_building_wallet_db(0, current_block.into()), } - scan_cached_blocks(&self.consensus_params, &self.blocks_db, &mut wallet_ops, Some(1000))?; + scan_cached_blocks( + &self.consensus_params, + self.blocks_db.lock().deref(), + &mut wallet_ops, + Some(1000), + )?; } Ok(()) } - async fn check_watch_for_tx_existence(&mut self) { + async fn check_watch_for_tx_existence(&mut self, rpc: &mut (dyn ZRpcOps + Send)) { if let Some(tx_id) = self.watch_for_tx { - let mut attempts = 0; - loop { - let filter = TxFilter { - block: None, - index: 0, - hash: tx_id.0.into(), - }; - let request = tonic::Request::new(filter); - match self.grpc_client.get_transaction(request).await { - Ok(_) => break, - Err(e) => { - error!("Error on getting tx {}", tx_id); - if e.message().contains(NO_TX_ERROR_CODE) { - if attempts >= 3 { - self.watch_for_tx = None; - return; - } - attempts += 1; - } - Timer::sleep(30.).await; - }, - } + if !rpc.check_tx_existence(tx_id).await { + self.watch_for_tx = None; } } } @@ -439,10 +633,10 @@ impl SaplingSyncLoopHandle { /// 6. Once the transaction is generated and sent, `SaplingSyncRespawnGuard::watch_for_tx` is called to update `SaplingSyncLoopHandle` state. /// 7. Once the loop is respawned, it will check that broadcast tx is imported (or not available anymore) before stopping in favor of /// next wait_for_gen_tx_blockchain_sync call. -async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle) { +async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle, mut client: Box) { // this loop is spawned as standalone task so it's safe to use block_in_place here loop { - if let Err(e) = sync_handle.update_blocks_cache().await { + if let Err(e) = sync_handle.update_blocks_cache(client.as_mut()).await { error!("Error {} on blocks cache update", e); sync_handle.notify_on_error(e.to_string()); Timer::sleep(10.).await; @@ -458,7 +652,7 @@ async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle) { sync_handle.notify_sync_finished(); - sync_handle.check_watch_for_tx_existence().await; + sync_handle.check_watch_for_tx_existence(client.as_mut()).await; if let Some(tx_id) = sync_handle.watch_for_tx { if !block_in_place(|| is_tx_imported(sync_handle.wallet_db.lock().sql_conn(), tx_id)) { @@ -470,10 +664,11 @@ async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle) { } if let Ok(Some(sender)) = sync_handle.on_tx_gen_watcher.try_next() { - match sender.send(sync_handle) { + match sender.send((sync_handle, client)) { Ok(_) => break, - Err(handle_from_channel) => { + Err((handle_from_channel, rpc_from_channel)) => { sync_handle = handle_from_channel; + client = rpc_from_channel; }, } } @@ -483,7 +678,7 @@ async fn light_wallet_db_sync_loop(mut sync_handle: SaplingSyncLoopHandle) { } type SyncWatcher = AsyncReceiver; -type NewTxNotifier = AsyncSender>; +type NewTxNotifier = AsyncSender)>>; pub(super) struct SaplingSyncConnector { sync_watcher: SyncWatcher, @@ -519,8 +714,8 @@ impl SaplingSyncConnector { .map_to_mm(|_| BlockchainScanStopped {})?; receiver .await - .map(|handle| SaplingSyncRespawnGuard { - sync_handle: Some(handle), + .map(|(handle, rpc)| SaplingSyncRespawnGuard { + sync_handle: Some((handle, rpc)), abort_handle: self.abort_handle.clone(), }) .map_to_mm(|_| BlockchainScanStopped {})