# Workshop: From Simple to Agentic RAG

## Setup


🎉 Free Pinecone API key

Before running this notebook, [create a free Pinecone account](https://www.pinecone.io).

If you already have a Pinecone API key, you can skip this step.

Next, sign up for a free account on [Tavily](https://app.tavily.com/home) and get your API key.

We will use [`uv`](https://docs.astral.sh/uv/getting-started/installation/) to install the workshop package and its dependencies.


```bash
!git clone https://github.com/parambharar/workshops.git
# Use %cd, not !cd: each ! command runs in its own subshell, so !cd does not persist
%cd workshops/rag
!pip install uv
!uv pip install -e .
```

Create a `.env` file in the `workshops/rag` directory and add your API keys to it:

```.env
PINECONE_API_KEY="YOUR_PINECONE_API_KEY"
TAVILY_API_KEY="YOUR_TAVILY_API_KEY"
```


In [1]:
%load_ext dotenv
%dotenv
%load_ext autoreload
%autoreload 2


In [2]:
import json
import wandb
import weave
from copy import deepcopy
import nest_asyncio
nest_asyncio.apply()

In [None]:
from utils import convert_contents_to_text, make_id, render_doc, printmd, chunk_simple, chunk_markdown, load_dataset, chunk_dataset
from retrieval_metrics import RetrievalScorer
from response_metrics import ResponseScorer
from retriever import TfidfSearchEngine, BM25SearchEngine, DenseSearchEngine, Retriever, RetrieverWithReranker, HybridRetrieverWithReranker, VectorStoreSearchEngine
from generation import SimpleResponseGenerator, QueryEnhancedResponseGenerator
from pipeline import SimpleRAGPipeline, QueryEnhancedRAGPipeline
from query_enhancer import QueryEnhancer
from agent import Agent

Next, we initialize Weave for tracing and download the finance PDF files that serve as our data source.


In [None]:
weave_client = weave.init("rag-workshop")

In [None]:
from download_finance_docs import PDFProcessor
processor = PDFProcessor()
data = processor.load_pdf_documents()

In [None]:
docs_dir = "../data/finance_docs"

In [None]:
import pathlib

docs_dir = pathlib.Path(docs_dir)
docs_files = sorted(docs_dir.rglob("*.pdf"))

print(f"Number of files: {len(docs_files)}\n")
print("First 5 files:\n{files}".format(files="\n".join(map(str, docs_files[:5]))))

In [None]:
total_tokens = sum(map(lambda x: x["metadata"]["raw_tokens"], data))
print(f"Total Tokens in dataset: {total_tokens}")

In [None]:
# build weave dataset
raw_data = weave.Dataset(name="raw_data", rows=data)

# publish the dataset
weave.publish(raw_data)

## A simple RAG Pipeline

First, we build a simple RAG pipeline. We focus on preprocessing and chunking the data, then build a simple retrieval engine without relying on a vector index. The goal is to show the inner workings of a retrieval pipeline and walk through the full workflow, from a user query to a response generated by an LLM.

In [7]:
document_chunks = []
docs = raw_data.rows
for doc in docs:
    chunks = chunk_simple(doc["content"], chunk_size=500)
    doc_id = make_id(doc["content"])
    for chunk in chunks:
        doc_chunk = deepcopy(doc)
        doc_chunk["chunk"] = chunk
        doc_chunk["text"] = convert_contents_to_text(chunk)
        doc_chunk["chunk_id"] = make_id(chunk)
        doc_chunk["doc_id"] = doc_id
        document_chunks.append(doc_chunk)
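`chunk_simple` and `make_id` come from the workshop's `utils` module, and their internals are not shown here. As a rough sketch, assuming fixed-size chunking over whitespace-separated words (the real helper may count tokens instead) and a content hash as the ID, the loop above boils down to the following. `content_id`, `fixed_size_chunks`, and `chunk_documents` are hypothetical stand-ins, not the workshop's functions.

```python
import hashlib


def content_id(text: str) -> str:
    """Hypothetical stand-in for make_id: a stable hash of the text."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def fixed_size_chunks(text: str, chunk_size: int = 500) -> list[str]:
    """Split text into consecutive windows of at most chunk_size words."""
    words = text.split()
    return [" ".join(words[i : i + chunk_size]) for i in range(0, len(words), chunk_size)]


def chunk_documents(documents: list[dict], chunk_size: int = 500) -> list[dict]:
    """Copy each document's fields onto its chunks and add chunk and doc IDs."""
    out = []
    for doc in documents:
        doc_id = content_id(doc["content"])
        for chunk in fixed_size_chunks(doc["content"], chunk_size):
            out.append({**doc, "text": chunk, "chunk_id": content_id(chunk), "doc_id": doc_id})
    return out


sample_docs = [{"content": "one two three four five", "metadata": {"source": "a.pdf"}}]
sample_chunks = chunk_documents(sample_docs, chunk_size=2)
print([c["text"] for c in sample_chunks])  # ['one two', 'three four', 'five']
```

Unlike the notebook cell, this sketch makes a shallow copy (`{**doc}`), so all chunks of a document share the same nested `metadata` dict; the cell's `deepcopy` avoids that.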

In [8]:
tfidf_search_engine = await TfidfSearchEngine().fit(document_chunks)

class TFIDFRetriever(Retriever):
    pass

tfidf_retriever = TFIDFRetriever(search_engine=tfidf_search_engine)
# retrieved_docs = await tfidf_search_engine.search(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=5)
# for doc in retrieved_docs:
#     render_doc(doc)
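`TfidfSearchEngine` hides how the scores are computed. The from-scratch sketch below shows the core idea under simple assumptions (lowercased whitespace tokens, raw term counts, smoothed IDF, L2 normalization, cosine ranking); `ToyTfidfIndex` is hypothetical and not the workshop's implementation.

```python
import math
from collections import Counter


def tfidf_tokens(text: str) -> list[str]:
    return text.lower().split()


class ToyTfidfIndex:
    """TF-IDF vectors (raw counts x smoothed IDF, L2-normalized), ranked by cosine similarity."""

    def __init__(self, texts: list[str]):
        tokenized = [tfidf_tokens(t) for t in texts]
        n_docs = len(tokenized)
        doc_freq = Counter(term for toks in tokenized for term in set(toks))
        # Smoothed IDF, the same form scikit-learn's TfidfVectorizer uses by default.
        self.idf = {t: math.log((1 + n_docs) / (1 + df)) + 1 for t, df in doc_freq.items()}
        self.vectors = [self._vectorize(toks) for toks in tokenized]

    def _vectorize(self, tokens: list[str]) -> dict[str, float]:
        counts = Counter(t for t in tokens if t in self.idf)  # drop terms never seen in the corpus
        vec = {t: c * self.idf[t] for t, c in counts.items()}
        norm = math.sqrt(sum(v * v for v in vec.values())) or 1.0
        return {t: v / norm for t, v in vec.items()}

    def search(self, query: str, top_k: int = 5) -> list[tuple[int, float]]:
        q = self._vectorize(tfidf_tokens(query))
        # Both vectors are L2-normalized, so the dot product is their cosine similarity.
        scores = [(i, sum(w * vec.get(t, 0.0) for t, w in q.items())) for i, vec in enumerate(self.vectors)]
        return sorted(scores, key=lambda s: s[1], reverse=True)[:top_k]


toy_tfidf = ToyTfidfIndex([
    "apple iphone revenue grew",
    "mac sales declined",
    "iphone revenue fell in q2",
])
print(toy_tfidf.search("iphone revenue", top_k=2))  # chunk 0 ranks above chunk 2
```

Chunk 0 wins because it is shorter, so the matched terms carry more of its normalized weight. A purely lexical method like this scores zero for chunks that share no words with the query.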

In [9]:
# docs_data = [{"document": item["text"]} for item in retrieved_docs]
simple_response_generator = SimpleResponseGenerator()
# response = await simple_response_generator.invoke(
#     query="How has Apple's revenue from iPhone sales fluctuated across quarters?", documents=docs_data
# )
# printmd(response["choices"][0]["message"]["content"])

In [10]:
class TFIDFRAGPipeline(SimpleRAGPipeline):
    pass
tfidf_rag_pipeline = TFIDFRAGPipeline(
    retriever=tfidf_retriever,
    generator=simple_response_generator
)
# response = await tfidf_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)
# printmd(response["answer"])
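`SimpleRAGPipeline` and `SimpleResponseGenerator` come from the workshop package, so their exact prompt and return format are not shown here. Conceptually, a simple RAG pipeline retrieves the top chunks, places them in the prompt as context, and asks the LLM to answer from that context. The sketch below stubs out both the retriever and the LLM so it runs offline; `ToyRAGPipeline`, `build_prompt`, and the prompt wording are hypothetical.

```python
import asyncio


def build_prompt(query: str, documents: list[dict]) -> str:
    """Number each retrieved chunk so the model can refer to it."""
    context = "\n\n".join(f"[{i}] {doc['text']}" for i, doc in enumerate(documents, start=1))
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    )


class ToyRAGPipeline:
    """Retrieve top-k chunks, build a grounded prompt, and pass it to a generator."""

    def __init__(self, retrieve, generate, top_k: int = 5):
        self.retrieve = retrieve  # async (query, top_k) -> list of {"text": ...}
        self.generate = generate  # async (prompt) -> str
        self.top_k = top_k

    async def invoke(self, query: str) -> dict:
        documents = await self.retrieve(query, self.top_k)
        answer = await self.generate(build_prompt(query, documents))
        return {"answer": answer, "context": documents}


async def fake_retrieve(query: str, top_k: int) -> list[dict]:
    corpus = [{"text": "iPhone revenue rose in Q1."}, {"text": "iPhone revenue fell in Q3."}]
    return corpus[:top_k]


async def fake_generate(prompt: str) -> str:
    return prompt  # stand-in for an LLM call: echo the prompt for inspection


toy_pipeline = ToyRAGPipeline(fake_retrieve, fake_generate, top_k=2)
toy_result = asyncio.run(toy_pipeline.invoke("How did iPhone revenue change?"))
print(toy_result["answer"])
```

In the notebook you can `await toy_pipeline.invoke(...)` directly instead of calling `asyncio.run`.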

In [11]:
## Evaluating the RAG Pipeline



In [12]:
### Evaluating the Retrieval

In [None]:
eval_dataset = weave.ref("weave:///a-sh0ts/rag-course-finance/object/eval_data:CoQDvdOENbZqkwg7IlhZm33drBCAf9OUNvf8ar6YHzM").get()

print("Number of evaluation samples: ", len(eval_dataset.rows))

In [15]:
retrieval_evaluation = weave.Evaluation(
    name="Retrieval_Evaluation",
    dataset=eval_dataset,
    scorers=[RetrievalScorer(name="retrieval_scorer", description="Retrieval metrics")],
    preprocess_model_input=lambda x: {"query": x["question"], "top_k": 5},
)
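`RetrievalScorer` is defined in the workshop's `retrieval_metrics` module, and the exact metrics it reports are not shown here. Two metrics commonly used for this kind of evaluation are hit rate at k (did any relevant chunk appear in the top k?) and mean reciprocal rank (how high did the first relevant chunk rank?). The sketch below computes both over hypothetical chunk IDs; it illustrates the idea and is not the workshop scorer.

```python
def hit_rate_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """1.0 if any relevant ID appears in the top k results, else 0.0."""
    return float(any(doc_id in relevant for doc_id in retrieved[:k]))


def reciprocal_rank(retrieved: list[str], relevant: set[str]) -> float:
    """1 / rank of the first relevant result (1-indexed), or 0.0 if none is retrieved."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


def mean_reciprocal_rank(runs: list[tuple[list[str], set[str]]]) -> float:
    return sum(reciprocal_rank(r, rel) for r, rel in runs) / len(runs)


toy_runs = [
    (["c3", "c1", "c7"], {"c1"}),  # first relevant at rank 2 -> 0.5
    (["c2", "c9", "c4"], {"c2"}),  # first relevant at rank 1 -> 1.0
    (["c5", "c6", "c8"], {"c0"}),  # no relevant chunk retrieved -> 0.0
]
print(mean_reciprocal_rank(toy_runs))  # 0.5
print([hit_rate_at_k(r, rel, k=2) for r, rel in toy_runs])  # [1.0, 1.0, 0.0]
```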

In [None]:
tfidf_retrieval_scores = await retrieval_evaluation.evaluate(
    model=tfidf_retriever,
    __weave={"display_name": "TFIDF Retrieval"}
)


In [17]:
### Evaluating the Response Generation

In [18]:


response_evaluation = weave.Evaluation(
    name="Response_Evaluation",
    dataset=eval_dataset,
    scorers=[ResponseScorer(name="response_scorer", description="Response metrics")],
    preprocess_model_input=lambda x: {"query": x["question"]},
)
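`ResponseScorer` comes from the workshop's `response_metrics` module, and its metrics are not shown here (response quality is often judged by an LLM). As a much simpler, swapped-in illustration of scoring a generated answer against a reference answer, the sketch below computes token-overlap F1. It assumes a reference answer exists for each question and does not reproduce the workshop scorer.

```python
import re
from collections import Counter


def answer_tokens(text: str) -> list[str]:
    """Lowercase, replace punctuation with spaces, split on whitespace."""
    return re.sub(r"[^a-z0-9\s]", " ", text.lower()).split()


def token_f1(prediction: str, reference: str) -> float:
    """Harmonic mean of token precision and recall (tokens counted with multiplicity)."""
    pred, ref = answer_tokens(prediction), answer_tokens(reference)
    overlap = sum((Counter(pred) & Counter(ref)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(ref)
    return 2 * precision * recall / (precision + recall)


print(round(token_f1("iPhone revenue fell in Q3", "Revenue from iPhone fell in the third quarter"), 3))  # 0.615
```

Overlap metrics reward shared wording rather than correctness, which is one reason LLM-based judges are popular for evaluating RAG responses.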


In [None]:
tfidf_response_scores = await response_evaluation.evaluate(
    model=tfidf_rag_pipeline,
    __weave={"display_name": "TFIDF RAG Pipeline"}
)

In [None]:
bm25_search_engine = await BM25SearchEngine().fit(document_chunks)

class BM25Retriever(Retriever):
    pass

bm25_retriever = BM25Retriever(search_engine=bm25_search_engine)

# retrieved_docs = await bm25_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=5)

# for doc in retrieved_docs:
#     render_doc(doc)
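BM25 refines TF-IDF in two ways: repeated terms have diminishing returns (controlled by `k1`), and scores are normalized by chunk length relative to the average length (controlled by `b`). The from-scratch sketch below uses the typical defaults `k1=1.5` and `b=0.75` and the non-negative IDF variant used by Lucene; `ToyBM25` is hypothetical, and the parameters of `BM25SearchEngine` are not shown here.

```python
import math
from collections import Counter


class ToyBM25:
    """Okapi BM25 over lowercased whitespace tokens."""

    def __init__(self, texts: list[str], k1: float = 1.5, b: float = 0.75):
        self.k1, self.b = k1, b
        docs = [t.lower().split() for t in texts]
        self.doc_lens = [len(d) for d in docs]
        self.avgdl = sum(self.doc_lens) / len(docs)
        self.tfs = [Counter(d) for d in docs]
        n = len(docs)
        df = Counter(term for d in docs for term in set(d))
        # The +1 inside the log keeps IDF non-negative even for very common terms.
        self.idf = {t: math.log(1 + (n - f + 0.5) / (f + 0.5)) for t, f in df.items()}

    def score(self, query: str, i: int) -> float:
        tf, dl = self.tfs[i], self.doc_lens[i]
        length_norm = self.k1 * (1 - self.b + self.b * dl / self.avgdl)
        total = 0.0
        for term in query.lower().split():
            f = tf.get(term, 0)
            if f:
                # Term frequency saturates: f * (k1 + 1) / (f + length_norm) approaches k1 + 1.
                total += self.idf[term] * f * (self.k1 + 1) / (f + length_norm)
        return total

    def search(self, query: str, top_k: int = 5) -> list[tuple[int, float]]:
        scored = [(i, self.score(query, i)) for i in range(len(self.tfs))]
        return sorted(scored, key=lambda s: s[1], reverse=True)[:top_k]


toy_bm25 = ToyBM25(["iphone revenue iphone revenue", "mac revenue", "services growth"])
print(toy_bm25.search("iphone", top_k=1))  # chunk 0 is the only match
```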


In [None]:
bm25_retrieval_scores = await retrieval_evaluation.evaluate(
    model=bm25_retriever,
    __weave={"display_name": "BM25 Retrieval"}
)

In [22]:

class BM25RAGPipeline(SimpleRAGPipeline):
    pass

bm25_rag_pipeline = BM25RAGPipeline(
    retriever=bm25_retriever,
    generator=simple_response_generator
)

# response = await bm25_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)

# printmd(response["answer"])

In [None]:
bm25_response_scores = await response_evaluation.evaluate(
    model=bm25_rag_pipeline,
    __weave={"display_name": "BM25 RAG Pipeline"}
)

In [24]:
dense_search_engine = DenseSearchEngine()
dense_search_engine = await dense_search_engine.fit(document_chunks)

class DenseRetriever(Retriever):
    pass

dense_retriever = DenseRetriever(search_engine=dense_search_engine)
# retrieved_docs = await dense_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=5)
# for doc in retrieved_docs:
#     render_doc(doc)
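`DenseSearchEngine` embeds chunks with an embedding model that is not shown here. Once the chunks and the query are vectors, retrieval reduces to nearest-neighbor search, usually by cosine similarity. The sketch below does exact (brute-force) cosine search over made-up 3-dimensional vectors; the vectors and the captions in the comments are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def toy_dense_search(query_vec: list[float], chunk_vecs: list[list[float]], top_k: int = 5) -> list[tuple[int, float]]:
    """Rank chunks by cosine similarity between their embedding and the query embedding."""
    scored = [(i, cosine_similarity(query_vec, v)) for i, v in enumerate(chunk_vecs)]
    return sorted(scored, key=lambda s: s[1], reverse=True)[:top_k]


# Made-up 3-d "embeddings"; real embedding models produce hundreds or thousands of dimensions.
toy_chunk_vecs = [
    [0.9, 0.1, 0.0],  # "iPhone sales rose"
    [0.1, 0.9, 0.0],  # "Mac shipments fell"
    [0.8, 0.0, 0.2],  # "Smartphone revenue grew"
]
toy_query_vec = [1.0, 0.0, 0.1]  # "How did iPhone revenue change?"
toy_dense_hits = toy_dense_search(toy_query_vec, toy_chunk_vecs, top_k=3)
print([i for i, _ in toy_dense_hits])  # [2, 0, 1]
```

Brute-force search is fine for a few thousand chunks; larger collections usually move to an approximate nearest-neighbor index, which is what the vector store section later in this notebook relies on.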


In [None]:
dense_retrieval_scores = await retrieval_evaluation.evaluate(
    model=dense_retriever,
    __weave={"display_name": "Dense Retrieval"}
)


In [26]:
class DenseRAGPipeline(SimpleRAGPipeline):
    pass

dense_rag_pipeline = DenseRAGPipeline(
    retriever=dense_retriever,
    generator=simple_response_generator
)

# response = await dense_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)

# printmd(response["answer"])

In [None]:
dense_response_scores = await response_evaluation.evaluate(
    model=dense_rag_pipeline,
    __weave={"display_name": "Dense RAG Pipeline"}
)

In [28]:

class DenseRerankedRetriever(RetrieverWithReranker):
    pass

dense_reranked_retriever = DenseRerankedRetriever(search_engine=dense_search_engine,)

# retrieved_docs = await dense_reranked_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=10, top_n=5)

# for doc in retrieved_docs:
#     render_doc(doc)
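The commented call passes `top_k=10, top_n=5` to the reranked retriever, which suggests a two-stage process: fetch `top_k` candidates cheaply, rescore them with a more expensive model (typically a cross-encoder), and keep the best `top_n`. The sketch below shows only that control flow; `word_overlap` and `overlap_density` are hypothetical stand-ins for the real first-stage search and reranker, which are not shown here.

```python
from typing import Callable

Scorer = Callable[[str, str], float]


def retrieve_then_rerank(query: str, candidates: list[str], first_stage: Scorer, reranker: Scorer,
                         top_k: int = 10, top_n: int = 5) -> list[str]:
    """Shortlist top_k candidates with a cheap scorer, then re-sort the shortlist with a costlier one."""
    shortlist = sorted(candidates, key=lambda doc: first_stage(query, doc), reverse=True)[:top_k]
    return sorted(shortlist, key=lambda doc: reranker(query, doc), reverse=True)[:top_n]


def word_overlap(query: str, doc: str) -> float:
    """Cheap first-stage score: number of shared lowercase words."""
    return float(len(set(query.lower().split()) & set(doc.lower().split())))


def overlap_density(query: str, doc: str) -> float:
    """Stand-in reranker: shared words divided by chunk length, favoring focused chunks."""
    return word_overlap(query, doc) / max(len(doc.split()), 1)


toy_candidates = [
    "iphone revenue rose while mac and ipad and services and wearables also grew strongly",
    "iphone revenue fell",
    "services margin",
]
print(retrieve_then_rerank("iphone revenue", toy_candidates, word_overlap, overlap_density, top_k=2, top_n=1))
# ['iphone revenue fell']
```

The reranker only ever sees the shortlist, so a relevant chunk that the first stage ranks below `top_k` cannot be recovered; raising `top_k` trades latency for recall.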


In [None]:
dense_reranked_retrieval_scores = await retrieval_evaluation.evaluate(
    model=dense_reranked_retriever,
    __weave={"display_name": "Dense Reranked Retrieval"}
)

In [None]:
class DenseRerankedRAGPipeline(SimpleRAGPipeline):
    pass

dense_reranked_rag_pipeline = DenseRerankedRAGPipeline(
    retriever=dense_reranked_retriever,
    generator=simple_response_generator
)

# response = await dense_reranked_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)

# printmd(response["answer"])
dense_reranked_response_scores = await response_evaluation.evaluate(
    model=dense_reranked_rag_pipeline,
    __weave={"display_name": "Dense Reranked RAG Pipeline"}
)


In [31]:
hybrid_retriever = HybridRetrieverWithReranker(
    sparse_search_engine=tfidf_search_engine,
    dense_search_engine=dense_search_engine
)

# retrieved_docs = await hybrid_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=10, top_n=5)

# for doc in retrieved_docs:
#     render_doc(doc)
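`HybridRetrieverWithReranker` combines a sparse and a dense search engine; how it merges their results is not shown here (the name suggests the merged candidates are reranked). One widely used, training-free way to merge two ranked lists is reciprocal rank fusion (RRF), sketched below as a swapped-in technique rather than the workshop's method. `k=60` is the commonly used default constant; the chunk IDs are hypothetical.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked ID lists: each list adds 1 / (k + rank) to every ID it contains."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda s: s[1], reverse=True)


toy_sparse_ranking = ["c1", "c2", "c3"]  # e.g. from TF-IDF or BM25
toy_dense_ranking = ["c2", "c4", "c1"]   # e.g. from embeddings
toy_fused = reciprocal_rank_fusion([toy_sparse_ranking, toy_dense_ranking])
print([doc_id for doc_id, _ in toy_fused])  # ['c2', 'c1', 'c4', 'c3']
```

RRF uses only ranks, so it sidesteps the problem that BM25 scores and cosine similarities live on different scales. Chunks found by both retrievers (`c1`, `c2`) rise to the top.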

In [None]:
hybrid_retrieval_scores = await retrieval_evaluation.evaluate(
    model=hybrid_retriever,
    __weave={"display_name": "Hybrid Retrieval"}
)


In [None]:
class HybridRAGPipeline(SimpleRAGPipeline):
    pass

hybrid_rag_pipeline = HybridRAGPipeline(
    retriever=hybrid_retriever,
    generator=simple_response_generator
)

# response = await hybrid_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)

# printmd(response["answer"])
hybrid_response_scores = await response_evaluation.evaluate(
    model=hybrid_rag_pipeline,
    __weave={"display_name": "Hybrid RAG Pipeline"}
)


In [34]:
## Structured Chunking

In [None]:
chunked_docs = chunk_markdown(docs[0]["content"], chunk_size=500)
chunked_docs
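`chunk_markdown` comes from the workshop's `utils` module; judging by its name and this section's title, it splits along the document's Markdown structure rather than at fixed offsets. A minimal sketch of that idea, assuming headings are lines starting with `#`: split before each heading, keep any section that fits in `chunk_size` words intact, and window oversized sections. `toy_markdown_chunks` is hypothetical and may behave differently from the real helper.

```python
import re


def toy_markdown_chunks(markdown: str, chunk_size: int = 500) -> list[str]:
    """Split before Markdown headings, then window any section longer than chunk_size words."""
    # Zero-width split at the start of every line that begins with 1-6 '#' and a space.
    sections = [s.strip() for s in re.split(r"(?m)^(?=#{1,6} )", markdown) if s.strip()]
    chunks = []
    for section in sections:
        words = section.split()
        if len(words) <= chunk_size:
            chunks.append(section)  # keep short sections verbatim, newlines included
        else:
            chunks.extend(" ".join(words[i : i + chunk_size]) for i in range(0, len(words), chunk_size))
    return chunks


toy_md = "# Revenue\niPhone revenue grew.\n\n## By Quarter\nQ1 up, Q2 down.\n# Outlook\nStable."
for toy_chunk in toy_markdown_chunks(toy_md, chunk_size=50):
    print(repr(toy_chunk))
```

Each chunk now starts at a section boundary, so a heading stays with its body text. A common refinement is to repeat the section heading at the start of every window of an oversized section, so continuation chunks keep their context.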

In [36]:
document_chunks = []
for doc in docs:
    chunks = chunk_markdown(doc["content"], chunk_size=500)
    doc_id = make_id(doc["content"])
    for chunk in chunks:
        doc_chunk = deepcopy(doc)
        doc_chunk["chunk"] = chunk
        doc_chunk["text"] = convert_contents_to_text(chunk)
        doc_chunk["chunk_id"] = make_id(chunk)
        doc_chunk["doc_id"] = doc_id
        document_chunks.append(doc_chunk)

In [37]:
bm25_search_engine = await BM25SearchEngine().fit(document_chunks)
dense_search_engine = await DenseSearchEngine().fit(document_chunks)
hybrid_retriever = HybridRetrieverWithReranker(
    sparse_search_engine=bm25_search_engine,
    dense_search_engine=dense_search_engine
)

# retrieved_docs = await hybrid_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=10, top_n=5)

# for doc in retrieved_docs:
#     render_doc(doc)


In [None]:
retrieval_evaluation = weave.Evaluation(
    name="Retrieval_Evaluation",
    dataset=eval_dataset,
    scorers=[RetrievalScorer(name="retrieval_scorer", description="Retrieval metrics")],
    preprocess_model_input=lambda x: {"query": x["question"], "top_k": 10, "top_n": 5},
)
hybrid_retrieval_scores = await retrieval_evaluation.evaluate(
    model=hybrid_retriever,
    __weave={"display_name": "Hybrid Retrieval With Structured Chunking"}
)

In [39]:

class HybridRAGPipeline(SimpleRAGPipeline):
    pass

hybrid_rag_pipeline = HybridRAGPipeline(
    retriever=hybrid_retriever,
    generator=simple_response_generator)

# response = await hybrid_rag_pipeline.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?",)

# printmd(response["answer"])

In [None]:
hybrid_response_scores = await response_evaluation.evaluate(
    model=hybrid_rag_pipeline,
    __weave={"display_name": "Hybrid RAG Pipeline With Structured Chunking"}
)
hybrid_response_scores


In [41]:
## More Data and a Vector Store

In [42]:
# full_dataset = load_dataset(docs_root)
# chunked_dataset = chunk_dataset(full_dataset, chunk_size=500)
# vectorstore_search_engine = VectorStoreSearchEngine()
# vectorstore_search_engine = await vectorstore_search_engine.fit(chunked_dataset)
vectorstore_search_engine = VectorStoreSearchEngine()
vectorstore_search_engine = await vectorstore_search_engine.load()

In [43]:
# results = await vectorstore_search_engine.search(
#     query="How has Apple's revenue from iPhone sales fluctuated across quarters?",
#     top_k=5,
#     filters="file_type in ('notebook', 'markdown')")
# for doc in results:
#     render_doc(doc)
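`VectorStoreSearchEngine` wraps a hosted vector index (the setup asks for a Pinecone key), and `load()` appears to reuse an index built ahead of time, per the commented-out `fit` lines. The commented `search` call also passes a `filters` string. The sketch below shows the two operations a vector store provides, upserting vectors with metadata and querying by similarity with a metadata filter, using a tiny in-memory class. It is not Pinecone's client API; `ToyVectorStore`, its `where` predicate, and the `company` metadata field are hypothetical.

```python
import math
from dataclasses import dataclass, field
from typing import Callable, Optional


def _cosine(a: list[float], b: list[float]) -> float:
    norm = math.hypot(*a) * math.hypot(*b)
    return sum(x * y for x, y in zip(a, b)) / norm if norm else 0.0


@dataclass
class ToyVectorStore:
    """In-memory stand-in for a vector database: upsert vectors with metadata, query with a filter."""

    records: dict = field(default_factory=dict)

    def upsert(self, record_id: str, vector: list[float], metadata: dict) -> None:
        self.records[record_id] = (vector, metadata)  # an existing ID is overwritten

    def query(self, vector: list[float], top_k: int = 5,
              where: Optional[Callable[[dict], bool]] = None) -> list[tuple[str, float]]:
        # Apply the metadata filter first, then rank only the surviving records.
        hits = [
            (rid, _cosine(vector, vec))
            for rid, (vec, meta) in self.records.items()
            if where is None or where(meta)
        ]
        return sorted(hits, key=lambda h: h[1], reverse=True)[:top_k]


toy_store = ToyVectorStore()
toy_store.upsert("a", [1.0, 0.0], {"company": "AAPL"})
toy_store.upsert("b", [0.9, 0.1], {"company": "MSFT"})
toy_store.upsert("c", [0.0, 1.0], {"company": "AAPL"})
print(toy_store.query([1.0, 0.0], top_k=2))  # "a" then "b"
print(toy_store.query([1.0, 0.0], top_k=2, where=lambda m: m["company"] == "AAPL"))  # "a" then "c"
```

Filtering before ranking means every returned hit matches the filter, as long as enough records match; filtering after ranking can return fewer than `top_k` results.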

In [44]:
class VectorStoreRetriever(RetrieverWithReranker):
    pass

vectorstore_retriever = VectorStoreRetriever(search_engine=vectorstore_search_engine)

# results = await vectorstore_retriever.invoke(query="How has Apple's revenue from iPhone sales fluctuated across quarters?", top_k=10, top_n=5, filters="file_type in ('notebook', 'markdown')")
# for doc in results:
#     render_doc(doc)

In [45]:
## Query Enhancement

In [None]:
query_enhancer = QueryEnhancer()
results = await query_enhancer.invoke("How has Apple's revenue from iPhone sales fluctuated across quarters?")
results
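The output format of `QueryEnhancer` is not shown here. A common way to use an enhanced query is to generate several rewrites of the user question, retrieve for each one, and merge the results. The sketch below shows only that fan-out-and-merge step; `toy_search` and its canned results are hypothetical stand-ins for the real retriever.

```python
import asyncio

# Hypothetical canned results standing in for a real retriever.
TOY_INDEX = {
    "iphone revenue by quarter": [{"chunk_id": "c1", "score": 0.9}, {"chunk_id": "c2", "score": 0.7}],
    "apple quarterly iphone sales": [{"chunk_id": "c2", "score": 0.8}, {"chunk_id": "c3", "score": 0.6}],
}


async def toy_search(query: str, top_k: int = 3) -> list[dict]:
    return TOY_INDEX.get(query, [])[:top_k]


async def multi_query_retrieve(queries: list[str], top_k: int = 3) -> list[dict]:
    """Search every query variant concurrently, dedupe by chunk_id, keep each chunk's best score."""
    per_query = await asyncio.gather(*(toy_search(q, top_k) for q in queries))
    best: dict[str, dict] = {}
    for hits in per_query:
        for hit in hits:
            current = best.get(hit["chunk_id"])
            if current is None or hit["score"] > current["score"]:
                best[hit["chunk_id"]] = hit
    return sorted(best.values(), key=lambda h: h["score"], reverse=True)


toy_merged = asyncio.run(multi_query_retrieve(["iphone revenue by quarter", "apple quarterly iphone sales"]))
print([(h["chunk_id"], h["score"]) for h in toy_merged])  # [('c1', 0.9), ('c2', 0.8), ('c3', 0.6)]
```

In the notebook you can `await multi_query_retrieve(...)` directly instead of calling `asyncio.run`. Keeping the maximum score is one simple merge rule; rank-based fusion such as RRF is another option.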

In [None]:
query_enhanced_rag_pipeline = QueryEnhancedRAGPipeline(
    query_enhancer=query_enhancer,
    retriever=vectorstore_retriever,
    response_generator=QueryEnhancedResponseGenerator()
)
response = await query_enhanced_rag_pipeline.invoke("How has Apple's revenue from iPhone sales fluctuated across quarters?")


In [None]:
query_enhanced_response_scores = await response_evaluation.evaluate(
    model=query_enhanced_rag_pipeline,
    __weave={"display_name": "Query Enhanced RAG Pipeline"}
)
query_enhanced_response_scores


In [None]:
## Agentic RAG

In [None]:
agent = Agent()

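query = "How has Apple's revenue from iPhone sales fluctuated across quarters?"  # replace with your own question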
answer = await agent.invoke(query)
printmd(answer["answer"])
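The `Agent` class is defined in the workshop's `agent` module and its internals are not shown here. Agentic RAG generally replaces the fixed retrieve-then-generate sequence with a loop in which the model decides which tool to call next (for example the vector store, or a web search, which is presumably what the Tavily key from the setup is for) and when it has gathered enough evidence to answer. The sketch below shows that control loop with a scripted `choose` function in place of the LLM; `run_toy_agent`, the tool names, and their canned outputs are all hypothetical.

```python
from typing import Callable


def run_toy_agent(question: str, tools: dict, choose: Callable, max_steps: int = 3) -> dict:
    """Ask `choose` for the next action, run the tool, record the observation, repeat."""
    observations: list[str] = []
    for _ in range(max_steps):
        action, argument = choose(question, observations)
        if action == "finish":
            return {"answer": argument, "steps": observations}
        observations.append(f"{action}: {tools[action](argument)}")
    return {"answer": None, "steps": observations}  # gave up after max_steps


toy_tools: dict[str, Callable[[str], str]] = {
    "search_filings": lambda q: "iPhone revenue fell in Q3.",  # stand-in for the vector store
    "web_search": lambda q: "Analysts expect a Q4 rebound.",   # stand-in for a web search tool
}


def scripted_choose(question: str, observations: list[str]) -> tuple[str, str]:
    """Scripted stand-in for the LLM's decision: filings first, then the web, then answer."""
    if len(observations) == 0:
        return "search_filings", question
    if len(observations) == 1:
        return "web_search", question
    return "finish", " ".join(obs.split(": ", 1)[1] for obs in observations)


toy_run = run_toy_agent("How has iPhone revenue changed?", toy_tools, scripted_choose)
print(toy_run["answer"])  # iPhone revenue fell in Q3. Analysts expect a Q4 rebound.
```

The `max_steps` cap matters in practice: an LLM-driven policy can keep calling tools indefinitely, so the loop needs a hard stop.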
