# In [10]:
import warnings
# Suppress every warning for the rest of the session.
# NOTE(review): this presumably also hides the certificate warning triggered by
# verify=False in call_api — confirm that is intended.
warnings.filterwarnings("ignore")

import json, re, unicodedata
from typing import Any, Dict, Optional, Tuple
from pathlib import Path

import pandas as pd
import requests
from tqdm import tqdm

# Endpoint that call_api() POSTs the recipe prompt to.
API_URL = "https://smarthome.uni-regensburg.de/naehrwertrechner/api/1.0/recipe_info_optifast"


def call_api(prompt: str) -> Optional[Dict[str, Any]]:
    """POST *prompt* as the ``recipe`` field to ``API_URL`` and return the decoded JSON body.

    Returns ``None`` when the request fails (``requests.RequestException``, which
    includes the 15 s timeout), when the response status is not OK, or when the
    body cannot be decoded as JSON.
    """
    payload = {"recipe": prompt}
    headers = {"content-type": "application/json"}
    try:
        # NOTE(review): verify=False disables TLS certificate verification; kept
        # as in the original — confirm the server really needs it.
        response = requests.post(API_URL, json=payload, headers=headers, verify=False, timeout=15)
        if not response.ok:
            return None
        return response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers JSON decode errors on requests versions where the
        # decode error is not a RequestException subclass, so a non-JSON body
        # is reported as a failed call instead of aborting the whole run.
        return None


def norm(s: str) -> str:
    """Return a lowercase, NFC-normalized copy of *s* reduced to letters, digits and single spaces.

    Every character that is neither alphanumeric nor whitespace becomes a space,
    whitespace runs collapse to one space, and the ends are trimmed. German
    letters (äöüß) are alphanumeric and therefore kept. Non-string input yields "".
    """
    if not isinstance(s, str):
        return ""
    lowered = unicodedata.normalize("NFC", s).lower()
    cleaned = "".join(
        char if char.isalnum() or char.isspace() else " "
        for char in lowered
    )
    # split() without arguments drops leading/trailing whitespace and splits on
    # whitespace runs, so re-joining with one space collapses and trims at once.
    return " ".join(cleaned.split())

def parse_detail(res: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """
    Pull the recognized name and the nutrient dict out of an API response.

    Expected shape:
      {"detailed_info":[[
          {"bezeichnung":"...", "einheit":"g|stück|liter", "menge":"..."},
          {"ZF":..., "ZE":..., "ZK":..., "GCAL":..., "ZA":...}
      ]]}

    Only the first entry of "detailed_info" is inspected. Returns
    (recognized_name, nutrients_dict); ("", {}) when "detailed_info" is missing,
    empty, or its first entry is not a list of at least two items. A non-dict
    first item gives an empty name, a non-dict second item gives {}.
    """
    entries = res.get("detailed_info")
    if not (isinstance(entries, list) and entries):
        return "", {}

    first = entries[0]
    if not (isinstance(first, list) and len(first) >= 2):
        return "", {}

    info, nutrients = first[0], first[1]
    label = info.get("bezeichnung") if isinstance(info, dict) else ""
    if isinstance(nutrients, dict):
        return str(label or ""), nutrients
    return str(label or ""), {}

def is_unrecognized(res: Optional[Dict[str, Any]]) -> bool:
    """Return True when *res* carries no usable recognized name.

    That is the case when *res* is not a dict, when parse_detail() yields an
    empty name, or when the normalized name starts with "nicht erkannt".
    """
    if not isinstance(res, dict):
        return True
    recognized = parse_detail(res)[0]
    if not recognized:
        return True
    return norm(recognized).startswith("nicht erkannt")

def _strip_diacritics(s: str) -> str:
    """Decompose *s* with NFD and drop every nonspacing mark (Unicode category "Mn")."""
    decomposed = unicodedata.normalize("NFD", s)
    kept = filter(lambda char: unicodedata.category(char) != "Mn", decomposed)
    return "".join(kept)

def _de_ascii_fallback(s: str) -> str:
    """Transliterate lowercase German letters: ä→ae, ö→oe, ü→ue, ß→ss.

    Uppercase umlauts and all other characters pass through unchanged.
    """
    transliteration = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})
    return s.translate(transliteration)
# Tokens that _soft_regex_match() drops from the recognized name before building
# its pattern: articles/prepositions/conjunctions, unit words, and colour/size
# adjectives. Umlaut/ß spellings are listed next to their ASCII transliterations.
GER_STOP = {
    "aus","der","die","das","den","dem","des","von","mit","und","oder","ohne",
    "ein","eine","einer","einem","einen",
    "g","gramm","kg","ml","l","liter","stück","stueck","stk",
    "grün","gruen","rot","gelb","weiß","weiss","hell","dunkel","klein","groß","gross","mittel",
}

def _soft_regex_match(ingredient: str, recognized_name: str) -> bool:
    """
    Order-free token match of recognized_name against ingredient.

    Both strings are normalized with norm(). Every token of recognized_name
    that is not in GER_STOP must appear in ingredient as a whole word,
    optionally followed by one of the endings e/en/er/n/s. Returns False when
    no tokens remain after stopword removal.
    """
    haystack = norm(ingredient)
    wanted = [word for word in norm(recognized_name).split() if word not in GER_STOP]
    if not wanted:
        return False

    # One positive lookahead per token, all anchored at the start, so the
    # tokens may occur anywhere and in any order.
    conditions = "".join(
        rf"(?=.*\b{re.escape(word)}(?:e|en|er|n|s)?\b)" for word in wanted
    )
    return bool(re.search(rf"^{conditions}.*$", haystack))

def ingredient_matches(ingredient: str, recognized_name: str) -> bool:
    """Decide whether the API's recognized_name plausibly refers to ingredient.

    Both inputs are normalized with norm(); an empty result on either side is
    a non-match. Matching is tried in this order, first hit wins:
      1. substring containment in either direction, with and without spaces;
      2. the same test after _strip_diacritics();
      3. the same test after _de_ascii_fallback();
      4. _soft_regex_match() on the raw inputs.
    """
    left, right = norm(ingredient), norm(recognized_name)
    if not (left and right):
        return False

    def contains_either_way(x: str, y: str) -> bool:
        # Containment in either direction, first as-is, then with spaces removed.
        if x in y or y in x:
            return True
        x_joined, y_joined = x.replace(" ", ""), y.replace(" ", "")
        return x_joined in y_joined or y_joined in x_joined

    folds = (lambda text: text, _strip_diacritics, _de_ascii_fallback)
    for fold in folds:
        if contains_either_way(fold(left), fold(right)):
            return True

    # Last resort: order-free, non-contiguous token match.
    return _soft_regex_match(ingredient, recognized_name)


# Substrings that make _is_water_like() treat a recognized name as water.
WATER_TOKENS = {"wasser", "mineralwasser", "leitungswasser", "sprudelwasser", "tafelwasser"}

def _extract_gcal(res: Dict[str, Any]) -> Optional[float]:
    """Return the "GCAL" nutrient of *res* as a float, or None if absent or not numeric."""
    nutrients = parse_detail(res)[1]
    try:
        raw_value = nutrients.get("GCAL")
        return float(raw_value)
    except (TypeError, ValueError, AttributeError):
        return None

def _is_water_like(name: str) -> bool:
    """Return True when the normalized *name* contains any entry of WATER_TOKENS.

    NOTE(review): this is a substring test, so names that merely contain
    "wasser" (e.g. "wassermelone") also count as water-like — confirm intended.
    """
    cleaned = norm(name)
    for token in WATER_TOKENS:
        if token in cleaned:
            return True
    return False

def has_nutrition(res: Dict[str, Any]) -> bool:
    """Return True when *res* carries usable nutrition data.

    Water-like names are always accepted (their GCAL may be 0 or missing);
    everything else needs a GCAL value that parses as a float and is > 0.
    """
    recognized = parse_detail(res)[0]
    if _is_water_like(recognized):
        return True
    kcal = _extract_gcal(res)
    if kcal is None:
        return False
    return kcal > 0.0


def try_prompt(amount, unit, ingredient, debug: bool = False):
    """Send one "<amount> <unit> <ingredient>" prompt to the API and classify the answer.

    Returns (response, prompt, status) where status is one of:
      "network_error" – call_api() returned None (response is None),
      "unrecognized"  – is_unrecognized() is true for the response,
      "mismatch"      – the recognized name does not match *ingredient*,
      "no_nutrition"  – has_nutrition() is false for the response,
      "ok"            – all checks passed.
    With debug=True the prompt and the resulting status are printed.
    """
    def trace(message: str) -> None:
        if debug:
            print(message)

    query = f"{amount} {unit} {ingredient}".strip()
    trace(f"[API CALL] {query}")
    response = call_api(query)

    if response is None:
        trace("  -> status: network_error")
        return None, query, "network_error"

    if is_unrecognized(response):
        trace("  -> status: unrecognized")
        return response, query, "unrecognized"

    recognized, _ = parse_detail(response)

    if not ingredient_matches(str(ingredient), recognized):
        trace(f"  -> status: mismatch (recognized as {recognized!r})")
        return response, query, "mismatch"

    if not has_nutrition(response):
        trace(f"  -> status: no_nutrition (recognized as {recognized!r})")
        return response, query, "no_nutrition"

    trace(f"  -> status: ok (recognized as {recognized!r})")
    return response, query, "ok"


def _cell_text(value) -> str:
    """Return *value* as stripped text; None and float NaN (missing cells) become ""."""
    if isinstance(value, str):
        return value.strip()
    # NaN is the only float that is not equal to itself.
    if value is None or (isinstance(value, float) and value != value):
        return ""
    return str(value).strip()


# core logic that also returns status + used prompt
def calc_nut_with_status(row, debug: bool = False):
    """
    Look up nutrition for one ingredient row, retrying with alternative prompts.

    *row* must support ``.get`` and is read for "norm_value", "norm_unit",
    "amount_annotation" (a JSON string whose "zutat" field is preferred as the
    ingredient name) and "ingredient". Missing cells (None / NaN) are treated
    as empty text instead of being formatted into the prompt or crashing.

    Attempts, stopping at the first "ok":
      1. annotation name (or the plain ingredient if there is none);
      2. the plain ingredient, only if attempt 1 ended in "no_nutrition" and
         the two names differ after norm();
      3. the same amount with unit "g", only if the unit is "stück"/"liter" and
         the last attempt failed on content (not on the network).

    Returns: (nutrition_json_or_None, final_status, prompt_used, recognized_name_or_None)
    """
    amount = _cell_text(row.get("norm_value"))
    unit = _cell_text(row.get("norm_unit")).lower()  # "g" | "liter" | "stück"

    # prefer 'zutat' from amount_annotation; fallback to 'ingredient'
    annotated = ""
    raw_annotation = row.get("amount_annotation")
    if isinstance(raw_annotation, str) and raw_annotation.strip():
        try:
            parsed = json.loads(raw_annotation)
        except (ValueError, RecursionError):
            # Malformed annotation: best effort, fall back to the plain ingredient.
            parsed = None
        if isinstance(parsed, dict):
            annotated = _cell_text(parsed.get("zutat"))
    ing_plain = _cell_text(row.get("ingredient"))
    ing_from_ann = annotated or ing_plain

    # 1) first attempt (annotation name or ingredient)
    res1, p1, s1 = try_prompt(amount, unit, ing_from_ann, debug=debug)
    if s1 == "ok":
        name1, _ = parse_detail(res1)
        return json.dumps(res1, ensure_ascii=False), s1, p1, name1

    last_status, last_prompt, last_res = s1, p1, res1

    # 2) only if first was "no_nutrition": try plain ingredient
    if s1 == "no_nutrition" and ing_plain and norm(ing_plain) != norm(ing_from_ann):
        res2, p2, s2 = try_prompt(amount, unit, ing_plain, debug=debug)
        if s2 == "ok":
            name2, _ = parse_detail(res2)
            return json.dumps(res2, ensure_ascii=False), s2, p2, name2
        last_status, last_prompt, last_res = s2, p2, res2

    # 3) Stück/Liter → g fallback (only for content failures, not network)
    if unit in {"stück", "liter"} and last_status in {"no_nutrition", "mismatch", "unrecognized"}:
        res3, p3, s3 = try_prompt(amount, "g", ing_plain or ing_from_ann, debug=debug)
        if s3 == "ok":
            name3, _ = parse_detail(res3)
            return json.dumps(res3, ensure_ascii=False), s3, p3, name3
        # update trackers so failure report is accurate
        last_status, last_prompt, last_res = s3, p3, res3

    # failed: try to extract a recognized name if any
    name = None
    if isinstance(last_res, dict):
        recognized, _ = parse_detail(last_res)
        name = recognized or None
    return None, last_status, last_prompt, name


# Resolve the input location: the running file when __file__ exists, otherwise
# the current working directory.
# NOTE(review): in the fallback HERE is already a directory, so HERE.parent is
# the parent of the working directory — confirm the CSV really lives there.
try:
    HERE = Path(__file__).resolve()
except NameError:
    HERE = Path.cwd()

# Every column is read as text.
data = pd.read_csv(HERE.parent / "gemma_annotation_normalized.csv", dtype=str)

# Row-wise lookup with a progress bar; each returned 4-tuple expands into columns.
tqdm.pandas(desc="Fetching nutrition")
meta = data.progress_apply(calc_nut_with_status, axis=1, result_type="expand")
meta.columns = ["nutrition", "status", "prompt_used", "recognized_name"]

out = pd.concat([data.reset_index(drop=True), meta], axis=1)

# Split into successful and failed lookups and write one CSV for each.
is_ok = out["status"] == "ok"
found = out[is_ok].copy()
failed = out[~is_ok].copy()

for frame, target in ((found, "nutrition_found.csv"), (failed, "nutrition_failed.csv")):
    frame.to_csv(target, index=False, encoding="utf-8")

print(f"Done. Found: {len(found)} → nutrition_found.csv | Failed: {len(failed)} → nutrition_failed.csv")


# Fetching nutrition: 100%|██████████| 910/910 [04:13<00:00,  3.59it/s]
#
# Done. Found: 807 → nutrition_found.csv | Failed: 103 → nutrition_failed.csv



