# Monitoring your RAG app with Atla x Langfuse

This notebook demonstrates how to monitor a Retrieval-Augmented Generation (RAG) application using **Atla Selene for evaluation** and **Langfuse for observability**. If you'd like a visual walkthrough, check out our [demo video](https://www.youtube.com/watch?v=4TciraGerv8).

We use Alphabet's Q4 2024 earnings call transcript as an example document.

We build a Gradio application with a complete RAG pipeline that you can play around with. Traces will automatically be sent to Langfuse and scored by Selene. 

<br>

**Requirements:**

- An Atla account - you can sign up for free [here](https://www.atla-ai.com/sign-up)
- A Langfuse account - you can sign up for free [here](https://cloud.langfuse.com/auth/sign-up)
- An OpenAI API key - you can sign up for free [here](https://platform.openai.com/signup)

**Get started**

1. Follow the steps below in **Setup Atla on Langfuse**.
2. Set your Langfuse + OpenAI API keys.
3. Run the rest of the functions to load the RAG app.
4. Happy chatting! You'll get live quality assessments from Selene in your Langfuse traces, allowing you to accurately monitor your application's performance over time.


> Try prompting the chatbot adversarially to see if it hallucinates. Selene will detect poor responses and assign lower scores - helping you identify areas for improvement.



## Setup Atla on Langfuse

Navigate to your project on [cloud.langfuse.com](https://cloud.langfuse.com).

<br>

**Add your Atla API key to your Langfuse project:**

1. Head to **Settings** → **LLM Connections** and select **+** **Add new LLM API key.**
2. Set `atla` as the **Provider name** and select `atla` from the **LLM adapter** dropdown.
3. The API Base URL will automatically be filled in. Paste your Atla API key beginning with “pk-…” into the **API Key** field.
4. Leave **Enable default models** on.
5. Click **Save new LLM API key.**

![alt text](https://atla-ai.notion.site/image/attachment%3Aed7c7d92-b0bf-464f-9f43-83df6de65a5e%3Aimage.png?table=block&id=1bc309d1-7745-80fd-9f2d-cdd40f3005cf&spaceId=f08e6e70-73af-4363-9621-90e906b92ebc&width=2000&userId=&cache=v2)

**Add an LLM-as-a-Judge template**

1. Head to **Evaluation → LLM-as-a-Judge** in your sidebar and select **Templates.**
2. Click **+ New Template.**
3. Select `atla` as the **Model Provider** and select `atla-selene` as the **Model name.**
4. Let’s evaluate the retrieval component of our RAG app: select `Contextrelevance` from the default eval templates dropdown.
5. Adjust the prompts under **Reasoning** and **Score**. Selene will calibrate its score and feedback based on your specifications.
6. Click **Save.**

![alt text](https://atla-ai.notion.site/image/attachment%3A8cbdd3b7-4e9d-4548-981d-14c8f1fbe141%3Aimage.png?table=block&id=1bc309d1-7745-8031-aab8-d33bfac00586&spaceId=f08e6e70-73af-4363-9621-90e906b92ebc&width=2000&userId=&cache=v2)

**Add a new Evaluator configuration:**

1. Head to **Evaluation → LLM-as-a-Judge** in your sidebar and select **Evaluators**.
2. Click **+ New evaluator**.
3. Select the template `Contextrelevance` you just created.
4. Configure the **Variable mapping** for your evaluator to ensure the correct components of your traces are evaluated:
    - `{{query}}` can be mapped to **Trace → Input**
    - `{{context}}` can be mapped to **Span → Retrieval → Output**
5. Click **Save.**

![alt text](https://atla-ai.notion.site/image/attachment%3A9491785d-ae38-44ad-915e-7e1fc84792fc%3Aimage.png?table=block&id=1b9309d1-7745-80db-bd16-ca4ef455c723&spaceId=f08e6e70-73af-4363-9621-90e906b92ebc&width=2000&userId=&cache=v2)

> You can configure your evaluator so that it runs only on traces matching a target filter, or set a sampling rate so that only a percentage of traces is evaluated. For this demonstration we want to evaluate every trace, so keep the sampling rate at 100%!
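
Conceptually, the variable mapping in step 4 fills the template's `{{query}}` and `{{context}}` placeholders with values taken from each trace. The sketch below illustrates that idea in plain Python. It is not Langfuse's implementation: the `fill_template` helper, the template text, and the trace contents are all hypothetical.

```python
def fill_template(template, variables):
    """Replace each {{name}} placeholder with its mapped value (hypothetical helper)."""
    for name, value in variables.items():
        template = template.replace("{{" + name + "}}", str(value))
    return template

# Hypothetical trace contents and template text, for illustration only
trace = {
    "input": "How did Cloud revenue change?",
    "retrieval_output": "Example retrieved passage.",
}
template = "Query: {{query}}\nContext: {{context}}"

prompt_text = fill_template(template, {
    "query": trace["input"],               # corresponds to Trace → Input
    "context": trace["retrieval_output"],  # corresponds to Span → Retrieval → Output
})
print(prompt_text)
```

If a placeholder is mapped to the wrong part of the trace, the evaluator would judge the wrong text, which is why the mapping is worth double-checking before saving.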



## Set Langfuse + OpenAI API keys

In [None]:
# URL of the page containing the document to query; here, Alphabet's Q4 2024 earnings call transcript
website_url = "https://abc.xyz/2024-q4-earnings-call/"

import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "" # Your public LF key
os.environ["LANGFUSE_SECRET_KEY"] = "" # Your secret LF key
os.environ["OPENAI_API_KEY"] = "" # Your OpenAI key
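
Because the keys above start out as empty strings, it can help to confirm they are filled in before running the remaining cells. A minimal sketch, assuming a helper named `missing_keys` (hypothetical, not part of Langfuse or OpenAI):

```python
import os

REQUIRED_KEYS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "OPENAI_API_KEY")

def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the names of required keys that are unset or blank (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]

missing = missing_keys()
if missing:
    print("Set these keys before continuing:", ", ".join(missing))
```

This only checks that a value is present. It does not verify that the key is valid.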

## Setup RAG pipeline

### Install packages for notebook

In [None]:
pip install -qU langchain-text-splitters langchain-community langgraph langchain-openai langfuse gradio selenium unstructured numpy==1.26.4


### Initialize models

In [None]:
from langchain.chat_models import init_chat_model
from langchain_openai import OpenAIEmbeddings

llm = init_chat_model("gpt-4o-mini", model_provider="openai") # We choose our chat model
embeddings = OpenAIEmbeddings(model="text-embedding-3-large") # We choose our embeddings model

### Define functions

We set up a complete RAG pipeline with:

- Document loading
- Text chunking and metadata tagging
- Vector embedding and storage
- Query analysis and structuring
- Retrieval based on relevance
- Response generation based on context

The implementation uses Langfuse's observation decorators to track each step of the pipeline, capturing inputs, outputs, and metadata.
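
To give a feel for what an observation decorator does, here is a simplified stand-in written in plain Python. It is a sketch of the general pattern only, not Langfuse's implementation: `observe_sketch`, `captured`, and `retrieve_stub` are hypothetical names, and records are appended to a local list rather than sent anywhere.

```python
import functools

captured = []  # stand-in for an observability backend

def observe_sketch(name=None):
    """Simplified stand-in for an observation decorator (hypothetical)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Record the step name and its inputs, run the step, then record its output
            record = {"name": name or fn.__name__, "input": {"args": args, "kwargs": kwargs}}
            record["output"] = fn(*args, **kwargs)
            captured.append(record)
            return record["output"]
        return wrapper
    return decorator

@observe_sketch()
def retrieve_stub(state):
    # Hypothetical pipeline step that returns a fixed context
    return {"context": ["chunk about " + state["question"]]}

result = retrieve_stub({"question": "revenue"})
print(captured[-1]["name"], captured[-1]["output"])
```

The wrapped function keeps its normal return value, so pipeline code can call it as usual while inputs and outputs are captured on the side.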

In [None]:
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" # EU region
os.environ["LANGCHAIN_TRACING_V2"] = "false" # Prevents error when using OSS Langchain
os.environ["USER_AGENT"] = "myagent" # Prevents error when using OSS Langchain

from typing import Literal
from langfuse.decorators import observe, langfuse_context
from langfuse import Langfuse
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import Annotated, List, TypedDict
from langchain_community.document_loaders import SeleniumURLLoader

# Load and chunk contents of the website
loader = SeleniumURLLoader(urls=[website_url])
docs = loader.load()

# Chunking function for document
def manual_split(document, chunk_size=800, overlap=200):
    text = document.page_content
    splits = []

    # Simple character-based chunking
    for i in range(0, len(text), chunk_size - overlap):
        chunk = text[i:i + chunk_size]
        if chunk:  # Ensure we're not adding empty chunks
            doc = Document(page_content=chunk, metadata=document.metadata.copy())
            splits.append(doc)

    return splits

# Apply splitting
all_splits = []
for doc in docs:
    all_splits.extend(manual_split(doc))

# Update metadata
total_documents = len(all_splits)
third = total_documents // 3

for i, document in enumerate(all_splits):
    if i < third:
        document.metadata["section"] = "beginning"
    elif i < 2 * third:
        document.metadata["section"] = "middle"
    else:
        document.metadata["section"] = "end"


# Index chunks
vector_store = InMemoryVectorStore(embeddings)
_ = vector_store.add_documents(all_splits)


# Define schema for search
class Search(TypedDict):
    """Search query."""

    query: Annotated[str, ..., "Search query to run."]
    section: Annotated[
        Literal["beginning", "middle", "end"],
        ...,
        "Section to query.",
    ]

# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")


# Define state for application
class State(TypedDict):
    question: str
    query: Search
    context: List[Document]
    answer: str

# Structure user query better
@observe()
def analyze_query(state: State):
    with_structured = llm.with_structured_output(Search)
    query = with_structured.invoke(state["question"])
    return {"query": query}


# Retrieve relevant documents based on structured user query
@observe(as_type="retrieval")
def retrieve(state: State):
    query = state["query"]

    # Try without any specific filter first
    retrieved_docs = vector_store.similarity_search(
        query["query"],
        k=4  # Set explicit number of results
    )

    # Fallback: if the unfiltered search returned nothing, retry with a filter on the section metadata (substring match)
    if len(retrieved_docs) == 0 and query.get("section"):
        retrieved_docs = vector_store.similarity_search(
            query["query"],
            filter=lambda doc: query["section"] in str(doc.metadata.get("section", "")),
            k=4
        )

    docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)

    langfuse_context.update_current_observation(
        input={"query": query["query"], "section": query.get("section", "any")},
        output={
            "num_docs": len(retrieved_docs),
            "retrieved_content": docs_content  # Add full context
        }
    )
    return {"context": retrieved_docs}

# Generate response based on user query and context retrieved
@observe(as_type="generation")
def generate(state: State):

    docs_content = "\n\n".join(doc.page_content for doc in state["context"])

    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

# Create a wrapper function that will be the main trace sent to Langfuse
@observe()
def process_rag_pipeline(question: str, trace_id: str = None, session_id: str = None):
    """Main function that creates the top-level trace"""
    # Pass trace_id if you want to use a custom one
    kwargs = {"langfuse_observation_id": trace_id} if trace_id else {}

    # Initialize state
    state = {"question": question}

    # Update trace with metadata
    langfuse_context.update_current_trace(
        name="RAG Pipeline",
        user_id="demo_user", # Optional
        session_id=session_id, # Optional
        tags=["rag", "demo"]  # Optional
    )

    # Execute pipeline steps within the same trace context
    state.update(analyze_query(state, **kwargs))
    state.update(retrieve(state, **kwargs))
    state.update(generate(state, **kwargs))

    return state["answer"]

# Create a function that processes user input for the app
# Note: this relies on the global `session_id` defined in the "Build RAG app" cell below
def process_query(user_input):
    result = process_rag_pipeline(user_input, session_id=session_id)
    if isinstance(result, dict):
        return result["answer"]
    return str(result)
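
The chunking and section tagging above can be easier to follow on a small example. The sketch below mirrors the arithmetic of `manual_split` and the metadata loop on a plain string. `split_text` and `label_sections` are hypothetical names, and the input is placeholder text rather than the real transcript.

```python
def split_text(text, chunk_size=800, overlap=200):
    """Windows of chunk_size characters, starting every (chunk_size - overlap) characters."""
    step = chunk_size - overlap
    chunks = []
    for i in range(0, len(text), step):
        chunk = text[i:i + chunk_size]
        if chunk:  # skip empty chunks
            chunks.append(chunk)
    return chunks

def label_sections(n_chunks):
    """Assign 'beginning', 'middle', or 'end' by position, using integer thirds."""
    third = n_chunks // 3
    labels = []
    for i in range(n_chunks):
        if i < third:
            labels.append("beginning")
        elif i < 2 * third:
            labels.append("middle")
        else:
            labels.append("end")
    return labels

text = "x" * 2000  # placeholder text, 2000 characters
chunks = split_text(text)
print([len(c) for c in chunks])     # [800, 800, 800, 200]
print(label_sections(len(chunks)))  # ['beginning', 'middle', 'end', 'end']
```

With the default settings each chunk starts 600 characters after the previous one, so neighbouring chunks share 200 characters. Because the thirds use integer division, any leftover chunks are labelled `end`, and a document with fewer than three chunks is labelled `end` throughout.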

## Build RAG app

In [None]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="langsmith.client")
warnings.filterwarnings("ignore", category=UserWarning, message=".*type.*parameter.*")

import gradio as gr
import uuid

session_id = str(uuid.uuid4())

# Create a simple Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# RAG Chat Interface")

    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="🧑 User input", placeholder="Ask a question about the document! You can try 'How were financial results?'")

    # The chat history is passed in from the Chatbot component on each submit
    def respond(message, chat_history):
        bot_message = process_query(message)
        chat_history.append((message, bot_message))
        return "", chat_history

    msg.submit(respond, [msg, chatbot], [msg, chatbot])

demo.launch(debug=False, quiet=False)

Once the cell has run, click the public URL printed in its output to view the app in full screen. <br>
<br>

## Monitor your performance with Selene

1. After using the chatbot, head to **Tracing** → **Traces** in your sidebar and select a trace with the name `RAG Pipeline`.

2. Once there, you can analyze different components of your RAG pipeline:
    
    ![image.png](https://atla-ai.notion.site/image/attachment%3Addc6d1af-cbd9-4469-bc26-b598ba22ac22%3Aimage.png?table=block&id=1b9309d1-7745-803e-ba53-d7df90d8aa4d&spaceId=f08e6e70-73af-4363-9621-90e906b92ebc&width=2000&userId=&cache=v2)
    
3. From the highest-level Trace, click on Scores to view Selene's score and critique:
    
    ![image.png](https://atla-ai.notion.site/image/attachment%3Af8ec919b-68ba-4df9-bc43-279d1eef8191%3Aimage.png?table=block&id=1b9309d1-7745-809d-8105-df99108d734e&spaceId=f08e6e70-73af-4363-9621-90e906b92ebc&width=2000&userId=&cache=v2)
    
    - The evaluation score may take around 10 seconds to appear, due to the **Delay** setting in the Evaluator.

> By regularly monitoring Selene's scores over time, you can detect model drift, privacy vulnerabilities, outdated vector databases, retrieval issues, and other potential problems!
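
One simple way to watch scores over time is a rolling average with an alert threshold. The sketch below shows the idea under stated assumptions: scores lie between 0 and 1, the score values are made up for illustration, and `rolling_mean`, `flag_drops`, the window size, and the threshold are hypothetical choices rather than anything Langfuse or Selene prescribes.

```python
from statistics import mean

def rolling_mean(scores, window=3):
    """Mean of each consecutive window of scores."""
    return [mean(scores[i:i + window]) for i in range(len(scores) - window + 1)]

def flag_drops(scores, window=3, threshold=0.6):
    """Indices of windows whose mean falls below the threshold (hypothetical alert rule)."""
    return [i for i, m in enumerate(rolling_mean(scores, window)) if m < threshold]

# Made-up scores for illustration only, oldest first
scores = [0.75, 1.0, 0.75, 0.5, 0.25, 0.25]
print(flag_drops(scores))  # [2, 3]
```

Averaging over a window smooths out a single poor response, so a flagged window points to a sustained decline rather than a one-off.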

#### **Next steps**

Now that you've set up one monitor, try setting up additional Evaluators to measure other metrics that matter to your specific use case!
