In [2]:
import os
import operator
from typing import Annotated, Sequence, TypedDict, Optional, List
import pandas as pd
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, SystemMessage, AIMessage, HumanMessage
from langgraph.graph import StateGraph, END
from google.adk import Agent

load_dotenv()

True

In [3]:
llm = ChatOpenAI(model="gpt-4o-mini", api_key=os.getenv("OPENAI_API_KEY"), temperature=0)

In [4]:
RECIPES_CSV = "/Users/pranavisriya/Documents/GenAI_training/Recipe recommendation copy/src/recipe_agent/data/recipes.csv"

def load_recipes_db(path: str = RECIPES_CSV) -> list[dict]:
    df = pd.read_csv(path)
    out = []
    for _, r in df.iterrows():
        dietary = []
        if isinstance(r.get("dietary"), str) and r["dietary"].strip():
            dietary = [x.strip() for x in r["dietary"].split("|") if x.strip()]

        ingredients = [x.strip() for x in str(r["ingredients"]).split("|") if x.strip()]

        out.append({
            "id": int(r["id"]),
            "name": str(r["name"]),
            "cuisine": str(r["cuisine"]),
            "cooking_time": int(r["cooking_time"]),
            "dietary": dietary,
            "ingredients": ingredients,
            "instructions": str(r.get("instructions", "")),
        })
    return out

RECIPES_DB = load_recipes_db()

In [5]:
class RecipeAgentState(TypedDict, total=False):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    ingredients: list[str]
    dietary_restrictions: list[str]
    max_cooking_time: int  # minutes
    cuisine_preference: str
    matched_recipes: list[dict]

class UserInput(BaseModel):
    ingredients: List[str] = Field(default_factory=list, description="Ingredients the user has")
    dietary_restrictions: List[str] = Field(
        default_factory=list,
        description='Allowed: "vegetarian", "vegan", "gluten-free"'
    )
    max_cooking_time: Optional[int] = Field(default=None, description="Minutes")
    cuisine_preference: Optional[str] = Field(default=None, description="Cuisine name, if any")

extractor = llm.with_structured_output(UserInput)

In [6]:
EXTRACTION_SYSTEM = """You extract structured cooking preferences from a user message.
Return ONLY valid JSON (no markdown, no explanation) with keys:
{
  "ingredients": [string],
  "dietary_restrictions": [string],   // allowed: "vegetarian","vegan","gluten-free"
  "max_cooking_time": integer|null,   // minutes
  "cuisine_preference": string|null
}

Rules:
- Ingredients: list foods the user says they have (no quantities).
- If user doesn't specify something, use null (or [] for lists).
- Be conservative: don't invent ingredients.
"""

In [7]:
def extract_user_preferences(state: RecipeAgentState) -> RecipeAgentState:
    messages = [{"role": "system", "content": EXTRACTION_SYSTEM}] + list(state["messages"])
    response = extractor.invoke(messages)
    return {
        "ingredients": response.ingredients,
        "dietary_restrictions": response.dietary_restrictions,
        "max_cooking_time": response.max_cooking_time,
        "cuisine_preference": response.cuisine_preference,
    }

In [8]:
def search_recipes(state: RecipeAgentState) -> RecipeAgentState:
    ingredients = set(i.lower() for i in state.get("ingredients", []))
    dietary = set(state.get("dietary_restrictions", []))
    max_time = state.get("max_cooking_time", None)
    cuisine_pref = (state.get("cuisine_preference", "") or "").lower()

    matches = []
    for r in RECIPES_DB:
        r_ings = set(x.lower() for x in r["ingredients"])

        if dietary and (not dietary.issubset(set(r["dietary"]))):
            continue
        if max_time is not None and r["cooking_time"] > max_time:
            continue

        overlap = len(ingredients.intersection(r_ings)) if ingredients else 0
        if ingredients and overlap == 0:
            continue

        score = overlap + (2 if cuisine_pref and r["cuisine"].lower() == cuisine_pref else 0)
        matches.append({**r, "score": score})

    return {"matched_recipes": matches}


In [9]:
def rank_recipes(state: RecipeAgentState) -> RecipeAgentState:
    recipes = state.get("matched_recipes", []) or []
    if not recipes:
        return {"matched_recipes": []}

    prompt = (
        "You are a cooking assistant.\n"
        "Rank the following recipes from best to worst for the user.\n\n"
        "User preferences:\n"
        f"- Ingredients: {state.get('ingredients', [])}\n"
        f"- Dietary: {state.get('dietary_restrictions', [])}\n"
        f"- Max time: {state.get('max_cooking_time', None)}\n"
        f"- Cuisine: {state.get('cuisine_preference', None)}\n\n"
        "Recipes:\n"
    )

    for r in recipes:
        prompt += (
            f"- {r['name']} | cuisine={r['cuisine']} | "
            f"time={r['cooking_time']} | score={r['score']}\n"
        )

    prompt += "\nReturn ONLY a comma-separated list of recipe names ranked from best to worst."
    response = llm.invoke([SystemMessage(content=prompt)])
    ranked_names = [x.strip() for x in response.content.split(",") if x.strip()]

    name_to_recipe = {r["name"]: r for r in recipes}
    ranked = [name_to_recipe[n] for n in ranked_names if n in name_to_recipe]

    remaining = [r for r in recipes if r["name"] not in ranked_names]
    remaining = sorted(remaining, key=lambda r: r["score"], reverse=True)

    return {"matched_recipes": ranked + remaining}

In [10]:
def generate_recommendation(state: RecipeAgentState) -> RecipeAgentState:
    recipes = state.get("matched_recipes", []) or []
    ingredients = state.get("ingredients", []) or []

    if not ingredients:
        return {"messages": [AIMessage(content="Tell me what ingredients you have so I can recommend recipes.")]}
    if not recipes:
        return {"messages": [AIMessage(content="I couldnâ€™t find a matching recipe. Want to relax constraints or add more ingredients?")]}

    prompt = f"""
You are a friendly cooking assistant.

User preferences:
- Ingredients: {state.get("ingredients", [])}
- Dietary restrictions: {state.get("dietary_restrictions", [])}
- Max cooking time: {state.get("max_cooking_time", None)}
- Cuisine preference: {state.get("cuisine_preference", None)}

Candidate recipes (use ONLY these, do not invent new recipes):
{recipes}

Task:
1) Pick the BEST 3 recipes from the candidates for this user.
2) Write the final response EXACTLY in this format:

Based on your ingredients and preferences, here are 3 recipes:
1. <name> (<time> min) - <cuisine>
2. <name> (<time> min) - <cuisine>
3. <name> (<time> min) - <cuisine>
Which one would you like the full recipe for?

No extra text. No explanations. Only the formatted response.
"""
    response = llm.invoke([SystemMessage(content=prompt)])
    return {"messages": [AIMessage(content=response.content)]}

In [11]:
def build_recipe_graph():
    g = StateGraph(RecipeAgentState)
    g.add_node("extract_user_preferences", extract_user_preferences)
    g.add_node("search_recipes", search_recipes)
    g.add_node("rank_recipes", rank_recipes)
    g.add_node("generate_recommendation", generate_recommendation)

    g.set_entry_point("extract_user_preferences")
    g.add_edge("extract_user_preferences", "search_recipes")
    g.add_edge("search_recipes", "rank_recipes")
    g.add_edge("rank_recipes", "generate_recommendation")
    g.add_edge("generate_recommendation", END)
    return g.compile()

graph = build_recipe_graph()

In [12]:
def recommend_recipes(user_message: str) -> str:
    """
    ADK tool: calls your LangGraph recipe manager and returns the final assistant message.
    """
    out = graph.invoke({"messages": [HumanMessage(content=user_message)]})
    msgs = out.get("messages") or []
    return msgs[-1].content if msgs else "No response."

In [13]:
PRICES_CSV = "/Users/pranavisriya/Documents/GenAI_training/Recipe recommendation copy/src/recipe_agent/data/ingredient_prices.csv"
WALLET_CSV = "/Users/pranavisriya/Documents/GenAI_training/Recipe recommendation copy/src/recipe_agent/data/wallet.csv"

def get_best_ingredient_prices(ingredients: list[str]):
    df = pd.read_csv(PRICES_CSV)
    df["ingredient"] = df["ingredient"].astype(str).str.lower().str.strip()
    results = []
    for ing in ingredients:
        ing2 = ing.lower().strip()
        sub = df[df["ingredient"] == ing2]
        if sub.empty:
            results.append({"ingredient": ing2, "store": None, "price_usd": None, "unit": None})
        else:
            best = sub.sort_values("price_usd", ascending=True).iloc[0]
            results.append({
                "ingredient": ing2,
                "store": str(best["store"]),
                "price_usd": float(best["price_usd"]),
                "unit": str(best["unit"]),
            })
    return results

def authenticate_wallet(user_id: str, pin: str) -> bool:
    df = pd.read_csv(WALLET_CSV)
    row = df[df["user_id"] == user_id]
    return (not row.empty) and (str(row.iloc[0]["pin"]) == str(pin))

def get_wallet_balance(user_id: str) -> float:
    df = pd.read_csv(WALLET_CSV)
    row = df[df["user_id"] == user_id]
    return float(row.iloc[0]["balance_usd"])

def deduct_wallet(user_id: str, amount: float) -> float:
    df = pd.read_csv(WALLET_CSV)
    idx = df.index[df["user_id"] == user_id]
    if len(idx) == 0:
        raise ValueError("User not found")
    i = idx[0]
    bal = float(df.loc[i, "balance_usd"])
    if amount > bal:
        raise ValueError("Insufficient funds")
    df.loc[i, "balance_usd"] = round(bal - amount, 2)
    df.to_csv(WALLET_CSV, index=False)
    return float(df.loc[i, "balance_usd"])

In [14]:
recipe_agent = Agent(
    name="recipe_agent",
    description="Recipe recommendations (calls LangGraph recipe manager).",
    tools=[recommend_recipes],
    instruction="""
Use the tool recommend_recipes(user_message) to generate recipe recommendations.
Do not invent recipes outside the CSV database.
Return the tool output directly to the user.
"""
)


In [15]:
ingredient_price_agent = Agent(
    name="ingredient_price_agent",
    description="Find best store prices for ingredients using CSV.",
    tools=[get_best_ingredient_prices],
    instruction="""
Extract ingredient names from the user request and call get_best_ingredient_prices(ingredients).
Do not guess prices if not present in CSV.
"""
)


In [16]:
wallet_agent = Agent(
    name="wallet_agent",
    description="Authenticate and manage wallet balance using CSV.",
    tools=[authenticate_wallet, get_wallet_balance, deduct_wallet],
    instruction="""
Ask for user_id and PIN if needed.
Never deduct without explicit user confirmation.
Check balance before deduction.
"""
)

In [17]:
root_agent = Agent(
    name="recipe_manager",
    description="Routes between recipe, prices, and wallet agents.",
    sub_agents=[recipe_agent, ingredient_price_agent, wallet_agent],
    instruction="""
You are the recipe manager.

Routing:
- If user asks for recipes / cooking suggestions -> recipe_agent
- If user asks about ingredient prices / where to buy -> ingredient_price_agent
- If user asks about wallet / balance / purchase / pin -> wallet_agent

Be grounded: do not invent recipes, prices, or wallet data.
"""
)

In [None]:
import os
os.environ["GOOGLE_API_KEY"]='YOUR_GOOGLE_API_KEY'

In [None]:
import uuid
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types  # ADK message Content/Part types

APP_NAME = "recipe_app"
session_service = InMemorySessionService()

runner = Runner(
    agent=root_agent,          
    app_name=APP_NAME,
    session_service=session_service
)

USER_ID = "user_001"
session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID)

async def call_adk(query: str):
    content = types.Content(role="user", parts=[types.Part(text=query)])

    final_text = None
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session.id,
        new_message=content
    ):
        if event.is_final_response():
            if event.content and event.content.parts:
                final_text = event.content.parts[0].text
            else:
                final_text = event.error_message or "No final text returned."
            break

    return final_text

resp = await call_adk("I have rice and egg, suggest something Chinese")
print(resp)


  async for event in agen:


Based on your ingredients and preferences, here are 3 recipes:
1. Egg Fried Rice (15 min) - Chinese
2. Tofu Buddha Bowl (30 min) - Asian
3. Veggie Omelette (12 min) - American
Which one would you like the full recipe for?


In [18]:
graph = build_recipe_graph()

from langchain_core.messages import HumanMessage

out = graph.invoke({
    "messages": [HumanMessage(content="I have rice and egg, suggest something Chinese")]
})

out["messages"][-1].content

'Based on your ingredients and preferences, here are 3 recipes:\n1. Egg Fried Rice (15 min) - Chinese\n2. Tofu Buddha Bowl (30 min) - Asian\n3. Veggie Omelette (12 min) - American\nWhich one would you like the full recipe for?'

In [26]:
await call_adk("Where can I buy rice cheaply?")


'The cheapest place to buy rice is Walmart, where 2lb of rice costs $3.99.'

In [27]:
await call_adk("What is the cheapest place to buy rice and eggs?")


"The cheapest place to buy rice is Walmart, where 2lb of rice costs $3.99. I don't have price information for eggs."

In [29]:
await call_adk("I have chicken, garlic, and onion. Suggest dinner ideas")


'Based on your ingredients and preferences, here are 3 recipes:\n1. Chicken Stir Fry (20 min) - Chinese\n2. Lentil Soup (35 min) - Middle Eastern\n3. Paneer Tikka (25 min) - Indian\nWhich one would you like the full recipe for?'

In [30]:
await call_adk("My user id is user_001 and pin is 9999. What is my balance?")


'I could not authenticate you with the provided user ID and PIN. Please try again with the correct user ID and PIN.'