# Revisiting tracing with Langfuse

In [None]:
! pip install langsmith openai langfuse
! pip install -qU requests bs4 lxml chromadb langchain langchain-text-splitters langchain-openai
! pip install -qU duckduckgo-search langchain-community ddgs

In [None]:
import os

# LangSmith settings that are not secrets are always applied.
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"

# Credentials / project name: fill in the values below, or export the
# variables before starting the kernel.  A non-empty value written here wins;
# an empty placeholder never replaces a value that is already present in the
# environment (it only guarantees the variable exists).
_PLACEHOLDERS = {
    "LANGSMITH_API_KEY": "",
    "LANGSMITH_PROJECT": "",
    "OPENAI_API_KEY": "",
}
for _name, _value in _PLACEHOLDERS.items():
    if _value:
        os.environ[_name] = _value
    else:
        os.environ.setdefault(_name, "")

# NOTE(review): later cells call langfuse.get_client() without arguments, so
# the Langfuse credentials presumably have to be available in the environment
# as well -- they are not set anywhere in this cell; confirm.

In [None]:
# kb_en_to_chroma.py  — minimal & direct
import os, re, time, requests
from urllib.parse import urljoin, urldefrag
from bs4 import BeautifulSoup

BASE = "https://www.kapitalbank.az"   # site root used to absolutize relative links
START = f"{BASE}/en"                  # crawl seed and the only section that is kept
UA = {"User-Agent": "kb-minicrawl/0.2"}
TIMEOUT = 15                          # passed to requests.get(timeout=...)
MAX_PAGES = 50                        # upper bound on visited URLs

# Extensions of non-HTML assets the crawler should not fetch (compiled once).
_SKIP_EXT = re.compile(r"\.(pdf|jpe?g|png|gif|svg|mp4|zip|docx?|xlsx?)$", re.I)


def clean_url(u):
    """Normalize a link and decide whether it belongs to the crawl.

    Args:
        u: An href value (absolute or relative); ``None``/empty is tolerated.

    Returns:
        The absolute URL without its ``#fragment`` when it lies inside the
        ``START`` section and does not point at a binary asset, else ``None``.
    """
    u = urldefrag((u or "").strip())[0]
    if not u:
        return None
    if not u.lower().startswith(("http://", "https://")):
        u = urljoin(BASE, u)
    if not u.startswith(START):
        return None
    # Require a path boundary after START so that e.g. ".../enterprise" is not
    # mistaken for the ".../en" section.
    rest = u[len(START):]
    if rest and rest[0] not in "/?":
        return None
    # Test the extension on the part before the query string so that
    # ".../file.pdf?download=1" is skipped as well.
    if _SKIP_EXT.search(u.split("?", 1)[0]):
        return None
    return u

def extract_text(html):
    """Return the readable text of an HTML document as a single line.

    Script/style/navigation-like elements are removed first; the text is then
    taken from ``<main>``, else ``<article>``, else ``<body>``, else the whole
    document, and every run of whitespace is collapsed to one space.
    """
    soup = BeautifulSoup(html, "lxml")
    boilerplate = ["script", "style", "noscript", "svg", "footer", "nav", "header"]
    for tag in soup(boilerplate):
        tag.decompose()
    # `or` yields the first truthy candidate, or the document itself as last resort.
    root = soup.select_one("main") or soup.select_one("article") or soup.body or soup
    raw = root.get_text(" ", strip=True)
    return " ".join(raw.split())

from collections import deque

# Breadth-first crawl of the START section.
#   visited - URLs already requested (successfully or not)
#   queued  - every URL ever put on the frontier, so nothing is enqueued twice
#   pages   - {"url", "text"} records for pages with more than 200 chars of text
visited, pages = set(), []
queue = deque([START])  # deque: O(1) pops from the left
queued = {START}
while queue and len(visited) < MAX_PAGES:
    url = queue.popleft()
    if url in visited:
        continue
    visited.add(url)  # mark before fetching so failures are not retried
    try:
        r = requests.get(url, headers=UA, timeout=TIMEOUT)
    except requests.RequestException:
        continue
    if r.ok and "text/html" in r.headers.get("Content-Type", ""):
        txt = extract_text(r.text)
        if len(txt) > 200:
            pages.append({"url": url, "text": txt})
        # Parse again for links: extract_text() works on its own parse tree,
        # from which nav/header/footer have been removed.
        s = BeautifulSoup(r.text, "lxml")
        # Relative hrefs are relative to the document (or its <base href>),
        # not to the site root.
        base_tag = s.find("base", href=True)
        page_base = urljoin(r.url, base_tag["href"]) if base_tag else r.url
        for a in s.find_all("a", href=True):
            u = clean_url(urljoin(page_base, a["href"]))
            if u and u not in queued:
                queued.add(u)
                queue.append(u)
    time.sleep(0.15)  # small pause between successful requests

import json

# Write the crawl result to disk so later steps can start from the file.
pages_outfile = "kapitalbank_pages.json"
with open(pages_outfile, "w", encoding="utf-8") as out_fh:
    out_fh.write(json.dumps(pages, indent=2, ensure_ascii=False))
print(f"Saved {len(pages)} pages to {pages_outfile}")

# Read the same file back; `pages` now holds the round-tripped records.
with open(pages_outfile, "r", encoding="utf-8") as in_fh:
    pages = json.loads(in_fh.read())
print(f"Loaded {len(pages)} pages from {pages_outfile}")

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# ---- LangChain chunking ----
# Split each page into overlapping chunks; `docs[i]` and `metas[i]` describe
# the same chunk (text and the URL of the page it came from).
splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=120)
docs, metas = [], []
for page in pages:
    page_chunks = splitter.split_text(page["text"])
    docs.extend(page_chunks)
    metas.extend({"url": page["url"]} for _ in page_chunks)

# ---- OpenAI embeddings -> Chroma ----
# Embed every chunk and store text, vector and URL metadata in the
# "kapitalbank_en" collection under `persist_dir`.
persist_dir = "chroma_kapitalbank"  # handed to Chroma as persist_directory
emb = OpenAIEmbeddings(model="text-embedding-3-small")  # cheap & solid
vs = Chroma.from_texts(
    texts=docs,
    embedding=emb,
    persist_directory=persist_dir,
    collection_name="kapitalbank_en",
    metadatas=metas,  # parallel to `docs`: one {"url": ...} dict per chunk
)
# NOTE(review): recent Chroma releases are reported to persist automatically,
# which would make this explicit call redundant/deprecated -- confirm against
# the installed chromadb / langchain-community versions.
vs.persist()
print(f"Indexed pages={len(pages)} chunks={len(docs)} into {persist_dir}/ (collection 'kapitalbank_en')")

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Same directory, collection name and embedding model as used when indexing,
# so this handle points at the collection written above.
persist_dir = "chroma_kapitalbank"
collection_name = "kapitalbank_en"
emb = OpenAIEmbeddings(model="text-embedding-3-small")

# Load the existing/persisted Chroma vector store
# (rebinds `vs`, which the retriever tool in a later cell is built from).
vs = Chroma(
    persist_directory=persist_dir,
    embedding_function=emb,
    collection_name=collection_name
)

In [None]:
from langfuse import get_client
from langfuse.langchain import CallbackHandler

# Initialize Langfuse client (prompt management)
langfuse = get_client()
# Initialize Langfuse CallbackHandler for Langchain (tracing)
# NOTE: later cells create their own handler (`lf_cb`) and do not use this name.
langfuse_callback_handler = CallbackHandler()

# Create a text prompt named "general-agent-prompt".
# NOTE: the prompt text has no {{tools}} placeholder; the cell that builds the
# system prompt appends a tool section itself in that case.
# NOTE(review): re-running this cell presumably registers another version of
# the prompt in Langfuse -- confirm.
langfuse.create_prompt(
    name="general-agent-prompt",
    type="text",
    prompt="Answer the user's question using the tools you have. Always check internal knowledge first, then use the web.",
    labels=["production"],  # directly promote to production
    # config={
    #     "model": "gpt-4o",
    #     "temperature": 0.7,
    #     "supported_languages": ["en", "fr"],
    # },  # optionally, add configs (e.g. model parameters or model tools) or tags
)

## Tracing in Langfuse

In [None]:
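# pip install -U "langchain>=1.0" "langgraph>=1.0" "langchain-openai>=1.0.0" "langfuse>=3.0.0"
# (create_agent requires LangChain 1.x; get_client / langfuse.langchain require the Langfuse v3 SDK)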

from typing import Any, Dict
from langchain.chat_models import init_chat_model
from langchain_core.tools import create_retriever_tool, tool, render_text_description
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import create_agent

from langfuse import get_client
from langfuse.langchain import CallbackHandler
from langfuse import observe

# --- Langfuse: prompt mgmt + tracing ---
langfuse = get_client()
lf_cb = CallbackHandler()  # passed as a LangChain callback on every agent.invoke below

# Get current production version (with a safe fallback for HA)
# The fallback text carries a {{tools}} placeholder, which the system-prompt
# code below fills in when present.
lf_prompt = langfuse.get_prompt(
    "general-agent-prompt",
    label="production",
    type="text",
    fallback="Answer the user's question. Prefer internal knowledge before the web.\n\nTools:\n{{tools}}"
)

# --- Tools (assume your vector store exists as `vs`) ---
# Internal knowledge: retriever over `vs`, configured with k=3.
retriever = vs.as_retriever(search_kwargs={"k": 3})
retrieve_tool = create_retriever_tool(
    retriever=retriever,
    name="retrieve",
    description="Search the internal vector store for passages relevant to the user's question."
)

# Web search: one shared DuckDuckGo runner wrapped as a named tool.
_ddg = DuckDuckGoSearchRun()
# NOTE(review): the docstring below is presumably what the model sees as the
# tool description, so treat it as runtime text rather than a comment.
@tool("duckduckgo_search")
def duckduckgo_search(query: str) -> str:
    """Search the web with DuckDuckGo and return a brief summary of top results."""
    return _ddg.run(query)

# Tool list shared by every agent built in this notebook.
TOOLS = [retrieve_tool, duckduckgo_search]

# --- Model: respect prompt.config if set; otherwise default ---
cfg = lf_prompt.config or {}  # empty dict when the prompt carries no config
model_id = cfg.get("model", "openai:gpt-4o-mini")  # default model when config has none
temperature = float(cfg.get("temperature", 0))  # default 0; coerced to float
model = init_chat_model(model_id, temperature=temperature)

# --- Build system prompt from Langfuse prompt ---
# Render the tool list as text, then either substitute it for the {{tools}}
# placeholder or, when the prompt has no placeholder, append it as a section.
tools_desc = render_text_description(TOOLS)
_template = lf_prompt.prompt or ""
if "{{tools}}" not in _template:
    system_prompt = _template + f"\n\n# Tools\n{tools_desc}"
else:
    system_prompt = lf_prompt.compile(tools=tools_desc)  # render {{tools}}

# --- Agent ---
@observe()  # traces the function; pair with the callback on invoke
def agentic_solution(inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
    """Answer one question with a freshly built tool-using agent.

    Args:
        inputs: Mapping that must contain the user's text under "question".
        **kwargs: Accepted but not used.

    Returns:
        ``{"answer": <final message text, stripped>}``.
    """
    question: str = inputs["question"]
    agent = create_agent(model=model, tools=TOOLS, system_prompt=system_prompt)
    result = agent.invoke(
        {"messages": [{"role": "user", "content": question}]},
        config={"callbacks": [lf_cb]},  # Langfuse tracing
    )

    # The agent normally returns a dict with "messages"; fall back to a
    # message-like object or its string form otherwise.
    if isinstance(result, dict) and "messages" in result:
        final_text = result["messages"][-1].content
    else:
        final_text = getattr(result, "content", str(result))
    return {"answer": final_text.strip()}


In [None]:
# Smoke test: one traced question; `out` is a dict with an "answer" key.
out = agentic_solution({"question": 'Where Kapitalbank is located?'})
print(out)

## Managing sessions for multi-step conversations

In [None]:
# --- Notebook chatbot: simple ask() API with Langfuse Session + tracing ---
# Paste this cell AFTER your base setup (where you already defined: langfuse, lf_cb, lf_prompt, model, TOOLS, system_prompt)

from uuid import uuid4
from typing import Dict, List, Optional
from langfuse import observe

# Optional: link generations to this exact Langfuse prompt version
# Builds a small descriptor of the managed prompt that _get_agent() attaches
# as run metadata under the "langfuse_prompt" key.  If `lf_prompt` has not
# been defined in the kernel, the NameError is caught and linking is disabled.
# NOTE(review): confirm that the Langfuse callback recognises a plain dict
# here; its documentation describes passing the prompt object itself.
try:
    LF_PROMPT_META = {
        "name": getattr(lf_prompt, "name", "general-agent-prompt"),
        "version": getattr(lf_prompt, "version", None),
        "label": "production",
        "id": getattr(lf_prompt, "id", None),
    }
except NameError:
    LF_PROMPT_META = None

# Session + cached agent
SESSION: Dict[str, Optional[str] | List[Dict[str, str]]] = {"id": None, "history": []}
_AGENT = None

def start_session(session_id: Optional[str] = None):
    """Start or reset a session (also clears chat history)."""
    SESSION["id"] = session_id or f"session-{uuid4()}"
    SESSION["history"] = []
    print(f"Session: {SESSION['id']}")

def _get_agent():
    """Build the agent on first use and return the cached instance afterwards.

    The Langfuse callback handler is deliberately NOT bound here: `_turn`
    already passes it in the invoke config, from where it propagates to the
    model and tool runs.  Binding it on the model and the agent as well made
    LangChain merge several copies of the same handler into one run config
    (callback lists are concatenated), i.e. the handler was registered more
    than once per run.

    Returns:
        The shared agent, with the prompt descriptor attached as
        ``metadata["langfuse_prompt"]`` when ``LF_PROMPT_META`` is set.
    """
    global _AGENT
    if _AGENT is None:
        agent = create_agent(model=model, tools=TOOLS, system_prompt=system_prompt)
        if LF_PROMPT_META:
            # Metadata bound on the agent travels with every run it starts.
            # (The former re-binding of the model happened after the agent was
            # built and therefore had no effect; it has been removed.)
            agent = agent.with_config(metadata={"langfuse_prompt": LF_PROMPT_META})
        _AGENT = agent
    return _AGENT

@observe(name="nb_turn", as_type="chain")
def _turn(messages: List[Dict[str, str]], session_id: Optional[str]):
    """Run one traced conversation turn.

    Args:
        messages: Full message list (history plus the new user message).
        session_id: Langfuse session to attach the current trace to; skipped
            when falsy.

    Returns:
        Whatever ``agent.invoke`` returns for the given messages.
    """
    if session_id:
        # Attaching the session is best-effort: a failure must not lose the
        # user's turn, but it is reported instead of being silently ignored.
        try:
            langfuse.update_current_trace(session_id=session_id)
        except Exception as exc:
            import logging

            logging.getLogger(__name__).warning(
                "Could not attach Langfuse session %s: %s", session_id, exc
            )
    agent = _get_agent()
    return agent.invoke({"messages": messages}, config={"callbacks": [lf_cb]})

def ask(q: str) -> str:
    """Send ``q`` as the next user message and return the assistant's reply.

    Starts a session first if none is active.  The history is extended with
    the user/assistant pair only after the turn has returned.
    """
    if not SESSION["id"]:
        start_session()

    outgoing = SESSION["history"] + [{"role": "user", "content": q}]
    res = _turn(outgoing, SESSION["id"])

    # Normal case: dict with "messages"; otherwise a message-like object or
    # its string form.
    if isinstance(res, dict) and "messages" in res:
        raw = res["messages"][-1].content
    else:
        raw = getattr(res, "content", str(res))
    reply = raw.strip()

    SESSION["history"].append({"role": "user", "content": q})
    SESSION["history"].append({"role": "assistant", "content": reply})
    return reply

def chat_history() -> List[Dict[str, str]]:
    """Give access to the messages of the active session (the live list, not a copy)."""
    history = SESSION["history"]
    return history

In [None]:
start_session()
# A notebook cell renders only the value of its last expression, so the
# replies and the history are printed explicitly.
print(ask("Where Kapitalbank is located?"))
print(ask("What products does it offer?"))
print(chat_history())
start_session()  # reset with a new session id

## Multi-user conversations

In [None]:
# --- Multi-user / multi-session chat runner for notebooks (no widgets) ---

from uuid import uuid4
from typing import List, Dict, Optional
from langfuse import observe

# (Optional) link generations to the exact managed prompt version
# Descriptor of the managed prompt, attached below to the agent as run
# metadata under "langfuse_prompt".  If `lf_prompt` is not defined in the
# kernel, the NameError is caught and linking is disabled.
# NOTE(review): confirm that the Langfuse callback recognises a plain dict
# here; its documentation describes passing the prompt object itself.
try:
    LF_PROMPT_META = {
        "name": getattr(lf_prompt, "name", "general-agent-prompt"),
        "version": getattr(lf_prompt, "version", None),
        "label": "production",
        "id": getattr(lf_prompt, "id", None),
    }
except NameError:
    LF_PROMPT_META = None

# Build the shared agent once.
# The Langfuse handler is NOT bound here: `_turn` passes it in the invoke
# config, from where it propagates to model and tool runs.  Binding it on the
# model and the agent as well made LangChain concatenate several copies of the
# same handler into one run config, registering it more than once per run.
_model = model  # name kept for compatibility with code that refers to `_model`
_agent = create_agent(model=_model, tools=TOOLS, system_prompt=system_prompt)
if LF_PROMPT_META:
    # help the Prompt → Linked Generations tab
    _agent = _agent.with_config(metadata={"langfuse_prompt": LF_PROMPT_META})

@observe(name="nb_turn", as_type="chain")
def _turn(messages: List[Dict[str, str]], session_id: str, user_id: Optional[str] = None):
    """Run one traced turn for a given user and session.

    Args:
        messages: Full message list (history plus the new user message).
        session_id: Langfuse session the current trace is attached to.
        user_id: Langfuse user the current trace is attached to.  Optional so
            that two-argument callers written against the earlier `_turn`
            (such as `ask`) keep working after this cell redefines the name.

    Returns:
        Whatever ``_agent.invoke`` returns for the given messages.
    """
    # Attaching user/session is best-effort: a failure must not lose the turn,
    # but it is reported instead of being silently ignored.
    try:
        langfuse.update_current_trace(session_id=session_id, user_id=user_id, name="notebook-chat")
    except Exception as exc:
        import logging

        logging.getLogger(__name__).warning(
            "Could not attach Langfuse user/session (%s, %s): %s", user_id, session_id, exc
        )
    return _agent.invoke({"messages": messages}, config={"callbacks": [lf_cb]})

class ChatSession:
    """Conversation state for a single user: user id, session id and history."""

    def __init__(self, user_id: str, session_id: Optional[str] = None):
        """Create a session; a ``session-<uuid4>`` id is generated when none is given."""
        self.user_id = user_id
        self.session_id = session_id if session_id else f"session-{uuid4()}"
        self.history: List[Dict[str, str]] = []

    def ask(self, q: str) -> str:
        """Send ``q`` with the accumulated history and return the stripped reply.

        The history is extended with the user/assistant pair only after the
        turn has returned.
        """
        outgoing = [*self.history, {"role": "user", "content": q}]
        res = _turn(outgoing, session_id=self.session_id, user_id=self.user_id)
        # Normal case: dict with "messages"; otherwise a message-like object
        # or its string form.
        if isinstance(res, dict) and "messages" in res:
            raw = res["messages"][-1].content
        else:
            raw = getattr(res, "content", str(res))
        reply = raw.strip()
        self.history.append({"role": "user", "content": q})
        self.history.append({"role": "assistant", "content": reply})
        return reply

    def reset(self, new_session_id: Optional[str] = None):
        """Switch to a new session id and empty the history list in place."""
        self.session_id = new_session_id if new_session_id else f"session-{uuid4()}"
        del self.history[:]

def simulate_users(user_sessions: Dict[str, List[List[str]]]) -> Dict[str, List[ChatSession]]:
    """
    Replay scripted conversations and return the resulting sessions per user.

    Each inner list of questions is asked, in order, in its own new
    ChatSession; every answer is printed truncated to 200 characters.

    user_sessions = {
        "user_a": [
            ["Q1", "Q2"],       # session 1 for user_a
            ["Q3"]              # session 2 for user_a
        ],
        "user_b": [
            ["Qx", "Qy", "Qz"]  # session 1 for user_b
        ]
    }
    """
    runs: Dict[str, List[ChatSession]] = {}
    for uid, scripted_sessions in user_sessions.items():
        completed: List[ChatSession] = []
        runs[uid] = completed
        for questions in scripted_sessions:
            chat = ChatSession(user_id=uid)
            print(f"\n=== User {uid} | {chat.session_id} ===")
            for q in questions:
                ans = chat.ask(q)
                print(f"Q: {q}\nA: {ans[:200]}{'...' if len(ans) > 200 else ''}\n")
            completed.append(chat)
    return runs


In [None]:
# Scripted conversations: user id -> list of sessions -> list of questions.
scenarios = {
    "alex": [
        ["Where Kapitalbank is located?", "What products does it offer?"],
        ["Do they have corporate cards?"]
    ],
    "maria": [
        ["How to open an account online?", "What are the fees?"]
    ],
    "sam": [
        ["Latest news about Kapitalbank?"],
        ["Exchange rates today?", "ATM withdrawal limits?"]
    ],
}
# One ChatSession per inner list; answers are printed while the script runs.
runs = simulate_users(scenarios)

# Access histories if needed:
# NOTE: a notebook cell renders only its last expression, so of the two lines
# below only the second one is displayed.
runs["alex"][0].history  # first session for alex
runs["sam"][1].history   # second session for sam