From ab7a960a8b23b82400d98b52f5a0127048ae37ae Mon Sep 17 00:00:00 2001 From: Tinco Andringa Date: Mon, 20 Oct 2025 01:14:52 +0200 Subject: [PATCH 1/5] Unify StreamError and OpenAIError (#413) * unify StreamError and OpenAIError * format * clippy --- async-openai/src/client.rs | 125 +++++++++++++-------- async-openai/src/error.rs | 28 ++++- async-openai/src/types/assistant_stream.rs | 4 +- 3 files changed, 106 insertions(+), 51 deletions(-) diff --git a/async-openai/src/client.rs b/async-openai/src/client.rs index fe2ed232..877d9c26 100644 --- a/async-openai/src/client.rs +++ b/async-openai/src/client.rs @@ -2,13 +2,13 @@ use std::pin::Pin; use bytes::Bytes; use futures::{stream::StreamExt, Stream}; -use reqwest::multipart::Form; -use reqwest_eventsource::{Event, EventSource, RequestBuilderExt}; +use reqwest::{multipart::Form, Response}; +use reqwest_eventsource::{Error as EventSourceError, Event, EventSource, RequestBuilderExt}; use serde::{de::DeserializeOwned, Serialize}; use crate::{ config::{Config, OpenAIConfig}, - error::{map_deserialization_error, ApiError, OpenAIError, WrappedError}, + error::{map_deserialization_error, ApiError, OpenAIError, StreamError, WrappedError}, file::Files, image::Images, moderation::Moderations, @@ -335,52 +335,34 @@ impl Client { .map_err(backoff::Error::Permanent)?; let status = response.status(); - let bytes = response - .bytes() - .await - .map_err(OpenAIError::Reqwest) - .map_err(backoff::Error::Permanent)?; - if status.is_server_error() { - // OpenAI does not guarantee server errors are returned as JSON so we cannot deserialize them. - let message: String = String::from_utf8_lossy(&bytes).into_owned(); - tracing::warn!("Server error: {status} - {message}"); - return Err(backoff::Error::Transient { - err: OpenAIError::ApiError(ApiError { - message, - r#type: None, - param: None, - code: None, - }), - retry_after: None, - }); - } - - // Deserialize response body from either error object or actual response object - if !status.is_success() { - let wrapped_error: WrappedError = serde_json::from_slice(bytes.as_ref()) - .map_err(|e| map_deserialization_error(e, bytes.as_ref())) - .map_err(backoff::Error::Permanent)?; - - if status.as_u16() == 429 - // API returns 429 also when: - // "You exceeded your current quota, please check your plan and billing details." - && wrapped_error.error.r#type != Some("insufficient_quota".to_string()) - { - // Rate limited retry... - tracing::warn!("Rate limited: {}", wrapped_error.error.message); - return Err(backoff::Error::Transient { - err: OpenAIError::ApiError(wrapped_error.error), - retry_after: None, - }); - } else { - return Err(backoff::Error::Permanent(OpenAIError::ApiError( - wrapped_error.error, - ))); + match read_response(response).await { + Ok(bytes) => Ok(bytes), + Err(e) => { + match e { + OpenAIError::ApiError(api_error) => { + if status.is_server_error() { + Err(backoff::Error::Transient { + err: OpenAIError::ApiError(api_error), + retry_after: None, + }) + } else if status.as_u16() == 429 + && api_error.r#type != Some("insufficient_quota".to_string()) + { + // Rate limited retry... + tracing::warn!("Rate limited: {}", api_error.message); + Err(backoff::Error::Transient { + err: OpenAIError::ApiError(api_error), + retry_after: None, + }) + } else { + Err(backoff::Error::Permanent(OpenAIError::ApiError(api_error))) + } + } + _ => Err(backoff::Error::Permanent(e)), + } } } - - Ok(bytes) }) .await } @@ -471,6 +453,53 @@ impl Client { } } +async fn read_response(response: Response) -> Result { + let status = response.status(); + let bytes = response.bytes().await.map_err(OpenAIError::Reqwest)?; + + if status.is_server_error() { + // OpenAI does not guarantee server errors are returned as JSON so we cannot deserialize them. + let message: String = String::from_utf8_lossy(&bytes).into_owned(); + tracing::warn!("Server error: {status} - {message}"); + return Err(OpenAIError::ApiError(ApiError { + message, + r#type: None, + param: None, + code: None, + })); + } + + // Deserialize response body from either error object or actual response object + if !status.is_success() { + let wrapped_error: WrappedError = serde_json::from_slice(bytes.as_ref()) + .map_err(|e| map_deserialization_error(e, bytes.as_ref()))?; + + return Err(OpenAIError::ApiError(wrapped_error.error)); + } + + Ok(bytes) +} + +async fn map_stream_error(value: EventSourceError) -> OpenAIError { + match value { + EventSourceError::Parser(e) => OpenAIError::StreamError(StreamError::Parser(e.to_string())), + EventSourceError::InvalidContentType(e, response) => { + OpenAIError::StreamError(StreamError::InvalidContentType(e, response)) + } + EventSourceError::InvalidLastEventId(e) => { + OpenAIError::StreamError(StreamError::InvalidLastEventId(e)) + } + EventSourceError::StreamEnded => OpenAIError::StreamError(StreamError::StreamEnded), + EventSourceError::Utf8(e) => OpenAIError::StreamError(StreamError::Utf8(e)), + EventSourceError::Transport(error) => OpenAIError::Reqwest(error), + EventSourceError::InvalidStatusCode(_status_code, response) => { + read_response(response).await.expect_err( + "Unreachable because read_response returns err when status_code is invalid", + ) + } + } +} + /// Request which responds with SSE. /// [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format) pub(crate) async fn stream( @@ -485,7 +514,7 @@ where while let Some(ev) = event_source.next().await { match ev { Err(e) => { - if let Err(_e) = tx.send(Err(OpenAIError::StreamError(e.to_string()))) { + if let Err(_e) = tx.send(Err(map_stream_error(e).await)) { // rx dropped break; } @@ -530,7 +559,7 @@ where while let Some(ev) = event_source.next().await { match ev { Err(e) => { - if let Err(_e) = tx.send(Err(OpenAIError::StreamError(e.to_string()))) { + if let Err(_e) = tx.send(Err(map_stream_error(e).await)) { // rx dropped break; } diff --git a/async-openai/src/error.rs b/async-openai/src/error.rs index a1139c9f..46b2fc1b 100644 --- a/async-openai/src/error.rs +++ b/async-openai/src/error.rs @@ -1,6 +1,10 @@ //! Errors originating from API calls, parsing responses, and reading-or-writing to the file system. +use std::string::FromUtf8Error; + +use reqwest::{header::HeaderValue, Response}; use serde::{Deserialize, Serialize}; + #[derive(Debug, thiserror::Error)] pub enum OpenAIError { /// Underlying error from reqwest library after an API call was made @@ -20,13 +24,35 @@ pub enum OpenAIError { FileReadError(String), /// Error on SSE streaming #[error("stream failed: {0}")] - StreamError(String), + StreamError(StreamError), /// Error from client side validation /// or when builder fails to build request before making API call #[error("invalid args: {0}")] InvalidArgument(String), } +#[derive(Debug, thiserror::Error)] +pub enum StreamError { + /// Source stream is not valid UTF8 + #[error(transparent)] + Utf8(FromUtf8Error), + /// Source stream is not a valid EventStream + #[error("Source stream is not a valid event stream: {0}")] + Parser(String), + /// The `Content-Type` returned by the server is invalid + #[error("Invalid content type for event stream: {0:?}")] + InvalidContentType(HeaderValue, Response), + /// The `Last-Event-ID` cannot be formed into a Header to be submitted to the server + #[error("Invalid `Last-Event-ID` for event stream: {0}")] + InvalidLastEventId(String), + /// The server sent an unrecognized event type + #[error("Unrecognized event type: {0}")] + UnrecognizedEventType(String), + /// The stream ended + #[error("Stream ended")] + StreamEnded, +} + /// OpenAI API returns error object on failure #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ApiError { diff --git a/async-openai/src/types/assistant_stream.rs b/async-openai/src/types/assistant_stream.rs index fca835cf..27ce6a13 100644 --- a/async-openai/src/types/assistant_stream.rs +++ b/async-openai/src/types/assistant_stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use futures::Stream; use serde::Deserialize; -use crate::error::{map_deserialization_error, ApiError, OpenAIError}; +use crate::error::{map_deserialization_error, ApiError, OpenAIError, StreamError}; use super::{ MessageDeltaObject, MessageObject, RunObject, RunStepDeltaObject, RunStepObject, ThreadObject, @@ -208,7 +208,7 @@ impl TryFrom for AssistantStreamEvent { "done" => Ok(AssistantStreamEvent::Done(value.data)), _ => Err(OpenAIError::StreamError( - "Unrecognized event: {value:?#}".into(), + StreamError::UnrecognizedEventType(value.event), )), } } From 506fb3f721614843ada366d6cea4772827f1938c Mon Sep 17 00:00:00 2001 From: Himanshu Neema Date: Sun, 19 Oct 2025 16:45:57 -0700 Subject: [PATCH 2/5] use underlying reqwest_eventsource::Error --- async-openai/src/client.rs | 19 +++++----------- async-openai/src/error.rs | 25 ++++++---------------- async-openai/src/types/assistant_stream.rs | 6 +++--- 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/async-openai/src/client.rs b/async-openai/src/client.rs index 877d9c26..42b7f662 100644 --- a/async-openai/src/client.rs +++ b/async-openai/src/client.rs @@ -482,21 +482,12 @@ async fn read_response(response: Response) -> Result { async fn map_stream_error(value: EventSourceError) -> OpenAIError { match value { - EventSourceError::Parser(e) => OpenAIError::StreamError(StreamError::Parser(e.to_string())), - EventSourceError::InvalidContentType(e, response) => { - OpenAIError::StreamError(StreamError::InvalidContentType(e, response)) - } - EventSourceError::InvalidLastEventId(e) => { - OpenAIError::StreamError(StreamError::InvalidLastEventId(e)) - } - EventSourceError::StreamEnded => OpenAIError::StreamError(StreamError::StreamEnded), - EventSourceError::Utf8(e) => OpenAIError::StreamError(StreamError::Utf8(e)), - EventSourceError::Transport(error) => OpenAIError::Reqwest(error), - EventSourceError::InvalidStatusCode(_status_code, response) => { - read_response(response).await.expect_err( - "Unreachable because read_response returns err when status_code is invalid", - ) + EventSourceError::InvalidStatusCode(status_code, response) => { + read_response(response).await.expect_err(&format!( + "Unreachable because read_response returns err when status_code {status_code} is invalid" + )) } + _ => OpenAIError::StreamError(StreamError::ReqwestEventSource(value.into())), } } diff --git a/async-openai/src/error.rs b/async-openai/src/error.rs index 46b2fc1b..52b1f133 100644 --- a/async-openai/src/error.rs +++ b/async-openai/src/error.rs @@ -4,7 +4,6 @@ use std::string::FromUtf8Error; use reqwest::{header::HeaderValue, Response}; use serde::{Deserialize, Serialize}; - #[derive(Debug, thiserror::Error)] pub enum OpenAIError { /// Underlying error from reqwest library after an API call was made @@ -33,24 +32,12 @@ pub enum OpenAIError { #[derive(Debug, thiserror::Error)] pub enum StreamError { - /// Source stream is not valid UTF8 - #[error(transparent)] - Utf8(FromUtf8Error), - /// Source stream is not a valid EventStream - #[error("Source stream is not a valid event stream: {0}")] - Parser(String), - /// The `Content-Type` returned by the server is invalid - #[error("Invalid content type for event stream: {0:?}")] - InvalidContentType(HeaderValue, Response), - /// The `Last-Event-ID` cannot be formed into a Header to be submitted to the server - #[error("Invalid `Last-Event-ID` for event stream: {0}")] - InvalidLastEventId(String), - /// The server sent an unrecognized event type - #[error("Unrecognized event type: {0}")] - UnrecognizedEventType(String), - /// The stream ended - #[error("Stream ended")] - StreamEnded, + /// Underlying error from reqwest_eventsource library when reading the stream + #[error("{0}")] + ReqwestEventSource(#[from] reqwest_eventsource::Error), + /// Error when a stream event does not match one of the expected values + #[error("Unrecognized event: {0:#?}")] + UnrecognizedEvent(eventsource_stream::Event), } /// OpenAI API returns error object on failure diff --git a/async-openai/src/types/assistant_stream.rs b/async-openai/src/types/assistant_stream.rs index 27ce6a13..f529c072 100644 --- a/async-openai/src/types/assistant_stream.rs +++ b/async-openai/src/types/assistant_stream.rs @@ -207,9 +207,9 @@ impl TryFrom for AssistantStreamEvent { .map(AssistantStreamEvent::ErrorEvent), "done" => Ok(AssistantStreamEvent::Done(value.data)), - _ => Err(OpenAIError::StreamError( - StreamError::UnrecognizedEventType(value.event), - )), + _ => Err(OpenAIError::StreamError(StreamError::UnrecognizedEvent( + value, + ))), } } } From 03d9022004f500fa523779f5409a964326e741bb Mon Sep 17 00:00:00 2001 From: Himanshu Neema Date: Sun, 19 Oct 2025 17:20:29 -0700 Subject: [PATCH 3/5] UnknownEvent --- async-openai/src/error.rs | 4 ++-- async-openai/src/types/assistant_stream.rs | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/async-openai/src/error.rs b/async-openai/src/error.rs index 52b1f133..fd092c79 100644 --- a/async-openai/src/error.rs +++ b/async-openai/src/error.rs @@ -36,8 +36,8 @@ pub enum StreamError { #[error("{0}")] ReqwestEventSource(#[from] reqwest_eventsource::Error), /// Error when a stream event does not match one of the expected values - #[error("Unrecognized event: {0:#?}")] - UnrecognizedEvent(eventsource_stream::Event), + #[error("Unknown event: {0:#?}")] + UnknownEvent(eventsource_stream::Event), } /// OpenAI API returns error object on failure diff --git a/async-openai/src/types/assistant_stream.rs b/async-openai/src/types/assistant_stream.rs index f529c072..add6a71b 100644 --- a/async-openai/src/types/assistant_stream.rs +++ b/async-openai/src/types/assistant_stream.rs @@ -207,9 +207,7 @@ impl TryFrom for AssistantStreamEvent { .map(AssistantStreamEvent::ErrorEvent), "done" => Ok(AssistantStreamEvent::Done(value.data)), - _ => Err(OpenAIError::StreamError(StreamError::UnrecognizedEvent( - value, - ))), + _ => Err(OpenAIError::StreamError(StreamError::UnknownEvent(value))), } } } From f21302df7bfebd438d1639edf93ac1dcae4449f0 Mon Sep 17 00:00:00 2001 From: Himanshu Neema Date: Sun, 19 Oct 2025 17:39:59 -0700 Subject: [PATCH 4/5] update exampels to test streaming errors --- examples/chat-stream/src/main.rs | 2 +- examples/completions-stream/src/main.rs | 2 +- examples/function-call-stream/src/main.rs | 4 ++-- examples/tool-call-stream/src/main.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/chat-stream/src/main.rs b/examples/chat-stream/src/main.rs index 2241e9ee..f3d22cde 100644 --- a/examples/chat-stream/src/main.rs +++ b/examples/chat-stream/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> { }); } Err(err) => { - writeln!(lock, "error: {err}").unwrap(); + writeln!(lock, "error: {err:?}").unwrap(); } } stdout().flush()?; diff --git a/examples/completions-stream/src/main.rs b/examples/completions-stream/src/main.rs index 70b9f5bc..bd301938 100644 --- a/examples/completions-stream/src/main.rs +++ b/examples/completions-stream/src/main.rs @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box> { Ok(ccr) => ccr.choices.iter().for_each(|c| { print!("{}", c.text); }), - Err(e) => eprintln!("{}", e), + Err(e) => eprintln!("{e:?}"), } } diff --git a/examples/function-call-stream/src/main.rs b/examples/function-call-stream/src/main.rs index 02282971..ba70eb9e 100644 --- a/examples/function-call-stream/src/main.rs +++ b/examples/function-call-stream/src/main.rs @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box> { } } Err(err) => { - writeln!(lock, "error: {err}").unwrap(); + writeln!(lock, "error: {err:?}").unwrap(); } } stdout().flush()?; @@ -132,7 +132,7 @@ async fn call_fn( }); } Err(err) => { - writeln!(lock, "error: {err}").unwrap(); + writeln!(lock, "error: {err:?}").unwrap(); } } stdout().flush()?; diff --git a/examples/tool-call-stream/src/main.rs b/examples/tool-call-stream/src/main.rs index 230ee9a3..fd1ce77e 100644 --- a/examples/tool-call-stream/src/main.rs +++ b/examples/tool-call-stream/src/main.rs @@ -206,7 +206,7 @@ async fn main() -> Result<(), Box> { } Err(err) => { let mut lock = stdout().lock(); - writeln!(lock, "error: {err}").unwrap(); + writeln!(lock, "error: {err:?}").unwrap(); } } stdout() From 0e32f1f8d799eeeccfe82ed8a1641bb34c9fb4b2 Mon Sep 17 00:00:00 2001 From: Himanshu Neema Date: Sun, 19 Oct 2025 17:46:35 -0700 Subject: [PATCH 5/5] update responses-stream example --- examples/responses-stream/src/main.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/responses-stream/src/main.rs b/examples/responses-stream/src/main.rs index 5b565cd8..27e8b14e 100644 --- a/examples/responses-stream/src/main.rs +++ b/examples/responses-stream/src/main.rs @@ -36,13 +36,12 @@ async fn main() -> Result<(), Box> { | ResponseEvent::ResponseFailed(_) => { break; } - _ => { println!("{response_event:#?}"); } + _ => { + println!("{response_event:#?}"); + } }, Err(e) => { eprintln!("{e:#?}"); - // When a stream ends, it returns Err(OpenAIError::StreamError("Stream ended")) - // Without this, the stream will never end - break; } } }