In [12]:
import os, re, json, numpy as np, pandas as pd
from datetime import datetime, timedelta
from pathlib import Path
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import mean_squared_error
from google import genai
from google.genai import types


In [3]:
# Project layout is resolved relative to the notebook's working directory.
ROOT_DIR = Path.cwd().parent
# The data folder name contains a space ("test data"); edit here if your checkout names it differently.
DATA_DIR = ROOT_DIR.joinpath("data", "test data")

USERS_CSV = DATA_DIR.joinpath("ten_users_gpt-4o.csv")
MEALS_CSV = DATA_DIR.joinpath("test_meal_logs.csv")

# User table, read as-is.
users_df = pd.read_csv(USERS_CSV)

# Meal log table; the "timestamp" column is converted with pd.to_datetime.
logs_df = pd.read_csv(MEALS_CSV).assign(
    timestamp=lambda frame: pd.to_datetime(frame["timestamp"])
)

print(f"Loaded {len(users_df)} users, {len(logs_df)} meal rows")


Loaded 10 users, 264 meal rows


In [None]:
# Credentials and client setup: run this cell once, then re-run the cells below it.
from dotenv import load_dotenv

# override=True lets values from the .env file replace variables already set in the environment.
load_dotenv(dotenv_path="../.env", override=True)

GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
client = genai.Client(api_key=GEMINI_API_KEY)

# Model name sent with every embedding request made by gemini_embed().
EMBED_MODEL = "models/gemini-embedding-exp-03-07"


def gemini_embed(text: str) -> np.ndarray:
    """
    Embed ``text`` with the Gemini model named by ``EMBED_MODEL``.

    The request is made with the ``SEMANTIC_SIMILARITY`` task type.

    Parameters
    ----------
    text : str
        The text to embed.

    Returns
    -------
    np.ndarray
        1-D ``float32`` vector. Its length is whatever the model returns;
        nothing in this function fixes the dimensionality.

    Raises
    ------
    ValueError
        If the response carries no embedding values.
    """
    # NOTE(review): this issues one request per text; when embedding many meal
    # titles, a single batched request would save round trips.
    res = client.models.embed_content(
        model=EMBED_MODEL,
        contents=text,
        config=types.EmbedContentConfig(task_type="SEMANTIC_SIMILARITY"),
    )
    # The google-genai SDK returns a response object, not a dict: the vectors
    # live in ``res.embeddings`` (one entry per input), each exposing ``.values``.
    embeddings = getattr(res, "embeddings", None)
    if not embeddings or embeddings[0].values is None:
        raise ValueError(f"Embedding response for {text!r} contained no values")
    return np.asarray(embeddings[0].values, dtype=np.float32)



In [5]:
LOOKBACK = 7  # length of the history window, in days


def history_summary(uid, as_of=None):
    """
    Summarise one user's meals over the last ``LOOKBACK`` days.

    Parameters
    ----------
    uid
        Value matched against ``logs_df.user_id``.
    as_of : datetime-like, optional
        End of the history window. When omitted, the newest timestamp in
        ``logs_df`` is used, so the result depends on the data rather than on
        the day the notebook is run; ``datetime.now()`` is the fallback when
        ``logs_df`` has no timestamps.

    Returns
    -------
    dict
        ``"flavor_vec"``: mean of the ``gemini_embed`` vectors of the meal
        names in the window.
        ``"macro_7d"``: mean of the per-day sums of kcal / protein_g /
        carbs_g / fat_g as plain floats, or all zeros when the window is empty.
    """
    if as_of is None:
        latest = logs_df.timestamp.max()
        as_of = datetime.now() if pd.isna(latest) else latest
    cutoff = as_of - timedelta(days=LOOKBACK)

    in_window = (
        (logs_df.user_id == uid)
        & (logs_df.timestamp >= cutoff)
        & (logs_df.timestamp <= as_of)
    )
    recent = logs_df[in_window]

    # With no meals in the window, a single empty string is embedded so that a
    # vector of the model's dimensionality is still returned.
    # NOTE(review): confirm the embedding endpoint accepts an empty string.
    titles = recent.meal_name.tolist() or [""]
    vecs = np.vstack([gemini_embed(t) for t in titles])
    flavor_vec = vecs.mean(axis=0)

    macro_cols = ["kcal", "protein_g", "carbs_g", "fat_g"]
    daily = recent.groupby(recent.timestamp.dt.date)[macro_cols].sum()
    if daily.empty:
        macros = {"kcal": 0, "protein_g": 0, "carbs_g": 0, "fat_g": 0}
    else:
        macros = {col: float(val) for col, val in daily.mean().items()}
    return {"flavor_vec": flavor_vec, "macro_7d": macros}


In [6]:
def ramp_targets(hist_macros, true_targets, pct=0.15):
    """
    Move the calorie target from the user's recent intake toward the goal by
    at most ``pct`` of the recent intake, scaling the macro targets to match.

    Parameters
    ----------
    hist_macros : dict
        Recent average intake; only ``"kcal"`` is read (missing counts as 0).
    true_targets : dict
        Must contain ``"optimal_calories"``, ``"protein_g"``, ``"carbs_g"``
        and ``"fat_g"``. It is not modified.
    pct : float
        Largest allowed change, as a fraction of the recent kcal intake.

    Returns
    -------
    dict
        Copy of ``true_targets`` with the four target values replaced by the
        ramped values (plain floats).
    """
    ramp = true_targets.copy()
    goal = float(true_targets["optimal_calories"])
    if not goal > 0:
        # No meaningful calorie goal to scale against: leave the targets as given.
        return ramp

    cur = float(hist_macros.get("kcal", 0) or 0)
    if not cur > 0:
        # No usable history (zero, negative or NaN). A step proportional to the
        # current intake would be 0 and pin the target at 0 kcal, so start at
        # the real goal instead.
        ramp_kcal = goal
    else:
        step = cur * pct
        diff = goal - cur
        if abs(diff) > step:
            ramp_kcal = cur + (step if diff > 0 else -step)
        else:
            ramp_kcal = goal

    ratio = ramp_kcal / goal
    ramp["optimal_calories"] = ramp_kcal
    for m in ("protein_g", "carbs_g", "fat_g"):
        ramp[m] = float(true_targets[m]) * ratio
    return ramp


In [7]:
# Schema description embedded verbatim in the planner prompt: an "intent"
# object plus a list of meals, each with a label, a name and its macros.
PLANNER_SCHEMA = {
    "intent": {},
    "meals": [
        {
            "label": "string",
            "name": "string",
            "macros": {
                "calories": "integer",
                "protein_g": "number",
                "carbs_g": "number",
                "fat_g": "number",
            },
        },
    ],
}

def _json_default(obj):
    """Convert NumPy arrays and scalars to plain Python so json.dumps can encode them."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        return obj.item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def gemini_json(schema, payload, temperature=0.3, max_tokens=700,
                system_text=""):
    """
    Ask Gemini for a JSON reply and return it parsed.

    The prompt is ``system_text`` followed by an instruction to answer only
    with JSON matching ``schema``, then ``payload`` serialised as JSON.

    Parameters
    ----------
    schema : dict
        Schema description shown to the model.
    payload : dict
        Input data; NumPy arrays and scalars inside it are converted to lists
        and plain numbers before serialisation.
    temperature : float
        Sampling temperature passed to the model.
    max_tokens : int
        Passed to the model as ``max_output_tokens``.
    system_text : str
        Text placed at the start of the prompt.

    Returns
    -------
    The object parsed from the first ``{`` to the last ``}`` of the reply.

    Raises
    ------
    ValueError
        If the reply contains no ``{...}`` span, or that span is not valid
        JSON (``json.JSONDecodeError`` is a ``ValueError``).
    """
    prompt = (system_text +
              "Respond ONLY with JSON matching this schema:\n" +
              json.dumps(schema, indent=2) +
              "\n\n### INPUT\n" + json.dumps(payload, default=_json_default))
    response = client.models.generate_content(
        model="models/gemini-2.0-flash-lite",
        contents=[{"role": "user", "parts": [{"text": prompt}]}],
        config=genai.types.GenerateContentConfig(
            temperature=temperature, max_output_tokens=max_tokens),
    )
    txt = response.candidates[0].content.parts[0].text
    # The model may wrap the JSON in prose or a code fence, so take the widest
    # brace-delimited span (re.S lets "." cross newlines).
    match = re.search(r"\{.*\}", txt or "", re.S)
    if match is None:
        raise ValueError(f"Gemini response contained no JSON object: {txt!r}")
    return json.loads(match.group())

def build_plan(profile, targets, history, alpha_taste=0.5, temperature=0.45):
    """
    Request one day's meal plan from the planner model.

    Parameters
    ----------
    profile : dict
        User profile, forwarded in the prompt payload.
    targets : dict
        Nutrition targets; ``"optimal_calories"`` is also quoted in the
        instruction text.
    history : dict
        History summary. Entries whose value is a NumPy array (such as an
        embedding vector) are left out of the prompt payload: ``json.dumps``
        cannot encode them and the raw numbers carry no meaning for the model.
        The caller's dict is not modified.
    alpha_taste : float
        Flavour weight stated in the instruction text; the macro weight is
        ``1 - alpha_taste``.
    temperature : float
        Sampling temperature forwarded to ``gemini_json``.

    Returns
    -------
    The parsed JSON object returned by ``gemini_json``.
    """
    # Named instruction_text rather than "sys" to avoid shadowing the stdlib module name.
    instruction_text = (
        f"Weight flavour {alpha_taste:.2f} and macro {1-alpha_taste:.2f}.\n"
        f"Today's kcal target {targets['optimal_calories']:.0f}. ")
    if isinstance(history, dict):
        prompt_history = {key: val for key, val in history.items()
                          if not isinstance(val, np.ndarray)}
    else:
        prompt_history = history
    return gemini_json(PLANNER_SCHEMA,
                       {"profile": profile, "targets": targets,
                        "history": prompt_history},
                       temperature=temperature,
                       max_tokens=800,
                       system_text=instruction_text)


In [8]:
def macro_score(df, targets):
    """
    Score how closely a plan's summed macros match the targets.

    The score is ``1 - RMSE / kcal_target`` clipped below at 0, where the RMSE
    is taken over the four (total, target) pairs kcal, protein_g, carbs_g and
    fat_g.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per meal with columns ``kcal``, ``protein_g``, ``carbs_g``,
        ``fat_g``. Values are coerced to numbers; entries that cannot be
        parsed are ignored in the sums.
    targets : dict
        Must contain ``optimal_calories``, ``protein_g``, ``carbs_g``,
        ``fat_g``.

    Returns
    -------
    float
        Always a plain float, at least 0.0. A kcal target that is not positive
        yields 0.0, since there is nothing to normalise by.
    """
    cols = ["kcal", "protein_g", "carbs_g", "fat_g"]
    # Plans come from parsed LLM JSON, so a macro may arrive as a string;
    # summing strings would concatenate them rather than add.
    tot = df[cols].apply(pd.to_numeric, errors="coerce").sum().to_numpy(dtype=float)
    targ = np.array([targets["optimal_calories"],
                     targets["protein_g"],
                     targets["carbs_g"],
                     targets["fat_g"]], dtype=float)
    kcal_target = targ[0]
    if not kcal_target > 0:
        return 0.0
    rmse = float(np.sqrt(np.mean((targ - tot) ** 2)))
    return float(max(0.0, 1.0 - rmse / kcal_target))

def taste_score(df, flavor_vec):
    """
    Cosine similarity between a plan and a flavour vector.

    The plan's ``title`` values are joined with spaces, embedded with
    ``gemini_embed``, and compared against ``flavor_vec``.
    """
    combined_titles = " ".join(df.title)
    plan_vec = gemini_embed(combined_titles)
    similarity = cosine_similarity([plan_vec], [flavor_vec])
    return similarity[0, 0]

def day_scores(df, flavor_vec, targets, beta_macro=0.6):
    """
    Blend the macro and taste scores of a plan.

    Returns a tuple ``(blended, macro, taste)`` where
    ``blended = beta_macro * macro + (1 - beta_macro) * taste``.
    """
    macro_part = macro_score(df, targets)
    taste_part = taste_score(df, flavor_vec)
    blended = beta_macro*macro_part + (1-beta_macro)*taste_part
    return blended, macro_part, taste_part


In [17]:
K = 4               # candidate plans requested per user
alpha_taste = 0.5   # flavour weight stated in the planner prompt
beta_macro  = 0.6   # macro weight in the blended day score
THRESH = 0.70       # NOTE(review): not referenced anywhere in this cell

results = []

for _, u in users_df.iterrows():
    uid = u.user_id
    hist = history_summary(uid)
    true_targ = {"optimal_calories":u.optimal_calories,"protein_g":u.protein_g,
                 "carbs_g":u.carbs_g,"fat_g":u.fat_g}
    ramped    = ramp_targets(hist["macro_7d"], true_targ, pct=0.15)

    # Only the macro summary goes into the prompt. The flavour vector is a
    # NumPy array, which json.dumps cannot encode, and it is only needed for
    # scoring below.
    prompt_history = {"macro_7d": hist["macro_7d"]}

    best_score, best_df = -1, None

    print(f"\n====== User {uid} ======")
    for k in range(1, K+1):
        # Temperature rises with each attempt to diversify the candidates.
        try:
            plan = build_plan(u.to_dict(), ramped, prompt_history,
                              alpha_taste=alpha_taste,
                              temperature=0.45+0.05*k)
            df   = pd.DataFrame([{
                "meal_slot": m["label"], "title": m["name"],
                "kcal": m["macros"]["calories"], "protein_g": m["macros"]["protein_g"],
                "carbs_g": m["macros"]["carbs_g"], "fat_g": m["macros"]["fat_g"]
            } for m in plan["meals"]])
        except (ValueError, KeyError) as err:
            # Malformed JSON (ValueError) or a reply missing schema keys
            # (KeyError) invalidates this candidate only, not the whole run.
            print(f"R{k}  skipped ({type(err).__name__}: {err})")
            continue
        if df.empty:
            print(f"R{k}  skipped (plan contained no meals)")
            continue

        score, macro, taste = day_scores(df, hist["flavor_vec"],
                                         ramped, beta_macro)

        print(f"R{k}  score {score:.3f} (macro {macro:.3f} · taste {taste:.3f})")
        if score > best_score:
            best_score, best_df = score, df

    if best_df is None:
        # Every candidate was skipped or scored NaN: record the gap explicitly
        # instead of the -1 sentinel, and do not try to display a missing frame.
        results.append({"user_id":uid,"day_score":np.nan})
        print(f"⚠️  No valid plan for user {uid}")
        continue

    results.append({"user_id":uid,"day_score":round(best_score,3)})
    print(f"🏆  Best for user {uid}: {best_score:.3f}")
    display(best_df[["meal_slot","title","kcal","protein_g"]])


TypeError: Models.embed_content() got an unexpected keyword argument 'task_type'

In [10]:
# Tabulate the per-user records accumulated in `results` by the planning loop
# (one dict per user with "user_id" and "day_score") and display the table.
summary_df = pd.DataFrame(results)
print("=== Day scores ===")
display(summary_df)


=== Day scores ===
