Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions dash-spv/src/client/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::{mpsc, oneshot, Mutex, RwLock};

use crate::error::{Result, SpvError};
use crate::storage::StorageManager;
use crate::types::{AddressBalance, SpvEvent, SpvStats, WatchItem};
use crate::types::{AddressBalance, SpvEvent, SpvStats};
use key_wallet_manager::wallet_interface::WalletInterface;

/// Task for the block processing worker.
Expand All @@ -32,7 +32,6 @@ pub struct BlockProcessor<W: WalletInterface, S: StorageManager> {
receiver: mpsc::UnboundedReceiver<BlockProcessingTask>,
wallet: Arc<RwLock<W>>,
storage: Arc<Mutex<S>>,
watch_items: Arc<RwLock<HashSet<WatchItem>>>,
stats: Arc<RwLock<SpvStats>>,
event_tx: mpsc::UnboundedSender<SpvEvent>,
processed_blocks: HashSet<dashcore::BlockHash>,
Expand All @@ -48,7 +47,6 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
receiver: mpsc::UnboundedReceiver<BlockProcessingTask>,
wallet: Arc<RwLock<W>>,
storage: Arc<Mutex<S>>,
watch_items: Arc<RwLock<HashSet<WatchItem>>>,
stats: Arc<RwLock<SpvStats>>,
event_tx: mpsc::UnboundedSender<SpvEvent>,
network: dashcore::Network,
Expand All @@ -57,7 +55,6 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
receiver,
wallet,
storage,
watch_items,
stats,
event_tx,
processed_blocks: HashSet::new(),
Expand Down Expand Up @@ -236,19 +233,13 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
}
drop(wallet); // Release lock

// Extract transactions that might affect watched items
let watch_items: Vec<_> = self.watch_items.read().await.iter().cloned().collect();
if !watch_items.is_empty() {
self.process_block_transactions(&block, &watch_items).await?;
} else {
// No watch items, but still emit BlockProcessed event
let _ = self.event_tx.send(SpvEvent::BlockProcessed {
height,
hash: block_hash.to_string(),
transactions_count: block.txdata.len(),
relevant_transactions: 0,
});
}
// Emit BlockProcessed event with actual relevant transaction count
let _ = self.event_tx.send(SpvEvent::BlockProcessed {
height,
hash: block_hash.to_string(),
transactions_count: block.txdata.len(),
relevant_transactions: txids.len(),
});

// Update chain state if needed
self.update_chain_state_with_block(&block).await?;
Expand All @@ -272,11 +263,11 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
Ok(())
}

/* TODO: Re-implement with wallet integration
/// Process transactions in a block to check for matches with watch items.
async fn process_block_transactions(
&mut self,
block: &dashcore::Block,
watch_items: &[WatchItem],
) -> Result<()> {
let block_hash = block.block_hash();
let mut relevant_transactions = 0;
Expand Down Expand Up @@ -357,7 +348,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
stats.blocks_with_relevant_transactions += 1;
}

tracing::info!("🚨 BLOCK MATCH DETECTED! Block {} at height {} contains {} transactions affecting watched addresses/scripts",
tracing::info!("🚨 BLOCK MATCH DETECTED! Block {} at height {} contains {} transactions affecting watched addresses/scripts",
block_hash, block_height, relevant_transactions);

// Report balance changes
Expand Down Expand Up @@ -441,7 +432,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
// Create and store UTXO if we have an address
if let Some(address) = matched_address {
let balance_impact = amount.to_sat() as i64;
tracing::info!("💰 TX {} output {}:{} to {:?} (value: {}) - Address {} balance impact: +{}",
tracing::info!("💰 TX {} output {}:{} to {:?} (value: {}) - Address {} balance impact: +{}",
txid, txid, vout, watch_item, amount, address, balance_impact);

// WalletInterface doesn't have add_utxo method - this will be handled by process_block
Expand All @@ -452,7 +443,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync
*balance_changes.entry(address.clone()).or_insert(0) += balance_impact;
*tx_balance_changes.entry(address.clone()).or_insert(0) += balance_impact;
} else {
tracing::info!("💰 TX {} output {}:{} to {:?} (value: {}) - No address to track balance",
tracing::info!("💰 TX {} output {}:{} to {:?} (value: {}) - No address to track balance",
txid, txid, vout, watch_item, amount);
}

Expand Down Expand Up @@ -580,6 +571,7 @@ impl<W: WalletInterface + Send + Sync + 'static, S: StorageManager + Send + Sync

Ok(())
}
*/

/// Get the balance for a specific address.
async fn get_address_balance(&self, _address: &dashcore::Address) -> Result<AddressBalance> {
Expand Down
18 changes: 1 addition & 17 deletions dash-spv/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Duration;
use dashcore::{Address, Network, ScriptBuf};
// Serialization removed due to complex Address types

use crate::types::{ValidationMode, WatchItem};
use crate::types::ValidationMode;

/// Strategy for handling mempool (unconfirmed) transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -53,9 +53,6 @@ pub struct ClientConfig {
/// Read timeout for TCP socket operations.
pub read_timeout: Duration,

/// Items to watch on the blockchain.
pub watch_items: Vec<WatchItem>,

/// Whether to enable filter syncing.
pub enable_filters: bool,

Expand Down Expand Up @@ -189,7 +186,6 @@ impl Default for ClientConfig {
message_timeout: Duration::from_secs(60),
sync_timeout: Duration::from_secs(300),
read_timeout: Duration::from_millis(100),
watch_items: vec![],
enable_filters: true,
enable_masternodes: true,
max_peers: 8,
Expand Down Expand Up @@ -279,18 +275,6 @@ impl ClientConfig {
self
}

/// Add a watch address.
pub fn watch_address(mut self, address: Address) -> Self {
self.watch_items.push(WatchItem::address(address));
self
}

/// Add a watch script.
pub fn watch_script(mut self, script: ScriptBuf) -> Self {
self.watch_items.push(WatchItem::Script(script));
self
}

/// Disable filters.
pub fn without_filters(mut self) -> Self {
self.enable_filters = false;
Expand Down
48 changes: 16 additions & 32 deletions dash-spv/src/client/filter_sync.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,43 @@
//! Filter synchronization and management for the Dash SPV client.

use std::sync::Arc;
use tokio::sync::RwLock;

use crate::error::{Result, SpvError};
use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::sequential::SequentialSyncManager;
use crate::types::FilterMatch;
use crate::types::SpvStats;
use crate::types::{FilterMatch, WatchItem};
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Filter synchronization manager for coordinating filter downloads and checking.
pub struct FilterSyncCoordinator<'a, S: StorageManager, N: NetworkManager> {
sync_manager: &'a mut SequentialSyncManager<S, N>,
pub struct FilterSyncCoordinator<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> {
sync_manager: &'a mut SequentialSyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
watch_items: &'a Arc<RwLock<std::collections::HashSet<WatchItem>>>,
stats: &'a Arc<RwLock<SpvStats>>,
running: &'a Arc<RwLock<bool>>,
}

impl<'a, S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
FilterSyncCoordinator<'a, S, N>
impl<
'a,
S: StorageManager + Send + Sync + 'static,
N: NetworkManager + Send + Sync + 'static,
W: WalletInterface,
> FilterSyncCoordinator<'a, S, N, W>
{
/// Create a new filter sync coordinator.
pub fn new(
sync_manager: &'a mut SequentialSyncManager<S, N>,
sync_manager: &'a mut SequentialSyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
watch_items: &'a Arc<RwLock<std::collections::HashSet<WatchItem>>>,
stats: &'a Arc<RwLock<SpvStats>>,
running: &'a Arc<RwLock<bool>>,
) -> Self {
Self {
sync_manager,
storage,
network,
watch_items,
stats,
running,
}
Expand Down Expand Up @@ -68,20 +69,9 @@ impl<'a, S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + S
let tip_height =
self.storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0);

// Get current watch items to determine earliest height needed
let watch_items = self.get_watch_items().await;

if watch_items.is_empty() {
tracing::info!("No watch items configured, skipping filter sync");
return Ok(Vec::new());
}

// Find the earliest height among all watch items
let earliest_height = watch_items
.iter()
.filter_map(|item| item.earliest_height())
.min()
.unwrap_or(tip_height.saturating_sub(99)); // Default to last 100 blocks if no earliest_height set
// TODO: Get earliest height from wallet's birth height or earliest address usage
// For now, default to last 100 blocks
let earliest_height = tip_height.saturating_sub(99);

let num_blocks = num_blocks.unwrap_or(100);
let default_start = tip_height.saturating_sub(num_blocks - 1);
Expand Down Expand Up @@ -157,10 +147,4 @@ impl<'a, S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + S

Ok(())
}

/// Get all watch items.
async fn get_watch_items(&self) -> Vec<WatchItem> {
let watch_items = self.watch_items.read().await;
watch_items.iter().cloned().collect()
}
}
20 changes: 12 additions & 8 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! Network message handling for the Dash SPV client.

use std::sync::Arc;
use tokio::sync::RwLock;

use crate::client::ClientConfig;
use crate::error::{Result, SpvError};
use crate::mempool_filter::MempoolFilter;
use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::sequential::SequentialSyncManager;
use crate::types::{MempoolState, SpvEvent, SpvStats};
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Network message handler for processing incoming Dash protocol messages.
pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager> {
sync_manager: &'a mut SequentialSyncManager<S, N>,
pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> {
sync_manager: &'a mut SequentialSyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
config: &'a ClientConfig,
Expand All @@ -24,12 +24,16 @@ pub struct MessageHandler<'a, S: StorageManager, N: NetworkManager> {
event_tx: &'a tokio::sync::mpsc::UnboundedSender<SpvEvent>,
}

impl<'a, S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
MessageHandler<'a, S, N>
impl<
'a,
S: StorageManager + Send + Sync + 'static,
N: NetworkManager + Send + Sync + 'static,
W: WalletInterface,
> MessageHandler<'a, S, N, W>
{
/// Create a new message handler.
pub fn new(
sync_manager: &'a mut SequentialSyncManager<S, N>,
sync_manager: &'a mut SequentialSyncManager<S, N, W>,
storage: &'a mut S,
network: &'a mut N,
config: &'a ClientConfig,
Expand Down
Loading
Loading