Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dash-spv-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ dash-spv = { path = "../dash-spv" }
dashcore = { path = "../dash" }
dash-network = { path = "../dash-network", features = ["ffi"] }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
hex = "0.4"
tracing = "0.1"
# Use key-wallet-ffi for all wallet-related FFI types
Expand Down
46 changes: 17 additions & 29 deletions dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::mem::forget;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// FFI wrapper around `DashSpvClient`.
type InnerClient = DashSpvClient<
Expand All @@ -26,7 +25,6 @@ pub struct FFIDashSpvClient {
pub(crate) inner: InnerClient,
pub(crate) runtime: Arc<Runtime>,
run_task: Mutex<Option<JoinHandle<()>>>,
shutdown_token: CancellationToken,
}

impl FFIDashSpvClient {
Expand Down Expand Up @@ -113,7 +111,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new(
inner: client,
runtime,
run_task: Mutex::new(None),
shutdown_token: CancellationToken::new(),
};
Box::into_raw(Box::new(ffi_client))
}
Expand All @@ -130,9 +127,10 @@ const RUN_TASK_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from
impl FFIDashSpvClient {
/// Wait for the run task to finish cooperatively, aborting only on timeout.
///
/// The caller must cancel `shutdown_token` before calling this so that
/// `DashSpvClient::run()` exits its loop and cleans up monitor tasks.
/// Only falls back to `abort()` if the task doesn't exit within the timeout.
/// `DashSpvClient::stop()` must have been called first (it flips the client's
/// internal running state, which makes `run()` exit its loop and clean up
/// monitor tasks). This only falls back to `abort()` if the task doesn't
/// exit within the timeout.
fn wait_for_run_task(&self) {
let task = self.run_task.lock().unwrap().take();
if let Some(mut task) = task {
Expand All @@ -155,18 +153,6 @@ impl FFIDashSpvClient {
}
}

fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> {
client.shutdown_token.cancel();

client.wait_for_run_task();

let result = client.runtime.block_on(async { client.inner.stop().await });

client.shutdown_token = CancellationToken::new();

result
}

/// Update the running client's configuration.
///
/// # Safety
Expand Down Expand Up @@ -203,8 +189,14 @@ pub unsafe extern "C" fn dash_spv_ffi_client_update_config(
pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) -> i32 {
null_check!(client);

let client = &mut (*client);
match stop_client_internal(client) {
let client = &(*client);

// `stop()` flips the client's internal running state, making `run()` break
// out of its loop. Wait for the spawned run task only after that.
let result = client.runtime.block_on(async { client.inner.stop().await });
client.wait_for_run_task();

match result {
Ok(()) => FFIErrorCode::Success as i32,
Err(e) => {
set_last_error(&e.to_string());
Expand All @@ -231,13 +223,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient)

tracing::info!("dash_spv_ffi_client_run: starting sync");

let shutdown_token = client.shutdown_token.clone();
let spv_client = client.inner.clone();

let task = client.runtime.spawn(async move {
tracing::debug!("Sync task: starting run");

if let Err(e) = spv_client.run(shutdown_token).await {
if let Err(e) = spv_client.run().await {
tracing::error!("Sync task: error: {}", e);
}

Expand Down Expand Up @@ -366,18 +357,15 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie
if !client.is_null() {
let client = Box::from_raw(client);

// Cancel shutdown token so run() exits its loop and cleans up
client.shutdown_token.cancel();

// Wait for the run task to finish (cooperative, with timeout fallback)
client.wait_for_run_task();

// Stop the SPV client (run() calls stop() internally, but this
// handles the case where run() was never called or was aborted)
// handles the case where run() was never called or was aborted).
client.runtime.block_on(async {
let _ = client.inner.stop().await;
});

// Wait for the run task to finish (cooperative, with timeout fallback)
client.wait_for_run_task();

tracing::info!("FFI client destroyed and all tasks cleaned up");
}
}
Expand Down
6 changes: 5 additions & 1 deletion dash-spv-ffi/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ mod tests {
let (config, _temp_dir) = create_test_config();
let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default());

// Pass default (no-op) callbacks — start/stop may fail without network
// Pass default (no-op) callbacks. Start/stop may fail without network.
// Run twice on the same pointer: the client's internal watch must
// re-arm after stop() so a second run() works.
let _result = dash_spv_ffi_client_run(client);
let _result = dash_spv_ffi_client_stop(client);
let _result = dash_spv_ffi_client_run(client);
let _result = dash_spv_ffi_client_stop(client);

Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/filter_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use key_wallet_manager::WalletManager;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -42,9 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting synchronization with filter support...");
println!("Watching address: {:?}", watch_address);

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/simple_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::WalletManager;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -36,9 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Starting header synchronization...");

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/spv_with_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::WalletManager;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -40,9 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// - Reorgs via handle_reorg()
// - Compact filter checks via check_compact_filter()

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
10 changes: 6 additions & 4 deletions dash-spv/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use dashcore::sml::masternode_list_engine::MasternodeListEngine;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{watch, Mutex, RwLock};

use super::ClientConfig;
use crate::error::{Result, SpvError};
Expand Down Expand Up @@ -111,7 +111,9 @@ pub struct DashSpvClient<W: WalletInterface, N: NetworkManager, S: StorageManage
pub(super) wallet: Arc<RwLock<W>>,
pub(super) masternode_engine: Option<Arc<RwLock<MasternodeListEngine>>>,
pub(super) sync_coordinator: Arc<Mutex<PersistentSyncCoordinator<W>>>,
pub(super) running: Arc<RwLock<bool>>,
/// `true` while running, `false` once a stop is requested. Stored as a
/// `watch` so a stop is observed immediately rather than polled.
pub(super) running: Arc<watch::Sender<bool>>,
pub(super) event_handlers: Arc<Vec<Arc<dyn super::EventHandler>>>,
}

Expand Down Expand Up @@ -151,8 +153,8 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
// ============ State Queries ============

/// Check if the client is running.
pub async fn is_running(&self) -> bool {
*self.running.read().await
pub fn is_running(&self) -> bool {
*self.running.borrow()
}

/// Returns the current chain tip hash if available.
Expand Down
37 changes: 16 additions & 21 deletions dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use dashcore::sml::masternode_list_engine::MasternodeListEngine;
use dashcore_hashes::Hash;
use key_wallet_manager::WalletInterface;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{watch, Mutex, RwLock};

impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W, N, S> {
/// Create a new SPV client with the given configuration, network, storage, and wallet.
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
wallet,
masternode_engine,
sync_coordinator: Arc::new(Mutex::new(sync_coordinator)),
running: Arc::new(RwLock::new(false)),
running: Arc::new(watch::Sender::new(false)),
event_handlers: Arc::new(event_handlers),
};

Expand All @@ -161,11 +161,8 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,

/// Start the SPV client: spawn sync tasks and connect to the network.
pub(super) async fn start(&self) -> Result<()> {
{
let running = self.running.read().await;
if *running {
return Err(SpvError::Config("Client already running".to_string()));
}
if self.is_running() {
return Err(SpvError::Config("Client already running".to_string()));
}

// Start all sync tasks before connecting to the network to make sure initial connection
Expand All @@ -180,25 +177,27 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
// Connect to network
self.network.lock().await.connect().await?;

// Only mark as running after all startup operations succeed
{
let mut running = self.running.write().await;
*running = true;
}
// Only mark as running after all startup operations succeed.
// `send_replace` always stores the value regardless of receiver count,
// so this is correct even when `run()` has not subscribed yet.
self.running.send_replace(true);

Ok(())
}

/// Stop the SPV client.
pub async fn stop(&self) -> Result<()> {
// Check if already stopped
{
let running = self.running.read().await;
if !*running {
return Ok(());
}
if !*self.running.borrow() {
return Ok(());
}

// Flip the running state before tearing anything down so a concurrent
// `run()` loop wakes immediately and breaks out before it can lock the
// sync coordinator again. This prevents a tick from racing against the
// shutdown below.
self.running.send_replace(false);

// Shut down sync coordinator: signals cancellation and waits for manager
// tasks to drain before we tear down the network and storage layers.
if let Err(e) = self.sync_coordinator.lock().await.shutdown().await {
Expand All @@ -215,10 +214,6 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
tracing::info!("Storage shutdown completed - all data persisted");
}

// Mark as stopped
let mut running = self.running.write().await;
*running = false;

Ok(())
}

Expand Down
8 changes: 0 additions & 8 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@
//! - `queries.rs` - Peer, masternode, and balance queries
//! - `transactions.rs` - Transaction operations (e.g., broadcast)
//! - `sync_coordinator.rs` - Sync orchestration and network monitoring
//!
//! ## Lock Ordering
//!
//! When acquiring multiple locks, always use this order:
//! 1. running (`Arc<RwLock<bool>>`)
//! 2. storage (`Arc<Mutex<S>>`)
//!
//! Never acquire locks in reverse order or deadlock will occur!

pub mod config;
pub mod event_handler;
Expand Down
23 changes: 16 additions & 7 deletions dash-spv/src/client/sync_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
self.sync_coordinator.lock().await.progress().clone()
}

/// Start the client and run the sync loop until the token is cancelled.
/// Start the client and run the sync loop until `stop()` is called.
///
/// Subscribes to all event channels internally and dispatches events to the
/// event handler provided at construction. Calls `start()` internally, runs
/// continuous network monitoring, and calls `stop()` before returning.
pub async fn run(&self, token: CancellationToken) -> Result<()> {
pub async fn run(&self) -> Result<()> {
let handlers = self.event_handlers.clone();
let monitor_shutdown = CancellationToken::new();
let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::<String>(1);

// Subscribe before `start()` so a `stop()` that races startup is never
// missed: the receiver records the version at subscription time, so any
// later state change is observed even if it lands before the loop runs.
let mut stop_rx = self.running.subscribe();

// Subscribe and spawn monitors before startup so we don't miss early
// connection events.
let sync_event_rx = self.subscribe_sync_events().await;
Expand Down Expand Up @@ -98,25 +103,29 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
return Err(e);
}

// `start()` flipped the state to `true`. Consume that edge so `changed()`
// only fires on the subsequent transition to `false` (the stop request).
// If a `stop()` already raced in, this reads `false` and the loop's first
// guard breaks immediately.
stop_rx.borrow_and_update();

tracing::info!("Starting continuous network monitoring...");

// Run the sync loop
let mut sync_coordinator_tick_interval = tokio::time::interval(SYNC_COORDINATOR_TICK_MS);

let error: Option<SpvError> = loop {
let running = self.running.read().await;
if !*running {
if !self.is_running() {
tracing::info!("Stopping network monitoring");
break None;
}
drop(running);

let error: Option<SpvError> = tokio::select! {
_ = sync_coordinator_tick_interval.tick() => {
self.sync_coordinator.lock().await.tick().await.err().map(Into::into)
}
_ = token.cancelled() => {
tracing::debug!("DashSpvClient run loop cancelled");
_ = stop_rx.changed() => {
tracing::debug!("DashSpvClient run loop stop requested");
break None
}
Some(msg) = monitor_failure_rx.recv() => {
Expand Down
4 changes: 1 addition & 3 deletions dash-spv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
//! use key_wallet_manager::WalletManager;
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//! use tokio_util::sync::CancellationToken;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -41,9 +40,8 @@
//! wallet,
//! vec![Arc::new(())],
//! ).await?;
//! let shutdown_token = CancellationToken::new();
//!
//! client.run(shutdown_token).await?;
//! client.run().await?;
//!
//! Ok(())
//! }
Expand Down
Loading
Loading