# LangGraph implementation


In [20]:
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.tools import tool
from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai.chat_models import ChatOpenAI

from langgraph.graph import StateGraph, START, END

from nlp.NLSPipeline import extract_nutritional_features
from models.dev.faiss_indexes import FlatIndex

from dotenv import load_dotenv
from typing import Dict, List, Dict, Any, Optional, TypedDict

# from langchain_openai import ChatOpenAI

# Load env
load_dotenv()

# Load index
index = FlatIndex("../../recipe_embeddings.json")



## Custom Code
Works but no input loop.


In [None]:
# Define your tools
@tool
def recommend_recipes(q: str) -> str:
    """Takes in user input, extracts relevant features, and output recommended recipes from database"""

    print(f"\n Parsed user input: \n {q} \n")

    extracted_features = extract_nutritional_features(q)

    print(f"\n Extracted features: \n {extracted_features} \n")

    recs = index.recommend_recipes(
        user_ingredients=extracted_features.user_ingredients,
        allergens=extracted_features.allergens,
        calories=extracted_features.calories,
        total_fat=extracted_features.total_fat,
        protein=extracted_features.protein,
        saturated_fat=extracted_features.saturated_fat,
        carbs=extracted_features.carbs,
        sodium=extracted_features.sodium,
        sugar=extracted_features.sugar,
        top_n=10,
    )

    print(f"\n recs: \n {recs} \n")

    return ", ".join(recs)


@tool
def final_answer(answer: str) -> str:
    """Useful for providing the final answer to the user."""
    return answer


tools = [recommend_recipes, final_answer]

# Define the LLM
llm = ChatOpenAI(temperature=0.7)

# Define the agent
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
            You are a recipe recommendation program that takes in user inputs
            and outputs a list of recipes. If a user does not provide enough information
            on what recipes they want, you will keep on asking them about it until
            you have enough information to get a good recommendation for the user.
            """,
        ),
        ("placeholder", "{chat_history}"),
        ("human", "{query}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)
agent = create_tool_calling_agent(llm=llm, prompt=prompt, tools=tools)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)


# Define the LangGraph state
class AgentState(TypedDict):
    messages: List[Any]
    ingredients: Optional[str]
    nutrition: Optional[str]
    recipes: Optional[str]


# Define the nodes
def agent_node(state: AgentState) -> Dict:
    query = state["messages"][-1]
    chat_history = state["messages"][:1]
    print(f"agent_node: query = {query}")
    print(f"agent_node: chat_history = {chat_history}")
    result = agent_executor.invoke({"query": query, "chat_history": chat_history})
    print(f"agent_node: result = {result}")
    output = {"messages": state["messages"] + [AIMessage(content=result["output"])]}
    print(f"agent_node: output = {output}")
    return output


def recommendation_node(state: AgentState):
    print(f"recommend_node: state = {state}")
    result = recommend_recipes.run(
        ingredients=state["ingredients"], nutrition=state["nutrition"]
    )
    return {
        "messages": state["messages"]
        + [AIMessage(content=f"Here are some recommended recipes: {result}")],
        "recipes": result,
    }


def final_answer_node(state: AgentState) -> Dict:
    print(f"final_answer_node: state = {state}")
    result = final_answer.run(answer=state["messages"][-1])
    return {"messages": state["messages"] + [AIMessage(content=result)]}


# Define router
def router(state: AgentState):
    if "recommend_recipes" in state["messages"][-1]:
        return "recommend"
    elif "final_answer" in state["messages"][-1]:
        return "final"
    else:
        return "agent"


# Define the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("recommendation_tool", recommendation_node)
workflow.add_node("final_answer_tool", final_answer_node)

# Set up the edges
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    router,
    {
        "recommend": "recommendation_tool",
        "final": "final_answer_tool",
        "agent": "agent",
    },
)
workflow.add_edge("recommendation_tool", "agent")
workflow.add_edge("final_answer_tool", END)

# Compile the graph
app = workflow.compile()

# Example usage
# inputs = {
#     "messages": [
#         HumanMessage(
#             content="I'm looking for healthy recipes with chicken and low carbs."
#         )
#     ],
#     "ingredients": "chicken, garlic, honey",
#     "nutrition": "High protein",
#     "recipes": "Honey garlic chicken",
# }
inputs = AgentState(
    # messages=["I'm looking for healthy recipes with chicken and low carbs"]
    messages=[
        "Please recommend recipes that are healthy are have chicken. I want it to have low carbs too."
    ]
)
result = app.invoke(inputs)
print(result)

# inputs2 = {"messages": [HumanMessage(content="Hi, how are you?")]}
# result2 = app.invoke(inputs2)
# print(result2)

# inputs3 = {"messages": [HumanMessage(content="I need recipes with beef and high protein")]}
# result3 = app.invoke(inputs3)
# print(result3)

In [None]:
from IPython.display import Image, display

try:
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
    pass

## Input Loop but No Memory

In [21]:
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END


# Define tools
@tool
def recommend_recipes(q: str) -> str:
    """Takes in user input, extracts relevant features, and output recommended recipes from database"""

    print(f"\n Parsed user input: \n {q} \n")

    extracted_features = extract_nutritional_features(q)

    print(f"\n Extracted features: \n {extracted_features} \n")

    recs = index.recommend_recipes(
        user_ingredients=extracted_features.user_ingredients,
        allergens=extracted_features.allergens,
        calories=extracted_features.calories,
        total_fat=extracted_features.total_fat,
        protein=extracted_features.protein,
        saturated_fat=extracted_features.saturated_fat,
        carbs=extracted_features.carbs,
        sodium=extracted_features.sodium,
        sugar=extracted_features.sugar,
        top_n=10,
    )

    print(f"\n recs: \n {recs} \n")

    return ", ".join(recs)


@tool
def final_answer(answer: str) -> str:
    """Useful for providing the final answer to the user."""
    print("Hi im the final answer")
    return answer


# Define tool node
tools = [recommend_recipes, final_answer]
tool_node = ToolNode(tools)  # A single node that contains all the tools


# Define llm
llm = ChatOpenAI().bind_tools(tools)


# Define nodes
def router(state: MessagesState):
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return "end"


def agent(state: MessagesState):
    messages = state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}


# Define graph
workflow = StateGraph(MessagesState)

workflow.add_node("agent", agent)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", router, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")

app = workflow.compile()


# IO
def stream_graph_updates(user_input: str):
    for event in app.stream({"messages": [{"role": "user", "content": user_input}]}):
        for value in event.values():
            print("Assistant:", value["messages"][-1].content)


while True:
    try:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break

        stream_graph_updates(user_input)
    except:
        # fallback if input() is not available
        user_input = "What do you know about LangGraph?"
        print("User: " + user_input)
        stream_graph_updates(user_input)
        break

Assistant: Hello! How can I assist you today?
Assistant: 

 Parsed user input: 
 high protein chicken recipes with rice or potatoes 


 Extracted features: 
 user_ingredients=['chicken', 'rice', 'potatoes'] allergens=[] calories=None total_fat=None saturated_fat=None carbs=None sugar=None sodium=None protein=None 

Assistant: Error: TypeError('cannot unpack non-iterable NoneType object')
 Please fix your mistakes.
Assistant: I encountered an error while trying to retrieve the recommended recipes. Let me try again.

 Parsed user input: 
 high protein chicken with rice 


 Parsed user input: 
 high protein chicken with potatoes 


 Extracted features: 
 user_ingredients=['chicken', 'rice'] allergens=[] calories=None total_fat=None saturated_fat=None carbs=None sugar=None sodium=None protein=(20.0, 30.0) 


 Extracted features: 
 user_ingredients=['chicken', 'potatoes'] allergens=[] calories=None total_fat=None saturated_fat=None carbs=None sugar=None sodium=None protein=(20.0, 30.0) 

As

In [None]:
from IPython.display import Image, display

try:
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

## Has memory but input loop is glitchy (James Briggs implementation)

In [None]:
from typing import TypedDict, Annotated, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.prebuilt import ToolNode
import operator


# Define state
class AgentState(TypedDict):
    input: str
    # messages: Annotated[list[AnyMessage], add_messages]
    chat_history: list[BaseMessage]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]


# Define tools
@tool
def recommend_recipes(q: str) -> str:
    """Takes in user input, extracts relevant features, and output recommended recipes from database"""

    print(f"\n Parsed user input: \n {q} \n")

    extracted_features = extract_nutritional_features(q)

    print(f"\n Extracted features: \n {extracted_features} \n")

    recs = index.recommend_recipes(
        user_ingredients=extracted_features.user_ingredients,
        allergens=extracted_features.allergens,
        calories=extracted_features.calories,
        total_fat=extracted_features.total_fat,
        protein=extracted_features.protein,
        saturated_fat=extracted_features.saturated_fat,
        carbs=extracted_features.carbs,
        sodium=extracted_features.sodium,
        sugar=extracted_features.sugar,
        top_n=10,
    )

    print(f"\n recs: \n {recs} \n")

    return ", ".join(recs)


@tool
def final_answer(answer: str) -> str:
    """Useful for providing the final answer to the user."""
    print("Hi im the final answer")
    return answer


# Define tool node
tools = [recommend_recipes, final_answer]
tool_node = ToolNode(tools)  # A single node that contains all the tools


# Define llm
llm = ChatOpenAI()


# Define prompt
system_prompt = """You are the oracle, the great AI decision maker.
Given the user's query you must decide what to do with it based on the
list of tools provided to you.

You are also recipe recommendation program that takes in user inputs
and outputs a list of recipes. If a user does not provide enough information
on what recipes they want, you will keep on asking them about it until
you have enough information to get a good recommendation for the user.

If you see that a tool has been used (in the scratchpad) with a particular
query, do NOT use that same tool with the same query again. Also, do NOT use
any tool more than twice (ie, if the tool appears in the scratchpad twice, do
not use it again).

You should aim to collect information from a diverse range of sources before
providing the answer to the user. Once you have collected plenty of information
to answer the user's question (stored in the scratchpad) use the final_answer
tool."""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", "{input}"),
        ("assistant", "scratchpad: {scratchpad}"),
    ]
)


# Define scratchpad
def create_scratchpad(intermediate_steps: list[AgentAction]):
    """
    Creates a scratchpad displaying the step-by-step input and output between Human and Agent Messsages.
    """
    research_steps = []
    for i, action in enumerate(intermediate_steps):
        if action.log != "TBD":
            # this was the ToolExecution
            research_steps.append(
                f"Tool: {action.tool}, input: {action.tool_input}\n"
                f"Output: {action.log}"
            )
    return "\n---\n".join(research_steps)


# Define agent
agent = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "scratchpad": lambda x: create_scratchpad(
            intermediate_steps=x["intermediate_steps"]
        ),
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

# Test run
inputs = {
    "input": "please give me a breakfast recipe with eggs",
    "chat_history": [],
    "intermediate_steps": [],
}
out = agent.invoke(inputs)
out

In [None]:
def agent_node(state: list):
    print("run_agent")
    print(f"intermediate_steps: {state['intermediate_steps']}")
    out = agent.invoke(state)
    tool_name = out.tool_calls[0]["name"]  # Get name of tool
    tool_args = out.tool_calls[0]["args"]  # Get args that were passed to the tool
    action_out = AgentAction(tool=tool_name, tool_input=tool_args, log="TBD")
    return {"intermediate_steps": [action_out]}


def router(state: list):
    # return the tool name to use
    if isinstance(state["intermediate_steps"], list):
        return state["intermediate_steps"][-1].tool
    else:
        # if we output bad format go to final answer
        print("Router invalid format")
        return "final_answer"

In [None]:
tool_str_to_func = {
    "recommend_recipes": recommend_recipes,
    "final_answer": final_answer,
}


def tools_node(state: list):
    # use this as helper function so we repeat less code
    tool_name = state["intermediate_steps"][-1].tool
    tool_args = state["intermediate_steps"][-1].tool_input
    print(f"{tool_name}.invoke(input={tool_args})")
    # run tool
    out = tool_str_to_func[tool_name].invoke(input=tool_args)
    action_out = AgentAction(tool=tool_name, tool_input=tool_args, log=str(out))
    return {"intermediate_steps": [action_out]}

In [None]:
from langgraph.graph import StateGraph, END

graph = StateGraph(AgentState)

graph.add_node("agent", agent_node)
graph.add_node("recommend_recipes", tools_node)
graph.add_node("final_answer", tools_node)

graph.set_entry_point("agent")

graph.add_conditional_edges(
    source="agent",  # where in graph to start
    path=router,  # function to determine which node is called
)

# create edges from each tool back to the oracle
for tool_obj in tools:
    if tool_obj.name != "final_answer":
        graph.add_edge(tool_obj.name, "agent")

# if anything goes to final answer, it must then move to END
graph.add_edge("final_answer", END)

runnable = graph.compile()

In [None]:
from IPython.display import Image, display

try:
    display(Image(runnable.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [None]:
out = runnable.invoke(
    {
        "input": "i want a chicken rice recipe with calories between 0-2000, total fat between 0-1000, saturated fat between 0-1000, carbs between 0-1000, sugar between 0-200, sodium between 0-500mg, and protein between 0-200",
        "chat_history": ["Im allergic to tree nuts"],
    }
)

print("\n\nFinal output:")
out

In [None]:
out["intermediate_steps"][-1].tool_input

In [None]:
out["intermediate_steps"]

In [None]:
from langchain_core.messages import HumanMessage


def stream_graph_updates(user_input: str, state: AgentState):
    state["chat_history"].append(HumanMessage(content=user_input))

    for event in runnable.stream(state):
        for value in event.values():
            assistant_message = value["chat_history"][-1].content
            print("Assistant:", assistant_message)
            state["chat_history"].append(AIMessage(content=assistant_message))


while True:
    try:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break

        # Initialize state if first interaction
        if "state" not in locals():
            state = {"input": user_input, "chat_history": [], "intermediate_steps": []}

        print(state)

        stream_graph_updates(user_input, state)
    except Exception as e:
        print(f"Error: {e}")
        break

In [None]:
# Initialize state
state: AgentState = {
    "input": "",
    "chat_history": [],
    "intermediate_steps": [],
}


def stream_graph_updates(user_input: str, state: AgentState):
    try:
        # Update input
        state["input"] = user_input
        chat_history_input = HumanMessage(content=user_input)
        print(f"chat_history_input: {chat_history_input}")
        state["chat_history"].append(chat_history_input)
        print(f"state: {state}")

        # Stream response from runnable
        for event in runnable.invoke(state):
            print("DEBUG: Received event:", event)  # Print full event

            for key, value in event.items():
                print(f"DEBUG: Key: {key}, Value: {value}")

            for value in event.values():
                print(f"value: {value}")
                print(f"chat_history in value: {'chat_history' in value}")
                print(f"value['chat_history']: {value['chat_history']}")
                if "chat_history" in value and value["chat_history"]:
                    assistant_message = value["chat_history"][-1].content
                    print("Assistant:", assistant_message)
                    state["chat_history"].append(AIMessage(content=assistant_message))
                else:
                    print("Warning: No chat history found in response!")

    except Exception as e:
        print(f"Error in stream_graph_updates: {e}")


while True:
    try:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break

        stream_graph_updates(user_input, state)
    except Exception as e:
        print(f"Error in main loop: {e}")
        break