
Commit 4c6a07b

Add Langfuse observability to LLM execution methods

1 parent 1821b84 commit 4c6a07b

2 files changed: +131 -2 lines changed

backend/app/core/langfuse/langfuse.py

Lines changed: 110 additions & 1 deletion
@@ -1,10 +1,12 @@
 import uuid
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Callable, Dict, Optional
+from functools import wraps
 
 from asgi_correlation_id import correlation_id
 from langfuse import Langfuse
 from langfuse.client import StatefulGenerationClient, StatefulTraceClient
+from app.models.llm import CompletionConfig, QueryParams, LLMCallResponse
 
 logger = logging.getLogger(__name__)

@@ -107,3 +109,110 @@ def log_error(self, error_message: str, response_id: Optional[str] = None):
 
     def flush(self):
         self.langfuse.flush()
+
+
+def observe_llm_execution(
+    session_id: str | None = None,
+    credentials: dict | None = None,
+):
+    """Decorator to add Langfuse observability to LLM provider execute methods.
+
+    Args:
+        credentials: Langfuse credentials with public_key, secret_key, and host
+        session_id: Session ID for grouping traces (conversation_id)
+
+    Usage:
+        decorated_execute = observe_llm_execution(
+            credentials=langfuse_creds,
+            session_id=conversation_id
+        )(provider_instance.execute)
+    """
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        def wrapper(completion_config: CompletionConfig, query: QueryParams, **kwargs):
+            # Skip observability if no credentials provided
+            if not credentials:
+                logger.info("[Langfuse] No credentials - skipping observability")
+                return func(completion_config, query, **kwargs)
+
+            try:
+                langfuse = Langfuse(
+                    public_key=credentials.get("public_key"),
+                    secret_key=credentials.get("secret_key"),
+                    host=credentials.get("host"),
+                )
+            except Exception as e:
+                logger.warning(f"[Langfuse] Failed to initialize client: {e}")
+                return func(completion_config, query, **kwargs)
+
+            trace_metadata = {
+                "provider": completion_config.provider,
+            }
+
+            if query.conversation and query.conversation.id:
+                trace_metadata["conversation_id"] = query.conversation.id
+
+            trace = langfuse.trace(
+                name="unified-llm-call",
+                input=query.input,
+                metadata=trace_metadata,
+                tags=[completion_config.provider],
+            )
+
+            generation = trace.generation(
+                name=f"{completion_config.provider}-completion",
+                input=query.input,
+                model=completion_config.params.get("model"),
+            )
+
+            try:
+                # Execute the actual LLM call
+                response: LLMCallResponse | None
+                error: str | None
+                response, error = func(completion_config, query, **kwargs)
+
+                if response:
+                    generation.end(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        usage_details={
+                            "input": response.usage.input_tokens,
+                            "output": response.usage.output_tokens,
+                        },
+                        model=response.response.model,
+                    )
+
+                    trace.update(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        session_id=session_id or response.response.conversation_id,
+                    )
+                else:
+                    error_msg = error or "Unknown error"
+                    generation.end(output={"error": error_msg})
+                    trace.update(
+                        output={"status": "failure", "error": error_msg},
+                        session_id=session_id,
+                    )
+
+                langfuse.flush()
+                return response, error
+
+            except Exception as e:
+                error_msg = str(e)
+                generation.end(output={"error": error_msg})
+                trace.update(
+                    output={"status": "failure", "error": error_msg},
+                    session_id=session_id,
+                )
+                langfuse.flush()
+                raise
+
+        return wrapper
+
+    return decorator
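
For orientation, a minimal sketch of applying the decorator by hand, mirroring the Usage note in the docstring. All concrete values below are invented for illustration: the credential strings, the "conv-123" session id, and my_provider are hypothetical stand-ins, while CompletionConfig and QueryParams are the real types from app.models.llm.

# Hypothetical wiring sketch -- credential values, session id, and
# my_provider are invented; only the decorator and the (response, error)
# tuple contract come from the diff above.
from app.core.langfuse.langfuse import observe_llm_execution

langfuse_creds = {
    "public_key": "pk-lf-xxxx",   # assumption: keys stored under these names
    "secret_key": "sk-lf-xxxx",
    "host": "https://cloud.langfuse.com",
}

# Wrap an existing provider execute method; the wrapper preserves the
# signature and return value of the undecorated callable.
traced_execute = observe_llm_execution(
    credentials=langfuse_creds,
    session_id="conv-123",        # hypothetical conversation id
)(my_provider.execute)

response, error = traced_execute(
    completion_config=completion_config,  # CompletionConfig from app.models.llm
    query=query,                          # QueryParams from app.models.llm
)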

backend/app/services/llm/jobs.py

Lines changed: 21 additions & 1 deletion
@@ -7,11 +7,13 @@
 
 from app.core.db import engine
 from app.crud.config import ConfigVersionCrud
+from app.crud.credentials import get_provider_credential
 from app.crud.jobs import JobCrud
 from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
 from app.models.llm.request import ConfigBlob, LLMCallConfig
 from app.utils import APIResponse, send_callback
 from app.celery.utils import start_high_priority_job
+from app.core.langfuse.langfuse import observe_llm_execution
 from app.services.llm.providers.registry import get_llm_provider

@@ -182,7 +184,25 @@ def execute_job(
         )
         return handle_job_error(job_id, request.callback_url, callback_response)
 
-    response, error = provider_instance.execute(
+    langfuse_credentials = get_provider_credential(
+        session=session,
+        org_id=organization_id,
+        project_id=project_id,
+        provider="langfuse",
+    )
+
+    # Extract conversation_id for langfuse session grouping
+    conversation_id = None
+    if request.query.conversation and request.query.conversation.id:
+        conversation_id = request.query.conversation.id
+
+    # Apply Langfuse observability decorator to provider execute method
+    decorated_execute = observe_llm_execution(
+        credentials=langfuse_credentials,
+        session_id=conversation_id,
+    )(provider_instance.execute)
+
+    response, error = decorated_execute(
         completion_config=config_blob.completion,
         query=request.query,
         include_provider_raw_response=request.include_provider_raw_response,
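
One property of this wiring worth calling out: observability is best-effort by construction. If no Langfuse credential is configured, or the Langfuse client fails to initialize, the wrapper falls back to the undecorated call; if the provider itself raises, the error is recorded, flushed, and re-raised. A small sketch of the no-credential path, assuming get_provider_credential returns None when the org/project has no "langfuse" entry:

# Sketch of the graceful-degradation path (assumption: get_provider_credential
# returns None when no "langfuse" credential exists for the org/project).
decorated = observe_llm_execution(
    credentials=None,   # nothing configured for this org/project
    session_id=None,
)(provider_instance.execute)

# Inside the wrapper, `if not credentials:` logs
# "[Langfuse] No credentials - skipping observability" and delegates straight
# to the original execute -- no trace, no generation, no flush, same result.
response, error = decorated(
    completion_config=config_blob.completion,
    query=request.query,
)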
