Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async-stream = "0.3.6"
futures-channel = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
ractor = "0.15"
reqwest = "0.12"
reqwest-streams = "0.10.0"
tokio = "1"
Expand All @@ -138,7 +139,7 @@ tokio-util = "0.7.15"

anyhow = "1"
approx = "0.5.1"
backon = "1.4.1"
backon = "1.5.2"
bytes = "1.9.0"
cached = "0.55.1"
clap = "4"
Expand Down
9 changes: 6 additions & 3 deletions crates/transcribe-whisper-local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use service::*;
// cargo test -p transcribe-whisper-local test_service -- --nocapture
mod tests {
use super::*;
use axum::{error_handling::HandleError, http::StatusCode};
use futures_util::StreamExt;
use hypr_audio_utils::AudioFormatExt;

Expand All @@ -18,7 +19,10 @@ mod tests {
.join("com.hyprnote.dev")
.join("stt/ggml-small-q8_0.bin");

let service = TranscribeService::builder().model_path(model_path).build();
let service = HandleError::new(
TranscribeService::builder().model_path(model_path).build(),
move |err: String| async move { (StatusCode::INTERNAL_SERVER_ERROR, err) },
);

let app = axum::Router::new().route_service("/v1/listen", service);

Expand All @@ -43,8 +47,7 @@ mod tests {
.to_i16_le_chunks(16000, 512);
let input = audio.map(|chunk| owhisper_interface::MixedMessage::Audio(chunk));

let stream = client.from_realtime_audio(input).await.unwrap();
futures_util::pin_mut!(stream);
let _ = client.from_realtime_audio(input).await.unwrap();

server_handle.abort();
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/transcribe-whisper-local/src/service/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
B: Send + 'static,
{
type Response = Response;
type Error = std::convert::Infallible;
type Error = String;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand Down
2 changes: 1 addition & 1 deletion plugins/listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ hound = { workspace = true }
vorbis_rs = { workspace = true }

futures-util = { workspace = true }
ractor = "0.15"
ractor = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
Expand Down
79 changes: 61 additions & 18 deletions plugins/listener/src/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::Bytes;
use futures_util::StreamExt;

use owhisper_interface::{ControlMessage, MixedMessage, Word2};
use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef};
use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, SupervisionEvent};
use tauri_specta::Event;

use crate::{manager::TranscriptManager, SessionEvent};
Expand All @@ -27,6 +27,7 @@ pub struct ListenerArgs {
pub struct ListenerState {
tx: tokio::sync::mpsc::Sender<MixedMessage<(Bytes, Bytes), ControlMessage>>,
rx_task: tokio::task::JoinHandle<()>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}

pub struct ListenerActor;
Expand All @@ -47,8 +48,31 @@ impl Actor for ListenerActor {
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (tx, rx_task) = spawn_rx_task(args, myself).await.unwrap();
Ok(ListenerState { tx, rx_task })
{
use tauri_plugin_local_stt::LocalSttPluginExt;
let _ = args.app.start_server(None).await;
}

let (tx, rx_task, shutdown_tx) = spawn_rx_task(args, myself).await.unwrap();
let state = ListenerState {
tx,
rx_task,
shutdown_tx: Some(shutdown_tx),
};

Ok(state)
}
Comment on lines +56 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t unwrap spawn_rx_task in pre_start.

.await.unwrap() will panic on setup failure. Propagate with ? so the actor fails to start cleanly.

-        let (tx, rx_task, shutdown_tx) = spawn_rx_task(args, myself).await.unwrap();
+        let (tx, rx_task, shutdown_tx) = spawn_rx_task(args, myself).await?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (tx, rx_task, shutdown_tx) = spawn_rx_task(args, myself).await.unwrap();
let state = ListenerState {
tx,
rx_task,
shutdown_tx: Some(shutdown_tx),
};
Ok(state)
}
let (tx, rx_task, shutdown_tx) = spawn_rx_task(args, myself).await?;
let state = ListenerState {
tx,
rx_task,
shutdown_tx: Some(shutdown_tx),
};
Ok(state)
}
🤖 Prompt for AI Agents
In plugins/listener/src/actors/listener.rs around lines 56 to 64, the call to
spawn_rx_task uses `.await.unwrap()`, which will panic on failure; replace the
unwrap with `?` to propagate the error. The enclosing pre_start already returns
Result<Self::State, ActorProcessingErr> and spawn_rx_task's error type is
ActorProcessingErr, so no return-type change or error conversion is needed.
Keep wrapping shutdown_tx in Some(...) so the ListenerState construction
remains unchanged.


/// `post_stop` hook of the `Actor` impl for `ListenerActor`: signals the
/// receive task to shut down, then aborts it.
///
/// `state.shutdown_tx` is moved out of its `Option` with `take()`, so the
/// shutdown signal is sent at most once. Always returns `Ok(())`.
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let Some(shutdown_tx) = state.shutdown_tx.take() {
// The send result is deliberately discarded; a failed send is not treated as an error here.
let _ = shutdown_tx.send(());
}
// NOTE(review): the abort is requested immediately after the shutdown signal.
// The receive task's shutdown branch awaits `finalize_with_text(...)` before
// breaking, so it may be cancelled before that finalization runs — confirm
// whether the task should be given time to finish before aborting.
state.rx_task.abort();
Ok(())
}

async fn handle(
Expand All @@ -65,12 +89,21 @@ impl Actor for ListenerActor {
Ok(())
}

async fn post_stop(
async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.rx_task.abort();
tracing::info!("supervisor_event: {:?}", message);

match message {
SupervisionEvent::ActorStarted(_) | SupervisionEvent::ProcessGroupChanged(_) => {}
SupervisionEvent::ActorTerminated(_, _, _) => {}
SupervisionEvent::ActorFailed(_cell, _) => {
myself.stop(None);
}
}
Ok(())
}
}
Expand All @@ -82,10 +115,12 @@ async fn spawn_rx_task(
(
tokio::sync::mpsc::Sender<MixedMessage<(Bytes, Bytes), ControlMessage>>,
tokio::task::JoinHandle<()>,
tokio::sync::oneshot::Sender<()>,
),
ActorProcessingErr,
> {
let (tx, rx) = tokio::sync::mpsc::channel::<MixedMessage<(Bytes, Bytes), ControlMessage>>(32);
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();

let app = args.app.clone();
let session_id = args.session_id.clone();
Expand All @@ -109,7 +144,7 @@ async fn spawn_rx_task(

let rx_task = tokio::spawn(async move {
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let (listen_stream, _handle) = match client.from_realtime_audio(outbound).await {
let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
Ok(res) => res,
Err(e) => {
tracing::error!("listen_ws_connect_failed: {:?}", e);
Expand All @@ -122,8 +157,14 @@ async fn spawn_rx_task(
let mut manager = TranscriptManager::with_unix_timestamp(session_start_ts_ms);

loop {
match tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()).await {
Ok(Some(response)) => {
tokio::select! {
_ = &mut shutdown_rx => {
handle.finalize_with_text(serde_json::json!({"type": "Finalize"}).to_string().into()).await;
break;
}
result = tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()) => {
match result {
Ok(Some(response)) => {
let diff = manager.append(response.clone());

let partial_words_by_channel: HashMap<usize, Vec<Word2>> = diff
Expand Down Expand Up @@ -179,21 +220,23 @@ async fn spawn_rx_task(
.emit(&app)
.unwrap();
}
Ok(None) => {
tracing::info!("listen_stream_ended");
break;
}
Err(_) => {
tracing::info!("listen_stream_timeout");
break;
Ok(None) => {
tracing::info!("listen_stream_ended");
break;
}
Err(_) => {
tracing::info!("listen_stream_timeout");
break;
}
}
}
}
}

myself.stop(None);
});

Ok((tx, rx_task))
Ok((tx, rx_task, shutdown_tx))
}

async fn update_session<R: tauri::Runtime>(
Expand Down
2 changes: 2 additions & 0 deletions plugins/local-stt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ axum = { workspace = true, features = ["ws"] }
axum-extra = { workspace = true, features = ["query"] }
tower-http = { workspace = true, features = ["cors", "trace"] }

backon = { workspace = true }
futures-util = { workspace = true }
ractor = { workspace = true }
reqwest = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros"] }
tokio-util = { workspace = true }
Expand Down
Loading
Loading