# Lab 5 — Evaluating LLMs with Ollama

In [None]:
import time
import json
import re
from pathlib import Path
from datetime import datetime, timezone

import pandas as pd
import numpy as np
from tqdm import trange, tqdm
from ollama import Client
import matplotlib.pyplot as plt
import seaborn as sns

from eval_utils import validate_models, should_skip_cot, preview_experiments

In [None]:
# Seed NumPy's global random number generator so that any NumPy-based
# randomness used in this notebook is repeatable between runs.
# NOTE(review): this presumably has no effect on the text sampled by the
# models themselves (that happens outside NumPy) — confirm if reproducible
# generations are required.
np.random.seed(42)

In [None]:
# Directory layout: everything is anchored at the current working directory.
ROOT = Path(".").resolve()
LAB_DIR = ROOT
OUTPUT_DIR = LAB_DIR / "outputs"  # result files and figures are written here
TASKS_DIR = LAB_DIR / 'tasks'     # per-task JSON definitions are read from here

# Create both working directories up front (no error if they already exist).
for _required_dir in (OUTPUT_DIR, TASKS_DIR):
    _required_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Create a client (defaults to http://localhost:11434)
# `client` is the shared module-level handle used by the query and judge
# helpers defined further down in this notebook.
client = Client()
# Ask the server which models it has; the reply exposes them via `.models`.
models_resp = client.list()

# Print one line per available model so the names used in the experiment
# configuration can be checked by eye.
print("Available models:")
for m in models_resp.models:
    print(" -", m.model)


In [None]:
# Discover the task definitions: one JSON file per task inside TASKS_DIR
# (the notebook itself ships no default task definitions).
task_files = sorted(TASKS_DIR.glob('*.json'))

# Filled from the files below: task metadata, plus development and
# evaluation examples keyed by task id.
TASKS = []
DEV_EXAMPLES = {}
EVAL_EXAMPLES = {}

for task_path in task_files:
    spec = json.loads(task_path.read_text(encoding='utf-8'))
    # 'id' and 'name' are mandatory; the remaining fields fall back to empty values.
    TASKS.append({
        'id': spec['id'],
        'name': spec['name'],
        'description': spec.get('description', ''),
        'eval_criteria': spec.get('eval_criteria', ''),
    })
    DEV_EXAMPLES[spec['id']] = spec.get('dev_examples', [])
    EVAL_EXAMPLES[spec['id']] = spec.get('eval_example', {})

print(f"Loaded {len(TASKS)} tasks.")

In [None]:
# Prompt builders
def zero_shot_prompt(task, instance=None):
    """Build a zero-shot prompt for ``task``.

    The prompt lists the task name, description and evaluation criteria
    (the latter two default to empty strings when absent from ``task``),
    adds an ``Input:`` line only when ``instance`` is truthy, and ends with
    a request for a concise, direct answer.
    """
    lines = [
        f"Task: {task['name']}",
        f"Description: {task.get('description','')}",
        f"Evaluation criteria: {task.get('eval_criteria','')}",
    ]
    if instance:
        lines.append(f"Input: {instance}")
    # Header lines are newline-terminated; a blank line precedes the closing request.
    return "\n".join(lines) + "\n" + "\nPlease provide a concise, direct answer."


def few_shot_prompt(task, dev_examples, instance):
    """Build a few-shot prompt for ``task``.

    After the task header, every entry of ``dev_examples`` (dicts with
    ``'input'`` and ``'output'`` keys) is rendered as a worked example
    followed by a ``---`` separator. The prompt ends by presenting
    ``instance`` as the new input and asking for its output.
    """
    header = (
        f"Task: {task['name']}\n"
        f"Description: {task.get('description','')}\n"
        f"Evaluation criteria: {task.get('eval_criteria','')}\n"
        "\nHere are a few examples:\n"
    )
    shots = "".join(
        f"Input: {ex['input']}\nOutput: {ex['output']}\n---\n"
        for ex in dev_examples
    )
    return f"{header}{shots}Now, Input: {instance}\nPlease write Output:"


def cot_prompt(task, instance=None):
    """Build a chain-of-thought prompt: the zero-shot prompt plus a step-by-step cue."""
    step_by_step_cue = "\nLet's think step by step before answering."
    return zero_shot_prompt(task, instance) + step_by_step_cue

In [None]:
def _chat_content(resp):
    """Return the assistant text of a chat reply (plain dict or response object)."""
    message = resp['message'] if isinstance(resp, dict) else getattr(resp, 'message', None)
    if message is None:
        return ''
    if isinstance(message, dict):
        return message.get('content') or ''
    return getattr(message, 'content', '') or ''


def query_chat(model: str, prompt: str, temperature: float = 0.0, max_tokens: int = 1024, stream: bool = False):
    """Query a model with a single chat-style user message.

    Args:
        model: Name of the model to query.
        prompt: Text sent as the only user message.
        temperature: Sampling temperature, forwarded as the ``temperature``
            option.
        max_tokens: Generation cap, forwarded as the ``num_predict`` option.
            For models that emit reasoning text, that text presumably counts
            towards this cap as well.
        stream: When true, the reply is requested as a stream and the partial
            chunks are concatenated before returning.

    Returns:
        A dict with ``model``, ``prompt``, ``response``, ``elapsed`` (seconds)
        and ``success``. On success it also holds ``raw`` (the reply object,
        or the final chunk when streaming); on failure it holds ``error``
        (the exception text) and ``response`` is an empty string.
        Exceptions raised by the request are reported this way, not raised.
    """
    messages = [{'role': 'user', 'content': prompt}]
    # Previously these arguments were accepted but silently dropped.
    options = {'temperature': temperature, 'num_predict': max_tokens}
    start = time.time()
    try:
        if stream:
            chunks = list(client.chat(model=model, messages=messages, options=options, stream=True))
            content = ''.join(_chat_content(chunk) for chunk in chunks)
            resp = chunks[-1] if chunks else None
        else:
            resp = client.chat(model=model, messages=messages, options=options)
            content = _chat_content(resp)
        elapsed = time.time() - start
        return {
            'model': model,
            'prompt': prompt,
            'response': content,
            'elapsed': elapsed,
            'success': True,
            'raw': resp
        }
    except Exception as e:
        # Best effort by design: one failing request must not abort a long run.
        elapsed = time.time() - start
        return {
            'model': model,
            'prompt': prompt,
            'response': '',
            'elapsed': elapsed,
            'success': False,
            'error': str(e)
        }


def query_generate(model: str, prompt: str, temperature: float = 0.0, max_tokens: int = 1024):
    """Query a model through the plain completion (generate) endpoint.

    Args:
        model: Name of the model to query.
        prompt: Raw prompt text.
        temperature: Sampling temperature, forwarded as the ``temperature``
            option.
        max_tokens: Generation cap, forwarded as the ``num_predict`` option.

    Returns:
        A dict with ``model``, ``prompt``, ``response``, ``elapsed`` (seconds)
        and ``success``. On success it also holds ``raw`` (the reply object);
        on failure it holds ``error`` (the exception text) and ``response``
        is an empty string. Exceptions raised by the request are reported
        this way, not raised.
    """
    # Previously these arguments were accepted but silently dropped.
    options = {'temperature': temperature, 'num_predict': max_tokens}
    start = time.time()
    try:
        resp = client.generate(model=model, prompt=prompt, options=options)
        elapsed = time.time() - start
        # A generate reply carries its text under 'response'; only chat
        # replies have a 'message'. Reading 'message' here always produced
        # an empty string (or a KeyError for dict replies).
        if isinstance(resp, dict):
            content = resp.get('response', '')
        else:
            content = getattr(resp, 'response', '')
        return {
            'model': model,
            'prompt': prompt,
            'response': content or '',
            'elapsed': elapsed,
            'success': True,
            'raw': resp
        }
    except Exception as e:
        # Best effort by design: one failing request must not abort a long run.
        elapsed = time.time() - start
        return {
            'model': model,
            'prompt': prompt,
            'response': '',
            'elapsed': elapsed,
            'success': False,
            'error': str(e)
        }


In [None]:
# Prepare helper to save prompt+response runs back to the corresponding task file (one file per category)
def save_prompt_run(task_id, run_entry):
    """Append ``run_entry`` to the ``runs`` list stored in the task's JSON file.

    The task file is looked up in ``TASKS_DIR`` by the pattern
    ``<int(task_id)>_*.json``. If no file matches, nothing is written. If
    several match, the first one in sorted order is used.

    Args:
        task_id: Task identifier; must be convertible with ``int()``.
        run_entry: JSON-serializable record describing one prompt/response run.
    """
    # Sort so the chosen file does not depend on filesystem listing order.
    candidates = sorted(TASKS_DIR.glob(f"{int(task_id)}_*.json"))
    if not candidates:
        return
    p = candidates[0]
    d = json.loads(p.read_text(encoding='utf-8'))
    d.setdefault('runs', []).append(run_entry)
    # Write back as UTF-8 explicitly: the file is read as UTF-8 and
    # ensure_ascii=False emits raw non-ASCII characters, which the platform
    # default encoding may not be able to represent.
    p.write_text(json.dumps(d, ensure_ascii=False, indent=2), encoding='utf-8')


def _utc_timestamp():
    """Return the current UTC time as an ISO-8601 string ending in 'Z'."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def run_evaluation(models: list, strategies: list, examples: dict = None, save_prefix: str = 'results'):
    """Run every task x model x strategy combination and persist the results.

    For each combination a prompt is built (``'zero'``, ``'few'`` or
    ``'cot'``), sent through ``query_chat`` and recorded. Combinations for
    which ``should_skip_cot`` returns true are not sent to the model; a
    placeholder entry with ``success=False`` is recorded for them instead.
    Each executed run is also appended to the task's JSON file via
    ``save_prompt_run``. At the end, all entries are written to a timestamped
    ``.jsonl`` file and a ``.csv`` file in ``OUTPUT_DIR``.

    Args:
        models: Model selection, normalized through ``validate_models``; the
            normalized entries are accessed as ``model['name']``.
        strategies: Strategy names; each must be ``'zero'``, ``'few'`` or
            ``'cot'``.
        examples: Mapping of task id to an example dict whose ``'input'`` is
            used as the prompt instance. Defaults to ``EVAL_EXAMPLES``.
        save_prefix: Filename prefix of the two output files.

    Returns:
        The list of result dicts, including the skipped placeholders.

    Raises:
        ValueError: If ``strategies`` contains an unsupported name. This is
            checked before any model is queried.
    """
    if examples is None:
        examples = EVAL_EXAMPLES

    # Normalize and validate model selection
    norm_models = validate_models(models)

    # Preview experiments and warn if something is off
    expected_runs, warnings = preview_experiments(TASKS, norm_models, strategies)
    if warnings:
        for w in warnings:
            print('Warning:', w)

    # Fail fast: discovering a bad strategy halfway through would discard the
    # results of every model call made up to that point.
    supported_strategies = ('zero', 'few', 'cot')
    for strategy in strategies:
        if strategy not in supported_strategies:
            raise ValueError(f'Unknown strategy: {strategy!r}')

    print(f'Running {expected_runs} experiment runs (skipping CoT for reasoning models)')

    results = []
    # The context manager closes the progress bar even if a run raises.
    with trange(expected_runs, desc='Running experiments') as pbar:
        for task in TASKS:
            task_id = task['id']
            ex = examples.get(task_id, {})
            for model in norm_models:
                for strategy in strategies:
                    # Skip CoT for reasoning-specialized models
                    if should_skip_cot(model, strategy):
                        # record a skipped entry for traceability
                        results.append({
                            'task_id': task_id,
                            'task_name': task['name'],
                            'strategy': strategy,
                            'model': model['name'],
                            'prompt': None,
                            'response': '',
                            'elapsed': 0.0,
                            'success': False,
                            'error': 'CoT skipped for reasoning model',
                            'timestamp': _utc_timestamp()
                        })
                        pbar.update(1)
                        continue

                    model_name = model['name']
                    if strategy == 'zero':
                        prompt = zero_shot_prompt(task, ex.get('input'))
                    elif strategy == 'few':
                        # Only the first two development examples are used as shots.
                        prompt = few_shot_prompt(task, DEV_EXAMPLES.get(task_id, [])[:2], ex.get('input'))
                    else:
                        # 'cot' — the only remaining value after the check above
                        prompt = cot_prompt(task, ex.get('input'))

                    r = query_chat(model_name, prompt)
                    entry = {
                        'task_id': task_id,
                        'task_name': task['name'],
                        'strategy': strategy,
                        'model': model_name,
                        'prompt': prompt,
                        'response': r.get('response', ''),
                        'elapsed': r.get('elapsed', None),
                        'success': r.get('success', False),
                        'error': r.get('error', None),
                        'timestamp': _utc_timestamp()
                    }
                    results.append(entry)

                    # Persist the prompt and response back into the per-task JSON file
                    save_prompt_run(task_id, {
                        'timestamp': entry['timestamp'],
                        'model': model_name,
                        'strategy': strategy,
                        'prompt': prompt,
                        'response': entry['response'],
                        'elapsed': entry['elapsed'],
                        'success': entry['success'],
                        'error': entry['error']
                    })

                    pbar.update(1)

    # Save results (one JSON object per line)
    ts = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    out_json = OUTPUT_DIR / f"{save_prefix}_{ts}.jsonl"
    with open(out_json, 'w', encoding='utf-8') as fh:
        for r in results:
            fh.write(json.dumps(r, ensure_ascii=False) + '\n')

    # Save a CSV summary for manual annotation
    df = pd.DataFrame(results)
    csv_path = OUTPUT_DIR / f"{save_prefix}_{ts}.csv"
    df.to_csv(csv_path, index=False)

    print(f"Saved {len(results)} results to {out_json} and {csv_path}")

    return results

In [None]:
# Basic metrics and aggregation
def exact_match(pred: str, gold: str) -> bool:
    """Return whether ``pred`` equals ``gold`` after a simple normalization.

    Normalization keeps only alphanumeric characters and lower-cases them,
    so punctuation, whitespace and letter case are ignored.

    Args:
        pred: Model output. ``None`` never matches.
        gold: Expected answer. ``None`` never matches.

    Returns:
        ``True`` when the normalized values are identical, else ``False``.
        Non-string values (for example a numeric expected answer loaded from
        JSON) are compared through their ``str()`` form instead of raising.
    """
    if gold is None or pred is None:
        return False

    # simple normalization
    def norm(s):
        return ''.join(c.lower() for c in str(s) if c.isalnum())

    return norm(pred) == norm(gold)

def token_overlap(pred: str, gold: str) -> float:
    """Return the fraction of distinct gold tokens that also occur in ``pred``.

    Tokens are the lower-cased ``\\w+`` matches of each string, compared as
    sets. The score is recall-oriented: it is divided by the number of gold
    tokens. The result is ``0.0`` when either argument is empty/``None`` or
    when ``gold`` contains no tokens.
    """
    if not (gold and pred):
        return 0.0

    word_pattern = r'\w+'
    pred_vocab = set(re.findall(word_pattern, pred.lower()))
    gold_vocab = set(re.findall(word_pattern, gold.lower()))

    if len(gold_vocab) == 0:
        return 0.0

    # Recall: share of the gold tokens the prediction managed to produce.
    shared = pred_vocab & gold_vocab
    return len(shared) / len(gold_vocab)

def compute_metrics(results: list, examples: dict):
    """Score each result row and aggregate per task, model and strategy.

    Args:
        results: List of result dicts (needs at least ``task_id``,
            ``task_name``, ``model``, ``strategy`` and ``response``).
        examples: Mapping of task id to an example dict; its ``'expected'``
            value, when present, is the gold answer for that task.

    Returns:
        ``(df, agg)``: ``df`` is the per-row frame with added ``expected``,
        ``exact_match`` (``None`` when no gold answer is available) and
        ``overlap_score`` (``0.0`` when no gold answer is available) columns;
        ``agg`` holds the row count ``n``, the number of exact matches
        ``n_exact``, the mean overlap ``avg_overlap`` and
        ``accuracy = n_exact / n`` for every group.
    """
    df = pd.DataFrame(results)

    def _expected_for(tid):
        return examples.get(tid, {}).get('expected')

    def _exact_or_none(row):
        gold = row['expected']
        if gold:
            return exact_match(row['response'], gold)
        return None

    def _overlap_or_zero(row):
        gold = row['expected']
        if gold:
            return token_overlap(row['response'], gold)
        return 0.0

    def _count_true(values):
        # Count only genuine True values; None (no gold answer) is ignored.
        return sum(1 for v in values if v is True)

    # Attach the gold answer where one exists, then score every row.
    df['expected'] = df['task_id'].map(_expected_for)
    df['exact_match'] = df.apply(_exact_or_none, axis=1)
    df['overlap_score'] = df.apply(_overlap_or_zero, axis=1)

    # One summary row per (task, model, strategy) combination.
    group_keys = ['task_id', 'task_name', 'model', 'strategy']
    agg = (
        df.groupby(group_keys)
        .agg(
            n=('response', 'size'),
            n_exact=('exact_match', _count_true),
            avg_overlap=('overlap_score', 'mean'),
        )
        .reset_index()
    )
    agg['accuracy'] = agg['n_exact'] / agg['n']
    return df, agg

In [None]:
# Models and prompting strategies covered by this experiment
selected_models = [
    'SpeakLeash/bielik-1.5b-v3.0-instruct:Q8_0',
    'ministral-3:3b',
    'deepseek-r1:7b',
]
strategies = ['zero', 'few', 'cot']

# Run the evaluation over every task, model and strategy.
# With 10 task files and CoT skipped for the one reasoning model this gives
# 10 * (2*3 + 1*2) = 10 * 8 = 80 model calls.
results = run_evaluation(
    models=selected_models,
    strategies=strategies,
    examples=EVAL_EXAMPLES,
    save_prefix='lab5_experiment',
)

# Score the raw results and aggregate them
df_results, agg_metrics = compute_metrics(results, EVAL_EXAMPLES)

print("\n--- Evaluation Summary ---")
# Wide view of the accuracy: one row per task, one column per (model, strategy)
summary_pivot = agg_metrics.pivot_table(
    index=['task_id', 'task_name'],
    columns=['model', 'strategy'],
    values='accuracy',
)
display(summary_pivot)

# Keep the aggregated metrics on disk for the report
final_csv_path = OUTPUT_DIR / "final_experiment_results.csv"
agg_metrics.to_csv(final_csv_path, index=False)
print(f"\nFinal results saved to {final_csv_path}")

### LLM-as-a-Judge Evaluation
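Since `exact_match` is too strict for open-ended tasks, we use a separate LLM as a judge to rate the quality of each response against the task's evaluation criteria. The judge model is configurable: `get_judge_score` defaults to the reasoning model `deepseek-r1:7b`, while `run_judge_evaluation` (the function called below) defaults to `nemotron-3-nano:latest`.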

The judge will assign a score from 0 to 5. We then normalize this to a 0-1 scale.

In [None]:
def _parse_judge_score(content):
    """Extract the 0-5 score from the judge's raw reply; 0.0 if none is found."""
    # Remove reasoning blocks if present (DeepSeek-R1 style) ...
    text = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
    # ... including one that was never closed (e.g. a truncated reply), so
    # digits mentioned while reasoning are not mistaken for the verdict.
    text = re.sub(r'<think>.*', '', text, flags=re.DOTALL).strip()

    # Take the first stand-alone digit 0-5. The lookarounds stop digits that
    # belong to a larger number ("10", "2024") from being read as a score.
    match = re.search(r'(?<!\d)([0-5])(?!\d)', text)
    if match:
        return float(match.group(1))
    return 0.0


def get_judge_score(task, prompt, response, judge_model='deepseek-r1:7b'):
    """Use an LLM to judge the quality of a response on a scale of 0-5.

    Args:
        task: Task dict; ``'name'`` and ``'eval_criteria'`` are included in
            the judge prompt (generic fallbacks are used when missing).
        prompt: The prompt that produced ``response``.
        response: The model output to be judged.
        judge_model: Name of the model acting as judge.

    Returns:
        The score as a float between 0.0 and 5.0. ``0.0`` is also returned
        when ``response`` is empty, when the judge call fails (the error is
        printed) or when no score can be found in the judge's reply.
    """
    if not response:
        return 0.0

    criteria = task.get('eval_criteria', 'Correctness and relevance')
    task_name = task.get('name', 'General Task')

    judge_prompt = f"""Evaluate the following LLM response based on the task description and evaluation criteria.
    
Task: {task_name}
Criteria: {criteria}

User Prompt: {prompt}
LLM Response: {response}

Give a score from 0 to 5, where:
0: Completely irrelevant or incorrect
1: Major issues, barely follows prompt
2: Follows prompt but has significant errors
3: Good response, minor issues
4: Very good response, follows almost all criteria
5: Perfect response

Return ONLY the numeric score (0, 1, 2, 3, 4, or 5). Do not provide any explanation."""

    try:
        # Use simple chat query
        messages = [{'role': 'user', 'content': judge_prompt}]
        res = client.chat(model=judge_model, messages=messages)
        content = res['message']['content']
        return _parse_judge_score(content or '')
    except Exception as e:
        # Best effort by design: a failed judgement counts as 0 instead of
        # aborting the whole judging pass.
        print(f"Error judging response: {e}")
        return 0.0

def run_judge_evaluation(results_df, tasks, judge_model='nemotron-3-nano:latest'):
    """Score every row of ``results_df`` with the LLM judge.

    Rows whose generation failed or whose response is empty get ``0.0``
    without calling the judge. The frame is modified in place: a
    ``judge_score`` column (0-5) and a ``normalized_judge_score`` column
    (``judge_score / 5``) are added, and the same frame is returned.

    Args:
        results_df: Frame with ``task_id``, ``prompt``, ``response`` and
            ``success`` columns.
        tasks: Task dicts; each is looked up by its ``'id'``.
        judge_model: Name of the model acting as judge.
    """
    total = len(results_df)
    print(f"Judging {total} responses using {judge_model}...")

    # Index the tasks by id so each row can find its task quickly.
    tasks_by_id = {t['id']: t for t in tasks}

    def _score_row(row):
        # Nothing to judge when generation failed or produced no text.
        if not (row['success'] and row['response']):
            return 0.0
        row_task = tasks_by_id.get(row['task_id'], {})
        return get_judge_score(row_task, row['prompt'], row['response'], judge_model)

    scores = [
        _score_row(row)
        for _, row in tqdm(results_df.iterrows(), total=total)
    ]

    results_df['judge_score'] = scores
    results_df['normalized_judge_score'] = results_df['judge_score'] / 5.0
    return results_df

# Score every generated response with the LLM judge.
# Tip: judging is slow; to save time it can be limited to a subset of rows,
# for example only those that are not already exact matches.
df_results = run_judge_evaluation(df_results, TASKS)

# Rebuild the per-(task, model, strategy) summary, now including judge scores
judge_group_keys = ['task_id', 'task_name', 'model', 'strategy']
agg_metrics = (
    df_results
    .groupby(judge_group_keys)
    .agg(
        n=('response', 'size'),
        avg_accuracy=('exact_match', 'mean'),
        avg_overlap=('overlap_score', 'mean'),
        avg_judge_score=('normalized_judge_score', 'mean'),
    )
    .reset_index()
)

# Show the ten configurations with the highest average judge score
top_by_judge = agg_metrics.sort_values('avg_judge_score', ascending=False).head(10)
display(top_by_judge)

In [None]:
# Settings shared by the two judge-score bar charts
judge_axis_label = 'Average Judge Score (Normalized 0-1)'
model_legend = dict(title='Model', bbox_to_anchor=(1.05, 1), loc='upper left')

# Figure 1: effect of the prompting strategy on the judge score, per model
plt.figure(figsize=(12, 6))
sns.barplot(data=agg_metrics, x='strategy', y='avg_judge_score', hue='model')
plt.title('Impact of Prompting Strategy (Judge Score) across Models')
plt.ylabel(judge_axis_label)
plt.xlabel('Prompting Strategy')
plt.legend(**model_legend)  # legend sits outside the axes, to the right
plt.tight_layout()
plt.savefig(OUTPUT_DIR / "strategy_comparison_judge.png")
plt.show()

# Figure 2: judge score per task type, per model
plt.figure(figsize=(14, 7))
sns.barplot(data=agg_metrics, x='task_name', y='avg_judge_score', hue='model')
plt.title('Model Performance by Task Type (Judge Score)')
plt.xticks(rotation=45, ha='right')  # slanted labels keep long task names readable
plt.ylabel(judge_axis_label)
plt.legend(**model_legend)
plt.tight_layout()
plt.savefig(OUTPUT_DIR / "task_performance_judge.png")
plt.show()

# Figure 3: distribution of exact-match accuracy vs. judge score.
# This figure is only displayed, not written to OUTPUT_DIR.
plt.figure(figsize=(10, 5))
melted_metrics = agg_metrics.melt(
    id_vars=['model'],
    value_vars=['avg_accuracy', 'avg_judge_score'],
    var_name='Metric',
    value_name='Score',
)
sns.boxplot(data=melted_metrics, x='Metric', y='Score')
plt.title('Exact Match Accuracy vs. LLM Judge Score')
plt.show()