-
Notifications
You must be signed in to change notification settings - Fork 34
Streaming callbacks #85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
use std::env; | ||
use std::time::Duration; | ||
|
||
use futures::stream::StreamExt; | ||
|
||
use deepgram::{ | ||
common::options::{Encoding, Endpointing, Language, Options}, | ||
Deepgram, DeepgramError, | ||
}; | ||
|
||
static PATH_TO_FILE: &str = "examples/audio/bueller.wav"; | ||
static AUDIO_CHUNK_SIZE: usize = 3174; | ||
static FRAME_DELAY: Duration = Duration::from_millis(16); | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), DeepgramError> { | ||
let deepgram_api_key = | ||
env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); | ||
|
||
let dg_client = Deepgram::new(&deepgram_api_key)?; | ||
|
||
let options = Options::builder() | ||
.smart_format(true) | ||
.language(Language::en_US) | ||
.build(); | ||
|
||
let callback_url = env::var("DEEPGRAM_CALLBACK_URL") | ||
.expect("DEEPGRAM_CALLBACK_URL environmental variable") | ||
.parse() | ||
.expect("DEEPGRAM_CALLBACK_URL not a valid URL"); | ||
|
||
let mut results = dg_client | ||
.transcription() | ||
.stream_request_with_options(options) | ||
.keep_alive() | ||
.encoding(Encoding::Linear16) | ||
.sample_rate(44100) | ||
.channels(2) | ||
.endpointing(Endpointing::CustomDurationMs(300)) | ||
.interim_results(true) | ||
.utterance_end_ms(1000) | ||
.vad_events(true) | ||
.no_delay(true) | ||
.callback(callback_url) | ||
bd-g marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.file(PATH_TO_FILE, AUDIO_CHUNK_SIZE, FRAME_DELAY) | ||
.await?; | ||
|
||
println!("Deepgram Request ID: {}", results.request_id()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See here - simple way to grab request ID from the |
||
while let Some(result) = results.next().await { | ||
println!("got: {:?}", result); | ||
} | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ use std::{ | |
time::Duration, | ||
}; | ||
|
||
use anyhow::anyhow; | ||
use bytes::Bytes; | ||
use futures::{ | ||
channel::mpsc::{self, Receiver, Sender}, | ||
|
@@ -36,6 +37,7 @@ use tungstenite::{ | |
protocol::frame::coding::{Data, OpCode}, | ||
}; | ||
use url::Url; | ||
use uuid::Uuid; | ||
|
||
use self::file_chunker::FileChunker; | ||
use crate::{ | ||
|
@@ -62,6 +64,7 @@ pub struct WebsocketBuilder<'a> { | |
vad_events: Option<bool>, | ||
stream_url: Url, | ||
keep_alive: Option<bool>, | ||
callback: Option<Url>, | ||
} | ||
|
||
impl Transcription<'_> { | ||
|
@@ -143,6 +146,7 @@ impl Transcription<'_> { | |
vad_events: None, | ||
stream_url: self.listen_stream_url(), | ||
keep_alive: None, | ||
callback: None, | ||
} | ||
} | ||
|
||
|
@@ -214,6 +218,7 @@ impl<'a> WebsocketBuilder<'a> { | |
no_delay, | ||
vad_events, | ||
stream_url, | ||
callback, | ||
} = self; | ||
|
||
let mut url = stream_url.clone(); | ||
|
@@ -257,6 +262,9 @@ impl<'a> WebsocketBuilder<'a> { | |
if let Some(vad_events) = vad_events { | ||
pairs.append_pair("vad_events", &vad_events.to_string()); | ||
} | ||
if let Some(callback) = callback { | ||
pairs.append_pair("callback", callback.as_ref()); | ||
} | ||
} | ||
|
||
Ok(url) | ||
|
@@ -315,6 +323,12 @@ impl<'a> WebsocketBuilder<'a> { | |
|
||
self | ||
} | ||
|
||
pub fn callback(mut self, callback: Url) -> Self { | ||
self.callback = Some(callback); | ||
|
||
self | ||
} | ||
} | ||
|
||
impl<'a> WebsocketBuilder<'a> { | ||
|
@@ -351,6 +365,7 @@ impl<'a> WebsocketBuilder<'a> { | |
|
||
let (tx, rx) = mpsc::channel(1); | ||
let mut is_done = false; | ||
let request_id = handle.request_id(); | ||
tokio::task::spawn(async move { | ||
let mut handle = handle; | ||
let mut tx = tx; | ||
|
@@ -421,7 +436,11 @@ impl<'a> WebsocketBuilder<'a> { | |
} | ||
} | ||
}); | ||
Ok(TranscriptionStream { rx, done: false }) | ||
Ok(TranscriptionStream { | ||
rx, | ||
done: false, | ||
request_id, | ||
}) | ||
} | ||
|
||
/// A low level interface to the Deepgram websocket transcription API. | ||
|
@@ -628,6 +647,7 @@ impl Deref for Audio { | |
pub struct WebsocketHandle { | ||
message_tx: Sender<WsMessage>, | ||
response_rx: Receiver<Result<StreamResponse>>, | ||
request_id: Uuid, | ||
} | ||
|
||
impl<'a> WebsocketHandle { | ||
|
@@ -652,7 +672,21 @@ impl<'a> WebsocketHandle { | |
builder.body(())? | ||
}; | ||
|
||
let (ws_stream, _) = tokio_tungstenite::connect_async(request).await?; | ||
let (ws_stream, upgrade_response) = tokio_tungstenite::connect_async(request).await?; | ||
|
||
let request_id = upgrade_response | ||
.headers() | ||
.get("dg-request-id") | ||
.ok_or(DeepgramError::UnexpectedServerResponse(anyhow!( | ||
"Websocket upgrade headers missing request ID" | ||
)))? | ||
.to_str() | ||
.ok() | ||
.and_then(|req_header_str| Uuid::parse_str(req_header_str).ok()) | ||
.ok_or(DeepgramError::UnexpectedServerResponse(anyhow!( | ||
"Received malformed request ID in websocket upgrade headers" | ||
)))?; | ||
|
||
let (message_tx, message_rx) = mpsc::channel(256); | ||
let (response_tx, response_rx) = mpsc::channel(256); | ||
|
||
|
@@ -670,6 +704,7 @@ impl<'a> WebsocketHandle { | |
Ok(WebsocketHandle { | ||
message_tx, | ||
response_rx, | ||
request_id, | ||
}) | ||
} | ||
|
||
|
@@ -725,6 +760,10 @@ impl<'a> WebsocketHandle { | |
// eprintln!("<handle> receiving response: {resp:?}"); | ||
resp | ||
} | ||
|
||
pub fn request_id(&self) -> Uuid { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add docs to this function, and please mention that the request ID is needed for support on issues that come up with a particular request. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
self.request_id | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] | ||
|
@@ -741,6 +780,7 @@ pub struct TranscriptionStream { | |
#[pin] | ||
rx: Receiver<Result<StreamResponse>>, | ||
done: bool, | ||
request_id: Uuid, | ||
} | ||
|
||
impl Stream for TranscriptionStream { | ||
|
@@ -752,6 +792,16 @@ impl Stream for TranscriptionStream { | |
} | ||
} | ||
|
||
impl TranscriptionStream { | ||
/// Returns the Deepgram request ID for the speech-to-text live request. | ||
/// | ||
/// A request ID needs to be provided to Deepgram as part of any support | ||
/// or troubleshooting assistance related to a specific request. | ||
pub fn request_id(&self) -> Uuid { | ||
self.request_id | ||
} | ||
} | ||
|
||
mod file_chunker { | ||
use bytes::{Bytes, BytesMut}; | ||
use futures::Stream; | ||
|
Uh oh!
There was an error while loading. Please reload this page.