From f8a37ea4d14865f458f35866f3f86751159f59e2 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Fri, 27 Jun 2025 18:09:55 -0700 Subject: [PATCH 01/10] metadata stuffs --- Cargo.lock | 47 +++++++++++++++++++++++---- Cargo.toml | 1 + crates/stt/src/realtime/clova.rs | 1 + crates/stt/src/realtime/deepgram.rs | 5 ++- crates/stt/src/realtime/whisper.rs | 1 + crates/whisper-local/src/model.rs | 6 ++-- crates/whisper-local/src/stream.rs | 22 ++++++------- plugins/listener-interface/Cargo.toml | 2 +- plugins/listener-interface/src/lib.rs | 3 ++ plugins/listener/Cargo.toml | 4 ++- plugins/listener/src/fsm.rs | 2 ++ plugins/local-stt/src/server.rs | 5 ++- 12 files changed, 75 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8407bbdaf..bc2042261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1758,7 +1758,7 @@ dependencies = [ "bitflags 2.9.1", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -2699,7 +2699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -2708,7 +2708,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -7700,7 +7700,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a793df0d7afeac54f95b471d3af7f0d4fb975699f972341a4b76988d49cdf0c" dependencies = [ "cfg-if", - "windows-targets 0.53.0", + "windows-targets 0.48.5", ] [[package]] @@ -10387,7 +10387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.101", @@ -13810,6 +13810,7 @@ dependencies = [ "tracing", "url", "uuid", + "voice_activity_detector", "ws", ] @@ -15330,6 +15331,26 @@ dependencies = [ "url", ] +[[package]] +name = "typed-builder" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "typeid" version = "1.0.3" @@ -15783,6 +15804,20 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "voice_activity_detector" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a78926100321e74c3d74d389e875f26f395b71528f8ea8b7b505c7f31063dcf3" +dependencies = [ + "futures", + "ndarray", + "ort", + "pin-project", + "thiserror 2.0.12", + "typed-builder", +] + [[package]] name = "vsimd" version = "0.8.0" @@ -16307,7 +16342,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index aa7b851a1..5d7672caa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,7 @@ hound = "3.5.1" realfft = "3.5.0" ringbuf = "0.4.8" rodio = { version = "0.20.1", features = ["symphonia"] } +voice_activity_detector = "0.2.0" kalosm-common = { git = "https://github.com/floneum/floneum", rev = "52967ae" } kalosm-llama = { git = "https://github.com/floneum/floneum", rev = "52967ae" } diff --git a/crates/stt/src/realtime/clova.rs b/crates/stt/src/realtime/clova.rs index 3e8966529..a138dc68b 100644 --- a/crates/stt/src/realtime/clova.rs +++ b/crates/stt/src/realtime/clova.rs @@ -36,6 +36,7 @@ impl RealtimeSpeechToText for hypr_clova::realtime::Client { end_ms: Some(r.transcription.end_timestamp * 1000), confidence: Some(r.transcription.confidence as f32), }], + ..Default::default() })), clova::StreamResponse::Config(_) => None, }, diff --git a/crates/stt/src/realtime/deepgram.rs b/crates/stt/src/realtime/deepgram.rs index a53e6f7ee..58f375c18 100644 --- a/crates/stt/src/realtime/deepgram.rs +++ b/crates/stt/src/realtime/deepgram.rs @@ -82,7 +82,10 @@ impl RealtimeSpeechToText for crate::deepgram::DeepgramClient { }) .collect(); - Some(Ok(ListenOutputChunk { words })) + Some(Ok(ListenOutputChunk { + words, + ..Default::default() + })) } } _ => None, diff --git a/crates/stt/src/realtime/whisper.rs b/crates/stt/src/realtime/whisper.rs index 81ab81e36..873cea917 100644 --- a/crates/stt/src/realtime/whisper.rs +++ b/crates/stt/src/realtime/whisper.rs @@ -30,6 +30,7 @@ impl RealtimeSpeechToText for WhisperClient { start_ms: None, confidence: None, }], + ..Default::default() }) }); diff --git a/crates/whisper-local/src/model.rs b/crates/whisper-local/src/model.rs index ad90908b5..fffe18270 100644 --- a/crates/whisper-local/src/model.rs +++ b/crates/whisper-local/src/model.rs @@ -239,7 +239,7 @@ pub struct Segment { pub start: f32, pub end: f32, pub confidence: f32, - pub metadata: Option, + pub meta: Option, } impl Segment { @@ -263,8 +263,8 @@ impl Segment { self.confidence } - pub fn metadata(&self) -> Option<&serde_json::Value> { - self.metadata.as_ref() + pub fn meta(&self) -> Option { + self.meta.clone() } pub fn trim(&mut self) { diff --git a/crates/whisper-local/src/stream.rs b/crates/whisper-local/src/stream.rs index 49655fbad..ad6b407a0 100644 --- a/crates/whisper-local/src/stream.rs +++ b/crates/whisper-local/src/stream.rs @@ -19,13 +19,13 @@ pub struct TranscriptionTask { pub trait AudioChunk: Send + 'static { fn samples(&self) -> &[f32]; - fn metadata(&self) -> Option<&serde_json::Value>; + fn meta(&self) -> Option; } #[derive(Default)] pub struct SimpleAudioChunk { pub samples: Vec, - pub metadata: Option, + pub meta: Option, } impl AudioChunk for SimpleAudioChunk { @@ -33,8 +33,8 @@ impl AudioChunk for SimpleAudioChunk { &self.samples } - fn metadata(&self) -> Option<&serde_json::Value> { - self.metadata.as_ref() + fn meta(&self) -> Option { + self.meta.clone() } } @@ -153,13 +153,14 @@ where match this.stream.poll_next_unpin(cx) { Poll::Ready(Some(chunk)) => { + let meta = chunk.meta(); let samples = chunk.samples(); - let metadata = chunk.metadata(); + match process_transcription( &mut this.whisper, samples, &mut this.current_segment_task, - metadata, + meta, ) { Poll::Ready(result) => return Poll::Ready(result), Poll::Pending => continue, @@ -176,7 +177,7 @@ fn process_transcription<'a>( whisper: &'a mut Whisper, samples: &'a [f32], current_segment_task: &'a mut Option + Send>>>, - metadata: Option<&serde_json::Value>, + meta: Option, ) -> Poll> { if !samples.is_empty() { match whisper.transcribe(samples) { @@ -187,11 +188,10 @@ fn process_transcription<'a>( Poll::Ready(None) } Ok(mut segments) => { - if let Some(meta) = metadata { - for segment in &mut segments { - segment.metadata = Some(meta.clone()); - } + for segment in &mut segments { + segment.meta = meta.clone(); } + *current_segment_task = Some(Box::pin(futures_util::stream::iter(segments))); Poll::Pending } diff --git a/plugins/listener-interface/Cargo.toml b/plugins/listener-interface/Cargo.toml index faf05b5f5..3cbbec4b1 100644 --- a/plugins/listener-interface/Cargo.toml +++ b/plugins/listener-interface/Cargo.toml @@ -13,4 +13,4 @@ serde_json = { workspace = true } chrono = { workspace = true, features = ["serde"] } codes-iso-639 = { workspace = true } schemars = { workspace = true } -specta = { workspace = true, features = ["derive"] } +specta = { workspace = true, features = ["derive", "serde_json"] } diff --git a/plugins/listener-interface/src/lib.rs b/plugins/listener-interface/src/lib.rs index 1ca5ede85..ce20e516a 100644 --- a/plugins/listener-interface/src/lib.rs +++ b/plugins/listener-interface/src/lib.rs @@ -16,6 +16,7 @@ macro_rules! common_derives { } common_derives! { + #[derive(Default)] pub struct Word { pub text: String, pub speaker: Option, @@ -36,7 +37,9 @@ common_derives! { } common_derives! { + #[derive(Default)] pub struct ListenOutputChunk { + pub meta: Option, pub words: Vec, } } diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index d56758c8a..02e529a33 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -53,8 +53,10 @@ futures-util = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing = { workspace = true } -flume = { workspace = true } hound = { workspace = true } +voice_activity_detector = { workspace = true } + +flume = { workspace = true } statig = { workspace = true, features = ["async"] } [target."cfg(target_os = \"macos\")".dependencies] diff --git a/plugins/listener/src/fsm.rs b/plugins/listener/src/fsm.rs index e0e85b0ed..7959cf511 100644 --- a/plugins/listener/src/fsm.rs +++ b/plugins/listener/src/fsm.rs @@ -354,6 +354,8 @@ impl Session { futures_util::pin_mut!(listen_stream); while let Some(result) = listen_stream.next().await { + let _meta = result.meta.clone(); + // We don't have to do this, and inefficient. But this is what works at the moment. { let updated_words = update_session(&app, &session.id, result.words) diff --git a/plugins/local-stt/src/server.rs b/plugins/local-stt/src/server.rs index 4e5312208..307e5a237 100644 --- a/plugins/local-stt/src/server.rs +++ b/plugins/local-stt/src/server.rs @@ -152,7 +152,7 @@ async fn websocket(socket: WebSocket, model: hypr_whisper_local::Whisper, guard: let chunked = hypr_whisper_local::AudioChunkStream(chunked.map(|chunk| { hypr_whisper_local::SimpleAudioChunk { samples: chunk.convert_samples().collect(), - metadata: None, + ..Default::default() } })); hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(chunked, model) @@ -166,6 +166,8 @@ async fn websocket(socket: WebSocket, model: hypr_whisper_local::Whisper, guard: } chunk_opt = stream.next() => { let Some(chunk) = chunk_opt else { break }; + + let meta = chunk.meta(); let text = chunk.text().to_string(); let start = chunk.start() as u64; let duration = chunk.duration() as u64; @@ -177,6 +179,7 @@ async fn websocket(socket: WebSocket, model: hypr_whisper_local::Whisper, guard: } let data = ListenOutputChunk { + meta, words: text .split_whitespace() .filter(|w| !w.is_empty()) From afc1f2cbd518d43f3422859e44cdd049cdaa5693 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Fri, 27 Jun 2025 21:45:22 -0700 Subject: [PATCH 02/10] single audio works --- Cargo.lock | 2 + apps/app/server/src/native/listen/realtime.rs | 3 + crates/audio-utils/src/lib.rs | 9 ++ crates/whisper-cloud/src/client.rs | 3 +- crates/ws-utils/Cargo.toml | 1 + crates/ws-utils/src/lib.rs | 23 +++-- crates/ws/src/client.rs | 5 +- plugins/listener-interface/Cargo.toml | 1 + plugins/listener-interface/src/lib.rs | 26 +++++ plugins/listener/src/client.rs | 97 +++++++++++++++++-- plugins/listener/src/fsm.rs | 2 +- plugins/local-stt/src/server.rs | 32 ++++-- 12 files changed, 180 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc2042261..b50b24239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7879,6 +7879,7 @@ dependencies = [ "serde_bytes", "serde_json", "specta", + "strum 0.26.3", ] [[package]] @@ -17119,6 +17120,7 @@ dependencies = [ name = "ws-utils" version = "0.1.0" dependencies = [ + "audio-utils", "axum 0.8.4", "futures-util", "kalosm-sound", diff --git a/apps/app/server/src/native/listen/realtime.rs b/apps/app/server/src/native/listen/realtime.rs index 9f32183ce..86eeed588 100644 --- a/apps/app/server/src/native/listen/realtime.rs +++ b/apps/app/server/src/native/listen/realtime.rs @@ -42,6 +42,9 @@ async fn websocket(socket: WebSocket, state: STTState, params: ListenParams) { let audio = Bytes::from(data); Ok::, axum::Error>(Some((audio, ws_receiver))) } + ListenInputChunk::DualAudio { .. } => { + todo!() + } ListenInputChunk::End => Ok::, axum::Error>(None), } } diff --git a/crates/audio-utils/src/lib.rs b/crates/audio-utils/src/lib.rs index 749c2d5cc..0d80964b2 100644 --- a/crates/audio-utils/src/lib.rs +++ b/crates/audio-utils/src/lib.rs @@ -42,3 +42,12 @@ pub fn f32_to_i16_samples(samples: &[f32]) -> Vec { }) .collect() } + +pub fn bytes_to_f32_samples(data: &[u8]) -> Vec { + data.chunks_exact(2) + .map(|chunk| { + let sample = i16::from_le_bytes([chunk[0], chunk[1]]); + sample as f32 / 32767.0 + }) + .collect() +} diff --git a/crates/whisper-cloud/src/client.rs b/crates/whisper-cloud/src/client.rs index 7e4105edb..42dc4d5e0 100644 --- a/crates/whisper-cloud/src/client.rs +++ b/crates/whisper-cloud/src/client.rs @@ -87,10 +87,11 @@ impl WhisperClient { } impl WebSocketIO for WhisperClient { + type Data = bytes::Bytes; type Input = bytes::Bytes; type Output = WhisperOutput; - fn to_input(data: bytes::Bytes) -> Self::Input { + fn to_input(data: Self::Data) -> Self::Input { data } diff --git a/crates/ws-utils/Cargo.toml b/crates/ws-utils/Cargo.toml index 06075075e..4338b2afd 100644 --- a/crates/ws-utils/Cargo.toml +++ b/crates/ws-utils/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +hypr-audio-utils = { workspace = true } hypr-listener-interface = { workspace = true } axum = { workspace = true, features = ["ws"] } diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index dcba297b3..a07bb7121 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -1,6 +1,7 @@ use axum::extract::ws::{Message, WebSocket}; - use futures_util::{stream::SplitStream, Stream, StreamExt}; + +use hypr_audio_utils::bytes_to_f32_samples; use hypr_listener_interface::ListenInputChunk; pub struct WebSocketAudioSource { @@ -33,17 +34,23 @@ impl kalosm_sound::AsyncSource for WebSocketAudioSource { if data.is_empty() { None } else { - let samples: Vec = data - .chunks_exact(2) - .map(|chunk| { - let sample = i16::from_le_bytes([chunk[0], chunk[1]]); - sample as f32 / 32767.0 - }) - .collect(); + let samples: Vec = bytes_to_f32_samples(&data); Some((samples, receiver)) } } + ListenInputChunk::DualAudio { mic, speaker } => { + let mic_samples: Vec = bytes_to_f32_samples(&mic); + let speaker_samples: Vec = bytes_to_f32_samples(&speaker); + + let mixed_samples: Vec = mic_samples + .into_iter() + .zip(speaker_samples.into_iter()) + .map(|(mic, speaker)| mic + speaker) + .collect(); + + Some((mixed_samples, receiver)) + } ListenInputChunk::End => None, } } diff --git a/crates/ws/src/client.rs b/crates/ws/src/client.rs index 1ca1c1f60..adffc85fa 100644 --- a/crates/ws/src/client.rs +++ b/crates/ws/src/client.rs @@ -7,10 +7,11 @@ use tokio_tungstenite::{connect_async, tungstenite::client::IntoClientRequest}; pub use tokio_tungstenite::tungstenite::{protocol::Message, ClientRequestBuilder}; pub trait WebSocketIO: Send + 'static { + type Data: Send; type Input: Send + Default; type Output: DeserializeOwned; - fn to_input(data: bytes::Bytes) -> Self::Input; + fn to_input(data: Self::Data) -> Self::Input; fn to_message(input: Self::Input) -> Message; fn from_message(msg: Message) -> Option; } @@ -26,7 +27,7 @@ impl WebSocketClient { pub async fn from_audio( &self, - mut audio_stream: impl Stream + Send + Unpin + 'static, + mut audio_stream: impl Stream + Send + Unpin + 'static, ) -> Result, crate::Error> { let ws_stream = (|| self.try_connect(self.request.clone())) .retry( diff --git a/plugins/listener-interface/Cargo.toml b/plugins/listener-interface/Cargo.toml index 3cbbec4b1..c4668a0e8 100644 --- a/plugins/listener-interface/Cargo.toml +++ b/plugins/listener-interface/Cargo.toml @@ -9,6 +9,7 @@ hypr-language = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_bytes = { workspace = true } serde_json = { workspace = true } +strum = { workspace = true, features = ["derive"] } chrono = { workspace = true, features = ["serde"] } codes-iso-639 = { workspace = true } diff --git a/plugins/listener-interface/src/lib.rs b/plugins/listener-interface/src/lib.rs index ce20e516a..1dbbbbff1 100644 --- a/plugins/listener-interface/src/lib.rs +++ b/plugins/listener-interface/src/lib.rs @@ -52,6 +52,13 @@ common_derives! { #[serde(serialize_with = "serde_bytes::serialize")] data: Vec, }, + #[serde(rename = "dual_audio")] + DualAudio { + #[serde(serialize_with = "serde_bytes::serialize")] + mic: Vec, + #[serde(serialize_with = "serde_bytes::serialize")] + speaker: Vec, + }, #[serde(rename = "end")] End, } @@ -63,9 +70,28 @@ impl Default for ListenInputChunk { } } +common_derives! { + #[derive(strum::AsRefStr)] + pub enum AudioMode { + #[serde(rename = "single")] + #[strum(serialize = "single")] + Single, + #[serde(rename = "dual")] + #[strum(serialize = "dual")] + Dual, + } +} + +impl Default for AudioMode { + fn default() -> Self { + AudioMode::Single + } +} + common_derives! { #[derive(Default)] pub struct ListenParams { + pub audio_mode: AudioMode, #[specta(type = String)] #[schemars(with = "String")] #[serde(serialize_with = "serialize_language", deserialize_with = "deserialize_language")] diff --git a/plugins/listener/src/client.rs b/plugins/listener/src/client.rs index 311fd39a2..b6c73d951 100644 --- a/plugins/listener/src/client.rs +++ b/plugins/listener/src/client.rs @@ -1,4 +1,4 @@ -use futures_util::Stream; +use futures_util::{Stream, StreamExt}; use hypr_audio::AsyncSource; use hypr_audio_utils::AudioFormatExt; @@ -29,18 +29,23 @@ impl ListenClientBuilder { self } - pub fn build(self) -> ListenClient { + pub fn build_single(self) -> ListenClient { let uri = { let mut url: url::Url = self.api_base.unwrap().parse().unwrap(); - let params = self.params.unwrap_or_default(); + let params = hypr_listener_interface::ListenParams { + audio_mode: hypr_listener_interface::AudioMode::Single, + ..self.params.unwrap_or_default() + }; + let language = params.language.code(); url.set_path("/api/desktop/listen/realtime"); url.query_pairs_mut() .append_pair("language", language) .append_pair("static_prompt", ¶ms.static_prompt) - .append_pair("dynamic_prompt", ¶ms.dynamic_prompt); + .append_pair("dynamic_prompt", ¶ms.dynamic_prompt) + .append_pair("audio_mode", params.audio_mode.as_ref()); let host = url.host_str().unwrap(); @@ -61,6 +66,44 @@ impl ListenClientBuilder { ListenClient { request } } + + pub fn build_dual_channel(self) -> ListenClientDual { + let uri = { + let mut url: url::Url = self.api_base.unwrap().parse().unwrap(); + + let params = hypr_listener_interface::ListenParams { + audio_mode: hypr_listener_interface::AudioMode::Single, + ..self.params.unwrap_or_default() + }; + + let language = params.language.code(); + + url.set_path("/api/desktop/listen/realtime"); + url.query_pairs_mut() + .append_pair("language", language) + .append_pair("static_prompt", ¶ms.static_prompt) + .append_pair("dynamic_prompt", ¶ms.dynamic_prompt) + .append_pair("audio_mode", params.audio_mode.as_ref()); + + let host = url.host_str().unwrap(); + + if host.contains("127.0.0.1") || host.contains("localhost") { + url.set_scheme("ws").unwrap(); + } else { + url.set_scheme("wss").unwrap(); + } + + url.to_string().parse().unwrap() + }; + + let request = match self.api_key { + Some(key) => ClientRequestBuilder::new(uri) + .with_header("Authorization", format!("Bearer {}", key)), + None => ClientRequestBuilder::new(uri), + }; + + ListenClientDual { request } + } } #[derive(Clone)] @@ -69,10 +112,11 @@ pub struct ListenClient { } impl WebSocketIO for ListenClient { + type Data = bytes::Bytes; type Input = ListenInputChunk; type Output = ListenOutputChunk; - fn to_input(data: bytes::Bytes) -> Self::Input { + fn to_input(data: Self::Data) -> Self::Input { ListenInputChunk::Audio { data: data.to_vec(), } @@ -90,6 +134,35 @@ impl WebSocketIO for ListenClient { } } +#[derive(Clone)] +pub struct ListenClientDual { + request: ClientRequestBuilder, +} + +impl WebSocketIO for ListenClientDual { + type Data = (bytes::Bytes, bytes::Bytes); + type Input = ListenInputChunk; + type Output = ListenOutputChunk; + + fn to_input(data: Self::Data) -> Self::Input { + ListenInputChunk::DualAudio { + mic: data.0.to_vec(), + speaker: data.1.to_vec(), + } + } + + fn to_message(input: Self::Input) -> Message { + Message::Text(serde_json::to_string(&input).unwrap().into()) + } + + fn from_message(msg: Message) -> Option { + match msg { + Message::Text(text) => serde_json::from_str::(&text).ok(), + _ => None, + } + } +} + impl ListenClient { pub fn builder() -> ListenClientBuilder { ListenClientBuilder::default() @@ -105,6 +178,18 @@ impl ListenClient { } } +impl ListenClientDual { + pub async fn from_audio( + &self, + mic_stream: impl Stream + Send + Unpin + 'static, + speaker_stream: impl Stream + Send + Unpin + 'static, + ) -> Result, hypr_ws::Error> { + let dual_stream = mic_stream.zip(speaker_stream); + let ws = WebSocketClient::new(self.request.clone()); + ws.from_audio::(dual_stream).await + } +} + #[cfg(test)] mod tests { use super::*; @@ -125,7 +210,7 @@ mod tests { language: hypr_language::ISO639::En.into(), ..Default::default() }) - .build(); + .build_single(); let stream = client.from_audio(audio).await.unwrap(); futures_util::pin_mut!(stream); diff --git a/plugins/listener/src/fsm.rs b/plugins/listener/src/fsm.rs index 7959cf511..9b55aaba7 100644 --- a/plugins/listener/src/fsm.rs +++ b/plugins/listener/src/fsm.rs @@ -460,7 +460,7 @@ async fn setup_listen_client( static_prompt, ..Default::default() }) - .build()) + .build_single()) } async fn update_session( diff --git a/plugins/local-stt/src/server.rs b/plugins/local-stt/src/server.rs index 307e5a237..e679a2174 100644 --- a/plugins/local-stt/src/server.rs +++ b/plugins/local-stt/src/server.rs @@ -20,7 +20,6 @@ use tower_http::cors::{self, CorsLayer}; use hypr_chunker::ChunkerExt; use hypr_listener_interface::{ListenOutputChunk, ListenParams, Word}; -use hypr_ws_utils::WebSocketAudioSource; use crate::manager::{ConnectionGuard, ConnectionManager}; @@ -138,14 +137,26 @@ async fn websocket_with_model( .dynamic_prompt(¶ms.dynamic_prompt) .build(); - websocket(socket, model, guard).await; + let (ws_sender, ws_receiver) = socket.split(); + + match params.audio_mode { + hypr_listener_interface::AudioMode::Single => { + websocket_single_channel(ws_sender, ws_receiver, model, guard).await; + } + hypr_listener_interface::AudioMode::Dual => { + websocket_dual_channel(ws_sender, ws_receiver, model, guard).await; + } + } } -#[tracing::instrument(skip_all)] -async fn websocket(socket: WebSocket, model: hypr_whisper_local::Whisper, guard: ConnectionGuard) { - let (mut ws_sender, ws_receiver) = socket.split(); +async fn websocket_single_channel( + mut ws_sender: futures_util::stream::SplitSink, + ws_receiver: futures_util::stream::SplitStream, + model: hypr_whisper_local::Whisper, + guard: ConnectionGuard, +) { let mut stream = { - let audio_source = WebSocketAudioSource::new(ws_receiver, 16 * 1000); + let audio_source = hypr_ws_utils::WebSocketAudioSource::new(ws_receiver, 16 * 1000); let chunked = audio_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); @@ -204,3 +215,12 @@ async fn websocket(socket: WebSocket, model: hypr_whisper_local::Whisper, guard: let _ = ws_sender.close().await; } + +async fn websocket_dual_channel( + mut ws_sender: futures_util::stream::SplitSink, + _ws_receiver: futures_util::stream::SplitStream, + _model: hypr_whisper_local::Whisper, + _guard: ConnectionGuard, +) { + let _ = ws_sender.close().await; +} From e9afcb413a6ad78ced7bfd018d7e302f66aef880 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Fri, 27 Jun 2025 23:11:08 -0700 Subject: [PATCH 03/10] rename --- plugins/listener/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/listener/src/client.rs b/plugins/listener/src/client.rs index b6c73d951..d90626677 100644 --- a/plugins/listener/src/client.rs +++ b/plugins/listener/src/client.rs @@ -67,7 +67,7 @@ impl ListenClientBuilder { ListenClient { request } } - pub fn build_dual_channel(self) -> ListenClientDual { + pub fn build_dual(self) -> ListenClientDual { let uri = { let mut url: url::Url = self.api_base.unwrap().parse().unwrap(); From 70a0e70bbddb5a2ce8885790be89420075f54cb8 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 10:05:22 -0700 Subject: [PATCH 04/10] various fixes and refactors --- crates/audio-utils/src/lib.rs | 10 +++-- crates/ws-utils/src/lib.rs | 11 +++-- plugins/listener/src/client.rs | 80 +++++++++++++--------------------- 3 files changed, 44 insertions(+), 57 deletions(-) diff --git a/crates/audio-utils/src/lib.rs b/crates/audio-utils/src/lib.rs index 0d80964b2..b104bfbff 100644 --- a/crates/audio-utils/src/lib.rs +++ b/crates/audio-utils/src/lib.rs @@ -2,6 +2,8 @@ use bytes::{BufMut, Bytes, BytesMut}; use futures_util::{Stream, StreamExt}; use kalosm_sound::AsyncSource; +const I16_SCALE: f32 = 32768.0; + impl AudioFormatExt for T {} pub trait AudioFormatExt: AsyncSource { @@ -18,7 +20,7 @@ pub trait AudioFormatExt: AsyncSource { let mut buf = BytesMut::with_capacity(n); for sample in chunk { - let scaled = (sample * 32768.0).clamp(-32768.0, 32768.0); + let scaled = (sample * I16_SCALE).clamp(-I16_SCALE, I16_SCALE); buf.put_i16_le(scaled as i16); } buf.freeze() @@ -29,7 +31,7 @@ pub trait AudioFormatExt: AsyncSource { pub fn i16_to_f32_samples(samples: &[i16]) -> Vec { samples .iter() - .map(|&sample| sample as f32 / 32768.0) + .map(|&sample| sample as f32 / I16_SCALE) .collect() } @@ -37,7 +39,7 @@ pub fn f32_to_i16_samples(samples: &[f32]) -> Vec { samples .iter() .map(|&sample| { - let scaled = (sample * 32768.0).clamp(-32768.0, 32768.0); + let scaled = (sample * I16_SCALE).clamp(-I16_SCALE, I16_SCALE); scaled as i16 }) .collect() @@ -47,7 +49,7 @@ pub fn bytes_to_f32_samples(data: &[u8]) -> Vec { data.chunks_exact(2) .map(|chunk| { let sample = i16::from_le_bytes([chunk[0], chunk[1]]); - sample as f32 / 32767.0 + sample as f32 / I16_SCALE }) .collect() } diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index a07bb7121..92601b4f5 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -43,10 +43,13 @@ impl kalosm_sound::AsyncSource for WebSocketAudioSource { let mic_samples: Vec = bytes_to_f32_samples(&mic); let speaker_samples: Vec = bytes_to_f32_samples(&speaker); - let mixed_samples: Vec = mic_samples - .into_iter() - .zip(speaker_samples.into_iter()) - .map(|(mic, speaker)| mic + speaker) + let max_len = mic_samples.len().max(speaker_samples.len()); + let mixed_samples: Vec = (0..max_len) + .map(|i| { + let mic = mic_samples.get(i).copied().unwrap_or(0.0); + let speaker = speaker_samples.get(i).copied().unwrap_or(0.0); + (mic + speaker) * 0.9 + }) .collect(); Some((mixed_samples, receiver)) diff --git a/plugins/listener/src/client.rs b/plugins/listener/src/client.rs index d90626677..a36934fa5 100644 --- a/plugins/listener/src/client.rs +++ b/plugins/listener/src/client.rs @@ -29,34 +29,39 @@ impl ListenClientBuilder { self } - pub fn build_single(self) -> ListenClient { - let uri = { - let mut url: url::Url = self.api_base.unwrap().parse().unwrap(); + fn build_uri(&self, audio_mode: hypr_listener_interface::AudioMode) -> String { + let mut url: url::Url = self.api_base.as_ref().unwrap().parse().unwrap(); - let params = hypr_listener_interface::ListenParams { - audio_mode: hypr_listener_interface::AudioMode::Single, - ..self.params.unwrap_or_default() - }; + let params = hypr_listener_interface::ListenParams { + audio_mode, + ..self.params.clone().unwrap_or_default() + }; - let language = params.language.code(); + let language = params.language.code(); - url.set_path("/api/desktop/listen/realtime"); - url.query_pairs_mut() - .append_pair("language", language) - .append_pair("static_prompt", ¶ms.static_prompt) - .append_pair("dynamic_prompt", ¶ms.dynamic_prompt) - .append_pair("audio_mode", params.audio_mode.as_ref()); + url.set_path("/api/desktop/listen/realtime"); + url.query_pairs_mut() + .append_pair("language", language) + .append_pair("static_prompt", ¶ms.static_prompt) + .append_pair("dynamic_prompt", ¶ms.dynamic_prompt) + .append_pair("audio_mode", params.audio_mode.as_ref()); - let host = url.host_str().unwrap(); + let host = url.host_str().unwrap(); - if host.contains("127.0.0.1") || host.contains("localhost") { - url.set_scheme("ws").unwrap(); - } else { - url.set_scheme("wss").unwrap(); - } + if host.contains("127.0.0.1") || host.contains("localhost") { + url.set_scheme("ws").unwrap(); + } else { + url.set_scheme("wss").unwrap(); + } - url.to_string().parse().unwrap() - }; + url.to_string() + } + + pub fn build_single(self) -> ListenClient { + let uri = self + .build_uri(hypr_listener_interface::AudioMode::Single) + .parse() + .unwrap(); let request = match self.api_key { Some(key) => ClientRequestBuilder::new(uri) @@ -68,33 +73,10 @@ impl ListenClientBuilder { } pub fn build_dual(self) -> ListenClientDual { - let uri = { - let mut url: url::Url = self.api_base.unwrap().parse().unwrap(); - - let params = hypr_listener_interface::ListenParams { - audio_mode: hypr_listener_interface::AudioMode::Single, - ..self.params.unwrap_or_default() - }; - - let language = params.language.code(); - - url.set_path("/api/desktop/listen/realtime"); - url.query_pairs_mut() - .append_pair("language", language) - .append_pair("static_prompt", ¶ms.static_prompt) - .append_pair("dynamic_prompt", ¶ms.dynamic_prompt) - .append_pair("audio_mode", params.audio_mode.as_ref()); - - let host = url.host_str().unwrap(); - - if host.contains("127.0.0.1") || host.contains("localhost") { - url.set_scheme("ws").unwrap(); - } else { - url.set_scheme("wss").unwrap(); - } - - url.to_string().parse().unwrap() - }; + let uri = self + .build_uri(hypr_listener_interface::AudioMode::Dual) + .parse() + .unwrap(); let request = match self.api_key { Some(key) => ClientRequestBuilder::new(uri) From e0e19cbcc13ac6398f552e009c14ab310633570b Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 10:26:45 -0700 Subject: [PATCH 05/10] deps --- Cargo.lock | 35 ----------------------------------- Cargo.toml | 1 - plugins/listener/Cargo.toml | 1 - 3 files changed, 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e82a0ae4..0b8473137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13992,7 +13992,6 @@ dependencies = [ "tracing", "url", "uuid", - "voice_activity_detector", "ws", ] @@ -15506,26 +15505,6 @@ dependencies = [ "url", ] -[[package]] -name = "typed-builder" -version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7" -dependencies = [ - "typed-builder-macro", -] - -[[package]] -name = "typed-builder-macro" -version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "typeid" version = "1.0.3" @@ -15995,20 +15974,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "voice_activity_detector" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a78926100321e74c3d74d389e875f26f395b71528f8ea8b7b505c7f31063dcf3" -dependencies = [ - "futures", - "ndarray", - "ort", - "pin-project", - "thiserror 2.0.12", - "typed-builder", -] - [[package]] name = "vsimd" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 78cc3c655..b29b24225 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -189,7 +189,6 @@ hound = "3.5.1" realfft = "3.5.0" ringbuf = "0.4.8" rodio = { version = "0.20.1", features = ["symphonia"] } -voice_activity_detector = "0.2.0" kalosm-common = { git = "https://github.com/floneum/floneum", rev = "52967ae" } kalosm-llama = { git = "https://github.com/floneum/floneum", rev = "52967ae" } diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index 222a22c47..732dc4af9 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -54,7 +54,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing = { workspace = true } hound = { workspace = true } -voice_activity_detector = { workspace = true } flume = { workspace = true } statig = { workspace = true, features = ["async"] } From 9ef1a0e3887099bd1dbe486443994f0b1659b25e Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 11:16:07 -0700 Subject: [PATCH 06/10] dual audio works --- crates/ws-utils/src/lib.rs | 2 +- plugins/listener/src/client.rs | 2 +- plugins/listener/src/fsm.rs | 73 ++++++++++++++++++++++++--------- plugins/local-stt/src/server.rs | 64 +++++++++++++++++++++++++++-- 4 files changed, 117 insertions(+), 24 deletions(-) diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index 92601b4f5..06cc1feaf 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -48,7 +48,7 @@ impl kalosm_sound::AsyncSource for WebSocketAudioSource { .map(|i| { let mic = mic_samples.get(i).copied().unwrap_or(0.0); let speaker = speaker_samples.get(i).copied().unwrap_or(0.0); - (mic + speaker) * 0.9 + (mic + speaker).clamp(-1.0, 1.0) }) .collect(); diff --git a/plugins/listener/src/client.rs b/plugins/listener/src/client.rs index 8bacd0b9c..7909bae45 100644 --- a/plugins/listener/src/client.rs +++ b/plugins/listener/src/client.rs @@ -161,7 +161,7 @@ impl ListenClient { } impl ListenClientDual { - pub async fn from_audio( + pub async fn from_realtime_audio( &self, mic_stream: impl Stream + Send + Unpin + 'static, speaker_stream: impl Stream + Send + Unpin + 'static, diff --git a/plugins/listener/src/fsm.rs b/plugins/listener/src/fsm.rs index 5b79511eb..c5c7f9e14 100644 --- a/plugins/listener/src/fsm.rs +++ b/plugins/listener/src/fsm.rs @@ -130,7 +130,9 @@ impl Session { (None, None) }; - let (process_tx, process_rx) = flume::bounded::>(chunk_buffer_size); + let (process_mic_tx, process_mic_rx) = flume::bounded::>(chunk_buffer_size); + let (process_speaker_tx, process_speaker_rx) = + flume::bounded::>(chunk_buffer_size); { let silence_stream_tx = hypr_audio::AudioOutput::silence(); @@ -194,6 +196,8 @@ impl Session { tasks.spawn({ let app = self.app.clone(); let save_mixed_tx = save_mixed_tx.clone(); + let process_mic_tx = process_mic_tx.clone(); + let process_speaker_tx = process_speaker_tx.clone(); async move { let mut aec = hypr_aec::AEC::new().unwrap(); @@ -225,13 +229,14 @@ impl Session { continue; } - let mixed: Vec = mic_chunk + let processed_mic = mic_chunk .iter() - .zip(speaker_chunk.iter()) - .map(|(mic, speaker)| { - (mic * POST_MIC_GAIN + speaker * POST_SPEAKER_GAIN).clamp(-1.0, 1.0) - }) - .collect(); + .map(|x| x * POST_MIC_GAIN) + .collect::>(); + let processed_speaker = speaker_chunk + .iter() + .map(|x| x * POST_SPEAKER_GAIN) + .collect::>(); let now = Instant::now(); if now.duration_since(last_broadcast) >= AUDIO_AMPLITUDE_THROTTLE { @@ -249,12 +254,30 @@ impl Session { let _ = tx.send_async(speaker_chunk.clone()).await; } - if process_tx.send_async(mixed.clone()).await.is_err() { - tracing::error!("process_tx_send_error"); + // Send separate streams to listen client + if process_mic_tx + .send_async(processed_mic.clone()) + .await + .is_err() + { + tracing::error!("process_mic_tx_send_error"); + return; + } + if process_speaker_tx + .send_async(processed_speaker.clone()) + .await + .is_err() + { + tracing::error!("process_speaker_tx_send_error"); return; } if record { + let mixed: Vec = processed_mic + .iter() + .zip(processed_speaker.iter()) + .map(|(mic, speaker)| (mic + speaker).clamp(-1.0, 1.0)) + .collect(); if save_mixed_tx.send_async(mixed).await.is_err() { tracing::error!("save_mixed_tx_send_error"); } @@ -336,15 +359,27 @@ impl Session { }); } - let audio_stream = hypr_audio::StreamSource::new( - process_rx - .into_stream() - .map(|chunk| futures_util::stream::iter(chunk)) - .flatten(), - SAMPLE_RATE, - ); + let mic_audio_stream = process_mic_rx.into_stream().map(|chunk| { + let bytes: Vec = chunk + .into_iter() + .map(|sample| (sample * i16::MAX as f32) as i16) + .flat_map(|sample| sample.to_le_bytes()) + .collect(); + bytes::Bytes::from(bytes) + }); + + let speaker_audio_stream = process_speaker_rx.into_stream().map(|chunk| { + let bytes: Vec = chunk + .into_iter() + .map(|sample| (sample * i16::MAX as f32) as i16) + .flat_map(|sample| sample.to_le_bytes()) + .collect(); + bytes::Bytes::from(bytes) + }); - let listen_stream = listen_client.from_realtime_audio(audio_stream).await?; + let listen_stream = listen_client + .from_realtime_audio(mic_audio_stream, speaker_audio_stream) + .await?; tasks.spawn({ let app = self.app.clone(); @@ -427,7 +462,7 @@ async fn setup_listen_client( app: &tauri::AppHandle, language: hypr_language::Language, _jargons: Vec, -) -> Result { +) -> Result { let api_base = { use tauri_plugin_connector::{Connection, ConnectorPluginExt}; let conn: Connection = app.get_stt_connection().await?.into(); @@ -460,7 +495,7 @@ async fn setup_listen_client( static_prompt, ..Default::default() }) - .build_single()) + .build_dual()) } async fn update_session( diff --git a/plugins/local-stt/src/server.rs b/plugins/local-stt/src/server.rs index 3b842d4ec..2d70028c0 100644 --- a/plugins/local-stt/src/server.rs +++ b/plugins/local-stt/src/server.rs @@ -212,9 +212,67 @@ async fn websocket_single_channel( async fn websocket_dual_channel( mut ws_sender: futures_util::stream::SplitSink, - _ws_receiver: futures_util::stream::SplitStream, - _model: hypr_whisper_local::Whisper, - _guard: ConnectionGuard, + ws_receiver: futures_util::stream::SplitStream, + model: hypr_whisper_local::Whisper, + guard: ConnectionGuard, ) { + let mut stream = { + let audio_source = hypr_ws_utils::WebSocketAudioSource::new(ws_receiver, 16 * 1000); + let chunked = + audio_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); + + let chunked = hypr_whisper_local::AudioChunkStream(chunked.map(|chunk| { + hypr_whisper_local::SimpleAudioChunk { + samples: chunk.convert_samples().collect(), + ..Default::default() + } + })); + hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(chunked, model) + }; + + loop { + tokio::select! { + _ = guard.cancelled() => { + tracing::info!("websocket_cancelled_by_new_connection"); + break; + } + chunk_opt = stream.next() => { + let Some(chunk) = chunk_opt else { break }; + + let meta = chunk.meta(); + let text = chunk.text().to_string(); + let start = chunk.start() as u64; + let duration = chunk.duration() as u64; + let confidence = chunk.confidence(); + + if confidence < 0.2 { + tracing::warn!(confidence, "skipping_transcript: {}", text); + continue; + } + + let data = ListenOutputChunk { + meta, + words: text + .split_whitespace() + .filter(|w| !w.is_empty()) + .map(|w| Word { + text: w.trim().to_string(), + speaker: None, + start_ms: Some(start), + end_ms: Some(start + duration), + confidence: Some(confidence), + }) + .collect(), + }; + + let msg = Message::Text(serde_json::to_string(&data).unwrap().into()); + if let Err(e) = ws_sender.send(msg).await { + tracing::warn!("websocket_send_error: {}", e); + break; + } + } + } + } + let _ = ws_sender.close().await; } From 35215ad9f18f562f49844de00b60a3b2de93276e Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 11:48:25 -0700 Subject: [PATCH 07/10] did some refactor on fsm --- crates/audio-utils/src/lib.rs | 9 + plugins/listener/src/fsm.rs | 370 ++++++++++++++++++++-------------- 2 files changed, 225 insertions(+), 154 deletions(-) diff --git a/crates/audio-utils/src/lib.rs b/crates/audio-utils/src/lib.rs index 4f7febc51..0f02f21ff 100644 --- a/crates/audio-utils/src/lib.rs +++ b/crates/audio-utils/src/lib.rs @@ -48,6 +48,15 @@ pub fn f32_to_i16_samples(samples: &[f32]) -> Vec { .collect() } +pub fn f32_to_i16_bytes(chunk: Vec) -> bytes::Bytes { + let mut bytes = Vec::with_capacity(chunk.len() * 2); + for sample in chunk { + let i16_sample = (sample * I16_SCALE) as i16; + bytes.extend_from_slice(&i16_sample.to_le_bytes()); + } + bytes::Bytes::from(bytes) +} + pub fn bytes_to_f32_samples(data: &[u8]) -> Vec { data.chunks_exact(2) .map(|chunk| { diff --git a/plugins/listener/src/fsm.rs b/plugins/listener/src/fsm.rs index c5c7f9e14..7c031a7d1 100644 --- a/plugins/listener/src/fsm.rs +++ b/plugins/listener/src/fsm.rs @@ -22,6 +22,150 @@ const WAV_SPEC: hound::WavSpec = hound::WavSpec { sample_format: hound::SampleFormat::Float, }; +struct AudioSaver; + +impl AudioSaver { + async fn save_to_wav( + rx: flume::Receiver>, + session_id: &str, + app_dir: &std::path::Path, + filename: &str, + append: bool, + ) -> Result<(), Box> { + let dir = app_dir.join(session_id); + std::fs::create_dir_all(&dir)?; + let path = dir.join(filename); + + let mut wav = if append && path.exists() { + hound::WavWriter::append(path)? + } else { + hound::WavWriter::create(path, WAV_SPEC)? + }; + + while let Ok(chunk) = rx.recv_async().await { + for sample in chunk { + wav.write_sample(sample)?; + } + } + + wav.finalize()?; + Ok(()) + } +} + +struct AudioChannels { + mic_tx: flume::Sender>, + mic_rx: flume::Receiver>, + speaker_tx: flume::Sender>, + speaker_rx: flume::Receiver>, + save_mixed_tx: flume::Sender>, + save_mixed_rx: flume::Receiver>, + save_mic_raw_tx: Option>>, + save_mic_raw_rx: Option>>, + save_speaker_raw_tx: Option>>, + save_speaker_raw_rx: Option>>, + process_mic_tx: flume::Sender>, + process_mic_rx: flume::Receiver>, + process_speaker_tx: flume::Sender>, + process_speaker_rx: flume::Receiver>, +} + +impl AudioChannels { + fn new() -> Self { + const CHUNK_BUFFER_SIZE: usize = 64; + + let (mic_tx, mic_rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + let (speaker_tx, speaker_rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + let (save_mixed_tx, save_mixed_rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + let (process_mic_tx, process_mic_rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + let (process_speaker_tx, process_speaker_rx) = + flume::bounded::>(CHUNK_BUFFER_SIZE); + + let (save_mic_raw_tx, save_mic_raw_rx) = if cfg!(debug_assertions) { + let (tx, rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + let (save_speaker_raw_tx, save_speaker_raw_rx) = if cfg!(debug_assertions) { + let (tx, rx) = flume::bounded::>(CHUNK_BUFFER_SIZE); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + Self { + mic_tx, + mic_rx, + speaker_tx, + speaker_rx, + save_mixed_tx, + save_mixed_rx, + save_mic_raw_tx, + save_mic_raw_rx, + save_speaker_raw_tx, + save_speaker_raw_rx, + process_mic_tx, + process_mic_rx, + process_speaker_tx, + process_speaker_rx, + } + } + + async fn process_mic_stream( + mut mic_stream: impl futures_util::Stream> + Unpin, + mic_muted_rx: tokio::sync::watch::Receiver, + mic_tx: flume::Sender>, + ) { + let mut is_muted = *mic_muted_rx.borrow(); + let watch_rx = mic_muted_rx.clone(); + + while let Some(actual) = mic_stream.next().await { + if watch_rx.has_changed().unwrap_or(false) { + is_muted = *watch_rx.borrow(); + } + + let maybe_muted = if is_muted { + vec![0.0; actual.len()] + } else { + actual + }; + + if let Err(e) = mic_tx.send_async(maybe_muted).await { + tracing::error!("mic_tx_send_error: {:?}", e); + break; + } + } + } + + async fn process_speaker_stream( + mut speaker_stream: impl futures_util::Stream> + Unpin, + speaker_muted_rx: tokio::sync::watch::Receiver, + speaker_tx: flume::Sender>, + ) { + let mut is_muted = *speaker_muted_rx.borrow(); + let watch_rx = speaker_muted_rx.clone(); + + while let Some(actual) = speaker_stream.next().await { + if watch_rx.has_changed().unwrap_or(false) { + is_muted = *watch_rx.borrow(); + } + + let maybe_muted = if is_muted { + vec![0.0; actual.len()] + } else { + actual + }; + + if let Err(e) = speaker_tx.send_async(maybe_muted).await { + tracing::error!("speaker_tx_send_error: {:?}", e); + break; + } + } + } +} + pub struct Session { app: tauri::AppHandle, session_id: Option, @@ -99,7 +243,7 @@ impl Session { let mut input = hypr_audio::AudioInput::from_mic(); input.stream() }; - let mut mic_stream = mic_sample_stream + let mic_stream = mic_sample_stream .resample(SAMPLE_RATE) .chunks(hypr_aec::BLOCK_SIZE); @@ -107,32 +251,11 @@ impl Session { tokio::time::sleep(Duration::from_millis(200)).await; let speaker_sample_stream = hypr_audio::AudioInput::from_speaker(None).stream(); - let mut speaker_stream = speaker_sample_stream + let speaker_stream = speaker_sample_stream .resample(SAMPLE_RATE) .chunks(hypr_aec::BLOCK_SIZE); - let chunk_buffer_size: usize = 256; - - let (mic_tx, mic_rx) = flume::bounded::>(chunk_buffer_size); - let (speaker_tx, speaker_rx) = flume::bounded::>(chunk_buffer_size); - - let (save_mixed_tx, save_mixed_rx) = flume::bounded::>(chunk_buffer_size); - let (save_mic_raw_tx, save_mic_raw_rx) = if cfg!(debug_assertions) && record { - let (tx, rx) = flume::bounded::>(chunk_buffer_size); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - let (save_speaker_raw_tx, save_speaker_raw_rx) = if cfg!(debug_assertions) && record { - let (tx, rx) = flume::bounded::>(chunk_buffer_size); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - - let (process_mic_tx, process_mic_rx) = flume::bounded::>(chunk_buffer_size); - let (process_speaker_tx, process_speaker_rx) = - flume::bounded::>(chunk_buffer_size); + let channels = AudioChannels::new(); { let silence_stream_tx = hypr_audio::AudioOutput::silence(); @@ -141,63 +264,29 @@ impl Session { let mut tasks = JoinSet::new(); - tasks.spawn({ - let mic_muted_rx = mic_muted_rx_main.clone(); - async move { - let mut is_muted = *mic_muted_rx.borrow(); - let watch_rx = mic_muted_rx.clone(); - - while let Some(actual) = mic_stream.next().await { - if watch_rx.has_changed().unwrap_or(false) { - is_muted = *watch_rx.borrow(); - } - - let maybe_muted = if is_muted { - vec![0.0; actual.len()] - } else { - actual - }; - - if let Err(e) = mic_tx.send_async(maybe_muted).await { - tracing::error!("mic_tx_send_error: {:?}", e); - break; - } - } - } - }); - - tasks.spawn({ - let speaker_muted_rx = speaker_muted_rx_main.clone(); - async move { - let mut is_muted = *speaker_muted_rx.borrow(); - let watch_rx = speaker_muted_rx.clone(); - - while let Some(actual) = speaker_stream.next().await { - if watch_rx.has_changed().unwrap_or(false) { - is_muted = *watch_rx.borrow(); - } + tasks.spawn(AudioChannels::process_mic_stream( + mic_stream, + mic_muted_rx_main.clone(), + channels.mic_tx.clone(), + )); - let maybe_muted = if is_muted { - vec![0.0; actual.len()] - } else { - actual - }; - - if let Err(e) = speaker_tx.send_async(maybe_muted).await { - tracing::error!("speaker_tx_send_error: {:?}", e); - break; - } - } - } - }); + tasks.spawn(AudioChannels::process_speaker_stream( + speaker_stream, + speaker_muted_rx_main.clone(), + channels.speaker_tx.clone(), + )); let app_dir = self.app.path().app_data_dir().unwrap(); tasks.spawn({ let app = self.app.clone(); - let save_mixed_tx = save_mixed_tx.clone(); - let process_mic_tx = process_mic_tx.clone(); - let process_speaker_tx = process_speaker_tx.clone(); + let mic_rx = channels.mic_rx.clone(); + let speaker_rx = channels.speaker_rx.clone(); + let save_mixed_tx = channels.save_mixed_tx.clone(); + let save_mic_raw_tx = channels.save_mic_raw_tx.clone(); + let save_speaker_raw_tx = channels.save_speaker_raw_tx.clone(); + let process_mic_tx = channels.process_mic_tx.clone(); + let process_speaker_tx = channels.process_speaker_tx.clone(); async move { let mut aec = hypr_aec::AEC::new().unwrap(); @@ -229,14 +318,12 @@ impl Session { continue; } - let processed_mic = mic_chunk - .iter() - .map(|x| x * POST_MIC_GAIN) - .collect::>(); - let processed_speaker = speaker_chunk + let processed_mic: Vec = + mic_chunk.iter().map(|x| x * POST_MIC_GAIN).collect(); + let processed_speaker: Vec = speaker_chunk .iter() .map(|x| x * POST_SPEAKER_GAIN) - .collect::>(); + .collect(); let now = Instant::now(); if now.duration_since(last_broadcast) >= AUDIO_AMPLITUDE_THROTTLE { @@ -254,29 +341,22 @@ impl Session { let _ = tx.send_async(speaker_chunk.clone()).await; } - // Send separate streams to listen client - if process_mic_tx - .send_async(processed_mic.clone()) - .await - .is_err() - { + if let Err(_) = process_mic_tx.send_async(processed_mic).await { tracing::error!("process_mic_tx_send_error"); return; } - if process_speaker_tx - .send_async(processed_speaker.clone()) - .await - .is_err() - { + if let Err(_) = process_speaker_tx.send_async(processed_speaker).await { tracing::error!("process_speaker_tx_send_error"); return; } if record { - let mixed: Vec = processed_mic + let mixed: Vec = mic_chunk .iter() - .zip(processed_speaker.iter()) - .map(|(mic, speaker)| (mic + speaker).clamp(-1.0, 1.0)) + .zip(speaker_chunk.iter()) + .map(|(mic, speaker)| { + (mic * POST_MIC_GAIN + speaker * POST_SPEAKER_GAIN).clamp(-1.0, 1.0) + }) .collect(); if save_mixed_tx.send_async(mixed).await.is_err() { tracing::error!("save_mixed_tx_send_error"); @@ -290,92 +370,74 @@ impl Session { tasks.spawn({ let app_dir = app_dir.clone(); let session_id = session_id.clone(); + let save_mixed_rx = channels.save_mixed_rx.clone(); async move { - let dir = app_dir.join(&session_id); - std::fs::create_dir_all(&dir).unwrap(); - let path = dir.join("audio.wav"); - - let mut wav = if path.exists() { - hound::WavWriter::append(path).unwrap() - } else { - hound::WavWriter::create(path, WAV_SPEC).unwrap() - }; - - while let Ok(chunk) = save_mixed_rx.recv_async().await { - for sample in chunk { - wav.write_sample(sample).unwrap(); - } + if let Err(e) = AudioSaver::save_to_wav( + save_mixed_rx, + &session_id, + &app_dir, + "audio.wav", + true, + ) + .await + { + tracing::error!("failed_to_save_mixed_audio: {:?}", e); } - - wav.finalize().unwrap(); } }); } - if let Some(save_mic_raw_rx) = save_mic_raw_rx { + if let Some(save_mic_raw_rx) = channels.save_mic_raw_rx.clone() { tasks.spawn({ let session_id = session_id.clone(); let app_dir = app_dir.clone(); async move { - let dir = app_dir.join(&session_id); - std::fs::create_dir_all(&dir).unwrap(); - let path = dir.join("audio_mic.wav"); - - let mut wav = hound::WavWriter::create(path, WAV_SPEC).unwrap(); - - while let Ok(chunk) = save_mic_raw_rx.recv_async().await { - for sample in chunk { - wav.write_sample(sample).unwrap(); - } + if let Err(e) = AudioSaver::save_to_wav( + save_mic_raw_rx, + &session_id, + &app_dir, + "audio_mic.wav", + false, + ) + .await + { + tracing::error!("failed_to_save_raw_mic_audio: {:?}", e); } - - wav.finalize().unwrap(); } }); } - if let Some(save_speaker_raw_rx) = save_speaker_raw_rx { + if let Some(save_speaker_raw_rx) = channels.save_speaker_raw_rx.clone() { tasks.spawn({ let session_id = session_id.clone(); let app_dir = app_dir.clone(); async move { - let dir = app_dir.join(&session_id); - std::fs::create_dir_all(&dir).unwrap(); - let path = dir.join("audio_speaker.wav"); - - let mut wav = hound::WavWriter::create(path, WAV_SPEC).unwrap(); - - while let Ok(chunk) = save_speaker_raw_rx.recv_async().await { - for sample in chunk { - wav.write_sample(sample).unwrap(); - } + if let Err(e) = AudioSaver::save_to_wav( + save_speaker_raw_rx, + &session_id, + &app_dir, + "audio_speaker.wav", + false, + ) + .await + { + tracing::error!("failed_to_save_raw_speaker_audio: {:?}", e); } - - wav.finalize().unwrap(); } }); } - let mic_audio_stream = process_mic_rx.into_stream().map(|chunk| { - let bytes: Vec = chunk - .into_iter() - .map(|sample| (sample * i16::MAX as f32) as i16) - .flat_map(|sample| sample.to_le_bytes()) - .collect(); - bytes::Bytes::from(bytes) - }); - - let speaker_audio_stream = process_speaker_rx.into_stream().map(|chunk| { - let bytes: Vec = chunk - .into_iter() - .map(|sample| (sample * i16::MAX as f32) as i16) - .flat_map(|sample| sample.to_le_bytes()) - .collect(); - bytes::Bytes::from(bytes) - }); + let mic_audio_stream = channels + .process_mic_rx + .into_stream() + .map(hypr_audio_utils::f32_to_i16_bytes); + let speaker_audio_stream = channels + .process_speaker_rx + .into_stream() + .map(hypr_audio_utils::f32_to_i16_bytes); let listen_stream = listen_client .from_realtime_audio(mic_audio_stream, speaker_audio_stream) From 5dfe47236242e7eab47bff05cf3683b942ee786c Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 12:46:23 -0700 Subject: [PATCH 08/10] got it working for speaker only --- Cargo.lock | 1 + crates/ws-utils/Cargo.toml | 4 +- crates/ws-utils/src/lib.rs | 67 +++++++++++++++++++++++++++++++++ plugins/local-stt/src/server.rs | 35 +++++++++++------ 4 files changed, 94 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b8473137..cb9c2a40e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17291,6 +17291,7 @@ dependencies = [ "kalosm-sound", "listener-interface", "serde_json", + "tokio", ] [[package]] diff --git a/crates/ws-utils/Cargo.toml b/crates/ws-utils/Cargo.toml index 4338b2afd..e9d95d8da 100644 --- a/crates/ws-utils/Cargo.toml +++ b/crates/ws-utils/Cargo.toml @@ -8,6 +8,8 @@ hypr-audio-utils = { workspace = true } hypr-listener-interface = { workspace = true } axum = { workspace = true, features = ["ws"] } -futures-util = { workspace = true } kalosm-sound = { workspace = true, default-features = false } serde_json = { workspace = true } + +futures-util = { workspace = true } +tokio = { workspace = true } diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index 06cc1feaf..5c5e0615d 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -1,5 +1,6 @@ use axum::extract::ws::{Message, WebSocket}; use futures_util::{stream::SplitStream, Stream, StreamExt}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use hypr_audio_utils::bytes_to_f32_samples; use hypr_listener_interface::ListenInputChunk; @@ -69,3 +70,69 @@ impl kalosm_sound::AsyncSource for WebSocketAudioSource { self.sample_rate } } + +pub struct ChannelAudioSource { + receiver: Option>>, + sample_rate: u32, +} + +impl ChannelAudioSource { + fn new(receiver: UnboundedReceiver>, sample_rate: u32) -> Self { + Self { + receiver: Some(receiver), + sample_rate, + } + } +} + +impl kalosm_sound::AsyncSource for ChannelAudioSource { + fn as_stream(&mut self) -> impl Stream + '_ { + let receiver = self.receiver.as_mut().unwrap(); + futures_util::stream::unfold(receiver, |receiver| async move { + match receiver.recv().await { + Some(samples) => Some((samples, receiver)), + None => None, + } + }) + .flat_map(futures_util::stream::iter) + } + + fn sample_rate(&self) -> u32 { + self.sample_rate + } +} + +pub fn split_dual_audio_sources( + mut ws_receiver: SplitStream, + sample_rate: u32, +) -> (ChannelAudioSource, ChannelAudioSource) { + let (mic_tx, mic_rx) = unbounded_channel::>(); + let (speaker_tx, speaker_rx) = unbounded_channel::>(); + + tokio::spawn(async move { + while let Some(item) = ws_receiver.next().await { + match item { + Ok(Message::Text(data)) => match serde_json::from_str::(&data) { + Ok(ListenInputChunk::Audio { data }) => { + let samples = bytes_to_f32_samples(&data); + let _ = mic_tx.send(samples.clone()); + let _ = speaker_tx.send(samples); + } + Ok(ListenInputChunk::DualAudio { mic, speaker }) => { + let _ = mic_tx.send(bytes_to_f32_samples(&mic)); + let _ = speaker_tx.send(bytes_to_f32_samples(&speaker)); + } + Ok(ListenInputChunk::End) => break, + Err(_) => {} + }, + Ok(Message::Close(_)) | Err(_) => break, + _ => {} + } + } + }); + + ( + ChannelAudioSource::new(mic_rx, sample_rate), + ChannelAudioSource::new(speaker_rx, sample_rate), + ) +} diff --git a/plugins/local-stt/src/server.rs b/plugins/local-stt/src/server.rs index 2d70028c0..d21cb286d 100644 --- a/plugins/local-stt/src/server.rs +++ b/plugins/local-stt/src/server.rs @@ -216,19 +216,30 @@ async fn websocket_dual_channel( model: hypr_whisper_local::Whisper, guard: ConnectionGuard, ) { - let mut stream = { - let audio_source = hypr_ws_utils::WebSocketAudioSource::new(ws_receiver, 16 * 1000); - let chunked = - audio_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); + let (mic_source, speaker_source) = + hypr_ws_utils::split_dual_audio_sources(ws_receiver, 16 * 1000); + + let mic_chunked = + mic_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); + let speaker_chunked = + speaker_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); + + let _mic_chunked = hypr_whisper_local::AudioChunkStream(mic_chunked.map(|chunk| { + hypr_whisper_local::SimpleAudioChunk { + samples: chunk.convert_samples().collect(), + ..Default::default() + } + })); - let chunked = hypr_whisper_local::AudioChunkStream(chunked.map(|chunk| { - hypr_whisper_local::SimpleAudioChunk { - samples: chunk.convert_samples().collect(), - ..Default::default() - } - })); - hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(chunked, model) - }; + let speaker_chunked = hypr_whisper_local::AudioChunkStream(speaker_chunked.map(|chunk| { + hypr_whisper_local::SimpleAudioChunk { + samples: chunk.convert_samples().collect(), + ..Default::default() + } + })); + + let mut stream = + hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(speaker_chunked, model); loop { tokio::select! { From bea836b94db6ada18864f0b6d303a8e56a7d91a6 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 12:52:22 -0700 Subject: [PATCH 09/10] ws refactor --- crates/ws-utils/src/lib.rs | 119 ++++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 56 deletions(-) diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index 5c5e0615d..d828883a8 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -5,6 +5,46 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use hypr_audio_utils::bytes_to_f32_samples; use hypr_listener_interface::ListenInputChunk; +enum AudioProcessResult { + Samples(Vec), + DualSamples { mic: Vec, speaker: Vec }, + Empty, + End, +} + +fn process_ws_message(message: Message) -> AudioProcessResult { + match message { + Message::Text(data) => match serde_json::from_str::(&data) { + Ok(ListenInputChunk::Audio { data }) => { + if data.is_empty() { + AudioProcessResult::Empty + } else { + AudioProcessResult::Samples(bytes_to_f32_samples(&data)) + } + } + Ok(ListenInputChunk::DualAudio { mic, speaker }) => AudioProcessResult::DualSamples { + mic: bytes_to_f32_samples(&mic), + speaker: bytes_to_f32_samples(&speaker), + }, + Ok(ListenInputChunk::End) => AudioProcessResult::End, + Err(_) => AudioProcessResult::Empty, + }, + Message::Close(_) => AudioProcessResult::End, + _ => AudioProcessResult::Empty, + } +} + +fn mix_audio_channels(mic: &[f32], speaker: &[f32]) -> Vec { + let max_len = mic.len().max(speaker.len()); + (0..max_len) + .map(|i| { + let mic_sample = mic.get(i).copied().unwrap_or(0.0); + let speaker_sample = speaker.get(i).copied().unwrap_or(0.0); + (mic_sample + speaker_sample).clamp(-1.0, 1.0) + }) + .collect() +} + pub struct WebSocketAudioSource { receiver: Option>, sample_rate: u32, @@ -24,43 +64,18 @@ impl kalosm_sound::AsyncSource for WebSocketAudioSource { let receiver = self.receiver.as_mut().unwrap(); futures_util::stream::unfold(receiver, |receiver| async move { - let item = receiver.next().await; - - match item { - Some(Ok(Message::Text(data))) => { - let input: ListenInputChunk = serde_json::from_str(&data).unwrap(); - - match input { - ListenInputChunk::Audio { data } => { - if data.is_empty() { - None - } else { - let samples: Vec = bytes_to_f32_samples(&data); - - Some((samples, receiver)) - } - } - ListenInputChunk::DualAudio { mic, speaker } => { - let mic_samples: Vec = bytes_to_f32_samples(&mic); - let speaker_samples: Vec = bytes_to_f32_samples(&speaker); - - let max_len = mic_samples.len().max(speaker_samples.len()); - let mixed_samples: Vec = (0..max_len) - .map(|i| { - let mic = mic_samples.get(i).copied().unwrap_or(0.0); - let speaker = speaker_samples.get(i).copied().unwrap_or(0.0); - (mic + speaker).clamp(-1.0, 1.0) - }) - .collect(); - - Some((mixed_samples, receiver)) - } - ListenInputChunk::End => None, + match receiver.next().await { + Some(Ok(message)) => match process_ws_message(message) { + AudioProcessResult::Samples(samples) => Some((samples, receiver)), + AudioProcessResult::DualSamples { mic, speaker } => { + let mixed = mix_audio_channels(&mic, &speaker); + Some((mixed, receiver)) } - } - Some(Ok(Message::Close(_))) => None, + AudioProcessResult::Empty => Some((Vec::new(), receiver)), + AudioProcessResult::End => None, + }, Some(Err(_)) => None, - _ => Some((Vec::new(), receiver)), + None => None, } }) .flat_map(futures_util::stream::iter) @@ -89,10 +104,7 @@ impl kalosm_sound::AsyncSource for ChannelAudioSource { fn as_stream(&mut self) -> impl Stream + '_ { let receiver = self.receiver.as_mut().unwrap(); futures_util::stream::unfold(receiver, |receiver| async move { - match receiver.recv().await { - Some(samples) => Some((samples, receiver)), - None => None, - } + receiver.recv().await.map(|samples| (samples, receiver)) }) .flat_map(futures_util::stream::iter) } @@ -110,23 +122,18 @@ pub fn split_dual_audio_sources( let (speaker_tx, speaker_rx) = unbounded_channel::>(); tokio::spawn(async move { - while let Some(item) = ws_receiver.next().await { - match item { - Ok(Message::Text(data)) => match serde_json::from_str::(&data) { - Ok(ListenInputChunk::Audio { data }) => { - let samples = bytes_to_f32_samples(&data); - let _ = mic_tx.send(samples.clone()); - let _ = speaker_tx.send(samples); - } - Ok(ListenInputChunk::DualAudio { mic, speaker }) => { - let _ = mic_tx.send(bytes_to_f32_samples(&mic)); - let _ = speaker_tx.send(bytes_to_f32_samples(&speaker)); - } - Ok(ListenInputChunk::End) => break, - Err(_) => {} - }, - Ok(Message::Close(_)) | Err(_) => break, - _ => {} + while let Some(Ok(message)) = ws_receiver.next().await { + match process_ws_message(message) { + AudioProcessResult::Samples(samples) => { + let _ = mic_tx.send(samples.clone()); + let _ = speaker_tx.send(samples); + } + AudioProcessResult::DualSamples { mic, speaker } => { + let _ = mic_tx.send(mic); + let _ = speaker_tx.send(speaker); + } + AudioProcessResult::End => break, + AudioProcessResult::Empty => continue, } } }); From 039d65fbe36d48c73ead8aa5b7350d1635416f53 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Mon, 7 Jul 2025 14:51:26 -0700 Subject: [PATCH 10/10] got speaker identity assignment --- plugins/local-stt/src/lib.rs | 2 +- plugins/local-stt/src/server.rs | 92 +++++++++++++-------------------- 2 files changed, 37 insertions(+), 57 deletions(-) diff --git a/plugins/local-stt/src/lib.rs b/plugins/local-stt/src/lib.rs index 0e52ba528..d74a50c93 100644 --- a/plugins/local-stt/src/lib.rs +++ b/plugins/local-stt/src/lib.rs @@ -137,7 +137,7 @@ mod test { language: hypr_language::ISO639::En.into(), ..Default::default() }) - .build(); + .build_single(); let audio_source = rodio::Decoder::new(std::io::BufReader::new( std::fs::File::open(hypr_data::english_1::AUDIO_PATH).unwrap(), diff --git a/plugins/local-stt/src/server.rs b/plugins/local-stt/src/server.rs index d21cb286d..caa191794 100644 --- a/plugins/local-stt/src/server.rs +++ b/plugins/local-stt/src/server.rs @@ -144,12 +144,12 @@ async fn websocket_with_model( } async fn websocket_single_channel( - mut ws_sender: futures_util::stream::SplitSink, + ws_sender: futures_util::stream::SplitSink, ws_receiver: futures_util::stream::SplitStream, model: hypr_whisper_local::Whisper, guard: ConnectionGuard, ) { - let mut stream = { + let stream = { let audio_source = hypr_ws_utils::WebSocketAudioSource::new(ws_receiver, 16 * 1000); let chunked = audio_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); @@ -157,61 +157,17 @@ async fn websocket_single_channel( let chunked = hypr_whisper_local::AudioChunkStream(chunked.map(|chunk| { hypr_whisper_local::SimpleAudioChunk { samples: chunk.convert_samples().collect(), - ..Default::default() + meta: Some(serde_json::json!({ "source": "mixed" })), } })); hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(chunked, model) }; - loop { - tokio::select! { - _ = guard.cancelled() => { - tracing::info!("websocket_cancelled_by_new_connection"); - break; - } - chunk_opt = stream.next() => { - let Some(chunk) = chunk_opt else { break }; - - let meta = chunk.meta(); - let text = chunk.text().to_string(); - let start = chunk.start() as u64; - let duration = chunk.duration() as u64; - let confidence = chunk.confidence(); - - if confidence < 0.2 { - tracing::warn!(confidence, "skipping_transcript: {}", text); - continue; - } - - let data = ListenOutputChunk { - meta, - words: text - .split_whitespace() - .filter(|w| !w.is_empty()) - .map(|w| Word { - text: w.trim().to_string(), - speaker: None, - start_ms: Some(start), - end_ms: Some(start + duration), - confidence: Some(confidence), - }) - .collect(), - }; - - let msg = Message::Text(serde_json::to_string(&data).unwrap().into()); - if let Err(e) = ws_sender.send(msg).await { - tracing::warn!("websocket_send_error: {}", e); - break; - } - } - } - } - - let _ = ws_sender.close().await; + process_transcription_stream(ws_sender, stream, guard).await; } async fn websocket_dual_channel( - mut ws_sender: futures_util::stream::SplitSink, + ws_sender: futures_util::stream::SplitSink, ws_receiver: futures_util::stream::SplitStream, model: hypr_whisper_local::Whisper, guard: ConnectionGuard, @@ -224,23 +180,36 @@ async fn websocket_dual_channel( let speaker_chunked = speaker_source.chunks(hypr_chunker::RMS::new(), std::time::Duration::from_secs(13)); - let _mic_chunked = hypr_whisper_local::AudioChunkStream(mic_chunked.map(|chunk| { + let mic_chunked = hypr_whisper_local::AudioChunkStream(mic_chunked.map(|chunk| { hypr_whisper_local::SimpleAudioChunk { samples: chunk.convert_samples().collect(), - ..Default::default() + meta: Some(serde_json::json!({ "source": "mic" })), } })); let speaker_chunked = hypr_whisper_local::AudioChunkStream(speaker_chunked.map(|chunk| { hypr_whisper_local::SimpleAudioChunk { samples: chunk.convert_samples().collect(), - ..Default::default() + meta: Some(serde_json::json!({ "source": "speaker" })), } })); - let mut stream = - hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(speaker_chunked, model); + let merged_stream = hypr_whisper_local::AudioChunkStream(futures_util::stream::select( + mic_chunked.0, + speaker_chunked.0, + )); + + let stream = + hypr_whisper_local::TranscribeMetadataAudioStreamExt::transcribe(merged_stream, model); + process_transcription_stream(ws_sender, stream, guard).await; +} + +async fn process_transcription_stream( + mut ws_sender: futures_util::stream::SplitSink, + mut stream: impl futures_util::Stream + Unpin, + guard: ConnectionGuard, +) { loop { tokio::select! { _ = guard.cancelled() => { @@ -261,14 +230,25 @@ async fn websocket_dual_channel( continue; } + let source = meta.and_then(|meta| + meta.get("source") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + ); + let speaker = match source { + Some(s) if s == "mic" => Some(hypr_listener_interface::SpeakerIdentity::Unassigned { index: 0 }), + Some(s) if s == "speaker" => Some(hypr_listener_interface::SpeakerIdentity::Unassigned { index: 1 }), + _ => None, + }; + let data = ListenOutputChunk { - meta, + meta: None, words: text .split_whitespace() .filter(|w| !w.is_empty()) .map(|w| Word { text: w.trim().to_string(), - speaker: None, + speaker: speaker.clone(), start_ms: Some(start), end_ms: Some(start + duration), confidence: Some(confidence),