<a href="https://colab.research.google.com/github/hiroaki-com/ollama-llm-benchmark/blob/main/ollama_multi_model_benchmarker_en.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

A completely free benchmarking tool for automatically comparing Ollama model performance in the Google Colab environment.

Key Features:
- 🔄 Flexible Model Selection (Easy selection via comma-separated input + checkboxes)
- 🎯 Single Source of Truth Design (Model list managed in one place for easy editing)
- 📊 Comprehensive Performance Metrics (Measures generation speed, TTFT, model size, quantization level, etc.)
- 💾 Automatic Result Saving (Saves integrated JSON, session archives, and size cache to `Google Drive` under `MyDrive`)
- 📈 Visualization Reports (Displays results instantly with graphs + Markdown tables + response previews)

How to Use:
1. 📋 Run the `Model Registry` cell to load the model list
2. ✅ Select test targets using `checkboxes`
3. 🧪 Run the `Ollama Multi-Model Benchmarker` cell to start measurement

In [None]:
#@title üìã Model Registry

# @markdown Model Configuration
# @markdown - Enter test target models separated by commas. Search for model names at https://ollama.com/search and verify the official names.
# @markdown
# @markdown - üí° Selection guideline for `T4 GPU` environment: `8B` (recommended) | `14B` (practical) | `20B`+ (slow)

model_list = "qwen3:8b, qwen3:14b, qwen2.5-coder:7b, qwen2.5-coder:14b, ministral-3:8b, ministral-3:14b, devstral-small-2:24b, gpt-oss:20b, deepseek-r1:8b" #@param {type:"string"}


def parse_model_list(raw):
    """Split a comma-separated model string into a clean, ordered list.

    Surrounding whitespace is stripped, empty entries are dropped, and a
    name that appears more than once is kept only at its first position so
    the same model is never listed (and benchmarked) twice.

    Args:
        raw: The comma-separated model names as typed into the form field.

    Returns:
        A list of unique, non-empty model names in first-seen order.
    """
    names = (part.strip() for part in raw.split(','))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(name for name in names if name))


# Single source of truth for the models offered by the selector UI.
AVAILABLE_MODELS = parse_model_list(model_list)

if not AVAILABLE_MODELS:
    raise ValueError("‚ùå Model list is empty. Please enter at least one model.")

from IPython.display import display, HTML
import ipywidgets as widgets

def create_model_selector():
    """Build and display the model-selection checkbox UI.

    One pre-ticked checkbox is created per entry of ``AVAILABLE_MODELS``,
    plus a master "select all" checkbox whose value is copied onto every
    per-model checkbox whenever it changes.

    Returns:
        A ``(select_all, model_checks)`` tuple: the master checkbox and a
        dict mapping each model name to its checkbox widget.
    """
    def make_model_checkbox(name):
        return widgets.Checkbox(
            value=True,
            description=name,
            indent=False,
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='300px')
        )

    select_all = widgets.Checkbox(
        value=True,
        description='‚úÖ Select All Models',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='300px')
    )

    checkboxes = [make_model_checkbox(name) for name in AVAILABLE_MODELS]
    model_checks = dict(zip(AVAILABLE_MODELS, checkboxes))

    def on_select_all_change(change):
        # Propagate the master checkbox's new state to every model checkbox.
        new_state = change['new']
        for box in checkboxes:
            box.value = new_state

    select_all.observe(on_select_all_change, names='value')

    ui = widgets.VBox([
        widgets.HTML('<h3>üì¶ Available Models</h3><p style="margin: 5px 0 10px 0; font-size: 13px;">Select all or uncheck individually to narrow down test targets. After selection, run the next cell.</p>'),
        select_all,
        widgets.HTML('<hr style="margin: 10px 0;">'),
        widgets.VBox(checkboxes, layout=widgets.Layout(padding='0 0 0 20px'))
    ])

    display(ui)

    return select_all, model_checks

# Render the selector and keep the widgets in module-level names so the
# benchmark cell can read the user's selection from them later.
select_all_widget, model_checkboxes = create_model_selector()

print(f"‚úÖ Model list loaded: {len(AVAILABLE_MODELS)} models available.")
print("‚û°Ô∏è Please run the next cell (Benchmarker).")

In [None]:
#@title üß™ Ollama Multi-Model Benchmarker

# @markdown Benchmark Configuration
# @markdown - `save_to_drive`: Set to `True` to save results to Google Drive
# @markdown - `timeout_seconds`: Maximum processing time per model (seconds)
# @markdown - `custom_test_prompt`: If left blank, default prompt (Python code generation) will be used
# @markdown

# User-editable form parameters (the trailing #@param markers bind them to the
# Colab form). timeout_seconds is used both for each `ollama pull` attempt and
# for each generate request.
save_to_drive = False #@param {type:"boolean"}
timeout_seconds = 1000 #@param {type:"integer"}
custom_test_prompt = "" #@param {type:"string"}

import os
import sys
import subprocess
import time
import json
import requests
import shutil
import warnings
import atexit
import psutil
import platform
import matplotlib.pyplot as plt
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from IPython.display import display, Markdown

# Suppress DeprecationWarning output for everything that runs after this line.
warnings.filterwarnings("ignore", category=DeprecationWarning)

@dataclass
class Config:
    """Tunable constants for the benchmark run, grouped in one place."""

    # Path of the Ollama executable, the address exported as OLLAMA_HOST, and
    # the base URL used for every HTTP API call.
    OLLAMA_BINARY: str = "/usr/local/bin/ollama"
    OLLAMA_HOST: str = "0.0.0.0:11434"
    OLLAMA_API_BASE: str = "http://0.0.0.0:11434"

    # `ollama pull` retry policy: number of attempts, and the base (seconds)
    # of the exponential backoff, base * 2 ** (attempt - 1).
    PULL_MAX_RETRIES: int = 3
    PULL_BACKOFF_BASE: int = 5

    # Server readiness polling: number of polls, seconds slept between polls,
    # and the per-request timeout (seconds) of each health check.
    SERVER_STARTUP_MAX_ATTEMPTS: int = 30
    SERVER_STARTUP_POLL_INTERVAL: int = 1
    SERVER_HEALTH_CHECK_TIMEOUT: int = 2

    # Warmup request timeout (seconds) and its num_predict option; seconds
    # slept after an unload request, and that request's timeout (seconds).
    WARMUP_TIMEOUT: int = 300
    WARMUP_NUM_PREDICT: int = 1
    MODEL_UNLOAD_WAIT: int = 2
    MODEL_UNLOAD_TIMEOUT: int = 10

    # Prompt used when no custom prompt is given, the character limit a custom
    # prompt is trimmed to, and the character limit of the response preview.
    DEFAULT_PROMPT: str = "Write a recursive Python function with type hints and a docstring to compute the factorial of a number, test it with n = 5, and show only the code and the expected result."
    MAX_PROMPT_CHARS: int = 500
    MAX_RESPONSE_DISPLAY_CHARS: int = 1500

    # Free-disk requirements: margin added on top of a cached model size, and
    # the flat minimum demanded when a model's size is not cached.
    DISK_SAFETY_MARGIN_GB: int = 2
    UNKNOWN_MODEL_MIN_FREE_GB: int = 20

    # Generation options sent with the generate requests.
    NUM_CTX: int = 4096
    TEMPERATURE: float = 0.0

    # File name of the model-size cache inside the Drive results directory.
    CACHE_FILENAME: str = "model_size_cache.json"

class C:
    """ANSI escape sequences used to color and style console output."""

    # Resets all styling back to the terminal default.
    RESET = '\033[0m'
    # Foreground colors.
    GREEN = '\033[32m'
    RED = '\033[31m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    CYAN = '\033[36m'
    MAGENTA = '\033[35m'
    WHITE = '\033[37m'
    # Text attributes.
    DIM = '\033[2m'
    BOLD = '\033[1m'

# Shared configuration instance read by the helpers below.
config = Config()
# UTC timestamp (YYYYMMDD_HHMMSS) used to name this session's archive file.
session_id = datetime.utcnow().strftime('%Y%m%d_%H%M%S')

# Handle of the `ollama serve` subprocess; assigned once the server is started
# and read by cleanup_ollama_server().
ollama_process: Optional[subprocess.Popen] = None

# Resolve which models to benchmark from the widgets created by the
# 'Model Registry' cell. The per-model checkboxes are the source of truth:
# the "select all" box is not updated when a single model is unticked, so
# trusting it would silently re-include models the user deselected.
try:
    selected_models = [
        model for model, checkbox in model_checkboxes.items()
        if checkbox.value
    ]
except NameError:
    # model_checkboxes only exists after the registry cell has been executed.
    print(f"{C.RED}‚ùå Error: Model Registry not loaded{C.RESET}")
    print(f"{C.YELLOW}Please run the 'Model Registry' cell first.{C.RESET}")
    raise SystemExit("Model Registry cell must be executed before benchmark")

if not selected_models:
    print(f"{C.RED}‚ùå Error: No models selected{C.RESET}")
    print(f"{C.YELLOW}Please select at least one model in the Model Registry cell.{C.RESET}")
    raise SystemExit("At least one model must be selected")

def cleanup_ollama_server() -> None:
    """Stop the background Ollama server process, if one was started.

    The process is asked to terminate and given up to 5 seconds to exit; if
    it is still running after that it is killed. A process that no longer
    exists is ignored, and any other failure is reported on stderr as a
    warning instead of being raised.
    """
    server = ollama_process
    if not server:
        return

    try:
        server.terminate()
        server.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown took too long: force it.
        try:
            server.kill()
        except ProcessLookupError:
            pass
    except ProcessLookupError:
        pass
    except Exception as exc:
        print(f"{C.YELLOW}Warning: Failed to cleanup Ollama process: {exc}{C.RESET}", file=sys.stderr)

def trim_to_boundary(text: str, limit: int) -> str:
    """Shorten *text* to at most *limit* characters, preferring a natural break.

    Text that already fits is returned unchanged. Otherwise the first
    *limit* characters are taken and cut after the last occurrence of the
    highest-priority separator found in the second half of that slice;
    trailing whitespace is stripped from the result.

    Args:
        text: The text to shorten.
        limit: Maximum number of characters to keep.

    Returns:
        The original text, or a truncated and right-stripped prefix of it.
    """
    if len(text) <= limit:
        return text
    candidate = text[:limit]
    # Separators in priority order: newline, ideographic full stop (U+3002),
    # ASCII period, ideographic comma (U+3001), ASCII comma, space. The CJK
    # marks are written as escapes so they cannot be corrupted by a
    # mis-decoded source file.
    for sep in ("\n", "\u3002", ".", "\u3001", ",", " "):
        idx = candidate.rfind(sep)
        # Only accept a break in the second half, to avoid cutting too much.
        if idx > limit // 2:
            return candidate[:idx + len(sep)].rstrip()
    return candidate.rstrip()

# Decide which prompt to benchmark with: a non-blank custom prompt (trimmed to
# the configured character limit) wins over the built-in default.
_custom_prompt = custom_test_prompt.strip()
_using_custom_prompt = bool(_custom_prompt)

resolved_prompt = (
    trim_to_boundary(_custom_prompt, config.MAX_PROMPT_CHARS)
    if _using_custom_prompt
    else config.DEFAULT_PROMPT
)

TEST_PROMPTS: List[Dict[str, str]] = [
    {
        "name": "Custom Prompt" if _using_custom_prompt else "Python Factorial",
        "prompt": resolved_prompt,
        "expected": "Custom" if _using_custom_prompt else "Recursive factorial function"
    }
]

# Run banner: configuration summary, the selected models, and a metric legend.
print("Ollama Multi-Model Benchmarker")
print(f"Models: {len(selected_models)} | Timeout: {timeout_seconds}s")
if _using_custom_prompt and len(_custom_prompt) != len(resolved_prompt):
    print(f"{C.YELLOW}  ‚Ä∫ Prompt truncated at boundary ({len(resolved_prompt)} chars){C.RESET}")
print()
print("Selected Models:")
for idx, model in enumerate(selected_models, 1):
    print(f"  {idx}. {model}")
print()
print("Metrics Definition:")
for _metric_label, _metric_text in (
    ('t/s', 'Tokens per Second ... Generation speed'),
    ('TTFT', 'Time To First Token . Response latency'),
    ('Total', 'End-to-End Time ..... Total processing time'),
    ('Size', 'Model Size .......... Disk/VRAM usage'),
):
    print(f"  {_metric_label:<10} : {_metric_text}")
print()

# Result persistence is optional. Without it every path constant is None and
# the save helpers skip their work; with it Google Drive is mounted and the
# output directories are created up front.
if not save_to_drive:
    BASE_DIR = RESULTS_FILE = ARCHIVE_DIR = CACHE_FILE = None
else:
    from google.colab import drive
    drive.mount('/content/drive')

    BASE_DIR = '/content/drive/MyDrive/OllamaBenchmarks'
    RESULTS_FILE = BASE_DIR + '/benchmark_results.json'
    ARCHIVE_DIR = BASE_DIR + '/session_logs'
    CACHE_FILE = BASE_DIR + '/' + config.CACHE_FILENAME

    for _output_dir in (BASE_DIR, ARCHIVE_DIR):
        os.makedirs(_output_dir, exist_ok=True)

def load_size_cache() -> Dict[str, float]:
    """Load the persisted model-size cache.

    The cache is only consulted when saving to Drive is enabled and the
    cache file exists. Unreadable, malformed or unexpectedly shaped content
    is reported as a warning and treated as an empty cache, and entries
    whose value is not a number are dropped, so callers can rely on getting
    a ``{model_name: size_gb}`` mapping.

    Returns:
        The cached sizes, or an empty dict when nothing usable is available.
    """
    if not (save_to_drive and CACHE_FILE and os.path.exists(CACHE_FILE)):
        return {}

    try:
        with open(CACHE_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding errors.
        print(f"{C.YELLOW}Warning: Failed to load size cache: {e}{C.RESET}")
        return {}

    if not isinstance(data, dict):
        print(f"{C.YELLOW}Warning: Failed to load size cache: unexpected format{C.RESET}")
        return {}

    # bool is a subclass of int, so it has to be excluded explicitly.
    return {
        name: size
        for name, size in data.items()
        if isinstance(size, (int, float)) and not isinstance(size, bool)
    }

def update_size_cache(model_name: str, size_gb: float) -> None:
    """Store *size_gb* for *model_name* in the persisted size cache.

    Does nothing unless saving to Drive is enabled and a cache path is set.
    The current cache is re-read, the entry is added or replaced, and the
    whole cache is written back. Failures are reported as a warning rather
    than raised.
    """
    if save_to_drive and CACHE_FILE:
        try:
            entries = load_size_cache()
            entries[model_name] = size_gb
            with open(CACHE_FILE, 'w') as handle:
                json.dump(entries, handle, indent=2)
        except Exception as exc:
            print(f"{C.YELLOW}Warning: Failed to update size cache: {exc}{C.RESET}")

def get_disk_usage() -> Dict[str, float]:
    """Report total, used and free space of the root filesystem.

    Returns:
        A dict with ``total_gb``, ``used_gb`` and ``free_gb``, each the byte
        count divided by 1024**3 and rounded to two decimals.
    """
    bytes_per_gb = 1024 ** 3
    usage = shutil.disk_usage("/")
    return {
        f"{field}_gb": round(getattr(usage, field) / bytes_per_gb, 2)
        for field in ("total", "used", "free")
    }

def get_cpu_info() -> str:
    """Return a human-readable CPU model name.

    On Linux the first non-empty "model name" entry of ``/proc/cpuinfo`` is
    used; otherwise, or when no such entry exists, ``platform.processor()``
    is used.

    Returns:
        The CPU name, or ``"Unknown CPU"`` when it cannot be determined or
        ``/proc/cpuinfo`` cannot be read.
    """
    try:
        if platform.system() == "Linux":
            with open("/proc/cpuinfo", "r") as f:
                for line in f:
                    if "model name" in line:
                        # Keep everything after the first colon so a name
                        # that itself contains ':' is not truncated.
                        name = line.partition(":")[2].strip()
                        if name:
                            return name
        return platform.processor() or "Unknown CPU"
    except (OSError, ValueError):
        # Unreadable or undecodable cpuinfo; other errors are real bugs and
        # are allowed to surface.
        return "Unknown CPU"

def get_system_info() -> Dict[str, Any]:
    """Collect a snapshot of the host's GPU, CPU, RAM, disk and OS details.

    GPU data comes from ``nvidia-smi``; only the first reported GPU is used.
    ``nvidia-smi`` reports ``memory.total`` in MiB, so the value is converted
    before being stored under ``vram_gb``.

    Returns:
        A dict with the keys ``gpu``, ``vram_gb``, ``cpu``, ``cpu_cores``,
        ``ram_total_gb``, ``ram_available_gb``, ``disk_total_gb``,
        ``disk_free_gb`` and ``platform``. GPU and RAM values fall back to
        ``"Unknown"`` / ``"Unknown/None"`` and ``0`` when they cannot be read.
    """
    gpu_name = "Unknown"
    vram_gb: float = 0
    try:
        query = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=name,memory.total",
                "--format=csv,noheader,nounits",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        gpu_lines = query.stdout.splitlines() if query.returncode == 0 else []
        if gpu_lines:
            # Split on the last comma so a GPU name containing a comma
            # does not break parsing.
            name_field, memory_field = gpu_lines[0].rsplit(',', 1)
            memory_mib = int(memory_field.strip())
            gpu_name = name_field.strip()
            vram_gb = round(memory_mib / 1024, 2)
        else:
            gpu_name = "Unknown/None"
    except (OSError, subprocess.SubprocessError, ValueError):
        # nvidia-smi missing, timed out, or produced unparsable output.
        gpu_name = "Unknown"
        vram_gb = 0

    cpu_name = get_cpu_info()
    cpu_cores = os.cpu_count()
    try:
        ram_obj = psutil.virtual_memory()
        ram_total_gb = round(ram_obj.total / (1024**3), 2)
        ram_available_gb = round(ram_obj.available / (1024**3), 2)
    except Exception:
        ram_total_gb = 0
        ram_available_gb = 0

    disk_info = get_disk_usage()

    return {
        "gpu": gpu_name,
        "vram_gb": vram_gb,
        "cpu": cpu_name,
        "cpu_cores": cpu_cores,
        "ram_total_gb": ram_total_gb,
        "ram_available_gb": ram_available_gb,
        "disk_total_gb": disk_info["total_gb"],
        "disk_free_gb": disk_info["free_gb"],
        "platform": platform.platform()
    }

def get_model_details(model_name: str) -> Dict[str, Any]:
    """Fetch quantization, family and parameter size for a model.

    Queries the server's ``/api/show`` endpoint. Any field missing from the
    response, a non-200 status, or any error at all yields ``"Unknown"``
    for the affected values.

    Returns:
        A dict with the keys ``quantization``, ``family`` and
        ``parameter_size``.
    """
    # Result key -> key inside the response's "details" object.
    detail_keys = {
        "quantization": "quantization_level",
        "family": "family",
        "parameter_size": "parameter_size",
    }
    try:
        reply = requests.post(
            f"{config.OLLAMA_API_BASE}/api/show",
            json={"name": model_name},
            timeout=5
        )
        if reply.status_code == 200:
            details = reply.json().get("details", {})
            return {
                result_key: details.get(api_key, "Unknown")
                for result_key, api_key in detail_keys.items()
            }
    except Exception:
        pass
    return {result_key: "Unknown" for result_key in detail_keys}

def get_installed_model_size(model_name: str) -> Optional[float]:
    """Return the size of an installed model as listed by ``/api/tags``.

    A listed model matches when its name equals *model_name* or
    ``"<model_name>:latest"``.

    Returns:
        The reported byte size divided by 1024**3 and rounded to two
        decimals, or ``None`` when the model is not listed, the request
        does not return 200, or any error occurs.
    """
    accepted_names = (model_name, f"{model_name}:latest")
    try:
        reply = requests.get(f"{config.OLLAMA_API_BASE}/api/tags", timeout=5)
        if reply.status_code != 200:
            return None
        for entry in reply.json().get("models", []):
            if entry["name"] in accepted_names:
                return round(entry.get("size", 0) / (1024**3), 2)
    except Exception:
        pass
    return None

# Install system and Python dependencies via notebook shell commands.
!apt-get update -qq
!apt-get install -y -qq zstd
!pip install -q psutil matplotlib ipywidgets

print()
# Download and run the Ollama install script.
!curl -fsSL https://ollama.com/install.sh | sh

print()

# Environment for the Ollama server started below. OLLAMA_HOST comes from the
# shared config; the remaining values are fixed here.
# NOTE(review): the exact effect of KEEP_ALIVE / MAX_LOADED_MODELS /
# FLASH_ATTENTION is defined by Ollama, not by this file - confirm against
# the Ollama documentation before changing them.
os.environ['OLLAMA_HOST'] = config.OLLAMA_HOST
os.environ['OLLAMA_KEEP_ALIVE'] = '5m'
os.environ['OLLAMA_MAX_LOADED_MODELS'] = '1'
os.environ['OLLAMA_FLASH_ATTENTION'] = '1'

# Start `ollama serve` in the background and wait until its HTTP API answers.
startup_start = time.time()

ollama_process = subprocess.Popen(
    [config.OLLAMA_BINARY, "serve"],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL
)
# Register the shutdown hook as soon as the process exists, so the server is
# stopped at interpreter exit even if the readiness check below fails.
atexit.register(cleanup_ollama_server)

# Poll the tags endpoint until it returns 200 or the attempt budget runs out.
for attempt in range(config.SERVER_STARTUP_MAX_ATTEMPTS):
    try:
        response = requests.get(
            f"{config.OLLAMA_API_BASE}/api/tags",
            timeout=config.SERVER_HEALTH_CHECK_TIMEOUT
        )
        if response.status_code == 200:
            startup_time = round(time.time() - startup_start, 2)
            print(f"{C.GREEN}‚úÖ Ollama server ready in {startup_time}s{C.RESET}")
            break
    except requests.RequestException:
        # Server not accepting connections yet; retry after the poll interval.
        pass
    time.sleep(config.SERVER_STARTUP_POLL_INTERVAL)
else:
    # Never became healthy: stop the process we spawned instead of leaking it.
    cleanup_ollama_server()
    raise RuntimeError("‚ùå Failed to start server")

# Capture the host description once; it is printed here and later attached
# to every benchmark record.
sys_info = get_system_info()

_system_summary = (
    "",
    f"{C.BOLD}System Information ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ{C.RESET}",
    f"  OS   : {sys_info['platform']}",
    f"  CPU  : {sys_info['cpu']} ({sys_info['cpu_cores']} cores)",
    f"  RAM  : {sys_info['ram_total_gb']} GB",
    f"  GPU  : {sys_info['gpu']} ({sys_info['vram_gb']} GB VRAM)",
    "",
)
for _summary_line in _system_summary:
    print(_summary_line)

def pull_model_with_retry(model_name: str, env: Dict[str, str], timeout: int) -> subprocess.CompletedProcess:
    """Run ``ollama pull`` for a model, retrying with exponential backoff.

    Up to ``config.PULL_MAX_RETRIES`` attempts are made. After a failed or
    timed-out attempt that is not the last one, the function sleeps
    ``config.PULL_BACKOFF_BASE * 2 ** (attempt - 1)`` seconds and retries.

    Args:
        model_name: Name of the model to pull.
        env: Environment passed to the ``ollama`` subprocess.
        timeout: Timeout in seconds applied to each individual attempt.

    Returns:
        The completed process of the first attempt that exits with code 0.

    Raises:
        subprocess.TimeoutExpired: If the final attempt times out.
        RuntimeError: If the final attempt exits non-zero, or if the
            configured number of attempts is zero or negative.
    """
    max_attempts = config.PULL_MAX_RETRIES
    # Tracked separately so the final error never depends on an unbound name.
    last_error = "no pull attempt was made"

    for attempt in range(1, max_attempts + 1):
        is_last_attempt = attempt >= max_attempts
        try:
            result = subprocess.run(
                [config.OLLAMA_BINARY, "pull", model_name],
                capture_output=True,
                text=True,
                timeout=timeout,
                env=env
            )
        except subprocess.TimeoutExpired:
            if is_last_attempt:
                raise
            reason = "Pull timeout"
        else:
            if result.returncode == 0:
                return result
            last_error = result.stderr
            if is_last_attempt:
                break
            reason = "Pull failed"

        wait = config.PULL_BACKOFF_BASE * (2 ** (attempt - 1))
        print(f"  {C.YELLOW}‚Ä∫ {'Retry':<7}{C.RESET} {reason} (attempt {attempt}/{max_attempts}) ‚Äî retrying in {wait}s ...")
        time.sleep(wait)

    raise RuntimeError(f"Pull failed after {config.PULL_MAX_RETRIES} attempts: {last_error}")

def warmup_model(model_name: str) -> float:
    """Send a minimal generate request and report how long it took.

    The request uses the fixed prompt ``"warmup"`` with the configured
    context size and ``num_predict`` value.

    Returns:
        The response's ``total_duration`` converted from nanoseconds to
        seconds (two decimals), or ``0.0`` when the request fails or the
        status is not 200. Request errors are reported on stderr.
    """
    endpoint = f"{config.OLLAMA_API_BASE}/api/generate"
    payload = {
        "model": model_name,
        "prompt": "warmup",
        "stream": False,
        "options": {
            "num_ctx": config.NUM_CTX,
            "num_predict": config.WARMUP_NUM_PREDICT
        }
    }
    try:
        reply = requests.post(endpoint, json=payload, timeout=config.WARMUP_TIMEOUT)
        if reply.status_code == 200:
            duration_ns = reply.json().get("total_duration", 0)
            return round(duration_ns / 1e9, 2)
    except requests.RequestException as exc:
        print(f"  {C.YELLOW}‚Ä∫ {'Warning':<7}{C.RESET} Warmup failed: {exc}", file=sys.stderr)
    return 0.0

def unload_model(model_name: str) -> None:
    """Ask the server to unload a model, then pause briefly.

    Sends a generate request carrying ``keep_alive: 0`` and, if the request
    went through, sleeps for ``config.MODEL_UNLOAD_WAIT`` seconds. Request
    errors are reported on stderr and not raised.
    """
    unload_payload = {"model": model_name, "keep_alive": 0}
    try:
        requests.post(
            f"{config.OLLAMA_API_BASE}/api/generate",
            json=unload_payload,
            timeout=config.MODEL_UNLOAD_TIMEOUT
        )
    except requests.RequestException as exc:
        print(f"  {C.YELLOW}‚Ä∫ {'Warning':<7}{C.RESET} Unload failed: {exc}", file=sys.stderr)
    else:
        time.sleep(config.MODEL_UNLOAD_WAIT)

def delete_model(model_name: str, env: Dict[str, str]) -> None:
    """Remove a model with ``ollama rm``.

    The command is given 30 seconds; a timeout is reported on stderr and
    not raised. The command's exit status is not inspected.

    Args:
        model_name: Name of the model to remove.
        env: Environment passed to the ``ollama`` subprocess.
    """
    remove_command = [config.OLLAMA_BINARY, "rm", model_name]
    try:
        subprocess.run(
            remove_command,
            capture_output=True,
            text=True,
            env=env,
            timeout=30
        )
    except subprocess.TimeoutExpired as exc:
        print(f"  {C.YELLOW}‚Ä∫ {'Warning':<7}{C.RESET} Delete timeout: {exc}", file=sys.stderr)

def save_benchmark_result(benchmark_result: Dict[str, Any], results_file: str) -> None:
    """Append one benchmark record to the cumulative results JSON file.

    The file is created with an empty skeleton if it does not exist. An
    existing file that lacks a ``benchmarks`` list gets one. The updated
    document is written to ``<results_file>.tmp`` and then moved over the
    target, so a failed write does not leave a half-written results file.

    I/O errors, malformed JSON and an unexpected document structure are
    reported as a warning; in those cases the record is not saved and the
    existing file is left untouched.

    Args:
        benchmark_result: The record to append.
        results_file: Path of the cumulative results file.
    """
    try:
        if os.path.exists(results_file):
            with open(results_file, 'r', encoding='utf-8') as f:
                all_data = json.load(f)
            # Validate the shape up front; otherwise a foreign or hand-edited
            # file would raise KeyError/TypeError and abort the whole run.
            if not isinstance(all_data, dict):
                raise ValueError("results file does not contain a JSON object")
            if not isinstance(all_data.setdefault("benchmarks", []), list):
                raise ValueError("'benchmarks' entry is not a list")
        else:
            all_data = {
                "schema_version": "1.1",
                "last_updated": None,
                "benchmarks": []
            }

        all_data["benchmarks"].append(benchmark_result)
        all_data["last_updated"] = datetime.utcnow().isoformat() + "Z"

        temp_file = results_file + ".tmp"
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(all_data, f, indent=2, ensure_ascii=False)
        os.replace(temp_file, results_file)

    except (OSError, ValueError) as e:
        # ValueError also covers json.JSONDecodeError.
        print(f"  {C.YELLOW}‚Ä∫ {'Warning':<7}{C.RESET} Save error: {str(e)}")

# Main benchmark loop.
#
# For every selected model: check free disk space, pull the model, record its
# metadata and size, warm it up, run each test prompt, then unload and delete
# the model. One result record per model is appended to benchmark_results and,
# when saving is enabled, to the cumulative results file.
session_start_time = datetime.utcnow()
benchmark_results: List[Dict[str, Any]] = []
successful_tests = 0
failed_tests = 0

model_size_cache = load_size_cache()

# Ascending by cached size; models without a cached size count as 0 and
# therefore run first.
selected_models.sort(key=lambda m: model_size_cache.get(m, 0))

for model_idx, model_name in enumerate(selected_models, 1):
    print(f"{C.BLUE}‚ñ∂{C.RESET} {C.BOLD}[{model_idx}/{len(selected_models)}] {model_name}{C.RESET}")

    disk_before = get_disk_usage()

    # Required free space: cached size plus a safety margin when the size is
    # known, otherwise a flat configured minimum.
    cached_size = model_size_cache.get(model_name)
    if cached_size is not None:
        required_space = cached_size + config.DISK_SAFETY_MARGIN_GB
        size_str = f"{cached_size}GB (cached)"
    else:
        required_space = config.UNKNOWN_MODEL_MIN_FREE_GB
        size_str = f"Unknown (Checking >{config.UNKNOWN_MODEL_MIN_FREE_GB}GB)"

    if disk_before['free_gb'] < required_space:
        print(f"  {C.RED}‚Ä∫ {'Skip':<7}{C.RESET} Insufficient disk space (need {required_space:.1f}GB, free {disk_before['free_gb']}GB)")

        failed_metrics = {
            "model": model_name,
            "error": "Insufficient disk space",
            "required_gb": required_space,
            "free_gb": disk_before['free_gb']
        }
        benchmark_results.append({
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "model": model_name,
            "environment": sys_info,
            "metrics": failed_metrics
        })
        failed_tests += 1
        print()
        continue

    metrics: Dict[str, Any] = {
        "model": model_name,
        "pull_time": 0,
        "model_load_time": 0,
        "model_size_gb": cached_size if cached_size else 0,
        "meta": {},
        "tests": [],
        "error": None
    }

    # Set once the pull succeeded, so cleanup knows there is something to
    # unload and delete even if a later step raises.
    model_pulled = False
    # Reason of the most recent failed test request, if any.
    last_test_error: Optional[str] = None

    try:
        print(f"  {C.DIM}‚Ä∫ {'Setup':<7}{C.RESET} Free: {disk_before['free_gb']}GB | Est. Size: {size_str}")

        env = os.environ.copy()
        env['OLLAMA_HOST'] = config.OLLAMA_HOST
        env['HOME'] = '/root'

        pull_start = time.time()
        pull_model_with_retry(model_name, env, timeout_seconds)
        metrics["pull_time"] = round(time.time() - pull_start, 2)
        model_pulled = True

        model_details = get_model_details(model_name)
        metrics["meta"] = model_details
        quant_disp = model_details['quantization']

        real_size = get_installed_model_size(model_name)
        if real_size:
            metrics["model_size_gb"] = real_size
            # Refresh the cache only when the measured size differs.
            if cached_size != real_size:
                update_size_cache(model_name, real_size)
                model_size_cache[model_name] = real_size
            size_display = f"{real_size}GB"
        else:
            size_display = "Unknown"

        print(f"  {C.DIM}‚Ä∫ {'Pull':<7}{C.RESET} {C.GREEN}Download complete {metrics['pull_time']}s{C.RESET} {C.DIM}({size_display}, {quant_disp}){C.RESET}")

        metrics["model_load_time"] = warmup_model(model_name)
        print(f"  {C.DIM}‚Ä∫ {'Load':<7}{C.RESET} {C.GREEN}VRAM load complete {metrics['model_load_time']}s{C.RESET}")

        for test_idx, test in enumerate(TEST_PROMPTS, 1):
            try:
                response = requests.post(
                    f"{config.OLLAMA_API_BASE}/api/generate",
                    json={
                        "model": model_name,
                        "prompt": test["prompt"],
                        "stream": False,
                        "options": {
                            "num_ctx": config.NUM_CTX,
                            "temperature": config.TEMPERATURE
                        }
                    },
                    timeout=timeout_seconds
                )

                if response.status_code == 200:
                    generate_response_data = response.json()
                    response_text = generate_response_data.get("response", "")

                    # Durations in the response are divided by 1e9 to get
                    # seconds; "first_token_time" is taken from
                    # prompt_eval_duration.
                    test_metrics = {
                        "name": test["name"],
                        "prompt": test["prompt"],
                        "response": response_text,
                        "total_time": round(generate_response_data.get("total_duration", 0) / 1e9, 2),
                        "first_token_time": round(generate_response_data.get("prompt_eval_duration", 0) / 1e9, 2),
                        "tokens": generate_response_data.get("eval_count", 0),
                        "tokens_per_sec": 0
                    }

                    eval_duration = generate_response_data.get("eval_duration", 0)
                    if eval_duration > 0:
                        test_metrics["tokens_per_sec"] = round(
                            test_metrics["tokens"] / (eval_duration / 1e9), 2
                        )

                    metrics["tests"].append(test_metrics)

                    print(f"  {C.DIM}‚Ä∫ {'Test':<7}{C.RESET} {test['name']}")
                    print(f"  {C.DIM}‚Ä∫ {'Stats':<7}{C.RESET} {C.CYAN}{test_metrics['tokens_per_sec']:>6.2f} t/s{C.RESET} {C.DIM}| TTFT {test_metrics['first_token_time']:>5.2f}s | {test_metrics['tokens']:>4} tokens{C.RESET}")

                else:
                    last_test_error = f"HTTP {response.status_code}"
                    print(f"  {C.RED}‚Ä∫ {'Fail':<7}{C.RESET} HTTP {response.status_code}")

            except requests.RequestException as e:
                last_test_error = str(e)
                print(f"  {C.RED}‚Ä∫ {'Error':<7}{C.RESET} {str(e)}")

        # A model that produced no test result at all must not count as a pass.
        if not metrics["tests"]:
            metrics["error"] = last_test_error or "No test completed successfully"
            print(f"  {C.RED}‚Ä∫ {'Fail':<7}{C.RESET} No test completed successfully")

    except subprocess.TimeoutExpired:
        metrics["error"] = "Timeout exceeded"
        print(f"  {C.RED}‚Ä∫ {'Error':<7}{C.RESET} Timeout exceeded ({timeout_seconds}s)")
    except Exception as e:
        metrics["error"] = str(e)
        print(f"  {C.RED}‚Ä∫ {'Error':<7}{C.RESET} {str(e)}")
    finally:
        # Always release the model once it was pulled, including on failure,
        # so a broken model does not keep occupying disk for the models that
        # follow.
        if model_pulled:
            try:
                unload_model(model_name)
                delete_model(model_name, env)
                print(f"  {C.DIM}‚Ä∫ {'Cleanup':<7}{C.RESET} Resources released")
            except Exception as e:
                print(f"  {C.YELLOW}‚Ä∫ {'Warning':<7}{C.RESET} Cleanup failed: {e}", file=sys.stderr)

    if metrics["error"] is None:
        print(f"  {C.GREEN}‚úÖ Pass{C.RESET}")
        successful_tests += 1
    else:
        failed_tests += 1

    benchmark_result = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "model": model_name,
        "environment": sys_info,
        "metrics": metrics
    }

    benchmark_results.append(benchmark_result)

    if save_to_drive and RESULTS_FILE:
        save_benchmark_result(benchmark_result, RESULTS_FILE)

    print()

# Final tally for the whole run.
print(f"{C.BOLD}Benchmark Complete ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ{C.RESET}")
print(f"{C.DIM}Success: {C.GREEN}{successful_tests}{C.RESET}{C.DIM} | Failed: {C.RED}{failed_tests}{C.RESET}{C.DIM} | Total: {len(benchmark_results)}{C.RESET}")
print()

# When saving is enabled and at least one record exists, write a standalone
# archive of this session next to the cumulative results.
if save_to_drive and benchmark_results and ARCHIVE_DIR:
    try:
        session_end_time = datetime.utcnow()
        _session_elapsed = session_end_time - session_start_time
        session_archive = dict(
            session_id=session_id,
            started_at=session_start_time.isoformat() + "Z",
            completed_at=session_end_time.isoformat() + "Z",
            duration_seconds=round(_session_elapsed.total_seconds(), 2),
            system_info=sys_info,
            models_tested=selected_models,
            successful=successful_tests,
            failed=failed_tests,
            results=benchmark_results,
        )

        archive_file = f"{ARCHIVE_DIR}/{session_id}_session.json"
        with open(archive_file, 'w', encoding='utf-8') as archive_handle:
            json.dump(session_archive, archive_handle, indent=2, ensure_ascii=False)

        for _archive_line in (f"{C.DIM}Archive{C.RESET}", f"  {C.DIM}‚Ä∫ {archive_file}{C.RESET}", ""):
            print(_archive_line)

    except (IOError, OSError) as e:
        print(f"{C.YELLOW}‚Ä∫ Warning{C.RESET} Archive save error: {str(e)}")
        print()

# Report rendering: leader table, detailed metrics table, bar charts and a
# response preview. Only models without an error and with at least one test
# result are included; every figure is taken from each model's first test.
if benchmark_results:
    successful_results = [
        r for r in benchmark_results
        if not r["metrics"].get("error") and r["metrics"]["tests"]
    ]

    if successful_results:
        # Fastest generation first; this order is reused by the tables below.
        successful_results.sort(key=lambda x: x["metrics"]["tests"][0]["tokens_per_sec"], reverse=True)

        fastest = successful_results[0]
        most_responsive = min(successful_results, key=lambda x: x["metrics"]["tests"][0]["first_token_time"])
        quickest_setup = min(successful_results, key=lambda x: x["metrics"]["pull_time"])

        # Markdown table naming the best model per category.
        leaders_md = [
            "Category Leaders ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ\n",
            "| Category | Model | Score |",
            "|:--- |:--- |:--- |",
            f"| ‚ö° Fastest Generation | {fastest['model']} | {fastest['metrics']['tests'][0]['tokens_per_sec']:.2f} t/s |",
            f"| ‚è±Ô∏è Most Responsive | {most_responsive['model']} | {most_responsive['metrics']['tests'][0]['first_token_time']:.2f} s |",
            f"| üì• Quickest Pull | {quickest_setup['model']} | {quickest_setup['metrics']['pull_time']:.2f} s |"
        ]
        display(Markdown("\n".join(leaders_md)))
        print()

        # Markdown table with one row of metrics per successful model.
        detail_md = [
            "Detailed Metrics ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ\n",
            "| Model | Speed | TTFT | Total | Tok | Pull | Load | Size |",
            "|:--- |---:|---:|---:|---:|---:|---:|---:|",
        ]

        for result in successful_results:
            m = result["metrics"]
            t = m["tests"][0]
            size_val = m.get("model_size_gb", 0)

            row = [
                f"`{result['model']}`",
                f"{t['tokens_per_sec']:.2f} t/s",
                f"{t['first_token_time']:.2f}s",
                f"{t['total_time']:.2f}s",
                str(t['tokens']),
                f"{m['pull_time']:.1f}s",
                f"{m['model_load_time']:.1f}s",
                f"{size_val}GB"
            ]
            detail_md.append(f"| {' | '.join(row)} |")

        display(Markdown("\n".join(detail_md)))
        print()

        # Charts are best-effort: any plotting failure is reported and the
        # response preview below still runs.
        try:
            print(f"{C.BOLD}Graph Display ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ{C.RESET}")

            # The sorted list is reversed before plotting.
            plot_data = successful_results[::-1]
            num_models = len(plot_data)

            # One parallel list per plotted metric, all in plot_data order.
            p_models = [r['model'] for r in plot_data]
            p_speeds = [r['metrics']['tests'][0]['tokens_per_sec'] for r in plot_data]
            p_ttft = [r['metrics']['tests'][0]['first_token_time'] for r in plot_data]
            p_total = [r['metrics']['tests'][0]['total_time'] for r in plot_data]
            p_load = [r['metrics']['model_load_time'] for r in plot_data]
            p_pull = [r['metrics']['pull_time'] for r in plot_data]
            p_sizes = [r['metrics'].get('model_size_gb', 0) for r in plot_data]

            plt.style.use('default')
            fig, axes = plt.subplots(3, 2, figsize=(18, 12))
            fig.suptitle('Ollama Benchmark Results', fontsize=18, fontweight='bold', y=0.96)

            # Bar color per metric.
            colors = {
                'speed': '#00897B',
                'ttft': '#FB8C00',
                'total': '#1E88E5',
                'load': '#8E24AA',
                'pull': '#546E7A',
                'size': '#6D4C41'
            }

            # Draws one horizontal bar chart of `data` on `ax`, labelled with
            # the enclosing scope's p_models, and annotates each bar with its
            # value formatted to two decimals.
            def plot_smart_barh(ax, data, title, xlabel, color, num_models):
                # Bar height shrinks as the number of models grows, capped at 0.6.
                base_height = min(0.6, 0.8 / max(num_models, 1))

                bars = ax.barh(p_models, data, color=color, alpha=0.85, height=base_height)

                ax.set_title(title, fontsize=12, fontweight='bold', pad=10)
                ax.set_xlabel(xlabel, fontsize=10, color='#333333')
                ax.grid(axis='x', linestyle=':', alpha=0.6)

                ax.spines['top'].set_visible(False)
                ax.spines['right'].set_visible(False)
                ax.spines['left'].set_color('#cccccc')
                ax.spines['bottom'].set_color('#cccccc')

                # Label offset is 1% of the largest value; falls back to a
                # scale of 1 when the data is empty or all non-positive.
                max_val = max(data) if data and max(data) > 0 else 1
                offset = max_val * 0.01

                for bar in bars:
                    width = bar.get_width()
                    ax.text(width + offset, bar.get_y() + bar.get_height()/2,
                            f' {width:.2f}',
                            ha='left', va='center', fontsize=9, fontweight='bold', color='#444444')

                ax.tick_params(axis='y', labelsize=10)

            # 3x2 grid: one chart per metric.
            plot_smart_barh(axes[0, 0], p_speeds, 'Generation Speed', 'Tokens / Sec', colors['speed'], num_models)
            plot_smart_barh(axes[0, 1], p_ttft, 'Time To First Token', 'Seconds', colors['ttft'], num_models)
            plot_smart_barh(axes[1, 0], p_total, 'Total Processing Time', 'Seconds', colors['total'], num_models)
            plot_smart_barh(axes[1, 1], p_load, 'Model Load Time (VRAM)', 'Seconds', colors['load'], num_models)
            plot_smart_barh(axes[2, 0], p_pull, 'Model Download Time', 'Seconds', colors['pull'], num_models)
            plot_smart_barh(axes[2, 1], p_sizes, 'Model Size (Disk/VRAM)', 'GB', colors['size'], num_models)

            plt.tight_layout(rect=[0, 0.03, 1, 0.95])
            plt.show()

        except Exception as e:
            print(f"{C.YELLOW}Visualization Error: {e}{C.RESET}")

        print()
        print(f"{C.BOLD}Model Responses (Preview) ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ{C.RESET}")
        print()

        # Print the start of each model's response, limited first by character
        # count and then to at most 25 lines.
        for result in successful_results:
            model_name = result["model"]
            test = result["metrics"]["tests"][0]
            resp_text = test.get("response", "").strip()

            limit = config.MAX_RESPONSE_DISPLAY_CHARS
            is_truncated_char = len(resp_text) > limit
            if is_truncated_char:
                resp_text = resp_text[:limit]

            lines = resp_text.splitlines()
            line_limit = 25
            display_lines = lines[:line_limit]
            is_truncated_line = len(lines) > line_limit

            print(f"  {C.DIM}‚Ä∫{C.RESET} {C.BOLD}{model_name}{C.RESET}")
            for line in display_lines:
                print(f"    {C.DIM}|{C.RESET} {line}")

            if is_truncated_char or is_truncated_line:
                print(f"    {C.DIM}‚ãÆ ... (preview truncated){C.RESET}")
            print()