Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scanner): Add a new zebra-scanner binary #8608

Merged
merged 7 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6046,15 +6046,19 @@ dependencies = [
"insta",
"itertools 0.13.0",
"jubjub",
"lazy_static",
"proptest",
"proptest-derive",
"rand 0.8.5",
"sapling-crypto",
"semver 1.0.23",
"serde",
"serde_json",
"structopt",
"tokio",
"tower",
"tracing",
"tracing-subscriber",
"zcash_address",
"zcash_client_backend",
"zcash_keys",
Expand All @@ -6063,6 +6067,7 @@ dependencies = [
"zebra-chain",
"zebra-grpc",
"zebra-node-services",
"zebra-rpc",
"zebra-state",
"zebra-test",
]
Expand Down
14 changes: 14 additions & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ name = "scanner-grpc-server"
path = "src/bin/rpc_server.rs"
required-features = ["proptest-impl"]

[[bin]] # Bin to run the Scanner tool
name = "zebra-scanner"
path = "src/bin/scanner/main.rs"

[features]

# Production features that activate extra dependencies, or extra features in dependencies
Expand All @@ -39,6 +43,9 @@ proptest-impl = [
"zcash_note_encryption",
]

# Needed for the zebra-scanner binary.
shielded-scan = []

[dependencies]

color-eyre = "0.6.3"
Expand All @@ -61,6 +68,7 @@ zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["shielded-scan"] }
zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.5" }
zebra-rpc = { path = "../zebra-rpc", version = "1.0.0-beta.38" }

chrono = { version = "0.4.38", default-features = false, features = ["clock", "std", "serde"] }

Expand All @@ -77,6 +85,12 @@ zcash_note_encryption = { version = "0.4.0", optional = true }

zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38", optional = true }

# zebra-scanner binary dependencies
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
structopt = "0.3.26"
lazy_static = "1.4.0"
serde_json = "1.0.117"

[dev-dependencies]
insta = { version = "1.39.0", features = ["ron", "redactions"] }
tokio = { version = "1.37.0", features = ["test-util"] }
Expand Down
144 changes: 144 additions & 0 deletions zebra-scan/src/bin/scanner/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! The zebra-scanner binary.
//!
//! The zebra-scanner binary is a standalone binary that scans the Zcash blockchain for transactions using the given sapling keys.
use color_eyre::eyre::eyre;
use lazy_static::lazy_static;
use structopt::StructOpt;
use tracing::*;

use zebra_chain::{block::Height, parameters::Network};
use zebra_state::SaplingScanningKey;

use core::net::SocketAddr;
use std::path::PathBuf;

/// A structure with sapling key and birthday height.
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize)]
pub struct SaplingKey {
key: SaplingScanningKey,
#[serde(default = "min_height")]
birthday_height: Height,
}

fn min_height() -> Height {
Height(0)
}

impl std::str::FromStr for SaplingKey {
type Err = Box<dyn std::error::Error>;
fn from_str(value: &str) -> Result<Self, Self::Err> {
Ok(serde_json::from_str(value)?)
}
}

#[tokio::main]
/// Runs the zebra scanner binary with the given arguments.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Display all logs from the zebra-scan crate.
tracing_subscriber::fmt::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();

// Parse command line arguments.
let args = Args::from_args();

let zebrad_cache_dir = args.zebrad_cache_dir;
let scanning_cache_dir = args.scanning_cache_dir;
let mut db_config = zebra_scan::Config::default().db_config;
db_config.cache_dir = scanning_cache_dir;
let network = args.network;
let sapling_keys_to_scan = args
.sapling_keys_to_scan
.into_iter()
.map(|key| (key.key, key.birthday_height.0))
.collect();
let listen_addr = args.listen_addr;

// Create a state config with arguments.
let state_config = zebra_state::Config {
cache_dir: zebrad_cache_dir,
..zebra_state::Config::default()
};

// Create a scanner config with arguments.
let scanner_config = zebra_scan::Config {
sapling_keys_to_scan,
listen_addr,
db_config,
};

// Get a read-only state and the database.
let (read_state, _latest_chain_tip, chain_tip_change, sync_task) =
zebra_rpc::sync::init_read_state_with_syncer(
state_config,
&network,
args.zebra_rpc_listen_addr,
)
.await?
.map_err(|err| eyre!(err))?;

// Spawn the scan task.
let scan_task_handle =
{ zebra_scan::spawn_init(scanner_config, network, read_state, chain_tip_change) };

// Pin the scan task handle.
tokio::pin!(scan_task_handle);
tokio::pin!(sync_task);

// Wait for task to finish
tokio::select! {
scan_result = &mut scan_task_handle => scan_result
.expect("unexpected panic in the scan task")
.map(|_| info!("scan task exited"))
.map_err(Into::into),
sync_result = &mut sync_task => {
sync_result.expect("unexpected panic in the scan task");
Ok(())
}
}
}

// Default values for the zebra-scanner arguments.
lazy_static! {
static ref DEFAULT_ZEBRAD_CACHE_DIR: String = zebra_state::Config::default()
.cache_dir
.to_str()
.expect("default cache dir is valid")
.to_string();
static ref DEFAULT_SCANNER_CACHE_DIR: String = zebra_scan::Config::default()
.db_config
.cache_dir
.to_str()
.expect("default cache dir is valid")
.to_string();
static ref DEFAULT_NETWORK: String = Network::default().to_string();
}

/// zebra-scanner arguments
#[derive(Clone, Debug, Eq, PartialEq, StructOpt)]
pub struct Args {
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
/// Path to zebrad state.
#[structopt(default_value = &DEFAULT_ZEBRAD_CACHE_DIR, long)]
pub zebrad_cache_dir: PathBuf,

/// Path to scanning state.
#[structopt(default_value = &DEFAULT_SCANNER_CACHE_DIR, long)]
pub scanning_cache_dir: PathBuf,

/// The Zcash network.
#[structopt(default_value = &DEFAULT_NETWORK, long)]
pub network: Network,

/// The sapling keys to scan for.
#[structopt(long)]
pub sapling_keys_to_scan: Vec<SaplingKey>,

/// The listen address of Zebra's RPC server used by the syncer to check for chain tip changes
/// and get blocks in Zebra's non-finalized state.
#[structopt(long)]
pub zebra_rpc_listen_addr: SocketAddr,

/// IP address and port for the gRPC server.
#[structopt(long)]
pub listen_addr: Option<SocketAddr>,
}
2 changes: 1 addition & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Config {
//
// TODO: Remove fields that are only used by the state, and create a common database config.
#[serde(flatten)]
db_config: DbConfig,
pub db_config: DbConfig,
}

impl Debug for Config {
Expand Down
23 changes: 10 additions & 13 deletions zebra-scan/src/service/scan_task/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tower::{Service, ServiceExt};

use tracing::Instrument;
use zcash_address::unified::{Encoding, Fvk, Ufvk};
Expand All @@ -38,7 +38,7 @@ use zebra_chain::{
transaction::Transaction,
};
use zebra_node_services::scan_service::response::ScanResult;
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};
use zebra_state::{ChainTipChange, ReadStateService, SaplingScannedResult, TransactionIndex};

use crate::{
service::{ScanTask, ScanTaskCommand},
Expand All @@ -51,11 +51,8 @@ mod scan_range;

pub use scan_range::ScanRangeTaskBuilder;

/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;
/// The read state type used by the scanner.
pub type State = ReadStateService;

/// Wait a few seconds at startup for some blocks to get verified.
///
Expand Down Expand Up @@ -262,13 +259,13 @@ pub async fn scan_height_and_store_results(
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Block(height.into()))
.call(zebra_state::ReadRequest::Block(height.into()))
.await
.map_err(|e| eyre!(e))?;

let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => return Ok(None),
zebra_state::ReadResponse::Block(Some(block)) => block,
zebra_state::ReadResponse::Block(None) => return Ok(None),
_ => unreachable!("unmatched response to a state::Block request"),
};

Expand Down Expand Up @@ -515,13 +512,13 @@ async fn tip_height(mut state: State) -> Result<Height, Report> {
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Tip)
.call(zebra_state::ReadRequest::Tip)
.await
.map_err(|e| eyre!(e))?;

match tip {
zebra_state::Response::Tip(Some((height, _hash))) => Ok(height),
zebra_state::Response::Tip(None) => Ok(Height(0)),
zebra_state::ReadResponse::Tip(Some((height, _hash))) => Ok(height),
zebra_state::ReadResponse::Tip(None) => Ok(Height(0)),
_ => unreachable!("unmatched response to a state::Tip request"),
}
}
Expand Down
8 changes: 4 additions & 4 deletions zebra-scan/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ pub async fn scan_service_registers_keys_correctly() -> Result<()> {

async fn scan_service_registers_keys_correctly_for(network: &Network) -> Result<()> {
// Mock the state.
let (state, _, _, chain_tip_change) = zebra_state::populated_state(vec![], network).await;
let (_, read_state, _, chain_tip_change) = zebra_state::populated_state(vec![], network).await;

// Instantiate the scan service.
let mut scan_service = ServiceBuilder::new()
.buffer(2)
.service(ScanService::new(&Config::ephemeral(), network, state, chain_tip_change).await);
let mut scan_service = ServiceBuilder::new().buffer(2).service(
ScanService::new(&Config::ephemeral(), network, read_state, chain_tip_change).await,
);

// Mock three Sapling keys.
let mocked_keys = mock_sapling_scanning_keys(3, network);
Expand Down
13 changes: 8 additions & 5 deletions zebrad/tests/common/shielded_scan/scan_task_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{fs, time::Duration};
use color_eyre::{eyre::eyre, Result};

use tokio::sync::mpsc::error::TryRecvError;
use tower::ServiceBuilder;
use zebra_chain::{
block::Height,
chain_tip::ChainTip,
Expand Down Expand Up @@ -82,9 +81,15 @@ pub(crate) async fn run() -> Result<()> {
let scan_db_path = zebrad_state_path.join(SCANNER_DATABASE_KIND);
fs::remove_dir_all(std::path::Path::new(&scan_db_path)).ok();

let (state_service, _read_state_service, latest_chain_tip, chain_tip_change) =
let (_state_service, _read_state_service, latest_chain_tip, chain_tip_change) =
start_state_service_with_cache_dir(&network, zebrad_state_path.clone()).await?;

let state_config = zebra_state::Config {
cache_dir: zebrad_state_path.clone(),
..zebra_state::Config::default()
};
let (read_state, _db, _) = zebra_state::init_read_only(state_config, &network);

let chain_tip_height = latest_chain_tip
.best_tip_height()
.ok_or_else(|| eyre!("State directory doesn't have a chain tip block"))?;
Expand All @@ -105,11 +110,9 @@ pub(crate) async fn run() -> Result<()> {

tracing::info!("opened state service with valid chain tip height, starting scan task",);

let state = ServiceBuilder::new().buffer(10).service(state_service);

// Create an ephemeral `Storage` instance
let storage = Storage::new(&scan_config, &network, false);
let mut scan_task = ScanTask::spawn(storage, state, chain_tip_change);
let mut scan_task = ScanTask::spawn(storage, read_state, chain_tip_change);

tracing::info!("started scan task, sending register/subscribe keys messages with zecpages key to start scanning for a new key",);

Expand Down
Loading