In [None]:
OPENROUTER_API_KEY = ''

In [16]:
import os
import json
import time
import csv
import html
import requests
from tqdm import tqdm
from urllib.parse import quote
from difflib import SequenceMatcher

import pandas as pd
import plotly.graph_objects as go

# ====================================================================
# Конфиг
# ====================================================================

OPENROUTER_API_KEY = OPENROUTER_API_KEY

RAW_BASE_URL = "https://raw.githubusercontent.com/AleksSoarer/img_to_llm/main/images/"
OUTPUT_DIR = "bench_full"

RETRIES = 7
RETRY_DELAYS = [1, 2, 3, 5, 8, 13, 21]
MAX_RERUN_ROUNDS = 5

MODELS = {
    "nemotron": "nvidia/nemotron-nano-12b-v2-vl:free",
    "gemini": "google/gemini-2.0-flash-exp:free",
    "grok": "x-ai/grok-4.1-fast:free",
}

MODEL_TITLES = {
    "nemotron": "Nemotron (Nvidia)",
    "gemini": "Gemini 2.0 Flash",
    "grok": "Grok 4.1 Fast",
}

PROMPT = """
Проанализируй изображение и верни строго JSON без пояснений:
{
  "terrain_type": "город|лес|поле|водоём",
  "objects": ["здания", "дороги", "транспорт"],
  "description": "1–2 предложения на русском"
}
"""


# ====================================================================
# Утилиты
# ====================================================================

def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)


def list_github_images(raw_base: str):
    """Получаем список файлов из GitHub."""
    parts = raw_base.replace("https://raw.githubusercontent.com/", "").split("/")
    user, repo, branch = parts[:3]
    path = "/".join(parts[3:])

    api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{path}"
    r = requests.get(api_url)
    r.raise_for_status()

    return [
        f["name"]
        for f in r.json()
        if f["name"].lower().endswith((".jpg", ".jpeg", ".png"))
    ]


def clean_json(text: str | None) -> str | None:
    if not text:
        return None

    text = text.strip()

    # убираем ```json ... ```
    if text.startswith("```"):
        # срезаем начальные бэктики
        while text.startswith("`"):
            text = text[1:]
        # убираем "json" если есть
        if text.lower().startswith("json"):
            text = text[4:].strip()
        # срезаем хвостовые ```
        while text.endswith("`"):
            text = text[:-1]
        text = text.strip()

    return text if text else None


def try_parse_json(text: str | None):
    text = clean_json(text)
    if not text:
        return None
    try:
        return json.loads(text)
    except Exception:
        return None


def similarity(a: str | None, b: str | None) -> float:
    if not a or not b:
        return 0.0
    return SequenceMatcher(None, a, b).ratio()

def model_human_name(tag: str) -> str:
    return MODEL_TITLES.get(tag, tag)
# ====================================================================
# OPENROUTER Повторый вызов
# ====================================================================

def call_model_retry(model_id: str, image_url: str):
    payload = {
        "model": model_id,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": PROMPT},
                    {"type": "image_url", "image_url": {"url": image_url}},
                ]
            }
        ]
    }

    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }

    for attempt in range(RETRIES):
        r = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=50,
        )

        if r.status_code == 200:
            data = r.json()
            content = data["choices"][0]["message"]["content"]

            if isinstance(content, list):
                text = "\n".join(
                    c.get("text", "")
                    for c in content
                    if isinstance(c, dict)
                )
                if not text:
                    text = str(content)
            else:
                text = content

            return text, None

        if r.status_code in (429, 500, 502, 503, 504):
            delay = RETRY_DELAYS[min(attempt, len(RETRY_DELAYS) - 1)]
            time.sleep(delay)
            continue

        return None, f"HTTP {r.status_code}: {r.text}"

    return None, f"FAILED after {RETRIES} retries"


# ====================================================================
# Основной пайплайн
# ====================================================================

def first_pass_collect(files: list[str]) -> list[dict]:
    """
    Первый проход: делаем запросы по всем моделям.
    Если данных нет - собирается полностью, если есть - добираются пропущенные 
    """
    results = []

    for fname in tqdm(files, desc="First pass"):
        encoded = quote(fname)
        img_url = RAW_BASE_URL + encoded

        row = {"image": fname, "image_url": img_url}

        for tag, model_id in MODELS.items():
            raw, err = call_model_retry(model_id, img_url)
            row[f"{tag}_raw"] = raw
            row[f"{tag}_parsed"] = try_parse_json(raw)
            row[f"{tag}_error"] = err

        results.append(row)

    return results


def reparse_existing(results: list[dict]) -> None:
    """
    Проверка, raw если не обработан делаем парсинг
    """
    for r in results:
        for tag in MODELS.keys():
            raw_key = f"{tag}_raw"
            parsed_key = f"{tag}_parsed"

            if r.get(parsed_key) is None and r.get(raw_key):
                r[parsed_key] = try_parse_json(r[raw_key])


def rerun_missing(results: list[dict], rounds: int = MAX_RERUN_ROUNDS) -> None:
    """Добиваем ВСЕ кейсы, где parsed == None.
    1) если raw есть → пробуем заново распарсить
    2) если после парсинга parsed все ещё None → вызываем модель
    """
    for round_idx in range(1, rounds + 1):
        print(f"\n=== Повторный проход. Попытка {round_idx}/{rounds} ===")
        any_work = False

        for r in tqdm(results, desc=f"Round {round_idx}"):
            img_url = r["image_url"]

            for tag, model_id in MODELS.items():
                raw_key = f"{tag}_raw"
                parsed_key = f"{tag}_parsed"
                err_key = f"{tag}_error"

                # уже есть parsed → пропускаем
                if r.get(parsed_key) is not None:
                    continue

                # 1) если raw есть → пробуем заново распарсить
                raw = r.get(raw_key)
                if raw:
                    parsed = try_parse_json(raw)
                    if parsed is not None:
                        r[parsed_key] = parsed
                        continue  # успех, модель не вызываем

                # 2) иначе вызываем модель
                raw, err = call_model_retry(model_id, img_url)
                r[raw_key] = raw
                r[parsed_key] = try_parse_json(raw)
                r[err_key] = err

                if raw:
                    any_work = True

        if not any_work:
            print("Нечего больше добивать, выходим.")
            break

# ====================================================================
# Аналитика
# ====================================================================

def compute_model_stats(results: list[dict]) -> dict:
    """
    Надёжность и стабильность моделей:
    - total
    - success_count
    - success_rate
    - fail_count
    """
    stats = {}

    total = len(results)

    for tag in MODELS.keys():
        ok = sum(1 for r in results if r.get(f"{tag}_parsed") is not None)
        fail = total - ok
        stats[tag] = {
            "total": total,
            "success": ok,
            "fail": fail,
            "success_rate": ok / total if total else 0.0,
            "fail_rate": fail / total if total else 0.0,
        }

    return stats


def compute_pairwise_stats(results: list[dict]) -> dict:
    """
    Сравнение моделей между собой:
    - совпадение terrain_type
    - средний overlap по objects
    - similarity описаний
    """
    pairs = [
        ("nemotron", "grok"),
        ("nemotron", "gemini"),
        ("grok", "gemini"),
    ]

    out = {}

    for a, b in pairs:
        key = f"{a}_vs_{b}"
        terrain_match = 0
        obj_overlap_total = 0.0
        desc_sim_total = 0.0
        count = 0

        for r in results:
            pa = r.get(f"{a}_parsed")
            pb = r.get(f"{b}_parsed")
            if not pa or not pb:
                continue

            if pa.get("terrain_type") == pb.get("terrain_type"):
                terrain_match += 1

            set_a = set(pa.get("objects", []))
            set_b = set(pb.get("objects", []))
            union = len(set_a | set_b)
            inter = len(set_a & set_b)
            obj_overlap = inter / union if union else 0.0
            obj_overlap_total += obj_overlap

            desc_sim_total += similarity(pa.get("description"), pb.get("description"))
            count += 1

        if count > 0:
            out[key] = {
                "pairs": count,
                "terrain_match": terrain_match,
                "terrain_match_rate": terrain_match / count if count else 0.0,
                "objects_overlap_avg": obj_overlap_total / count if count else 0.0,
                "description_similarity_avg": desc_sim_total / count if count else 0.0,
            }
        else:
            out[key] = {
                "pairs": 0,
                "terrain_match": 0,
                "terrain_match_rate": 0.0,
                "objects_overlap_avg": 0.0,
                "description_similarity_avg": 0.0,
            }

    return out


def export_excel(results: list[dict], model_stats: dict, pair_stats: dict) -> None:
    """
    Выгрузка в Excel + подсветка:
    - зелёный фон если parsed != None
    - красный фон если parsed == None
    """
    ensure_dir(OUTPUT_DIR)
    xlsx_path = os.path.join(OUTPUT_DIR, "benchmark.xlsx")

    # превращаем results в табличку
    rows = []
    for r in results:
        row = {
            "image": r["image"],
            "image_url": r["image_url"],
        }
        for tag in MODELS.keys():
            parsed = r.get(f"{tag}_parsed")
            row[f"{tag}_parsed_str"] = json.dumps(parsed, ensure_ascii=False) if parsed else ""
            row[f"{tag}_ok"] = 1 if parsed else 0
            row[f"{tag}_error"] = r.get(f"{tag}_error")
        rows.append(row)

    df = pd.DataFrame(rows)

    with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
        df.to_excel(writer, sheet_name="results", index=False)

        # отдельный лист со статистикой
        ms_rows = []
        for tag, st in model_stats.items():
            ms_rows.append({
                "model": tag,
                "success_rate": st["success_rate"],
                "fail_rate": st["fail_rate"],
                "success": st["success"],
                "fail": st["fail"],
                "total": st["total"],
            })
        ms_df = pd.DataFrame(ms_rows)
        ms_df.to_excel(writer, sheet_name="model_stats", index=False)

        ps_rows = []
        for key, st in pair_stats.items():
            ps_rows.append({"pair": key, **st})
        ps_df = pd.DataFrame(ps_rows)
        ps_df.to_excel(writer, sheet_name="pair_stats", index=False)

        # подсветка success/fail
        wb = writer.book
        ws = wb["results"]

        from openpyxl.styles import PatternFill

        green_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
        red_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

        # пробежимся по строкам и подсветим ячейки parsed_str
        header = [cell.value for cell in next(ws.iter_rows(min_row=1, max_row=1))]
        col_idx = {name: i + 1 for i, name in enumerate(header)}

        for row_cells in ws.iter_rows(min_row=2):
            for tag in MODELS.keys():
                col_parsed = col_idx.get(f"{tag}_parsed_str")
                col_ok = col_idx.get(f"{tag}_ok")
                if col_parsed is None or col_ok is None:
                    continue
                cell_parsed = row_cells[col_parsed - 1]
                cell_ok = row_cells[col_ok - 1]
                if cell_ok.value == 1:
                    cell_parsed.fill = green_fill
                else:
                    cell_parsed.fill = red_fill


def export_html_report(results: list[dict], model_stats: dict, pair_stats: dict) -> None:
    """
    HTML-отчёт:
    - bar по надёжности моделей
    - bar по согласованности
    - по каждому фото: картинка + 3 строки от моделей
    - в конце: "мой вывод" на основе статистики
    """
    ensure_dir(OUTPUT_DIR)
    html_path = os.path.join(OUTPUT_DIR, "report.html")

    # ---------- Plotly-графики ----------

    # success rates
    models = list(model_stats.keys())
    success_rates = [model_stats[m]["success_rate"] for m in models]
    fail_rates = [model_stats[m]["fail_rate"] for m in models]

    fig1 = go.Figure()
    fig1.add_bar(name="Успех", x=models, y=success_rates)
    fig1.add_bar(name="Провал", x=models, y=fail_rates)
    fig1.update_layout(
        title="Надёжность моделей",
        barmode="group",
        yaxis=dict(range=[0, 1])
    )

    # pairwise terrain_match
    pairs = list(pair_stats.keys())
    terrain_rates = [pair_stats[p]["terrain_match_rate"] for p in pairs]
    obj_overlap = [pair_stats[p]["objects_overlap_avg"] for p in pairs]
    desc_sim = [pair_stats[p]["description_similarity_avg"] for p in pairs]

    fig2 = go.Figure()
    fig2.add_bar(name="terrain_match_rate", x=pairs, y=terrain_rates)
    fig2.add_bar(name="objects_overlap_avg", x=pairs, y=obj_overlap)
    fig2.add_bar(name="description_similarity_avg", x=pairs, y=desc_sim)
    fig2.update_layout(
        title="Согласованность моделей (pairwise)",
        barmode="group",
        yaxis=dict(range=[0, 1])
    )

    # ---------- Автоматический вывод по моделям ----------

    # выбираем самую надёжную модель по success_rate
    best_tag, best_info = max(
        model_stats.items(),
        key=lambda kv: kv[1]["success_rate"] if kv[1]["total"] else 0.0
    )
    best_name = model_human_name(best_tag)

    # сортируем все модели по надёжности
    sorted_models = sorted(
        model_stats.items(),
        key=lambda kv: kv[1]["success_rate"],
        reverse=True,
    )

    # ---------- Генерация HTML ----------

    with open(html_path, "w", encoding="utf-8") as f:
        f.write(
            "<html><head><meta charset='utf-8'>"
            "<title>Vision Benchmark</title>"
            "<style>"
            "body { font-family: system-ui, -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; "
            "       margin: 20px; max-width: 1000px; }"
            "h1, h2, h3 { font-weight: 600; }"
            ".image-block { margin-bottom: 40px; padding-bottom: 20px; border-bottom: 1px solid #ddd; }"
            ".image-block img { max-width: 100%; height: auto; border-radius: 8px; "
            "                    display:block; margin-bottom: 10px; }"
            ".model-line { margin: 2px 0; }"
            ".model-name { font-weight:bold; }"
            ".terrain { font-style: italic; color:#555; }"
            ".objects { color:#444; }"
            ".desc { color:#222; }"
            ".error { color:#b00; font-style: italic; }"
            "</style>"
            "</head><body>\n"
        )

        f.write("<h1>Vision Benchmark Report</h1>\n")

        # блок с графиками
        f.write("<h2>Модельная надёжность</h2>\n")
        f.write(fig1.to_html(full_html=False, include_plotlyjs="cdn"))

        f.write("<h2>Согласованность моделей</h2>\n")
        f.write(fig2.to_html(full_html=False, include_plotlyjs=False))

        # ---------- Перебор по картинкам ----------

        f.write("<h2>Результаты по каждому фото</h2>\n")

        for r in results:
            img_name = r["image"]
            img_url = r["image_url"]

            f.write("<div class='image-block'>\n")
            f.write(f"<h3>{html.escape(img_name)}</h3>\n")
            f.write(f"<img src='{html.escape(img_url)}' alt='{html.escape(img_name)}'>\n")

            for tag in MODELS.keys():
                label = model_human_name(tag)
                parsed = r.get(f"{tag}_parsed")
                err = r.get(f"{tag}_error")

                if parsed:
                    terrain = parsed.get("terrain_type", "?")
                    objects = parsed.get("objects") or []
                    desc = parsed.get("description", "").strip()

                    obj_str = ", ".join(map(str, objects)) if objects else "нет объектов из списка"

                    f.write(
                        "<p class='model-line'>"
                        f"<span class='model-name'>{html.escape(label)}:</span> "
                        f"<span class='terrain'>[{html.escape(str(terrain))}]</span>, "
                        f"<span class='objects'>{html.escape(obj_str)}</span><br>"
                        f"<span class='desc'>{html.escape(desc)}</span>"
                        "</p>\n"
                    )
                else:
                    msg = err or "нет валидного ответа"
                    f.write(
                        "<p class='model-line'>"
                        f"<span class='model-name'>{html.escape(label)}:</span> "
                        f"<span class='error'>нет валидного JSON / {html.escape(str(msg))}</span>"
                        "</p>\n"
                    )

            f.write("</div>\n")

        # ---------- Вывод ----------

        f.write("<h2>Мой вывод</h2>\n")

        # краткая табличка по моделям
        f.write("<ul>\n")
        for tag, st in sorted_models:
            name = model_human_name(tag)
            sr = st["success_rate"]
            f.write(
                "<li>"
                f"<b>{html.escape(name)}:</b> "
                f"{st['success']}/{st['total']} успешных ответов "
                f"({sr:.0%} успеха)"
                "</li>\n"
            )
        f.write("</ul>\n")

        #вывод
        f.write("<p>")
        f.write(
            f"Самым стабильным в этом прогоне оказался "
            f"<b>{html.escape(best_name)}</b>: он даёт наибольшую долю "
            f"успешно распарсенных JSON по всем снимкам."
        )
        f.write("</p>\n")

        # комментарий про согласованность
        if pair_stats:
            best_pair = max(
                pair_stats.items(),
                key=lambda kv: kv[1]["terrain_match_rate"]
            )[0]
            terrain_rate = pair_stats[best_pair]["terrain_match_rate"]
            f.write(
                f"<p>С точки зрения согласованности по типу местности "
                f"лучше всего совпадают модели в паре <b>{html.escape(best_pair)}</b> "
                f"({terrain_rate:.0%} совпадений по terrain_type среди общих кейсов).</p>\n"
            )

        f.write(
            "<p>По детальности и качеству описания впереди оказывается Grok, как самая умная модель, за ним идёт Gemeni,"
            "но её проблема в доступности. Иногда удаётся пробиться, но вероятность успеха не большая. "
            "Замыкает тройку Nemotrin от Nvidia. Модель рабочая, доступная, может отвечать на русском, но из-за своего малого размера ответы не очень качественные.</p>\n"
        
        )

        f.write("</body></html>\n")

# ====================================================================
# MAIN
# ====================================================================

def run():
    ensure_dir(OUTPUT_DIR)

    results_path = os.path.join(OUTPUT_DIR, "results_final.json")


    # пробуем загрузить старые результаты
    results_path = os.path.join(OUTPUT_DIR, "results_final.json")
    if os.path.exists(results_path):
        print("Загружаю существующие результаты...")
        with open(results_path, "r", encoding="utf-8") as f:
            results = json.load(f)
    else:
        print("Запускаю первый проход...")
        files = list_github_images(RAW_BASE_URL)
        results = first_pass_collect(files)
        
    

    # Добивка по всем моделям
    rerun_missing(results, rounds=MAX_RERUN_ROUNDS)

    # Репарсим, если что-то руками правил или модель внезапно стала отдавать чистый JSON
    reparse_existing(results)

    # Аналитика
    model_stats = compute_model_stats(results)
    pair_stats = compute_pairwise_stats(results)

    # Сохраняем JSON
    with open(os.path.join(OUTPUT_DIR, "results_final.json"), "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    with open(os.path.join(OUTPUT_DIR, "analytics.json"), "w", encoding="utf-8") as f:
        json.dump({
            "model_stats": model_stats,
            "pair_stats": pair_stats,
        }, f, ensure_ascii=False, indent=2)

    # Excel + HTML
    export_excel(results, model_stats, pair_stats)
    export_html_report(results, model_stats, pair_stats)

    print(f"\nГотово. Смотри папку {OUTPUT_DIR}/")
    print(" - results_final.json")
    print(" - analytics.json")
    print(" - benchmark.xlsx")
    print(" - report.html")


if __name__ == "__main__":
    run()

Загружаю существующие результаты...

=== RERUN ROUND 1/5 ===


Round 1: 100%|██████████| 18/18 [17:22<00:00, 57.90s/it]

Нечего больше добивать, выходим.

Готово. Смотри папку bench_full/
 - results_final.json
 - analytics.json
 - benchmark.xlsx
 - report.html



