diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8226689de..827c80e27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,6 +97,7 @@ jobs: uses: dtolnay/rust-toolchain@stable with: targets: ${{ matrix.settings.target }} + components: clippy - name: Rust cache uses: swatinem/rust-cache@v2 diff --git a/crates/recording/examples/recording-cli.rs b/crates/recording/examples/recording-cli.rs index 5e35f42d5..54c6c0e36 100644 --- a/crates/recording/examples/recording-cli.rs +++ b/crates/recording/examples/recording-cli.rs @@ -66,7 +66,7 @@ pub async fn main() { id: Display::primary().id(), }, ) - // .with_system_audio(true) + .with_system_audio(true) // .with_camera_feed(std::sync::Arc::new( // camera_feed.ask(feeds::camera::Lock).await.unwrap(), // )) diff --git a/crates/recording/src/sources/screen_capture/windows.rs b/crates/recording/src/sources/screen_capture/windows.rs index d152635a7..819cec845 100644 --- a/crates/recording/src/sources/screen_capture/windows.rs +++ b/crates/recording/src/sources/screen_capture/windows.rs @@ -6,7 +6,9 @@ use ::windows::{ use cap_fail::fail_err; use cap_timestamp::{PerformanceCounterTimestamp, Timestamps}; use cpal::traits::{DeviceTrait, HostTrait}; +use futures::channel::oneshot; use kameo::prelude::*; +use scap_direct3d::StopCapturerError; use scap_ffmpeg::*; use std::{ collections::VecDeque, @@ -368,7 +370,7 @@ impl PipelineSourceTask for ScreenCaptureSource { #[derive(Actor)] struct ScreenCaptureActor { - capture_handle: Option, + stop_tx: Option>>>, error_tx: Sender<()>, d3d_device: ID3D11Device, } @@ -376,7 +378,7 @@ struct ScreenCaptureActor { impl ScreenCaptureActor { pub fn new(error_tx: Sender<()>, d3d_device: ID3D11Device) -> Self { Self { - capture_handle: None, + stop_tx: None, error_tx, d3d_device, } @@ -411,7 +413,7 @@ impl Message for ScreenCaptureActor { msg: StartCapturing, _: &mut Context, ) -> Self::Reply { - if self.capture_handle.is_some() { + if self.stop_tx.is_some() { return Err(StartCapturingError::AlreadyCapturing); } @@ -424,51 +426,101 @@ impl Message for ScreenCaptureActor { let error_tx = self.error_tx.clone(); - let mut capture_handle = scap_direct3d::Capturer::new( - msg.target, - msg.settings, - move |frame| { - let _ = msg.frame_handler.tell(NewFrame(frame)).try_send(); + let (ready_tx, ready_rx) = oneshot::channel(); - Ok(()) - }, - move || { - let _ = error_tx.send(()); + let (stop_tx, stop_rx) = + std::sync::mpsc::sync_channel::>>(1); - Ok(()) - }, - Some(self.d3d_device.clone()), - ) - .map_err(StartCapturingError::CreateCapturer)?; + let d3d_device = self.d3d_device.clone(); + std::thread::spawn(move || { + cap_mediafoundation_utils::thread_init(); + + let res = (|| { + let mut capture_handle = scap_direct3d::Capturer::new( + msg.target, + msg.settings, + move |frame| { + let _ = msg.frame_handler.tell(NewFrame(frame)).try_send(); + + Ok(()) + }, + move || { + let _ = error_tx.send(()); + + Ok(()) + }, + Some(d3d_device), + ) + .map_err(StartCapturingError::CreateCapturer)?; + + capture_handle + .start() + .map_err(StartCapturingError::StartCapturer)?; + + Ok::<_, StartCapturingError>(capture_handle) + })(); + + let mut capturer = match res { + Ok(capturer) => { + let _ = ready_tx.send(Ok(())); + capturer + } + Err(e) => { + let _ = ready_tx.send(Err(e)); + return; + } + }; - capture_handle - .start() - .map_err(StartCapturingError::StartCapturer)?; + let stop_channel = stop_rx.recv(); - info!("Capturer started"); + let res = capturer.stop(); - self.capture_handle = Some(capture_handle); + if let Ok(stop_channel) = stop_channel { + let _ = stop_channel.send(res); + } + }); + + match ready_rx.await { + Ok(res) => res?, + Err(_) => { + return Err(StartCapturingError::StartCapturer( + ::windows::core::Error::new( + ::windows::core::HRESULT(0x80004005u32 as i32), + "Capturer thread dropped ready channel", + ), + )); + } + } + + info!("Capturer started"); + self.stop_tx = Some(stop_tx); Ok(()) } } impl Message for ScreenCaptureActor { - type Reply = Result<(), StopCapturingError>; + type Reply = Result<(), String>; async fn handle( &mut self, _: StopCapturing, _: &mut Context, ) -> Self::Reply { - let Some(mut capturer) = self.capture_handle.take() else { - return Err(StopCapturingError::NotCapturing); + let Some(stop_tx) = self.stop_tx.take() else { + return Err("Not Capturing".to_string()); }; - if let Err(e) = capturer.stop() { + let (done_tx, done_rx) = oneshot::channel(); + if let Err(e) = stop_tx.send(done_tx) { error!("Silently failed to stop Windows capturer: {}", e); } + match done_rx.await { + Ok(res) => res.map_err(|e| e.to_string())?, + Err(_) => return Err("Capturer thread dropped stop channel".to_string()), + } + info!("stopped windows capturer"); Ok(())