In [1]:
import sqlite3
from typing import Annotated, TypedDict, Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode

In [2]:
from dotenv import load_dotenv
import os

# Load variables from the .env file
load_dotenv()

True

In [3]:
api_key = os.environ["OPENAI_API_KEY"]

In [4]:

# 1. Define the State
class State(TypedDict):
    # The add_messages reducer ensures new messages are appended, not overwritten
    messages: Annotated[list[BaseMessage], add_messages]

# 2. Define the Node (The Logic)
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=api_key)

def chatbot(state: State):
    return {"messages": [llm.invoke(state["messages"])]}



    


### 1. Simple Persistent Agent (The "Memory" Test)
This script allows you to chat with an agent, stop the cell, and restart it while retaining memory.

In [5]:
# 3. Build the Graph
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

# Jupyter Implementation with Context Manager
def run_simple_memory_test():
    # 'checkpoints.db' saves to disk; ':memory:' stays in RAM
    with SqliteSaver.from_conn_string("simple_checkpoints.db") as memory:
        app = builder.compile(checkpointer=memory)
        
        config = {"configurable": {"thread_id": "amod_ds_001"}}
        
        # Test 1: Introduction
        print("--- Step 1 ---")
        for event in app.stream({"messages": [HumanMessage(content="Hi, I'm Amod, a Senior DS.")]}, config, stream_mode="values"):
            if "messages" in event:
                last_msg = event["messages"][-1]
                if last_msg.type == "ai":
                    print(f"ü§ñ: {last_msg.content}")

        # Test 2: Memory Check
        print("\n--- Step 2 ---")
        for event in app.stream({"messages": [HumanMessage(content="What is my name and seniority?")]}, config, stream_mode="values"):
            if "messages" in event:
                last_msg = event["messages"][-1]
                if last_msg.type == "ai":
                    print(f"ü§ñ: {last_msg.content}")


In [6]:
run_simple_memory_test()

--- Step 1 ---
ü§ñ: Hello Amod! As a Senior Data Scientist, you likely have a wealth of experience in data analysis, machine learning, and statistical modeling. If there's anything specific you'd like to discuss or explore, feel free to let me know!

--- Step 2 ---
ü§ñ: Your name is Amod, and you are a Senior Data Scientist.


### 2. Multi-Step Tool Agent (The "Agentic" Test)
This version includes the Router and ToolNode logic. It uses stream_mode="updates" which is much cleaner for Jupyter as it labels which node produced which output.

In [14]:
from typing import Annotated, TypedDict, Literal
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_core.messages import AIMessage

# 1. Define Tool
@tool
def calculate_compute_budget(project_name: str):
    """Calculates the GPU compute budget for a specific AI project."""
    budgets = {"project_alpha": "$50,000", "project_omega": "$120,000"}
    return f"The budget for {project_name} is {budgets.get(project_name.lower(), 'not defined')}."

tools = [calculate_compute_budget]
tool_node = ToolNode(tools)

# 2. Setup Logic
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

model = ChatOpenAI(model="gpt-4o",api_key=api_key).bind_tools(tools)

def call_model(state: AgentState):
    return {"messages": [model.invoke(state["messages"])]}



def router(state: AgentState) -> Literal["tools", END]:
    last_msg = state["messages"][-1]
    # Check if it's an AI message AND if it has tool calls
    if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
        return "tools"
    # If it's a ToolMessage or a plain text AIMessage, we stop or go to next logic
    return END

# 3. Build Graph
builder = StateGraph(AgentState)
builder.add_node("agent", call_model)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", router)
builder.add_edge("tools", "agent")

# 4. Execute with Context Manager
def run_agentic_workflow():
    with SqliteSaver.from_conn_string("agentic_checkpoints.db") as memory:
        app = builder.compile(checkpointer=memory)
        config = {"configurable": {"thread_id": "agent_session_1"}}
        
        query = "What is the budget for project_omega?"
        print(f"üöÄ User Query: {query}\n")
        
        # We use 'updates' to see exactly which node is firing
        for output in app.stream({"messages": [HumanMessage(content=query)]}, config, stream_mode="updates"):
            for node_name, state_update in output.items():
                print(f"üìç [Node: {node_name}]")
                if "messages" in state_update:
                    last_m = state_update["messages"][-1]
                    # Print tool calls or final text
                    content = last_m.tool_calls if last_m.tool_calls else last_m.content
                    print(f"   Output: {content}")



In [16]:
run_agentic_workflow()

üöÄ User Query: What is the budget for project_omega?

üìç [Node: agent]
   Output: The budget for project_omega is $120,000.
