In [None]:
pip install langgraph langchain-google-genai langchain-community faiss-cpu pandas

In [None]:
import os
import getpass
import pandas as pd
from typing import Annotated, TypedDict, List
from langchain_community.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage
from langgraph.graph import StateGraph, END, add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

In [None]:
# --- 1. SETUP API KEY ---
# Do not paste the key into this notebook. Either export GOOGLE_API_KEY before
# starting the kernel, or type it at the hidden prompt below.
# "Your_API_KEY" is the template value this cell used to write into the
# environment; treat it (and a blank value) the same as "not configured" so a
# kernel that already ran the old cell still gets prompted.
_API_KEY_PLACEHOLDER = "Your_API_KEY"

if os.environ.get("GOOGLE_API_KEY", "").strip() in ("", _API_KEY_PLACEHOLDER):
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("API Key: ")

# Both clients are constructed without an explicit key argument, so
# GOOGLE_API_KEY must be set before this point.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")

In [None]:
# DATA: toy inventory, one row per (product, location).
# Taiwan has no GPU-X100 stock; Germany has 50 units but carries a 1.3x
# cost modifier (30% more expensive). California stocks the CPU at base cost.
data = dict(
    product_id=["GPU-X100", "GPU-X100", "CPU-Z50"],
    location=["Taiwan", "Germany", "California"],
    stock=[0, 50, 100],
    cost_modifier=[1.0, 1.3, 1.0],
)
df_inventory = pd.DataFrame(data)

# Single-headline "news feed", embedded into a FAISS index that the risk
# tool queries through `retriever`.
news_feed = [
    "URGENT: Super Typhoon 'Kuna' is hitting Taiwan. Ports closed.",
]
vector_store = FAISS.from_texts(news_feed, embedding=embeddings)
retriever = vector_store.as_retriever()

In [None]:
# --- 2. TOOLS (Capabilities) ---

@tool
def check_inventory(product_name: str):
    """Checks stock levels and locations for a product."""
    # The docstring above is left verbatim: LangChain's @tool decorator uses it
    # as the tool description shown to the model.
    #
    # Args:
    #     product_name: free-text product identifier supplied by the model.
    # Returns:
    #     The matching inventory rows rendered as a plain-text table, or the
    #     string "No stock found." when nothing matches.
    print(f"üì¶ [Database] Checking stock for: {product_name}")

    query = product_name.strip()
    # A blank query would otherwise match every row in the table.
    if not query:
        return "No stock found."

    # regex=False: the name comes from the LLM and must be matched literally;
    # as a regex, characters such as "(" or "+" would raise or mis-match.
    hits = df_inventory["product_id"].str.contains(query, case=False, regex=False)
    matches = df_inventory[hits]
    if matches.empty:
        return "No stock found."
    return matches.to_string(index=False)

@tool
def check_risks(location: str):
    """Checks for logistics risks (weather, strikes) in a location."""
    # Docstring kept verbatim: the @tool decorator exposes it to the model as
    # the tool description.
    #
    # Queries the module-level `retriever` with the location string and
    # returns the retrieved documents' text, one per line.
    # NOTE(review): a vector retriever presumably returns nearest neighbours
    # even when they are unrelated to `location`, so the fallback string may
    # rarely be reached -- confirm against the retriever configuration.
    print(f"üåç [Risk Scanner] Checking news for: {location}")
    hits = retriever.invoke(location)
    if not hits:
        return "No major risks reported."
    headlines = [hit.page_content for hit in hits]
    return "\n".join(headlines)

@tool
def finalize_order(product_id: str, location: str, quantity: int):
    """
    CRITICAL: Places the final shipping order.
    Use this ONLY when the user explicitly asks to buy/ship.
    """
    # The docstring above is left verbatim: LangChain's @tool decorator uses it
    # as the tool description shown to the model.
    #
    # Args:
    #     product_id: product to ship, matched against df_inventory["product_id"].
    #     location:   source site, matched against df_inventory["location"].
    #     quantity:   number of units to ship.
    # Returns:
    #     A SUCCESS string when the human reviewer answers "yes", otherwise an
    #     ABORTED string.
    #
    # NOTE: per LangGraph's interrupt semantics, resuming re-runs this function
    # from the top, so everything before interrupt() (including this print)
    # executes again. Keep pre-interrupt code free of non-idempotent effects.
    print(f"\nüö® [System] Attempting to finalize order from {location}...")

    # 1. Identify High Cost Logic (The "Germany" trap)
    # Compare case- and whitespace-insensitively: the arguments are produced by
    # the LLM, and an exact-match miss (e.g. "germany") used to fall back to a
    # 1.0 modifier and silently drop the markup warning.
    wanted_product = product_id.strip().casefold()
    wanted_location = location.strip().casefold()
    row = df_inventory[
        (df_inventory["product_id"].str.casefold() == wanted_product)
        & (df_inventory["location"].str.casefold() == wanted_location)
    ]

    warning_msg = ""
    if row.empty:
        # Surface the failed lookup to the approver instead of implying that
        # the order carries no markup.
        warning_msg = (
            f"WARNING: No inventory record for {product_id} at {location}; "
            "cost markup could not be checked."
        )
    else:
        cost_mod = row.iloc[0]["cost_modifier"]
        if cost_mod > 1.0:
            warning_msg = f"‚ö†Ô∏è WARNING: Sourcing from {location} incurs a {(cost_mod-1)*100:.0f}% cost markup."

    # 2. TRIGGER HUMAN APPROVAL
    # interrupt() suspends the graph run; the value passed to
    # Command(resume=...) becomes its return value when the run is resumed.
    decision = interrupt(f"Approve order for {quantity}x {product_id} from {location}? {warning_msg} (yes/no)")

    # 3. Handle Decision
    print(f"üë§ Human Admin said: {decision}")

    # Normalise before comparing: the resume value may carry stray whitespace
    # or may not be a str at all; anything other than "yes" is a rejection.
    approved = str(decision).strip().lower() == "yes"
    if approved:
        return f"‚úÖ SUCCESS: Order placed. Shipping {quantity} units from {location}. (Approved by Human)"
    return "‚ùå ABORTED: Order cancelled by Human Admin due to cost/risk."

In [None]:
# --- 3. STATE & NODES ---

class State(TypedDict):
    """Graph state passed between nodes: a single `messages` list."""
    # Annotated with langgraph's `add_messages`, which LangGraph documents as a
    # reducer that merges a node's returned messages into the existing list
    # instead of replacing it.
    messages: Annotated[list, add_messages]

def agent_node(state: State):
    """LLM step of the graph: reply, or request tool calls, for the current thread.

    Prepends the Supply Chain Sentinel system prompt to ``state["messages"]``,
    invokes the tool-bound model once, and returns the model's reply wrapped
    in a one-element ``messages`` list as the state update.
    """
    system_prompt = SystemMessage(content="""
    You are the Supply Chain Sentinel.
    1. ALWAYS check inventory first.
    2. ALWAYS check risks for the found location.
    3. If stock is found in a safe location, propose it to the user.
    4. NEVER finalize an order without calling 'finalize_order'.
    """)

    # Advertise the three tools to the model so it can emit tool calls.
    available_tools = [check_inventory, check_risks, finalize_order]
    tool_aware_llm = llm.bind_tools(available_tools)

    conversation = [system_prompt] + state["messages"]
    reply = tool_aware_llm.invoke(conversation)
    return {"messages": [reply]}

In [None]:
# --- 4. BUILD GRAPH ---

# Two-node loop: "agent" runs the LLM, "tools" executes the tools it requests.
workflow = StateGraph(State)
workflow.add_node("agent", agent_node)
# This list must stay in sync with the tools agent_node binds to the model.
workflow.add_node("tools", ToolNode([check_inventory, check_risks, finalize_order]))

workflow.set_entry_point("agent")
# tools_condition (langgraph.prebuilt) chooses the step after "agent" from its
# latest message -- presumably "tools" when tool calls are present, otherwise
# the end of the run; see the LangGraph prebuilt docs.
workflow.add_conditional_edges("agent", tools_condition)
# Tool results always flow back to the agent for the next LLM turn.
workflow.add_edge("tools", "agent")

# The checkpointer stores graph state per thread_id; the demo relies on it both
# for reusing earlier messages on the same thread and for resuming after
# interrupt(). NOTE: MemorySaver is the in-memory checkpointer, so saved state
# is presumably lost when the kernel restarts -- it is not durable storage.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

In [None]:
# --- 5. EXECUTION: SOLVING THE CHALLENGES ---

# Every turn reuses one thread_id, so the checkpointer supplies the earlier
# messages of the conversation; this is what simulates "Two Hours Later".
config = {"configurable": {"thread_id": "manager_session_55"}}


def _ask(user_text: str):
    """Echo the user's message, send that same text to the graph, and return the resulting state."""
    print(f"User: {user_text}")
    return app.invoke({"messages": [HumanMessage(content=user_text)]}, config=config)


print("\n--- üïê SESSION 1: Initial Context ---")
# User asks about Taiwan. With the demo data the agent should find zero stock
# there and the typhoon headline.
session1 = _ask("Can we ship GPU-X100 from Taiwan?")
print("AI:", session1['messages'][-1].content)

print("\n--- üïë SESSION 2: Memory Test (2 Hours Later) ---")
# CHALLENGE 1: "those GPUs" is only resolvable from the earlier turn stored
# on this thread. User pivots to other locations.
result = _ask("What about those GPUs we discussed? Are they available anywhere else?")
print("AI:", result['messages'][-1].content)

print("\n--- üïí SESSION 3: The Critical Decision (HITL) ---")
# CHALLENGE 2 & 3: asking to ship from Germany (expensive) should make the
# agent call 'finalize_order', which raises the interrupt.
final_res = _ask("Okay, ship them from Germany immediately.")

# app.invoke() does not block waiting for typed input. If finalize_order hit
# interrupt(), invoke has already returned and the thread is left paused with
# pending nodes in `.next`. The manager's decision is supplied in code below;
# in a real app it would come from a UI callback.
last_event = app.get_state(config).next
if last_event:
    # Resume the paused run with the decision.
    resume_command = Command(resume="yes") # Change to "no" to test rejection
    final_output = app.invoke(resume_command, config=config)
    print("\n‚úÖ Final AI Response:\n", final_output['messages'][-1].content)
else:
    # Nothing is pending: the agent answered without calling finalize_order
    # (for example it asked a clarifying question). Show that reply instead of
    # silently dropping it.
    print("AI:", final_res['messages'][-1].content)