diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index d5ccc3c6f7..b2152ba350 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -50,6 +50,7 @@ use cap_recording::{ use cap_rendering::{ProjectRecordingsMeta, RenderedFrame}; use clipboard_rs::common::RustImage; use clipboard_rs::{Clipboard, ClipboardContext}; +use cpal::StreamError; use editor_window::{EditorInstances, WindowEditorInstance}; use ffmpeg::ffi::AV_TIME_BASE; use general_settings::GeneralSettingsStore; @@ -109,6 +110,8 @@ pub struct App { recording_state: RecordingState, recording_logging_handle: LoggingHandle, mic_feed: ActorRef, + mic_meter_sender: flume::Sender, + selected_mic_label: Option, camera_feed: ActorRef, server_url: String, logs_dir: PathBuf, @@ -168,6 +171,32 @@ impl App { } } + async fn restart_mic_feed(&mut self) -> Result<(), String> { + info!("Restarting microphone feed after actor shutdown"); + + let (error_tx, error_rx) = flume::bounded(1); + let mic_feed = MicrophoneFeed::spawn(MicrophoneFeed::new(error_tx)); + + spawn_mic_error_logger(error_rx); + + mic_feed + .ask(microphone::AddSender(self.mic_meter_sender.clone())) + .await + .map_err(|e| e.to_string())?; + + if let Some(label) = self.selected_mic_label.clone() { + let ready = mic_feed + .ask(microphone::SetInput { label }) + .await + .map_err(|e| e.to_string())?; + ready.await.map_err(|e| e.to_string())?; + } + + self.mic_feed = mic_feed; + + Ok(()) + } + async fn add_recording_logging_handle(&mut self, path: &PathBuf) -> Result<(), String> { let logfile = std::fs::File::create(path).map_err(|e| format!("Failed to create logfile: {e}"))?; @@ -208,8 +237,9 @@ impl App { #[instrument(skip(state))] async fn set_mic_input(state: MutableState<'_, App>, label: Option) -> Result<(), String> { let mic_feed = state.read().await.mic_feed.clone(); + let desired_label = label.clone(); - match label { + match desired_label.as_ref() { None => { mic_feed .ask(microphone::RemoveInput) @@ -218,7 +248,9 @@ async fn set_mic_input(state: MutableState<'_, App>, label: Option) -> R } Some(label) => { mic_feed - .ask(feeds::microphone::SetInput { label }) + .ask(feeds::microphone::SetInput { + label: label.clone(), + }) .await .map_err(|e| e.to_string())? .await @@ -226,6 +258,11 @@ async fn set_mic_input(state: MutableState<'_, App>, label: Option) -> R } } + { + let mut app = state.write().await; + app.selected_mic_label = desired_label; + } + Ok(()) } @@ -271,6 +308,16 @@ async fn set_camera_input( Ok(()) } +fn spawn_mic_error_logger(error_rx: flume::Receiver) { + tokio::spawn(async move { + let Ok(err) = error_rx.recv_async().await else { + return; + }; + + error!("Mic feed actor error: {err}"); + }); +} + #[derive(specta::Type, Serialize, tauri_specta::Event, Clone)] pub struct RecordingOptionsChanged; @@ -1946,6 +1993,7 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { let (camera_tx, camera_ws_port, _shutdown) = camera_legacy::create_camera_preview_ws().await; let (mic_samples_tx, mic_samples_rx) = flume::bounded(8); + let mic_meter_sender = mic_samples_tx.clone(); let camera_feed = CameraFeed::spawn(CameraFeed::default()); let _ = camera_feed.ask(feeds::camera::AddSender(camera_tx)).await; @@ -1955,18 +2003,14 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { let mic_feed = MicrophoneFeed::spawn(MicrophoneFeed::new(error_tx)); - // TODO: make this part of a global actor one day - tokio::spawn(async move { - let Ok(err) = error_rx.recv_async().await else { - return; - }; - - error!("Mic feed actor error: {err}"); - }); + spawn_mic_error_logger(error_rx); - let _ = mic_feed + if let Err(err) = mic_feed .ask(feeds::microphone::AddSender(mic_samples_tx)) - .await; + .await + { + error!("Failed to attach audio meter sender: {err}"); + } mic_feed }; @@ -2136,6 +2180,8 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { recording_state: RecordingState::None, recording_logging_handle, mic_feed, + mic_meter_sender, + selected_mic_label: None, camera_feed, server_url, logs_dir: logs_dir.clone(), diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index c5e4316b5c..e999788c23 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use cap_fail::fail; use cap_project::CursorMoveEvent; use cap_project::cursor::SHORT_CURSOR_SHAPE_DEBOUNCE_MS; @@ -12,6 +13,7 @@ use cap_recording::{ RecordingMode, feeds::{camera, microphone}, instant_recording, + sources::MicrophoneSourceError, sources::{ screen_capture, screen_capture::{CaptureDisplay, CaptureWindow, ScreenCaptureTarget}, @@ -20,12 +22,14 @@ use cap_recording::{ }; use cap_rendering::ProjectRecordingsMeta; use cap_utils::{ensure_dir, spawn_actor}; -use futures::stream; +use futures::{FutureExt, stream}; use serde::{Deserialize, Serialize}; use specta::Type; use std::{ + any::Any, collections::{HashMap, VecDeque}, - path::PathBuf, + panic::AssertUnwindSafe, + path::{Path, PathBuf}, str::FromStr, sync::Arc, time::Duration, @@ -56,6 +60,7 @@ use crate::{ windows::{CapWindowId, ShowCapWindow}, }; +#[derive(Clone)] pub struct InProgressRecordingCommon { pub target_name: String, pub inputs: StartRecordingInputs, @@ -77,6 +82,23 @@ pub enum InProgressRecording { }, } +#[cfg(target_os = "macos")] +#[derive(Clone)] +struct SendableShareableContent(cidre::arc::R); + +#[cfg(target_os = "macos")] +impl SendableShareableContent { + fn retained(&self) -> cidre::arc::R { + self.0.clone() + } +} + +#[cfg(target_os = "macos")] +unsafe impl Send for SendableShareableContent {} + +#[cfg(target_os = "macos")] +unsafe impl Sync for SendableShareableContent {} + impl InProgressRecording { pub fn capture_target(&self) -> &ScreenCaptureTarget { match self { @@ -444,181 +466,197 @@ pub async fn start_recording( debug!("spawning start_recording actor"); - // done in spawn to catch panics just in case let app_handle = app.clone(); - let spawn_actor_res = async { - spawn_actor({ - let state_mtx = Arc::clone(&state_mtx); - let general_settings = general_settings.cloned(); - let recording_dir = recording_dir.clone(); - async move { - fail!("recording::spawn_actor"); - let mut state = state_mtx.write().await; - - use kameo::error::SendError; - let mic_feed = match state.mic_feed.ask(microphone::Lock).await { - Ok(lock) => Some(Arc::new(lock)), - Err(SendError::HandlerError(microphone::LockFeedError::NoInput)) => None, - Err(e) => return Err(e.to_string()), - }; + let actor_task = { + let state_mtx = Arc::clone(&state_mtx); + let general_settings = general_settings.cloned(); + let recording_dir = recording_dir.clone(); + let target_name = target_name.clone(); + let inputs = inputs.clone(); + async move { + fail!("recording::spawn_actor"); + let mut state = state_mtx.write().await; - let camera_feed = match state.camera_feed.ask(camera::Lock).await { - Ok(lock) => Some(Arc::new(lock)), - Err(SendError::HandlerError(camera::LockFeedError::NoInput)) => None, - Err(e) => return Err(e.to_string()), - }; + use kameo::error::SendError; - #[cfg(target_os = "macos")] - let shareable_content = crate::platform::get_shareable_content() - .await - .map_err(|e| format!("GetShareableContent: {e}"))? - .ok_or_else(|| "GetShareableContent/NotAvailable".to_string())?; + let camera_feed = match state.camera_feed.ask(camera::Lock).await { + Ok(lock) => Some(Arc::new(lock)), + Err(SendError::HandlerError(camera::LockFeedError::NoInput)) => None, + Err(e) => return Err(anyhow!(e.to_string())), + }; - let common = InProgressRecordingCommon { - target_name, - inputs: inputs.clone(), - recording_dir: recording_dir.clone(), - }; + #[cfg(target_os = "macos")] + let shareable_content = SendableShareableContent( + crate::platform::get_shareable_content() + .await + .map_err(|e| anyhow!(format!("GetShareableContent: {e}")))? + .ok_or_else(|| anyhow!("GetShareableContent/NotAvailable"))?, + ); - #[cfg(target_os = "macos")] - let excluded_windows = { - let window_exclusions = general_settings - .as_ref() - .map_or_else(general_settings::default_excluded_windows, |settings| { - settings.excluded_windows.clone() - }); + let common = InProgressRecordingCommon { + target_name, + inputs: inputs.clone(), + recording_dir: recording_dir.clone(), + }; - crate::window_exclusion::resolve_window_ids(&window_exclusions) - }; + #[cfg(target_os = "macos")] + let excluded_windows = { + let window_exclusions = general_settings + .as_ref() + .map_or_else(general_settings::default_excluded_windows, |settings| { + settings.excluded_windows.clone() + }); - let actor = match inputs.mode { - RecordingMode::Studio => { - let mut builder = studio_recording::Actor::builder( - recording_dir.clone(), - inputs.capture_target.clone(), - ) - .with_system_audio(inputs.capture_system_audio) - .with_custom_cursor( - general_settings - .map(|s| s.custom_cursor_capture) - .unwrap_or_default(), - ); - - #[cfg(target_os = "macos")] - { - builder = builder.with_excluded_windows(excluded_windows.clone()); - } + crate::window_exclusion::resolve_window_ids(&window_exclusions) + }; - if let Some(camera_feed) = camera_feed { - builder = builder.with_camera_feed(camera_feed); - } + let mut mic_restart_attempts = 0; - if let Some(mic_feed) = mic_feed { - builder = builder.with_mic_feed(mic_feed); - } + let done_fut = loop { + let mic_feed = match state.mic_feed.ask(microphone::Lock).await { + Ok(lock) => Some(Arc::new(lock)), + Err(SendError::HandlerError(microphone::LockFeedError::NoInput)) => None, + Err(e) => return Err(anyhow!(e.to_string())), + }; - let handle = builder - .build( - #[cfg(target_os = "macos")] - shareable_content, + let actor_result: Result = async { + match inputs.mode { + RecordingMode::Studio => { + let mut builder = studio_recording::Actor::builder( + recording_dir.clone(), + inputs.capture_target.clone(), ) - .await - .map_err(|e| { - error!("Failed to spawn studio recording actor: {e:#}"); - format!("{e:#}") - })?; + .with_system_audio(inputs.capture_system_audio) + .with_custom_cursor( + general_settings + .as_ref() + .map(|s| s.custom_cursor_capture) + .unwrap_or_default(), + ); - InProgressRecording::Studio { handle, common } - } - RecordingMode::Instant => { - let Some(video_upload_info) = video_upload_info.clone() else { - return Err("Video upload info not found".to_string()); - }; - - let mut builder = instant_recording::Actor::builder( - recording_dir.clone(), - inputs.capture_target.clone(), - ) - .with_system_audio(inputs.capture_system_audio) - .with_max_output_size( - general_settings - .as_ref() - .map(|settings| settings.instant_mode_max_resolution) - .unwrap_or_else(|| 1920), - ); - - #[cfg(target_os = "macos")] - { - builder = builder.with_excluded_windows(excluded_windows.clone()); - } + #[cfg(target_os = "macos")] + { + builder = builder.with_excluded_windows(excluded_windows.clone()); + } - if let Some(mic_feed) = mic_feed { - builder = builder.with_mic_feed(mic_feed); - } + if let Some(camera_feed) = camera_feed.clone() { + builder = builder.with_camera_feed(camera_feed); + } - let handle = builder - .build( - #[cfg(target_os = "macos")] - shareable_content, - ) - .await - .map_err(|e| { - error!("Failed to spawn instant recording actor: {e}"); - e.to_string() - })?; - - let progressive_upload = InstantMultipartUpload::spawn( - app_handle, - recording_dir.join("content/output.mp4"), - video_upload_info.clone(), - recording_dir.clone(), - Some(finish_upload_rx), - ); - - InProgressRecording::Instant { - handle, - progressive_upload, - video_upload_info, - common, - camera_feed, - } - } - }; + if let Some(mic_feed) = mic_feed.clone() { + builder = builder.with_mic_feed(mic_feed); + } - let done_fut = actor.done_fut(); + let handle = builder + .build( + #[cfg(target_os = "macos")] + shareable_content.retained(), + ) + .await + .map_err(|e| { + error!("Failed to spawn studio recording actor: {e:#}"); + e + })?; + + Ok(InProgressRecording::Studio { + handle, + common: common.clone(), + }) + } + RecordingMode::Instant => { + let Some(video_upload_info) = video_upload_info.clone() else { + return Err(anyhow!("Video upload info not found")); + }; + + let mut builder = instant_recording::Actor::builder( + recording_dir.clone(), + inputs.capture_target.clone(), + ) + .with_system_audio(inputs.capture_system_audio) + .with_max_output_size( + general_settings + .as_ref() + .map(|settings| settings.instant_mode_max_resolution) + .unwrap_or(1920), + ); - state.set_current_recording(actor); + #[cfg(target_os = "macos")] + { + builder = builder.with_excluded_windows(excluded_windows.clone()); + } - Ok::<_, String>(done_fut) - } - }) - .await - .map_err(|e| format!("Failed to spawn recording actor: {e}")) - } - .await; + if let Some(mic_feed) = mic_feed.clone() { + builder = builder.with_mic_feed(mic_feed); + } - let actor_done_fut = match spawn_actor_res { - Ok(Ok(rx)) => rx, - Ok(Err(err)) | Err(err) => { - let _ = RecordingEvent::Failed { error: err.clone() }.emit(&app); + let handle = builder + .build( + #[cfg(target_os = "macos")] + shareable_content.retained(), + ) + .await + .map_err(|e| { + error!("Failed to spawn instant recording actor: {e:#}"); + e + })?; + + let progressive_upload = InstantMultipartUpload::spawn( + app_handle.clone(), + recording_dir.join("content/output.mp4"), + video_upload_info.clone(), + recording_dir.clone(), + Some(finish_upload_rx.clone()), + ); - let mut dialog = MessageDialogBuilder::new( - app.dialog().clone(), - "An error occurred".to_string(), - err.clone(), - ) - .kind(tauri_plugin_dialog::MessageDialogKind::Error); + Ok(InProgressRecording::Instant { + handle, + progressive_upload, + video_upload_info, + common: common.clone(), + camera_feed: camera_feed.clone(), + }) + } + } + } + .await; - if let Some(window) = CapWindowId::RecordingControls.get(&app) { - dialog = dialog.parent(&window); - } + match actor_result { + Ok(actor) => { + let done_fut = actor.done_fut(); + state.set_current_recording(actor); + break done_fut; + } + Err(err) if mic_restart_attempts == 0 && mic_actor_not_running(&err) => { + mic_restart_attempts += 1; + state + .restart_mic_feed() + .await + .map_err(|restart_err| anyhow!(restart_err))?; + } + Err(err) => return Err(err), + } + }; - dialog.blocking_show(); + Ok::<_, anyhow::Error>(done_fut) + } + }; - let mut state = state_mtx.write().await; - let _ = handle_recording_end(app, Err(err.clone()), &mut state, recording_dir).await; + let actor_task_res = AssertUnwindSafe(actor_task).catch_unwind().await; - return Err(err); + let actor_done_fut = match actor_task_res { + Ok(Ok(rx)) => rx, + Ok(Err(err)) => { + let message = format!("{err:#}"); + handle_spawn_failure(&app, &state_mtx, recording_dir.as_path(), message.clone()) + .await?; + return Err(message); + } + Err(panic) => { + let panic_msg = panic_message(panic); + let message = format!("Failed to spawn recording actor: {panic_msg}"); + handle_spawn_failure(&app, &state_mtx, recording_dir.as_path(), message.clone()) + .await?; + return Err(message); } }; @@ -697,6 +735,62 @@ pub async fn resume_recording(state: MutableState<'_, App>) -> Result<(), String Ok(()) } +async fn handle_spawn_failure( + app: &AppHandle, + state_mtx: &MutableState<'_, App>, + recording_dir: &Path, + message: String, +) -> Result<(), String> { + let _ = RecordingEvent::Failed { + error: message.clone(), + } + .emit(app); + + let mut dialog = MessageDialogBuilder::new( + app.dialog().clone(), + "An error occurred".to_string(), + message.clone(), + ) + .kind(tauri_plugin_dialog::MessageDialogKind::Error); + + if let Some(window) = CapWindowId::RecordingControls.get(app) { + dialog = dialog.parent(&window); + } + + dialog.blocking_show(); + + let mut state = state_mtx.write().await; + let _ = handle_recording_end( + app.clone(), + Err(message), + &mut state, + recording_dir.to_path_buf(), + ) + .await; + + Ok(()) +} + +fn panic_message(panic: Box) -> String { + if let Some(msg) = panic.downcast_ref::<&str>() { + msg.to_string() + } else if let Some(msg) = panic.downcast_ref::() { + msg.clone() + } else { + "unknown panic".to_string() + } +} + +fn mic_actor_not_running(err: &anyhow::Error) -> bool { + err.chain().any(|cause| { + if let Some(source) = cause.downcast_ref::() { + matches!(source, MicrophoneSourceError::ActorNotRunning) + } else { + false + } + }) +} + #[tauri::command] #[specta::specta] #[instrument(skip(app, state))] diff --git a/crates/recording/src/sources/microphone.rs b/crates/recording/src/sources/microphone.rs index 3bcd84022c..63f6ff8c68 100644 --- a/crates/recording/src/sources/microphone.rs +++ b/crates/recording/src/sources/microphone.rs @@ -2,11 +2,12 @@ use crate::{ feeds::microphone::{self, MicrophoneFeedLock}, output_pipeline::{AudioFrame, AudioSource}, }; -use anyhow::anyhow; use cap_media_info::AudioInfo; use cpal::SampleFormat; use futures::{SinkExt, channel::mpsc}; +use kameo::error::SendError; use std::{borrow::Cow, sync::Arc}; +use thiserror::Error; const MICROPHONE_TARGET_CHANNELS: u16 = 1; @@ -15,6 +16,14 @@ pub struct Microphone { _lock: Arc, } +#[derive(Debug, Error)] +pub enum MicrophoneSourceError { + #[error("microphone actor not running")] + ActorNotRunning, + #[error("failed to add microphone sender: {0}")] + AddSenderFailed(SendError<()>), +} + impl AudioSource for Microphone { type Config = Arc; @@ -37,7 +46,10 @@ impl AudioSource for Microphone { feed_lock .ask(microphone::AddSender(tx)) .await - .map_err(|e| anyhow!("Failed to add camera sender: {e}"))?; + .map_err(|err| match err { + SendError::ActorNotRunning(_) => MicrophoneSourceError::ActorNotRunning, + other => MicrophoneSourceError::AddSenderFailed(other.map_msg(|_| ())), + })?; tokio::spawn(async move { while let Ok(frame) = rx.recv_async().await { diff --git a/crates/recording/src/sources/screen_capture/macos.rs b/crates/recording/src/sources/screen_capture/macos.rs index 2482d49abb..248585166d 100644 --- a/crates/recording/src/sources/screen_capture/macos.rs +++ b/crates/recording/src/sources/screen_capture/macos.rs @@ -17,12 +17,12 @@ use std::{ }, time::Duration, }; -use tokio::sync::broadcast; +use tokio::{select, sync::broadcast}; use tokio_util::{ future::FutureExt as _, sync::{CancellationToken, DropGuard}, }; -use tracing::debug; +use tracing::{debug, warn}; #[derive(Debug)] pub struct CMSampleBufferCapture; @@ -269,7 +269,6 @@ pub struct CaptureError(pub arc::R); struct Capturer { started: Arc, capturer: Arc, - // error_rx: broadcast::Receiver>, } impl Clone for Capturer { @@ -283,32 +282,53 @@ impl Clone for Capturer { } impl Capturer { - fn new( - capturer: Arc, - // error_rx: broadcast::Receiver>, - ) -> Self { + fn new(capturer: Arc) -> Self { Self { started: Arc::new(AtomicBool::new(false)), capturer, - // error_rx, } } - async fn start(&mut self) -> anyhow::Result<()> { - if !self.started.fetch_xor(true, atomic::Ordering::Relaxed) { - self.capturer.start().await?; + async fn start(&self) -> anyhow::Result<()> { + if self + .started + .compare_exchange( + false, + true, + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + ) + .is_ok() + { + self.capturer + .start() + .await + .map_err(|err| anyhow!(format!("{err}")))?; } Ok(()) } - async fn stop(&mut self) -> anyhow::Result<()> { - if self.started.fetch_xor(true, atomic::Ordering::Relaxed) { + async fn stop(&self) -> anyhow::Result<()> { + if self + .started + .compare_exchange( + true, + false, + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + ) + .is_ok() + { self.capturer.stop().await.context("capturer_stop")?; } Ok(()) } + + fn mark_stopped(&self) { + self.started.store(false, atomic::Ordering::Relaxed); + } } pub struct VideoSourceConfig { @@ -332,29 +352,66 @@ impl output_pipeline::VideoSource for VideoSource { type Frame = VideoFrame; async fn setup( - mut config: Self::Config, + config: Self::Config, video_tx: mpsc::Sender, ctx: &mut SetupCtx, ) -> anyhow::Result where Self: Sized, { - ctx.tasks().spawn("screen-capture", async move { - if let Ok(err) = config.error_rx.recv().await { - return Err(anyhow!("{err}")); - } + let VideoSourceConfig { + inner, + capturer, + mut error_rx, + cancel_token, + drop_guard, + video_frame_counter, + } = config; + + let monitor_capturer = capturer.clone(); + let monitor_cancel = cancel_token.clone(); + ctx.tasks().spawn("screen-capture-monitor", async move { + loop { + select! { + _ = monitor_cancel.cancelled() => break Ok(()), + recv = error_rx.recv() => { + let err = match recv { + Ok(err) => err, + Err(broadcast::error::RecvError::Closed) => break Ok(()), + Err(broadcast::error::RecvError::Lagged(_)) => { + warn!("Screen capture error channel lagged; continuing"); + continue; + } + }; - Ok(()) + if is_system_stop_error(err.as_ref()) { + warn!("Screen capture stream stopped by the system; attempting restart"); + if monitor_cancel.is_cancelled() { + break Ok(()); + } + monitor_capturer.mark_stopped(); + if let Err(restart_err) = monitor_capturer.start().await { + return Err(anyhow!(format!( + "Failed to restart ScreenCaptureKit stream: {restart_err:#}" + ))); + } + continue; + } + + return Err(anyhow!(format!("{err}"))); + } + } + } }); - ChannelVideoSource::setup(config.inner, video_tx, ctx) + ChannelVideoSource::setup(inner, video_tx, ctx) .await .map(|source| Self { inner: source, - capturer: config.capturer, - cancel_token: config.cancel_token, - _drop_guard: config.drop_guard, - video_frame_counter: config.video_frame_counter, + capturer, + cancel_token, + _drop_guard: drop_guard, + video_frame_counter, }) } @@ -402,6 +459,16 @@ impl output_pipeline::VideoSource for VideoSource { } } +fn is_system_stop_error(err: &ns::Error) -> bool { + const SCK_ERROR_DOMAIN: &str = "com.apple.ScreenCaptureKit.error"; + + if err.domain().to_string() != SCK_ERROR_DOMAIN { + return false; + } + + err.localized_description().to_string() == "Stream was stopped by the system" +} + pub struct SystemAudioSourceConfig( ChannelAudioSourceConfig, Capturer, @@ -414,22 +481,36 @@ impl output_pipeline::AudioSource for SystemAudioSource { type Config = SystemAudioSourceConfig; fn setup( - mut config: Self::Config, + config: Self::Config, tx: mpsc::Sender, ctx: &mut SetupCtx, ) -> impl Future> + 'static where Self: Sized, { + let SystemAudioSourceConfig(channel_config, capturer, mut error_rx) = config; + ctx.tasks().spawn("system-audio", async move { - if let Ok(err) = config.2.recv().await { - return Err(anyhow!("{err}")); + loop { + match error_rx.recv().await { + Ok(err) => { + if is_system_stop_error(err.as_ref()) { + warn!("Screen capture audio stream stopped by the system; awaiting restart"); + continue; + } + + return Err(anyhow!("{err}")); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } } Ok(()) }); - ChannelAudioSource::setup(config.0, tx, ctx).map(|v| v.map(|source| Self(source, config.1))) + ChannelAudioSource::setup(channel_config, tx, ctx) + .map(|v| v.map(|source| Self(source, capturer))) } async fn start(&mut self) -> anyhow::Result<()> {