In [1]:
# First step: install dependencies
from pathlib import Path

candidates = [
    Path("requirements.txt"),
    Path("article-scripts/arize/requirements.txt"),
]
requirements_file = next((p for p in candidates if p.exists()), None)
if requirements_file is None:
    raise FileNotFoundError("Could not find requirements.txt for this notebook")

print(f"Installing dependencies from {requirements_file.resolve()}")
%pip install -r {requirements_file}


Installing dependencies from /home/doran/jupyterlab/article-scripts/arize/requirements.txt
Collecting arize-phoenix (from -r requirements.txt (line 1))
  Downloading arize_phoenix-12.33.1-py3-none-any.whl.metadata (35 kB)
Collecting arize-phoenix-otel (from -r requirements.txt (line 2))
  Downloading arize_phoenix_otel-0.14.0-py3-none-any.whl.metadata (8.3 kB)
Collecting openinference-instrumentation-langchain (from -r requirements.txt (line 3))
  Downloading openinference_instrumentation_langchain-0.1.58-py3-none-any.whl.metadata (6.7 kB)
Collecting langchain (from -r requirements.txt (line 4))
  Downloading langchain-1.2.9-py3-none-any.whl.metadata (5.7 kB)
Collecting langchain-openai (from -r requirements.txt (line 5))
  Downloading langchain_openai-1.1.7-py3-none-any.whl.metadata (2.6 kB)
Collecting langchain-tavily (from -r requirements.txt (line 6))
  Downloading langchain_tavily-0.2.17-py3-none-any.whl.metadata (20 kB)
Collecting jupyter (from -r requirements.txt (line 9))
  Dow

In [7]:
import phoenix
print(phoenix.__version__)


12.33.1


# LangChain Agent Monitoring with Arize Phoenix



In [1]:
"""Monitor a local LangChain agent with Arize Phoenix and a vLLM backend.

This script is notebook-friendly and can also be run as a CLI tool.
"""

from __future__ import annotations

import argparse
import os
import sys
from typing import Any
from urllib.error import URLError
from urllib.parse import urlsplit, urlunsplit
from urllib.request import urlopen

try:
    from dotenv import load_dotenv
except ImportError:  # pragma: no cover - optional at runtime
    load_dotenv = None


def _load_env() -> None:
    if load_dotenv is not None:
        load_dotenv()


def _require_env(name: str) -> str:
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def _validate_collector_endpoint(endpoint: str) -> None:
    if not endpoint.startswith(("http://", "https://")):
        return

    parts = urlsplit(endpoint)
    base_url = urlunsplit((parts.scheme, parts.netloc, "", "", ""))
    healthz_url = base_url.rstrip("/") + "/healthz"
    try:
        with urlopen(healthz_url, timeout=3) as response:
            if response.status >= 400:
                raise RuntimeError(f"Phoenix health check failed with HTTP {response.status}")
    except (URLError, TimeoutError, RuntimeError) as exc:
        raise RuntimeError(
            f"Phoenix collector is not reachable at {endpoint}. "
            "Set PHOENIX_COLLECTOR_ENDPOINT to your Phoenix URL (for this cluster: "
            "http://192.168.86.208:6006) or run a port-forward to localhost:6006."
        ) from exc


def _normalize_collector_endpoint(endpoint: str, protocol: str | None) -> str:
    if protocol != "http/protobuf":
        return endpoint
    if not endpoint.startswith(("http://", "https://")):
        return endpoint

    parts = urlsplit(endpoint)
    if parts.path not in ("", "/"):
        return endpoint

    return urlunsplit((parts.scheme, parts.netloc, "/v1/traces", parts.query, parts.fragment))


def setup_phoenix(project_name: str) -> None:
    """Enable Phoenix tracing for LangChain runs."""
    os.environ.setdefault("PHOENIX_HOST", os.getenv("PHOENIX_HOST", "localhost"))
    os.environ.setdefault("PHOENIX_PORT", os.getenv("PHOENIX_PORT", "6006"))

    endpoint = os.getenv("PHOENIX_COLLECTOR_ENDPOINT")
    if not endpoint:
        endpoint = f"http://{os.environ['PHOENIX_HOST']}:{os.environ['PHOENIX_PORT']}"
    protocol = os.getenv("PHOENIX_COLLECTOR_PROTOCOL")
    if not protocol and endpoint.startswith(("http://", "https://")):
        protocol = "http/protobuf"
    endpoint = _normalize_collector_endpoint(endpoint, protocol)
    _validate_collector_endpoint(endpoint)

    errors: list[str] = []

    # Modern path (Arize Phoenix + OpenInference packages).
    try:
        from phoenix.otel import register  # type: ignore
        from openinference.instrumentation.langchain import (
            LangChainInstrumentor,  # type: ignore
        )

        tracer_provider = register(
            project_name=project_name,
            endpoint=endpoint,
            protocol=protocol,
            batch=True,
            set_global_tracer_provider=False,
        )
        LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
        return
    except SyntaxError as exc:  # pragma: no cover - clearer wrong-package hint
        raise RuntimeError(
            "Detected a non-Arize `phoenix` package in this environment. "
            "Run: `pip uninstall -y phoenix && pip install -U arize-phoenix arize-phoenix-otel`"
        ) from exc
    except Exception as exc:  # pragma: no cover - runtime compatibility path
        errors.append(f"modern openinference path failed: {exc}")

    # Legacy path kept for older Phoenix installs.
    try:
        from phoenix.trace import LangChainInstrumentor  # type: ignore

        LangChainInstrumentor().instrument(project_name=project_name)
        return
    except SyntaxError as exc:  # pragma: no cover - clearer wrong-package hint
        raise RuntimeError(
            "Detected a non-Arize `phoenix` package in this environment. "
            "Run: `pip uninstall -y phoenix && pip install -U arize-phoenix arize-phoenix-otel`"
        ) from exc
    except Exception as exc:  # pragma: no cover - runtime compatibility path
        errors.append(f"legacy phoenix.trace path failed: {exc}")

    raise RuntimeError(
        "Unable to configure Phoenix instrumentation. "
        + " | ".join(errors)
        + " | verify packages: arize-phoenix, arize-phoenix-otel, "
          "openinference-instrumentation-langchain"
    )


def build_llm() -> Any:
    """Create a ChatOpenAI-compatible client against local vLLM."""
    model_name = _require_env("VLLM_MODEL_NAME")
    base_url = _require_env("VLLM_API_BASE")
    api_key = os.getenv("OPENAI_API_KEY", "not-needed")
    temperature = float(os.getenv("LLM_TEMPERATURE", "0"))

    # Preferred package (current LangChain split).
    try:
        from langchain_openai import ChatOpenAI  # type: ignore

        return ChatOpenAI(
            model=model_name,
            base_url=base_url,
            api_key=api_key,
            temperature=temperature,
        )
    except Exception:
        pass

    # Backward-compatible fallback.
    from langchain.chat_models import ChatOpenAI  # type: ignore

    return ChatOpenAI(
        model_name=model_name,
        openai_api_base=base_url,
        openai_api_key=api_key,
        temperature=temperature,
    )


def build_search_tool() -> Any:
    """Create Tavily tool used by the agent."""
    _require_env("TAVILY_API_KEY")

    max_results = int(os.getenv("TAVILY_MAX_RESULTS", "5"))
    topic = os.getenv("TAVILY_TOPIC", "general")

    try:
        from langchain_tavily import TavilySearch  # type: ignore

        return TavilySearch(max_results=max_results, topic=topic)
    except Exception:
        # Fallback if using langchain-community integration.
        from langchain_community.tools.tavily_search import TavilySearchResults  # type: ignore

        return TavilySearchResults(max_results=max_results)


def _extract_message_content(message: Any) -> str:
    """Normalize LangChain message-like outputs to plain text."""
    if isinstance(message, str):
        return message

    content = getattr(message, "content", None)
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and "text" in part:
                parts.append(str(part["text"]))
        if parts:
            return "\n".join(parts)

    if isinstance(message, dict):
        if "output" in message:
            return str(message["output"])
        if "content" in message:
            return str(message["content"])

    return str(message)


def run_agent(question: str) -> str:
    llm = build_llm()
    search_tool = build_search_tool()

    # LangChain >=1.0 API.
    try:
        from langchain.agents import create_agent

        agent = create_agent(
            model=llm,
            tools=[search_tool],
            system_prompt=(
                "You are a concise assistant. Use web search when needed and cite key facts."
            ),
            debug=True,
        )
        result = agent.invoke({"messages": [{"role": "user", "content": question}]})
        if isinstance(result, dict) and "messages" in result and result["messages"]:
            return _extract_message_content(result["messages"][-1])
        return _extract_message_content(result)
    except Exception:
        pass

    # LangChain <1.0 API fallback.
    try:
        from langchain.agents import AgentType, initialize_agent

        agent = initialize_agent(
            tools=[search_tool],
            llm=llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
        )

        if hasattr(agent, "invoke"):
            result = agent.invoke({"input": question})
            return _extract_message_content(result)
        if hasattr(agent, "run"):
            return _extract_message_content(agent.run(question))
    except Exception:
        pass

    raise RuntimeError("Unsupported LangChain agent interface")


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--project",
        default=os.getenv("PHOENIX_PROJECT_NAME", "local-llm-agent"),
        help="Phoenix project name",
    )
    parser.add_argument(
        "--question",
        default=os.getenv(
            "QUESTION",
            "Who is the current president of the United States and what is the latest headline about them?",
        ),
        help="Question to run through the agent",
    )
    return parser.parse_args()


def main() -> int:
    _load_env()
    args = parse_args()

    setup_phoenix(project_name=args.project)
    answer = run_agent(args.question)

    print("\n=== Agent Response ===")
    print(answer)
    print("\nOpen Phoenix UI and inspect project:", args.project)
    print(
        "Collector target:",
        os.getenv("PHOENIX_COLLECTOR_ENDPOINT", f"http://{os.getenv('PHOENIX_HOST', 'localhost')}:{os.getenv('PHOENIX_PORT', '6006')}"),
    )
    return 0


In [5]:
# Optional: pick an interesting question for this run
interesting_questions = [
    "Find the latest two major U.S. AI policy headlines and explain how they could affect open-source model deployment.",
    "Identify three recent LLM failure incidents in production, summarize each root cause, and propose concrete mitigations.",
    "Compare LangChain and LlamaIndex for a retrieval app: decision matrix, trade-offs, and recommendation for a 2-engineer team.",
    "Find a cybersecurity incident from the last 48 hours and explain potential risks to self-hosted vLLM infrastructure.",
    "Design a one-week experiment plan to detect prompt regressions using Phoenix traces, with hypotheses and success metrics.",
    "Find two conflicting reports about one current event, then separate verified facts from uncertainty.",
    "What are the top 5 mistakes teams make when instrumenting LLM apps, and how can Phoenix traces reveal each one?",
]

for i, prompt in enumerate(interesting_questions, start=1):
    print(f"{i}. {prompt}")

selected_index = 7  # Change this to 1..7
question = interesting_questions[selected_index - 1]
print(f"\nSelected question ({selected_index}): {question}")

1. Find the latest two major U.S. AI policy headlines and explain how they could affect open-source model deployment.
2. Identify three recent LLM failure incidents in production, summarize each root cause, and propose concrete mitigations.
3. Compare LangChain and LlamaIndex for a retrieval app: decision matrix, trade-offs, and recommendation for a 2-engineer team.
4. Find a cybersecurity incident from the last 48 hours and explain potential risks to self-hosted vLLM infrastructure.
5. Design a one-week experiment plan to detect prompt regressions using Phoenix traces, with hypotheses and success metrics.
6. Find two conflicting reports about one current event, then separate verified facts from uncertainty.
7. What are the top 5 mistakes teams make when instrumenting LLM apps, and how can Phoenix traces reveal each one?

Selected question (7): What are the top 5 mistakes teams make when instrumenting LLM apps, and how can Phoenix traces reveal each one?


In [6]:
# Notebook run cell
_load_env()

project_name = os.getenv("PHOENIX_PROJECT_NAME", "local-llm-agent")
if "question" not in globals():
    question = os.getenv(
        "QUESTION",
        "Who is the current president of the United States and what is the latest headline about them?",
    )

setup_phoenix(project_name=project_name)
answer = run_agent(question)

print("\n=== Agent Response ===")
print(answer)
print("\nOpen Phoenix UI and inspect project:", project_name)
print(
    "Collector target:",
    os.getenv(
        "PHOENIX_COLLECTOR_ENDPOINT",
        f"http://{os.getenv('PHOENIX_HOST', 'localhost')}:{os.getenv('PHOENIX_PORT', '6006')}",
    ),
)

Overriding of current TracerProvider is not allowed
Attempting to instrument while already instrumented


üî≠ OpenTelemetry Tracing Details üî≠
|  Phoenix Project: local-llm-agent
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: http://192.168.86.208:6006/v1/traces
|  Transport: HTTP + protobuf
|  Transport Headers: {}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.

[1m[values][0m {'messages': [HumanMessage(content='What are the top 5 mistakes teams make when instrumenting LLM apps, and how can Phoenix traces reveal each one?', additional_kwargs={}, response_metadata={}, id='063bd29e-22b9-483e-8378-16fb93e9f73f')]}
[1m[updates][0m {'model': {'messages': [AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 65, 'prompt_tokens': 1822, 'total_tokens': 1887, 'completion_tokens_details': None, 'prompt_tok