In [3]:
import re
import random
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

# --- ML / NLP
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

# --- GUI
import tkinter as tk
from tkinter import scrolledtext, messagebox

# --- Stemming (no external downloads needed)
try:
    from nltk.stem.snowball import SnowballStemmer
    STEMMER = SnowballStemmer("english")
except Exception:
    STEMMER = None  # Fallback (skip stemming if not available)

# =========================
# Config
# =========================
DATASETS = [
    # r"C:\users\mahek\Desktop\pbl5\indian_food.csv",
    # r"C:\users\mahek\Desktop\pbl5\Food_Recipe.csv",
    # r"C:\users\mahek\Desktop\pbl5\recipes.csv",
    r"C:\Users\mahek\Desktop\pbl5\indian_recipes.csv"
]
TOP_K_DEFAULT = 5
RANDOM_SEED = 42

# Ingredient-synonym/alias mapping
SYNONYM_MAP_PHRASES = {
    r"\bcapsicum\b": "bell pepper",
    r"\bmirchi\b": "chili",
    r"\bchilli\b": "chili",
    r"\bchilies\b": "chili",
    r"\bgreen chili(es)?\b": "chili",
    r"\bred chili(es)?\b": "chili",
    r"\bcoriander leaves\b": "coriander",
    r"\bdhania\b": "coriander",
    r"\bcilantro\b": "coriander",
    r"\bjeera\b": "cumin",
    r"\bzeera\b": "cumin",
    r"\bhari mirch\b": "chili",
    r"\bhing\b": "asafoetida",
    r"\bgur\b": "jaggery",
    r"\bgud\b": "jaggery",
    r"\bmaida\b": "all purpose flour",
    r"\batta\b": "wheat flour",
    r"\bdahi\b": "yogurt",
    r"\bcurd\b": "yogurt",
    r"\bmethi\b": "fenugreek",
    r"\bkasuri methi\b": "fenugreek",
    r"\baloo\b": "potato",
    r"\bbhindi\b": "okra",
    r"\bturka\b": "tempering",
}

# Common noise words
NOISE_TOKENS = {
    "fresh", "finely", "chopped", "sliced", "diced", "ground", "powder",
    "whole", "optional", "to", "taste", "medium", "large", "small", "cup",
    "cups", "tsp", "tbsp", "tablespoon", "teaspoon", "pinch", "piece",
    "pieces", "handful", "and", "or", "of", "oil", "water"
}

# =========================
# Loading & Unification
# =========================
def load_dataset(path: Union[str, Path]) -> pd.DataFrame:
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Data file not found: {path}")
    if path.suffix.lower() == ".csv":
        df = pd.read_csv(path, encoding="utf-8", engine="python", on_bad_lines="skip")
    elif path.suffix.lower() == ".json":
        df = pd.read_json(path)
    else:
        raise ValueError("Unsupported file type. Provide CSV or JSON.")
    df.columns = [re.sub(r"\s+", "_", c.strip().lower()) for c in df.columns]
    return df


def unify_dataset(df: pd.DataFrame, src_name: str) -> pd.DataFrame:
    """
    Map dataset-specific columns to a unified schema: [name, ingredients, instructions]
    """
    name_col = None
    ing_col = None
    ins_col = None

    if "indian_food" in src_name.lower():
        name_col = "name"
        ing_col = "ingredients"
        ins_col = None  # no instructions

    elif "food_recipe" in src_name.lower():
        name_col = "recipe_name"
        ing_col = "ingredients"
        ins_col = "directions"

    elif "recipes" in src_name.lower():
        name_col = "name"
        ing_col = "ingredients_name"
        ins_col = "instructions"

    # fallback detection
    if not name_col:
        for c in ["name", "title", "recipe_name", "dish", "recipe"]:
            if c in df.columns:
                name_col = c
                break
    if not ing_col:
        for c in ["ingredients", "ingredient", "ingredients_name", "translatedingredients"]:
            if c in df.columns:
                ing_col = c
                break
    if not ins_col:
        for c in ["instructions", "steps", "directions", "method", "translatedinstructions"]:
            if c in df.columns:
                ins_col = c
                break

    out = pd.DataFrame({
        "name": df[name_col] if name_col in df.columns else [f"recipe_{i}" for i in range(len(df))],
        "ingredients": df[ing_col] if ing_col in df.columns else "",
        "instructions": df[ins_col] if ins_col and ins_col in df.columns else "",
    })
    return out


def load_all_datasets(paths: List[str]) -> pd.DataFrame:
    dfs = []
    for p in paths:
        df = load_dataset(p)
        df_u = unify_dataset(df, p)
        dfs.append(df_u)
    df_all = pd.concat(dfs, ignore_index=True)
    return df_all

# =========================
# Normalization helpers
# =========================
def apply_synonyms(text: str) -> str:
    s = text
    for pattern, repl in SYNONYM_MAP_PHRASES.items():
        s = re.sub(pattern, repl, s)
    return s


def tokenize_items(value: Any) -> List[str]:
    if pd.isna(value):
        return []
    if isinstance(value, (list, tuple, set)):
        items = list(value)
    else:
        items = re.split(r"[,;\|\n]+", str(value))
    toks: List[str] = []
    for item in items:
        s = item.lower().strip()
        if not s:
            continue
        s = apply_synonyms(s)
        s = re.sub(r"(\d+\/\d+|\d+\.\d+|\d+)", " ", s)
        s = re.sub(r"[^a-z\s\-]", " ", s)
        s = re.sub(r"\s+", " ", s).strip()
        for t in s.split():
            if t in NOISE_TOKENS:
                continue
            if STEMMER:
                t = STEMMER.stem(t)
            toks.append(t)
    return toks


def normalize_ingredients(value: Any) -> str:
    return " ".join(tokenize_items(value))


def normalize_instructions(value: Any) -> str:
    if pd.isna(value):
        return ""
    s = str(value).lower()
    s = apply_synonyms(s)
    s = re.sub(r"[^a-z\s\-]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    if STEMMER:
        s = " ".join(STEMMER.stem(t) for t in s.split() if t not in NOISE_TOKENS)
    return s

# =========================
# Model
# =========================
class RecipeRecord:
    def __init__(self, idx: int, name: str, ing_text: str, ins_text: str):
        self.idx = idx
        self.name = name
        self.ing_text = ing_text
        self.ins_text = ins_text


class HybridSearch:
    def __init__(self):
        self.ing_word = TfidfVectorizer(analyzer="word", ngram_range=(1, 2), min_df=2, strip_accents="unicode")
        self.ing_char = TfidfVectorizer(analyzer="char", ngram_range=(3, 5), min_df=2, strip_accents="unicode")
        self.ins_word = TfidfVectorizer(analyzer="word", ngram_range=(1, 2), min_df=5, strip_accents="unicode")
        self.ins_char = TfidfVectorizer(analyzer="char", ngram_range=(3, 5), min_df=5, strip_accents="unicode")

        self.records: List[RecipeRecord] = []
        self.M_ing_word = None
        self.M_ing_char = None
        self.M_ins_word = None
        self.M_ins_char = None

    def fit(self, df: pd.DataFrame) -> "HybridSearch":
        ing_norm = df["ingredients"].apply(normalize_ingredients)
        ins_norm = df["instructions"].apply(normalize_instructions)

        self.records = []
        for i in range(len(df)):
            self.records.append(RecipeRecord(i, str(df.iloc[i]["name"]), ing_norm.iloc[i], ins_norm.iloc[i]))

        ing_corpus = [r.ing_text for r in self.records]
        ins_corpus = [r.ins_text for r in self.records]

        self.M_ing_word = self.ing_word.fit_transform(ing_corpus)
        self.M_ing_char = self.ing_char.fit_transform(ing_corpus)

        if any(len(t) > 0 for t in ins_corpus):
            self.M_ins_word = self.ins_word.fit_transform(ins_corpus)
            self.M_ins_char = self.ins_char.fit_transform(ins_corpus)
        return self

    def _encode_query(self, query_ingredients: Union[str, List[str]]) -> Tuple:
        if isinstance(query_ingredients, list):
            q_text = " ".join(tokenize_items(query_ingredients))
        else:
            q_text = " ".join(tokenize_items(query_ingredients))
        q_ing_word = self.ing_word.transform([q_text])
        q_ing_char = self.ing_char.transform([q_text])
        q_ins_word = self.ins_word.transform([q_text]) if self.M_ins_word is not None else None
        q_ins_char = self.ins_char.transform([q_text]) if self.M_ins_char is not None else None
        return q_ing_word, q_ing_char, q_ins_word, q_ins_char

    def search(self, query_ingredients: Union[str, List[str]], top_k: int = 5) -> List[Tuple[float, RecipeRecord]]:
        q_ing_word, q_ing_char, q_ins_word, q_ins_char = self._encode_query(query_ingredients)

        s_ing_word = linear_kernel(q_ing_word, self.M_ing_word)[0]
        s_ing_char = linear_kernel(q_ing_char, self.M_ing_char)[0]
        s_ing = 0.6 * s_ing_word + 0.4 * s_ing_char

        if self.M_ins_word is not None and q_ins_word is not None:
            s_ins_word = linear_kernel(q_ins_word, self.M_ins_word)[0]
            s_ins_char = linear_kernel(q_ins_char, self.M_ins_char)[0]
            s_ins = 0.6 * s_ins_word + 0.4 * s_ins_char
        else:
            s_ins = np.zeros_like(s_ing)

        sim = 0.75 * s_ing + 0.25 * s_ins

        top_idx = np.argsort(sim)[::-1][:top_k]
        return [(float(sim[i]), self.records[i]) for i in top_idx]

# =========================
# Evaluation
# =========================
def evaluate_model(model: HybridSearch, top_k: int = 5, num_queries: int = 200, seed: int = RANDOM_SEED) -> Dict[str, float]:
    rng = random.Random(seed)
    candidates = [r for r in model.records if len(r.ing_text.split()) >= 3]
    sample = rng.sample(candidates, k=min(num_queries, len(candidates)))

    hit1 = hitk = 0
    rr_sum = prec_sum = rec_sum = 0.0

    for r in sample:
        toks = r.ing_text.split()
        q_len = min(len(toks), rng.randint(3, min(8, len(toks))))
        q = " ".join(rng.sample(toks, q_len))

        results = model.search(q, top_k=top_k)
        ranks = [i for i, (_, rec) in enumerate(results, start=1) if rec.idx == r.idx]
        if ranks:
            rank = ranks[0]
            if rank == 1:
                hit1 += 1
            if rank <= top_k:
                hitk += 1
                rr_sum += 1.0 / rank
                prec_sum += 1.0 / top_k
                rec_sum += 1.0

    n = max(1, len(sample))
    return {
        "n_queries": float(n),
        "top1_acc": hit1 / n,
        f"top{top_k}_acc": hitk / n,
        f"mrr@{top_k}": rr_sum / n,
        f"precision@{top_k}": prec_sum / n,
        f"recall@{top_k}": rec_sum / n,
    }

# =========================
# GUI
# =========================
def start_gui(model: HybridSearch):
    root = tk.Tk()
    root.title("Recipe Search (Hybrid TF-IDF)")

    tk.Label(root, text="Enter Ingredients (comma-separated):").pack(pady=(8, 2))
    entry = tk.Entry(root, width=80)
    entry.pack(padx=8, pady=(0, 6))

    output = scrolledtext.ScrolledText(root, width=100, height=24)
    output.pack(padx=8, pady=6)

    def search_action():
        query = entry.get().strip()
        if not query:
            messagebox.showwarning("Input Error", "Please enter some ingredients.")
            return
        results = model.search(query, top_k=TOP_K_DEFAULT)
        output.delete(1.0, tk.END)
        for i, (score, rec) in enumerate(results, 1):
            output.insert(tk.END, f"{i}. {rec.name}  (score: {score:.3f})\n")
            output.insert(tk.END, f"   Ingredients (norm): {rec.ing_text[:220]}{'...' if len(rec.ing_text)>220 else ''}\n")
            if rec.ins_text:
                output.insert(tk.END, f"   Steps (norm): {rec.ins_text[:220]}{'...' if len(rec.ins_text)>220 else ''}\n")
            output.insert(tk.END, "\n")

    tk.Button(root, text="Search Recipes", command=search_action).pack(pady=(0, 10))
    root.mainloop()

# =========================
# Main
# =========================
if __name__ == "__main__":
    random.seed(RANDOM_SEED)
    np.random.seed(RANDOM_SEED)

    df_all = load_all_datasets(DATASETS)
    model = HybridSearch().fit(df_all)

    metrics = evaluate_model(model, top_k=TOP_K_DEFAULT, num_queries=200)
    print("\nEvaluation (synthetic retrieval):")
    for k, v in metrics.items():
        if isinstance(v, float):
            print(f"  {k:>14}: {v:.4f}")
        else:
            print(f"  {k:>14}: {v}")

    start_gui(model)


ValueError: empty vocabulary; perhaps the documents only contain stop words