# Pragmatic DSPy

This is the companion notebook to this [blog post](https://dass.ie/blog/pragmatic-dspy)

**First**, set the environment variables in local.env before executing the first cell below.


In [None]:
import dotenv
dotenv.load_dotenv('local.env')
import dspy
import os
from langfuse import Langfuse
from langfuse.api.resources.commons.types import DatasetStatus
from bs4 import BeautifulSoup
import json
import glob
#DSPy uses litellm to call the LLM, so it's useful to tweak the litellm config
import litellm
from extractor import GroundTruthEvaluator
from extractor import Extractor
from extractor import Extractor_Signature
from extractor import fetch_html
import uuid



## 1. Configure

- DSPy is configured below to use OpenAI's gpt-4o-mini as its default LM.
- Langfuse is used for tracing and logging.
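
Before configuring anything, it can help to confirm that local.env actually supplied the keys the cell below reads. This is a minimal sketch: the variable names are the ones used in this notebook, but the `missing_env_vars` helper is hypothetical and not part of DSPy or Langfuse.

```python
import os

# Names taken from the configuration cell; the helper itself is hypothetical.
REQUIRED_ENV_VARS = (
    "OPENAI_API_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_HOST",
)


def missing_env_vars(names=REQUIRED_ENV_VARS, env=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print(f"Missing in local.env: {', '.join(missing)}")
```

An empty list means all four variables are present; anything printed here is worth fixing before the first LLM call.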

In [None]:
litellm.success_callback = ["langfuse"]
litellm.failure_callback = ["langfuse"]
#langfuse client is used for storing and retrieving datasets
langfuse = Langfuse(secret_key=os.getenv('LANGFUSE_SECRET_KEY'),
    public_key=os.getenv('LANGFUSE_PUBLIC_KEY'),
    host=os.getenv('LANGFUSE_HOST'))

#4o-mini was quick to get up and running
lm = dspy.LM('openai/gpt-4o-mini', api_key=os.getenv('OPENAI_API_KEY'),temperature=0.0)
dspy.configure(lm=lm,suppress_debug_info=False)

## 2. Bootstrap and review dataset

Let's crawl some websites and see if we can get a sense of the quality of the data

**Note**: always comply with the T&Cs of the websites you're scraping

simple_extractor is a dspy.Predict module built on a very simple DSPy Signature that expects html and url as input and returns some event info (see extractor.py).

Let's start with a trivial synthetic example to see if it works.

In [None]:
simple_extractor = dspy.Predict(Extractor_Signature)
prediction = simple_extractor(html="""
<html>
<body>
<h1>Event Title</h1>
<p>Event Description</p>
<p>Event Location</p>
<p>Start: 1pm 12th December</p>
<p>End: 2pm 12th December</p>
</body>
</html>
""",url="https://bobsevents.com/event1")
print(prediction)


It worked!

Now let's try something a little more realistic and pull an actual event page

In [None]:

url = "<event_page_url>" #replace with a real url
html = fetch_html(url)
prediction = simple_extractor(html=html, url=url)
print(prediction)


It feels a little magical that we only had to write a very simple Signature and we got a pretty good result

We can have a look at the raw LLM call if we want to see what was sent to the LLM

In [None]:
lm.history[-1]

So the extractor appears to be working, at least for two examples. Let's get some more data from a variety of sources so we can start to evaluate it.


In [None]:
from crawl import crawl
#Optional: crawls each website (subpages of the url) and saves the html to files in the data folder
urls = ["<url>"]
for url in urls:
    crawl(url)

Next, let's run the extractor over all the data from the last step.
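
The cell below reads each crawl file as JSON and uses two keys from it: `html` and `metadata.url`. As a rough sketch of that contract (the `load_crawl_record` helper is hypothetical, and I'm assuming nothing about the files beyond those two keys), loading one record could look like this:

```python
import json
from pathlib import Path


def load_crawl_record(path):
    """Return (html, url) from a crawl file, or None if the file is empty."""
    text = Path(path).read_text(encoding="utf-8")
    if not text.strip():
        # Empty files are skipped rather than treated as errors.
        return None
    data = json.loads(text)
    return data["html"], data["metadata"]["url"]
```

Returning None for empty files mirrors the "file not readable" branch in the run function.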

In [13]:
def run(data_dir: str, extractor: Extractor):
    run_id = str(uuid.uuid4())
    print(f"Running with run_id: {run_id}")
    json_files = glob.glob(os.path.join(data_dir, "*.json"))
    for json_file in json_files:
        #read-only access is enough, we never write back to the crawl files
        with open(json_file, "r") as f:
            content = f.read()
        if not content.strip():
            print(f"file empty or not readable:{json_file}")
            continue
        data = json.loads(content)
        soup = BeautifulSoup(data["html"], 'html.parser')
        for script in soup.find_all('script'):
            #remove scripts as they can contain a lot of noise
            script.decompose()
        #fall back to the whole document if the page has no <body>
        body = soup.find('body') or soup
        try:
            extractor.extract(html=body.decode_contents(), url=data["metadata"]["url"],run_id=run_id)
        except Exception as e:
            print(f"Error extracting {json_file}: {e}")
    return run_id

simple_extractor = Extractor()
run("data/synthetic",simple_extractor)
#you may want to run for other websites
#run("data/<amazingevents>",simple_extractor)
langfuse.flush()

Now we can easily do a quick check of the results in [langfuse](https://cloud.langfuse.com/) using the run_id tag to filter. 

We could use lm.history again to see the raw calls, but I found that once you are making more than a few LLM calls the langfuse interface is easier to use.

Much like bootstrapping a dataset in order to fine-tune a model ([Lesson 1 - fastai](https://www.kaggle.com/code/jhoward/is-it-a-bird-creating-a-model-from-your-own-data)), we can now create a dataset to optimise the extractor.

Let's bootstrap using the simple_extractor and evaluate from there. We'll store datasets in langfuse so we can easily load them later. Of course, for the purposes of this tutorial we could just store the dataset in memory.
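
To make the shape of a dataset item concrete, here is a small sketch of the trace-to-item mapping. The payloads are hypothetical; their layout (`kwargs` on the input, `_store` on the output) is inferred from how later cells read the items, not taken from the Langfuse docs.

```python
import json


def trace_to_item(trace_input, trace_output, run_id):
    """Build the fields of one dataset item from a trace's JSON strings."""
    return {
        "input": json.loads(trace_input),
        "expected_output": json.loads(trace_output),
        "metadata": {"trace_run_id": run_id},
    }


# Hypothetical trace payloads, shaped the way later cells read them.
example_input = json.dumps(
    {"kwargs": {"html": "<p>An Event</p>", "url": "https://bobsevents.com/event1"}}
)
example_output = json.dumps({"_store": {"title": "An Event", "end_time": None}})
item = trace_to_item(example_input, example_output, run_id="demo-run")
print(item["input"]["kwargs"]["url"])
```

The extractor's own output becomes the expected output, which is why the dataset still needs a manual review before we trust it.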


In [31]:
def add_traces_to_dataset(run_id: str, dataset_name: str):
    langfuse.create_dataset(name=dataset_name,description="pragmatic dspy events")
    traces = langfuse.fetch_traces(tags=[run_id])
    for trace in traces.data:
        langfuse.create_dataset_item(dataset_name=dataset_name,input=json.loads(trace.input),expected_output=json.loads(trace.output),metadata={"trace_run_id":run_id},source_trace_id=trace.id)

#add the runs from the earlier steps that we are happy with to the dataset
add_traces_to_dataset('<run_id>','events')
#add_traces_to_dataset('<run_id>','events')

### 2.1 Review the data
Now we can navigate to the dataset in [langfuse](https://cloud.langfuse.com/) and get a feel for the data. In my case, for the websites I crawled, the extractor performed well. The main issue upon manual inspection of the data is that the extractor often speculates on an end time when none is specified. Another issue is that some of the pages are listings with multiple events.

Now let's create a metric to simulate the manual evaluation of the data we've just done. Open up extractor.py to see the signature for the Evaluator

It's another simple dspy.Predict that expects html, url, title, description, location, start_time and end_time as input, i.e. the inputs plus the outputs of the extractor, and returns a score and reasoning

So we're using an LLM to evaluate the extractor

In [74]:

ground_truth_evaluator = GroundTruthEvaluator()
def run_evaluation(name, model: str = "openai/gpt-4o-mini",dataset_name: str = "events", extractor: Extractor = None, evaluator: GroundTruthEvaluator = None,set: str = None):
    #create the defaults per call rather than once when the function is defined
    if extractor is None:
        extractor = Extractor()
    if evaluator is None:
        evaluator = GroundTruthEvaluator()
    dataset = langfuse.get_dataset(dataset_name)
    for item in dataset.items:
        #skip items outside the requested split; metadata may be missing
        if set is not None and (item.metadata or {}).get("set") != set:
            continue
        if item.status != DatasetStatus.ARCHIVED:
            print(item.id)
            with item.observe(run_name=name, run_metadata={"model": model}) as trace_id:
                try:
                    pred = extractor.extract(html=item.input["kwargs"]["html"], url=item.input["kwargs"]["url"],model=model)
                    #named evaluation to avoid shadowing the eval builtin
                    evaluation = evaluator.evaluate(html=item.input["kwargs"]["html"], url=item.input["kwargs"]["url"], title=pred.title, description=pred.description, location=pred.location, start_time=pred.start_time, end_time=pred.end_time)
                    langfuse.score(name="GroundTruthEvaluator",value=evaluation.score,trace_id=trace_id,comment=evaluation.reasoning)
                    print(evaluation.score," ",evaluation.reasoning)
                except Exception as e:
                    print(f"Error evaluating {item.id}: {e}")


run_evaluation("baseline",dataset_name="events")
langfuse.flush()

One thing to notice is that HTML pages, even just the contents of body, can have a very large token count, so we'll need to reduce the size of the pages we use for training. DSPy adds its own instructions to the prompt when optimising, so we risk creating requests that are too large for the LLM to handle.
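
To get a feel for how big a page is before sending it anywhere, a rough size check can be done with the standard library alone. This is only a sketch: the helper names are hypothetical, and the four-characters-per-token ratio is a common rule of thumb I'm assuming here, not a real tokenizer.

```python
from html.parser import HTMLParser


class ScriptStripper(HTMLParser):
    """Collects text content while skipping <script> and <style> blocks."""

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text that sits outside script/style blocks.
        if self._skip_depth == 0 and data.strip():
            self.chunks.append(data.strip())


def visible_text(html: str) -> str:
    parser = ScriptStripper()
    parser.feed(html)
    parser.close()
    return " ".join(parser.chunks)


def rough_token_count(text: str, chars_per_token: int = 4) -> int:
    # Assumption: roughly 4 characters per token; a heuristic, not a tokenizer.
    return len(text) // chars_per_token
```

Comparing `rough_token_count(html)` with `rough_token_count(visible_text(html))` gives a quick sense of how much of a page is markup and script noise.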

We can use another simple dspy.Predict that expects html and url as input and returns a boolean indicating whether the page is a singular event page, along with the relevant html.

In [47]:

from extractor import singular_event_page_evaluator
ds = langfuse.get_dataset("events")
ds_reduced = langfuse.create_dataset(name="events_reduced",description="pragmatic dspy events reduced")
for item in ds.items:
    pred = singular_event_page_evaluator(html=item.input["kwargs"]["html"], url=item.input["kwargs"]["url"])
    if pred.is_singular:
        #named reduced_input to avoid shadowing the input builtin
        reduced_input = item.input
        reduced_input["kwargs"]["html"] = pred.relevant_html
        langfuse.create_dataset_item(dataset_name=ds_reduced.name,input=reduced_input,expected_output=item.expected_output,metadata=item.metadata)



Now we've got a dataset with just the inputs we expect. Let's try to deal with the speculation of the end time.

Note: your dataset may have different quirks, but the general approach will be the same.

Let's run a contrived example to show how the evaluator prefers a speculated end time.

In [None]:
ground_truth_evaluator = GroundTruthEvaluator()
test_html = """
<div class="event-details">
    <h1>An Event</h1>
    <p class="description">Great event</p>
    <div class="location">Dublin</div>
    <div class="datetime">
        <time datetime="2024-12-12T13:00:00">1pm, 12th December 2024</time>
    </div>
</div>
"""
eval1 = ground_truth_evaluator.evaluate(html=test_html, url="https://bobsevents.com/event1", title="An Event", description="Great event", location="Dublin", start_time="2024-12-12T13:00:00", end_time=None)
eval2 = ground_truth_evaluator.evaluate(html=test_html, url="https://bobsevents.com/event1", title="An Event", description="Great event", location="Dublin", start_time="2024-12-12T13:00:00", end_time="2024-12-12T15:00:00")
print(eval1)
print(eval2)

Let's create an eval dataset and bias the evaluator to prefer leaving the end time as None if it is not specified in the html.

I've reviewed the dataset so I can automate biasing the evaluator. Of course, we could do this in langfuse by manually labelling the dataset with our expected output.

It's worth noting how lacking in rigour my metric is, but it's not hard to imagine how we could add more rigorous metrics as we expand and refine the dataset.
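
The score adjustment used in the next cell is easier to see in isolation. Here is a sketch of just that rule as a pure function; the `biased_score` name is hypothetical, and the thresholds are the ones from the cell (the cell additionally resets the expected end time to None).

```python
def biased_score(score, reasoning, end_time):
    """Boost the score when the evaluator's complaint is about the end time."""
    flagged = "the end time" in reasoning or end_time in ("Not Specified", "N/A")
    if not flagged:
        # Nothing to do with the end time: keep the evaluator's score.
        return score
    # Flagged items get full marks if already decent, otherwise a 0.4 bump.
    return 1 if score > 0.6 else score + 0.4
```

Because the bump only applies at or below 0.6, an adjusted score never exceeds 1.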

In [56]:

ds = langfuse.get_dataset("events_reduced")
eval_dataset = langfuse.create_dataset(name="eval_dataset",description="pragmatic dspy events reduced optimised")
for item in ds.items:
    if item.status != DatasetStatus.ARCHIVED:
        store = item.expected_output['_store']
        #evaluator inputs: the extractor inputs plus its bootstrapped outputs
        eval_input = {"html":item.input["kwargs"]["html"],"url":item.input["kwargs"]["url"],"title":store['title'],"description":store['description'],"location":store['location'],"start_time":store['start_time'],"end_time":store['end_time']}
        pred = ground_truth_evaluator.evaluate(**eval_input)
        score = pred.score
        if "the end time" in pred.reasoning or store['end_time'] in ("Not Specified", "N/A"):
            #set the expected end time to None in events_reduced, reusing the item id
            expected_output = item.expected_output
            expected_output['_store']['end_time'] = None
            langfuse.create_dataset_item(dataset_name=ds.name,input=item.input,expected_output=expected_output,metadata=item.metadata,id=item.id)
            #raise the target score for these items
            if score > 0.6:
                score = 1
            else:
                score = score + 0.4
        langfuse.create_dataset_item(dataset_name=eval_dataset.name,input=eval_input,expected_output={"score":score,"reasoning":pred.reasoning})


Next we'll split the dataset into train and test
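
The cell below slices the examples in dataset order. If the items happen to be grouped by source website, shuffling first may give a more representative test set. A minimal sketch of that variant, with a hypothetical `split_examples` helper and an arbitrary seed:

```python
import random


def split_examples(examples, train_fraction=0.8, seed=42):
    """Shuffle deterministically, then cut into (train, test)."""
    shuffled = list(examples)
    # A dedicated Random instance keeps the global random state untouched.
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]
```

Using a fixed seed means re-running the notebook yields the same split.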


In [None]:

dataset = langfuse.get_dataset("eval_dataset")
evaluator_examples = []
for item in dataset.items:
    evaluator_examples.append(dspy.Example(html=item.input["html"], url=item.input["url"], title=item.input["title"], description=item.input["description"], location=item.input["location"], start_time=item.input["start_time"], end_time=item.input["end_time"],score=item.expected_output["score"],reasoning=item.expected_output["reasoning"]).with_inputs("html","url","title","description","location","start_time","end_time"))
evaluator_train, evaluator_test = evaluator_examples[:int(len(evaluator_examples)*0.8)], evaluator_examples[int(len(evaluator_examples)*0.8):]
len(evaluator_train), len(evaluator_test), len(evaluator_examples)

We need a metric to optimise the evaluator, so let's write one that simply compares the score in the example to the score in the prediction.
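
As a quick worked example of how such a metric behaves, here it is run on stand-in objects. This is a sketch only: `SimpleNamespace` stands in for the DSPy example and prediction, and I'm assuming scores lie between 0 and 1.

```python
from types import SimpleNamespace


def score_closeness(example, prediction, trace=None):
    """1.0 when the scores match, falling linearly as they diverge."""
    return 1 - abs(example.score - prediction.score)


labelled = SimpleNamespace(score=1.0)
print(score_closeness(labelled, SimpleNamespace(score=1.0)))  # 1.0
print(score_closeness(labelled, SimpleNamespace(score=0.5)))  # 0.5
```

The reasoning text is ignored entirely, so the optimiser is only rewarded for matching the numeric score.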

We'll use that metric to optimise a new version of the evaluator and save it into gteo.json.

**Note**: Running compile will generate multiple LLM calls so DSPy will prompt you to confirm you're okay with the cost before running.

In [None]:
def same_score(example, prediction, trace=None):
    return 1- abs(example.score - prediction.score) 

from extractor import GroundTruthEvaluator_Signature
ground_truth_evaluator = dspy.Predict(GroundTruthEvaluator_Signature)
tp = dspy.MIPROv2(metric=same_score, auto="light", num_threads=6)
ground_truth_evaluator = tp.compile(ground_truth_evaluator, trainset=evaluator_train, max_bootstrapped_demos=2, max_labeled_demos=2)
ground_truth_evaluator.save("gteo.json")

Running the contrived example again, we can see that the evaluator now prefers not to speculate an end time.


In [None]:
ground_truth_evaluator = GroundTruthEvaluator(config_file="gteo.json")
eval1 = ground_truth_evaluator.evaluate(html=test_html, url="https://bobsevents.com/event1", title="An Event", description="Great event", location="Dublin", start_time="2024-12-12T13:00:00", end_time=None)
eval2 = ground_truth_evaluator.evaluate(html=test_html, url="https://bobsevents.com/event1", title="An Event", description="Great event", location="Dublin", start_time="2024-12-12T13:00:00", end_time="2024-12-12T15:00:00")
print(eval1)
print(eval2)

## 3. Optimising

Okay, so we've already optimised the evaluator, but that wasn't our main goal. Let's return to optimising the extractor.

First we need to split the dataset into train and test


In [None]:
import random
random.seed(42)
dataset = langfuse.get_dataset("events_reduced")
train = []
test = []
for item in dataset.items:
    if item.status != DatasetStatus.ARCHIVED:
        metadata = item.metadata
        if metadata is None:
            metadata = {}
        store = item.expected_output["_store"]
        example = dspy.Example(html=item.input["kwargs"]["html"], url=item.input["kwargs"]["url"], title=store["title"], description=store["description"], location=store["location"], start_time=store["start_time"], end_time=store["end_time"]).with_inputs("html","url")
        #assign roughly 80% of the items to train and the rest to test
        if random.random() < 0.8:
            train.append(example)
            metadata["set"] = "train"
        else:
            test.append(example)
            metadata["set"] = "test"
        #write the item back under the same id so the split is recorded in its metadata
        langfuse.create_dataset_item(dataset_name="events_reduced",input=item.input,expected_output=item.expected_output,metadata=metadata,source_trace_id=item.source_trace_id,source_observation_id=item.source_observation_id,status=item.status,id=item.id)
len(train), len(test), len(dataset.items)


We'll use the evaluator as the metric, so in this case we ignore the expected output from the example. The metric just checks whether the predicted information is grounded in the html.


In [None]:
def ogt_evaluate(example, prediction, trace=None):
    ground_truth_evaluator = GroundTruthEvaluator(config_file="gteo.json")
    return ground_truth_evaluator.evaluate(html=example.html, url=example.url, title=prediction.title, description=prediction.description, location=prediction.location, start_time=prediction.start_time, end_time=prediction.end_time).score

simple_extractor = dspy.Predict(Extractor_Signature)
def run_optimisation():
    tp = dspy.MIPROv2(metric=ogt_evaluate, auto="medium", num_threads=3,verbose=True)
    return tp.compile(simple_extractor, trainset=train, max_bootstrapped_demos=2, max_labeled_demos=5, requires_permission_to_run=False)

optimised_extractor = run_optimisation()
optimised_extractor.save("optimised_extractor.json")


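Next, let's run the evaluation again with the optimised extractor.

In [None]:
#load the optimised extractor saved above and use a new run name so it doesn't mix with the baseline run
run_evaluation("optimised",dataset_name="events_reduced",extractor=Extractor(config_file="optimised_extractor.json"),evaluator=GroundTruthEvaluator(config_file="gteo.json"),set="test")
langfuse.flush()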

For my test set the optimised extractor performed a little better than the baseline.
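
To put a number on "a little better", one option is to compare the mean evaluator score of two runs. A minimal sketch, assuming you have copied each run's scores into plain lists (the `mean_score_delta` helper is hypothetical):

```python
from statistics import mean


def mean_score_delta(baseline_scores, candidate_scores):
    """Return (baseline mean, candidate mean, candidate minus baseline)."""
    base = mean(baseline_scores)
    cand = mean(candidate_scores)
    return base, cand, cand - base
```

With a small test set the difference can easily be noise, so treat it as a rough signal rather than proof.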

We can also try optimising using a different LLM for both the prompt and task models. Let's try Claude 3.5 Sonnet; it will be slower but will hopefully propose better instructions.

In [None]:
simple_extractor = dspy.Predict(Extractor_Signature)
def run_optimisation():
    prompt_lm = dspy.LM(model="bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",temperature=0.0,aws_region_name="eu-central-1",cooldown_time=30)
    tp = dspy.MIPROv2(metric=ogt_evaluate, auto="medium", num_threads=3,verbose=True, prompt_model=prompt_lm, task_model=prompt_lm,max_errors=20)
    return tp.compile(simple_extractor, trainset=train, max_bootstrapped_demos=2, max_labeled_demos=5, requires_permission_to_run=False)

optimised_extractor = run_optimisation()
optimised_extractor.save("optimised_extractor_claude.json")


In [None]:
run_evaluation("baseline_claude",dataset_name="events_reduced",extractor=Extractor(config_file="optimised_extractor_claude.json"),evaluator=GroundTruthEvaluator(config_file="gteo.json"),set="test")
langfuse.flush()


## Conclusion
Again, the Claude-optimised extractor performs a little better than both the baseline and the first optimised extractor.