In [1]:
import dspy
import ujson

#lm = dspy.LM('openai/gpt-4o-mini')
lm = dspy.LM('ollama_chat/gemma3', api_base='http://localhost:11434', api_key='', cache=False, temperature=0)
dspy.configure(lm=lm)

In [2]:
with open("registration-v1.jsonl") as f:
    data = [ujson.loads(line) for line in f]

data[0]

{'question': 'What states require IAR CE?',
 'response': 'As of March 2025: Arkansas, California, Colorado, Florida, Hawaii, Kentucky, Maryland, Michigan, Mississippi, Nevada, North Dakota, Oklahoma, Oregon, South Carolina, Tennessee, Vermont, Washington D.C., Wisconsin',
 'gold_doc_ids': [2822, 2823]}

In [3]:
data = [dspy.Example(**d).with_inputs('question') for d in data]
# Let's pick an `example` here from the data.
example = data[2]
example

Example({'question': 'What will happen if I do not complete IAR CE?', 'response': 'CE is mandatory. Failure to complete your CE requirement will lead to your IAR registration becoming unrenewable.', 'gold_doc_ids': []}) (input_keys={'question'})

In [5]:
import random

random.Random(0).shuffle(data)
trainset, devset, testset = data[:8], data[8:10], data[10:1000]

len(trainset), len(devset), len(testset)

(8, 2, 1)

In [15]:
max_characters = 10  # for truncating >99th percentile of documents
topk_docs_to_retrieve = 5  # number of documents to retrieve per search query

with open("iar-v1.jsonl") as f:
    corpus = [ujson.loads(line)['text'] for line in f]
    print(f"Loaded {len(corpus)} documents. Will encode them below.")

corpus[0]


Loaded 2 documents. Will encode them below.


'IAR stands for Investment Advisor Registration'

In [None]:

#embedder = dspy.Embedder('openai/text-embedding-3-small', dimensions=512)
embedder = dspy.Embedder('ollama/nomic-embed-text', dimensions=512)
search = dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=topk_docs_to_retrieve)

UnsupportedParamsError: litellm.UnsupportedParamsError: Setting {'dimensions': 512} is not supported by ollama_chat. To drop it from the call, set `litellm.drop_params = True`.

In [11]:
class RAG(dspy.Module):
    def __init__(self, num_docs=5):
        self.num_docs = num_docs
        self.respond = dspy.ChainOfThought('context, question -> response')

    def forward(self, question):
        context = search(question, k=self.num_docs)   # not defined in this snippet, see link above
        return self.respond(context=context, question=question)


In [12]:

rag = RAG()
rag(question="what are high memory and low memory on linux?")


NameError: name 'search' is not defined

In [None]:



qa = dspy.Predict('question: str -> response: str')
response = qa(question="what are high memory and low memory on linux?")

print(response.response)

cot = dspy.ChainOfThought('question -> response')
cot(question="should curly braces appear on their own line?")








evaluate(RAG())

tp = dspy.MIPROv2(metric=dspy.SemanticF1(), auto="medium", num_threads=24)
optimized_rag = tp.compile(RAG(), trainset=trainset, max_bootstrapped_demos=2, max_labeled_demos=2, requires_permission_to_run=False)

pred = optimized_rag(question="cmd+tab does not work on hidden or minimized windows")
print(pred.response)

evaluate(optimized_rag)

optimized_rag.save("optimized_rag.json")

loaded_rag = RAG()
loaded_rag.load("optimized_rag.json")

loaded_rag(question="cmd+tab does not work on hidden or minimized windows")



def load_conll_dataset() -> dict:
    """
    Loads the CoNLL-2003 dataset into train, validation, and test splits.
    
    Returns:
        dict: Dataset splits with keys 'train', 'validation', and 'test'.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Use a temporary Hugging Face cache directory for compatibility with certain hosted notebook
        # environments that don't support the default Hugging Face cache directory
        os.environ["HF_DATASETS_CACHE"] = temp_dir
        return load_dataset("conll2003", trust_remote_code=True)

def extract_people_entities(data_row: Dict[str, Any]) -> List[str]:
    """
    Extracts entities referring to people from a row of the CoNLL-2003 dataset.
    
    Args:
        data_row (Dict[str, Any]): A row from the dataset containing tokens and NER tags.
    
    Returns:
        List[str]: List of tokens tagged as people.
    """
    return [
        token
        for token, ner_tag in zip(data_row["tokens"], data_row["ner_tags"])
        if ner_tag in (1, 2)  # CoNLL entity codes 1 and 2 refer to people
    ]

def prepare_dataset(data_split, start: int, end: int) -> List[dspy.Example]:
    """
    Prepares a sliced dataset split for use with DSPy.
    
    Args:
        data_split: The dataset split (e.g., train or test).
        start (int): Starting index of the slice.
        end (int): Ending index of the slice.
    
    Returns:
        List[dspy.Example]: List of DSPy Examples with tokens and expected labels.
    """
    return [
        dspy.Example(
            tokens=row["tokens"],
            expected_extracted_people=extract_people_entities(row)
        ).with_inputs("tokens")
        for row in data_split.select(range(start, end))
    ]

# Load the dataset
dataset = load_conll_dataset()

# Prepare the training and test sets
train_set = prepare_dataset(dataset["train"], 0, 50)
test_set = prepare_dataset(dataset["test"], 0, 200)

In [None]:
from typing import List

class PeopleExtraction(dspy.Signature):
    """
    Extract contiguous tokens referring to specific people, if any, from a list of string tokens.
    Output a list of tokens. In other words, do not combine multiple tokens into a single value.
    """
    tokens: list[str] = dspy.InputField(desc="tokenized text")
    extracted_people: list[str] = dspy.OutputField(desc="all tokens referring to specific people extracted from the tokenized text")

people_extractor = dspy.ChainOfThought(PeopleExtraction)

In [None]:
lm = dspy.LM('ollama_chat/mistral', api_base='http://localhost:11434', api_key='', cache=False, temperature=0)
dspy.settings.configure(lm=lm)

In [None]:
def extraction_correctness_metric(example: dspy.Example, prediction: dspy.Prediction, trace=None) -> bool:
    """
    Computes correctness of entity extraction predictions.
    
    Args:
        example (dspy.Example): The dataset example containing expected people entities.
        prediction (dspy.Prediction): The prediction from the DSPy people extraction program.
        trace: Optional trace object for debugging.
    
    Returns:
        bool: True if predictions match expectations, False otherwise.
    """
    return prediction.extracted_people == example.expected_extracted_people

evaluate_correctness = dspy.Evaluate(
    devset=test_set,
    metric=extraction_correctness_metric,
    num_threads=24,
    display_progress=True,
    display_table=True
)

In [None]:
evaluate_correctness(people_extractor, devset=test_set)

In [None]:
mipro_optimizer = dspy.MIPROv2(
    metric=extraction_correctness_metric,
    auto="medium",
)
optimized_people_extractor = mipro_optimizer.compile(
    people_extractor,
    trainset=train_set,
    max_bootstrapped_demos=4,
    requires_permission_to_run=False,
    minibatch=False
)

In [None]:
evaluate_correctness(optimized_people_extractor, devset=test_set)

In [None]:
dspy.inspect_history(n=1)

In [None]:
cost = sum([x['cost'] for x in lm.history if x['cost'] is not None])  # cost in USD, as calculated by LiteLLM for certain providers
cost

In [None]:
optimized_people_extractor.save("optimized_extractor_001.json")

loaded_people_extractor = dspy.ChainOfThought(PeopleExtraction)
loaded_people_extractor.load("optimized_extractor_001.json")

p1 = loaded_people_extractor(tokens=["Italy", "recalled", "Marcello", "Cuttitta"]).extracted_people
p2 = loaded_people_extractor(tokens=["Muralidhar", "Kamath", "went", "for", "a", "walk", "in", "Pittsburgh"]).extracted_people
print(p1, p2)