From 601a13d670453c588df2c4af4ed40b9cddc32a37 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 28 May 2026 20:14:26 -0700 Subject: [PATCH 1/2] Add Langfuse prompt backend (text prompts) LangfusePromptBackend fetches prompts from Langfuse's prompt registry through PromptManager, behind the [langfuse] extra. It takes a caller-supplied Langfuse client so it shares one connection with a LangfuseObserver on the same client. A text prompt maps to Prompt (template, version, label, sampling lifted from config) and sets observability_entities['langfuse_prompt'] so the Generation-to-Prompt link fires with no caller wiring. Chat prompts raise PromptNotFound: OA's render produces a single user message today, so multi-message support is deferred (tracked for a later release). Also document the LANGFUSE_* env vars next to the LLM_* ones in the examples config sections. --- docs/examples/index.md | 11 ++ examples/README.md | 11 ++ src/openarmature/prompts/backends/langfuse.py | 148 ++++++++++++++++ tests/unit/test_prompts_langfuse.py | 162 ++++++++++++++++++ 4 files changed, 332 insertions(+) create mode 100644 src/openarmature/prompts/backends/langfuse.py create mode 100644 tests/unit/test_prompts_langfuse.py diff --git a/docs/examples/index.md b/docs/examples/index.md index c961b73..51fe262 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -55,6 +55,17 @@ The OpenAI public-API defaults are: | `LLM_MODEL` | `gpt-4o-mini` | Any model the bound endpoint exposes. | | `LLM_API_KEY` | (none) | Required. Pass empty for local servers that don't authenticate. | +The Langfuse observer and the Langfuse prompt backend read the +standard Langfuse SDK variables when pointed at a live Langfuse +account; `Langfuse()` picks them up automatically, so no credentials +appear in the example code: + +| Env var | Notes | +| --------------------- | ---------------------------------------------------------------------------------------- | +| `LANGFUSE_PUBLIC_KEY` | From your Langfuse project settings. | +| `LANGFUSE_SECRET_KEY` | From your Langfuse project settings. | +| `LANGFUSE_BASE_URL` | Langfuse host (e.g. `https://cloud.langfuse.com`). The SDK also accepts `LANGFUSE_HOST`. | + For a local OpenAI-compatible server (vLLM, LM Studio, llama.cpp, etc.), point `LLM_BASE_URL` at the host root (e.g. `http://localhost:8000`) and set `LLM_API_KEY` to whatever value the diff --git a/examples/README.md b/examples/README.md index 78d33b9..5017f04 100644 --- a/examples/README.md +++ b/examples/README.md @@ -113,6 +113,17 @@ defaults shown: | `LLM_MODEL` | `gpt-4o-mini` | Any model the bound endpoint exposes. | | `LLM_API_KEY` | (none) | Required; pass empty for local servers that don't authenticate. | +The Langfuse observer and the Langfuse prompt backend read the standard +Langfuse SDK variables when pointed at a live Langfuse account; +`Langfuse()` reads them automatically, so no credentials appear in the +example code: + +| Env var | Notes | +| --- | --- | +| `LANGFUSE_PUBLIC_KEY` | From your Langfuse project settings. | +| `LANGFUSE_SECRET_KEY` | From your Langfuse project settings. | +| `LANGFUSE_BASE_URL` | Langfuse host (e.g. `https://cloud.langfuse.com`); the SDK also accepts `LANGFUSE_HOST`. | + ## Running ```bash diff --git a/src/openarmature/prompts/backends/langfuse.py b/src/openarmature/prompts/backends/langfuse.py new file mode 100644 index 0000000..fb8ead5 --- /dev/null +++ b/src/openarmature/prompts/backends/langfuse.py @@ -0,0 +1,148 @@ +"""Langfuse-backed PromptBackend (text prompts). + +Fetches prompts from Langfuse's prompt registry through OA's +``PromptManager``. Gated behind the ``[langfuse]`` extra; import this +module only when ``langfuse`` is installed (``backends/__init__`` does +not import it, so the base package stays langfuse-free). + +v1 supports Langfuse TEXT prompts. A Langfuse CHAT prompt raises +``PromptNotFound`` because OA's render produces a single user message +today; multi-message (chat) prompt support is tracked for a later +release. +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime +from typing import Any, Protocol + +from langfuse.api import NotFoundError, ServiceUnavailableError +from langfuse.model import ChatPromptClient, TextPromptClient + +from ..errors import PromptNotFound, PromptStoreUnavailable +from ..hashing import compute_template_hash +from ..prompt import Prompt, SamplingConfig + + +class LangfusePromptClient(Protocol): + """The minimal Langfuse prompt-fetch surface this backend needs. + + ``langfuse.Langfuse`` satisfies it structurally (its ``get_prompt`` + has additional optional parameters), so callers pass a real client; + tests can supply a lightweight fake. + """ + + def get_prompt(self, name: str, *, label: str = "production") -> TextPromptClient | ChatPromptClient: ... + + +# Langfuse prompt `config` keys that line up with SamplingConfig's +# declared fields. Only these are lifted into `Prompt.sampling`; the +# full config is preserved under `Prompt.metadata` so nothing is lost. +_SAMPLING_FIELDS = ( + "temperature", + "max_tokens", + "top_p", + "seed", + "frequency_penalty", + "presence_penalty", + "stop_sequences", +) + + +class LangfusePromptBackend: + """Reads prompts from Langfuse's prompt registry. + + Constructed with a caller-supplied ``langfuse.Langfuse`` client, so + it shares one client (one connection pool, one flush thread) with a + :class:`~openarmature.observability.langfuse.LangfuseObserver` built + on the same instance:: + + from langfuse import Langfuse + from openarmature.prompts import PromptManager + from openarmature.prompts.backends.langfuse import LangfusePromptBackend + + client = Langfuse(public_key="pk-lf-...", secret_key="sk-lf-...") + manager = PromptManager(LangfusePromptBackend(client)) + + ``fetch`` is reentrant and does not render; the manager renders. + The returned ``Prompt`` carries the raw Langfuse template (Langfuse + ``{{var}}`` placeholders are Jinja2-compatible, so OA's render + applies unchanged), plus the Langfuse SDK Prompt object under + ``observability_entities['langfuse_prompt']`` so the observability + Generation -> Prompt link fires automatically. + """ + + def __init__(self, client: LangfusePromptClient) -> None: + self._client = client + + async def fetch(self, name: str, label: str = "production") -> Prompt: + # The Langfuse SDK's get_prompt is synchronous (and does its own + # client-side caching); run it off the event loop. + result = await asyncio.to_thread(self._get_prompt, name, label) + + if isinstance(result, ChatPromptClient): + raise PromptNotFound( + f"prompt ({name!r}, {label!r}) is a Langfuse chat prompt; " + "the Langfuse backend supports text prompts only in this " + "release (multi-message prompt support is planned)", + name=name, + label=label, + backend="langfuse", + ) + + template = result.prompt + template_hash = compute_template_hash(template) + return Prompt( + name=name, + version=str(result.version), + label=label, + template=template, + template_hash=template_hash, + fetched_at=datetime.now(UTC), + sampling=_sampling_from_config(result.config), + observability_entities={"langfuse_prompt": result}, + metadata=_metadata_from(result), + ) + + def _get_prompt(self, name: str, label: str) -> TextPromptClient | ChatPromptClient: + try: + return self._client.get_prompt(name, label=label) + except NotFoundError as exc: + raise PromptNotFound( + f"prompt ({name!r}, {label!r}) not found in Langfuse", + name=name, + label=label, + backend="langfuse", + ) from exc + except ServiceUnavailableError as exc: + raise PromptStoreUnavailable( + f"Langfuse unavailable fetching ({name!r}, {label!r}): {exc}", + name=name, + label=label, + ) from exc + + +def _sampling_from_config(config: dict[str, Any] | None) -> SamplingConfig | None: + if not config: + return None + declared = {k: config[k] for k in _SAMPLING_FIELDS if k in config} + if not declared: + return None + return SamplingConfig(**declared) + + +def _metadata_from(result: TextPromptClient) -> dict[str, Any]: + # Preserve Langfuse-side attribution. `config` is kept whole here + # even though sampling fields are also lifted to `Prompt.sampling`, + # so non-sampling config keys aren't dropped. + meta: dict[str, Any] = { + "langfuse_version": result.version, + "langfuse_labels": result.labels, + "langfuse_tags": result.tags, + } + if result.config: + meta["langfuse_config"] = result.config + if result.commit_message is not None: + meta["langfuse_commit_message"] = result.commit_message + return meta diff --git a/tests/unit/test_prompts_langfuse.py b/tests/unit/test_prompts_langfuse.py new file mode 100644 index 0000000..c4f0df0 --- /dev/null +++ b/tests/unit/test_prompts_langfuse.py @@ -0,0 +1,162 @@ +"""Unit tests for the Langfuse-backed PromptBackend (text prompts).""" + +from __future__ import annotations + +from typing import Any, cast + +import pytest + +pytest.importorskip("langfuse") + +from langfuse.api import NotFoundError, ServiceUnavailableError # noqa: E402 +from langfuse.model import ( # noqa: E402 + ChatPromptClient, + Prompt_Chat, # pyright: ignore[reportPrivateImportUsage] + Prompt_Text, # pyright: ignore[reportPrivateImportUsage] + TextPromptClient, +) + +from openarmature.prompts import PromptManager # noqa: E402 +from openarmature.prompts.backends.langfuse import LangfusePromptBackend # noqa: E402 +from openarmature.prompts.errors import ( # noqa: E402 + PromptNotFound, + PromptStoreUnavailable, +) + +pytestmark = pytest.mark.asyncio + + +def _text_client( + *, + name: str = "greeting", + version: int = 3, + prompt: str = "Hello {{ user }}", + config: dict[str, Any] | None = None, + labels: list[str] | None = None, + tags: list[str] | None = None, +) -> TextPromptClient: + return TextPromptClient( + Prompt_Text( + type="text", + name=name, + version=version, + prompt=prompt, + config=config or {}, + labels=["production"] if labels is None else labels, + tags=tags or [], + ) + ) + + +def _chat_client(*, name: str = "chatty", version: int = 1) -> ChatPromptClient: + return ChatPromptClient( + Prompt_Chat( + type="chat", + name=name, + version=version, + prompt=cast(Any, [{"role": "system", "content": "hi {{ user }}"}]), + config={}, + labels=["production"], + tags=[], + ) + ) + + +class _FakeClient: + """Stands in for ``langfuse.Langfuse`` exposing only ``get_prompt``.""" + + def __init__(self, *, result: Any = None, exc: BaseException | None = None) -> None: + self._result = result + self._exc = exc + self.calls: list[tuple[str, str]] = [] + + def get_prompt(self, name: str, *, label: str = "production", **_: Any) -> Any: + self.calls.append((name, label)) + if self._exc is not None: + raise self._exc + return self._result + + +async def test_fetch_text_prompt_maps_to_prompt() -> None: + client = _text_client(prompt="Hello {{ user }}", version=7, tags=["greeting"]) + backend = LangfusePromptBackend(_FakeClient(result=client)) + + prompt = await backend.fetch("greeting", "production") + + assert prompt.name == "greeting" + assert prompt.version == "7" + assert prompt.label == "production" + assert prompt.template == "Hello {{ user }}" + assert prompt.template_hash.startswith("sha256:") + assert prompt.observability_entities is not None + assert prompt.observability_entities["langfuse_prompt"] is client + assert prompt.metadata is not None + assert prompt.metadata["langfuse_version"] == 7 + assert prompt.metadata["langfuse_tags"] == ["greeting"] + + +async def test_fetch_passes_label_through() -> None: + fake = _FakeClient(result=_text_client()) + backend = LangfusePromptBackend(fake) + + await backend.fetch("greeting", "staging") + + assert fake.calls == [("greeting", "staging")] + + +async def test_chat_prompt_raises_not_found() -> None: + backend = LangfusePromptBackend(_FakeClient(result=_chat_client())) + + with pytest.raises(PromptNotFound) as excinfo: + await backend.fetch("chatty", "production") + + assert excinfo.value.backend == "langfuse" + assert "chat prompt" in str(excinfo.value) + + +async def test_not_found_maps_to_prompt_not_found() -> None: + backend = LangfusePromptBackend(_FakeClient(exc=NotFoundError("nope"))) + + with pytest.raises(PromptNotFound): + await backend.fetch("missing", "production") + + +async def test_service_unavailable_maps_to_store_unavailable() -> None: + backend = LangfusePromptBackend(_FakeClient(exc=ServiceUnavailableError())) + + with pytest.raises(PromptStoreUnavailable): + await backend.fetch("greeting", "production") + + +async def test_sampling_extracted_from_config() -> None: + client = _text_client(config={"temperature": 0.0, "max_tokens": 256, "model": "gpt-4o"}) + backend = LangfusePromptBackend(_FakeClient(result=client)) + + prompt = await backend.fetch("greeting", "production") + + assert prompt.sampling is not None + assert prompt.sampling.temperature == 0.0 + assert prompt.sampling.max_tokens == 256 + # Non-sampling config keys are not lifted into sampling, but the + # full config is preserved under metadata. + assert prompt.metadata is not None + assert prompt.metadata["langfuse_config"]["model"] == "gpt-4o" + + +async def test_no_sampling_config_yields_none() -> None: + backend = LangfusePromptBackend(_FakeClient(result=_text_client(config={}))) + + prompt = await backend.fetch("greeting", "production") + + assert prompt.sampling is None + + +async def test_fetched_prompt_renders_through_manager() -> None: + backend = LangfusePromptBackend(_FakeClient(result=_text_client(prompt="Hi {{ user }}"))) + manager = PromptManager(backend) + + prompt = await manager.fetch("greeting", "production") + result = manager.render(prompt, {"user": "Alice"}) + + assert len(result.messages) == 1 + assert result.messages[0].content == "Hi Alice" From 53c2e8be8fa6adb71c4de3f183640a25bca63440 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Fri, 29 May 2026 11:22:03 -0700 Subject: [PATCH 2/2] Harden Langfuse prompt backend error mapping From PR #93 review: - Map httpx.TransportError (connect/read/timeout/network) to PromptStoreUnavailable in LangfusePromptBackend, alongside 503s, so PromptManager falls back on transport failures per the PromptBackend contract. Adds a transport-timeout unit test. - Align the opt-in Langfuse integration test's host resolution to the SDK's precedence (LANGFUSE_BASE_URL before LANGFUSE_HOST); it had framed LANGFUSE_HOST as canonical with the opposite precedence. --- src/openarmature/prompts/backends/langfuse.py | 8 +++++++- tests/unit/test_observability_langfuse_adapter.py | 12 ++++++------ tests/unit/test_prompts_langfuse.py | 11 +++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/openarmature/prompts/backends/langfuse.py b/src/openarmature/prompts/backends/langfuse.py index fb8ead5..5681d5f 100644 --- a/src/openarmature/prompts/backends/langfuse.py +++ b/src/openarmature/prompts/backends/langfuse.py @@ -17,6 +17,7 @@ from datetime import UTC, datetime from typing import Any, Protocol +import httpx from langfuse.api import NotFoundError, ServiceUnavailableError from langfuse.model import ChatPromptClient, TextPromptClient @@ -115,7 +116,12 @@ def _get_prompt(self, name: str, label: str) -> TextPromptClient | ChatPromptCli label=label, backend="langfuse", ) from exc - except ServiceUnavailableError as exc: + except (ServiceUnavailableError, httpx.TransportError) as exc: + # 503 plus transport-level failures (connect/read/timeout/ + # network): the SDK surfaces raw httpx errors when there's no + # HTTP response to map to a typed error. Per the PromptBackend + # contract these are unavailability, so the manager can fall + # back. 4xx auth and other errors still propagate. raise PromptStoreUnavailable( f"Langfuse unavailable fetching ({name!r}, {label!r}): {exc}", name=name, diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index bc38af9..eb6df8b 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -11,7 +11,7 @@ LANGFUSE_PUBLIC_KEY=pk-lf-... \\ LANGFUSE_SECRET_KEY=sk-lf-... \\ - LANGFUSE_HOST=https://cloud.langfuse.com \\ + LANGFUSE_BASE_URL=https://cloud.langfuse.com \\ uv run pytest tests/unit/test_observability_langfuse_adapter.py \\ -m integration -v @@ -160,11 +160,11 @@ async def test_adapter_against_real_langfuse_cloud() -> None: if not public_key or not secret_key: pytest.skip("LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set") - # LANGFUSE_HOST is the canonical name (matches the SDK's ``host=`` - # kwarg); LANGFUSE_BASE_URL is the common alias some downstream - # configs use. Accept either; LANGFUSE_HOST wins when both set. + # Mirror the SDK's precedence: Langfuse() reads LANGFUSE_BASE_URL + # first, then LANGFUSE_HOST. Resolve the same order here so this + # explicit host matches what a no-arg Langfuse() would pick up. host = ( - os.environ.get("LANGFUSE_HOST") or os.environ.get("LANGFUSE_BASE_URL") or "https://cloud.langfuse.com" + os.environ.get("LANGFUSE_BASE_URL") or os.environ.get("LANGFUSE_HOST") or "https://cloud.langfuse.com" ) client = Langfuse( public_key=public_key, @@ -175,7 +175,7 @@ async def test_adapter_against_real_langfuse_cloud() -> None: # background export thread is just a logged warning and the test # passes while traces vanish. assert client.auth_check(), ( - "Langfuse auth_check failed — verify LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST" + "Langfuse auth_check failed — verify LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_BASE_URL" ) observer = LangfuseObserver(client=LangfuseSDKAdapter(client)) diff --git a/tests/unit/test_prompts_langfuse.py b/tests/unit/test_prompts_langfuse.py index c4f0df0..c470a08 100644 --- a/tests/unit/test_prompts_langfuse.py +++ b/tests/unit/test_prompts_langfuse.py @@ -4,6 +4,7 @@ from typing import Any, cast +import httpx import pytest pytest.importorskip("langfuse") @@ -128,6 +129,16 @@ async def test_service_unavailable_maps_to_store_unavailable() -> None: await backend.fetch("greeting", "production") +async def test_transport_error_maps_to_store_unavailable() -> None: + # A connect/read/timeout/network failure surfaces as a raw httpx + # TransportError (no HTTP response to map to a typed SDK error); it + # must become PromptStoreUnavailable so PromptManager can fall back. + backend = LangfusePromptBackend(_FakeClient(exc=httpx.ConnectTimeout("timed out"))) + + with pytest.raises(PromptStoreUnavailable): + await backend.fetch("greeting", "production") + + async def test_sampling_extracted_from_config() -> None: client = _text_client(config={"temperature": 0.0, "max_tokens": 256, "model": "gpt-4o"}) backend = LangfusePromptBackend(_FakeClient(result=client))