In [None]:
# !pip install dspy-ai cloudpickle matplotlib

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables via python-dotenv; PROJECT_ID is read from
# os.environ further down in this notebook.
load_dotenv()

In [None]:
# Import relevant packages
import dspy
from dspy.signatures.signature import signature_to_template

# Setting up LLM models
# PROJECT_ID comes from the environment; it is None when the variable is unset.
PROJECT_ID = os.environ.get("PROJECT_ID")
MODEL_ID = "gemini-1.5-flash-001"

flash = dspy.GoogleVertexAI(
    model=MODEL_ID,
    max_tokens=1000,
    temperature=0.05,
    project=PROJECT_ID,
)

# Register `flash` as the default language model (lm) used by every DSPy module.
# Note: `rm` in dspy.settings is the *retrieval* model (used by dspy.Retrieve),
# not a prompt model, so a language model must not be passed there. This
# notebook performs no retrieval, hence no rm is configured.
# Separate prompt / task models are selected on the optimiser instead
# (the `prompt_model` / `task_model` arguments of MIPROv2).
dspy.settings.configure(lm=flash)

# Fixed themes used for signatures
# NOTE: this string is interpolated verbatim (newlines, indentation and the
# trailing comma included) into the description of the GenerateTheme output
# field, so any edit here changes the prompt text.
themes = """
    Technical Learning Resources,
    Clarity of Requirements,
    Time Commitment / Workload,
    Showcase / Presentation Format,
    Communication and Information Sharing,
    Team Formation and Dynamics,
    Relevance of Training Content,
    Application to Business / Use Cases,
    Accessibility and Inclusion,
    Incentives and Recognition,
    Post-Hackathon Follow-up,
    Support from Leadership / Mentors,
"""

1. Define Task

In [8]:
# Setting up signature for DSPy
# NOTE(review): DSPy treats a Signature's docstring as the task instructions
# (print_model_logs below reads them back via `.instructions`), so the
# docstring and the field descriptions are prompt text, not just documentation.
class GenerateTheme(dspy.Signature):
    """Classify user feedback from hackathon to one single theme"""
    # Input field: the free-text feedback comment to classify.
    comments = dspy.InputField(desc="user feedback from hackathon")
    # Output field: the f-string reads the module-level `themes` string, since
    # the class attribute of the same name is not bound until this line finishes.
    themes = dspy.OutputField(desc=f"only pick one of the following choices: {themes}")

## COT
class COT(dspy.Module):
    """Chain-of-thought program built on the GenerateTheme signature."""

    def __init__(self):
        super().__init__()
        # The attribute name is left untouched so previously saved program
        # state keeps matching this predictor.
        self.generate_answer = dspy.ChainOfThought(GenerateTheme)

    def forward(self, comments):
        """Run the predictor on `comments` and expose its `themes` output as `answer`."""
        result = self.generate_answer(comments=comments)
        predicted_theme = result.themes
        return dspy.Prediction(answer=predicted_theme)


cot = COT()

In [None]:
# Test a question
# Run the uncompiled chain-of-thought program on one sample comment.
pred = cot.forward(
    "The topic and content was very interesting and the opportunity to connect with technical and commercial mentors was great! In the future, it would be great to see either more structure to the hackathon, or more communication about the structure of the hackathon."
)
# `pred` is the last expression of the cell (only comments follow), so the
# notebook displays it.
pred
# Check prompt history
# flash.inspect_history(n=1)

2. Import training and testing data

In [None]:
# Create training and testing dataset
import pandas as pd
from dspy.datasets import DataLoader

# Convert csv to dataframe
data = pd.read_csv("./data.csv")
dataset = data[["comments", "answer"]] # only want the relevant columns
# Keep only rows where both the comment and its label are present
dataset = dataset[
    dataset.comments.notna() & dataset.answer.notna()
]
# dataset = dataset.head(5) # limit dataset for testing

# Fixed seed so the train/test split (and every score derived from it) is
# reproducible across kernel restarts.
SPLIT_SEED = 42

# Load data into the desired format for DSPy
dl = DataLoader()
# IMPORTANT: input_keys marks which fields are the 'input' for the model
dspy_dataset = dl.from_pandas(dataset, fields=("comments", "answer"), input_keys=['comments'])
# `dspy_dataset` is a list of dspy.Example; 80% goes to training, the rest to testing
splits = dl.train_test_split(dspy_dataset, train_size=0.8, random_state=SPLIT_SEED)
train_dataset = splits['train']
test_dataset = splits['test']

dataset.describe()

3. Evaluate baseline

In [None]:
# Evaluate the accuracy - calculates the HIT ratio (i.e. exact match of the predicted answer)
from dspy.evaluate.evaluate import Evaluate

# Set up the evaluator over the held-out test split. The variable keeps its
# original name (`evaluate_rag`) for compatibility, although no retrieval is
# involved in this notebook.
evaluate_rag = Evaluate(devset=test_dataset, num_threads=1, display_progress=True, display_table=5)

# Score the uncompiled `cot` program with the `answer_exact_match` metric.
metric = dspy.evaluate.answer_exact_match
evaluation_score = evaluate_rag(cot, metric=metric)
print(f"Exact-match accuracy for baseline CoT: {evaluation_score}%")

4. Compiling with Optimiser MIPRO (automates prompts and examples)

In [None]:
from dspy.teleprompt import MIPROv2

# Constants for optimisers
num_threads = 2
# Keyword arguments handed to the optimiser as `eval_kwargs` in compile() below
kwargs = dict(num_threads=num_threads, display_progress=True, display_table=5)
# Exact-match metric that the optimiser scores candidate programs with
metric = dspy.evaluate.answer_exact_match

# The same `flash` model serves as both task model and prompt model here
optimiser = MIPROv2(
                task_model=flash,
                prompt_model=flash,
                metric=metric,
                log_dir="logs/"
            )

# Optimise the baseline `cot` program on the training split
compiled_model = optimiser.compile(
                    student=cot, 
                    trainset=train_dataset, 
                    max_bootstrapped_demos=4, 
                    max_labeled_demos=4, 
                    eval_kwargs=kwargs,
                )

In [None]:
# Test the new compiled model
pred = compiled_model.forward(
    "The topic and content was very interesting and the opportunity to connect with technical and commercial mentors was great! In the future, it would be great to see either more structure to the hackathon, or more communication about the structure of the hackathon."
)
# A bare `pred` is displayed only when it is the last expression of a cell;
# because inspect_history follows, print it explicitly so it is not dropped.
print(pred)
# Check prompt history (most recent call only)
flash.inspect_history(n=1)

5. Plot model logs

In [None]:
import matplotlib.pyplot as plt

def plot_model_logs(trial_logs):
    """Scatter-plot every trial's score, coloured by whether the trial was pruned.

    Args:
        trial_logs: mapping of trial number -> dict holding at least the keys
            'score' and 'pruned'.
    """
    # One series per pruning status: (legend label, colour)
    series_styles = {
        True: ('Pruned Batch', 'grey'),
        False: ('Successful Batch', 'green'),
    }

    # Plot setup
    plt.figure(figsize=(5, 3))

    # Draw each status as a single scatter call. This yields exactly one legend
    # entry per status without re-querying the legend for every point.
    for pruned_flag, (label, colour) in series_styles.items():
        points = [
            (trial_number, log['score'])
            for trial_number, log in trial_logs.items()
            if bool(log['pruned']) is pruned_flag
        ]
        if not points:
            continue  # nothing to draw, and no empty legend entry
        xs, ys = zip(*points)
        plt.scatter(xs, ys, color=colour, label=label)

    plt.xlabel('Batch Number')
    plt.ylabel('Score')
    plt.title('Batch Scores')
    plt.grid(True)
    # Only build a legend when at least one series was plotted
    if plt.gca().get_legend_handles_labels()[1]:
        plt.legend()
    plt.show()

# Plot the trial scores recorded on the compiled model
plot_model_logs(compiled_model.trial_logs)

In [None]:
def print_model_logs(model, compiled_model):
    """Print the baseline instructions, then the best instructions found so far.

    The best program is re-printed at every trial whose number is a multiple
    of 5. Only trials that are not pruned, are marked `full_eval`, and beat the
    best score seen so far can become the new best program.

    Args:
        model: baseline program exposing `predictors()`.
        compiled_model: program exposing `trial_logs`, a mapping of trial
            number -> dict with keys 'score', 'pruned', 'full_eval' and
            (optionally) 'program'.
    """
    # Init constant
    best_score = 0
    # Stays None until a trial qualifies. Previously the name was unbound at
    # that point and the first report raised UnboundLocalError.
    best_program_so_far = None

    def get_signature(predictor):
        """Return the extended signature if present, else the plain one (None if neither)."""
        if hasattr(predictor, 'extended_signature'):
            return predictor.extended_signature
        return getattr(predictor, 'signature', None)

    def print_instructions(program):
        """Print one instruction line per predictor of `program`."""
        for i, predictor in enumerate(program.predictors()):
            # Tolerate predictors without any signature instead of crashing
            instructions = getattr(get_signature(predictor), 'instructions', None)
            print(f"Prompt {i+1} Instruction: {instructions}")

    print(f"Baseline program | Score: {best_score}:")
    print_instructions(model)
    print()

    print("----------------")

    for trial_num, log in compiled_model.trial_logs.items():
        program_score = log["score"]
        program_pruned = log["pruned"]
        if program_score > best_score and not program_pruned and log["full_eval"]:
            best_score = program_score
            # Logs written by save_trial_logs carry no "program" entry
            best_program_so_far = log.get("program")
        if trial_num % 5 == 0:
            print(f"Best program after {trial_num} batches | Score: {best_score}:")
            if best_program_so_far is None:
                print("(no best program available yet)")
            else:
                print_instructions(best_program_so_far)
            print()

# Compare the baseline program's instructions with those recorded in the compiled model's trial logs
print_model_logs(cot, compiled_model)

6. Saving and loading compiled models

In [13]:
import cloudpickle as pickle

# When True, the block further down restores a saved model ('compiled_model.dspy')
# and its pickled trial logs from disk.
LOAD_PRECOMPILED_MODEL = True

# saving trial logs
def save_trial_logs(model, outputfile="trial_logs"):
    """Pickle `model.trial_logs` to `<outputfile>.pickle`, omitting each trial's "program" entry."""
    stripped_logs = {}
    for trial_index, trial_entry in model.trial_logs.items():
        kept_fields = {}
        for field_name, field_value in trial_entry.items():
            if field_name == "program":
                continue  # the program object itself is deliberately not stored
            kept_fields[field_name] = field_value
        stripped_logs[trial_index] = kept_fields

    target_path = f"{outputfile}.pickle"
    with open(target_path, "wb") as handle:
        pickle.dump(stripped_logs, handle)

# loading trial logs
def load_trial_logs(inputfile="trial_logs"):
    """Return the object pickled in `<inputfile>.pickle`.

    Unpickling can execute arbitrary code, so only point this at files you trust.
    """
    source_path = f"{inputfile}.pickle"
    with open(source_path, "rb") as handle:
        return pickle.load(handle)

if LOAD_PRECOMPILED_MODEL:
    # Start from a copy of the baseline program, then overwrite its state with
    # the saved one.
    loaded_model = cot.deepcopy()
    loaded_model.load('compiled_model.dspy')
    # NOTE: the trial logs come from a pickle file; only load files you trust.
    trial_logs = load_trial_logs()
    loaded_model.trial_logs = trial_logs
else:
    # Previously nothing was ever written to disk and `loaded_model` stayed
    # undefined when the flag was False, so the scoring step failed.
    # Persist the freshly compiled model and use it directly.
    compiled_model.save('compiled_model.dspy')
    save_trial_logs(compiled_model)
    loaded_model = compiled_model


7. Scoring with compiled models

In [37]:
data = pd.read_csv("./data.csv") # change this

# Scoring locally
# Rows whose comment is not a string (e.g. missing values) keep a None prediction.
data["prediction"] = None
for index, comment in data["comments"].items():
    if isinstance(comment, str):
        prediction = loaded_model.forward(comment)
        # Drop markdown bold markers and any whitespace around the answer
        data.at[index, "prediction"] = prediction.answer.strip().strip("*").strip()

# Saving new dataframe
# index=False: this writes to the same file that is read above, so writing the
# index would add another "Unnamed: 0" column on every run.
data.to_csv('data.csv', index=False)