# Setup

Select the model to use as teacher and student during prompt optimization.

DSPy uses LiteLLM model strings (e.g. **ollama_chat/...**).

For optimization, you would typically choose the stronger model as the teacher, which proposes instructions and generates bootstrapped samples. The student is the model you intend to use for the task itself. The two can also be the same model.

In [None]:
import os
from dotenv import load_dotenv
import dspy
import logging

logging.getLogger().setLevel(logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("LiteLLM").setLevel(logging.WARNING)

# set your api key (if needed)
load_dotenv("../../.env")
APIKEY = os.getenv("APIKEY")

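# set your model (litellm model strings)
model_id = "openrouter/deepseek/deepseek-chat"
lm = dspy.LM(model_id, api_key=APIKEY, cache=True)
dspy.configure(lm=lm)

# teacher model used later by the optimizer; here it is the same model as the student
teacher_lm = lm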

# Signatures

Signatures are like DSPy's pydantic models. Describe the fields and docstrings as though they are prompts (they are).

They will likely reflect the data in your table schema, but could also include additional intermediate data structures in multi-hop patterns.

### Initial prototype
```python
from typing import Literal, Optional


class NewsAppSignatureExample(dspy.Signature):
    text: str = dspy.InputField(desc="Text from an article for analysis")
    category: Literal["world", "entertainment", "science", "health", "business", "sports", "politics", "tech"] = dspy.OutputField(desc="Article content category")
    title: str = dspy.OutputField(desc="Article title, when available. Otherwise create one")
    tags: list[str] = dspy.OutputField(desc="Tags for search and classification")
    notable_people: Optional[list[str]] = dspy.OutputField(desc="Names of notable people in the article")
    notable_organizations: Optional[list[str]] = dspy.OutputField(desc="Names of notable organizations in the article")


# system prompt goes in the docstring
NewsAppSignatureExample.__doc__ = """
You are provided with the text of a news article. Help provide the requested information for cataloging.
"""
```

With some good examples in hand, I refined the prototype into an expanded list of fields with ChatGPT. The result is imported from the `news_app` module.

In [None]:
from news_app import NewsAppSignature

# Run the program

I like the natural code style of writing a DSPy signature. A pydantic model becomes the prompt.

`Literal` type + LLM = classifier (cool!)
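
To make the classifier point concrete, here is a minimal sketch, independent of DSPy, of how the allowed labels can be read back from a `Literal` type and used to check an output. The names `Category` and `is_valid_category` are hypothetical and only for illustration; this is not a description of how DSPy enforces the type internally.

```python
from typing import Literal, get_args

# Same label set as the `category` field in the prototype signature
Category = Literal["world", "entertainment", "science", "health",
                   "business", "sports", "politics", "tech"]


def is_valid_category(label: str) -> bool:
    """Return True if `label` is one of the values allowed by the Literal type."""
    return label in get_args(Category)


print(is_valid_category("business"))  # True
print(is_valid_category("finance"))   # False
```

Because the label set lives in the type annotation, adding or removing a category is a one-line change to the signature.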

We can already try it out, using the ChainOfThought predictor to run the program.

In [None]:
text = """
Business Briefing Dec. 2, 2015
Nokia shareholders overwhelmingly approved the acquisition of the ailing French telecom Alcatel-Lucent, removing one of the last hurdles to a 15.6 billion euro ($16.5 billion) deal that will make Nokia a market leader in networks.
In October, Nokia said it would pay 4 billion to shareholders as the company raised its outlook for the year.
Rajeev Suri, Nokias chief executive, said he was delighted by shareholders recognizing the long-term value creation opportunity of the deal, which is expected to close during the first quarter of 2016.
"""

In [None]:
catalog = dspy.ChainOfThought(NewsAppSignature)
catalog_item = catalog(article_text=text)
print(catalog_item)

# Generating training data

We'll rely on "best of n" scaling to help create synthetic data for our application. Then we'll manually review ~100 examples we created for training.


## Basic test-time scaling

I'll generate some training data using a simplistic best-of-n style of test-time scaling. Aggregating all of the types is a bit more challenging, so I've done that in the `aggregate/` folder as a module that I can work on further.

Depending on where you are running your LLM calls, you might choose the serial or parallel methods below.

In [None]:
from dspy import Parallel, ChainOfThought
from typing import List, Literal
import tqdm

def generate_candidates_serial(text, n=8):
    """ Run in serial """
    return [catalog(article_text=text) for _ in range(n)]


def generate_candidates_parallel(text, n=8, num_threads=2):
    """ Run in parallel """
    parallel_executor = dspy.Parallel(num_threads=num_threads)
    exec_pairs = [(catalog, {'article_text': text}) for _ in range(n)]
    results = parallel_executor.forward(exec_pairs)
    
    return results
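
The same serial and parallel pattern can be sketched with only the standard library, which is handy for testing the surrounding pipeline without making LLM calls. This is an illustrative stand-in, not the DSPy implementation: `fake_catalog`, `candidates_serial`, and `candidates_parallel` are hypothetical names, and the random label only mimics the variation between samples.

```python
import random
from concurrent.futures import ThreadPoolExecutor

LABELS = ["business", "tech"]


def fake_catalog(article_text: str) -> dict:
    """Hypothetical stand-in for one LLM call; returns a randomly chosen label."""
    return {"category": random.choice(LABELS)}


def candidates_serial(text: str, n: int = 8) -> list:
    """Draw n candidates one after another."""
    return [fake_catalog(text) for _ in range(n)]


def candidates_parallel(text: str, n: int = 8, num_threads: int = 2) -> list:
    """Draw n candidates using a small thread pool."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(fake_catalog, [text] * n))
```

Threads suit this workload because each call mostly waits on the network. For a local model that serves one request at a time, the serial version is usually the simpler choice.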

### Aggregation

We need to aggregate by each field to obtain consensus results. For lists, we fuzzy deduplicate and then set a threshold of N minimum occurrences for acceptance. We are targeting aggregation from 8 outputs.

I've modularized the code and imported it here, since it's a bit long and not especially interesting.

In [None]:
import sys
sys.path.append("..")

from aggregate.aggregate import LLMOutputAggregator

```python
from typing import Any, Dict, List, Literal, Optional, Union, get_args, get_origin
from collections import Counter
import itertools

import textdistance
from pydantic import ValidationError

def is_optional_field(type_hint) -> bool:
    """
    Determines if a type hint is Optional, i.e., Union[X, None].
    """
    return get_origin(type_hint) is Union and type(None) in get_args(type_hint)

def aggregate_signatures(text, predictions: List[Any], threshold: int = 2, debug: bool = False) -> NewsAppSignature:
    """
    Aggregates multiple Prediction objects into a single NewsAppSignature.
    
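    Args:
        text: The source article text, copied into the 'article_text' field.
        predictions (List[Any]): A list of Prediction objects.
        threshold (int): Minimum number of occurrences for a cluster to be included.
        debug (bool): If True, print vote counts and clusters while aggregating.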
    
    Returns:
        NewsAppSignature: The aggregated signature.
    
    Raises:
        ValueError: If required fields are missing or validation fails.
    """
    if not predictions:
        raise ValueError("No predictions to aggregate.")

    aggregated_fields: Dict[str, Any] = {}
    
    # Helper function for majority voting
    def majority_vote(values: List[Any]) -> Any:
        counter = Counter(values)
        if debug:
            print(counter)
        most_common, count = counter.most_common(1)[0]
        return most_common

    # Helper function for clustering similar strings with frequency threshold
    def cluster_strings_with_threshold(strings: List[str], threshold: int, similarity_threshold: float = 0.6) -> List[str]:
        """
        Clusters similar strings based on Jaccard similarity and filters clusters based on frequency threshold.
        
        Args:
            strings (List[str]): List of strings to cluster.
            threshold (int): Minimum number of occurrences for a cluster to be included.
            similarity_threshold (float): Jaccard similarity threshold for clustering.
        
        Returns:
            List[str]: List of representative strings from clusters that meet the threshold.
        """
        clusters = []
        for string in strings:
            added = False
            for cluster in clusters:
                # Compare with the first item in the cluster
                similarity = textdistance.jaccard.normalized_similarity(
                    set(string.lower().split()), set(cluster[0].lower().split())
                )
                if similarity >= similarity_threshold:
                    cluster.append(string)
                    added = True
                    break
            if not added:
                clusters.append([string])
        
        # Filter clusters based on threshold
        if debug:
            print(clusters)
        filtered_clusters = [cluster for cluster in clusters if len(cluster) >= threshold]
        
        # Return one representative from each filtered cluster
        return [cluster[0] for cluster in filtered_clusters]
    
    # Iterate over each field in the NewsAppSignature
    for field_name, field_type in NewsAppSignature.__annotations__.items():
        # 'article_text' is the input field, not a model output:
        # copy the source text through instead of aggregating it
        if field_name == "article_text":
            aggregated_fields[field_name] = text
            continue

        # Collect all non-None values for the current field
        field_values = [getattr(pred, field_name, None) for pred in predictions]
        field_values = [val for val in field_values if val is not None]

        if not field_values:
            # Determine if the field is optional
            if is_optional_field(field_type):
                aggregated_fields[field_name] = None
            else:
                # For required fields with no values, raise an error
                raise ValueError(f"No values found for required field '{field_name}' during aggregation.")
            continue

        # Determine the field type for aggregation
        origin_type = get_origin(field_type)
        args_type = get_args(field_type)

        # Handle Literal types
        if origin_type is Literal:
            # Majority voting for Literal fields
            aggregated_fields[field_name] = majority_vote(field_values)
        elif isinstance(field_values[0], str):
            # Majority voting for single-string fields
            aggregated_fields[field_name] = majority_vote(field_values)
        elif isinstance(field_values[0], list):
            # Flatten all lists
            flattened = list(itertools.chain.from_iterable(field_values))
            # Cluster similar strings with frequency threshold
            clustered = cluster_strings_with_threshold(flattened, threshold=threshold)
            aggregated_fields[field_name] = clustered
        else:
            # Handle other types if necessary
            aggregated_fields[field_name] = majority_vote(field_values)

    # Instantiate the aggregated NewsAppSignature with all fields
    try:
        aggregated_signature = NewsAppSignature(**aggregated_fields)
    except ValidationError as ve:
        # Extract detailed validation errors
        raise ValueError(f"Error creating aggregated NewsAppSignature: {ve}")

    return aggregated_signature
```

```python
results = generate_candidates_parallel(text, n=8)
consensus = aggregate_signatures(text, results, debug=True, threshold=4)
consensus
```
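
To see what the occurrence threshold does, here is a small self-contained sketch of the two aggregation rules on made-up candidates: majority vote for a single label, and greedy token-level Jaccard clustering for list fields. It uses plain Python sets in place of `textdistance`, and the names `jaccard` and `cluster_with_threshold` are hypothetical.

```python
from collections import Counter


def jaccard(a: str, b: str) -> float:
    """Token-level Jaccard similarity between two strings (case-insensitive)."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    if not ta and not tb:
        return 1.0
    return len(ta & tb) / len(ta | tb)


def cluster_with_threshold(strings, min_count=2, min_similarity=0.6):
    """Attach each string to the first cluster whose representative is similar
    enough, then keep only clusters with at least min_count members."""
    clusters = []
    for s in strings:
        for cluster in clusters:
            if jaccard(s, cluster[0]) >= min_similarity:
                cluster.append(s)
                break
        else:
            clusters.append([s])
    return [c[0] for c in clusters if len(c) >= min_count]


# Made-up values standing in for the outputs of 4 candidates
categories = ["business", "business", "tech", "business"]
people = ["Rajeev Suri", "rajeev suri", "Rajeev Suri", "Suri"]

top_category = Counter(categories).most_common(1)[0][0]
kept_people = cluster_with_threshold(people, min_count=2)

print(top_category)  # business
print(kept_people)   # ['Rajeev Suri']
```

The lone `"Suri"` shares one of two tokens with `"Rajeev Suri"` (similarity 0.5), so it forms its own cluster and is dropped by the threshold. Raising the threshold trades recall for precision in the list fields.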

## Process a bunch of data

We can load a news dataset from the Hugging Face Hub (here `valurank/News_Articles_Categorization`) to create our synthetic training data, and process ~100 rows.

I'll save the results as I go. Quick and dirty, just restart if it fails.

In [None]:
from datasets import load_dataset

# Load a diverse news dataset from the Hugging Face Hub
dataset = load_dataset("valurank/News_Articles_Categorization", split="train")

### Utilities for tracking the dataset offset

In [None]:
import json
import hashlib
import os
import tqdm

# Define the number of articles and samples
num_articles = 100
samples_per_article = 8

# Define the output directory
output_dir = "training_data"

# Create the output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Define a file to keep track of progress (offset)
progress_file = os.path.join(output_dir, "progress.txt")

# Function to hash a JSON string with MD5 (used only to derive a filename, not for security)
def generate_hash(json_str: str) -> str:
    return hashlib.md5(json_str.encode('utf-8')).hexdigest()

# Function to load the current offset
def load_offset() -> int:
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as f:
            try:
                offset = int(f.read().strip())
                return offset
            except ValueError:
                return 0
    return 0

# Function to save the current offset
def save_offset(offset: int):
    with open(progress_file, 'w') as f:
        f.write(str(offset))
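
The restart behavior can be checked in isolation with a toy run. The sketch below is a simplified, hypothetical version of the same idea (the `run` function and its `fail_at` parameter exist only for this illustration): it processes items from the saved offset, simulates a crash partway through, and then resumes.

```python
import os
import tempfile


def load_offset(path: str) -> int:
    """Return the saved offset, or 0 if the file is missing or unreadable."""
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return 0


def run(items, path, fail_at=None):
    """Process items starting at the saved offset.
    Stops at index fail_at (if given) to mimic a crash."""
    done = []
    for i in range(load_offset(path), len(items)):
        if i == fail_at:
            break  # simulated crash: the offset for item i is never written
        done.append(items[i])
        with open(path, "w") as f:
            f.write(str(i + 1))
    return done


with tempfile.TemporaryDirectory() as tmp:
    progress = os.path.join(tmp, "progress.txt")
    first = run(["a", "b", "c", "d"], progress, fail_at=2)
    second = run(["a", "b", "c", "d"], progress)

print(first)   # ['a', 'b']
print(second)  # ['c', 'd']
```

The offset is written only after an item finishes, so a restart picks up at the first item that did not complete.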


### Best-Of-N Processing Loop

In [None]:
# Initialize the starting offset
start_offset = load_offset()

# Iterate over the specified number of articles starting from the offset
for i in tqdm.tqdm(range(start_offset, num_articles), desc="Processing Articles", total=num_articles - start_offset):
    try:
        article = dataset[i]
        text = article['Text']
        
        # Generate multiple predictions
        # candidates = generate_candidates_serial(text, n=samples_per_article)
        candidates = generate_candidates_parallel(text, n=samples_per_article)
        
        # Attach the source text to each candidate
        candidates_with_text = []
        for c in [c.toDict() for c in candidates]:
            c.update({"article_text": text})
            candidates_with_text.append(c)

        # Aggregate predictions to form consensus
        consensus = LLMOutputAggregator.aggregate(
            NewsAppSignature, candidates_with_text, threshold=3
        )
        
        # Convert consensus to JSON string
        consensus_json = consensus.model_dump_json()
        
        # Generate filename using hash of JSON string
        filename_hash = generate_hash(consensus_json)
        filename = f"{filename_hash}.json"
        file_path = os.path.join(output_dir, filename)
        
        # Save the JSON string to the file
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(consensus_json)
        
        # Update the progress offset
        save_offset(i + 1)
        
    except Exception as e:
        print(f"Error processing article {i}: {e}")
        # Optionally, log the error to a file
        error_log = os.path.join(output_dir, "error_log.txt")
        with open(error_log, 'a') as f:
            f.write(f"Article {i}: {e}\n")
        # Continue with the next article
        continue


## Review data

(This is done in the review tool.)

## Load data

In [None]:
import glob
import json
from datetime import datetime

data = []
for file in glob.glob("training_data/accepted/*.json"):
    with open(file, "r") as fh:
        tmp = json.load(fh)

        # convert to date
        tmp["publication_date"] = datetime.strptime(tmp["publication_date"], "%Y-%m-%d").date() if tmp["publication_date"] else None

        # remove reasoning from example
        if "reasoning" in tmp:
            del tmp["reasoning"]

        e = dspy.Example(tmp).with_inputs("article_text")
        data.append(e)

In [None]:
data[0]

In [None]:
import sys
sys.path.append("../scorer")
from scorer import WordLlamaScorer
from dspy.evaluate import Evaluate
from dspy.teleprompt import MIPROv2


scorer = WordLlamaScorer.from_signature(NewsAppSignature, skip_fields=["article_text", "reasoning"])


teleprompter = MIPROv2(
    metric=scorer,
    auto="medium",
    teacher_settings=dict(lm=teacher_lm),
    num_threads=2
)

catalog = dspy.ChainOfThought(NewsAppSignature)
optimized_program = teleprompter.compile(
    student=catalog.deepcopy(),
    teacher=catalog.deepcopy(),
    trainset=data,
    max_bootstrapped_demos=2,
    max_labeled_demos=2,
    requires_permission_to_run=False,
)


In [None]:
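from datetime import date

# dates are not JSON serializable, so convert them to ISO strings before saving
for demo in optimized_program.demos:
    if 'publication_date' in demo and isinstance(demo['publication_date'], date):
        demo['publication_date'] = demo['publication_date'].isoformat()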

# Save the state to a JSON file
optimized_program.save("miprov2_command_r7b.json", save_program=False)


In [None]:
optimized_program(article_text=text)


# Extra

In [None]:
from wordllama import WordLlama

wl = WordLlama.load()

In [None]:
import glob
import json

data = []
for file in glob.glob("training_data/accepted/*.json"):
    with open(file, "r") as fh:
        data.append(json.load(fh))

In [None]:
texts = [x["article_text"] for x in data]

embeds = wl.embed(texts)
sim_matrix = wl.vector_similarity(embeds, embeds)

In [None]:
import numpy as np

idx = np.concatenate([x[:, None] for x in np.where(sim_matrix > 0.9)], axis=1)
deduplicate = list(filter(lambda x: x[0] < x[1], idx)) # upper triangle (i < j): keep each pair once, skip self-matches


```python
import glob
import os

def remove_older_file(file1_path: str, file2_path: str) -> str:
    """
    Compare two files and remove the older one.
    Returns the path of the removed file.
    """
    # Get modification timestamps for both files
    time1 = os.path.getmtime(file1_path)
    time2 = os.path.getmtime(file2_path)
    
    # Compare and remove older file
    if time1 < time2:
        os.remove(file1_path)
        return file1_path
    else:
        os.remove(file2_path)
        return file2_path


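# assumes glob returns the files in the same order as when `data` was loaded
files = list(glob.glob("training_data/accepted/*.json"))
for i, j in deduplicate:
    # a file may already have been removed as part of an earlier pair
    if not (os.path.exists(files[i]) and os.path.exists(files[j])):
        continue
    older_file = remove_older_file(files[i], files[j])
    print(f"Removed older file: {older_file}")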
```