In [1]:
# ==========================================================
# 🍳 Full Self-contained RAG + Gemini + Recall Test Script
# ==========================================================

import os, re, json
from typing import List, Dict, Any, Tuple
import chromadb
from chromadb.utils import embedding_functions
import google.generativeai as genai
from difflib import SequenceMatcher
import pandas as pd
from tqdm import tqdm
import re
import ast

# ==========================================================
# 🔧 Configuration
# ==========================================================
DB_PATH = "vector_new_db/recipes"   # ✅ 你的数据库路径
MODEL_NAME = "gemini-2.0-flash-lite"
GENAI_API_KEY = os.getenv('GENAI_API_KEY') # ⚠️ 这里填你的 Google API Key

TOP_K_DEFAULT = 3
OUTPUT_DIR = "/content/evaluation_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# ==========================================================
# 🧠 Chroma + Embedder
# ==========================================================
chroma_client = chromadb.PersistentClient(path=DB_PATH)
embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="BAAI/bge-m3"
)
collection = chroma_client.get_collection("recipes")

# ==========================================================
# 🤖 Gemini init
# ==========================================================
genai.configure(api_key=GENAI_API_KEY)
model = genai.GenerativeModel(MODEL_NAME)

# ==========================================================
# 🧂 Ingredient Normalization + Matching
# ==========================================================
TOKEN_CLEAN_RE = re.compile(r"[^a-zA-Z0-9\s\-]")
STOP_TOKENS = {"fresh","dried","chopped","sliced","minced","ground","crushed",
    "thinly","thick","thickly","small","medium","large","whole","skinless",
    "boneless","lean","extra","optional","to","taste","of","and","or"}

CANONICAL_MAP = {
    "chicken thigh":"chicken","chicken thighs":"chicken","chicken breast":"chicken",
    "beef rib":"beef ribs","beef ribs":"beef ribs","pork belly":"pork belly",
    "pork ribs":"pork ribs","pork":"pork",
    "tofu":"tofu","tofu skin":"yuba","bean curd skin":"yuba","yuba":"yuba",
    "garlic":"garlic","ginger":"ginger","chili oil":"chili oil",
    "soy sauce":"soy sauce","doubanjiang":"doubanjiang"
}
CONFLICTS = {("tofu","yuba"),("yuba","tofu")}

def _normalize(s:str)->str:
    s = s.lower().strip()
    s = TOKEN_CLEAN_RE.sub(" ", s)
    return re.sub(r"\s+"," ",s).strip()

def _canonicalize(s:str)->str:
    s_norm=_normalize(s)
    for k in sorted(CANONICAL_MAP,key=len,reverse=True):
        if re.search(rf"(?<![a-z0-9]){re.escape(k)}(?![a-z0-9])",s_norm):
            return CANONICAL_MAP[k]
    return s_norm

def _is_conflict(a,b): return (a,b) in CONFLICTS
def _similar(a,b): return SequenceMatcher(None,a,b).ratio()

def match_ing(a,b,th=0.88):
    a,b=_canonicalize(a),_canonicalize(b)
    if _is_conflict(a,b): return False
    if a==b: return True
    if re.search(rf"(?<![a-z0-9]){re.escape(a)}(?![a-z0-9])",b) or re.search(rf"(?<![a-z0-9]){re.escape(b)}(?![a-z0-9])",a):
        return True
    return _similar(a,b)>=th

# ==========================================================
# 🍲 RecipeModelEvaluator
# ==========================================================
class RecipeModelEvaluator:
    def __init__(self,collection,embedder,model,top_k:int=3):
        self.collection=collection
        self.embedder=embedder
        self.model=model
        self.top_k=top_k

    def _retrieve_recipes(self,query:str)->List[Dict[str,Any]]:
        emb=self.embedder([query])[0]
        res=self.collection.query(query_embeddings=[emb],n_results=self.top_k)
        docs=res.get("documents",[[]])[0]
        metas=res.get("metadatas",[[]])[0]
        out=[]
        for i in range(min(len(docs),self.top_k)):
            m=metas[i]
            out.append({
                "name":m.get("Recipes_name",""),
                "flavor":m.get("Flavor",""),
                "difficulty":m.get("Difficulty",""),
                "time":m.get("Estimated Cooking Time",""),
                "doc":docs[i]
            })
        return out

    def _build_prompt(self,query:str,cands:List[Dict[str,Any]])->str:
        blocks=[]
        for i,c in enumerate(cands,1):
            blocks.append(f"[Candidate {i}]\nName:{c['name']}\nFlavor:{c['flavor']}\nContent:\n{c['doc']}")
        schema={
            "recipe_name":"string",
            "chosen_from":"string",
            "ingredients":["string","..."],
            "steps":["string","..."],
        }
        return f"""
User request:
{query}

Output exactly ONE JSON object following this schema:
{json.dumps(schema,indent=2)}
"""

    def generate_recipe(self,prompt:str)->str:
        cands=self._retrieve_recipes(prompt)
        print(f"📦 Retrieved {len(cands)} candidates from Chroma")
        llm_prompt=self._build_prompt(prompt,cands)
        try:
            resp=self.model.generate_content(llm_prompt)
            return resp.text
        except Exception as e:
            return json.dumps({"recipe_name":"ERROR","chosen_from":"ERROR","ingredients":[],"steps":[],"reasoning":str(e)})

    def parse_json(self,text:str)->Dict[str,Any]:
        try: return json.loads(text)
        except: pass
        m=re.search(r"(\{.*\})",text,flags=re.DOTALL)
        if m:
            try: return json.loads(m.group(1))
            except: pass
        return {"recipe_name":"NO_MATCH","chosen_from":"NO_MATCH","ingredients":[],"steps":[],"reasoning":""}

    def calc_recall(self,user_ings:List[str],gen_ings:List[str])->Dict[str,Any]:
        u=[_normalize(x) for x in user_ings]
        g=[_normalize(x) for x in gen_ings]
        match=[]
        mg=set()
        for ui in u:
            for j,gi in enumerate(g):
                if j in mg: continue
                if match_ing(ui,gi):
                    match.append((ui,gi)); mg.add(j); break
        r=len(match)/len(u) if u else 0
        p=len(match)/len(g) if g else 0
        f=2*r*p/(r+p) if (r+p)>0 else 0
        return {"matched":len(match),"recall":r,"precision":p,"f1":f,"pairs":match}

# ==========================================================
# 🧪 Test: Single Prompt Recall
# ==========================================================
model_eval = RecipeModelEvaluator(collection,embedder,model)


# ===== 基础设置 =====
df = pd.read_csv("recipes_prompt_label.csv")
df = df.iloc[0:10]  # 仅测试前 10 条样本
BATCH_SIZE = 50
OUTPUT_DIR = "outputs/baseline"
os.makedirs(OUTPUT_DIR, exist_ok=True)

all_results = []  # 用于存放每条样本的指标数据

# ===== 主循环 =====
for i in tqdm(range(0, len(df), BATCH_SIZE), desc="Evaluating recipes"):
    batch = df.iloc[i:i + BATCH_SIZE]
    docs = batch["prompt"].tolist()
    ids = batch["ingredients/label"].tolist()

    for j, (prompt, gold_ings_raw) in enumerate(zip(docs, ids)):
        print("\n" + "=" * 80)
        print(f"🧩 Sample {i + j + 1}/{len(df)}")

        print("\n🔍 Prompt:", prompt)
        print("🎯 Ground Truth (raw):", gold_ings_raw)

        # 1️⃣ 将 gold_ings_raw 转成真正的 list
        if isinstance(gold_ings_raw, str):
            try:
                # 尝试把 "['salt','oil']" 转成 ['salt','oil']
                gold_ings = ast.literal_eval(gold_ings_raw)
                if not isinstance(gold_ings, list):
                    gold_ings = [gold_ings]
            except Exception:
                # 如果只是用逗号隔开的字符串
                gold_ings = [x.strip() for x in gold_ings_raw.split(",") if x.strip()]
        else:
            # 其他情况（如本身是 list/Series）
            gold_ings = list(gold_ings_raw) if not isinstance(gold_ings_raw, list) else gold_ings_raw

        print("✅ Parsed Ground Truth (list):", gold_ings)

        # 2️⃣ 生成模型输出
        raw_out = model_eval.generate_recipe(prompt)
        print("\n💬 Gemini Raw Output:\n", raw_out[:800])

        # 3️⃣ 解析模型输出
        parsed = model_eval.parse_json(raw_out)
        gen_ings = parsed.get("ingredients", [])
        print("\n🥬 Extracted Ingredients:", gen_ings)

        # 4️⃣ 计算指标
        metrics = model_eval.calc_recall(gold_ings, gen_ings)
        print("\n📊 Metrics:\n", json.dumps(metrics, indent=2, ensure_ascii=False))

        # 5️⃣ 保存结果（仅指标部分）
        metrics_row = {"id": i + j + 1, "prompt": prompt}
        metrics_row.update(metrics)
        metrics_row["raw_output"] = raw_out
        all_results.append(metrics_row)

# ===== 汇总保存到 CSV =====
results_df = pd.DataFrame(all_results)
results_path = os.path.join(OUTPUT_DIR, "metrics_results.csv")
results_df.to_csv(results_path, index=False, encoding="utf-8-sig")

print(f"\n✅ 所有指标已保存到: {results_path}")

Evaluating recipes:   0%|          | 0/1 [00:00<?, ?it/s]


🧩 Sample 1/10

🔍 Prompt: I have pork belly, garlic, and chili oil. I want to make something spicy and Sichuan-style.
🎯 Ground Truth (raw): pork belly, garlic, chili oil
✅ Parsed Ground Truth (list): ['pork belly', 'garlic', 'chili oil']
📦 Retrieved 3 candidates from Chroma

💬 Gemini Raw Output:
 ```json
{
  "recipe_name": "Spicy Sichuan Pork Belly with Garlic and Chili Oil",
  "chosen_from": "User Request and Common Sichuan Recipes",
  "ingredients": [
    "500g pork belly, skin on",
    "4 cloves garlic, minced",
    "2 tablespoons chili oil",
    "1 tablespoon light soy sauce",
    "1 tablespoon dark soy sauce",
    "1 teaspoon Sichuan peppercorns, toasted and ground",
    "1/2 teaspoon white sugar",
    "1/4 cup water",
    "1 tablespoon vegetable oil",
    "Green onions, chopped (for garnish)",
    "Cooked rice, for serving"
  ],
  "steps": [
    "Cut the pork belly into 1-inch thick slices.",
    "Blanch the pork belly in boiling water for 5 minutes. Drain and rinse with cold wat

Evaluating recipes: 100%|██████████| 1/1 [00:27<00:00, 27.12s/it]


💬 Gemini Raw Output:
 ```json
{
  "recipe_name": "Cold Tofu Skin Salad with Peanuts and Vinegar",
  "chosen_from": "User Request",
  "ingredients": [
    "Tofu skin (yuba)",
    "Peanuts, roasted and lightly crushed",
    "Rice vinegar",
    "Soy sauce (optional)",
    "Sesame oil (optional)",
    "Sugar or sweetener (optional)",
    "Chili oil or flakes (optional, for heat)",
    "Cilantro or scallions, chopped (optional, for garnish)"
  ],
  "steps": [
    "Rehydrate the tofu skin according to package directions. If using dried tofu skin, soak in warm water until softened. Fresh tofu skin can be blanched briefly in boiling water.",
    "Drain the tofu skin and gently squeeze out excess water.",
    "Cut the tofu skin into bite-sized pieces or strips.",
    "In a bowl, combine the tofu skin, peanuts, and vineg

🥬 Extracted Ingredients: ['Tofu skin (yuba)', 'Peanuts, roasted and lightly crushed', 'Rice vinegar', 'Soy sauce (optional)', 'Sesame oil (optional)', 'Sugar or sweetener (opt




In [12]:
results_df.to_csv(results_path, index=False, encoding="utf-8-sig")