# Ollama Model Experiments

This notebook runs Ollama models against custom and dataset examples:
- With and without RLM (Recursive Language Model)
- With different prefix prompts
- Generates parseable rule files

In [None]:
import sys

sys.path.insert(0, "..")

import json
import re
from pathlib import Path
from datetime import datetime
from string import Template
from dataclasses import dataclass, asdict

from self_distill.clients.ollama_client import OllamaClient
from self_distill.datasets import DATA, load_dataset

## 1. Rule Output Format

We define a JSON-based rule format that LLMs can generate and we can parse reliably.

In [None]:
@dataclass
class RuleExample:
    """One example entry attached to a rule.

    Mirrors the objects in the ``"examples"`` array of the JSON rule format:
    an input, the output associated with it, and a ``matches`` flag.
    """

    input: str  # example input text
    output: str  # output associated with this input
    matches: bool  # boolean "matches" flag from the rule JSON


@dataclass
class Rule:
    """A structured rule that can be converted to and from plain dicts."""

    rule_name: str
    description: str
    conditions: list[str]
    action: str
    examples: list[RuleExample]
    confidence: float = 0.0

    def to_dict(self) -> dict:
        """Return the rule as a plain dict, expanding each example with ``asdict``."""
        example_dicts = [asdict(example) for example in self.examples]
        return dict(
            rule_name=self.rule_name,
            description=self.description,
            conditions=self.conditions,
            action=self.action,
            examples=example_dicts,
            confidence=self.confidence,
        )

    @classmethod
    def from_dict(cls, d: dict) -> "Rule":
        """Build a Rule from a dict such as one produced by ``to_dict``.

        ``rule_name``, ``description`` and ``action`` are required (a missing
        key raises ``KeyError``); ``conditions`` and ``examples`` default to an
        empty list and ``confidence`` to ``0.0``.
        """
        # Examples are converted first, so a malformed example fails before
        # the required top-level keys are looked up.
        parsed_examples = []
        for raw_example in d.get("examples", []):
            parsed_examples.append(RuleExample(**raw_example))

        name = d["rule_name"]
        summary = d["description"]
        conditions = d.get("conditions", [])
        action = d["action"]
        confidence = d.get("confidence", 0.0)
        return cls(
            rule_name=name,
            description=summary,
            conditions=conditions,
            action=action,
            examples=parsed_examples,
            confidence=confidence,
        )

In [None]:
def parse_rule_from_response(response: str) -> Rule | None:
    """Parse a Rule from an LLM response.

    Text inside markdown code fences (``` or ```json) is searched first, in
    order of appearance, and the whole response is searched last. Within each
    candidate text every ``{`` is treated as the possible start of a JSON
    object and decoded with ``json.JSONDecoder.raw_decode``, which handles
    arbitrary nesting and ignores trailing text. The first decoded object that
    ``Rule.from_dict`` accepts is returned, so stray braces in reasoning text
    or an unparseable fenced block no longer prevent a later valid rule from
    being found.

    Args:
        response: Raw text returned by the model.

    Returns:
        The first Rule that parses, or None when no candidate parses. When at
        least one candidate was tried and all failed, the last error is
        printed as ``Parse error: ...``.
    """
    decoder = json.JSONDecoder()

    # Fenced blocks take priority over surrounding prose; the full response is
    # the fallback and also covers unterminated fences.
    candidates = re.findall(r"```(?:json)?(.*?)```", response, re.DOTALL)
    candidates.append(response)

    last_error = None
    for text in candidates:
        start = text.find("{")
        while start != -1:
            try:
                data, _ = decoder.raw_decode(text[start:])
                return Rule.from_dict(data)
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                # Not a valid rule at this position; remember why and move on
                # to the next opening brace.
                last_error = e
            start = text.find("{", start + 1)

    if last_error is not None:
        print(f"Parse error: {last_error}")
    return None


def save_rules(rules: list[Rule], filepath: str | Path):
    """Write rules to ``filepath`` as an indented JSON array.

    Missing parent directories are created, and a one-line confirmation is
    printed after the file is written.
    """
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w") as handle:
        serialized = [rule.to_dict() for rule in rules]
        json.dump(serialized, handle, indent=2)
    print(f"Saved {len(rules)} rules to {target}")


def load_rules(filepath: str | Path) -> list[Rule]:
    """Load rules from a JSON file.

    Args:
        filepath: Path to a JSON file holding a list of rule dicts, such as
            one written by ``save_rules``.

    Returns:
        One ``Rule`` per entry, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If an entry lacks a required rule field.
        TypeError: If an entry or one of its examples has the wrong shape.
    """
    # JSON text is UTF-8 by specification; pin the encoding so that reading
    # does not depend on the platform's locale default.
    with open(filepath, encoding="utf-8") as f:
        data = json.load(f)
    return [Rule.from_dict(d) for d in data]

## 2. Prompt Templates

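Different prompt strategies for rule generation. Each strategy pairs a system prompt with a user-prompt template built with `string.Template`; its `$variable` placeholders avoid the escaping problems that the literal JSON curly braces in the prompts would cause with `str.format`.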

In [None]:
# Shared suffix appended to the user prompts in this notebook. It shows the
# model the JSON shape whose keys Rule.from_dict reads back.
RULE_FORMAT_INSTRUCTION = """
Output your rule as a JSON object with this exact structure:
```json
{
  "rule_name": "descriptive_snake_case_name",
  "description": "What this rule detects/validates",
  "conditions": ["condition 1", "condition 2"],
  "action": "What to do when rule matches",
  "examples": [
    {"input": "example input", "output": "expected output", "matches": true}
  ],
  "confidence": 0.85
}
```
"""

# Prompt strategies keyed by name. Each entry has a plain-string "system"
# message and a "user" string.Template with a single $examples placeholder;
# every user template ends with RULE_FORMAT_INSTRUCTION.
PROMPTS = {
    # Minimal prompt: examples followed by the format instruction.
    "direct": {
        "system": "You are a rule extraction expert. Extract clear, testable rules from examples.",
        "user": Template(
            "Given these examples, create a rule that explains the pattern:\n\n$examples\n"
            + RULE_FORMAT_INSTRUCTION
        ),
    },
    # Asks the model to reason through four numbered steps before answering.
    "chain_of_thought": {
        "system": "You are a rule extraction expert. Think step by step to extract clear, testable rules.",
        "user": Template(
            """Given these examples, create a rule that explains the pattern.

Examples:
$examples

Think step by step:
1. What patterns do you notice in the positive examples?
2. What patterns do you notice in the negative examples?
3. What distinguishes positive from negative?
4. Formulate a precise rule.

"""
            + RULE_FORMAT_INSTRUCTION
        ),
    },
    # The system message carries one worked example with its expected JSON.
    "few_shot": {
        "system": """You are a rule extraction expert. Here's an example of good rule extraction:

Input examples:
- "The cat sat on the mat" -> valid
- "Cat the sat mat on the" -> invalid

Output:
```json
{
  "rule_name": "subject_verb_object_order",
  "description": "English sentences follow Subject-Verb-Object word order",
  "conditions": ["Subject appears before verb", "Verb appears before object"],
  "action": "Mark as grammatically valid if SVO order is maintained",
  "examples": [
    {"input": "The cat sat on the mat", "output": "valid", "matches": true},
    {"input": "Cat the sat mat on the", "output": "invalid", "matches": false}
  ],
  "confidence": 0.9
}
```
Now extract a rule from the user's examples.""",
        "user": Template(
            "Extract a rule from these examples:\n\n$examples\n"
            + RULE_FORMAT_INSTRUCTION
        ),
    },
    # Prompt-only decomposition: the system message lists five sub-tasks for
    # the model to work through in a single completion.
    "rlm_decompose": {
        "system": """You are an RLM (Recursive Language Model) agent. You solve complex problems by:
1. Breaking them into sub-tasks
2. Solving each sub-task
3. Combining results

For rule extraction:
- Sub-task 1: Identify all positive examples and their common features
- Sub-task 2: Identify all negative examples and their common features  
- Sub-task 3: Find discriminating features (present in positive, absent in negative)
- Sub-task 4: Formulate rule from discriminating features
- Sub-task 5: Validate rule against all examples""",
        "user": Template(
            """Apply RLM decomposition to extract a rule from these examples:

$examples

Show your work for each sub-task, then provide the final rule.
"""
            + RULE_FORMAT_INSTRUCTION
        ),
    },
}

## 3. Model Runner

In [None]:
@dataclass
class ExperimentResult:
    """Record of one rule-extraction run.

    Holds the model and prompt type used, the examples text sent, the raw
    response, the rule parsed from it (``None`` when parsing failed), the
    usage reported by the client, and a timestamp string.
    """

    model: str
    prompt_type: str
    examples_used: str
    raw_response: str
    parsed_rule: Rule | None
    usage: dict
    timestamp: str

    def to_dict(self) -> dict:
        """Return the record as a plain dict, with the parsed rule expanded via its ``to_dict``."""
        rule = self.parsed_rule
        rule_dict = rule.to_dict() if rule else None
        return dict(
            model=self.model,
            prompt_type=self.prompt_type,
            examples_used=self.examples_used,
            raw_response=self.raw_response,
            parsed_rule=rule_dict,
            usage=self.usage,
            timestamp=self.timestamp,
        )

In [None]:
def run_experiment(
    client: OllamaClient,
    examples: str,
    prompt_type: str = "direct",
    model: str | None = None,
) -> ExperimentResult:
    """Send one rule-extraction request and package the outcome.

    Args:
        client: Client used for the completion and for usage reporting.
        examples: Examples text substituted into the user prompt template.
        prompt_type: Key into ``PROMPTS``; an unknown key raises ``KeyError``.
        model: Model name forwarded to the client; when falsy, the result
            records ``client.model_name`` instead.

    Returns:
        An ``ExperimentResult`` whose ``parsed_rule`` is ``None`` if the
        response could not be parsed.
    """
    spec = PROMPTS[prompt_type]
    user_text = spec["user"].substitute(examples=examples)
    conversation = [
        dict(role="system", content=spec["system"]),
        dict(role="user", content=user_text),
    ]

    raw_text = client.completion(conversation, model=model)
    # Usage is read right after the completion call it belongs to.
    token_usage = client.get_last_usage()
    extracted = parse_rule_from_response(raw_text)

    return ExperimentResult(
        model=model or client.model_name,
        prompt_type=prompt_type,
        examples_used=examples,
        raw_response=raw_text,
        parsed_rule=extracted,
        usage=token_usage,
        timestamp=datetime.now().isoformat(),
    )


def run_comparison(
    client: OllamaClient,
    examples: str,
    prompt_types: list[str] | None = None,
    models: list[str] | None = None,
) -> list[ExperimentResult]:
    """Run one experiment per (model, prompt type) pair and collect the results.

    Args:
        client: Client passed through to ``run_experiment``.
        examples: Examples text used for every run.
        prompt_types: Prompt keys to try; defaults to every key in ``PROMPTS``.
        models: Model names to try; defaults to the client's own model.

    Returns:
        Results ordered by model first, then by prompt type.
    """
    chosen_prompts = prompt_types or list(PROMPTS.keys())
    chosen_models = models or [client.model_name]

    collected = []
    for model_name in chosen_models:
        for prompt_name in chosen_prompts:
            print(f"Running: model={model_name}, prompt={prompt_name}")
            outcome = run_experiment(client, examples, prompt_name, model_name)
            collected.append(outcome)
            parsed_ok = outcome.parsed_rule is not None
            print(f"  -> Rule parsed: {parsed_ok}")

    return collected

## 4. Setup Ollama Client

In [None]:
# Shared client for the cells below; calls that pass no explicit model are
# made through this client without a model argument.
client = OllamaClient(model_name="llama3.2:3b")

# Print one line per model the client reports.
print("Available models:")
for m in client.list_models():
    print(f"  - {m}")

## 5. Custom Examples

In [None]:
# Hand-written example sets. Each is a plain text block with a positive and a
# negative group, passed as the `examples` text of a prompt.

# Email address validity; invalid entries carry the reason in parentheses.
EMAIL_EXAMPLES = """
Valid:
- user@example.com
- john.doe@company.org
- test123@sub.domain.net

Invalid:
- userexample.com (missing @)
- user@.com (missing domain name)
- @example.com (missing local part)
- user@example (missing TLD)
"""

# Sentences that do or do not contain personally identifiable information.
PII_EXAMPLES = """
Contains PII (should be flagged):
- "My SSN is 123-45-6789"
- "Call me at 555-123-4567"
- "My credit card is 4111-1111-1111-1111"

No PII (safe):
- "The meeting is at 3pm"
- "Order number: ABC123"
- "The price is $99.99"
"""

# Python naming conventions. The trailing "# ..." notes on the bad examples
# are inside the string literal, so they are sent to the model as well.
CODE_QUALITY_EXAMPLES = """
Good (follows conventions):
- "def calculate_total(items: list) -> float:"
- "class UserService:"
- "MAX_RETRY_COUNT = 3"

Bad (violates conventions):
- "def CalculateTotal(Items):"  # wrong naming convention
- "class user_service:"  # wrong class naming
- "maxRetryCount = 3"  # should be SCREAMING_SNAKE for constants
"""

In [None]:
# Run a single experiment: the "direct" prompt on the email examples. No model
# argument is passed, so run_experiment forwards model=None to the client.
result = run_experiment(client, EMAIL_EXAMPLES, prompt_type="direct")

# Show the raw model text first, then the rule parsed from it (if any).
print("=" * 60)
print("RAW RESPONSE:")
print("=" * 60)
print(result.raw_response)
print()
print("=" * 60)
print("PARSED RULE:")
print("=" * 60)
if result.parsed_rule:
    print(json.dumps(result.parsed_rule.to_dict(), indent=2))
else:
    # parsed_rule is None when parse_rule_from_response found no usable JSON.
    print("Failed to parse rule")

In [None]:
# Compare three of the prompt strategies on the same email examples; the
# "few_shot" strategy is not included in this run.
email_results = run_comparison(
    client, EMAIL_EXAMPLES, prompt_types=["direct", "chain_of_thought", "rlm_decompose"]
)

In [None]:
# Summarize each comparison run: prompt type, token usage, and the headline
# fields of the parsed rule, or a failure marker when nothing was parsed.
for run in email_results:
    print(f"\n{'=' * 60}")
    print(f"Prompt: {run.prompt_type}")
    print(f"Tokens: {run.usage}")
    extracted = run.parsed_rule
    if not extracted:
        print("FAILED TO PARSE")
        continue
    print(f"Rule: {extracted.rule_name}")
    print(f"Conditions: {extracted.conditions}")
    print(f"Confidence: {extracted.confidence}")

## 6. Dataset Examples (CoLA)

In [None]:
# Load the CoLA training split. include_rule_id=True is presumably what makes
# the rule-ID helpers used below meaningful — confirm in self_distill.datasets.
cola_train = load_dataset(DATA.COLA, "train", include_rule_id=True)

print(f"Total examples: {len(cola_train)}")
# Only the first ten rule IDs are printed.
print(f"Rule IDs: {cola_train.get_rule_ids()[:10]}...")

In [None]:
def format_cola_examples(df, n: int = 5) -> str:
    """Render up to ``n`` acceptable and ``n`` unacceptable sentences as text.

    Args:
        df: Polars DataFrame with 'text' and 'label' columns
        n: Number of examples per category

    Returns:
        Two headed bullet lists (label 1 first, then label 0), separated by a
        blank line, with every sentence wrapped in double quotes.
    """
    good_rows = df.filter(df["label"] == 1).head(n)
    bad_rows = df.filter(df["label"] == 0).head(n)

    sections = (
        ("Grammatically acceptable:", good_rows),
        # The leading newline puts a blank line between the two groups.
        ("\nGrammatically unacceptable:", bad_rows),
    )

    rendered = []
    for heading, subset in sections:
        rendered.append(heading)
        rendered.extend(
            f'- "{record["text"]}"' for record in subset.iter_rows(named=True)
        )

    return "\n".join(rendered)

In [None]:
# Get examples for a specific rule: take the first rule ID, or None when the
# dataset reports no rule IDs.
rule_ids = cola_train.get_rule_ids()
sample_rule_id = rule_ids[0] if rule_ids else None

# NOTE(review): this is a truthiness test, so a falsy ID (e.g. 0 or "") would
# also skip the block — confirm rule IDs are never falsy.
if sample_rule_id:
    rule_examples_df = cola_train.filter_by_rule(sample_rule_id)
    print(f"Rule ID: {sample_rule_id}")
    print(f"Examples for this rule: {len(rule_examples_df)}")

    # At most three sentences per category go into the prompt text.
    cola_example_str = format_cola_examples(rule_examples_df, n=3)
    print("\nFormatted examples:")
    print(cola_example_str)

In [None]:
# Run rule extraction on CoLA examples. Guarded by the same condition as the
# cell that defines cola_example_str, so the name exists whenever this runs.
if sample_rule_id:
    cola_result = run_experiment(
        client, cola_example_str, prompt_type="chain_of_thought"
    )

    # Print at most the first 1000 characters of the raw response.
    print("Response:")
    print(cola_result.raw_response[:1000])
    print("\n...")

    # Nothing further is printed when the response could not be parsed.
    if cola_result.parsed_rule:
        print("\nExtracted Rule:")
        print(json.dumps(cola_result.parsed_rule.to_dict(), indent=2))

## 7. Dataset Examples (GSM8K - Math)

In [None]:
# Load the GSM8K training split and show its first item. Items are accessed
# by index and expose `question` and `answer` attributes.
gsm8k_train = load_dataset(DATA.GSM8K, "train")

print(f"Total examples: {len(gsm8k_train)}")
print("\nSample question:")
print(gsm8k_train[0].question)
print("\nAnswer:")
print(gsm8k_train[0].answer)

In [None]:
def format_gsm8k_for_pattern(items, n: int = 3) -> str:
    """Render the first ``n`` items as numbered problem/solution sections.

    Args:
        items: Sliceable sequence of objects with ``question`` and ``answer``
            attributes.
        n: Maximum number of items to include.

    Returns:
        A header line followed, for each item, by its numbered problem, its
        numbered solution, and a 40-dash divider.
    """
    divider = "\n" + "-" * 40 + "\n"
    sections = ["Math word problems with solutions:\n"]

    number = 0
    for entry in items[:n]:
        number += 1
        sections += [
            f"Problem {number}:",
            entry.question,
            f"\nSolution {number}:",
            entry.answer,
            divider,
        ]

    return "\n".join(sections)


# Format the first three GSM8K items and preview the first 1500 characters.
# NOTE(review): list(gsm8k_train) materializes the whole split before slicing,
# and format_gsm8k_for_pattern slices to n=3 again on its own.
gsm8k_example_str = format_gsm8k_for_pattern(list(gsm8k_train)[:3])
print(gsm8k_example_str[:1500])

In [None]:
# Math-specific prompt with the same system/user shape as the PROMPTS entries.
# It is kept separate from PROMPTS, so this cell builds the messages and calls
# the client directly instead of going through run_experiment.
MATH_PATTERN_PROMPT = {
    "system": """You are a math education expert. Extract problem-solving strategies and patterns.
Focus on: problem types, key information extraction, solution strategies, common pitfalls.""",
    "user": Template(
        """Analyze these math problems and extract a reusable problem-solving rule:

$examples

"""
        + RULE_FORMAT_INSTRUCTION
    ),
}

messages = [
    {"role": "system", "content": MATH_PATTERN_PROMPT["system"]},
    {
        "role": "user",
        "content": MATH_PATTERN_PROMPT["user"].substitute(examples=gsm8k_example_str),
    },
]

# No model argument is passed to the client here.
math_response = client.completion(messages)
math_rule = parse_rule_from_response(math_response)

print("Math Pattern Rule:")
if math_rule:
    print(json.dumps(math_rule.to_dict(), indent=2))
else:
    # Show the start of the raw text to help diagnose the parse failure.
    print("Failed to parse. Raw response:")
    print(math_response[:1000])

## 8. Multi-Model Comparison

In [None]:
# Candidate models for the comparison; the commented-out entries are further
# model names left disabled.
MODELS_TO_COMPARE = [
    "llama3.2:3b",
    # "qwen2.5-coder:32b",
    # "llama3.3:70b",
]

# Keep only the candidates the client reports as available, in list order.
available = set(client.list_models())
models_to_test = [m for m in MODELS_TO_COMPARE if m in available]
print(f"Will test models: {models_to_test}")

In [None]:
# Run the PII examples through every available model with two prompt types.
# multi_model_results is only defined when at least one model is available.
if models_to_test:
    multi_model_results = run_comparison(
        client,
        PII_EXAMPLES,
        prompt_types=["direct", "rlm_decompose"],
        models=models_to_test,
    )

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    for r in multi_model_results:
        # OK means a rule was parsed from the response, FAIL that none was.
        status = "OK" if r.parsed_rule else "FAIL"
        # Model and prompt type are padded to 20 characters to align columns.
        print(f"{r.model:20} | {r.prompt_type:20} | {status}")

## 9. Save Results

In [None]:
# Directory for saved output, created up front (relative to the working dir).
OUTPUT_DIR = Path("rules")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Gather every successfully parsed rule. Each source is checked by name first
# because the cell that defines it may not have been run.
all_rules = []

if "email_results" in dir():
    all_rules.extend(run.parsed_rule for run in email_results if run.parsed_rule)

if "cola_result" in dir():
    if cola_result.parsed_rule:
        all_rules.append(cola_result.parsed_rule)

if "math_rule" in dir():
    if math_rule:
        all_rules.append(math_rule)

print(f"Total rules extracted: {len(all_rules)}")

In [None]:
# Write the collected rules to a timestamped file; nothing is written when no
# rule was parsed.
if all_rules:
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_rules(all_rules, OUTPUT_DIR / f"extracted_rules_{timestamp}.json")

In [None]:
def save_experiment_results(results: list[ExperimentResult], filepath: str | Path):
    """Write experiment results to ``filepath`` as an indented JSON array.

    Missing parent directories are created, and a one-line confirmation is
    printed after the file is written.
    """
    destination = Path(filepath)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w") as out:
        records = [entry.to_dict() for entry in results]
        json.dump(records, out, indent=2)
    print(f"Saved {len(results)} experiment results to {destination}")


# Collect the full experiment records (not just the parsed rules). Each source
# is checked by name because the cell that defines it may not have been run.
all_experiments = []
if "email_results" in dir():
    all_experiments.extend(email_results)
if "multi_model_results" in dir():
    all_experiments.extend(multi_model_results)

# Write them to a timestamped file under OUTPUT_DIR, if there are any.
if all_experiments:
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_experiment_results(
        all_experiments, OUTPUT_DIR / f"experiments_{timestamp}.json"
    )

## 10. Usage Summary

In [None]:
# Print whatever usage summary the client reports.
print("Token Usage Summary:")
print(client.get_usage_summary())