diff --git a/Cargo.lock b/Cargo.lock index e70ba138a..a70de31f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14754,6 +14754,7 @@ dependencies = [ "port-killer", "port_check", "ractor", + "ractor-supervisor", "reqwest 0.12.24", "rodio", "serde", diff --git a/plugins/local-stt/Cargo.toml b/plugins/local-stt/Cargo.toml index caac1efa4..2105fd416 100644 --- a/plugins/local-stt/Cargo.toml +++ b/plugins/local-stt/Cargo.toml @@ -70,11 +70,13 @@ tower-http = { workspace = true, features = ["cors", "trace"] } backon = { workspace = true } futures-util = { workspace = true } -ractor = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true, features = ["rt", "macros"] } tokio-util = { workspace = true } tracing = { workspace = true } +ractor = { workspace = true } +ractor-supervisor = { workspace = true } + port-killer = "0.1.0" port_check = "0.3.0" diff --git a/plugins/local-stt/src/error.rs b/plugins/local-stt/src/error.rs index c93c49ceb..067eca4fd 100644 --- a/plugins/local-stt/src/error.rs +++ b/plugins/local-stt/src/error.rs @@ -10,16 +10,14 @@ pub enum Error { HyprFileError(#[from] hypr_file::Error), #[error(transparent)] ShellError(#[from] tauri_plugin_shell::Error), - #[error(transparent)] - TauriError(#[from] tauri::Error), - #[error(transparent)] - IoError(#[from] std::io::Error), #[error("Model not downloaded")] ModelNotDownloaded, - #[error("Server already running")] - ServerAlreadyRunning, #[error("Server start failed {0}")] ServerStartFailed(String), + #[error("Server stop failed {0}")] + ServerStopFailed(String), + #[error("Supervisor not found")] + SupervisorNotFound, #[error("AM binary not found")] AmBinaryNotFound, #[error("AM API key not set")] diff --git a/plugins/local-stt/src/ext.rs b/plugins/local-stt/src/ext.rs index e9e762bb8..42b4d6706 100644 --- a/plugins/local-stt/src/ext.rs +++ b/plugins/local-stt/src/ext.rs @@ -1,7 +1,6 @@ -use std::{collections::HashMap, future::Future, path::PathBuf, time::Duration}; +use std::{collections::HashMap, future::Future, path::PathBuf, sync::Arc}; -use ractor::{call_t, registry, Actor, ActorRef}; -use tokio::time::sleep; +use ractor::{call_t, registry, ActorRef}; use tokio_util::sync::CancellationToken; use tauri::{ipc::Channel, Manager, Runtime}; @@ -12,13 +11,17 @@ use hypr_file::download_file_parallel_cancellable; use crate::{ model::SupportedSttModel, - server::{external, internal, ServerInfo, ServerStatus, ServerType}, + server::{external, internal, supervisor, ServerInfo, ServerStatus, ServerType}, }; pub trait LocalSttPluginExt { fn models_dir(&self) -> PathBuf; fn list_ggml_backends(&self) -> Vec; + fn get_supervisor( + &self, + ) -> impl Future>; + fn start_server( &self, model: SupportedSttModel, @@ -53,6 +56,15 @@ impl> LocalSttPluginExt for T { hypr_whisper_local::list_ggml_backends() } + async fn get_supervisor(&self) -> Result { + let state = self.state::(); + let guard = state.lock().await; + guard + .stt_supervisor + .clone() + .ok_or(crate::Error::SupervisorNotFound) + } + async fn is_model_downloaded(&self, model: &SupportedSttModel) -> Result { match model { SupportedSttModel::Am(model) => Ok(model.is_downloaded(self.models_dir())?), @@ -77,12 +89,12 @@ impl> LocalSttPluginExt for T { #[tracing::instrument(skip_all)] async fn start_server(&self, model: SupportedSttModel) -> Result { - let t = match &model { + let server_type = match &model { SupportedSttModel::Am(_) => ServerType::External, SupportedSttModel::Whisper(_) => ServerType::Internal, }; - let current_info = match t { + let current_info = match server_type { ServerType::Internal => internal_health().await, ServerType::External => external_health().await, }; @@ -99,164 +111,56 @@ impl> LocalSttPluginExt for T { } } - if matches!(t, ServerType::Internal) && !self.is_model_downloaded(&model).await? { + if matches!(server_type, ServerType::Internal) && !self.is_model_downloaded(&model).await? { return Err(crate::Error::ModelNotDownloaded); } - let am_key = if matches!(t, ServerType::External) { - let state = self.state::(); - let key = { - let guard = state.lock().await; - guard.am_api_key.clone() - }; - let key = key - .filter(|k| !k.is_empty()) - .ok_or(crate::Error::AmApiKeyNotSet)?; - Some(key) - } else { - None - }; - - let cache_dir = self.models_dir(); - let data_dir = self.app_handle().path().app_data_dir().unwrap().join("stt"); + let supervisor = self.get_supervisor().await?; - self.stop_server(None).await?; - // Need some delay - sleep(Duration::from_millis(300)).await; + supervisor::stop_all_stt_servers(&supervisor) + .await + .map_err(|e| crate::Error::ServerStopFailed(e.to_string()))?; - match t { + match server_type { ServerType::Internal => { + let cache_dir = self.models_dir(); let whisper_model = match model { SupportedSttModel::Whisper(m) => m, - _ => { - return Err(crate::Error::UnsupportedModelType); - } + _ => return Err(crate::Error::UnsupportedModelType), }; - let (_server, _) = Actor::spawn( - Some(internal::InternalSTTActor::name()), - internal::InternalSTTActor, - internal::InternalSTTArgs { - model_cache_dir: cache_dir, - model_type: whisper_model, - }, - ) - .await - .map_err(|e| crate::Error::ServerStartFailed(e.to_string()))?; - - internal_health() - .await - .and_then(|info| info.url) - .ok_or_else(|| crate::Error::ServerStartFailed("empty_health".to_string())) + start_internal_server(&supervisor, cache_dir, whisper_model).await } ServerType::External => { + let data_dir = self.app_handle().path().app_data_dir().unwrap().join("stt"); let am_model = match model { SupportedSttModel::Am(m) => m, - _ => { - return Err(crate::Error::UnsupportedModelType); - } - }; - - let am_key = match am_key { - Some(key) => key, - None => { - return Err(crate::Error::AmApiKeyNotSet); - } - }; - - let cmd: tauri_plugin_shell::process::Command = { - #[cfg(debug_assertions)] - { - let passthrough_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../../apps/desktop/src-tauri/resources/passthrough-aarch64-apple-darwin"); - let stt_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join( - "../../apps/desktop/src-tauri/resources/stt-aarch64-apple-darwin", - ); - - if !passthrough_path.exists() || !stt_path.exists() { - return Err(crate::Error::AmBinaryNotFound); - } - - self.shell() - .command(passthrough_path) - .current_dir(dirs::home_dir().unwrap()) - .arg(stt_path) - .args(["serve", "--any-token", "-v", "-d"]) - } - - #[cfg(not(debug_assertions))] - self.shell() - .sidecar("stt")? - .current_dir(dirs::home_dir().unwrap()) - .args(["serve", "--any-token"]) + _ => return Err(crate::Error::UnsupportedModelType), }; - let (_server, _) = Actor::spawn( - Some(external::ExternalSTTActor::name()), - external::ExternalSTTActor, - external::ExternalSTTArgs { - cmd, - api_key: am_key, - model: am_model, - models_dir: data_dir, - }, - ) - .await - .map_err(|e| crate::Error::ServerStartFailed(e.to_string()))?; - - external_health() - .await - .and_then(|info| info.url) - .ok_or_else(|| crate::Error::ServerStartFailed("empty_health".to_string())) + start_external_server(self, &supervisor, data_dir, am_model).await } } } #[tracing::instrument(skip_all)] async fn stop_server(&self, server_type: Option) -> Result { - let mut stopped = false; + let supervisor = self.get_supervisor().await?; + match server_type { - Some(ServerType::External) => { - if let Some(cell) = registry::where_is(external::ExternalSTTActor::name()) { - let actor: ActorRef = cell.into(); - if let Err(e) = actor.stop_and_wait(None, None).await { - tracing::error!("stop_server: {:?}", e); - } else { - stopped = true; - } - } - } - Some(ServerType::Internal) => { - if let Some(cell) = registry::where_is(internal::InternalSTTActor::name()) { - let actor: ActorRef = cell.into(); - if let Err(e) = actor.stop_and_wait(None, None).await { - tracing::error!("stop_server: {:?}", e); - } else { - stopped = true; - } - } + Some(t) => { + supervisor::stop_stt_server(&supervisor, t) + .await + .map_err(|e| crate::Error::ServerStopFailed(e.to_string()))?; + Ok(true) } None => { - if let Some(cell) = registry::where_is(external::ExternalSTTActor::name()) { - let actor: ActorRef = cell.into(); - if let Err(e) = actor.stop_and_wait(None, None).await { - tracing::error!("stop_server: {:?}", e); - } else { - stopped = true; - } - } - if let Some(cell) = registry::where_is(internal::InternalSTTActor::name()) { - let actor: ActorRef = cell.into(); - if let Err(e) = actor.stop_and_wait(None, None).await { - tracing::error!("stop_server: {:?}", e); - } else { - stopped = true; - } - } + supervisor::stop_all_stt_servers(&supervisor) + .await + .map_err(|e| crate::Error::ServerStopFailed(e.to_string()))?; + Ok(true) } } - - Ok(stopped) } #[tracing::instrument(skip_all)] @@ -410,6 +314,98 @@ impl> LocalSttPluginExt for T { } } +async fn start_internal_server( + supervisor: &supervisor::SupervisorRef, + cache_dir: PathBuf, + model: hypr_whisper_local_model::WhisperModel, +) -> Result { + supervisor::start_internal_stt( + supervisor, + internal::InternalSTTArgs { + model_cache_dir: cache_dir, + model_type: model, + }, + ) + .await + .map_err(|e| crate::Error::ServerStartFailed(e.to_string()))?; + + internal_health() + .await + .and_then(|info| info.url) + .ok_or_else(|| crate::Error::ServerStartFailed("empty_health".to_string())) +} + +async fn start_external_server>( + manager: &T, + supervisor: &supervisor::SupervisorRef, + data_dir: PathBuf, + model: hypr_am::AmModel, +) -> Result { + let am_key = { + let state = manager.state::(); + let key = { + let guard = state.lock().await; + guard.am_api_key.clone() + }; + + key.filter(|k| !k.is_empty()) + .ok_or(crate::Error::AmApiKeyNotSet)? + }; + + let port = port_check::free_local_port() + .ok_or_else(|| crate::Error::ServerStartFailed("failed_to_find_free_port".to_string()))?; + + let app_handle = manager.app_handle().clone(); + let cmd_builder = { + #[cfg(debug_assertions)] + { + let passthrough_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../../apps/desktop/src-tauri/resources/passthrough-aarch64-apple-darwin"); + let stt_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../../apps/desktop/src-tauri/resources/stt-aarch64-apple-darwin"); + + if !passthrough_path.exists() || !stt_path.exists() { + return Err(crate::Error::AmBinaryNotFound); + } + + let passthrough_path = Arc::new(passthrough_path); + let stt_path = Arc::new(stt_path); + external::CommandBuilder::new(move || { + app_handle + .shell() + .command(passthrough_path.as_ref()) + .current_dir(dirs::home_dir().unwrap()) + .arg(stt_path.as_ref()) + .args(["serve", "--any-token", "-v", "-d"]) + }) + } + + #[cfg(not(debug_assertions))] + { + external::CommandBuilder::new(move || { + app_handle + .shell() + .sidecar("stt") + .expect("failed to create sidecar command") + .current_dir(dirs::home_dir().unwrap()) + .args(["serve", "--any-token"]) + }) + } + }; + + supervisor::start_external_stt( + supervisor, + external::ExternalSTTArgs::new(cmd_builder, am_key, model, data_dir, port), + ) + .await + .map_err(|e| crate::Error::ServerStartFailed(e.to_string()))?; + + external_health() + .await + .and_then(|info| info.url) + .ok_or_else(|| crate::Error::ServerStartFailed("empty_health".to_string())) +} + async fn internal_health() -> Option { match registry::where_is(internal::InternalSTTActor::name()) { Some(cell) => { diff --git a/plugins/local-stt/src/lib.rs b/plugins/local-stt/src/lib.rs index d39087be7..5dfbb1f3e 100644 --- a/plugins/local-stt/src/lib.rs +++ b/plugins/local-stt/src/lib.rs @@ -1,3 +1,5 @@ +use ractor::ActorRef; +use ractor_supervisor::dynamic::DynamicSupervisorMsg; use std::collections::HashMap; use tauri::{Manager, Wry}; use tokio_util::sync::CancellationToken; @@ -17,10 +19,10 @@ pub use types::*; pub type SharedState = std::sync::Arc>; -#[derive(Default)] pub struct State { pub am_api_key: Option, pub download_task: HashMap, CancellationToken)>, + pub stt_supervisor: Option>, } const PLUGIN_NAME: &str = "local-stt"; @@ -64,10 +66,26 @@ pub fn init() -> tauri::plugin::TauriPlugin { } }; - app.manage(SharedState::new(tokio::sync::Mutex::new(State { + let state = std::sync::Arc::new(tokio::sync::Mutex::new(State { am_api_key: api_key, - ..Default::default() - }))); + download_task: HashMap::new(), + stt_supervisor: None, + })); + + app.manage(state.clone()); + + tauri::async_runtime::spawn(async move { + match server::supervisor::spawn_stt_supervisor().await { + Ok(supervisor) => { + let mut guard = state.lock().await; + guard.stt_supervisor = Some(supervisor); + tracing::info!("stt_supervisor_spawned"); + } + Err(e) => { + tracing::error!("failed_to_spawn_stt_supervisor: {:?}", e); + } + } + }); Ok(()) }) diff --git a/plugins/local-stt/src/server/external.rs b/plugins/local-stt/src/server/external.rs index 4f8d87ac3..70b2d1c97 100644 --- a/plugins/local-stt/src/server/external.rs +++ b/plugins/local-stt/src/server/external.rs @@ -1,10 +1,10 @@ -use std::path::PathBuf; +use std::{io, path::PathBuf, sync::Arc}; use tauri_plugin_shell::process::{Command, CommandChild}; -use super::{ServerInfo, ServerStatus}; use backon::{ConstantBuilder, Retryable}; use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort}; +use super::{ServerInfo, ServerStatus}; use crate::SupportedSttModel; pub enum ExternalSTTMessage { @@ -12,11 +12,48 @@ pub enum ExternalSTTMessage { ProcessTerminated(String), } +#[derive(Clone)] +pub struct CommandBuilder { + factory: Arc Command + Send + Sync>, +} + +impl CommandBuilder { + pub fn new(factory: impl Fn() -> Command + Send + Sync + 'static) -> Self { + Self { + factory: Arc::new(factory), + } + } + + pub fn build(&self) -> Command { + (self.factory)() + } +} + +#[derive(Clone)] pub struct ExternalSTTArgs { - pub cmd: Command, + pub cmd_builder: CommandBuilder, pub api_key: String, pub model: hypr_am::AmModel, pub models_dir: PathBuf, + pub port: u16, +} + +impl ExternalSTTArgs { + pub fn new( + cmd_builder: CommandBuilder, + api_key: String, + model: hypr_am::AmModel, + models_dir: PathBuf, + port: u16, + ) -> Self { + Self { + cmd_builder, + api_key, + model, + models_dir, + port, + } + } } pub struct ExternalSTTState { @@ -37,6 +74,35 @@ impl ExternalSTTActor { } } +fn cleanup_state(state: &mut ExternalSTTState) { + let mut kill_failed = false; + + if let Some(process) = state.process_handle.take() { + if let Err(e) = process.kill() { + if let tauri_plugin_shell::Error::Io(io_err) = &e { + match io_err.kind() { + io::ErrorKind::InvalidInput | io::ErrorKind::NotFound => {} + _ => { + tracing::error!("failed_to_kill_process: {:?}", e); + kill_failed = true; + } + } + } else { + tracing::error!("failed_to_kill_process: {:?}", e); + kill_failed = true; + } + } + } + + if kill_failed { + hypr_host::kill_processes_by_matcher(hypr_host::ProcessMatcher::Sidecar); + } + + if let Some(task) = state.task_handle.take() { + task.abort(); + } +} + #[ractor::async_trait] impl Actor for ExternalSTTActor { type Msg = ExternalSTTMessage; @@ -48,8 +114,16 @@ impl Actor for ExternalSTTActor { myself: ActorRef, args: Self::Arguments, ) -> Result { - let port = port_check::free_local_port().unwrap(); - let (mut rx, child) = args.cmd.args(["--port", &port.to_string()]).spawn()?; + let ExternalSTTArgs { + cmd_builder, + api_key, + model, + models_dir, + port, + } = args; + + let cmd = cmd_builder.build(); + let (mut rx, child) = cmd.args(["--port", &port.to_string()]).spawn()?; let base_url = format!("http://localhost:{}/v1", port); let client = hypr_am::Client::new(&base_url); @@ -92,9 +166,9 @@ impl Actor for ExternalSTTActor { Ok(ExternalSTTState { base_url, - api_key: Some(args.api_key), - model: args.model, - models_dir: args.models_dir, + api_key: Some(api_key), + model, + models_dir, client, process_handle: Some(child), task_handle: Some(task_handle), @@ -139,31 +213,20 @@ impl Actor for ExternalSTTActor { _myself: ActorRef, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - if let Some(process) = state.process_handle.take() { - if let Err(e) = process.kill() { - tracing::error!("failed_to_kill_process: {:?}", e); - } - } - - if let Some(task) = state.task_handle.take() { - task.abort(); - } - - hypr_host::kill_processes_by_matcher(hypr_host::ProcessMatcher::Sidecar); - + cleanup_state(state); Ok(()) } async fn handle( &self, - myself: ActorRef, + _myself: ActorRef, message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { ExternalSTTMessage::ProcessTerminated(e) => { - myself.stop(Some(e)); - Ok(()) + cleanup_state(state); + Err(io::Error::new(io::ErrorKind::Other, e).into()) } ExternalSTTMessage::GetHealth(reply_port) => { let status = match state.client.status().await { diff --git a/plugins/local-stt/src/server/internal.rs b/plugins/local-stt/src/server/internal.rs index 90344ce7f..128d5904c 100644 --- a/plugins/local-stt/src/server/internal.rs +++ b/plugins/local-stt/src/server/internal.rs @@ -16,6 +16,7 @@ pub enum InternalSTTMessage { ServerError(String), } +#[derive(Clone)] pub struct InternalSTTArgs { pub model_type: WhisperModel, pub model_cache_dir: PathBuf, @@ -115,6 +116,7 @@ impl Actor for InternalSTTActor { state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { + InternalSTTMessage::ServerError(e) => Err(e.into()), InternalSTTMessage::GetHealth(reply_port) => { let info = ServerInfo { url: Some(state.base_url.clone()), @@ -128,7 +130,6 @@ impl Actor for InternalSTTActor { Ok(()) } - InternalSTTMessage::ServerError(e) => Err(e.into()), } } } diff --git a/plugins/local-stt/src/server/mod.rs b/plugins/local-stt/src/server/mod.rs index 0cf7de4e4..fe133ab3f 100644 --- a/plugins/local-stt/src/server/mod.rs +++ b/plugins/local-stt/src/server/mod.rs @@ -1,5 +1,6 @@ pub mod external; pub mod internal; +pub mod supervisor; #[derive( Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, specta::Type, diff --git a/plugins/local-stt/src/server/supervisor.rs b/plugins/local-stt/src/server/supervisor.rs new file mode 100644 index 000000000..cb2bec7f8 --- /dev/null +++ b/plugins/local-stt/src/server/supervisor.rs @@ -0,0 +1,132 @@ +use ractor::{concurrency::Duration, registry, ActorCell, ActorProcessingErr, ActorRef}; +use ractor_supervisor::{ + core::{ChildSpec, Restart, SpawnFn, SupervisorError}, + dynamic::{DynamicSupervisor, DynamicSupervisorMsg, DynamicSupervisorOptions}, +}; + +use super::{ + external::{ExternalSTTActor, ExternalSTTArgs}, + internal::{InternalSTTActor, InternalSTTArgs}, + ServerType, +}; + +pub type SupervisorRef = ActorRef; + +pub const INTERNAL_STT_ACTOR_NAME: &str = "internal_stt"; +pub const EXTERNAL_STT_ACTOR_NAME: &str = "external_stt"; +pub const SUPERVISOR_NAME: &str = "stt_supervisor"; + +pub async fn spawn_stt_supervisor() -> Result, ActorProcessingErr> { + let options = DynamicSupervisorOptions { + max_children: Some(1), + max_restarts: 5, + max_window: Duration::from_secs(5), + reset_after: None, + }; + + let (supervisor_ref, _handle) = + DynamicSupervisor::spawn(SUPERVISOR_NAME.to_string(), options).await?; + + Ok(supervisor_ref) +} + +pub async fn start_internal_stt( + supervisor: &ActorRef, + args: InternalSTTArgs, +) -> Result<(), ActorProcessingErr> { + let child_spec = create_internal_child_spec_with_args(args); + DynamicSupervisor::spawn_child(supervisor.clone(), child_spec).await +} + +pub async fn start_external_stt( + supervisor: &ActorRef, + args: ExternalSTTArgs, +) -> Result<(), ActorProcessingErr> { + let child_spec = create_external_child_spec_with_args(args); + DynamicSupervisor::spawn_child(supervisor.clone(), child_spec).await +} + +fn create_internal_child_spec_with_args(args: InternalSTTArgs) -> ChildSpec { + let spawn_fn = SpawnFn::new(move |supervisor: ActorCell, child_id: String| { + let args = args.clone(); + async move { + let (actor_ref, _handle) = + DynamicSupervisor::spawn_linked(child_id, InternalSTTActor, args, supervisor) + .await?; + Ok(actor_ref.get_cell()) + } + }); + + ChildSpec { + id: INTERNAL_STT_ACTOR_NAME.to_string(), + spawn_fn, + restart: Restart::Transient, + backoff_fn: None, + reset_after: None, + } +} + +fn create_external_child_spec_with_args(args: ExternalSTTArgs) -> ChildSpec { + let spawn_fn = SpawnFn::new(move |supervisor: ActorCell, child_id: String| { + let args = args.clone(); + async move { + let (actor_ref, _handle) = + DynamicSupervisor::spawn_linked(child_id, ExternalSTTActor, args, supervisor) + .await?; + Ok(actor_ref.get_cell()) + } + }); + + ChildSpec { + id: EXTERNAL_STT_ACTOR_NAME.to_string(), + spawn_fn, + restart: Restart::Transient, + backoff_fn: None, + reset_after: None, + } +} + +pub async fn stop_stt_server( + supervisor: &ActorRef, + server_type: ServerType, +) -> Result<(), ActorProcessingErr> { + let child_id = match server_type { + ServerType::Internal => INTERNAL_STT_ACTOR_NAME, + ServerType::External => EXTERNAL_STT_ACTOR_NAME, + }; + + let result = DynamicSupervisor::terminate_child(supervisor.clone(), child_id.to_string()).await; + + if let Err(e) = result { + if let Some(supervisor_error) = e.downcast_ref::() { + if matches!(supervisor_error, SupervisorError::ChildNotFound { .. }) { + return Ok(()); + } + } + return Err(e); + } + + match server_type { + ServerType::Internal => wait_for_actor_shutdown(InternalSTTActor::name()).await, + ServerType::External => wait_for_actor_shutdown(ExternalSTTActor::name()).await, + } + + Ok(()) +} + +pub async fn stop_all_stt_servers( + supervisor: &ActorRef, +) -> Result<(), ActorProcessingErr> { + let _ = stop_stt_server(supervisor, ServerType::Internal).await; + let _ = stop_stt_server(supervisor, ServerType::External).await; + Ok(()) +} + +async fn wait_for_actor_shutdown(actor_name: ractor::ActorName) { + for _ in 0..20 { + if registry::where_is(actor_name.clone()).is_none() { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +}