In [None]:
import os
import requests

urls = [
    'https://huggingface.co/dspy/cache/resolve/main/ragqa_arena_tech_500.json',
    'https://huggingface.co/datasets/colbertv2/lotte_passages/resolve/main/technology/test_collection.jsonl',
    'https://huggingface.co/dspy/cache/resolve/main/index.pt'
]

for url in urls:
    filename = os.path.basename(url)
    remote_size = int(requests.head(url, allow_redirects=True).headers.get('Content-Length', 0))
    local_size = os.path.getsize(filename) if os.path.exists(filename) else 0

    if local_size != remote_size:
        print(f"Downloading '{filename}'...")
        with requests.get(url, stream=True) as r, open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192): f.write(chunk)

In [None]:
import ujson
import dspy
from dspy.evaluate import SemanticF1

lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)

with open('data/ragqa_arena_tech_500.json') as f:
    data = [dspy.Example(**d).with_inputs('question') for d in ujson.load(f)]
    trainset, valset, devset, testset = data[:50], data[50:150], data[150:300], data[300:500]

metric = SemanticF1()
evaluate = dspy.Evaluate(devset=devset, metric=metric, num_threads=24, display_progress=True, display_table=3)

## Setting up retriever

In [16]:
import torch
import functools
from litellm import embedding as Embed

In [17]:
with open("data/test_collection.jsonl") as f:
    corpus = [ujson.loads(line) for line in f]

In [18]:
index = torch.load('data/index.pt', weights_only=True)
max_characters = 4000 # >98th percentile of document lengths

In [19]:
@functools.lru_cache(maxsize=None)
def search(query, k=5):
    query_embedding = torch.tensor(Embed(input=query, model="text-embedding-3-small").data[0]['embedding'])
    topk_scores, topk_indices = torch.matmul(index, query_embedding).topk(k)
    topK = [dict(score=score.item(), **corpus[idx]) for idx, score in zip(topk_indices, topk_scores)]
    return [doc['text'][:max_characters] for doc in topK]

## Building RAG module

In [20]:
class RAG(dspy.Module):
    def __init__(self, num_docs=5):
        self.num_docs = num_docs
        self.respond = dspy.ChainOfThought('context, question -> response')

    def forward(self, question):
        context = search(question, k=self.num_docs)
        return self.respond(context=context, question=question)

In [None]:
rag = RAG()
rag(question="what are high memory and low memory on linux?")

In [None]:
dspy.inspect_history()

In [None]:
evaluate(RAG())

## Using a DSPy Optimizer to improve RAG prompt

In [None]:
tp = dspy.MIPROv2(metric=metric, auto="medium", num_threads=24)  # use fewer threads if your rate limit is small

optimized_rag = tp.compile(RAG(), trainset=trainset, valset=valset,
                           max_bootstrapped_demos=2, max_labeled_demos=2,
                           requires_permission_to_run=False)