In [None]:
import os
import re

import dspy
import pandas as pd
from dotenv import load_dotenv
from dspy.evaluate import Evaluate
from dspy.teleprompt import (
    BootstrapFewShot,
    BootstrapFewShotWithRandomSearch,
    KNNFewShot,
    MIPROv2,
)

from dataloader import build_eval_dataset, check_if_data_folder_exits

In [None]:
# Load environment variables (python-dotenv) before reading any settings below.
load_dotenv()

# Azure OpenAI settings for the gpt-4o deployment (each is None when unset).
AZURE_OPENAI_KEY = os.environ.get("AZURE_OPENAI_KEY")
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT = os.environ.get("AZURE_OPENAI_DEPLOYMENT")
AZURE_OPENAI_VERSION = os.environ.get("AZURE_OPENAI_VERSION")

# Azure OpenAI settings for the gpt-35-turbo deployment (each is None when unset).
AZURE_OPENAI_KEY_35_TURBO = os.environ.get("AZURE_OPENAI_KEY_35_TURBO")
AZURE_OPENAI_VERSION_35_TURBO = os.environ.get("AZURE_OPENAI_VERSION_35_TURBO")
AZURE_OPENAI_DEPLOYMENT_35_TURBO = os.environ.get("AZURE_OPENAI_DEPLOYMENT_35_TURBO")
AZURE_OPENAI_ENDPOINT_35_TURBO = os.environ.get("AZURE_OPENAI_ENDPOINT_35_TURBO")

# Base URL handed to the Ollama client when an "ollama-*" model is selected.
OLLAMA_URL = os.environ.get("OLLAMA_URL")

# Dataset location, passed to the dataloader helpers.
# NOTE(review): check_if_data_folder_exits presumably validates (or prepares) this
# folder -- confirm against dataloader.py.
DATA_FOLDER = "data/IR-Plag-Dataset"
check_if_data_folder_exits(DATA_FOLDER)

# Eval dataset for solution

In [None]:
# Build the evaluation dataset from DATA_FOLDER and preview 15 randomly drawn rows.
eval_df = build_eval_dataset(DATA_FOLDER)
eval_df.sample(n=15)

# Dataset for training

In [None]:
# Read the tab-separated training file and shuffle its rows with a fixed seed so
# the row order is the same on every run.
df = (
    pd.read_csv("data/train.tsv", sep="\t")
    .sample(frac=1, random_state=1337)
    .reset_index(drop=True)
)

def create_example(row: pd.Series) -> dspy.Example:
    """Turn one training row into a DSPy example.

    Reads the ``sample_1``, ``sample_2``, ``plagiarized`` and ``reason`` entries
    of ``row``. The two code samples are marked as inputs; ``plagiarized``
    ("Yes"/"No") and ``explanation`` remain as labels.
    """
    # NOTE(review): the label relies on truthiness of row["plagiarized"] --
    # confirm the column holds booleans / 0-1 values rather than strings.
    label = "Yes" if row["plagiarized"] else "No"
    example = dspy.Example(
        code_sample_1=row["sample_1"],
        code_sample_2=row["sample_2"],
        plagiarized=label,
        explanation=row["reason"],
    )
    return example.with_inputs("code_sample_1", "code_sample_2")


# One DSPy example per row of the shuffled training frame.
train_examples = [create_example(row) for _, row in df.iterrows()]

# DSPy

Our "8 steps of using DSPy" can be found in [README.md](README.md).

In [None]:
# LLM setup: pick the model by name; the alternatives are kept for quick switching.
llm_name = "azure-gpt-4o"
# llm_name = "ollama-codellama:13b"
# llm_name = "azure-gpt-35-turbo"

# Template for the path of a saved optimized program; filled in by the run_* helpers.
program_save_path = "programs/{llm_name}_{optimizer}_{score}"

# Constructor arguments for each supported Azure OpenAI deployment.
_AZURE_SETTINGS = {
    "azure-gpt-4o": dict(
        api_base=AZURE_OPENAI_ENDPOINT,
        api_version=AZURE_OPENAI_VERSION,
        deployment_id=AZURE_OPENAI_DEPLOYMENT,
        api_key=AZURE_OPENAI_KEY,
    ),
    "azure-gpt-35-turbo": dict(
        api_base=AZURE_OPENAI_ENDPOINT_35_TURBO,
        api_version=AZURE_OPENAI_VERSION_35_TURBO,
        deployment_id=AZURE_OPENAI_DEPLOYMENT_35_TURBO,
        api_key=AZURE_OPENAI_KEY_35_TURBO,
    ),
}

if llm_name in _AZURE_SETTINGS:
    lm = dspy.AzureOpenAI(**_AZURE_SETTINGS[llm_name])
elif "ollama" in llm_name:
    # Everything after the first "-" is the Ollama model name,
    # e.g. "ollama-codellama:13b" -> "codellama:13b".
    model_name = llm_name.partition("-")[2]
    lm = dspy.OllamaLocal(base_url=OLLAMA_URL, model=model_name)
else:
    raise ValueError(f"Unknown LLM name: {llm_name}")

# Make the selected model the default LM for every DSPy module in this notebook.
dspy.settings.configure(lm=lm)

# Collects one result record per optimizer run.
metadata = []

In [None]:
# NOTE: DSPy uses a Signature's docstring as the task instructions sent to the LM,
# so the docstring below is prompt text rather than developer documentation;
# editing it changes the prompt. It previously referred to an "explenation" field,
# which did not match the `explanation` output field declared below.
class Signature(dspy.Signature):
    """Detect if two code samples are plagiarized. In plagiarized field answer only : Yes if the code samples are plagiarized, No otherwise. In explanation field add the reason why the code samples are/ are not plagiarized."""

    # Inputs: the two code samples under comparison.
    code_sample_1 = dspy.InputField(desc="The first code sample to compare")
    code_sample_2 = dspy.InputField(desc="The second code sample to compare")
    # Outputs: the explanation is declared before the verdict, so the model is
    # asked for its reasoning first and the Yes/No answer afterwards.
    explanation = dspy.OutputField(
        desc="Explanation or reason why the code samples are/ are not plagiarized"
    )
    plagiarized = dspy.OutputField(
        desc="Yes/No indicating if code samples are plagiarized"
    )


class CoT(dspy.Module):
    """Plagiarism detector that runs ``Signature`` through ``dspy.ChainOfThought``."""

    def __init__(self) -> None:
        super().__init__()
        # Keep the attribute name ``prog`` stable: the optimizers and save()
        # operate on this sub-module.
        self.prog = dspy.ChainOfThought(Signature)

    def forward(self, code_sample_1: str, code_sample_2: str) -> dspy.Prediction:
        """Run the predictor on a pair of code samples.

        Returns the predictor's result, which carries the ``explanation`` and
        ``plagiarized`` output fields declared on ``Signature``.
        """
        return self.prog(code_sample_1=code_sample_1, code_sample_2=code_sample_2)

In [None]:
def validate_answer(
    example: "dspy.Example", pred: "dspy.Prediction", trace: object = None
) -> bool:
    """Metric: does the predicted Yes/No verdict match the labelled one?

    Args:
        example: Labelled example; its ``plagiarized`` attribute holds the
            expected answer.
        pred: Model prediction; its ``plagiarized`` attribute holds the raw
            model output for the verdict field.
        trace: Accepted for compatibility with the DSPy metric call signature;
            not used.

    Returns:
        True when the first standalone "yes"/"no" found on the first line of the
        prediction (case-insensitive) equals the label. If that line contains
        neither word, the whole stripped, lower-cased prediction is compared
        instead. Missing or malformed values count as a wrong answer.
    """
    try:
        predicted = pred.plagiarized
        if predicted is None:
            return False

        answer = predicted.strip().lower()
        # Only the first line is searched, so trailing text on later lines
        # cannot flip the verdict.
        match = re.search(r"\b(yes|no)\b", answer.split("\n")[0])
        if match:
            answer = match.group(1)

        expected = example.plagiarized
        if expected is None:
            return False

        return answer == expected.strip().lower()
    except Exception:
        # Deliberate best-effort: a prediction that cannot be parsed (missing
        # attribute, non-string value, ...) is scored as incorrect rather than
        # aborting a long evaluation or optimization run.
        return False


# Shared evaluator used by every run below. It scores a program with
# validate_answer over the training examples, on 4 threads, showing a progress
# bar and no result table.
evaluate = Evaluate(
    metric=validate_answer,
    devset=train_examples,
    display_table=0,
    display_progress=True,
    num_threads=4,
)

In [None]:
# Baseline: score the un-optimized CoT program (zero-shot) on the train data
# and record it alongside the optimizer results.
score = evaluate(CoT())
metadata.append(dict(optimizer="zero-shot", score=score))

In [None]:
def run_bootstrapfewshot(
    program: dspy.Module, llm_name: str, train_examples: list, metric: callable
) -> dict:
    """Optimize ``program`` with BootstrapFewShot, score it, and save it.

    Args:
        program: The DSPy module to compile.
        llm_name: Model label used in the saved program's file name.
        train_examples: Examples passed to the optimizer as its trainset.
        metric: Metric the optimizer uses while bootstrapping demonstrations.

    Returns:
        A record with the keys ``optimizer``, ``score`` and ``save_path``.
    """
    optimizer = "BootstrapFewShot"
    config = {"max_bootstrapped_demos": 8, "max_labeled_demos": 8}

    teleprompter = BootstrapFewShot(metric=metric, **config)
    optimized_cot = teleprompter.compile(program, trainset=train_examples)
    # Scoring goes through the notebook-level ``evaluate`` object, i.e. against
    # the devset it was constructed with rather than the argument above.
    score = evaluate(optimized_cot)

    save_path = program_save_path.format(
        llm_name=llm_name, optimizer=optimizer, score=round(score, 2)
    )
    # Create the target directory up front so that save() cannot fail (and throw
    # away the optimization work) merely because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    optimized_cot.save(save_path)
    return {"optimizer": optimizer, "score": score, "save_path": save_path}


# Run the BootstrapFewShot optimization on a fresh CoT program and record the result.
metadata.append(
    run_bootstrapfewshot(
        program=CoT(),
        llm_name=llm_name,
        train_examples=train_examples,
        metric=validate_answer,
    )
)

In [None]:
def run_bootstrapfewshotwithrandomsearch(
    program: dspy.Module, llm_name: str, train_examples: list, metric: callable
) -> dict:
    """Optimize ``program`` with BootstrapFewShotWithRandomSearch, score and save it.

    Args:
        program: The DSPy module to compile.
        llm_name: Model label used in the saved program's file name.
        train_examples: Examples passed to the optimizer as its trainset.
        metric: Metric the optimizer uses to select candidate programs.

    Returns:
        A record with the keys ``optimizer``, ``score`` and ``save_path``.
    """
    optimizer = "BootstrapFewShotWithRandomSearch"
    config = {
        "max_bootstrapped_demos": 8,
        "max_labeled_demos": 8,
        "num_candidate_programs": 20,
        "num_threads": 4,
    }

    teleprompter = BootstrapFewShotWithRandomSearch(metric=metric, **config)
    optimized_cot = teleprompter.compile(program, trainset=train_examples)
    # Scoring goes through the notebook-level ``evaluate`` object, i.e. against
    # the devset it was constructed with rather than the argument above.
    score = evaluate(optimized_cot)

    save_path = program_save_path.format(
        llm_name=llm_name, optimizer=optimizer, score=round(score, 2)
    )
    # Create the target directory up front so that save() cannot fail (and throw
    # away the optimization work) merely because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    optimized_cot.save(save_path)
    return {"optimizer": optimizer, "score": score, "save_path": save_path}


# Run the random-search variant on a fresh CoT program and record the result.
metadata.append(
    run_bootstrapfewshotwithrandomsearch(
        program=CoT(),
        llm_name=llm_name,
        train_examples=train_examples,
        metric=validate_answer,
    )
)

In [None]:
def run_knnfewshot(
    program: dspy.Module, llm_name: str, train_examples: list, metric: callable
) -> dict:
    """Optimize ``program`` with KNNFewShot, score it, and save it.

    Args:
        program: The DSPy module to compile. (Previously ignored: a new
            ``CoT()`` was always compiled instead.)
        llm_name: Model label used in the saved program's file name.
        train_examples: Examples used as the nearest-neighbour pool and trainset.
        metric: Metric forwarded to the bootstrap step that KNNFewShot runs on
            the retrieved neighbours. (Previously accepted but never used, so
            every bootstrapped trace was kept regardless of correctness.)

    Returns:
        A record with the keys ``optimizer``, ``score`` and ``save_path``.
    """
    optimizer = "KNNFewShot"
    # k=7 neighbours; extra keyword arguments are handed by KNNFewShot to its
    # internal BootstrapFewShot, which is how the metric reaches it.
    knn_teleprompter = KNNFewShot(7, train_examples, metric=metric)
    optimized_cot = knn_teleprompter.compile(program, trainset=train_examples)
    # Scoring goes through the notebook-level ``evaluate`` object, i.e. against
    # the devset it was constructed with rather than the argument above.
    score = evaluate(optimized_cot)

    save_path = program_save_path.format(
        llm_name=llm_name, optimizer=optimizer, score=round(score, 2)
    )
    # Create the target directory up front so that save() cannot fail (and throw
    # away the optimization work) merely because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    optimized_cot.save(save_path)
    return {"optimizer": optimizer, "score": score, "save_path": save_path}


# Run the KNNFewShot optimization on a fresh CoT program and record the result.
metadata.append(
    run_knnfewshot(
        program=CoT(),
        llm_name=llm_name,
        train_examples=train_examples,
        metric=validate_answer,
    )
)

In [None]:
def run_miprov2(
    program: dspy.Module,
    llm_name: str,
    train_examples: list,
    metric: callable,
    prompt_model: object,
    task_model: object,
) -> dict:
    """Optimize ``program`` with MIPROv2, score it, and save it.

    Earlier revisions ignored ``program``, ``metric``, ``prompt_model`` and
    ``task_model`` and silently used the notebook globals (``CoT()``,
    ``validate_answer``, ``lm``) instead; the arguments are now honoured.

    Args:
        program: The DSPy module to compile.
        llm_name: Model label used in the saved program's file name.
        train_examples: Examples passed to the optimizer as its trainset.
        metric: Metric used to score candidates during optimization.
        prompt_model: LM used to propose new instructions.
        task_model: LM used to run the program being optimized.

    Returns:
        A record with the keys ``optimizer``, ``score`` and ``save_path``.
    """
    optimizer = "MIPROv2"
    # Number of instructions and few-shot example sets to generate and optimize over.
    n = 20
    # Number of optimization trials; each trial tests a new combination of
    # instructions and few-shot examples.
    batches = 40
    # Temperature used when generating new instructions.
    temperature = 0.5
    eval_kwargs = {"num_threads": 4, "display_progress": True, "display_table": 0}

    teleprompter = MIPROv2(
        prompt_model=prompt_model,
        task_model=task_model,
        metric=metric,
        num_candidates=n,
        init_temperature=temperature,
        verbose=True,
    )
    optimized_cot = teleprompter.compile(
        program,
        trainset=train_examples,
        num_batches=batches,
        max_bootstrapped_demos=16,
        max_labeled_demos=16,
        requires_permission_to_run=False,
        eval_kwargs=eval_kwargs,
    )
    # Scoring goes through the notebook-level ``evaluate`` object, i.e. against
    # the devset it was constructed with rather than the argument above.
    score = evaluate(optimized_cot)

    save_path = program_save_path.format(
        llm_name=llm_name, optimizer=optimizer, score=round(score, 2)
    )
    # Create the target directory up front so that save() cannot fail (and throw
    # away the optimization work) merely because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    optimized_cot.save(save_path)
    return {"optimizer": optimizer, "score": score, "save_path": save_path}


# Run the MIPROv2 optimization on a fresh CoT program; the configured LM serves
# as both the prompt model and the task model.
metadata.append(
    run_miprov2(
        program=CoT(),
        llm_name=llm_name,
        train_examples=train_examples,
        metric=validate_answer,
        prompt_model=lm,
        task_model=lm,
    )
)

In [None]:
# Tabulate the collected run records (one row per optimizer) and display them.
# Note: this rebinds `df`, which earlier held the training data.
df = pd.DataFrame(data=metadata)
df

In [None]:
# Persist the results table. The target directory is created first so the export
# does not fail when data/metadata/ does not exist yet.
metadata_csv_path = f"data/metadata/{llm_name}_metadata.csv"
os.makedirs(os.path.dirname(metadata_csv_path), exist_ok=True)
df.to_csv(metadata_csv_path, index=False)