From 6d2cdff3f9ef36d9850723643be72b40b13d4752 Mon Sep 17 00:00:00 2001 From: xdustinface Date: Sat, 16 May 2026 22:31:53 +1000 Subject: [PATCH] refactor(dash-spv): replace `run` token and `running` flag with `watch` `DashSpvClient::run()` took an external `CancellationToken` and the client separately tracked `running: Arc<RwLock<bool>>`, two mechanisms for one concept. Shutdown also relied on the `running` flag being observed only on the next 100ms `sync_coordinator` tick. Both are now a single `Arc<watch::Sender<bool>>`: `true` while running, `false` once a stop is requested. `run()` drops its parameter, subscribes before the internal `start()`, and breaks its loop immediately when the value flips, so `stop()` is an event rather than a poll. `start()`/`stop()` use `send_replace`, so they are correct with zero receivers (the normal state during `run -> stop -> run` and when `stop()` is called without `run()`). `is_running()` keeps its signature, reading the watch via `borrow()`. `stop()` now flips the state before tearing down the sync coordinator, so a concurrent `run()` loop wakes and exits before it can lock the coordinator again. This removes the window where a tick could race the shutdown. 
--- dash-spv-ffi/Cargo.toml | 1 - dash-spv-ffi/src/client.rs | 46 +++++++++-------------- dash-spv-ffi/tests/test_client.rs | 6 ++- dash-spv/examples/filter_sync.rs | 5 +-- dash-spv/examples/simple_sync.rs | 5 +-- dash-spv/examples/spv_with_wallet.rs | 5 +-- dash-spv/src/client/core.rs | 10 +++-- dash-spv/src/client/lifecycle.rs | 37 ++++++++---------- dash-spv/src/client/mod.rs | 8 ---- dash-spv/src/client/sync_coordinator.rs | 23 ++++++++---- dash-spv/src/lib.rs | 4 +- dash-spv/src/main.rs | 18 +++------ dash-spv/tests/dashd_masternode/setup.rs | 11 ++---- dash-spv/tests/dashd_sync/setup.rs | 17 +++------ dash-spv/tests/peer_test.rs | 19 +++------- dash-spv/tests/wallet_integration_test.rs | 46 +++++++++++++---------- 16 files changed, 111 insertions(+), 150 deletions(-) diff --git a/dash-spv-ffi/Cargo.toml b/dash-spv-ffi/Cargo.toml index 68254c60e..19ada2d0e 100644 --- a/dash-spv-ffi/Cargo.toml +++ b/dash-spv-ffi/Cargo.toml @@ -16,7 +16,6 @@ dash-spv = { path = "../dash-spv" } dashcore = { path = "../dash" } dash-network = { path = "../dash-network", features = ["ffi"] } tokio = { version = "1", features = ["full"] } -tokio-util = "0.7" hex = "0.4" tracing = "0.1" # Use key-wallet-ffi for all wallet-related FFI types diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index faf0235f4..6cb94c8e2 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -13,7 +13,6 @@ use std::mem::forget; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; /// FFI wrapper around `DashSpvClient`. 
type InnerClient = DashSpvClient< @@ -26,7 +25,6 @@ pub struct FFIDashSpvClient { pub(crate) inner: InnerClient, pub(crate) runtime: Arc, run_task: Mutex>>, - shutdown_token: CancellationToken, } impl FFIDashSpvClient { @@ -113,7 +111,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( inner: client, runtime, run_task: Mutex::new(None), - shutdown_token: CancellationToken::new(), }; Box::into_raw(Box::new(ffi_client)) } @@ -130,9 +127,10 @@ const RUN_TASK_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from impl FFIDashSpvClient { /// Wait for the run task to finish cooperatively, aborting only on timeout. /// - /// The caller must cancel `shutdown_token` before calling this so that - /// `DashSpvClient::run()` exits its loop and cleans up monitor tasks. - /// Only falls back to `abort()` if the task doesn't exit within the timeout. + /// `DashSpvClient::stop()` must have been called first (it flips the client's + /// internal running state, which makes `run()` exit its loop and clean up + /// monitor tasks). This only falls back to `abort()` if the task doesn't + /// exit within the timeout. fn wait_for_run_task(&self) { let task = self.run_task.lock().unwrap().take(); if let Some(mut task) = task { @@ -155,18 +153,6 @@ impl FFIDashSpvClient { } } -fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> { - client.shutdown_token.cancel(); - - client.wait_for_run_task(); - - let result = client.runtime.block_on(async { client.inner.stop().await }); - - client.shutdown_token = CancellationToken::new(); - - result -} - /// Update the running client's configuration. 
/// /// # Safety @@ -203,8 +189,14 @@ pub unsafe extern "C" fn dash_spv_ffi_client_update_config( pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) -> i32 { null_check!(client); - let client = &mut (*client); - match stop_client_internal(client) { + let client = &(*client); + + // `stop()` flips the client's internal running state, making `run()` break + // out of its loop. Wait for the spawned run task only after that. + let result = client.runtime.block_on(async { client.inner.stop().await }); + client.wait_for_run_task(); + + match result { Ok(()) => FFIErrorCode::Success as i32, Err(e) => { set_last_error(&e.to_string()); @@ -231,13 +223,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) tracing::info!("dash_spv_ffi_client_run: starting sync"); - let shutdown_token = client.shutdown_token.clone(); let spv_client = client.inner.clone(); let task = client.runtime.spawn(async move { tracing::debug!("Sync task: starting run"); - if let Err(e) = spv_client.run(shutdown_token).await { + if let Err(e) = spv_client.run().await { tracing::error!("Sync task: error: {}", e); } @@ -366,18 +357,15 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie if !client.is_null() { let client = Box::from_raw(client); - // Cancel shutdown token so run() exits its loop and cleans up - client.shutdown_token.cancel(); - - // Wait for the run task to finish (cooperative, with timeout fallback) - client.wait_for_run_task(); - // Stop the SPV client (run() calls stop() internally, but this - // handles the case where run() was never called or was aborted) + // handles the case where run() was never called or was aborted). 
client.runtime.block_on(async { let _ = client.inner.stop().await; }); + // Wait for the run task to finish (cooperative, with timeout fallback) + client.wait_for_run_task(); + tracing::info!("FFI client destroyed and all tasks cleaned up"); } } diff --git a/dash-spv-ffi/tests/test_client.rs b/dash-spv-ffi/tests/test_client.rs index 11397b1cb..b2eaeb85a 100644 --- a/dash-spv-ffi/tests/test_client.rs +++ b/dash-spv-ffi/tests/test_client.rs @@ -48,7 +48,11 @@ mod tests { let (config, _temp_dir) = create_test_config(); let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); - // Pass default (no-op) callbacks — start/stop may fail without network + // Pass default (no-op) callbacks. Start/stop may fail without network. + // Run twice on the same pointer: the client's internal watch must + // re-arm after stop() so a second run() works. + let _result = dash_spv_ffi_client_run(client); + let _result = dash_spv_ffi_client_stop(client); let _result = dash_spv_ffi_client_run(client); let _result = dash_spv_ffi_client_stop(client); diff --git a/dash-spv/examples/filter_sync.rs b/dash-spv/examples/filter_sync.rs index e5940036e..203896ce6 100644 --- a/dash-spv/examples/filter_sync.rs +++ b/dash-spv/examples/filter_sync.rs @@ -9,7 +9,6 @@ use key_wallet_manager::WalletManager; use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -42,9 +41,7 @@ async fn main() -> Result<(), Box> { println!("Starting synchronization with filter support..."); println!("Watching address: {:?}", watch_address); - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/simple_sync.rs b/dash-spv/examples/simple_sync.rs index 63679108d..0568768fe 100644 --- a/dash-spv/examples/simple_sync.rs +++ b/dash-spv/examples/simple_sync.rs @@ -8,7 +8,6 @@ use 
key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -36,9 +35,7 @@ async fn main() -> Result<(), Box> { println!("Starting header synchronization..."); - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/spv_with_wallet.rs b/dash-spv/examples/spv_with_wallet.rs index 1609bd8d7..2d2c661d8 100644 --- a/dash-spv/examples/spv_with_wallet.rs +++ b/dash-spv/examples/spv_with_wallet.rs @@ -9,7 +9,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -40,9 +39,7 @@ async fn main() -> Result<(), Box> { // - Reorgs via handle_reorg() // - Compact filter checks via check_compact_filter() - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index ff346b8d0..09e8bb712 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -10,7 +10,7 @@ use dashcore::sml::masternode_list_engine::MasternodeListEngine; use std::sync::Arc; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{watch, Mutex, RwLock}; use super::ClientConfig; use crate::error::{Result, SpvError}; @@ -111,7 +111,9 @@ pub struct DashSpvClient>, pub(super) masternode_engine: Option>>, pub(super) sync_coordinator: Arc>>, - pub(super) running: Arc>, + /// `true` while running, `false` once a stop is requested. Stored as a + /// `watch` so a stop is observed immediately rather than polled. 
+ pub(super) running: Arc>, pub(super) event_handlers: Arc>>, } @@ -151,8 +153,8 @@ impl DashSpvClient bool { - *self.running.read().await + pub fn is_running(&self) -> bool { + *self.running.borrow() } /// Returns the current chain tip hash if available. diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 1e4125dc6..07f899135 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -25,7 +25,7 @@ use dashcore::sml::masternode_list_engine::MasternodeListEngine; use dashcore_hashes::Hash; use key_wallet_manager::WalletInterface; use std::sync::Arc; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{watch, Mutex, RwLock}; impl DashSpvClient { /// Create a new SPV client with the given configuration, network, storage, and wallet. @@ -143,7 +143,7 @@ impl DashSpvClient DashSpvClient Result<()> { - { - let running = self.running.read().await; - if *running { - return Err(SpvError::Config("Client already running".to_string())); - } + if self.is_running() { + return Err(SpvError::Config("Client already running".to_string())); } // Start all sync tasks before connecting to the network to make sure initial connection @@ -180,11 +177,10 @@ impl DashSpvClient DashSpvClient Result<()> { // Check if already stopped - { - let running = self.running.read().await; - if !*running { - return Ok(()); - } + if !*self.running.borrow() { + return Ok(()); } + // Flip the running state before tearing anything down so a concurrent + // `run()` loop wakes immediately and breaks out before it can lock the + // sync coordinator again. This prevents a tick from racing against the + // shutdown below. + self.running.send_replace(false); + // Shut down sync coordinator: signals cancellation and waits for manager // tasks to drain before we tear down the network and storage layers. if let Err(e) = self.sync_coordinator.lock().await.shutdown().await { @@ -215,10 +214,6 @@ impl DashSpvClient>`) -//! 2. storage (`Arc>`) -//! -//! 
Never acquire locks in reverse order or deadlock will occur! pub mod config; pub mod event_handler; diff --git a/dash-spv/src/client/sync_coordinator.rs b/dash-spv/src/client/sync_coordinator.rs index f26056c23..e428bcb80 100644 --- a/dash-spv/src/client/sync_coordinator.rs +++ b/dash-spv/src/client/sync_coordinator.rs @@ -24,16 +24,21 @@ impl DashSpvClient Result<()> { + pub async fn run(&self) -> Result<()> { let handlers = self.event_handlers.clone(); let monitor_shutdown = CancellationToken::new(); let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::(1); + // Subscribe before `start()` so a `stop()` that races startup is never + // missed: the receiver records the version at subscription time, so any + // later state change is observed even if it lands before the loop runs. + let mut stop_rx = self.running.subscribe(); + // Subscribe and spawn monitors before startup so we don't miss early // connection events. let sync_event_rx = self.subscribe_sync_events().await; @@ -98,25 +103,29 @@ impl DashSpvClient = loop { - let running = self.running.read().await; - if !*running { + if !self.is_running() { tracing::info!("Stopping network monitoring"); break None; } - drop(running); let error: Option = tokio::select! { _ = sync_coordinator_tick_interval.tick() => { self.sync_coordinator.lock().await.tick().await.err().map(Into::into) } - _ = token.cancelled() => { - tracing::debug!("DashSpvClient run loop cancelled"); + _ = stop_rx.changed() => { + tracing::debug!("DashSpvClient run loop stop requested"); break None } Some(msg) = monitor_failure_rx.recv() => { diff --git a/dash-spv/src/lib.rs b/dash-spv/src/lib.rs index cabedf76a..3e26c0c97 100644 --- a/dash-spv/src/lib.rs +++ b/dash-spv/src/lib.rs @@ -20,7 +20,6 @@ //! use key_wallet_manager::WalletManager; //! use std::sync::Arc; //! use tokio::sync::RwLock; -//! use tokio_util::sync::CancellationToken; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -41,9 +40,8 @@ //! wallet, //! 
vec![Arc::new(())], //! ).await?; -//! let shutdown_token = CancellationToken::new(); //! -//! client.run(shutdown_token).await?; +//! client.run().await?; //! //! Ok(()) //! } diff --git a/dash-spv/src/main.rs b/dash-spv/src/main.rs index eb95bd580..677ff274f 100644 --- a/dash-spv/src/main.rs +++ b/dash-spv/src/main.rs @@ -8,7 +8,6 @@ use clap::{Parser, ValueEnum}; use dash_spv::{ClientConfig, DashSpvClient, LevelFilter, MempoolStrategy, Network}; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; -use tokio_util::sync::CancellationToken; /// Network selection for CLI #[derive(Clone, Copy, Debug, ValueEnum)] @@ -315,22 +314,17 @@ async fn run_client( } }; - let shutdown_token = CancellationToken::new(); - let ctrl_c_token = shutdown_token.clone(); + let stop_client = client.clone(); tokio::spawn(async move { - tokio::select! { - result = tokio::signal::ctrl_c() => { - result.ok(); - tracing::debug!("Shutdown signal received"); - } - _ = ctrl_c_token.cancelled() => { - tracing::debug!("Shutdown token cancelled"); + if tokio::signal::ctrl_c().await.is_ok() { + tracing::debug!("Shutdown signal received"); + if let Err(e) = stop_client.stop().await { + tracing::warn!("Error during ctrl-c stop: {}", e); } } - ctrl_c_token.cancel(); }); - client.run(shutdown_token).await?; + client.run().await?; Ok(()) } diff --git a/dash-spv/tests/dashd_masternode/setup.rs b/dash-spv/tests/dashd_masternode/setup.rs index 0ac2ca457..773dd5c07 100644 --- a/dash-spv/tests/dashd_masternode/setup.rs +++ b/dash-spv/tests/dashd_masternode/setup.rs @@ -23,7 +23,6 @@ use tempfile::TempDir; use tokio::sync::{broadcast, watch, RwLock}; use tokio::task::JoinHandle; use tokio::time; -use tokio_util::sync::CancellationToken; /// Timeout for masternode sync tests (masternode sync takes longer than wallet sync). 
pub(super) const SYNC_TIMEOUT: u64 = 60; @@ -38,14 +37,13 @@ pub(super) struct ClientHandle { pub(super) sync_event_receiver: broadcast::Receiver, pub(super) wallet_event_receiver: broadcast::Receiver, pub(super) _network_event_receiver: broadcast::Receiver, - pub(super) cancel_token: CancellationToken, pub(super) engine: Arc>, } impl ClientHandle { pub(super) async fn stop(&mut self) { - tracing::info!("Cancelling client run loop..."); - self.cancel_token.cancel(); + tracing::info!("Stopping client run loop..."); + self.client.stop().await.expect("client stop failed"); if let Some(handle) = self.run_handle.take() { handle.await.expect("Run task panicked").expect("Run task returned error"); } @@ -185,11 +183,9 @@ pub(super) async fn create_and_start_client( let engine = client.masternode_list_engine().expect("Engine should be initialized after creation"); - let cancel_token = CancellationToken::new(); - let run_token = cancel_token.clone(); let run_client = client.clone(); - let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); + let run_handle = tokio::task::spawn(async move { run_client.run().await }); ClientHandle { client, @@ -198,7 +194,6 @@ pub(super) async fn create_and_start_client( sync_event_receiver, wallet_event_receiver, _network_event_receiver, - cancel_token, engine, } } diff --git a/dash-spv/tests/dashd_sync/setup.rs b/dash-spv/tests/dashd_sync/setup.rs index 16c229bb7..27c8b5415 100644 --- a/dash-spv/tests/dashd_sync/setup.rs +++ b/dash-spv/tests/dashd_sync/setup.rs @@ -24,7 +24,6 @@ use std::path::PathBuf; use std::sync::Arc; use tempfile::TempDir; use tokio::sync::{broadcast, watch, RwLock}; -use tokio_util::sync::CancellationToken; /// SPV-specific test context wrapping the shared dashd infrastructure. 
/// @@ -231,7 +230,7 @@ pub(super) type TestClient = DashSpvClient, PeerNetworkManager, DiskStorageManager>; /// A `ClientHandle` is a utility structure that manages the state and handles for a `TestClient` -/// required to interact with the synchronization process, various event channels, and cancellation capabilities. +/// required to interact with the synchronization process and event channels. pub(super) struct ClientHandle { /// The underlying SPV client instance. pub(super) client: TestClient, @@ -245,16 +244,13 @@ pub(super) struct ClientHandle { pub(super) network_event_receiver: broadcast::Receiver, /// A channel for receiving wallet events. pub(super) wallet_event_receiver: broadcast::Receiver, - /// A cancellation token for the client's run loop. - pub(super) cancel_token: CancellationToken, } impl ClientHandle { - /// Stops the execution of the client run loop by canceling its associated token and awaiting the - /// termination of the background task. + /// Stops the SPV client and awaits the termination of the background run task. 
pub(super) async fn stop(&mut self) { - tracing::info!("Cancelling client run loop..."); - self.cancel_token.cancel(); + tracing::info!("Stopping client run loop..."); + self.client.stop().await.expect("client stop failed"); if let Some(handle) = self.run_handle.take() { handle.await.expect("Run task panicked").expect("Run task returned error"); } @@ -304,11 +300,9 @@ pub(super) async fn create_and_start_client( let w = client.wallet().read().await; w.subscribe_events() }; - let cancel_token = CancellationToken::new(); - let run_token = cancel_token.clone(); let run_client = client.clone(); - let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); + let run_handle = tokio::task::spawn(async move { run_client.run().await }); ClientHandle { client, @@ -317,7 +311,6 @@ pub(super) async fn create_and_start_client( sync_event_receiver, network_event_receiver, wallet_event_receiver, - cancel_token, } } diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index 47602c846..b62cf222e 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -6,7 +6,6 @@ use std::time::Duration; use tempfile::TempDir; use tokio::sync::RwLock; use tokio::time; -use tokio_util::sync::CancellationToken; use dash_spv::client::{ClientConfig, DashSpvClient}; use dash_spv::network::PeerNetworkManager; @@ -53,10 +52,8 @@ async fn test_peer_connection() { let client = DashSpvClient::new(config, network_manager, storage_manager, wallet, vec![]).await.unwrap(); - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); // Give it time to connect to peers time::sleep(Duration::from_secs(5)).await; @@ -65,7 +62,7 @@ async fn test_peer_connection() { let peer_count = client.peer_count().await; assert!(peer_count > 0, "Should have connected to at least one 
peer"); - cancel.cancel(); + client.stop().await.expect("Should stop"); let _ = handle.await; } @@ -92,17 +89,15 @@ async fn test_peer_persistence() { .await .unwrap(); - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); time::sleep(Duration::from_secs(5)).await; let peer_count = client.peer_count().await; assert!(peer_count > 0, "Should have connected to peers"); - cancel.cancel(); + client.stop().await.expect("Should stop"); let _ = handle.await; } @@ -122,11 +117,9 @@ async fn test_peer_persistence() { .unwrap(); // Should connect faster due to saved peers - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); let start = tokio::time::Instant::now(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); // Wait for connection but with shorter timeout time::sleep(Duration::from_secs(3)).await; @@ -137,7 +130,7 @@ async fn test_peer_persistence() { let elapsed = start.elapsed(); println!("Connected to {} peers in {:?} (using saved peers)", peer_count, elapsed); - cancel.cancel(); + client.stop().await.expect("Should stop"); let _ = handle.await; } } diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index 7ba508b7e..ef374ebfc 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; use dash_spv::network::PeerNetworkManager; use dash_spv::storage::DiskStorageManager; @@ -49,24 +48,33 @@ async fn test_spv_client_creation() { async fn test_spv_client_run_stop() { let client = create_test_client().await; - 
let token = CancellationToken::new(); - let cancel = token.clone(); - - let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); - - tokio::time::timeout(Duration::from_secs(5), async { - while !client.is_running().await { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .expect("client failed to start"); - - cancel.cancel(); - handle.await.unwrap().unwrap(); - - assert!(!client.is_running().await); + // Run twice on the same instance: the watch must re-arm after stop(), so a + // second run() succeeds. + for _ in 0..2 { + let run_client = client.clone(); + let handle = tokio::spawn(async move { run_client.run().await }); + + tokio::time::timeout(Duration::from_secs(5), async { + while !client.is_running() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("client failed to start"); + + client.stop().await.unwrap(); + + // stop() wakes the run loop immediately via the watch rather than only + // on the next 100ms coordinator tick, so the join completes far inside + // this bound. A hang (e.g. a missed wakeup) would trip the timeout. + tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("run task did not exit promptly after stop") + .unwrap() + .unwrap(); + + assert!(!client.is_running()); + } } #[tokio::test]