Merged
Cargo.toml (4 changes: 3 additions & 1 deletion)

```diff
@@ -69,7 +69,9 @@ logforth = { version = "0.29.1", features = [
     "diagnostic-fastrace",
     "append-fastrace",
 ] }
-opentelemetry-semantic-conventions = "0.31.0"
+opentelemetry-semantic-conventions = { version = "0.31.0", features = [
+    "semconv_experimental",
+] }
 metrics = "0.24.2"
 metrics-exporter-otel = "0.3.0"
 pin-project = "1.1.10"
```
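For context on this dependency change: the gen_ai.* attribute keys used throughout this PR are still experimental in the OpenTelemetry semantic-conventions registry, and the Rust crate gates their constants behind the `semconv_experimental` feature enabled above; without it, the imports in the files below would not compile. A quick illustrative check (not part of the diff; key strings assumed from the published conventions):

```rust
// Builds only when opentelemetry-semantic-conventions is compiled with the
// `semconv_experimental` feature; each constant is a plain `&'static str` key.
use opentelemetry_semantic_conventions::attribute::{
    GEN_AI_REQUEST_MODEL, GEN_AI_RESPONSE_FINISH_REASONS,
};

fn main() {
    // The constants resolve to the dotted attribute names they replace, so
    // swapping literals for constants in the hunks below is behavior-preserving.
    assert_eq!(GEN_AI_REQUEST_MODEL, "gen_ai.request.model");
    assert_eq!(GEN_AI_RESPONSE_FINISH_REASONS, "gen_ai.response.finish_reasons");
}
```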
src/proxy/handlers/chat_completions/mod.rs (5 changes: 3 additions & 2 deletions)

```diff
@@ -13,6 +13,7 @@ use axum::{
 };
 use fastrace::prelude::{Event as TraceEvent, *};
 use log::error;
+use opentelemetry_semantic_conventions::attribute::GEN_AI_RESPONSE_FINISH_REASONS;
 use span_attributes::{
     StreamOutputCollector, apply_span_properties, chunk_span_properties, request_span_properties,
     response_span_properties, usage_span_properties,
@@ -72,7 +73,7 @@ pub async fn chat_completions(
     let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?;
     let provider_base_url = provider_instance.effective_base_url().ok();
 
-    let span = Span::enter_with_local_parent("aisix.llm.chat_completion");
+    let span = Span::enter_with_local_parent("aisix.llm.chat_completions");
     apply_span_properties(
         &span,
         request_span_properties(
@@ -197,7 +198,7 @@ async fn handle_stream_request(
             properties
                 .iter()
                 .filter(|(key, _)| {
-                    key == "gen_ai.response.finish_reasons"
+                    key == GEN_AI_RESPONSE_FINISH_REASONS
                         || key == "llm.finish_reason"
                         || key == "llm.token_count.completion_details.reasoning"
                 })
```
src/proxy/handlers/chat_completions/span_attributes/telemetry.rs (53 changes: 29 additions & 24 deletions)

```diff
@@ -1,3 +1,11 @@
+use opentelemetry_semantic_conventions::attribute::{
+    GEN_AI_OPERATION_NAME, GEN_AI_OUTPUT_TYPE, GEN_AI_REQUEST_CHOICE_COUNT,
+    GEN_AI_REQUEST_FREQUENCY_PENALTY, GEN_AI_REQUEST_MAX_TOKENS, GEN_AI_REQUEST_MODEL,
+    GEN_AI_REQUEST_PRESENCE_PENALTY, GEN_AI_REQUEST_SEED, GEN_AI_REQUEST_STOP_SEQUENCES,
+    GEN_AI_REQUEST_TEMPERATURE, GEN_AI_REQUEST_TOP_K, GEN_AI_REQUEST_TOP_P, GEN_AI_RESPONSE_ID,
+    GEN_AI_RESPONSE_MODEL, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, SERVER_ADDRESS,
+    SERVER_PORT, USER_ID,
+};
 use reqwest::Url;
 use serde_json::{Map, Value};
 
@@ -35,7 +43,7 @@ pub(in crate::proxy::handlers::chat_completions) fn request_span_properties(
         .map(message_view_from_chat_message)
         .collect();
     let mut properties = vec![
-        ("gen_ai.operation.name".into(), "chat".into()),
+        (GEN_AI_OPERATION_NAME.into(), "chat".into()),
         ("openinference.span.kind".into(), "LLM".into()),
         (
             "gen_ai.provider.name".into(),
@@ -45,62 +53,59 @@ pub(in crate::proxy::handlers::chat_completions) fn request_span_properties(
             "llm.system".into(),
             provider_semantics.llm_system.to_string(),
         ),
-        ("gen_ai.request.model".into(), request.model.clone()),
+        (GEN_AI_REQUEST_MODEL.into(), request.model.clone()),
     ];
 
     if let Some(llm_provider) = provider_semantics.llm_provider {
         properties.push(("llm.provider".into(), llm_provider.to_string()));
     }
 
     if let Some(choice_count) = request.n.filter(|count| *count != 1) {
-        properties.push((
-            "gen_ai.request.choice.count".into(),
-            choice_count.to_string(),
-        ));
+        properties.push((GEN_AI_REQUEST_CHOICE_COUNT.into(), choice_count.to_string()));
     }
 
     if let Some(seed) = request.seed {
-        properties.push(("gen_ai.request.seed".into(), seed.to_string()));
+        properties.push((GEN_AI_REQUEST_SEED.into(), seed.to_string()));
     }
 
     if let Some(max_tokens) = request.max_completion_tokens.or(request.max_tokens) {
-        properties.push(("gen_ai.request.max_tokens".into(), max_tokens.to_string()));
+        properties.push((GEN_AI_REQUEST_MAX_TOKENS.into(), max_tokens.to_string()));
     }
 
     if let Some(value) = request.frequency_penalty {
-        properties.push(("gen_ai.request.frequency_penalty".into(), value.to_string()));
+        properties.push((GEN_AI_REQUEST_FREQUENCY_PENALTY.into(), value.to_string()));
     }
 
     if let Some(value) = request.presence_penalty {
-        properties.push(("gen_ai.request.presence_penalty".into(), value.to_string()));
+        properties.push((GEN_AI_REQUEST_PRESENCE_PENALTY.into(), value.to_string()));
     }
 
     if let Some(value) = request.temperature {
-        properties.push(("gen_ai.request.temperature".into(), value.to_string()));
+        properties.push((GEN_AI_REQUEST_TEMPERATURE.into(), value.to_string()));
     }
 
     if let Some(value) = request.top_p {
-        properties.push(("gen_ai.request.top_p".into(), value.to_string()));
+        properties.push((GEN_AI_REQUEST_TOP_P.into(), value.to_string()));
     }
 
     if let Some(value) = numeric_extra_to_string(request.extra.get("top_k")) {
-        properties.push(("gen_ai.request.top_k".into(), value));
+        properties.push((GEN_AI_REQUEST_TOP_K.into(), value));
     }
 
     if let Some(value) = stop_sequences_json(request.stop.as_ref()) {
-        properties.push(("gen_ai.request.stop_sequences".into(), value));
+        properties.push((GEN_AI_REQUEST_STOP_SEQUENCES.into(), value));
     }
 
     if let Some(value) = response_format_output_type(request.response_format.as_ref()) {
-        properties.push(("gen_ai.output.type".into(), value.to_string()));
+        properties.push((GEN_AI_OUTPUT_TYPE.into(), value.to_string()));
     }
 
     if let Some(value) = request_invocation_parameters(request) {
         properties.push(("llm.invocation_parameters".into(), value));
     }
 
     if let Some(user_id) = request.user.as_ref().filter(|user_id| !user_id.is_empty()) {
-        properties.push(("user.id".into(), user_id.clone()));
+        properties.push((USER_ID.into(), user_id.clone()));
     }
 
     append_openinference_message_properties(&mut properties, "llm.input_messages", &input_messages);
@@ -119,10 +124,10 @@ pub(in crate::proxy::handlers::chat_completions) fn request_span_properties(
 
     if let Some(base_url) = base_url {
         if let Some(address) = base_url.host_str() {
-            properties.push(("server.address".into(), address.to_string()));
+            properties.push((SERVER_ADDRESS.into(), address.to_string()));
         }
         if let Some(port) = base_url.port_or_known_default() {
-            properties.push(("server.port".into(), port.to_string()));
+            properties.push((SERVER_PORT.into(), port.to_string()));
        }
     }
 
@@ -135,8 +140,8 @@ pub(in crate::proxy::handlers::chat_completions) fn response_span_properties(
 ) -> Vec<(String, String)> {
     let output_messages = response_output_message_views(response);
     let mut properties = vec![
-        ("gen_ai.response.id".into(), response.id.clone()),
-        ("gen_ai.response.model".into(), response.model.clone()),
+        (GEN_AI_RESPONSE_ID.into(), response.id.clone()),
+        (GEN_AI_RESPONSE_MODEL.into(), response.model.clone()),
         ("llm.model_name".into(), response.model.clone()),
     ];
 
@@ -164,11 +169,11 @@ pub(in crate::proxy::handlers::chat_completions) fn chunk_span_properties(
     let mut properties = Vec::new();
 
     if !chunk.id.is_empty() {
-        properties.push(("gen_ai.response.id".into(), chunk.id.clone()));
+        properties.push((GEN_AI_RESPONSE_ID.into(), chunk.id.clone()));
     }
 
     if !chunk.model.is_empty() {
-        properties.push(("gen_ai.response.model".into(), chunk.model.clone()));
+        properties.push((GEN_AI_RESPONSE_MODEL.into(), chunk.model.clone()));
         properties.push(("llm.model_name".into(), chunk.model.clone()));
     }
 
@@ -297,13 +302,13 @@ fn append_response_usage_properties(
 
     if usage.input_tokens.is_none() {
         let input_tokens = raw_usage.prompt_tokens.to_string();
-        properties.push(("gen_ai.usage.input_tokens".into(), input_tokens.clone()));
+        properties.push((GEN_AI_USAGE_INPUT_TOKENS.into(), input_tokens.clone()));
         properties.push(("llm.token_count.prompt".into(), input_tokens));
     }
 
     if usage.output_tokens.is_none() {
         let output_tokens = raw_usage.completion_tokens.to_string();
-        properties.push(("gen_ai.usage.output_tokens".into(), output_tokens.clone()));
+        properties.push((GEN_AI_USAGE_OUTPUT_TOKENS.into(), output_tokens.clone()));
         properties.push(("llm.token_count.completion".into(), output_tokens));
     }
 
```
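One pattern worth noting in the hunks above: only the gen_ai.*, server.*, and user.id keys gain semconv constants; the llm.* and openinference.* keys belong to the OpenInference conventions, which the opentelemetry-semantic-conventions crate does not cover, so they stay as string literals, and usage counts are deliberately emitted under both families. A standalone sketch of that dual emission (function name hypothetical, not from the PR):

```rust
use opentelemetry_semantic_conventions::attribute::GEN_AI_USAGE_INPUT_TOKENS;

// Emits the same token count under the OTel semconv key and the OpenInference
// key, mirroring the pattern in append_response_usage_properties above.
fn input_token_properties(prompt_tokens: u64) -> Vec<(String, String)> {
    let input_tokens = prompt_tokens.to_string();
    vec![
        (GEN_AI_USAGE_INPUT_TOKENS.into(), input_tokens.clone()),
        ("llm.token_count.prompt".into(), input_tokens),
    ]
}

fn main() {
    let properties = input_token_properties(42);
    assert_eq!(properties[0].0, "gen_ai.usage.input_tokens");
    assert_eq!(properties[1].0, "llm.token_count.prompt");
}
```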
src/proxy/handlers/embeddings/mod.rs (33 changes: 29 additions & 4 deletions)

```diff
@@ -1,3 +1,4 @@
+mod span_attributes;
 mod types;
 
 use std::time::Duration;
@@ -7,7 +8,9 @@ use axum::{
     extract::State,
     response::{IntoResponse, Response},
 };
+use fastrace::prelude::*;
 use log::error;
+use span_attributes::{request_span_properties, response_span_properties};
 pub use types::EmbeddingError;
 
 use crate::{
@@ -23,8 +26,9 @@ use crate::{
         AppState,
         hooks::{self, RequestContext},
         provider::create_provider_instance,
+        utils::trace::span_attributes::apply_span_properties,
     },
-    utils::future::maybe_timeout,
+    utils::future::{WithSpan, maybe_timeout},
 };
 
 fn embedding_usage(response: &EmbeddingResponse) -> Usage {
@@ -38,7 +42,6 @@ fn embedding_usage(response: &EmbeddingResponse) -> Usage {
     }
 }
 
-#[fastrace::trace]
 pub async fn embeddings(
     State(state): State<AppState>,
     mut request_ctx: RequestContext,
@@ -61,14 +64,32 @@ pub async fn embeddings(
             GatewayError::Internal(format!("provider {} not found", model.provider_id))
         })?;
     let provider_instance = create_provider_instance(gateway.as_ref(), &provider)?;
+    let provider_base_url = provider_instance.effective_base_url().ok();
     let timeout = model.timeout.map(Duration::from_millis);
 
     // Replace request model name with real model name
     request_data.model = model.model.clone();
 
-    match maybe_timeout(timeout, gateway.embed(&request_data, &provider_instance)).await {
+    let span = Span::enter_with_local_parent("aisix.llm.embeddings");
+    apply_span_properties(
+        &span,
+        request_span_properties(
+            &request_data,
+            provider_instance.def.as_ref(),
+            provider_base_url.as_ref(),
+        ),
+    );
+
+    let (response, span) = (WithSpan {
+        inner: maybe_timeout(timeout, gateway.embed(&request_data, &provider_instance)),
+        span: Some(span),
+    })
+    .await;
+
+    match response {
         Ok(Ok(response)) => {
             let usage = embedding_usage(&response);
+            span.add_properties(|| response_span_properties(&response, &usage));
             let mut resp = Json(response).into_response();
             if let Err(err) = hooks::rate_limit::post_check(&mut request_ctx, &usage).await {
                 error!("Rate limit post_check error: {}", err);
@@ -79,9 +100,13 @@ pub async fn embeddings(
             Ok(resp)
         }
         Ok(Err(err)) => {
+            span.add_property(|| ("error.type", "gateway_error"));
             error!("Error generating embeddings: {}", err);
             Err(EmbeddingError::GatewayError(err))
         }
-        Err(err) => Err(EmbeddingError::Timeout(err)),
+        Err(err) => {
+            span.add_property(|| ("error.type", "timeout"));
+            Err(EmbeddingError::Timeout(err))
+        }
     }
 }
```
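The handler above drops #[fastrace::trace] in favor of a manually created span because the span must survive the awaited call: response properties and error types are attached only after the future resolves. The WithSpan combinator it uses lives in utils::future and is not shown in this diff; a minimal sketch of how such a wrapper could look (field names taken from the call site, everything else an assumption, built on the pin-project crate already visible in Cargo.toml):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use fastrace::Span;
use pin_project::pin_project;

/// Sketch only: polls `inner` with `span` as the local parent, then returns
/// the span alongside the output so the caller can keep adding properties.
#[pin_project]
pub struct WithSpan<F> {
    #[pin]
    pub inner: F,
    pub span: Option<Span>,
}

impl<F: Future> Future for WithSpan<F> {
    type Output = (F::Output, Span);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Re-enter the span on every poll so work done by the inner future
        // is attributed to it.
        let _guard = this.span.as_ref().map(|span| span.set_local_parent());
        match this.inner.poll(cx) {
            Poll::Ready(output) => {
                let span = this.span.take().expect("WithSpan polled after completion");
                Poll::Ready((output, span))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Handing the span back out of the combinator, rather than dropping it inside, is what lets the match arms above keep calling span.add_property after the timeout wrapper has finished.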