In [None]:
# --- 0. Setup
import io, sys, textwrap, numpy as np, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

pd.set_option("display.max_colwidth", 120)
sns.set_theme()


In [None]:
# --- 1. Upload CSVs
from google.colab import files
uploaded = files.upload()  # Choose the six CSVs when the dialog opens

list(uploaded.keys())


Saving All_Diets.csv to All_Diets.csv
Saving dash.csv to dash.csv
Saving keto.csv to keto.csv
Saving mediterranean.csv to mediterranean.csv
Saving paleo.csv to paleo.csv
Saving vegan.csv to vegan.csv


['All_Diets.csv',
 'dash.csv',
 'keto.csv',
 'mediterranean.csv',
 'paleo.csv',
 'vegan.csv']

In [None]:
# === SNIPPET 1: Load + Sanity Checks EDA ===
# Run this cell as-is in Google Colab.

import os
import pandas as pd
import numpy as np

# ---------- CONFIG ----------
# Put your CSVs in this folder. If using Google Drive, set this to your drive path like '/content/drive/MyDrive/diets_data'
DATA_DIR = "/content"   # change if needed

FILES = [
    "All_Diets.csv",
    "dash.csv",
    "keto.csv",
    "mediterranean.csv",
    "paleo.csv",
    "vegan.csv"
]

# Column names we expect (case-insensitive match handled below)
EXPECTED_COLS = {
    "Diet_type": "Diet_type",
    "Recipe_name": "Recipe_name",
    "Cuisine_type": "Cuisine_type",
    "Protein(g)": "Protein(g)",
    "Carbs(g)": "Carbs(g)",
    "Fat(g)": "Fat(g)",
    "Extraction_day": "Extraction_day"
}

# ---------- HELPERS ----------
def normalize_columns(df):
    """
    - Strip spaces from headers
    - Fix common case/spacing differences to match EXPECTED_COLS
    """
    original = list(df.columns)
    # Standardize: strip, collapse spaces, keep parentheses
    cleaned = [c.strip().replace("  ", " ").replace("\xa0", " ") for c in df.columns]
    df.columns = cleaned

    # Build a lower->expected mapping
    lower_to_expected = {k.lower(): v for k, v in EXPECTED_COLS.items()}
    new_cols = {}
    for c in df.columns:
        key = c.lower()
        # try exact lower match
        if key in lower_to_expected:
            new_cols[c] = lower_to_expected[key]
        else:
            # fallback: simple heuristics for protein/carbs/fat
            if "protein" in key:
                new_cols[c] = "Protein(g)"
            elif "carb" in key:
                new_cols[c] = "Carbs(g)"
            elif "fat" in key:
                new_cols[c] = "Fat(g)"
            elif "diet" in key:
                new_cols[c] = "Diet_type"
            elif "cuisine" in key:
                new_cols[c] = "Cuisine_type"
            elif "recipe" in key and "name" in key:
                new_cols[c] = "Recipe_name"
            elif "extract" in key and "day" in key:
                new_cols[c] = "Extraction_day"
            else:
                new_cols[c] = c  # leave as-is if unknown

    df = df.rename(columns=new_cols)
    return df, original, df.columns.tolist()

def coerce_numeric(df):
    for col in ["Protein(g)", "Carbs(g)", "Fat(g)"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df

def quick_report(df, name):
    print(f"\n=== {name} ===")
    print("Shape:", df.shape)
    print("Columns:", list(df.columns))
    print(df.head(3))

# ---------- LOAD EACH FILE ----------
frames = []
missing_files = []
load_issues = []

for fname in FILES:
    fpath = os.path.join(DATA_DIR, fname)
    if not os.path.exists(fpath):
        missing_files.append(fname)
        continue
    try:
        temp = pd.read_csv(fpath)
        temp, orig_cols, new_cols = normalize_columns(temp)
        temp = coerce_numeric(temp)
        temp["source_file"] = fname
        frames.append(temp)
    except Exception as e:
        load_issues.append((fname, str(e)))

# ---------- REPORT LOAD STATUS ----------
print("=== LOAD STATUS ===")
print("Data directory:", DATA_DIR)
print("Files expected:", FILES)
if missing_files:
    print("Missing files:", missing_files)
else:
    print("Missing files: None")

if load_issues:
    print("\nIssues while loading:")
    for fname, err in load_issues:
        print(f" - {fname}: {err}")
else:
    print("No loading issues.")

# ---------- CONCAT ----------
if not frames:
    raise SystemExit("No files loaded. Please check DATA_DIR and filenames, then re-run.")

df = pd.concat(frames, ignore_index=True)

# Keep only expected columns if present
expected_present = [c for c in EXPECTED_COLS.values() if c in df.columns]
other_cols = [c for c in df.columns if c not in expected_present + ["source_file"]]
ordered_cols = expected_present + other_cols + ["source_file"]
df = df[ordered_cols]

quick_report(df, "COMBINED DF (first 3 rows)")

# ---------- BASIC SANITY CHECKS ----------
print("\n=== DTYPE CHECKS ===")
print(df.dtypes)

print("\n=== NULL COUNTS ===")
print(df.isna().sum().sort_values(ascending=False))

# Basic numeric stats
num_cols = ["Protein(g)", "Carbs(g)", "Fat(g)"]
num_cols = [c for c in num_cols if c in df.columns]
print("\n=== NUMERIC SUMMARY (Protein/Carbs/Fat) ===")
print(df[num_cols].describe())

# Diet types & cuisines
if "Diet_type" in df.columns:
    print("\n=== Diet_type value counts (top 20) ===")
    print(df["Diet_type"].value_counts().head(20))

if "Cuisine_type" in df.columns:
    print("\n=== Cuisine_type value counts (top 20) ===")
    print(df["Cuisine_type"].value_counts().head(20))

# Duplicates by (Diet_type, Recipe_name)
key_cols = [c for c in ["Diet_type", "Recipe_name"] if c in df.columns]
if len(key_cols) == 2:
    dup_mask = df.duplicated(subset=key_cols, keep=False)
    dup_count = dup_mask.sum()
    print(f"\n=== DUPLICATE CHECK on {key_cols} ===")
    print("Duplicate rows (count):", dup_count)
    if dup_count > 0:
        print("Sample duplicates:")
        print(df.loc[dup_mask, key_cols + num_cols + ["source_file"]].head(10))

# Quick outlier look (simple bounds)
def outlier_bounds(series):
    q1, q3 = series.quantile([0.25, 0.75])
    iqr = q3 - q1
    low = q1 - 1.5*iqr
    high = q3 + 1.5*iqr
    return low, high

print("\n=== SIMPLE OUTLIER FLAGS (IQR) ===")
for c in num_cols:
    s = df[c].dropna()
    if len(s) == 0:
        print(f"{c}: no data")
        continue
    low, high = outlier_bounds(s)
    outliers = ((df[c] < low) | (df[c] > high)).sum()
    print(f"{c}: low<{low:.2f}, high>{high:.2f}, outliers={outliers}")

# Grouped nutrient means by Diet_type
if "Diet_type" in df.columns and num_cols:
    print("\n=== Mean nutrients by Diet_type ===")
    print(df.groupby("Diet_type")[num_cols].mean().round(2).sort_index())

# Save a cleaned preview CSV (optional)
preview_path = os.path.join(DATA_DIR, "combined_diets_preview.csv")
df.head(1000).to_csv(preview_path, index=False)
print(f"\nSaved a preview of first 1000 rows to: {preview_path}")


=== LOAD STATUS ===
Data directory: /content
Files expected: ['All_Diets.csv', 'dash.csv', 'keto.csv', 'mediterranean.csv', 'paleo.csv', 'vegan.csv']
Missing files: None
No loading issues.

=== COMBINED DF (first 3 rows) ===
Shape: (15612, 9)
Columns: ['Diet_type', 'Recipe_name', 'Cuisine_type', 'Protein(g)', 'Carbs(g)', 'Fat(g)', 'Extraction_day', 'Extraction_time', 'source_file']
  Diet_type                                                       Recipe_name  \
0     paleo                                   Bone Broth From 'Nom Nom Paleo'   
1     paleo  Paleo Effect Asian-Glazed Pork Sides, A Sweet & Crispy Appetizer   
2     paleo                                                 Paleo Pumpkin Pie   

       Cuisine_type  Protein(g)  Carbs(g)  Fat(g) Extraction_day  \
0          american        5.22      1.29    3.20     2022-10-16   
1  south east asian      181.55     28.62  146.14     2022-10-16   
2          american       30.91    302.59   96.76     2022-10-16   

  Extraction_time

In [None]:
# === SNIPPET 2: Canonicalize + Deduplicate ===
import re
import pandas as pd
import numpy as np

assert 'df' in globals(), "Run Snippet 1 first to load the combined dataframe as `df`."

work = df.copy()

# ----------------- Canonicalize helpers -----------------
def clean_text(s):
    if pd.isna(s):
        return s
    s = str(s).lower()
    # remove quotes and fancy quotes
    s = s.replace("’", "'").replace("‘", "'").replace("“", '"').replace("”", '"')
    s = s.replace('"', '').replace("'", "")
    # remove the word 'recipes' if it appears at the end or standalone
    s = re.sub(r'\brecipes?\b$', '', s).strip()
    # collapse whitespace
    s = re.sub(r'\s+', ' ', s).strip()
    return s

def mode_or_first(series):
    # safe mode for strings
    try:
        m = series.mode(dropna=True)
        return m.iloc[0] if len(m) else series.dropna().iloc[0] if series.dropna().size else np.nan
    except Exception:
        return series.dropna().iloc[0] if series.dropna().size else np.nan

# ----------------- Apply canonicalization -----------------
for col in ["Diet_type", "Recipe_name", "Cuisine_type"]:
    if col in work.columns:
        work[col] = work[col].apply(clean_text)

# explicit small normalizations for Diet_type (if needed)
diet_map = {
    "med": "mediterranean",
    "dash diet": "dash",
}
work["Diet_type"] = work["Diet_type"].replace(diet_map)

# build a canonical recipe key per diet
work["recipe_key"] = work["Diet_type"].astype(str) + "||" + work["Recipe_name"].astype(str)

# ----------------- Inspect duplicate groups -----------------
dup_counts = work["recipe_key"].value_counts()
print("=== Top duplicate groups (before dedup) ===")
print(dup_counts.head(10))

# ----------------- Aggregate duplicates -----------------
num_cols = ["Protein(g)", "Carbs(g)", "Fat(g)"]
text_cols = ["Diet_type", "Recipe_name", "Cuisine_type", "Extraction_day", "Extraction_time", "source_file"]

agg_spec = {c: "mean" for c in num_cols}
# choose stable representatives for text columns
agg_spec.update({
    "Diet_type": mode_or_first,
    "Recipe_name": mode_or_first,
    "Cuisine_type": mode_or_first,
    "Extraction_day": "min",     # earliest seen
    "Extraction_time": "min",
    "source_file": mode_or_first
})

dedup = work.groupby("recipe_key", as_index=False).agg(agg_spec)

# reorder columns nicely
ordered = ["Diet_type", "Recipe_name", "Cuisine_type"] + num_cols + ["Extraction_day", "Extraction_time", "source_file"]
dedup = dedup[ordered]

# ----------------- Reports -----------------
print("\n=== Shapes ===")
print("Before dedup:", work.shape)
print("After  dedup:", dedup.shape)

print("\n=== Nulls after dedup ===")
print(dedup.isna().sum().sort_values(ascending=False))

print("\n=== Numeric summary after dedup ===")
print(dedup[num_cols].describe())

# check remaining duplicates by (Diet_type, Recipe_name)
rem_dups = dedup.duplicated(subset=["Diet_type","Recipe_name"]).sum()
print(f"\nRemaining exact duplicates by (Diet_type, Recipe_name): {rem_dups}")

# Per-diet recipe counts after dedup
print("\n=== Per-diet counts after dedup ===")
print(dedup["Diet_type"].value_counts())

# Save preview
dedup_preview_path = "/content/diets_dedup_preview.csv"
dedup.head(1000).to_csv(dedup_preview_path, index=False)
print(f"\nSaved a preview of first 1000 deduplicated rows to: {dedup_preview_path}")


=== Top duplicate groups (before dedup) ===
recipe_key
mediterranean||mediterranean chicken          98
mediterranean||mediterranean salad            54
mediterranean||mediterranean pasta salad      50
mediterranean||mediterranean quinoa salad     40
mediterranean||mediterranean pizza            32
mediterranean||mediterranean tuna salad       30
mediterranean||mediterranean pasta            26
mediterranean||mediterranean orzo salad       24
mediterranean||mediterranean potato salad     24
mediterranean||mediterranean chicken salad    22
Name: count, dtype: int64

=== Shapes ===
Before dedup: (15612, 10)
After  dedup: (6924, 9)

=== Nulls after dedup ===
Diet_type          0
Recipe_name        0
Cuisine_type       0
Protein(g)         0
Carbs(g)           0
Fat(g)             0
Extraction_day     0
Extraction_time    0
source_file        0
dtype: int64

=== Numeric summary after dedup ===
        Protein(g)     Carbs(g)       Fat(g)
count  6924.000000  6924.000000  6924.000000
mean   

In [None]:
# === SNIPPET 3: Targeted EDA for modeling decisions ===
import numpy as np
import pandas as pd

assert 'dedup' in globals(), "Run Snippet 2 first so `dedup` exists."

data = dedup.copy()
num_cols = ["Protein(g)", "Carbs(g)", "Fat(g)"]

print("Rows:", len(data))
print("Diets:", data["Diet_type"].unique())

# 1) Diet macro centroids (means) and macro percentage composition
centroids = (
    data.groupby("Diet_type")[num_cols]
    .mean()
    .round(2)
    .sort_index()
)

# Macro percentage composition per diet
comp = centroids.copy()
comp_sum = comp.sum(axis=1).replace(0, np.nan)
for c in num_cols:
    comp[c] = (comp[c] / comp_sum * 100).round(2)

print("\n=== Diet macro centroids (mean grams) ===")
print(centroids)

print("\n=== Diet macro composition (%) ===")
print(comp.rename(columns={
    "Protein(g)":"Protein_%","Carbs(g)":"Carbs_%","Fat(g)":"Fat_%"
}))

# 2) Top cuisines per diet + cuisine overlap (Jaccard)
def top_cuisines_per_diet(df, topn=10):
    out = {}
    for d, sub in df.groupby("Diet_type"):
        vc = sub["Cuisine_type"].value_counts()
        out[d] = vc.head(topn)
    return out

tops = top_cuisines_per_diet(data, topn=10)
print("\n=== Top 10 cuisines per diet (counts) ===")
for d, vc in tops.items():
    print(f"\n[{d}]")
    print(vc)

# Jaccard overlap on cuisine sets (top 20 cuisines per diet)
diet_cuisines = {
    d: set(sub["Cuisine_type"].value_counts().head(20).index)
    for d, sub in data.groupby("Diet_type")
}

diets = sorted(diet_cuisines.keys())
jacc_rows = []
for i, d1 in enumerate(diets):
    row = []
    for j, d2 in enumerate(diets):
        s1, s2 = diet_cuisines[d1], diet_cuisines[d2]
        inter = len(s1 & s2)
        union = len(s1 | s2) if len(s1 | s2) else 1
        jacc = inter / union
        row.append(round(jacc, 3))
    jacc_rows.append(row)

jacc_df = pd.DataFrame(jacc_rows, index=diets, columns=diets)
print("\n=== Cuisine overlap (Jaccard) using top-20 cuisines per diet ===")
print(jacc_df)

# 3) Extreme values scan and hypothetical P99 caps (no mutation yet)
def p99_scan(df, group_col="Diet_type", metrics=num_cols):
    frames = []
    for d, sub in df.groupby(group_col):
        row = {"Diet_type": d, "count": len(sub)}
        for m in metrics:
            q99 = sub[m].quantile(0.99)
            above = (sub[m] > q99).sum()
            row[f"{m}_p99"] = round(q99, 2)
            row[f"{m}_above_p99"] = int(above)
        frames.append(row)
    return pd.DataFrame(frames).sort_values("Diet_type")

p99_tbl = p99_scan(data)
print("\n=== P99 thresholds and #rows above (by diet) ===")
print(p99_tbl)

# Show how means would change if we capped at P99 within each diet (for information only)
def mean_after_p99_cap(df, metrics=num_cols):
    capped = []
    for d, sub in df.groupby("Diet_type"):
        temp = sub.copy()
        for m in metrics:
            cap = sub[m].quantile(0.99)
            temp[m] = np.where(temp[m] > cap, cap, temp[m])
        capped.append(temp)
    capped = pd.concat(capped, ignore_index=True)
    return (
        df.groupby("Diet_type")[metrics].mean().round(2)
        .rename(columns=lambda c: c + " (orig)")
        .join(capped.groupby("Diet_type")[metrics].mean().round(2).rename(columns=lambda c: c + " (p99-capped)"))
        .sort_index()
    )

cap_effect = mean_after_p99_cap(data)
print("\n=== Mean change if we cap at P99 within diet (for awareness; not applied) ===")
print(cap_effect)

# 4) Correlations between macros (overall and per diet)
print("\n=== Overall macro correlations (Pearson) ===")
print(data[num_cols].corr().round(3))

print("\n=== Per-diet macro correlations (Pearson) ===")
for d, sub in data.groupby("Diet_type"):
    print(f"\n[{d}]")
    print(sub[num_cols].corr().round(3))


Rows: 6924
Diets: ['dash' 'keto' 'mediterranean' 'paleo' 'vegan']

=== Diet macro centroids (mean grams) ===
               Protein(g)  Carbs(g)  Fat(g)
Diet_type                                  
dash                69.16    160.44  100.05
keto               100.75     57.68  151.92
mediterranean      102.83    153.92  105.40
paleo               88.93    128.14  134.91
vegan               56.62    253.84  103.24

=== Diet macro composition (%) ===
               Protein_%  Carbs_%  Fat_%
Diet_type                               
dash               20.98    48.67  30.35
keto               32.46    18.59  48.95
mediterranean      28.39    42.50  29.10
paleo              25.27    36.41  38.33
vegan              13.69    61.36  24.96

=== Top 10 cuisines per diet (counts) ===

[dash]
Cuisine_type
american          609
world             219
mediterranean     169
italian           155
french            143
british            61
mexican            57
south american     50
chinese            3

In [None]:
# Install the exact Spark/PySpark version we’ll use in the project
!pip -q install pyspark==3.5.1

# Sanity check: Java (Colab usually has OpenJDK 11+)
!java -version


openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [None]:
# Step 1 (retry, single cell): Create SparkSession and run tiny checks (no executor-memory lookup)

import os, sys
from pyspark.sql import SparkSession, functions as F

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

spark = (
    SparkSession.builder
    .appName("DietRecSys-BigData-Colab")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.shuffle.partitions", "64")
    .getOrCreate()
)

print("✅ SparkSession OK")
print("PySpark:", spark.version)
print("Python :", sys.version.split()[0])
print("Master :", spark.sparkContext.master)

# Tiny RDD job
rdd = spark.sparkContext.parallelize(range(1, 11))
print("RDD sum 1..10:", rdd.sum())

# Tiny DataFrame job
data = [
    ("Sample Salad", "mediterranean", 8.0, 20.0, 12.0),
    ("Tofu Stir Fry", "vegan",         18.0, 35.0, 10.0),
    ("Keto Omelette", "keto",          22.0,  2.0, 25.0),
]
df = spark.createDataFrame(data, ["Recipe_name","Diet_type","Protein_g","Carbs_g","Fat_g"])

df_calc = (
    df.withColumn("kcal", F.col("Protein_g")*4 + F.col("Carbs_g")*4 + F.col("Fat_g")*9)
      .withColumn("pct_protein", (F.col("Protein_g")*4 / F.col("kcal"))*100)
      .withColumn("pct_carbs"  , (F.col("Carbs_g")  *4 / F.col("kcal"))*100)
      .withColumn("pct_fat"    , (F.col("Fat_g")    *9 / F.col("kcal"))*100)
)

df_calc.show(truncate=False)


✅ SparkSession OK
PySpark: 3.5.1
Python : 3.12.11
Master : local[*]
RDD sum 1..10: 55
+-------------+-------------+---------+-------+-----+-----+------------------+------------------+-----------------+
|Recipe_name  |Diet_type    |Protein_g|Carbs_g|Fat_g|kcal |pct_protein       |pct_carbs         |pct_fat          |
+-------------+-------------+---------+-------+-----+-----+------------------+------------------+-----------------+
|Sample Salad |mediterranean|8.0      |20.0   |12.0 |220.0|14.545454545454545|36.36363636363637 |49.09090909090909|
|Tofu Stir Fry|vegan        |18.0     |35.0   |10.0 |302.0|23.841059602649008|46.35761589403973 |29.80132450331126|
|Keto Omelette|keto         |22.0     |2.0    |25.0 |321.0|27.41433021806853 |2.4922118380062304|70.09345794392523|
+-------------+-------------+---------+-------+-----+-----+------------------+------------------+-----------------+



In [None]:
# Step 2: Upload CSVs, read with Spark, standardize columns, merge, and show basic counts.

import re, os, sys
from typing import Dict
from google.colab import files
from pyspark.sql import functions as F, types as T
from pyspark.sql import DataFrame

print("📤 Please choose your CSVs (e.g., All_Diets.csv, dash.csv, keto.csv, mediterranean.csv, paleo.csv, vegan.csv)")
uploaded = files.upload()  # <-- interactively upload files

def std_key(s: str) -> str:
    """Normalize column names: lowercase, remove non-alphanum, map common variants."""
    s = s.strip().lower()
    s = re.sub(r"[\s\-\./()%]+", "_", s)       # spaces & punctuation -> underscore
    s = re.sub(r"_+", "_", s).strip("_")
    # common synonyms
    synonyms = {
        "protein_g":"protein_g", "protein":"protein_g", "proteing":"protein_g",
        "carbs_g":"carbs_g", "carb_g":"carbs_g", "carbs":"carbs_g", "carbohydrate_g":"carbs_g",
        "fat_g":"fat_g", "fat":"fat_g", "fats_g":"fat_g",
        "diet":"diet_type", "diettype":"diet_type", "diet_type":"diet_type",
        "recipe":"recipe_name", "recipename":"recipe_name", "recipe_name":"recipe_name",
        "cuisine":"cuisine_type", "cuisinetype":"cuisine_type", "cuisine_type":"cuisine_type",
        "extraction_day":"extraction_day", "extractiondate":"extraction_day", "date":"extraction_day",
        "extraction_time":"extraction_time", "time":"extraction_time",
    }
    # handle units in header like Protein(g)
    s = s.replace("protein_g_", "protein_g").replace("carbs_g_", "carbs_g").replace("fat_g_", "fat_g")
    return synonyms.get(s, s)

STD_COLS = [
    "recipe_name","diet_type","cuisine_type","protein_g","carbs_g","fat_g","extraction_day","extraction_time","source_file"
]

def coalesce_select(df: DataFrame, fname: str) -> DataFrame:
    """Rename columns to standard names and select all standard columns (create missing if needed)."""
    # build rename map
    rename_map: Dict[str, str] = {}
    for c in df.columns:
        sk = std_key(c)
        rename_map[c] = sk

    # apply renames (avoid collisions by chaining)
    for old, new in rename_map.items():
        if old != new:
            df = df.withColumnRenamed(old, new)

    # derive diet type from filename if missing or null
    file_base = os.path.splitext(os.path.basename(fname))[0].lower()
    inferred_diet = None
    for tag in ["keto", "vegan", "paleo", "mediterranean", "dash"]:
        if tag in file_base:
            inferred_diet = tag
            break
    if inferred_diet is None and "all_diet" in file_base:
        inferred_diet = None  # All_Diets likely already has diet_type

    # add source file
    df = df.withColumn("source_file", F.lit(os.path.basename(fname)))

    # ensure all standard columns exist
    for col in ["recipe_name","diet_type","cuisine_type","protein_g","carbs_g","fat_g","extraction_day","extraction_time"]:
        if col not in df.columns:
            df = df.withColumn(col, F.lit(None).cast(T.StringType()))

    # if diet_type null/empty and inferred available, fill
    if inferred_diet:
        df = df.withColumn(
            "diet_type",
            F.when(F.col("diet_type").isNull() | (F.length(F.trim("diet_type")) == 0), F.lit(inferred_diet)).otherwise(F.col("diet_type"))
        )

    # clean numeric text like "12 g" or "10.5g"
    def to_double(colname: str):
        return F.regexp_replace(F.col(colname).cast("string"), r"[^0-9\.\-]+", "").cast(T.DoubleType())

    df = (
        df
        .withColumn("recipe_name", F.trim(F.col("recipe_name").cast(T.StringType())))
        .withColumn("diet_type"  , F.lower(F.trim(F.col("diet_type").cast(T.StringType()))))
        .withColumn("cuisine_type", F.trim(F.col("cuisine_type").cast(T.StringType())))
        .withColumn("protein_g"  , to_double("protein_g"))
        .withColumn("carbs_g"    , to_double("carbs_g"))
        .withColumn("fat_g"      , to_double("fat_g"))
        .withColumn("extraction_day" , F.trim(F.col("extraction_day").cast(T.StringType())))
        .withColumn("extraction_time", F.trim(F.col("extraction_time").cast(T.StringType())))
    )

    return df.select(*STD_COLS)

# ---------- Load each uploaded CSV into Spark ----------
dfs = []
file_counts = []
for fname in uploaded.keys():
    path = f"/content/{fname}"
    with open(path, "wb") as f:
        f.write(uploaded[fname])

    # robust CSV reader
    df_raw = (
        spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .option("multiLine", "true")
             .option("escape", '"')
             .option("quote", '"')
             .csv(path)
    )
    cnt = df_raw.count()
    file_counts.append((fname, cnt))
    df_std = coalesce_select(df_raw, fname)
    dfs.append(df_std)

# ---------- Union all ----------
from functools import reduce
if not dfs:
    raise ValueError("No files uploaded.")

df_all = reduce(DataFrame.unionByName, dfs)

print("📊 Row counts by file:")
for fn, c in file_counts:
    print(f" - {fn}: {c:,}")

print("\n🧱 Combined rows (pre-dedup):", f"{df_all.count():,}")

# ---------- Basic cleaning: normalize keys & deduplicate ----------
df_clean = (
    df_all
    .withColumn("recipe_key", F.lower(F.regexp_replace(F.col("recipe_name"), r"\s+", " ")))
    .withColumn("diet_key"  , F.lower(F.regexp_replace(F.col("diet_type")  , r"\s+", " ")))
    .withColumn("cuisine_key", F.lower(F.regexp_replace(F.col("cuisine_type"), r"\s+", " ")))
)

# deduplicate primarily on recipe + diet; cuisine helps reduce dup variants if available
dedup_cols = ["recipe_key", "diet_key", "cuisine_key"]
df_unique = df_clean.dropDuplicates(dedup_cols).drop("recipe_key","diet_key","cuisine_key")

print("✅ Unique rows (post-dedup):", f"{df_unique.count():,}")

print("\n🔎 Sample unified schema:")
df_unique.printSchema()

print("\n🍽️ Preview (10 rows):")
df_unique.show(10, truncate=False)

print("\n📈 Count by diet_type:")
df_unique.groupBy("diet_type").count().orderBy(F.desc("count")).show(truncate=False)


📤 Please choose your CSVs (e.g., All_Diets.csv, dash.csv, keto.csv, mediterranean.csv, paleo.csv, vegan.csv)


Saving All_Diets.csv to All_Diets (1).csv
Saving dash.csv to dash (1).csv
Saving keto.csv to keto (1).csv
Saving mediterranean.csv to mediterranean (1).csv
Saving paleo.csv to paleo (1).csv
Saving vegan.csv to vegan (1).csv
📊 Row counts by file:
 - All_Diets (1).csv: 7,806
 - dash (1).csv: 1,745
 - keto (1).csv: 1,512
 - mediterranean (1).csv: 1,753
 - paleo (1).csv: 1,274
 - vegan (1).csv: 1,522

🧱 Combined rows (pre-dedup): 15,612
✅ Unique rows (post-dedup): 7,126

🔎 Sample unified schema:
root
 |-- recipe_name: string (nullable = true)
 |-- diet_type: string (nullable = true)
 |-- cuisine_type: string (nullable = true)
 |-- protein_g: double (nullable = true)
 |-- carbs_g: double (nullable = true)
 |-- fat_g: double (nullable = true)
 |-- extraction_day: string (nullable = true)
 |-- extraction_time: string (nullable = true)
 |-- source_file: string (nullable = false)


🍽️ Preview (10 rows):
+-----------------------------------------------------------------------------+---------+---

In [None]:
# Step 3: Exploratory Data Analysis (EDA) on the unified Spark DataFrame (df_unique)

from pyspark.sql import functions as F, types as T

# --- Guard: ensure df_unique exists ---
try:
    _ = df_unique.limit(1).count()
except NameError as e:
    raise RuntimeError("df_unique is not defined. Please run Step 2 cell first.") from e

print("📦 Rows:", df_unique.count())

# --- 1) Basic schema + null counts per column ---
print("\n🔎 Schema:")
df_unique.printSchema()

print("\n🚦 Null / empty counts:")
nulls = []
for c in df_unique.columns:
    nulls.append(
        df_unique.select(
            F.count(F.when(F.col(c).isNull() | (F.trim(F.col(c)) == ""), c)).alias("nulls")
        ).withColumn("column", F.lit(c))
    )
nulls_df = nulls[0]
for nxt in nulls[1:]:
    nulls_df = nulls_df.unionByName(nxt)
nulls_df.select("column","nulls").orderBy(F.desc("nulls")).show(100, truncate=False)

# --- 2) Add calories + macro% for nutrition-driven EDA ---
df_nut = (
    df_unique
    .withColumn("protein_g", F.col("protein_g").cast(T.DoubleType()))
    .withColumn("carbs_g"  , F.col("carbs_g").cast(T.DoubleType()))
    .withColumn("fat_g"    , F.col("fat_g").cast(T.DoubleType()))
    .withColumn("kcal"     , F.col("protein_g")*4 + F.col("carbs_g")*4 + F.col("fat_g")*9)
    .withColumn("pct_protein", F.when(F.col("kcal")>0, (F.col("protein_g")*4 / F.col("kcal"))*100))
    .withColumn("pct_carbs"  , F.when(F.col("kcal")>0, (F.col("carbs_g")  *4 / F.col("kcal"))*100))
    .withColumn("pct_fat"    , F.when(F.col("kcal")>0, (F.col("fat_g")    *9 / F.col("kcal"))*100))
)
df_nut.cache()
print("\n🧮 Added columns: kcal, pct_protein, pct_carbs, pct_fat")

# --- 3) Global numeric sanity stats ---
def five_num(col):
    return [
        F.expr(f"percentile_approx({col}, 0.00)").alias("min"),
        F.expr(f"percentile_approx({col}, 0.25)").alias("p25"),
        F.expr(f"percentile_approx({col}, 0.50)").alias("p50"),
        F.expr(f"percentile_approx({col}, 0.75)").alias("p75"),
        F.expr(f"percentile_approx({col}, 1.00)").alias("max"),
        F.mean(col).alias("mean"),
    ]

for col in ["protein_g","carbs_g","fat_g","kcal","pct_protein","pct_carbs","pct_fat"]:
    print(f"\n📐 Five-number summary: {col}")
    df_nut.agg(*five_num(col)).show(truncate=False)

# --- 4) Diet-wise macro distribution (means + medians) ---
print("\n🥗 Diet-wise macro % (mean/median) and kcal:")
diet_stats = (
    df_nut.groupBy("diet_type")
    .agg(
        F.round(F.mean("pct_protein"),2).alias("mean_%protein"),
        F.round(F.expr("percentile_approx(pct_protein, 0.5)"),2).alias("median_%protein"),
        F.round(F.mean("pct_carbs"),2).alias("mean_%carbs"),
        F.round(F.expr("percentile_approx(pct_carbs, 0.5)"),2).alias("median_%carbs"),
        F.round(F.mean("pct_fat"),2).alias("mean_%fat"),
        F.round(F.expr("percentile_approx(pct_fat, 0.5)"),2).alias("median_%fat"),
        F.round(F.mean("kcal"),1).alias("mean_kcal"),
        F.round(F.expr("percentile_approx(kcal, 0.5)"),1).alias("median_kcal"),
        F.count("*").alias("n")
    )
    .orderBy(F.desc("n"))
)
diet_stats.show(50, truncate=False)

# --- 5) Top cuisines overall and per diet ---
print("\n🍽️ Top cuisines overall:")
df_nut.groupBy("cuisine_type").count().orderBy(F.desc("count")).show(20, truncate=False)

print("\n🍽️ Top 10 cuisines per diet:")
from pyspark.sql.window import Window
w = Window.partitionBy("diet_type").orderBy(F.desc("count"))
top_cuisines_per_diet = (
    df_nut.groupBy("diet_type","cuisine_type")
          .count()
          .withColumn("rk", F.row_number().over(w))
          .filter(F.col("rk") <= 10)
          .orderBy("diet_type", F.desc("count"))
)
top_cuisines_per_diet.show(200, truncate=False)

# --- 6) Outlier scan (very high grams or impossible macro %) ---
print("\n🚨 Possible outliers (any nutrient > 500g or any macro% outside 0–100):")
outliers = df_nut.filter(
    (F.col("protein_g") > 500) | (F.col("carbs_g") > 500) | (F.col("fat_g") > 500) |
    (F.col("pct_protein") < 0) | (F.col("pct_protein") > 100) |
    (F.col("pct_carbs")   < 0) | (F.col("pct_carbs")   > 100) |
    (F.col("pct_fat")     < 0) | (F.col("pct_fat")     > 100)
).select("recipe_name","diet_type","cuisine_type","protein_g","carbs_g","fat_g","kcal","pct_protein","pct_carbs","pct_fat","source_file")
outliers.show(50, truncate=False)

# --- 7) Save a clean, typed working table for next steps (in-memory temp view) ---
df_nut.createOrReplaceTempView("recipes_nutrition")

print("\n✅ EDA complete. Temp view 'recipes_nutrition' is ready for downstream queries.")


📦 Rows: 7126

🔎 Schema:
root
 |-- recipe_name: string (nullable = true)
 |-- diet_type: string (nullable = true)
 |-- cuisine_type: string (nullable = true)
 |-- protein_g: double (nullable = true)
 |-- carbs_g: double (nullable = true)
 |-- fat_g: double (nullable = true)
 |-- extraction_day: string (nullable = true)
 |-- extraction_time: string (nullable = true)
 |-- source_file: string (nullable = false)


🚦 Null / empty counts:
+---------------+-----+
|column         |nulls|
+---------------+-----+
|recipe_name    |0    |
|diet_type      |0    |
|cuisine_type   |0    |
|protein_g      |0    |
|carbs_g        |0    |
|fat_g          |0    |
|extraction_day |0    |
|extraction_time|0    |
|source_file    |0    |
+---------------+-----+


🧮 Added columns: kcal, pct_protein, pct_carbs, pct_fat

📐 Five-number summary: protein_g
+---+----+-----+------+-------+-----------------+
|min|p25 |p50  |p75   |max    |mean             |
+---+----+-----+------+-------+-----------------+
|0.0|23.5|5

In [None]:
# STEP 4 — Feature engineering + cosine-similarity recommender (Spark-safe final version)

from pyspark.sql import functions as F, types as T
from pyspark.ml.feature import VectorAssembler, StandardScaler, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array

# ------------------------------------------------------------------
# 1️⃣  Recreate base nutrition features
# ------------------------------------------------------------------
df_base = (
    df_unique
    .select("recipe_name","diet_type","cuisine_type","protein_g","carbs_g","fat_g","source_file")
    .withColumn("kcal", F.col("protein_g")*4 + F.col("carbs_g")*4 + F.col("fat_g")*9)
    .withColumn("pct_protein", F.when(F.col("kcal")>0, (F.col("protein_g")*4 / F.col("kcal"))*100))
    .withColumn("pct_carbs"  , F.when(F.col("kcal")>0, (F.col("carbs_g")  *4 / F.col("kcal"))*100))
    .withColumn("pct_fat"    , F.when(F.col("kcal")>0, (F.col("fat_g")    *9 / F.col("kcal"))*100))
)

p99 = df_base.approxQuantile("kcal", [0.99], 0.01)[0] or 1.0
df_feat = (
    df_base
    .withColumn("kcal_clip", F.when(F.col("kcal") > p99, F.lit(p99)).otherwise(F.col("kcal")))
    .withColumn("kcal_log", F.log1p(F.col("kcal_clip")))
    .na.drop(subset=["pct_protein","pct_carbs","pct_fat","kcal_log"])
)

# ------------------------------------------------------------------
# 2️⃣  Assemble + scale + normalize (unit vectors)
# ------------------------------------------------------------------
assembler  = VectorAssembler(inputCols=["pct_protein","pct_carbs","pct_fat","kcal_log"], outputCol="feat_raw")
scaler     = StandardScaler(inputCol="feat_raw", outputCol="feat_std", withMean=True, withStd=True)
normalizer = Normalizer(inputCol="feat_std", outputCol="feat_unit", p=2.0)

pipe = Pipeline(stages=[assembler, scaler, normalizer])
model = pipe.fit(df_feat)

recipes_features = model.transform(df_feat).withColumn("feat_arr", vector_to_array("feat_unit")).cache()

print("✅ Features ready. Example:")
recipes_features.select("recipe_name","diet_type","cuisine_type","feat_arr").show(3, truncate=False)

# ------------------------------------------------------------------
# 3️⃣  Define recommender (Spark-safe, no pandas.pipe)
# ------------------------------------------------------------------
def recommend_by_recipe(name_query: str, top_k: int = 10, diet_filter: str = None, cuisine_filter: str = None):
    q = recipes_features.filter(F.lower("recipe_name").contains(name_query.lower()))
    if diet_filter:
        q = q.filter(F.lower("diet_type") == diet_filter.lower())
    if cuisine_filter:
        q = q.filter(F.lower("cuisine_type") == cuisine_filter.lower())

    seed = q.select("recipe_name","diet_type","cuisine_type","feat_arr").limit(1).collect()
    if not seed:
        raise ValueError(f"No recipe matched: {name_query}")

    seed_name = seed[0]["recipe_name"]
    seed_diet = seed[0]["diet_type"]
    seed_cuisine = seed[0]["cuisine_type"]
    seed_vec = seed[0]["feat_arr"]

    seed_col = F.array(*[F.lit(float(v)) for v in seed_vec])

    sim = recipes_features.filter(F.col("recipe_name") != seed_name)
    if diet_filter:
        sim = sim.filter(F.lower("diet_type") == diet_filter.lower())
    if cuisine_filter:
        sim = sim.filter(F.lower("cuisine_type") == cuisine_filter.lower())

    sim = sim.withColumn(
        "cosine",
        F.aggregate(
            F.transform(
                F.sequence(F.lit(1), F.size("feat_arr")),
                lambda i: F.element_at("feat_arr", i) * F.element_at(seed_col, i)
            ),
            F.lit(0.0),
            lambda acc, x: acc + x
        )
    ).select("recipe_name","diet_type","cuisine_type","cosine","source_file")\
     .orderBy(F.desc("cosine"))\
     .limit(top_k)

    print(f"\n🔎 Seed: {seed_name} | Diet: {seed_diet} | Cuisine: {seed_cuisine}")
    return sim

# ------------------------------------------------------------------
# 4️⃣  Try sample queries
# ------------------------------------------------------------------
demo1 = recommend_by_recipe("Keto Cheesecake", top_k=10, diet_filter="keto")
demo1.show(truncate=False)

demo2 = recommend_by_recipe("Pesto", top_k=10)
demo2.show(truncate=False)


✅ Features ready. Example:
+------------------------------------------+---------+-------------+--------------------------------------------------------------------------------------+
|recipe_name                               |diet_type|cuisine_type |feat_arr                                                                              |
+------------------------------------------+---------+-------------+--------------------------------------------------------------------------------------+
|5 Minute Keto Pesto recipes               |keto     |italian      |[-0.2686407798907863, -0.5605329589125139, 0.7636181287396181, 0.174706287272828]     |
|A Spanish Potato Tortilla On Hiking Trails|dash     |mediterranean|[0.3012825115919542, 0.351316905074813, -0.5510709159201137, -0.6943530269570846]     |
|Asian Inspired Keto Pork Chops recipes    |keto     |chinese      |[0.9079613503070878, -0.4166233251561074, -0.03636635694403716, -0.026621032418318438]|
+------------------------------------

In [None]:
# STEP 5 — Goal-based recommendations (fixed Spark version)

from pyspark.sql import functions as F, types as T
from pyspark.ml.linalg import Vectors

# ------------------------------------------------------------------
# 1️⃣  Prepare full feature+nutrition DataFrame
# ------------------------------------------------------------------
recipes_full = (
    recipes_features
    .select("recipe_name","diet_type","cuisine_type","source_file","feat_arr")
    .join(
        df_unique.select(
            "recipe_name","diet_type","cuisine_type","protein_g","carbs_g","fat_g"
        ),
        on=["recipe_name","diet_type","cuisine_type"], how="inner"
    )
    .withColumn("kcal", F.col("protein_g")*4 + F.col("carbs_g")*4 + F.col("fat_g")*9)
    .withColumn("pct_protein", F.when(F.col("kcal")>0, (F.col("protein_g")*4 / F.col("kcal"))*100))
    .withColumn("pct_carbs"  , F.when(F.col("kcal")>0, (F.col("carbs_g")  *4 / F.col("kcal"))*100))
    .withColumn("pct_fat"    , F.when(F.col("kcal")>0, (F.col("fat_g")    *9 / F.col("kcal"))*100))
    .cache()
)
print("✅ recipes_full ready:", recipes_full.count(), "rows")

# helper: compute cosine to literal target vector
def _cosine_to_target(df, target_vec):
    seed_col = F.array(*[F.lit(float(v)) for v in target_vec])
    return df.withColumn(
        "cosine",
        F.aggregate(
            F.transform(
                F.sequence(F.lit(1), F.size("feat_arr")),
                lambda i: F.element_at("feat_arr", i) * F.element_at(seed_col, i)
            ),
            F.lit(0.0),
            lambda acc, x: acc + x
        )
    )

# ------------------------------------------------------------------
# 2️⃣  Define goal-based recommender
# ------------------------------------------------------------------
def recommend_by_goal(
    min_protein_pct=None, max_carb_pct=None,
    max_kcal=None, min_kcal=None,
    diet_filter=None, cuisine_filter=None,
    top_k=15,
    target_protein_pct=None, target_carb_pct=None,
    target_fat_pct=None, target_kcal=None
):
    df = recipes_full

    # hard filters
    if diet_filter:
        df = df.filter(F.lower("diet_type") == diet_filter.lower())
    if cuisine_filter:
        df = df.filter(F.lower("cuisine_type") == cuisine_filter.lower())
    if min_protein_pct is not None:
        df = df.filter(F.col("pct_protein") >= float(min_protein_pct))
    if max_carb_pct is not None:
        df = df.filter(F.col("pct_carbs") <= float(max_carb_pct))
    if min_kcal is not None:
        df = df.filter(F.col("kcal") >= float(min_kcal))
    if max_kcal is not None:
        df = df.filter(F.col("kcal") <= float(max_kcal))

    if df.limit(1).count() == 0:
        raise ValueError("No recipes match these filters; try relaxing thresholds.")

    # infer target macro %
    tp = float(target_protein_pct or max(min_protein_pct or 25, 25))
    tc = float(target_carb_pct or min(max_carb_pct or 30, 30))
    tf = float(target_fat_pct or (100 - (tp + tc)))
    s = tp + tc + tf
    tp, tc, tf = 100*tp/s, 100*tc/s, 100*tf/s

    # target kcal
    tk = float(target_kcal or df.approxQuantile("kcal", [0.5], 0.05)[0] or 500)

    # ------------------------------------------------------------------
    #  Build target row and transform through fitted pipeline models
    # ------------------------------------------------------------------
    target_df = spark.createDataFrame(
        [(tp, tc, tf, tk)],
        schema=T.StructType([
            T.StructField("pct_protein", T.DoubleType(), False),
            T.StructField("pct_carbs",   T.DoubleType(), False),
            T.StructField("pct_fat",     T.DoubleType(), False),
            T.StructField("kcal",        T.DoubleType(), False),
        ])
    ).withColumn("kcal_clip", F.when(F.col("kcal")>p99, p99).otherwise(F.col("kcal")))\
     .withColumn("kcal_log", F.log1p("kcal_clip"))\
     .select("pct_protein","pct_carbs","pct_fat","kcal_log")

    # use the already-fitted models
    target_vec = model.stages[-1].transform(
        model.stages[-2].transform(
            model.stages[-3].transform(target_df)
        )
    ).select("feat_unit").first()[0].toArray().tolist()

    # compute cosine similarities
    ranked = _cosine_to_target(df, target_vec)\
        .select("recipe_name","diet_type","cuisine_type",
                F.round("kcal",1).alias("kcal"),
                F.round("pct_protein",1).alias("prot%"),
                F.round("pct_carbs",1).alias("carb%"),
                F.round("pct_fat",1).alias("fat%"),
                F.round("cosine",4).alias("similarity"))\
        .orderBy(F.desc("similarity"))\
        .limit(top_k)

    print(f"\n🎯 Target: protein {tp:.1f}%, carbs {tc:.1f}%, fat {tf:.1f}%, kcal≈{tk:.0f}")
    return ranked

# ------------------------------------------------------------------
# 3️⃣  Demo queries
# ------------------------------------------------------------------
print("\n💪 High-protein, low-carb (<600 kcal) recipes:")
demo_goal_1 = recommend_by_goal(min_protein_pct=30, max_carb_pct=25, max_kcal=600, top_k=10)
demo_goal_1.show(truncate=False)

print("\n🥑 Keto high-protein, low-carb (<700 kcal) recipes:")
demo_goal_2 = recommend_by_goal(min_protein_pct=30, max_carb_pct=15, max_kcal=700, diet_filter="keto", top_k=10)
demo_goal_2.show(truncate=False)


✅ recipes_full ready: 7126 rows

💪 High-protein, low-carb (<600 kcal) recipes:

🎯 Target: protein 30.0%, carbs 25.0%, fat 45.0%, kcal≈419
+--------------------------------------------------------------+---------+-------------+-----+-----+-----+----+----------+
|recipe_name                                                   |diet_type|cuisine_type |kcal |prot%|carb%|fat%|similarity|
+--------------------------------------------------------------+---------+-------------+-----+-----+-----+----+----------+
|Paleo Steak and Veggies                                       |paleo    |american     |268.5|36.5 |19.7 |43.9|0.999     |
|Chop-Chop Beef Stir-Fry                                       |dash     |chinese      |438.3|31.6 |23.0 |45.4|0.9964    |
|Keto Grain-free Bacon and Shrimp Risotto recipes              |keto     |italian      |391.5|35.6 |20.9 |43.6|0.9897    |
|Paleo Chocolate Protein Truffles                              |paleo    |american     |325.6|36.2 |14.9 |48.9|0.9822    |
|

In [None]:
# 📦 SAVE FINAL DATA FOR DEMO (run ONCE)
import os
os.makedirs("/content/final_demo", exist_ok=True)

df_unique.write.mode("overwrite").parquet("/content/final_demo/df_unique.parquet")
recipes_full.write.mode("overwrite").parquet("/content/final_demo/recipes_full.parquet")
model.write().overwrite().save("/content/final_demo/pipeline_model")

print("✅ Saved all demo artifacts. Next time you can start directly from Step B.")


✅ Saved all demo artifacts. Next time you can start directly from Step B.


In [None]:
# === ONE-CELL DEMO CONSOLE (Spark Big Data, no external UI) ===

from pyspark.sql import SparkSession, functions as F
from pyspark.ml import PipelineModel

# 1) Start Spark + load artifacts saved earlier (from your Step A)
spark = SparkSession.builder.master("local[*]").appName("DietDemoConsole").getOrCreate()
df_unique    = spark.read.parquet("/content/final_demo/df_unique.parquet")
recipes_full = spark.read.parquet("/content/final_demo/recipes_full.parquet")
model        = PipelineModel.load("/content/final_demo/pipeline_model")

print("✅ Spark ready")
print(f"Rows: {df_unique.count():,} | Columns: {len(df_unique.columns)}")
print("Big Data evidence: Spark DataFrames + Spark ML pipeline + Parquet artifacts\n")

# 2) Small helpers (Spark-only; no servers)
def show(df, n=10):
    df.show(n, truncate=False)

def goal_query(min_protein=30, max_carb=25, max_kcal=600, diet=None, cuisine=None, top_k=10):
    df = recipes_full
    if diet:
        df = df.filter(F.lower("diet_type")==diet.lower())
    if cuisine:
        df = df.filter(F.lower("cuisine_type")==cuisine.lower())
    df = df.filter(
        (F.col("pct_protein")>=float(min_protein)) &
        (F.col("pct_carbs")<=float(max_carb)) &
        (F.col("kcal")<=float(max_kcal))
    )
    return (df.select(
                "recipe_name","diet_type","cuisine_type",
                F.round("kcal",1).alias("kcal"),
                F.round("pct_protein",1).alias("prot%"),
                F.round("pct_carbs",1).alias("carb%"),
                F.round("pct_fat",1).alias("fat%")
            )
            .orderBy(F.desc("prot%"))
            .limit(int(top_k))
           )

def similar_to(name_substring, diet=None, top_k=10):
    seed = recipes_full.filter(F.lower("recipe_name").contains(name_substring.lower()))
    if diet:
        seed = seed.filter(F.lower("diet_type")==diet.lower())
    if seed.limit(1).count()==0:
        print("⚠️ Seed recipe not found. Try another name fragment.")
        return None
    v = seed.select("pct_protein","pct_carbs","pct_fat","kcal").first()
    ref_prot, ref_carb, ref_fat = float(v[0]), float(v[1]), float(v[2])
    df = recipes_full
    if diet:
        df = df.filter(F.lower("diet_type")==diet.lower())
    df = df.withColumn("nutrient_diff",
            F.abs(F.col("pct_protein")-ref_prot) +
            F.abs(F.col("pct_carbs")-ref_carb)   +
            F.abs(F.col("pct_fat")-ref_fat)
        )
    return (df.orderBy("nutrient_diff")
            .select("recipe_name","diet_type","cuisine_type",
                    F.round("kcal",1).alias("kcal"),
                    F.round("pct_protein",1).alias("prot%"),
                    F.round("pct_carbs",1).alias("carb%"),
                    F.round("pct_fat",1).alias("fat%"),
                    F.round("nutrient_diff",1).alias("diff"))
            .limit(int(top_k))
           )

# 3) Ready-made demo tables (instant, repeatable)
print("💪 High-protein, low-carb, <600 kcal (any diet)")
show(goal_query(min_protein=30, max_carb=25, max_kcal=600, top_k=10))

print("\n🥗 Mediterranean weight-loss-ish (<600 kcal)")
show(goal_query(min_protein=20, max_carb=50, max_kcal=600, diet="mediterranean", top_k=10))

print("\n🧀 Similar to: 'Keto Cheesecake' (within keto)")
sim = similar_to("keto cheesecake", diet="keto", top_k=10)
if sim is not None:
    show(sim)

# 4) (Optional) One ad-hoc query via prompt (press Enter to skip)
try:
    q = input("\nType a quick query like 'goal 35 20 550 keto' or 'sim pesto' (Enter to skip): ").strip()
    if q:
        parts = q.split()
        if parts[0].lower()=="goal":
            # goal <min_prot> <max_carb> <max_kcal> [diet]
            minp = float(parts[1]); maxc = float(parts[2]); maxk = float(parts[3])
            diet = parts[4] if len(parts)>4 else None
            print(f"\n🎯 Goal => min_protein={minp}, max_carb={maxc}, max_kcal={maxk}, diet={diet}")
            show(goal_query(minp, maxc, maxk, diet=diet, top_k=10))
        elif parts[0].lower()=="sim":
            # sim <name substring> [diet]
            name = parts[1]
            diet = parts[2] if len(parts)>2 else None
            print(f"\n🔎 Similar to => '{name}' diet={diet}")
            out = similar_to(name, diet=diet, top_k=10)
            if out is not None: show(out)
        else:
            print("Unknown command. Use 'goal ...' or 'sim ...'.")
except EOFError:
    pass

print("\n✅ Demo complete. Re-run this one cell any time—no heavy recompute.")


✅ Spark ready
Rows: 7,126 | Columns: 9
Big Data evidence: Spark DataFrames + Spark ML pipeline + Parquet artifacts

💪 High-protein, low-carb, <600 kcal (any diet)
+---------------------------------+-------------+--------------+-----+-----+-----+----+
|recipe_name                      |diet_type    |cuisine_type  |kcal |prot%|carb%|fat%|
+---------------------------------+-------------+--------------+-----+-----+-----+----+
|Shoyu Ahi Poke recipes           |paleo        |japanese      |555.3|80.6 |8.3  |11.1|
|Honey Lime Shrimp recipes        |paleo        |american      |496.7|74.3 |21.2 |4.5 |
|Spicy Chicken Salad              |dash         |south american|283.9|73.5 |5.2  |21.3|
|Simple Savory Scallops           |dash         |american      |17.1 |71.0 |21.6 |7.4 |
|Mediterranean Tuna Salad recipes |mediterranean|mediterranean |417.2|68.0 |15.5 |16.6|
|Mediterranean Tilapia recipes    |mediterranean|mediterranean |302.3|65.3 |15.8 |18.9|
|Paleo Cod Burgers                |paleo     

In [None]:
# STEP 6 — Goal presets + one-liner helper + CSV export

from pyspark.sql import functions as F

# Uses recommend_by_goal from Step 5 and recipes_full/model already in memory.

_GOAL_PRESETS = {
    # name: (min_protein_pct, max_carb_pct, max_kcal, notes)
    "muscle_gain":        dict(min_protein_pct=30, max_carb_pct=40, max_kcal=800),
    "weight_loss":        dict(min_protein_pct=25, max_carb_pct=35, max_kcal=550),
    "keto_cut":           dict(min_protein_pct=30, max_carb_pct=15, max_kcal=700),
    "diabetes_friendly":  dict(min_protein_pct=25, max_carb_pct=25, max_kcal=650),
    "low_fat_heart":      dict(min_protein_pct=20, max_carb_pct=60, max_kcal=600),  # fat% will adjust via cosine target
    "endurance":          dict(min_protein_pct=20, max_carb_pct=55, max_kcal=750),
}

def goal_query(name: str, top_k: int = 15, diet_filter: str = None, cuisine_filter: str = None,
               override=None, target_overrides=None):
    """
    Run a preset goal by name, optionally overriding thresholds or target vector hints.
      - name: one of _GOAL_PRESETS keys (case-insensitive).
      - override: dict to override hard filters (min_protein_pct, max_carb_pct, max_kcal, min_kcal).
      - target_overrides: dict to override target vector (target_protein_pct/target_carb_pct/target_fat_pct/target_kcal)
    """
    key = name.strip().lower().replace(" ", "_")
    if key not in _GOAL_PRESETS:
        raise ValueError(f"Unknown goal '{name}'. Try: {list(_GOAL_PRESETS.keys())}")

    params = _GOAL_PRESETS[key].copy()
    if isinstance(override, dict):
        params.update(override)

    target_params = target_overrides or {}

    return recommend_by_goal(
        top_k=top_k,
        diet_filter=diet_filter,
        cuisine_filter=cuisine_filter,
        **params,
        **target_params
    )

def export_csv(df, filename="/content/recommendations.csv"):
    """Save a Spark DataFrame to a single CSV at given path (coalesce + header)."""
    path = filename
    tmpdir = "/content/_tmp_csv_out"
    (df.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", True)
       .csv(tmpdir))
    # move the part file to the target filename
    import glob, shutil, os
    part = glob.glob(f"{tmpdir}/part-*.csv")[0]
    shutil.move(part, path)
    shutil.rmtree(tmpdir, ignore_errors=True)
    print(f"💾 Saved: {path}")

# ---------------- Demos ----------------
print("\n🏋️  Preset: muscle_gain (any diet)")
gx1 = goal_query("muscle_gain", top_k=10)
gx1.show(truncate=False)

print("\n⚖️  Preset: weight_loss (DASH only)")
gx2 = goal_query("weight_loss", top_k=10, diet_filter="dash")
gx2.show(truncate=False)

# Save one to CSV for your report
export_csv(gx2, "/content/weight_loss_dash_top10.csv")



🏋️  Preset: muscle_gain (any diet)

🎯 Target: protein 30.0%, carbs 30.0%, fat 40.0%, kcal≈555
+------------------------------------------------+-------------+-------------+-----+-----+-----+----+----------+
|recipe_name                                     |diet_type    |cuisine_type |kcal |prot%|carb%|fat%|similarity|
+------------------------------------------------+-------------+-------------+-----+-----+-----+----+----------+
|Simple Savory Scallops                          |dash         |american     |17.1 |71.0 |21.6 |7.4 |0.9961    |
|Shrimp “Egg Roll” Stir Fry                      |dash         |chinese      |270.5|40.3 |33.1 |26.6|0.9898    |
|Pan-Seared Shrimp with Rosemary Spaghetti Squash|dash         |mediterranean|311.1|46.6 |21.8 |31.6|0.9893    |
|Paleo Coconut Crepes                            |paleo        |french       |119.4|47.0 |32.5 |20.5|0.9892    |
|Turkey Club Sandwich                            |paleo        |american     |523.8|35.8 |30.5 |33.7|0.9879    |
|