# <a id='toc1_'></a>[Monitoring and Observability Platform](#toc0_)

This notebook shows two ways to use Langfuse together with RAGAS to evaluate, monitor, and debug your RAG pipeline:
- Langfuse tracing
- Langfuse datasets

**Table of contents**<a id='toc0_'></a>    
    - [Setup](#toc1_1_1_)    
    - [1- Langfuse tracing](#toc1_1_2_)    
    - [2- Langfuse dataset](#toc1_1_3_)    


### <a id='toc1_1_1_'></a>[Setup](#toc0_)

To begin, you need to configure your Langfuse project. Follow [these instructions](https://langfuse.com/docs/get-started) to get started.

> Next, update your `.env` file with the following credentials:\
> `LANGFUSE_PUBLIC_KEY`=pk-lf-...\
> `LANGFUSE_SECRET_KEY`=sk-lf-...\
> `LANGFUSE_HOST`=https://cloud.langfuse.com

You should now be able to access your Langfuse project and view the following user interface.

![Langfuse Dashboard](../data/6_docs/images/langfuse_dashboard.png)
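
Before running the cells below, it can help to confirm that the three variables listed above are actually set. The following is a minimal sketch using only the standard library; the helper name `missing_langfuse_vars` and the placeholder values are assumptions made for illustration and are not part of Langfuse.

```python
import os
from typing import Mapping, Optional

REQUIRED_VARS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST")


def missing_langfuse_vars(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the names of the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping (placeholder values, not real keys)
example_env = {
    "LANGFUSE_PUBLIC_KEY": "pk-lf-placeholder",
    "LANGFUSE_HOST": "https://cloud.langfuse.com",
}
print(missing_langfuse_vars(example_env))  # ['LANGFUSE_SECRET_KEY']
```

Calling `missing_langfuse_vars()` without arguments, after `load_dotenv()`, checks the real environment instead of the example mapping.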

In [None]:
import os
from pathlib import Path

from dotenv import load_dotenv

os.chdir(Path.cwd().joinpath(".."))
print(Path.cwd())
load_dotenv(override=True)

In [None]:
import ast
import uuid  # Ensure uuid is imported
from operator import itemgetter
from typing import Union

import pandas as pd
from datasets import Dataset
from dotenv import load_dotenv
from langchain.schema.runnable import Runnable, RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_chroma import Chroma
from langchain_core.callbacks import Callbacks
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langfuse import Langfuse
from langfuse.callback import CallbackHandler
from langfuse.decorators import langfuse_context, observe
from ragas import SingleTurnSample, evaluate
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import AspectCritic, FactualCorrectness, Faithfulness, LLMContextRecall
from ragas.metrics.base import Metric, MetricWithEmbeddings, MetricWithLLM
from ragas.run_config import RunConfig
from tqdm.auto import tqdm

from lib.models import embeddings, llm
from lib.prompts import SYSTEM_TEMPLATE

load_dotenv()

In [None]:
langfuse = Langfuse()
langfuse.auth_check()

We start by loading a simulated evaluation dataset that is used throughout the rest of this hands-on session.

In [None]:
# Load the dataset from the .csv file
NB_EXAMPLES = 10
df_eval = pd.read_csv("data/6_docs/dataset_eval_20.csv", sep=";")[0:NB_EXAMPLES]
df_eval.head()

- **user_input** → The text or query provided by the user.  
- **reference** → The expected or “gold standard” response for the user input.  
- **response** → The output generated by a model or system for the user input.  
- **retrieved_contexts** → Any external context, documents, or information retrieved to help generate the response.
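
Because the dataset is read from a CSV file, the `retrieved_contexts` column arrives as text rather than as a list. The sketch below assumes the column holds a Python list literal, as the `ast.literal_eval` calls later in this notebook suggest. The sample row and the helper name `parse_contexts` are hypothetical.

```python
import ast

# Hypothetical row, shaped like one line of the evaluation CSV
row = {
    "user_input": "What is Python?",
    "reference": "Python is a programming language.",
    "response": "Python is a high-level programming language.",
    "retrieved_contexts": "['Python is a programming language.', 'It was created by Guido van Rossum.']",
}


def parse_contexts(raw: str) -> list[str]:
    """Turn the string stored in the CSV back into a list of context chunks."""
    parsed = ast.literal_eval(raw)
    if not isinstance(parsed, list) or not all(isinstance(c, str) for c in parsed):
        raise ValueError("retrieved_contexts must be a list of strings")
    return parsed


contexts = parse_contexts(row["retrieved_contexts"])
print(len(contexts))  # 2
```

Validating the parsed value early gives a clearer error than a failure deep inside a metric call.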

In [None]:
data_eval = Dataset.from_pandas(df_eval)
data_eval

### <a id='toc1_1_2_'></a>[1- Langfuse tracing](#toc0_)

We will work with the following metrics:

- **[FactualCorrectness](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/factual_correctness/#factual-correctness)**   
  - **Definition:** Evaluates the factual correctness of a response by checking its claims against reference texts.  
  - **Required columns:** `response`, `reference`

- **[Faithfulness](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/faithfulness/)**   
  - **Definition:** Measures how factually consistent a response is with the retrieved context. Higher score = more consistent.  
  - **Required columns:** `response`, `retrieved_contexts`

- **[LLMContextRecall](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/context_recall/)**    
  - **Definition:** Estimates how much of the information in the reference answer is supported by the retrieved contexts, i.e. how much of the relevant information was actually retrieved.  
  - **Required columns:** `user_input`, `retrieved_contexts`, `reference`

- **[AspectCritic (harmfulness)](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/general_purpose/#aspect-critic)**   
  - **Definition:** Judges whether the submission is intended to harm, deceive, or exploit users.  
  - **Required columns:** `response`
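
Faithfulness and context recall both reduce to a ratio of supported claims over total claims. They differ in where the claims come from: the response for faithfulness, the reference for context recall. The toy sketch below assumes the per-claim verdicts are already known; in Ragas they come from LLM calls. The helper name, the verdict lists, and the choice to return `0.0` for an empty list are illustrative assumptions, not Ragas behavior.

```python
def supported_ratio(verdicts: list[bool]) -> float:
    """Share of claims judged as supported (0.0 when there are no claims)."""
    if not verdicts:
        return 0.0
    return sum(verdicts) / len(verdicts)


# Faithfulness: is each claim made in the RESPONSE backed by the retrieved contexts?
response_claim_verdicts = [True, True, False, True]

# Context recall: is each claim of the REFERENCE found in the retrieved contexts?
reference_claim_verdicts = [True, False]

faithfulness = supported_ratio(response_claim_verdicts)
context_recall = supported_ratio(reference_claim_verdicts)
print(faithfulness, context_recall)  # 0.75 0.5
```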

In [None]:
METRICS = [
    FactualCorrectness(),
    Faithfulness(),
    LLMContextRecall(),
    AspectCritic(
        name="harmfulness",
        definition="Is the submission intended to harm, deceive, or exploit users?",
    ),
]


# Utility to initialize the Ragas metrics, attaching the LLM and embeddings where a metric needs them
def init_ragas_metrics(
    metrics: list[Metric],
    llm: LangchainLLMWrapper,
    embeddings: LangchainEmbeddingsWrapper,
) -> None:
    run_config = RunConfig()
    for metric in metrics:
        if isinstance(metric, MetricWithLLM):
            metric.llm = llm
        if isinstance(metric, MetricWithEmbeddings):
            metric.embeddings = embeddings
        metric.init(run_config)


init_ragas_metrics(
    metrics=METRICS,
    llm=LangchainLLMWrapper(llm),
    embeddings=LangchainEmbeddingsWrapper(embeddings),
)

In [None]:
async def score_with_ragas(user_input: str, retrieved_contexts: list, response: str, true_answer: str) -> dict:
    # Build the sample once; every metric is scored against the same row
    sample = SingleTurnSample(
        user_input=user_input,
        retrieved_contexts=retrieved_contexts,
        response=response,
        reference=true_answer,
    )
    scores = {}
    for m in METRICS:
        print(f"calculating {m.name}")
        scores[m.name] = await m.single_turn_ascore(sample)
    return scores

In [None]:
# Initialize dictionaries to map questions to contexts and answers
q_to_c = {}
q_to_a = {}


@observe()
def retriever(user_input: str) -> list:
    """Retrieve contexts for a given question."""
    return q_to_c.get(user_input, [])


@observe()
def generator(user_input: str) -> str:
    """Generate an answer for a given question."""
    return q_to_a.get(user_input, "")


@observe()
async def evaluate_rag(row: dict) -> dict:
    """Process a row to retrieve contexts, generate an answer, and score the results."""
    user_input = row["user_input"]
    reference = row["reference"]

    # Update mappings for the current question
    q_to_c[user_input] = ast.literal_eval(row["retrieved_contexts"])
    q_to_a[user_input] = row["response"]

    # Retrieve contexts and generate an answer
    retrieved_contexts = retriever(user_input)
    response = generator(user_input)

    # Score the results
    scores = await score_with_ragas(user_input, retrieved_contexts, response, reference)
    for metric_name, score_value in scores.items():
        langfuse_context.score_current_trace(name=metric_name, value=score_value)

    langfuse_context.update_current_trace(name=user_input)

    return scores

In [None]:
for row in tqdm(data_eval):
    scores = await evaluate_rag(row)
    print("--------------------")
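
The loop above overwrites `scores` on every iteration, so only the Langfuse traces keep the per-row results. If a local summary is also wanted, one option is to collect each returned dict and average per metric. The sketch below uses hypothetical score values and a hypothetical helper name, and it assumes a failed metric shows up as `NaN`.

```python
import math
from statistics import mean


def mean_scores(rows: list[dict[str, float]]) -> dict[str, float]:
    """Average each metric over all rows, ignoring NaN values.

    A metric whose values are all NaN is left out of the result.
    """
    collected: dict[str, list[float]] = {}
    for scores in rows:
        for name, value in scores.items():
            if not math.isnan(value):
                collected.setdefault(name, []).append(value)
    return {name: mean(values) for name, values in collected.items()}


# Hypothetical per-row results, shaped like the dicts returned by evaluate_rag
all_scores = [
    {"faithfulness": 1.0, "harmfulness": 0.0},
    {"faithfulness": 0.5, "harmfulness": 0.0},
    {"faithfulness": float("nan"), "harmfulness": 1.0},
]
summary = mean_scores(all_scores)
print(summary)
```

In the notebook, `all_scores` would be filled inside the loop with `all_scores.append(await evaluate_rag(row))`.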

### <a id='toc1_1_3_'></a>[2- Langfuse dataset](#toc0_)

In [None]:
LF_DATASET_NAME = "dataset_eval"
LF_PROMPT_SYSTEM = "rag_prompt_system"

Upload the dataset to Langfuse.

In [None]:
langfuse.create_dataset(
    name=LF_DATASET_NAME,
    description="Dataset to evaluate the RAG pipeline",
)

for _, item in df_eval.iterrows():
    langfuse.create_dataset_item(
        dataset_name=LF_DATASET_NAME,
        input=item["user_input"],
        expected_output=item["reference"],
        metadata={"context_ground_truth": item["retrieved_contexts"]},
    )

Fetch the Langfuse dataset to evaluate.

In [None]:
lf_dataset = langfuse.get_dataset(LF_DATASET_NAME)
len(lf_dataset.items)

Add the prompt to Langfuse with [`create_prompt()`](https://langfuse.com/docs/prompts/get-started).

In [None]:
langfuse.create_prompt(
    name=LF_PROMPT_SYSTEM,
    type="text",
    prompt=SYSTEM_TEMPLATE,
    config={
        "temperature": 0.0,
        "supported_languages": ["en"],
    },  # optionally, add configs (e.g. model parameters or model tools)
    labels=["production", "latest"],  # "staging",
)

In [None]:
langfuse_text_prompt = langfuse.get_prompt(LF_PROMPT_SYSTEM, label="latest")
langchain_text_prompt = PromptTemplate.from_template(
    langfuse_text_prompt.get_langchain_prompt(),
)
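
Langfuse prompt templates mark variables with double braces (`{{variable}}`), while LangChain templates use single braces (`{variable}`); `get_langchain_prompt()` bridges the two. The sketch below only illustrates that idea with a regular expression. It is not the library's implementation, and the prompt text and the helper name `to_single_braces` are made up for the example.

```python
import re


def to_single_braces(template: str) -> str:
    """Rewrite {{name}} placeholders as {name} placeholders."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", r"{\1}", template)


# Hypothetical prompt text; the real one comes from SYSTEM_TEMPLATE
langfuse_style = "Answer using only this context:\n{{context}}\n\nQuestion: {{question}}"
langchain_style = to_single_braces(langfuse_style)

# The converted template can be filled with ordinary string formatting
print(langchain_style.format(context="Python is a language.", question="What is Python?"))
```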

We define a basic retriever using a Chroma database stored in memory, containing all the chunks from the imported dataset.

In [None]:
# Build one Document per context chunk found in the evaluation dataset
docs = []

for contexts in df_eval["retrieved_contexts"]:
    for context in ast.literal_eval(contexts):
        docs.append(Document(page_content=context))

print(f"Number of documents to add to the vector store: {len(docs)}")

vectorstore = Chroma.from_documents(
    documents=docs,
    collection_name=f"collection_{uuid.uuid4().hex}",  # Generate a random name for the collection. Avoids conflicts if we rerun the cell
    embedding=embeddings,
)

retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 2})

We define a basic RAG pipeline using a ChatOpenAI model and the retriever defined above.

In [None]:
def _format_docs(docs: list[Document]) -> str:
    return "\n\n".join(doc.page_content for doc in docs)


def get_detailed_rag_chain(prompt: PromptTemplate) -> Runnable:
    return (
        RunnableParallel(
            {
                "raw_context": retriever,
                "question": RunnablePassthrough(),
            }
        )
        .assign(context=RunnableLambda(itemgetter("raw_context")) | _format_docs)
        .assign(answer=prompt | llm | StrOutputParser())
    )


rag_chain = get_detailed_rag_chain(langchain_text_prompt)
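
To make the shape of the chain's output easier to picture, here is a plain-Python imitation of the same data flow, with stub functions standing in for the retriever and the model. Everything in it (function name, stubs, template text) is hypothetical; it only mirrors how each step adds one key to a shared dict.

```python
from typing import Callable


def run_rag_sketch(
    question: str,
    retrieve: Callable[[str], list[str]],
    generate: Callable[[str], str],
    template: str,
) -> dict:
    """Imitate the chain: every step adds one key to the state dict."""
    # Parallel step: retrieved chunks plus the untouched question
    state = {"raw_context": retrieve(question), "question": question}
    # First assign: join the chunks into a single context string
    state["context"] = "\n\n".join(state["raw_context"])
    # Second assign: fill the prompt and ask the (stub) model
    prompt = template.format(context=state["context"], question=state["question"])
    state["answer"] = generate(prompt)
    return state


# Stub retriever and model, for illustration only
output = run_rag_sketch(
    question="What is Python?",
    retrieve=lambda q: ["Python is a programming language.", "It is widely used."],
    generate=lambda prompt: f"(stub answer for a {len(prompt)}-character prompt)",
    template="Context:\n{context}\n\nQuestion: {question}",
)
print(list(output))  # ['raw_context', 'question', 'context', 'answer']
```

Keeping `raw_context` in the output alongside `answer` is what later allows the retrieved chunks to be passed to the evaluation metrics.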

A LangChain chain can be invoked with the Langfuse callback handler so that each run is automatically recorded as a trace in your Langfuse project. See the example below.

In [None]:
langfuse_handler = CallbackHandler()
question = "What do you know about Python?"
output = rag_chain.invoke(question, config={"callbacks": [langfuse_handler]})
print(output["answer"])

![Langfuse Single Trace](../data/6_docs/images/langfuse_single_trace.png)

In [None]:
def score_with_ragas(
    question: str,
    answer: str,
    contexts: list[str],
    ground_truth: str,
    callbacks: Callbacks = None,
) -> dict[str, float]:
    # Use the same column names as the evaluation dataset described earlier
    dataset = Dataset.from_dict(
        {
            "user_input": [question],
            "retrieved_contexts": [contexts],
            "response": [answer],
            "reference": [ground_truth],
        }
    )

    result = evaluate(
        dataset,
        metrics=METRICS,
        llm=llm,
        embeddings=embeddings,
        callbacks=callbacks,
    )
    scores = result.scores[0]

    return scores

In [None]:
trace_name = "rag evaluation"
batch_size = 3

for item in tqdm(lf_dataset.items[:batch_size]):
    # get question
    question = item.input

    # start a new trace when you get a question
    trace = langfuse.trace(
        name=trace_name,
        input=question,
        tags=[question],
    )

    # link the trace to the dataset item; the second argument is the run name (here, the dataset name)
    item.link(trace, LF_DATASET_NAME)

    # get callback handler for Langchain tracing
    lf_handler = trace.get_langchain_handler()

    # use rag_chain to generate an answer
    output = rag_chain.invoke(question, config={"run_name": "RAG", "callbacks": [lf_handler]})

    # get documents chunks
    docs = output["raw_context"]
    context = [doc.page_content for doc in docs]

    # get answer
    answer = output["answer"]

    # log retrieved context in the metadata
    trace.update(output=answer, metadata={"retrieved_context": context})

    # compute scores for the question, context, answer tuple
    ragas_scores = score_with_ragas(
        question=question,
        answer=answer,
        contexts=context,
        ground_truth=item.expected_output,
        callbacks=[lf_handler],
    )

    # log scores in the Langfuse trace
    for k, v in ragas_scores.items():
        trace.score(
            name=k,
            value=v,
        )