# Tracing a RAG Pipeline

This notebook walks you through adding **full observability** to a Retrieval-Augmented Generation (RAG) pipeline—tracing every stage from document ingestion to answer generation, tracking costs, and monitoring performance.

**What You'll Learn:**
- Build a complete RAG chatbot that loads PDFs, chunks documents, generates embeddings, and retrieves relevant context
- Add auto-instrumentation to trace the embedding, retrieval, and generation calls, and use decorators to add spans for steps such as PDF loading and chunking
- Monitor token usage, API costs, and latency at each step to identify bottlenecks
- Track usage per user and session to understand conversation flows and user behavior

**Prerequisites:**
- Python >=3.10, <3.14
- OpenAI API key
- Netra API key (see the [Netra quick-start guide](https://docs.getnetra.ai/quick-start/Overview))

## Step 0: Install Packages

In [None]:
%pip install netra-sdk openai chromadb pypdf reportlab

## Step 1: Set Environment Variables

In [None]:
import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API Key:")
os.environ["NETRA_API_KEY"] = getpass("Enter your Netra API Key:")
# The endpoint is not a secret, so use a visible prompt to make typos easy to spot
os.environ["NETRA_OTLP_ENDPOINT"] = input("Enter your Netra OTLP Endpoint: ").strip()

print("API keys configured!")
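
Because the prompts accept an empty string, a quick check that every variable is populated can catch a skipped prompt before it surfaces later as an authentication error. The following is a minimal sketch; `missing_env_vars` is a hypothetical helper written for this notebook, not part of the Netra or OpenAI SDKs.

```python
import os
from typing import Iterable, List, Mapping


def missing_env_vars(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or contain only whitespace."""
    return [name for name in names if not environ.get(name, "").strip()]


# Variable names taken from Step 1 of this notebook
required = ["OPENAI_API_KEY", "NETRA_API_KEY", "NETRA_OTLP_ENDPOINT"]
missing = missing_env_vars(required)
if missing:
    print(f"Missing or empty: {', '.join(missing)}")
else:
    print("All required variables are set.")
```

Passing the environment as a parameter keeps the helper easy to test with a plain dictionary.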



## Step 2: Initialize Netra for Observability

With auto-instrumentation, Netra automatically captures all OpenAI and ChromaDB operations—no decorators or manual spans required.

In [None]:
import os
from netra import Netra
from netra.instrumentation.instruments import InstrumentSet

Netra.init(
    app_name="pdf-qa-chatbot",
    headers=f"x-api-key={os.getenv('NETRA_API_KEY')}",
    environment="development",
    trace_content=True,
    instruments={
        InstrumentSet.OPENAI,
        InstrumentSet.CHROMA,
    }
)

print("Netra initialized with auto-instrumentation!")
print("All OpenAI and ChromaDB operations will be traced automatically.")

## Step 3: Import Libraries and Initialize Clients

In [None]:
import uuid
from typing import List, Dict, Optional
from pypdf import PdfReader
import chromadb
from openai import OpenAI

# Initialize clients
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
chroma_client = chromadb.Client()

print("Clients initialized!")

## Step 4: Define Helper Functions

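These functions handle PDF loading, text chunking, and embedding generation. With auto-instrumentation, the OpenAI embeddings request made inside `generate_embeddings()` is traced automatically; `load_pdf()` and `chunk_text()` make no OpenAI or ChromaDB calls, so they produce no spans of their own.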

In [None]:
def load_pdf(file_path: str) -> str:
    """Extract text from a PDF file."""
    reader = PdfReader(file_path)
    # Guard against pages that yield no text (for example, image-only pages)
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)


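def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        # overlap >= chunk_size would keep `start` from advancing (infinite loop)
        raise ValueError("Require chunk_size > 0 and 0 <= overlap < chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break  # the tail is already covered; avoid a redundant final chunk
        start = end - overlap
    return chunks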


def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Generate embeddings for a list of texts.

    This call is automatically traced by Netra, capturing:
    - Model used (text-embedding-3-small)
    - Token count
    - Latency
    """
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]


print("Helper functions defined!")
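
To make the `chunk_size` and `overlap` parameters concrete: each chunk starts `chunk_size - overlap` characters after the previous one, so consecutive chunks share `overlap` characters. The sketch below uses small numbers and a hypothetical helper, `chunk_spans`, that returns the `(start, end)` offsets such a sliding window would produce, stopping once a window reaches the end of the text. It illustrates the technique and is not a drop-in replacement for `chunk_text()`.

```python
from typing import List, Tuple


def chunk_spans(length: int, chunk_size: int, overlap: int) -> List[Tuple[int, int]]:
    """Return (start, end) offsets for a sliding window over `length` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    spans = []
    start = 0
    while start < length:
        end = min(start + chunk_size, length)
        spans.append((start, end))
        if end == length:
            break  # the last window already covers the tail
        start = end - overlap  # step forward by chunk_size - overlap
    return spans


text = "abcdefghijklmnopqrstuvwxyz"  # 26 characters
for start, end in chunk_spans(len(text), chunk_size=10, overlap=3):
    print(start, end, text[start:end])
```

With the notebook's defaults (`chunk_size=1000`, `overlap=200`), the same arithmetic gives a stride of 800 characters.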

## Step 5: Define Ingestion, Retrieval, and Generation Functions

The OpenAI and ChromaDB calls inside these core RAG functions are traced automatically; no manual instrumentation is needed.

In [None]:
def ingest_pdf(file_path: str, collection_name: str = "pdf_qa") -> dict:
    """Ingest a PDF into the vector database.

    Auto-traced operations:
    - OpenAI embeddings.create() call
    - ChromaDB collection.add() call
    """
    # Load and chunk PDF
    pdf_text = load_pdf(file_path)
    chunks = chunk_text(pdf_text, chunk_size=1000, overlap=200)
    print(f"Loaded PDF: {len(pdf_text)} characters, {len(chunks)} chunks")

    # Generate embeddings (auto-traced)
    embeddings = generate_embeddings(chunks)
    print(f"Generated {len(embeddings)} embeddings")

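    # Store in vector database (auto-traced)
    try:
        chroma_client.delete_collection(name=collection_name)
    except Exception:
        # The collection does not exist yet, so there is nothing to delete
        pass

    # Use cosine distance so that 1 - distance is a cosine similarity
    collection = chroma_client.create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )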
    collection.add(
        documents=chunks,
        embeddings=embeddings,
        ids=[f"chunk_{i}" for i in range(len(chunks))]
    )
    print(f"Stored {len(chunks)} vectors in ChromaDB")

    return {
        "chunks_count": len(chunks),
        "collection": collection,
        "chunks": chunks
    }


def retrieve_chunks(query: str, collection, top_k: int = 3) -> List[dict]:
    """Retrieve relevant chunks for a query.

    Auto-traced operations:
    - OpenAI embeddings.create() for query
    - ChromaDB collection.query() call
    """
    # Generate query embedding (auto-traced)
    query_embedding = generate_embeddings([query])[0]

    # Perform vector search (auto-traced)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["documents", "distances"]
    )

    # Process results
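    retrieved = []
    ids = results["ids"][0]
    for i, doc in enumerate(results["documents"][0]):
        # Interpretable as cosine similarity only when the collection uses cosine distance
        similarity = 1 - results["distances"][0][i]
        retrieved.append({
            "content": doc,
            "similarity_score": similarity,
            "chunk_id": ids[i]  # the stored ID, not the rank in the result list
        })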

    return retrieved


def generate_answer(query: str, context_chunks: List[dict]) -> dict:
    """Generate an answer using the retrieved context.

    Auto-traced operations:
    - OpenAI chat.completions.create() with full token/cost tracking
    """
    context = "\n\n".join([chunk["content"] for chunk in context_chunks])

    # Generate answer (auto-traced with model, tokens, cost, latency)
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that answers questions based on the provided context. "
                    "Only use information from the context to answer. If the answer is not in the context, say so."
                )
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {query}"
            }
        ],
        temperature=0.1
    )

    answer = response.choices[0].message.content

    return {
        "answer": answer,
        "token_usage": {
            "prompt": response.usage.prompt_tokens,
            "completion": response.usage.completion_tokens,
            "total": response.usage.total_tokens
        }
    }


print("RAG functions defined!")
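
`retrieve_chunks()` reports `1 - distance` as a similarity score. That expression is a cosine similarity only when the collection measures cosine distance. ChromaDB's default metric is squared L2, which is worth confirming against the version you have installed. For unit-length vectors the two are related by `squared_l2 = 2 - 2 * cosine_similarity`, and the sketch below checks this numerically with two hand-picked vectors using only the standard library.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


# Two unit-length vectors chosen by hand for illustration
a = [1.0, 0.0]
b = [0.6, 0.8]

cos = cosine_similarity(a, b)
d2 = squared_l2(a, b)
print(f"cosine similarity:       {cos:.3f}")
print(f"1 - squared L2 distance: {1 - d2:.3f}")
print(f"1 - squared L2 / 2:      {1 - d2 / 2:.3f}")
```

The second line differs from the first, while the third matches it, which is why the distance metric matters when reading `similarity_score`.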

## Step 6: Create the PDFChatbot Class

This class combines all the functions into a complete chatbot with session and user tracking.

In [None]:
class PDFChatbot:
    """RAG Pipeline with Netra auto-instrumentation."""

    def __init__(self, pdf_path: str):
        self.pdf_path = pdf_path
        self.session_id = str(uuid.uuid4())
        self.collection = None
        self.chunks: List[str] = []
        self.conversation_history: List[Dict] = []

    def initialize(self):
        """Initialize the vector store with PDF content."""
        print(f"Initializing chatbot for: {self.pdf_path}")
        print(f"Session ID: {self.session_id}")

        result = ingest_pdf(self.pdf_path, f"pdf_{self.session_id[:8]}")
        self.collection = result["collection"]
        self.chunks = result["chunks"]

        print(f"\nInitialization complete! {result['chunks_count']} chunks indexed.")

    def chat(self, query: str, user_id: Optional[str] = None) -> dict:
        """Process a chat message and return the response.

        User and session context is attached to all auto-traced spans.
        """
        # Set session and user context for all traces
        Netra.set_session_id(self.session_id)
        if user_id:
            Netra.set_user_id(user_id)

        # Retrieve relevant chunks (auto-traced)
        retrieved_chunks = retrieve_chunks(query, self.collection)

        # Generate answer (auto-traced)
        result = generate_answer(query, retrieved_chunks)

        # Update conversation history
        self.conversation_history.append({"role": "user", "content": query})
        self.conversation_history.append({"role": "assistant", "content": result["answer"]})

        return {
            "query": query,
            "answer": result["answer"],
            "retrieved_chunks": retrieved_chunks,
            "token_usage": result["token_usage"],
            "session_id": self.session_id
        }


print("PDFChatbot class defined!")

## Step 7: Create a Sample PDF

Upload your own PDF or create a sample one for testing.

In [None]:
# Option 1: Upload your own PDF (uncomment to use)
# from google.colab import files
# uploaded = files.upload()
# pdf_path = list(uploaded.keys())[0]

# Option 2: Create a sample PDF for testing
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

def create_sample_pdf(filename: str = "sample_document.pdf"):
    """Create a sample PDF for testing."""
    c = canvas.Canvas(filename, pagesize=letter)
    width, height = letter

    # Page 1: Introduction
    c.setFont("Helvetica-Bold", 24)
    c.drawString(100, height - 100, "Introduction to Machine Learning")

    c.setFont("Helvetica", 12)
    text = """
    Machine learning is a subset of artificial intelligence that enables systems to learn
    and improve from experience without being explicitly programmed. The term was coined
    by Arthur Samuel in 1959 while at IBM.

    Machine learning algorithms build a mathematical model based on sample data, known as
    training data, in order to make predictions or decisions without being explicitly
    programmed to perform the task.

    The primary aim is to allow computers to learn automatically without human intervention
    or assistance and adjust actions accordingly.
    """

    y = height - 150
    for line in text.strip().split("\n"):
        c.drawString(100, y, line.strip())
        y -= 20

    # Page 2: Types of ML
    c.showPage()
    c.setFont("Helvetica-Bold", 18)
    c.drawString(100, height - 100, "Types of Machine Learning")

    c.setFont("Helvetica", 12)
    text = """
    There are three main types of machine learning:

    1. Supervised Learning: The algorithm learns from labeled training data and makes
       predictions based on that data. Examples include classification and regression.

    2. Unsupervised Learning: The algorithm learns patterns from unlabeled data without
       any guidance. Examples include clustering and dimensionality reduction.

    3. Reinforcement Learning: The algorithm learns through interaction with an environment,
       receiving rewards or penalties for actions. Used in robotics and game playing.

    Each type has its own applications and is suited for different kinds of problems.
    """

    y = height - 150
    for line in text.strip().split("\n"):
        c.drawString(100, y, line.strip())
        y -= 20

    # Page 3: Applications
    c.showPage()
    c.setFont("Helvetica-Bold", 18)
    c.drawString(100, height - 100, "Applications of Machine Learning")

    c.setFont("Helvetica", 12)
    text = """
    Machine learning has numerous real-world applications:

    - Image Recognition: Identifying objects, faces, and scenes in images
    - Natural Language Processing: Translation, sentiment analysis, chatbots
    - Recommendation Systems: Netflix, Amazon, Spotify recommendations
    - Autonomous Vehicles: Self-driving cars use ML for navigation
    - Healthcare: Disease diagnosis, drug discovery, personalized treatment
    - Financial Services: Fraud detection, algorithmic trading, credit scoring

    The field continues to grow rapidly with new applications emerging regularly.
    """

    y = height - 150
    for line in text.strip().split("\n"):
        c.drawString(100, y, line.strip())
        y -= 20

    c.save()
    print(f"Created sample PDF: {filename}")
    return filename

# Create the sample PDF
pdf_path = create_sample_pdf()

## Step 8: Initialize and Test the Chatbot

Now let's create and test our auto-traced PDF chatbot!

In [None]:
# Initialize the chatbot
chatbot = PDFChatbot(pdf_path)
chatbot.initialize()

In [None]:
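# Test with a single question. The sample PDF has no "key findings" section,
# so with that document the model may reply that the answer is not in the context.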
response = chatbot.chat(
    "What are the key findings?",
    user_id="user-123"
)
print(f"Answer: {response['answer']}")
print(f"\nTokens used: {response['token_usage']['total']}")
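
The dashboard reports cost per span. For a rough local cross-check, the `token_usage` dictionary returned by `chat()` can be multiplied by per-token rates. In the sketch below, `estimate_cost` and both rates are placeholders invented for illustration; substitute the current prices for your model from OpenAI's pricing page.

```python
from typing import Dict

# Placeholder rates in USD per 1M tokens. These are NOT real prices.
PROMPT_RATE_PER_M = 1.00
COMPLETION_RATE_PER_M = 2.00


def estimate_cost(token_usage: Dict[str, int],
                  prompt_rate: float = PROMPT_RATE_PER_M,
                  completion_rate: float = COMPLETION_RATE_PER_M) -> float:
    """Estimate request cost in USD from prompt and completion token counts."""
    prompt_cost = token_usage["prompt"] * prompt_rate / 1_000_000
    completion_cost = token_usage["completion"] * completion_rate / 1_000_000
    return prompt_cost + completion_cost


# Same shape as response["token_usage"] in this notebook; the counts are made up.
usage = {"prompt": 850, "completion": 120, "total": 970}
print(f"Estimated cost: ${estimate_cost(usage):.6f}")
```

Prompt and completion tokens are priced separately here because providers commonly charge different rates for each.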

In [None]:
# Test with multiple questions
questions = [
    "What is machine learning?",
    "What are the three types of machine learning?",
    "Who coined the term machine learning and when?",
    "What are some applications of machine learning?"
]

print("=" * 60)
print("Testing the RAG Pipeline")
print("=" * 60)

for i, question in enumerate(questions, 1):
    print(f"\n--- Question {i} ---")
    print(f"Q: {question}")

    response = chatbot.chat(question, user_id="test-user")

    print(f"\nA: {response['answer']}")
    print(f"\n[Tokens: {response['token_usage']['total']}]")
    print(f"[Retrieved {len(response['retrieved_chunks'])} chunks | "
          f"Max similarity: {max(c['similarity_score'] for c in response['retrieved_chunks']):.3f}]")
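
The dashboard lets you filter spans by the user and session IDs set in `chat()`. The same grouping can be approximated locally when you want a quick total without opening the dashboard. The sketch below assumes you record each response alongside the `user_id` you passed in; the `records` list and the `tokens_by_user` helper are hypothetical, and the token counts are made up.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def tokens_by_user(records: List[Tuple[str, dict]]) -> Dict[str, int]:
    """Sum total tokens per user from (user_id, response) pairs."""
    totals: Dict[str, int] = defaultdict(int)
    for user_id, response in records:
        totals[user_id] += response["token_usage"]["total"]
    return dict(totals)


# Each response mirrors the dictionary returned by chat(); counts are invented.
records = [
    ("user-123", {"token_usage": {"total": 970}}),
    ("test-user", {"token_usage": {"total": 1010}}),
    ("test-user", {"token_usage": {"total": 995}}),
]
for user_id, total in tokens_by_user(records).items():
    print(f"{user_id}: {total} tokens")
```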

## Step 9 (Optional): Using Decorators for Custom Spans

Auto-instrumentation handles most cases, but you can add structure with decorators when needed.

In [None]:
from netra.decorators import workflow, task, span

@task(name="load-pdf")
def load_pdf_traced(file_path: str) -> str:
    """Extract text from a PDF file with tracing."""
    # Reuse the Step 4 helper so the logic lives in one place
    return load_pdf(file_path)


@task(name="chunk-text")
def chunk_text_traced(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks with tracing."""
    # Reuse the Step 4 helper so the logic lives in one place
    return chunk_text(text, chunk_size=chunk_size, overlap=overlap)


class PDFChatbotWithDecorators:
    """RAG Pipeline with decorator-based tracing."""

    def __init__(self, pdf_path: str):
        self.pdf_path = pdf_path
        self.session_id = str(uuid.uuid4())
        self.collection = None
        self.chunks: List[str] = []
        self.conversation_history: List[Dict] = []

    @task(name="document-ingestion")
    def initialize(self):
        """Initialize the vector store with PDF content."""
        pdf_text = load_pdf_traced(self.pdf_path)
        self.chunks = chunk_text_traced(pdf_text)
        embeddings = generate_embeddings(self.chunks)

        self.collection = chroma_client.create_collection(
            name=f"pdf_{self.session_id[:8]}",
            metadata={"hnsw:space": "cosine"}  # so 1 - distance is a cosine similarity
        )
        self.collection.add(
            documents=self.chunks,
            embeddings=embeddings,
            ids=[f"chunk_{i}" for i in range(len(self.chunks))]
        )
        print(f"Initialized with {len(self.chunks)} chunks")

    @workflow(name="pdf-qa-query")
    def chat(self, query: str, user_id: Optional[str] = None) -> Dict:
        """Process a chat message within a workflow span."""
        Netra.set_session_id(self.session_id)
        if user_id:
            Netra.set_user_id(user_id)

        retrieved = self._retrieve(query)
        result = generate_answer(query, retrieved)

        self.conversation_history.append({"role": "user", "content": query})
        self.conversation_history.append({"role": "assistant", "content": result["answer"]})

        return {"query": query, "answer": result["answer"], "retrieved_chunks": retrieved}

    @task(name="retrieval")
    def _retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve relevant chunks within a task span."""
        return retrieve_chunks(query, self.collection, top_k)


print("Decorator-based chatbot defined!")
print("")
print("Decorator reference:")
print("  @workflow - Top-level pipeline or request handler")
print("  @task     - Discrete unit of work within a workflow")
print("  @span     - Fine-grained tracing for specific operations")
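
Conceptually, a tracing decorator wraps a function, records when it starts and finishes, and attaches a name to the measurement. The sketch below shows that pattern with the standard library only. It is a stand-in for illustration, not Netra's implementation: `timed_task` and the `SPANS` list are hypothetical, and real spans also carry parent/child links and attributes.

```python
import functools
import time
from typing import Callable, Dict, List

SPANS: List[Dict] = []  # collected in memory; a real tracer would export these


def timed_task(name: str) -> Callable:
    """Decorator factory that records name, status, and duration per call."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            status = "ok"
            try:
                return func(*args, **kwargs)
            except Exception:
                status = "error"
                raise  # record the failure but do not swallow it
            finally:
                SPANS.append({
                    "name": name,
                    "status": status,
                    "duration_s": time.perf_counter() - start,
                })
        return wrapper
    return decorator


@timed_task(name="word-count")
def count_words(text: str) -> int:
    return len(text.split())


print(count_words("tracing a rag pipeline"))
print(SPANS[-1]["name"], SPANS[-1]["status"])
```

Recording the span in a `finally` block means a measurement is kept even when the wrapped function raises, which is the behavior you would want when hunting for failing stages.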

---

## What You'll See in the Dashboard

After running this notebook, check the Netra dashboard for:

- **OpenAI spans** showing model, tokens, cost, and full prompt/response
- **ChromaDB spans** showing query timing and results
- **User and session IDs** attached to all spans for filtering

## Documentation Links

- [Netra Documentation](https://docs.getnetra.ai)
- [Auto-Instrumentation Guide](https://docs.getnetra.ai/Observability/Traces/auto-instrumentation)
- [Decorators Guide](https://docs.getnetra.ai/Observability/Traces/decorators)
- [Traces API](https://docs.getnetra.ai/Observability/Traces)

## See Also

- [Evaluating RAG Quality](/Cookbooks/evaluation/evaluating-rag-quality) - Add quality metrics and test suites
- [Tracing LangChain Agents](/Cookbooks/observability/tracing-langchain-agents) - Apply similar tracing to LangChain
- [Tracing CrewAI Pipelines](/Cookbooks/observability/tracing-crewai-pipelines) - Trace multi-agent workflows