# Food Scan Benchmark


This notebook provides a complete, end-to-end workflow for benchmarking Multimodal Large Language Models (MLLMs) on January's food image dataset (JFID).

**The process is as follows:**

1.  **Setup:** Install dependencies and configure API keys.
2.  **Define Components:** Set up Pydantic schemas, the dataset loader, the model wrapper, and evaluation metrics.
3.  **Run Evaluation:** Loop through the dataset, send images to a chosen MLLM, and collect predictions.
4.  **Analyze Results:** Calculate metrics and summarize the model's performance.

On first use, the dataset is downloaded from a public S3 bucket and cached locally; later runs reuse the cached copy.


## Setup


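Add your API keys to a `.env` file in the same directory as this notebook. `OPENAI_API_KEY` is also used by the embedding-based metrics, and the `JANUARY_API_*` values are only required when benchmarking the January AI model: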

```
OPENAI_API_KEY="sk-..."
GEMINI_API_KEY="..."
JANUARY_API_ENDPOINT="..."
JANUARY_API_UUID="..."
JANUARY_API_TOKEN="..."
```


In [None]:
# Install or upgrade the notebook's third-party dependencies.
# `%pip` is an IPython magic, so this line must run inside a notebook kernel.
%pip install --upgrade litellm boto3 pandas tqdm python-dotenv pydantic tabulate openai scikit-learn scipy plotly openpyxl

## Imports


In [None]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
from pydantic import BaseModel, Field
from typing import List
import openai
from sklearn.metrics.pairwise import cosine_similarity
from scipy.optimize import linear_sum_assignment
import hashlib
import pandas as pd
import numpy as np
import ast
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import tarfile
import litellm
from litellm.exceptions import APIError
from typing import Optional, Dict
import json
import base64
from pathlib import Path
from tqdm.auto import tqdm
import asyncio
import time
from typing import Union, Tuple
from difflib import SequenceMatcher
import os
from dotenv import load_dotenv
import httpx


# Suppress every Python warning for the rest of the session
# (presumably to keep notebook output readable — this also hides deprecation notices).
warnings.filterwarnings("ignore")
# Load key/value pairs from a `.env` file into the process environment so the
# API clients and `os.getenv` calls below can read the credentials.
load_dotenv()

## Core Components


### Schema Definition


In [None]:
# Schema for a single food item in a meal: its name, amount (quantity + unit)
# and that item's own calorie / macro estimates.
# NOTE: documented with `#` comments rather than a class docstring on purpose —
# pydantic would emit a docstring as the schema description, changing what is
# sent to the model when this schema is used as a response format.
class Ingredient(BaseModel):
    name: str = Field(description="Name of the ingredient, e.g., 'scrambled eggs'")
    quantity: float = Field(description="Numerical quantity of the ingredient")
    unit: str = Field(description="Unit of measurement, e.g., 'cup', 'slice', 'g'")
    calories: float = Field(description="Estimated calories for this ingredient")
    carbs: float = Field(
        description="Estimated grams of carbohydrates for this ingredient"
    )
    protein: float = Field(description="Estimated grams of protein for this ingredient")
    fat: float = Field(description="Estimated grams of fat for this ingredient")


# Schema for the meal-level totals: calories plus grams of carbs, protein and fat.
# Uses the same four keys that the macro metrics read ("calories", "carbs",
# "protein", "fat").
class TotalMacros(BaseModel):
    calories: float = Field(description="Total estimated calories for the entire meal")
    carbs: float = Field(
        description="Total estimated grams of carbohydrates for the entire meal"
    )
    protein: float = Field(
        description="Total estimated grams of protein for the entire meal"
    )
    fat: float = Field(description="Total estimated grams of fat for the entire meal")


# Top-level prediction schema: a meal name, the per-ingredient breakdown and the
# meal totals. `LiteModel.analyse` passes this class as `response_format` and
# validates the model's JSON with it; `JanuaryAIModel` builds one from the API
# response. Both return it as a plain dict via `model_dump()`.
class FoodAnalysis(BaseModel):
    meal_name: str = Field(
        description="A descriptive name for the meal, e.g., 'Breakfast Platter'"
    )
    ingredients: List[Ingredient] = Field(
        description="A list of all identified ingredients and their nutritional information"
    )
    total_macros: TotalMacros = Field(
        description="The sum of macros for all ingredients"
    )


### Dataset Class


In [None]:
class FoodScanDataset:
    """Download (once), cache, and index the food scan benchmark dataset.

    The archive is fetched anonymously from a public S3 bucket and unpacked
    under ``root``. If the CSV index is already present under ``root`` no
    download happens. Samples are accessed by integer position.

    Args:
        root: Cache directory (``str`` or ``Path``); ``~`` is expanded.
    """

    _S3_BUCKET = "january-food-image-dataset-public"
    _S3_KEY = "food-scan-benchmark-dataset.tar.gz"

    def __init__(self, root: Union[str, Path]):
        # Accept plain strings as well as Path objects.
        self.root = Path(root).expanduser()
        dataset_dir = self.root / "food-scan-benchmark-dataset"
        self.img_dir = dataset_dir / "fsb_images"
        self.csv_path = dataset_dir / "food_scan_bench_v1.csv"

        # The CSV index doubles as the "dataset is cached" marker.
        if not self.csv_path.exists():
            self._download_and_extract()

        self.df = pd.read_csv(self.csv_path)

    def _download_and_extract(self):
        """Fetch the dataset archive from S3 and unpack it into ``self.root``.

        The temporary archive is removed whether or not the download or the
        extraction succeeds, so a failed attempt leaves no partial file behind.
        """
        print(f"Dataset not found in {self.root}. Downloading from S3...")
        self.root.mkdir(parents=True, exist_ok=True)
        local_archive = self.root / "fsb.tar.gz"

        # UNSIGNED config: the bucket is public, no AWS credentials are needed.
        s3 = boto3.client(
            "s3",
            config=Config(signature_version=UNSIGNED),
        )
        try:
            with open(local_archive, "wb") as f:
                s3.download_fileobj(self._S3_BUCKET, self._S3_KEY, f)

            print("Download complete. Extracting...")
            with tarfile.open(local_archive) as tar:
                if hasattr(tarfile, "data_filter"):
                    # Refuse absolute paths, ".." traversal and special files
                    # when the running Python supports extraction filters.
                    tar.extractall(path=self.root, filter="data")
                else:
                    tar.extractall(path=self.root)
        finally:
            local_archive.unlink(missing_ok=True)
        print("Extraction complete.")

    def __len__(self):
        """Return the number of rows in the dataset index."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Return the sample at position ``idx``.

        Returns:
            A dict with ``image_id``, ``image_path`` and ``gt`` (ground truth:
            ``meal_name``, ``ingredients`` list and the four total ``macros``).
            The ingredients column is parsed as a Python literal; a value that
            cannot be parsed, or that is not a list, yields ``[]``.
        """
        row = self.df.iloc[idx]
        img_path = self.img_dir / row.image_filename

        try:
            ingredients = ast.literal_eval(row.ingredients_list)
        except (ValueError, SyntaxError):
            ingredients = []
        if not isinstance(ingredients, list):
            # Downstream metrics only understand a list of ingredient entries.
            ingredients = []

        return {
            "image_id": row.image_id,
            "image_path": img_path,
            "gt": {
                "meal_name": row.meal_name,
                "ingredients": ingredients,
                "macros": {
                    "calories": row.total_calories,
                    "carbs": row.total_carbs,
                    "protein": row.total_protein,
                    "fat": row.total_fat,
                },
            },
        }

### Model config and helpers


In [None]:
# Per-model pricing and display metadata, keyed by the model id given to the wrappers.
# `calculate_cost` multiplies "input" / "output" by (tokens / 1_000_000), i.e. they are
# prices per one million tokens; the result is stored in the `cost_usd` field.
# "display_name" is the short label returned by `get_display_name`.
# NOTE(review): prices are hard-coded snapshots — confirm against current provider pricing.
MODEL_COSTS = {
    "january/food-vision-v1": {
        "input": 0.0,  # Not used, cost is per-image
        "output": 0.0,
        "display_name": "January AI",
    },
    "gpt-4.1": {
        "input": 2.00,
        "output": 8.00,
        "display_name": "gpt-4.1",
    },
    "gpt-4o": {
        "input": 2.50,
        "output": 10.00,
        "display_name": "gpt-4o",
    },
    "gpt-4o-mini": {
        "input": 0.15,
        "output": 0.60,
        "display_name": "gpt-4o-mini",
    },
    "gemini/gemini-2.5-flash-preview-05-20": {
        "input": 0.15,
        "output": 0.60,
        "display_name": "gemini-2.5-flash",
    },
    "gemini/gemini-2.5-pro-preview-06-05": {
        "input": 1.25,
        "output": 10.00,
        "display_name": "gemini-2.5-pro",
    },
}


def img2b64(path: Path) -> str:
    """Return the image at *path* as a base64 ``data:`` URI for API calls.

    The MIME type is guessed from the file extension so that PNG/WebP/etc.
    files are labelled correctly. Unknown or non-image extensions fall back
    to ``image/jpeg``.

    Args:
        path: Location of the image file.

    Returns:
        A string of the form ``data:<mime>;base64,<payload>``.

    Raises:
        OSError: If the file cannot be read.
    """
    import mimetypes

    mime_type, _ = mimetypes.guess_type(path.name)
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    encoded = base64.b64encode(path.read_bytes()).decode()
    return f"data:{mime_type};base64,{encoded}"


def calculate_cost(model_name: str, input_tokens: int, output_tokens: int) -> float:
    """Return the token-based cost of one call, rounded to 6 decimals.

    Models without an entry in ``MODEL_COSTS`` cost ``0.0``. Table prices are
    applied per one million tokens.
    """
    pricing = MODEL_COSTS.get(model_name)
    if pricing is None:
        return 0.0

    tokens_per_unit = 1_000_000
    prompt_cost = (input_tokens / tokens_per_unit) * pricing["input"]
    completion_cost = (output_tokens / tokens_per_unit) * pricing["output"]
    return round(prompt_cost + completion_cost, 6)


def get_display_name(model_name: str) -> str:
    """Look up the short label for *model_name*; unknown ids are returned unchanged."""
    if model_name not in MODEL_COSTS:
        return model_name
    entry = MODEL_COSTS[model_name]
    return entry.get("display_name", model_name)

### Model Wrappers


In [None]:
class LiteModel:
    """Wrapper around a LiteLLM-supported vision model with selectable system prompts.

    Args:
        model_name: LiteLLM model id, e.g. ``"gpt-4o"``.
        prompt_variant: Key into ``PROMPT_VARIANTS``. An unknown key falls
            back to the ``"detailed"`` prompt.
        **litellm_kwargs: Extra keyword arguments forwarded to
            ``litellm.acompletion``. They may override the default
            ``temperature`` of ``0.0``.
    """

    PROMPT_VARIANTS = {
        "detailed": {
            "suffix": "d",
            "prompt": "You are an expert nutritionist with 20 years of experience. Analyze this food image very carefully and provide the most accurate breakdown possible. Consider portion sizes, cooking methods, and hidden ingredients.",
        },
        "step_by_step": {
            "suffix": "s",
            "prompt": "You are an expert nutritionist. Please analyze this image step by step: 1) First identify all visible food items, 2) Estimate portion sizes, 3) Calculate nutritional content for each item, 4) Sum the totals. Be precise and systematic.",
        },
        "conservative": {
            "suffix": "c",
            "prompt": "You are a conservative nutritionist who prefers to underestimate rather than overestimate. Analyze this food image and provide a realistic, slightly conservative nutritional breakdown.",
        },
        "confident": {
            "suffix": "f",
            "prompt": "You are a highly confident nutritionist. Analyze this food image and provide your best estimate of the nutritional content. Trust your expertise.",
        },
    }

    def __init__(self, model_name: str, prompt_variant="detailed", **litellm_kwargs):
        self.model_name = model_name
        self.prompt_variant = prompt_variant
        self.kwargs = litellm_kwargs

        cfg = self.PROMPT_VARIANTS.get(prompt_variant, self.PROMPT_VARIANTS["detailed"])
        self.system_prompt = cfg["prompt"]
        self.prompt_suffix = cfg["suffix"]

    def _completion_kwargs(self) -> dict:
        """Merge caller-supplied kwargs over the defaults.

        Building one dict avoids a "multiple values for keyword argument"
        TypeError when the caller passes their own ``temperature``.
        """
        return {"temperature": 0.0, **self.kwargs}

    async def analyse(self, img_path: Path) -> Tuple[Optional[dict], Optional[str]]:
        """Analyse one image and return the structured prediction.

        Args:
            img_path: Path of the image to send to the model.

        Returns:
            ``(result, None)`` on success, where ``result`` is the validated
            ``FoodAnalysis`` as a dict plus ``cost_usd`` and
            ``prompt_variant``. ``(None, error_message)`` on any failure,
            including an unreadable image file; the error is also printed.
        """
        try:
            # Encoding happens inside the try so a missing or unreadable file
            # is reported through the (None, error) contract.
            b64_img = img2b64(img_path)

            messages = [
                {"role": "system", "content": self.system_prompt},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Analyze this food image and provide a detailed nutritional breakdown. Include: meal name, all ingredients with quantities/units, and complete macro information (calories, carbs, protein, fat) for each ingredient and the total meal.",
                        },
                        {"type": "image_url", "image_url": {"url": b64_img}},
                    ],
                },
            ]

            resp = await litellm.acompletion(
                model=self.model_name,
                messages=messages,
                response_format=FoodAnalysis,
                **self._completion_kwargs(),
            )

            content = resp.choices[0].message.content
            if not content:
                # e.g. a refusal or filtered reply with no text payload.
                raise ValueError("model returned an empty response")
            data = json.loads(content.strip())

            usage = resp.usage
            input_tokens = usage.prompt_tokens if usage else 0
            output_tokens = usage.completion_tokens if usage else 0
            cost = calculate_cost(self.model_name, input_tokens, output_tokens)

            # Validate against the schema before handing the dict to callers.
            result = FoodAnalysis(**data).model_dump()
            result["cost_usd"] = cost
            result["prompt_variant"] = self.prompt_variant

            return result, None

        except APIError as e:
            error_msg = f"API Error: {e}"
            print(f"{error_msg} for {img_path.name}")
            return None, error_msg
        except Exception as e:
            # Best-effort boundary: one bad image must not abort the whole run.
            error_msg = f"Unexpected Error: {e}"
            print(f"{error_msg} for {img_path.name}")
            return None, error_msg


def pretty_label(full_model_name: str) -> str:
    """Turn a result-table model id into a short label for tables and plots.

    Ids have the form ``"<model>_<prompt_variant>"``. A known variant is
    replaced by its one-letter suffix from ``LiteModel.PROMPT_VARIANTS``.
    Any other non-empty trailing ``_<tag>`` is shortened to its first letter,
    lower-cased. Ids with no tag, including a bare trailing underscore, map
    to the plain display name.
    """
    if full_model_name == "january/food-vision-v1":
        return get_display_name(full_model_name)

    for variant, meta in LiteModel.PROMPT_VARIANTS.items():
        postfix = f"_{variant}"
        if full_model_name.endswith(postfix):
            base = full_model_name[: -len(postfix)]
            return f"{get_display_name(base)}_{meta['suffix']}"

    # Known variants were handled above, so whatever follows the last
    # underscore is an unknown tag. Guard against an empty tag ("model_"),
    # which would otherwise fail on tag[0].
    base, sep, tag = full_model_name.rpartition("_")
    if sep and tag:
        return f"{get_display_name(base)}_{tag[0].lower()}"

    return get_display_name(full_model_name)

In [None]:
class JanuaryAIModel:
    """Wrapper for the proprietary January AI food vision API.

    Credentials are read from the ``JANUARY_API_ENDPOINT``,
    ``JANUARY_API_UUID`` and ``JANUARY_API_TOKEN`` environment variables.

    Raises:
        ValueError: On construction, if any of the three variables is unset.
    """

    COST_PER_IMAGE = 0.01

    def __init__(self):
        self.endpoint = os.getenv("JANUARY_API_ENDPOINT")
        self.uuid = os.getenv("JANUARY_API_UUID")
        self.token = os.getenv("JANUARY_API_TOKEN")

        if not all([self.endpoint, self.uuid, self.token]):
            raise ValueError(
                "January AI API credentials not found in environment variables."
            )

        self.headers = {
            "Content-Type": "application/json",
            "UUID": self.uuid,
            "x-jan-e2e-tests-token": self.token,
        }
        self.client = httpx.AsyncClient()

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self.client.aclose()

    @staticmethod
    def _first_serving(ingredient: dict) -> dict:
        """Return the ingredient's first serving entry, or ``{}`` if it has none.

        Handles a missing key, ``None`` and an empty list alike, so callers
        never index into an empty servings list.
        """
        servings = ingredient.get("servings") or []
        return servings[0] if servings else {}

    def _calculate_ingredient_macros(self, ingredient: dict) -> Dict[str, float]:
        """Re-implement the legacy macro logic for a single ingredient.

        Each nutrient is scaled as ``value * selected_quantity *
        scaling_factor / quantity`` using the first serving. Mass is
        ``weight_grams * selected_quantity / quantity``. An ingredient
        without serving data yields all zeros.
        """
        serving = self._first_serving(ingredient)
        if not serving:
            return dict(calories=0, fat=0, carbs=0, protein=0, fiber=0, mass=0)

        q = serving["quantity"]
        sel = serving["selected_quantity"]
        scale = serving["scaling_factor"]
        weight = serving.get("weight_grams", 0)
        # Fiber may be absent or null in the payload; treat both as 0.
        fiber_base = ingredient.get("fiber", 0) or 0

        return dict(
            calories=ingredient["energy"] * sel * scale / q,
            fat=ingredient["fat"] * sel * scale / q,
            carbs=ingredient["carbs"] * sel * scale / q,
            protein=ingredient["protein"] * sel * scale / q,
            fiber=fiber_base * sel * scale / q,
            mass=weight * sel / q,
        )

    def _parse_server_response(self, js: Dict) -> "FoodAnalysis":
        """Convert the raw API JSON into the standard ``FoodAnalysis`` schema.

        Meal totals are the sums of the per-ingredient values.
        """
        ing_objs = []
        for ing in js.get("ingredients", []):
            m = self._calculate_ingredient_macros(ing)
            # Same tolerant lookup as the macro calculation: an empty servings
            # list falls back to the defaults instead of raising IndexError.
            serving = self._first_serving(ing)
            ing_objs.append(
                Ingredient(
                    name=ing.get("name", "unknown ingredient"),
                    quantity=serving.get("selected_quantity", 0),
                    unit=serving.get("unit", "g"),
                    calories=m["calories"],
                    carbs=m["carbs"],
                    protein=m["protein"],
                    fat=m["fat"],
                )
            )

        tot = TotalMacros(
            calories=sum(i.calories for i in ing_objs),
            carbs=sum(i.carbs for i in ing_objs),
            protein=sum(i.protein for i in ing_objs),
            fat=sum(i.fat for i in ing_objs),
        )

        return FoodAnalysis(
            meal_name=js.get("foodName", "unknown meal"),
            ingredients=ing_objs,
            total_macros=tot,
        )

    async def analyse(self, img_path: Path) -> Tuple[Optional[dict], Optional[str]]:
        """Analyse one image with the January AI API.

        Returns:
            ``(result, None)`` on success, where ``result`` is the parsed
            ``FoodAnalysis`` as a dict plus ``cost_usd`` and
            ``prompt_variant``. ``(None, error_message)`` on any failure,
            including an unreadable image file; the error is also printed.
        """
        try:
            # Built inside the try so a file read error follows the
            # (None, error) contract.
            payload = {"photoUrl": img2b64(img_path)}
            r = await self.client.post(
                self.endpoint, headers=self.headers, json=payload, timeout=30.0
            )
            r.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)

            parsed_response = self._parse_server_response(r.json())

            result = parsed_response.model_dump()
            result["cost_usd"] = self.COST_PER_IMAGE
            result["prompt_variant"] = "default"

            return result, None

        except httpx.HTTPStatusError as e:
            error_msg = f"API Error: {e.response.status_code} - {e.response.text}"
            print(f"{error_msg} for {img_path.name}")
            return None, error_msg
        except Exception as e:
            # Best-effort boundary: one bad image must not abort the whole run.
            error_msg = f"Unexpected Error: {e}"
            print(f"{error_msg} for {img_path.name}")
            return None, error_msg

### Metrics


In [None]:
class Metrics:
    """Static scoring helpers comparing a prediction against ground truth.

    Ingredient arguments may be a list of ingredient entries or the string
    form of such a list. Entries are dicts with a ``"name"`` key; bare
    strings are accepted as names too.
    """

    # Embeddings already fetched, keyed by md5 of "<text>_<model>".
    _embedding_cache = {}

    @staticmethod
    def _normalize_ingredient_list(ingredients: Union[str, list, None]) -> List[dict]:
        """Coerce ingredient data to a list.

        Lists pass through unchanged and strings are parsed as Python
        literals. Anything empty, unparsable or not a list becomes ``[]``.
        """
        if not ingredients:
            return []
        if isinstance(ingredients, list):
            return ingredients
        if isinstance(ingredients, str):
            try:
                parsed = ast.literal_eval(ingredients)
                return parsed if isinstance(parsed, list) else []
            except (ValueError, SyntaxError):
                return []
        return []

    @staticmethod
    def _ingredient_names(ingredients, replace_separators: bool = False) -> List[str]:
        """Return the lower-cased, stripped, non-empty ingredient names.

        Args:
            ingredients: Raw ingredient data (see class docstring).
            replace_separators: Also turn ``-`` and ``_`` into spaces, as used
                by the string-similarity metric.
        """
        names = []
        for item in Metrics._normalize_ingredient_list(ingredients):
            raw = item.get("name", "") if isinstance(item, dict) else item
            name = str(raw).lower().strip()
            if replace_separators:
                name = name.replace("-", " ").replace("_", " ")
            if name:
                names.append(name)
        return names

    @staticmethod
    async def get_embedding(text, model="text-embedding-3-small"):
        """Return the OpenAI embedding for *text*, using the class-level cache.

        On any error the failure is printed and a 1536-dimensional zero
        vector is returned. Failures are not cached.
        """
        cache_key = hashlib.md5(f"{text}_{model}".encode()).hexdigest()
        if cache_key in Metrics._embedding_cache:
            return Metrics._embedding_cache[cache_key]
        try:
            client = openai.AsyncOpenAI()
            response = await client.embeddings.create(model=model, input=text.strip())
            embedding = response.data[0].embedding
            Metrics._embedding_cache[cache_key] = embedding
            return embedding
        except Exception as e:
            print(f"Error getting embedding for '{text}': {e}")
            return [0.0] * 1536

    @staticmethod
    async def semantic_ingredient_match_embeddings(
        gt_ingredients, pred_ingredients, threshold=0.75
    ):
        """Match ingredients one-to-one by embedding cosine similarity.

        Names are paired with the Hungarian algorithm on ``1 - similarity``.
        A pair counts as a match when its similarity is at least *threshold*.

        Returns:
            ``(recall, match_details)``: recall is matches divided by the
            number of ground-truth names, and ``match_details`` lists the
            matched pairs with their similarity. Both sides empty gives
            ``(1.0, [])``; exactly one side empty gives ``(0.0, [])``.
        """
        gt_names = Metrics._ingredient_names(gt_ingredients)
        pred_names = Metrics._ingredient_names(pred_ingredients)

        if not gt_names and not pred_names:
            return 1.0, []
        if not gt_names or not pred_names:
            return 0.0, []

        gt_embeddings = await asyncio.gather(
            *(Metrics.get_embedding(name) for name in gt_names)
        )
        pred_embeddings = await asyncio.gather(
            *(Metrics.get_embedding(name) for name in pred_names)
        )

        # Drop unusable embeddings together with their names so that matrix
        # indices keep pointing at the right ingredient.
        gt_pairs = [
            (name, emb)
            for name, emb in zip(gt_names, gt_embeddings)
            if emb and len(emb) > 1
        ]
        pred_pairs = [
            (name, emb)
            for name, emb in zip(pred_names, pred_embeddings)
            if emb and len(emb) > 1
        ]

        if not gt_pairs or not pred_pairs:
            return 0.0, []

        similarity_matrix = cosine_similarity(
            [emb for _, emb in gt_pairs], [emb for _, emb in pred_pairs]
        )
        # linear_sum_assignment minimises cost, so convert similarity to cost.
        row_indices, col_indices = linear_sum_assignment(1 - similarity_matrix)

        match_details = []
        for row_idx, col_idx in zip(row_indices, col_indices):
            similarity = similarity_matrix[row_idx, col_idx]
            if similarity >= threshold:
                match_details.append(
                    {
                        "gt_ingredient": gt_pairs[row_idx][0],
                        "pred_ingredient": pred_pairs[col_idx][0],
                        "similarity": float(similarity),
                    }
                )

        # Recall is measured against every ground-truth name, including any
        # whose embedding was unusable.
        recall = len(match_details) / len(gt_names)
        return recall, match_details

    @staticmethod
    def semantic_ingredient_match(gt_ingredients, pred_ingredients, threshold=0.7):
        """Fallback recall based on string similarity (``SequenceMatcher``).

        A ground-truth name counts as matched when its best ratio against any
        predicted name is at least *threshold*. Predictions may be reused for
        several ground-truth names.
        """
        gt_names = Metrics._ingredient_names(gt_ingredients, replace_separators=True)
        pred_names = Metrics._ingredient_names(
            pred_ingredients, replace_separators=True
        )

        if not gt_names:
            return 1.0 if not pred_names else 0.0
        if not pred_names:
            return 0.0

        matches = sum(
            1
            for gt_name in gt_names
            if max(
                SequenceMatcher(None, gt_name, pred_name).ratio()
                for pred_name in pred_names
            )
            >= threshold
        )
        return matches / len(gt_names)

    @staticmethod
    async def semantic_precision_score(
        gt_ingredients, pred_ingredients, threshold=0.75
    ):
        """Precision of the embedding-based matching.

        Returns matched pairs divided by the number of predicted names.
        No predictions gives ``1.0``; predictions without any ground truth
        give ``0.0``.
        """
        gt_names = Metrics._ingredient_names(gt_ingredients)
        pred_names = Metrics._ingredient_names(pred_ingredients)

        if not pred_names:
            return 1.0
        if not gt_names:
            return 0.0

        _, match_details = await Metrics.semantic_ingredient_match_embeddings(
            gt_ingredients, pred_ingredients, threshold
        )
        return len(match_details) / len(pred_names)

    @staticmethod
    async def semantic_f1_score(gt_ingredients, pred_ingredients, threshold=0.75):
        """F1 score of the embedding-based matching.

        Both sides empty gives ``1.0``; exactly one side empty gives ``0.0``.
        """
        gt_names = Metrics._ingredient_names(gt_ingredients)
        pred_names = Metrics._ingredient_names(pred_ingredients)

        if not gt_names and not pred_names:
            return 1.0
        if not gt_names or not pred_names:
            return 0.0

        _, match_details = await Metrics.semantic_ingredient_match_embeddings(
            gt_ingredients, pred_ingredients, threshold
        )
        matches = len(match_details)

        precision = matches / len(pred_names)
        recall = matches / len(gt_names)
        if precision + recall == 0:
            return 0.0

        return 2 * (precision * recall) / (precision + recall)

    @staticmethod
    async def meal_name_similarity(gt_name: str, pred_name: str) -> float:
        """Cosine similarity between the embeddings of two meal names.

        Returns ``0.0`` without calling the embedding API when either name is
        empty.
        """
        gt_name = str(gt_name or "").strip()
        pred_name = str(pred_name or "").strip()

        if not gt_name or not pred_name:
            return 0.0

        gt_embedding_list, pred_embedding_list = await asyncio.gather(
            Metrics.get_embedding(gt_name), Metrics.get_embedding(pred_name)
        )

        gt_embedding = np.array(gt_embedding_list).reshape(1, -1)
        pred_embedding = np.array(pred_embedding_list).reshape(1, -1)

        return float(cosine_similarity(gt_embedding, pred_embedding)[0][0])

    @staticmethod
    def macro_wMAPE(gt_mac: dict, pred_mac: dict):
        """Weighted MAPE (in percent) over calories, carbs, protein and fat.

        Computed as ``sum(|gt - pred|) / sum(|gt|) * 100``. When every
        ground-truth value is zero the result is ``0.0`` for a perfect
        prediction and ``100.0`` otherwise.
        """
        keys = ["calories", "carbs", "protein", "fat"]

        absolute_errors = sum(abs(gt_mac.get(k, 0) - pred_mac.get(k, 0)) for k in keys)
        sum_of_actuals = sum(abs(gt_mac.get(k, 0)) for k in keys)

        if sum_of_actuals == 0:
            return 0.0 if absolute_errors == 0 else 100.0

        return (absolute_errors / sum_of_actuals) * 100

    @staticmethod
    def macro_percentage_error(gt_mac, pred_mac):
        """Absolute percentage error per macro, as a dict keyed by macro name.

        A macro whose ground truth is not positive scores ``0`` when the
        prediction is also zero and ``100`` otherwise.
        """
        keys = ["calories", "carbs", "protein", "fat"]
        errors = {}
        for key in keys:
            gt_val = gt_mac.get(key, 0)
            pred_val = pred_mac.get(key, 0)
            if gt_val > 0:
                errors[key] = abs(gt_val - pred_val) / gt_val * 100
            else:
                errors[key] = 0 if pred_val == 0 else 100
        return errors

    @staticmethod
    def ingredient_count_accuracy(gt_ingredients, pred_ingredients):
        """Score in ``[0, 1]`` for how well the number of ingredients was predicted.

        Computed as ``1 - |gt - pred| / gt`` and clamped at ``0.0``, so heavy
        over-prediction cannot produce a negative "accuracy".
        """
        gt_count = len(Metrics._normalize_ingredient_list(gt_ingredients))
        pred_count = len(Metrics._normalize_ingredient_list(pred_ingredients))
        if gt_count == 0 and pred_count == 0:
            return 1.0
        if gt_count == 0:
            return 0.0
        return max(0.0, 1 - abs(gt_count - pred_count) / gt_count)

## Evaluation Function


In [None]:
async def _process_sample(
    idx: int,
    ds: FoodScanDataset,
    llm: Union[LiteModel, JanuaryAIModel],
    model_name: str,
    use_embeddings: bool = True,
) -> dict:
    """Run one model on one dataset sample and score the prediction.

    Args:
        idx: Position of the sample in ``ds``.
        ds: Dataset to read the image path and ground truth from.
        llm: Model wrapper whose ``analyse`` coroutine is awaited.
        model_name: Label stored in the ``"model"`` field of the result.
        use_embeddings: When true, compute the embedding-based ingredient
            metrics; otherwise they stay at ``0.0``.

    Returns:
        A flat dict (one results-table row) with the same keys for both
        outcomes, apart from ``match_details``, which only failed rows carry.
        Failed predictions carry the error text in ``"error"``, zeroed
        similarity scores and ``None`` for the macro errors.
    """
    sample = ds[idx]
    gt = sample["gt"]

    # perf_counter is monotonic, so the duration cannot be skewed by system
    # clock adjustments. Only the model call itself is timed.
    started = time.perf_counter()
    pred, error_msg = await llm.analyse(sample["image_path"])
    elapsed = time.perf_counter() - started

    item = {
        "image_id": sample["image_id"],
        "model": model_name,
        "response_time_seconds": round(elapsed, 2),
    }

    if pred is None:
        item.update(
            {
                "meal_name_similarity": 0.0,
                "semantic_precision_ing": 0.0,
                "semantic_match": 0.0,
                "semantic_f1_ing": 0.0,
                "semantic_match_embeddings": 0.0,
                "ingredient_count_acc": 0.0,
                "wmape_mac": None,
                "error": error_msg or "failed",
                "cost_usd": 0.0,
                "calories_pct_error": None,
                "carbs_pct_error": None,
                "protein_pct_error": None,
                "fat_pct_error": None,
                "match_details": None,
                # Keep the ground truth visible on failed rows as well.
                "meal_name": None,
                "gt_meal_name": gt["meal_name"],
            }
        )
        return item

    gt_ingredients = gt["ingredients"]
    pred_ingredients = pred.get("ingredients", [])
    gt_macros = gt["macros"]
    pred_macros = pred.get("total_macros", {})

    meal_name_sim = await Metrics.meal_name_similarity(
        gt["meal_name"], pred.get("meal_name", "")
    )

    # NOTE(review): match_details is computed below but not stored on
    # successful rows — confirm whether it should be persisted.
    semantic_match_embeddings, match_details = 0.0, None
    semantic_f1 = 0.0
    semantic_precision = 0.0
    if use_embeddings:
        try:
            (
                semantic_match_embeddings,
                match_details,
            ) = await Metrics.semantic_ingredient_match_embeddings(
                gt_ingredients, pred_ingredients
            )
            semantic_f1 = await Metrics.semantic_f1_score(
                gt_ingredients, pred_ingredients
            )
            semantic_precision = await Metrics.semantic_precision_score(
                gt_ingredients, pred_ingredients
            )
        except Exception as e:
            print(f"Error in embedding similarity for image {sample['image_id']}: {e}")
            # Fall back to string similarity for recall; F1 and precision have
            # no string-based equivalent and are reset.
            semantic_match_embeddings = Metrics.semantic_ingredient_match(
                gt_ingredients, pred_ingredients
            )
            semantic_f1 = 0.0
            semantic_precision = 0.0

    pct_errors = Metrics.macro_percentage_error(gt_macros, pred_macros)

    item.update(
        {
            "meal_name": pred.get("meal_name", ""),
            "gt_meal_name": gt["meal_name"],
            "meal_name_similarity": meal_name_sim,
            "semantic_precision_ing": semantic_precision,
            "semantic_match": Metrics.semantic_ingredient_match(
                gt_ingredients, pred_ingredients
            ),
            "semantic_f1_ing": semantic_f1,
            "semantic_match_embeddings": semantic_match_embeddings,
            "ingredient_count_acc": Metrics.ingredient_count_accuracy(
                gt_ingredients, pred_ingredients
            ),
            "wmape_mac": Metrics.macro_wMAPE(gt_macros, pred_macros),
            "error": None,
            "cost_usd": pred.get("cost_usd", 0.0),
            "calories_pct_error": pct_errors.get("calories"),
            "carbs_pct_error": pct_errors.get("carbs"),
            "protein_pct_error": pct_errors.get("protein"),
            "fat_pct_error": pct_errors.get("fat"),
        }
    )

    return item


async def run_evaluation(
    models: Union[str, List[str]],
    cache_dir: Path,
    max_items: Optional[int] = None,
    max_concurrent: int = 5,
    use_embeddings: bool = True,
) -> pd.DataFrame:
    """Benchmark one or more models over the dataset and collect the results.

    Every LiteLLM model is run once per prompt variant in
    ``LiteModel.PROMPT_VARIANTS`` and labelled ``"<model>_<variant>"``.
    ``"january/food-vision-v1"`` is run once through ``JanuaryAIModel``.

    Args:
        models: A model id or a list of model ids.
        cache_dir: Directory where the dataset is (or will be) cached.
        max_items: Evaluate only the first ``max_items`` samples. ``None``
            means the whole dataset and ``0`` means no samples.
        max_concurrent: Maximum number of samples processed at the same time.
        use_embeddings: Forwarded to ``_process_sample``.

    Returns:
        A DataFrame with one row per (sample, model/variant) job.

    Raises:
        ValueError: If the January model is requested but its credentials
            are missing from the environment.
    """
    january_id = "january/food-vision-v1"
    models_to_run = [models] if isinstance(models, str) else list(models)

    ds = FoodScanDataset(cache_dir)
    # `is None` rather than truthiness, so max_items=0 really means "no items".
    n = len(ds) if max_items is None else max(0, min(max_items, len(ds)))

    january_model = JanuaryAIModel() if january_id in models_to_run else None

    # Build each wrapper once and share it across all of its jobs.
    runners = []
    for model_name in models_to_run:
        if model_name == january_id:
            runners.append((model_name, january_model))
        else:
            for variant in LiteModel.PROMPT_VARIANTS:
                runners.append(
                    (
                        f"{model_name}_{variant}",
                        LiteModel(model_name, prompt_variant=variant),
                    )
                )

    all_jobs = [(i, label, llm) for i in range(n) for label, llm in runners]

    sem = asyncio.Semaphore(max_concurrent)
    pbar = tqdm(total=len(all_jobs), desc="Processing images", dynamic_ncols=True)

    async def _run_job(idx: int, label: str, llm):
        # The semaphore bounds how many samples are in flight at once.
        async with sem:
            result = await _process_sample(idx, ds, llm, label, use_embeddings)
        pbar.update(1)
        return result

    try:
        results = await asyncio.gather(*(_run_job(*job) for job in all_jobs))
    finally:
        # Release the progress bar and the HTTP client even if a job raised.
        pbar.close()
        if january_model is not None:
            await january_model.client.aclose()

    return pd.DataFrame(results)

## Analysis and Visualization


In [None]:
class BenchmarkAnalyzer:
    """Comprehensive analysis and visualization of benchmark results."""

    def __init__(self, results_df):
        self.df = results_df.copy()

        def get_base_model(model_name):
            if model_name == "january/food-vision-v1":
                return model_name

            for variant in LiteModel.PROMPT_VARIANTS:
                if model_name.endswith(f"_{variant}"):
                    return model_name.rsplit(f"_{variant}", 1)[0]

            return model_name

        self.df["base_model"] = self.df["model"].apply(get_base_model)
        self.df["pretty_model"] = self.df["model"].apply(pretty_label)

        if not self.df.empty:
            self.successful_df = self.df[self.df["error"].isna()].copy()

            if not self.successful_df.empty:
                self.successful_df = self._add_overall_score(self.successful_df)

                best_indices = self.successful_df.groupby(["image_id", "base_model"])[
                    "overall_score"
                ].idxmax()
                self.best_of_df = self.successful_df.loc[best_indices].copy()
                self.best_of_df["model"] = self.best_of_df["base_model"].apply(
                    lambda x: f"{get_display_name(x)} (Best)"
                    if x != "january/food-vision-v1"
                    else get_display_name(x)
                )

                numeric_cols = [
                    "meal_name_similarity",
                    "semantic_match_embeddings",
                    "semantic_precision_ing",
                    "semantic_f1_ing",
                    "ingredient_count_acc",
                    "wmape_mac",
                    "cost_usd",
                    "response_time_seconds",
                    "calories_pct_error",
                    "carbs_pct_error",
                    "protein_pct_error",
                    "fat_pct_error",
                ]
                self.average_of_df = (
                    self.successful_df.groupby(["image_id", "base_model"])[numeric_cols]
                    .mean()
                    .reset_index()
                )
                self.average_of_df["model"] = self.average_of_df["base_model"].apply(
                    lambda x: f"{get_display_name(x)} (Avg)"
                    if x != "january/food-vision-v1"
                    else get_display_name(x)
                )

                self.average_of_df = self._add_overall_score(self.average_of_df)

                self.plot_df = pd.concat(
                    [self.best_of_df, self.average_of_df], ignore_index=True
                ).drop_duplicates(subset=["model", "image_id"], keep="first")

            else:
                self.best_of_df = pd.DataFrame()
                self.average_of_df = pd.DataFrame()
                self.plot_df = pd.DataFrame()

        else:
            self.successful_df = pd.DataFrame()
            self.best_of_df = pd.DataFrame()
            self.average_of_df = pd.DataFrame()
            self.plot_df = pd.DataFrame()

    def _add_overall_score(self, df):
        """
        Calculates a overall score using a weighted geometric mean.
        This method is robust to outliers and uses a "knock-out" criterion,
        where a score of 0 in any key metric results in an overall score of 0.

        IMPROVEMENT: This version uses absolute normalization for cost and speed,
        making the score comparable across different benchmark runs.
        """
        if df.empty:
            return df

        COST_CEILING = 1
        TIME_CEILING = 60

        weights = {
            "name_similarity": 0.15,
            "ing_accuracy": 0.40,
            "macro_accuracy": 0.25,
            "cost": 0.10,
            "speed": 0.10,
        }
        assert np.isclose(sum(weights.values()), 1.0), "Weights must sum to 1.0"

        norm_name_sim = df["meal_name_similarity"].clip(0, 1)
        norm_acc_ing = df["semantic_f1_ing"].clip(0, 1)
        norm_acc_macro = (1 - (df["wmape_mac"] / 100)).clip(0, 1)

        clipped_cost = df["cost_usd"].clip(upper=COST_CEILING)
        norm_cost = 1 - (clipped_cost / COST_CEILING)

        clipped_time = df["response_time_seconds"].clip(upper=TIME_CEILING)
        norm_speed = 1 - (clipped_time / TIME_CEILING)

        df["overall_score"] = 100 * (
            (norm_name_sim ** weights["name_similarity"])
            * (norm_acc_ing ** weights["ing_accuracy"])
            * (norm_acc_macro ** weights["macro_accuracy"])
            * (norm_cost ** weights["cost"])
            * (norm_speed ** weights["speed"])
        )

        df["norm_name_similarity"] = norm_name_sim
        df["norm_ing_accuracy"] = norm_acc_ing
        df["norm_macro_accuracy"] = norm_acc_macro
        df["norm_cost"] = norm_cost
        df["norm_speed"] = norm_speed

        return df

    def summary_statistics(self):
        """Generate comprehensive summary statistics for individual variants and aggregated models."""
        print("=== BENCHMARK SUMMARY (PER VARIANT) ===\n")
        if self.df.empty:
            print("No results to analyze.")
            return

        for model in sorted(self.df["model"].unique()):
            display_name = pretty_label(model)
            model_df = self.df[self.df["model"] == model]
            successful = model_df[model_df["error"].isna()]
            success_rate = (
                (len(successful) / len(model_df)) * 100 if len(model_df) > 0 else 0
            )

            print(f"--- {display_name} ---")
            print(
                f"  Success Rate: {success_rate:.1f}% ({len(successful)}/{len(model_df)})"
            )

            if not successful.empty:
                print(
                    f"  Avg Semantic Match (Embeddings): {successful['semantic_match_embeddings'].mean():.3f}"
                )
                print(
                    f"  Avg Semantic Precision (Ingredients): {successful['semantic_precision_ing'].mean():.3f}"
                )
                print(f"  Avg wMAPE (Macros): {successful['wmape_mac'].mean():.1f}%")
                print(f"  Avg Cost per Image: ${successful['cost_usd'].mean():.4f}")
                print(
                    f"  Avg Response Time: {successful['response_time_seconds'].mean():.1f}s\n"
                )

        print("\n=== AGGREGATED SUMMARY (BEST OF N PROMPTS) ===\n")
        if self.best_of_df.empty:
            print("No successful results to analyze for aggregation.")
        else:
            agg_best_df = (
                self.best_of_df.groupby("model")
                .agg(
                    semantic_match_embeddings=("semantic_match_embeddings", "mean"),
                    semantic_precision_ing=("semantic_precision_ing", "mean"),
                    wmape_mac=("wmape_mac", "mean"),
                    cost_usd=("cost_usd", "mean"),
                    response_time_seconds=("response_time_seconds", "mean"),
                    overall_score=("overall_score", "mean"),
                    sample_count=("image_id", "count"),
                )
                .reset_index()
            )

            for _, row in agg_best_df.iterrows():
                print(f"--- {row['model']} (from {row['sample_count']} samples) ---")
                print(
                    f"  Avg Semantic Match (Embeddings): {row['semantic_match_embeddings']:.3f}"
                )
                print(
                    f"  Avg Semantic Precision (Ingredients): {row['semantic_precision_ing']:.3f}"
                )
                print(f"  Avg wMAPE (Macros): {row['wmape_mac']:.1f}%")
                print(f"  Avg Cost per Image: ${row['cost_usd']:.4f}")
                print(f"  Avg Response Time: {row['response_time_seconds']:.1f}s\n")
                print(f"  overall Score: {row['overall_score']:.2f} / 100\n")

        print("\n=== AGGREGATED SUMMARY (AVERAGE) ===\n")
        if self.average_of_df.empty:
            print("No successful results to analyze for aggregation.")
            self.analyze_errors()
            return

        agg_df = (
            self.average_of_df.groupby("model")
            .agg(
                semantic_match_embeddings=("semantic_match_embeddings", "mean"),
                semantic_precision_ing=("semantic_precision_ing", "mean"),
                wmape_mac=("wmape_mac", "mean"),
                cost_usd=("cost_usd", "mean"),
                response_time_seconds=("response_time_seconds", "mean"),
                overall_score=("overall_score", "mean"),
                sample_count=("image_id", "count"),
            )
            .reset_index()
        )

        for _, row in agg_df.iterrows():
            print(f"--- {row['model']} (from {row['sample_count']} samples) ---")
            print(
                f"  Avg Semantic Match (Embeddings): {row['semantic_match_embeddings']:.3f}"
            )
            print(
                f"  Avg Semantic Precision (Ingredients): {row['semantic_precision_ing']:.3f}"
            )
            print(f"  Avg wMAPE (Macros): {row['wmape_mac']:.1f}%")
            print(f"  Avg Cost per Image: ${row['cost_usd']:.4f}")
            print(f"  Avg Response Time: {row['response_time_seconds']:.1f}s\n")
            print(f"  overall Score: {row['overall_score']:.2f} / 100\n")

        self.analyze_errors()

    def analyze_errors(self):
        """Analyze and summarize the specific errors encountered."""
        print("--- ERROR ANALYSIS ---")
        error_df = self.df[self.df["error"].notna()]
        if error_df.empty:
            print("No errors encountered. All API calls were successful.\n")
            return

        print("Error counts by model:")
        error_summary = error_df.groupby("model")["error"].count().sort_index()
        print(error_summary.to_string())

        print("\nMost common error messages:")
        common_errors = (
            error_df["error"].str.split(":").str[0].value_counts().nlargest(5)
        )
        print(common_errors.to_string())
        print()

    def create_performance_dashboard(self):
        """Create a comprehensive performance comparison dashboard with improved styling."""
        if self.plot_df.empty:
            print("No successful predictions to plot.")
            return

        models = sorted(self.plot_df["model"].unique())

        short_labels = {m: m for m in models}
        idx_map = {m: i + 1 for i, m in enumerate(short_labels)}

        colors = px.colors.qualitative.Plotly

        fig = make_subplots(
            rows=2,
            cols=3,
            subplot_titles=(
                "Meal Name Similarity",
                "Macro Nutritional wMAPE (%)",
                "Response Time Distribution",
                "Recall (Ingredients)",
                "Precision (Ingredients)",
                "F1 Score (Ingredients)",
            ),
            vertical_spacing=0.15,
            horizontal_spacing=0.08,
        )

        for i, model in enumerate(models):
            m_idx = str(idx_map[model])
            color = colors[i % len(colors)]
            d_ok = self.plot_df[self.plot_df["model"] == model]
            if d_ok.empty:
                continue

            box_style = dict(
                marker_color=color,
                marker_line_color="rgba(0,0,0,0.3)",
                marker_line_width=1,
                line_color=color,
                line_width=2,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["meal_name_similarity"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=1,
                col=1,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["wmape_mac"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=1,
                col=2,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["response_time_seconds"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=1,
                col=3,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["semantic_match_embeddings"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=2,
                col=1,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["semantic_precision_ing"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=2,
                col=2,
            )
            fig.add_trace(
                go.Box(
                    y=d_ok["semantic_f1_ing"],
                    name=m_idx,
                    legendgroup=m_idx,
                    showlegend=False,
                    **box_style,
                ),
                row=2,
                col=3,
            )

        for i, model in enumerate(models):
            m_idx = str(idx_map[model])
            fig.add_trace(
                go.Scatter(
                    x=[None],
                    y=[None],
                    mode="markers",
                    marker=dict(
                        size=12,
                        color=colors[i % len(colors)],
                        symbol="circle",
                        line=dict(width=2, color="rgba(0,0,0,0.3)"),
                    ),
                    legendgroup=m_idx,
                    showlegend=True,
                    name=f"{m_idx}: {short_labels[model]}",
                )
            )

        axis_style = dict(
            showgrid=True,
            gridcolor="rgba(128,128,128,0.2)",
            gridwidth=1,
            zeroline=True,
            zerolinecolor="rgba(128,128,128,0.4)",
            zerolinewidth=1,
            tickfont=dict(size=11, color="#2f2f2f"),
        )
        for row in [1, 2]:
            for col in [1, 2, 3]:
                fig.update_xaxes(**axis_style, row=row, col=col)
                fig.update_yaxes(**axis_style, row=row, col=col)
        fig.update_yaxes(title_text="Cosine Similarity", row=1, col=1)
        fig.update_yaxes(title_text="Weighted MAPE (%)", row=1, col=2)
        fig.update_yaxes(title_text="Response Time (sec)", row=1, col=3)
        fig.update_yaxes(title_text="Recall", row=2, col=1)
        fig.update_yaxes(title_text="Precision", row=2, col=2)
        fig.update_yaxes(title_text="F1 Score", row=2, col=3)
        fig.update_layout(
            height=800,
            width=1400,
            title=dict(
                text="<b>Model Performance Dashboard</b>",
                x=0.5,
                xanchor="center",
                font=dict(size=24, color="#1f1f1f", family="Arial Black"),
            ),
            showlegend=True,
            legend=dict(
                title="<b>Models</b>",
                title_font=dict(size=14, color="#1f1f1f"),
                font=dict(size=11),
                bgcolor="rgba(255,255,255,0.8)",
                bordercolor="rgba(128,128,128,0.5)",
                borderwidth=1,
                x=1.02,
                y=1,
                xanchor="left",
                yanchor="top",
            ),
            plot_bgcolor="rgba(248,249,250,0.8)",
            paper_bgcolor="white",
            font=dict(family="Arial, sans-serif", size=11, color="#2f2f2f"),
            margin=dict(l=80, r=200, t=120, b=80),
            hovermode="closest",
        )
        config = dict(
            displayModeBar=True,
            displaylogo=False,
            modeBarButtonsToRemove=["pan2d", "lasso2d"],
            toImageButtonOptions=dict(
                format="png",
                filename="model_performance_dashboard",
                height=800,
                width=1400,
                scale=2,
            ),
        )
        fig.show(config=config)
        fig.write_html("performance_dashboard.html", config=config)

    def export_detailed_report(self, filename="benchmark_report.csv"):
        """Export a detailed report with raw data and summary to CSV files."""
        print(f"Exporting detailed report to {filename}...")
        self.df.to_csv(filename, index=False)

        if not self.plot_df.empty:
            summary = (
                self.plot_df.groupby("model")
                .agg(
                    semantic_match_embeddings_mean=(
                        "semantic_match_embeddings",
                        "mean",
                    ),
                    semantic_precision_ing_mean=("semantic_precision_ing", "mean"),
                    wmape_mac_mean=("wmape_mac", "mean"),
                    cost_usd_total=("cost_usd", "sum"),
                    response_time_seconds_mean=("response_time_seconds", "mean"),
                )
                .reset_index()
            )
            summary_filename = filename.replace(".csv", "_summary.csv")
            summary.to_csv(summary_filename, index=False)
        print("Export complete.")

    def create_win_loss_analysis(self, baseline_model_name: Optional[str] = None):
        if self.average_of_df.empty or self.best_of_df.empty:
            print("No successful results to analyze.")
            return

        base_models = sorted(self.average_of_df["base_model"].unique())

        if len(base_models) < 2:
            print("Need at least two models to perform a win-loss comparison.")
            return

        if baseline_model_name:
            if baseline_model_name not in base_models:
                print(
                    f"Error: Baseline model '{baseline_model_name}' not found in results."
                )
                print(f"Available models are: {base_models}")
                return
            baseline_model = baseline_model_name
        else:
            baseline_model = (
                "january/food-vision-v1"
                if "january/food-vision-v1" in base_models
                else base_models[0]
            )
            print(
                f"INFO: No baseline model specified. Using average performance of '{get_display_name(baseline_model)}' as the default."
            )

        baseline_data = self.average_of_df[
            self.average_of_df["base_model"] == baseline_model
        ].copy()

        competitor_dfs = {
            "Avg": self.average_of_df[
                self.average_of_df["base_model"] != baseline_model
            ],
            "Best": self.best_of_df[self.best_of_df["base_model"] != baseline_model],
        }

        macro_cols = [
            "calories_pct_error",
            "carbs_pct_error",
            "protein_pct_error",
            "fat_pct_error",
        ]
        win_loss_data = {}

        for perf_type, df in competitor_dfs.items():
            for competitor_base_model in df["base_model"].unique():
                competitor_model_name = (
                    f"{get_display_name(competitor_base_model)} ({perf_type})"
                )
                win_loss_data[competitor_model_name] = {
                    m: {"win": 0, "tie": 0, "loss": 0} for m in macro_cols
                }

                competitor_data = df[df["base_model"] == competitor_base_model]

                comparison_df = pd.merge(
                    baseline_data,
                    competitor_data,
                    on="image_id",
                    suffixes=("_base", "_comp"),
                )
                if comparison_df.empty:
                    continue

                for macro in macro_cols:
                    base_error = comparison_df[f"{macro}_base"]
                    comp_error = comparison_df[f"{macro}_comp"]
                    win_loss_data[competitor_model_name][macro]["win"] = (
                        base_error < comp_error
                    ).sum()
                    win_loss_data[competitor_model_name][macro]["tie"] = (
                        base_error == comp_error
                    ).sum()
                    win_loss_data[competitor_model_name][macro]["loss"] = (
                        base_error > comp_error
                    ).sum()

        if not win_loss_data:
            print("No common images found to compare models.")
            return

        competitor_models = sorted(win_loss_data.keys())
        colors = {"win": "#16A085", "tie": "#95A5A6", "loss": "#E74C3C"}
        macro_titles = ["Calories", "Carbohydrates", "Protein", "Fat"]
        fig = make_subplots(
            rows=len(competitor_models),
            cols=4,
            subplot_titles=macro_titles,
            shared_xaxes=True,
            vertical_spacing=0.08,
            horizontal_spacing=0.08,
        )

        for i, competitor in enumerate(competitor_models):
            row_idx = i + 1

            for j, macro in enumerate(macro_cols):
                col_idx = j + 1
                data = win_loss_data[competitor][macro]
                total = sum(data.values())
                if total == 0:
                    continue
                win_pct, tie_pct, loss_pct = (
                    data["win"] / total * 100,
                    data["tie"] / total * 100,
                    data["loss"] / total * 100,
                )
                fig.add_trace(
                    go.Bar(
                        name="Win",
                        x=[win_pct],
                        y=[competitor],
                        orientation="h",
                        marker=dict(color=colors["win"]),
                        showlegend=False,
                        text=f"{win_pct:.0f}%" if win_pct > 5 else "",
                        textposition="inside",
                        customdata=[data["win"]],
                        hovertemplate="<b>Win</b><br>Count: %{customdata}<br>Percentage: %{x:.1f}%<br><extra></extra>",
                    ),
                    row=row_idx,
                    col=col_idx,
                )
                fig.add_trace(
                    go.Bar(
                        name="Tie",
                        x=[tie_pct],
                        y=[competitor],
                        orientation="h",
                        marker=dict(color=colors["tie"]),
                        showlegend=False,
                        text=f"{tie_pct:.0f}%" if tie_pct > 5 else "",
                        textposition="inside",
                        customdata=[data["tie"]],
                        hovertemplate="<b>Tie</b><br>Count: %{customdata}<br>Percentage: %{x:.1f}%<br><extra></extra>",
                    ),
                    row=row_idx,
                    col=col_idx,
                )
                fig.add_trace(
                    go.Bar(
                        name="Loss",
                        x=[loss_pct],
                        y=[competitor],
                        orientation="h",
                        marker=dict(color=colors["loss"]),
                        showlegend=False,
                        text=f"{loss_pct:.0f}%" if loss_pct > 5 else "",
                        textposition="inside",
                        customdata=[data["loss"]],
                        hovertemplate="<b>Loss</b><br>Count: %{customdata}<br>Percentage: %{x:.1f}%<br><extra></extra>",
                    ),
                    row=row_idx,
                    col=col_idx,
                )
                fig.update_yaxes(
                    row=row_idx, col=col_idx, showticklabels=False, title_text=""
                )

            pretty_base = get_display_name(baseline_model)
            pretty_comp = competitor
            axis_num = i * 4 + 1
            y_anchor_ref = f"y{'' if axis_num == 1 else axis_num}"

            fig.add_annotation(
                x=-0.05,
                y=competitor,
                xref="paper",
                yref=y_anchor_ref,
                text=f"<b>{pretty_base}</b>",
                showarrow=False,
                xanchor="right",
                yanchor="middle",
                font=dict(size=12),
            )
            fig.add_annotation(
                x=1.05,
                y=competitor,
                xref="paper",
                yref=y_anchor_ref,
                text=f"<b>{pretty_comp}</b>",
                showarrow=False,
                xanchor="left",
                yanchor="middle",
                font=dict(size=12),
            )

        fig.update_layout(
            barmode="stack",
            title={
                "text": f"<b>{get_display_name(baseline_model)} vs. Others</b>",
                "x": 0.5,
                "xanchor": "center",
            },
            height=max(200, 60 * len(competitor_models) + 80),
            width=1000,
            showlegend=False,
            plot_bgcolor="rgba(250,251,252,0.8)",
            paper_bgcolor="white",
            margin=dict(l=200, r=200, t=100, b=20),
        )
        fig.update_xaxes(range=[0, 100], ticksuffix="%", showticklabels=False)
        fig.update_yaxes(showticklabels=False)

        fig.show()

    def plot_overall_score(self):
        """Create a bar chart comparing the overall score of each model."""
        if self.plot_df.empty or "overall_score" not in self.plot_df.columns:
            print("No overall score data to plot.")
            return

        summary_df = self.plot_df.groupby("model")["overall_score"].mean().reset_index()
        summary_df = summary_df.sort_values("overall_score", ascending=False)

        fig = px.bar(
            summary_df,
            x="model",
            y="overall_score",
            title="<b>Overall Score</b>",
            labels={"model": "Model", "overall_score": "Composite Score (0-100)"},
            text="overall_score",
            color="model",
            color_discrete_sequence=px.colors.qualitative.Plotly,
        )
        fig.update_traces(texttemplate="%{text:.2f}", textposition="outside")

        max_score = summary_df["overall_score"].max() if not summary_df.empty else 100

        fig.update_layout(
            uniformtext_minsize=8,
            uniformtext_mode="hide",
            xaxis_title=None,
            xaxis_tickangle=-45,
            showlegend=False,
            title_x=0.5,
            title_font=dict(size=20, family="Arial Black"),
            font=dict(family="Arial, sans-serif", size=12),
            yaxis_range=[0, max_score * 1.15],
        )
        fig.show()

## Run Benchmark


In [None]:
BENCHMARK_CONFIG = {
    "models": [
        "january/food-vision-v1",
        "gpt-4o-mini",
        "gpt-4o",
        "gemini/gemini-2.5-flash-preview-05-20",
        "gemini/gemini-2.5-pro-preview-06-05",
    ],
    "max_items": 20,
    "cache_dir": Path(".cache/food_scan_bench"),
    "max_concurrent_requests": 50,
    "use_embeddings_for_matching": True,
    "report_filename": "benchmark_results.csv",
}

results_df = await run_evaluation(
    models=BENCHMARK_CONFIG["models"],
    cache_dir=BENCHMARK_CONFIG["cache_dir"],
    max_items=BENCHMARK_CONFIG["max_items"],
    max_concurrent=BENCHMARK_CONFIG["max_concurrent_requests"],
    use_embeddings=BENCHMARK_CONFIG["use_embeddings_for_matching"],
)

In [None]:
# Build the analyzer from the raw per-sample results of the benchmark run.
analyzer = BenchmarkAnalyzer(results_df)

# Print the text summaries first, then show the charts in this order.
analyzer.summary_statistics()
analyzer.create_performance_dashboard()
# Use the January model as the baseline every other model is compared against.
analyzer.create_win_loss_analysis(baseline_model_name="january/food-vision-v1")
analyzer.plot_overall_score()

# Optional: uncomment to write the raw results and the per-model summary to CSV.
# analyzer.export_detailed_report(BENCHMARK_CONFIG["report_filename"])

# Optional: uncomment to preview the first rows of the raw results.
# display(results_df.head())