<a href="https://colab.research.google.com/github/2003Yash/complete_langgraph/blob/main/complete_langgraph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## SIMPLE REACT AGENT FROM SCRATCH (ReAct = REasoning + ACTing)

In [None]:
# Simple ReAct Agent Example
# Concept: The agent Thinks -> Acts -> Observes -> Answers

from openai import OpenAI
import re

# Shared OpenAI client; Agent.step below calls `client.chat.completions.create`
# through this module-level name. No API key is passed explicitly here, so the
# SDK's own configuration lookup applies (presumably OPENAI_API_KEY — confirm).
client = OpenAI()

# Simple helper actions
def calculate(expression):
    """Evaluate a plain arithmetic expression and return the result.

    The expression comes straight from LLM output, so it is NOT passed to
    ``eval``. It is parsed with ``ast``, and only these constructs are allowed:
    numeric literals, parentheses, unary ``+``/``-``, and the binary operators
    ``+ - * / // % **``. Names, calls, attribute access and everything else
    are rejected instead of being executed.

    Args:
        expression: Text such as ``"4 * 7 / 3"``.

    Returns:
        The numeric result, or a string ``"Error: <message>"`` if the
        expression is invalid or cannot be evaluated (e.g. division by zero).
    """
    import ast
    import operator as op

    binary_ops = {
        ast.Add: op.add,
        ast.Sub: op.sub,
        ast.Mult: op.mul,
        ast.Div: op.truediv,
        ast.FloorDiv: op.floordiv,
        ast.Mod: op.mod,
        ast.Pow: op.pow,
    }
    unary_ops = {ast.UAdd: op.pos, ast.USub: op.neg}
    # Guard against inputs like "9 ** 9 ** 9" that would hang the agent.
    max_exponent = 1000

    def evaluate(node):
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left, right = evaluate(node.left), evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError(f"exponent too large: {right}")
            return binary_ops[type(node.op)](left, right)
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    try:
        # strip(): ast.parse (unlike eval) rejects leading whitespace.
        return evaluate(ast.parse(expression.strip(), mode="eval"))
    except Exception as e:  # tool boundary: report any failure back to the LLM as text
        return f"Error: {e}"

# Average weights in lbs, keyed by lower-cased breed name for forgiving lookups.
_DOG_WEIGHTS_LBS = {
    "bulldog": 50,
    "border collie": 37,
    "scottish terrier": 20,
    "toy poodle": 7,
}
# Generic figure reported when the breed is not in the table.
_AVERAGE_DOG_WEIGHT_LBS = 50


def dog_weight(breed):
    """Return a sentence giving the average weight of a dog breed.

    Matching ignores case and surrounding whitespace, because the breed name
    comes from LLM output ("toy poodle" vs "Toy Poodle"). An unknown breed gets
    an explicit generic answer. Previously it was reported as if that specific
    breed weighed 50 lbs.

    Args:
        breed: Breed name, e.g. ``"Toy Poodle"``.

    Returns:
        A human-readable sentence with the weight in lbs.
    """
    name = breed.strip()
    weight = _DOG_WEIGHTS_LBS.get(name.lower())
    if weight is None:
        return (
            f"Unknown breed '{name}'; an average dog weighs about "
            f"{_AVERAGE_DOG_WEIGHT_LBS} lbs"
        )
    return f"A {name} weighs about {weight} lbs"

# Dispatch table: action name written by the model -> Python function that runs it
actions = dict(calculate=calculate, dog_weight=dog_weight)

# System prompt describing the ReAct protocol. The wording matters: run_query
# only recognises actions written exactly as "Action: <name>: <input>" on a line
# of their own, and it feeds results back as "Observation: <result>". The model
# must stop after PAUSE so that it does not invent its own Observation.
system_prompt = """
You run in a loop of Thought, Action, PAUSE, Observation.
At the end of the loop you output an Answer.

Use Thought to describe your reasoning about the question.
Use Action to run one of the available actions, then output PAUSE and stop.
Observation will be the result of running that action; you will receive it
in the next message.

Write each action on its own line, exactly in this format:
Action: <action_name>: <input>

Available actions:
- calculate: do arithmetic with numbers only (example: Action: calculate: 5 + 7)
- dog_weight: get average dog weight (example: Action: dog_weight: Bulldog)

Example session:

Question: How much does a Bulldog weigh?
Thought: I should look up the dog's weight using dog_weight.
Action: dog_weight: Bulldog
PAUSE

You will then be called again with:
Observation: A Bulldog weighs about 50 lbs

You then output:
Answer: A Bulldog weighs about 50 lbs.
"""

# Minimal stateful chat wrapper around the OpenAI client
class Agent:
    """Hold the running message history for one ReAct conversation."""

    def __init__(self, system_prompt):
        # The conversation always opens with the system instructions.
        self.messages = [{"role": "system", "content": system_prompt}]

    def step(self, user_input):
        """Send *user_input*, print and record the model's reply, then return it."""
        self.messages.append({"role": "user", "content": user_input})
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            messages=self.messages,
        )
        answer = completion.choices[0].message.content
        print(answer)
        self.messages.append({"role": "assistant", "content": answer})
        return answer

# Regex to detect actions: a line of the exact form "Action: <name>: <input>".
# MULTILINE makes ^/$ match at line boundaries inside a multi-line reply.
action_re = re.compile(r"^Action: (\w+): (.*)$", re.MULTILINE)


def run_query(question, max_turns=5):
    """Answer *question* with a Thought -> Action -> Observation loop.

    Each turn sends the current input to the same Agent. If the reply contains
    an ``Action: <name>: <input>`` line, the matching function from ``actions``
    runs, and its result is sent back as ``Observation: <result>``. The loop
    ends in one of three ways:

    - a reply contains no action (that reply is the final answer);
    - an unknown action is requested;
    - *max_turns* model calls have been made.

    Args:
        question: The user's question.
        max_turns: Upper bound on model calls, to avoid infinite loops.

    Returns:
        The last reply text produced by the model (``None`` if *max_turns* is 0).
    """
    agent = Agent(system_prompt)
    user_input = question
    reply = None

    for _ in range(max_turns):
        reply = agent.step(user_input)
        match = action_re.search(reply or "")  # the API may return None content
        if match is None:
            return reply  # no action requested: this reply is the answer

        action_name, action_input = match.groups()
        action = actions.get(action_name)
        if action is None:
            print("Unknown action:", action_name)
            return reply

        observation = action(action_input.strip())
        user_input = f"Observation: {observation}"

    print(f"Stopped after {max_turns} turns without a final answer.")
    return reply

# Example run: ask one question; every model reply is printed as the loop goes.
final_answer = run_query("How much does a Toy Poodle weigh?")


# Agent Algorithm:

# 1. Agent gets the question → “How much does a Toy Poodle weigh?”
# 2. Thinks and decides to use an action:
#   → Action: dog_weight: Toy Poodle
# 3. The code runs that action → gets Observation: A Toy Poodle weighs about 7 lbs
# 4. Agent is called again → now gives the final Answer.

## SIMPLE LANGGRAPH AGENT ( Exploring concepts of nodes, edges, conditional edges and tools )

In [None]:
# 🧠 Simple LangGraph Example
# Concept: Agent uses Nodes, Edges, Tools, and Conditional Edges

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage, ToolMessage # the agent’s conversation includes different roles — user input, system instructions, AI replies, and tool results. Each message type helps the model understand who said what and manage context properly in the reasoning loop.
import operator

# --- TOOL (like a search or calculator) ---
# `model.bind_tools(...)` only accepts LangChain tools, pydantic models, dict
# schemas or plain functions. A bare object with `.name`/`.invoke` makes it
# raise ValueError. Subclassing BaseTool gives the model a name, a description
# and an argument schema (LangChain infers it from `_run`'s signature), and it
# keeps the `.invoke({"query": ...})` interface used by the agent.
from langchain_core.tools import BaseTool


class SimpleSearchTool(BaseTool):
    """Fake search tool that echoes the query back as a 'search result'."""

    name: str = "search_tool"
    description: str = "Search for information about a query."

    def _run(self, query: str = "") -> str:
        # invoke({"query": ...}) calls this with `query` as a keyword argument;
        # the "" default mirrors the old `args.get("query", "")`.
        return f"Fake search results for: '{query}'"


tool = SimpleSearchTool()

# --- STATE (holds the conversation) ---
class AgentState(TypedDict):
    # Graph state: the running list of chat messages. The `operator.add`
    # reducer concatenates each node's returned `messages` list onto the
    # existing one instead of replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

# --- AGENT ---
class SimpleAgent:
    """Two-node LangGraph agent: an LLM node and a tool-execution node.

    Flow: entry -> "llm" --(tool calls requested?)--> "action" -> "llm" ... -> END
    """

    def __init__(self, model, tools, system=""):
        self.system = system
        self.tools = {t.name: t for t in tools}
        # bind_tools sends the tool schemas to the model so it can emit tool calls
        self.model = model.bind_tools(tools)

        # Build graph
        g = StateGraph(AgentState)

        # Add nodes (steps)
        g.add_node("llm", self.ask_model)
        g.add_node("action", self.run_tool)

        # Conditional edge: should_use_tool's True/False picks the next node
        g.add_conditional_edges(
            "llm",
            self.should_use_tool,
            {True: "action", False: END},
        )

        # After using a tool, go back to the LLM
        g.add_edge("action", "llm")

        # Start at the LLM node
        g.set_entry_point("llm")
        self.graph = g.compile()

    # --- Decide whether to use a tool ---
    def should_use_tool(self, state: AgentState):
        """Return True if the last message requested at least one tool call."""
        last_msg = state["messages"][-1]
        # Only AI messages carry `tool_calls`; anything else means "no tool".
        return bool(getattr(last_msg, "tool_calls", None))

    # --- Step 1: Ask the LLM what to do ---
    def ask_model(self, state: AgentState):
        """Call the model on the conversation, prefixed by the system prompt if set."""
        msgs = state["messages"]
        if self.system:  # don't send an empty system message
            msgs = [SystemMessage(content=self.system)] + msgs
        reply = self.model.invoke(msgs)
        return {"messages": [reply]}

    # --- Step 2: Run tool if needed ---
    def run_tool(self, state: AgentState):
        """Execute every requested tool call and return one ToolMessage per call."""
        tool_calls = state["messages"][-1].tool_calls
        results = []
        for t in tool_calls:
            tool_name = t["name"]
            args = t["args"]
            if tool_name in self.tools:
                result = self.tools[tool_name].invoke(args)
            else:
                # Report a bad tool name back to the model instead of crashing.
                result = "Unknown tool — please retry."
            results.append(ToolMessage(
                tool_call_id=t["id"], name=tool_name, content=str(result)
            ))
        return {"messages": results}

# --- SYSTEM PROMPT ---
prompt = (
    "You are a helpful assistant.\n"
    "If someone asks a question that requires information, use the search_tool.\n"
    "Otherwise, answer directly."
)

# --- RUN THE AGENT ---
model = ChatOpenAI(model="gpt-4o-mini")
agent = SimpleAgent(model, [tool], system=prompt)

# Example input: a single user turn that should trigger the search tool
question = HumanMessage(content="Search for the capital of France")
msgs = [question]
result = agent.graph.invoke({"messages": msgs})

final_message = result["messages"][-1]
print("\n--- Final Response ---")
print(final_message.content)


## AGENTIC SEARCH TOOL (Tavily: a search engine designed for LLMs. For a query it fetches and scrapes the result pages, then returns the extracted content and an answer. A search engine like DuckDuckGo returns only a list of URLs.)

In [None]:
# 🧩 Simple Tavily Search Example
# Learn what Tavily is and how to use it

from tavily import TavilyClient
import os

# 1️⃣ Setup: create a Tavily client with your API key.
# The key is read from the TAVILY_API_KEY environment variable. A .env file only
# helps if something (e.g. python-dotenv) has loaded it into the environment.
# Named `tavily_client` so it does not overwrite the OpenAI `client` global that
# the ReAct Agent.step method calls.
tavily_client = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))

# 2️⃣ Run a simple query
query = "What is special about Nvidia’s new Blackwell GPU?"
result = tavily_client.search(query, include_answer=True)

# 3️⃣ Print the short, summarized answer (None if no answer was returned)
print("🔍 Tavily Answer:")
print(result.get("answer"))

# Example response shape. A plain search API such as DuckDuckGo would return
# only URLs, not an answer or content summary:
# {
#   "answer": "Nvidia's new Blackwell GPU offers up to 20x AI performance improvement...",
#   "results": [
#     {
#       "title": "Nvidia Blackwell Overview",
#       "url": "https://www.nvidia.com/en-us/data-center/blackwell-architecture/",
#       "content": "Blackwell architecture introduces ..."
#     }
#   ]
# }


# To use Tavily as a tool for LangGraph agents:
from langchain_community.tools.tavily_search import TavilySearchResults
# Return up to 4 results. The tool reads TAVILY_API_KEY from the environment
# itself, so no client object is needed here.
tool = TavilySearchResults(max_results=4)


## LANGGRAPH PERSISTENCE AND STREAMING (Persistence: for long-running agents in particular, we keep the agent's state and reuse it over time, and we can go back to an earlier state later if needed. This works by saving the state to a separate database, the checkpointer. Streaming: when an agent takes a long time, we emit a stream of events showing what is happening inside the LLM and graph as it runs.)

PERSISTENCE

In [None]:
# --- Persistence Example ---
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator

import sqlite3

# Checkpointer backed by an in-memory SQLite database. In recent langgraph
# releases `SqliteSaver.from_conn_string(...)` is a context manager, so its
# return value cannot be passed to `compile()` directly. Build the saver from an
# explicit connection instead. check_same_thread=False because the graph may use
# the connection from a different thread than the one that opened it.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))


# Simple state type: `operator.add` appends each node's new messages to the list
class ChatState(TypedDict):
    messages: Annotated[list, operator.add]


def echo_node(state: ChatState):
    """Reply with an AIMessage that echoes the text of the latest message."""
    user_msg = state["messages"][-1].content
    reply = f"Echo: {user_msg}"
    return {"messages": [AIMessage(content=reply)]}


# Build graph: entry -> "echo" -> END
graph = StateGraph(ChatState)
graph.add_node("echo", echo_node)
graph.set_entry_point("echo")
# Explicit finish edge; some langgraph versions reject nodes with no outgoing edge.
graph.add_edge("echo", END)
bot = graph.compile(checkpointer=memory)

# Simulate a persistent conversation (the same thread_id keeps memory)
thread = {"configurable": {"thread_id": "1"}}

for event in bot.stream({"messages": [HumanMessage(content="Hello!")]}, thread):
    print(event)

for event in bot.stream({"messages": [HumanMessage(content="How are you?")]}, thread):
    print(event)

# Even though the code runs in two separate calls, the graph "remembers" the
# conversation because of the SQLite checkpoint (persistence): the saved state
# for thread "1" holds the messages from both calls.
print(len(bot.get_state(thread).values["messages"]), "messages saved for thread 1")

In [None]:
STREAMING

In [None]:
# --- Streaming Example (fixed & clearer) ---
import asyncio
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.runnables import RunnableConfig
from typing import TypedDict, Annotated
import operator

try:  # newer releases ship the async saver in langgraph-checkpoint-sqlite
    from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
except ImportError:  # older langgraph releases
    from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver


# Define simple conversation state
class ChatState(TypedDict):
    messages: Annotated[list, operator.add]


async def stream_node(state: ChatState, config: RunnableConfig):
    """Answer the last message with a fake chat model so its tokens get streamed.

    A LangGraph node must *return* a state update; it cannot stream by yielding
    arbitrary dicts. Instead we call a (fake) chat model. Under
    ``astream_events`` each token it produces is emitted as an
    ``on_chat_model_stream`` event. ``config`` is forwarded so the model call is
    attached to the graph's run, and therefore to the event stream.
    """
    user_text = state["messages"][-1].content
    # GenericFakeChatModel replays the given message and splits it on whitespace
    # when streaming. A new instance per call because it consumes its iterator.
    fake_llm = GenericFakeChatModel(
        messages=iter([AIMessage(content=f"Streaming reply to: {user_text}")])
    )
    reply = await fake_llm.ainvoke(state["messages"], config)
    return {"messages": [reply]}


def build_bot(checkpointer):
    """Compile the one-node graph: entry -> "stream" -> END."""
    graph = StateGraph(ChatState)
    graph.add_node("stream", stream_node)
    graph.set_entry_point("stream")
    graph.add_edge("stream", END)
    return graph.compile(checkpointer=checkpointer)


async def main():
    thread = {"configurable": {"thread_id": "2"}}
    stream = []  # <-- we collect chunks here

    # The async SQLite saver must be opened as an async context manager.
    async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:
        bot = build_bot(memory)
        async for event in bot.astream_events(
            {"messages": [HumanMessage(content="Hi there!")]}, thread, version="v2"
        ):
            if event["event"] == "on_chat_model_stream":
                # The chunk is a message-chunk object, not a dict.
                chunk = event["data"]["chunk"].content
                print(chunk, end="")  # live stream output
                stream.append(chunk)  # store chunk

    print("\n\nFull streamed message:")
    print("".join(stream))  # <-- final combined output


try:
    asyncio.get_running_loop()
except RuntimeError:
    asyncio.run(main())  # plain Python: no loop is running yet
else:
    # Jupyter/Colab already run an event loop, and asyncio.run() raises there.
    # Schedule the coroutine on that loop instead (or use `await main()` in a cell).
    _streaming_task = asyncio.ensure_future(main())

# Instead of printing a full response at once, the assistant "streams" it piece
# by piece (word by word), like a live typing effect.

# Each on_chat_model_stream event from astream_events() carries one chunk, which
# we print in real time. We also store the chunks in a list and join them
# afterwards to form the complete message.
# We also store these chunks in a list and join them afterward to form the complete message.

## LANGGRAPH HUMAN IN THE LOOP

MANUAL HUMAN IN LOOP

In [None]:
# STEPS: upgrade the state's message reducer so saved messages can be replaced by id, keep state per thread_id with a checkpointer, and before the next node runs, ask a human for permission (e.g. typing "y" at the prompt) to proceed with the agent's decision.
# A STATE is a snapshot of the current node and all its metadata. By inspecting the saved states we can see what has been processed in which node.




# Step-1: import dependencies

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.sqlite import SqliteSaver
from uuid import uuid4

import sqlite3

# Checkpointer for the human-in-the-loop examples (in-memory SQLite). Recent
# langgraph releases made `SqliteSaver.from_conn_string` a context manager, so
# build the saver from an explicit connection that can be passed to compile().
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

# Web-search tool offered to the agent (up to 2 results per query)
tool = TavilySearchResults(max_results=2)

# Step-2: Upgraded state array logic
def reduce_messages(left: "list[AnyMessage]", right: "list[AnyMessage]") -> "list[AnyMessage]":
    """Merge new messages into the existing list, replacing by ``id``.

    Earlier examples annotated the ``messages`` state key with the default
    ``operator.add`` (``+``) reducer, which always appends new messages to the
    end. To support *editing* saved state, this custom reducer replaces an
    existing message that has the same ``id`` and appends the others.

    Args:
        left: Messages currently in the state (not modified).
        right: New messages (a single message is also accepted). Any message
            without an ``id`` is assigned a fresh UUID string in place.

    Returns:
        A new list: ``left`` with replacements applied, then the new messages.
    """
    if not isinstance(right, (list, tuple)):
        right = [right]
    # assign ids to messages that don't have them
    for message in right:
        if not message.id:
            message.id = str(uuid4())

    merged = left.copy()
    # id -> index of its FIRST occurrence, so lookups are O(1) instead of a scan
    position = {}
    for i, existing in enumerate(merged):
        position.setdefault(existing.id, i)

    for message in right:
        i = position.get(message.id)
        if i is None:
            # append any new messages to the end
            position[message.id] = len(merged)
            merged.append(message)
        else:
            # replace the existing message with the same id
            merged[i] = message
    return merged


# Step-3: Boilerplate LangGraph agent, except that graph.compile() gets a new
# `interrupt_before` argument naming the node(s) where execution pauses.

class AgentState(TypedDict):
    # Conversation history. `reduce_messages` appends new messages and replaces
    # ones whose id already exists, which is what lets us edit saved state.
    messages: Annotated[list[AnyMessage], reduce_messages]

class Agent:
    """Research agent: the "llm" node proposes tool calls, the "action" node runs them.

    The graph pauses *before* every "action" step (``interrupt_before``), so a
    human can inspect or edit the pending tool calls. The paused state is
    saved through ``checkpointer``.
    """

    def __init__(self, model, tools, system="", checkpointer=None):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(
            checkpointer=checkpointer,
            interrupt_before=["action"]  # stop before running tools, awaiting a human
        )
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the history (with the system prompt, if any)."""
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the last message requested at least one tool call."""
        print(state)  # show the full state at every routing decision (for learning)
        result = state['messages'][-1]
        return bool(getattr(result, "tool_calls", None))

    def take_action(self, state: AgentState):
        """Run each requested tool and return one ToolMessage per call."""
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                # A bad tool name (model hallucination or a manual state edit)
                # used to raise KeyError. Report it so the model can retry.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}


prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-3.5-turbo")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)  # <--- Agent initialization

# Run a query ("Whats the weather in SF?") through the agent on thread "1".
# stream() yields one event per executed node. Because of interrupt_before, the
# run pauses before the "action" node.
messages = [HumanMessage(content="Whats the weather in SF?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)


# Inspect the saved state to see which nodes have run. In a notebook only the
# LAST bare expression of a cell is displayed, so print explicitly.
snapshot = abot.graph.get_state(thread)
print(snapshot)
print(snapshot.next)  # the node(s) that will run when the graph resumes

# Step-4: ask a human at the terminal to type "y" before each paused step continues:

messages = [HumanMessage("Whats the weather in LA?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
while abot.graph.get_state(thread).next:
    print("\n", abot.graph.get_state(thread), "\n")
    _input = input("proceed?")
    if _input.strip().lower() != "y":  # accept "y", "Y", " y "
        print("aborting")
        break
    # Passing None as input resumes the paused run on this thread.
    for event in abot.graph.stream(None, thread):
        for v in event.values():
            print(v)

MODIFY STATE (the saved state records each node's state and metadata as a processing history. We can also edit that saved state and resume from a particular point, to simulate a modified or entirely different agent decision.)

In [None]:
# Continuation of the cell above: run until the interrupt, then modify the state.

# Re-run the graph with a new thread id so it doesn't collide with earlier saved state:
messages = [HumanMessage("Whats the weather in LA?")]
thread = {"configurable": {"thread_id": "3"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

# Analyse the paused state. Print explicitly: bare expressions in the middle of a
# cell are not displayed.
current_values = abot.graph.get_state(thread)
last_message = current_values.values['messages'][-1]
print(current_values)
print(last_message)
print(last_message.tool_calls)

if not last_message.tool_calls:
    print("The model answered without calling a tool; nothing to modify.")
else:
    # Modify the state: keep the original tool-call id but change the query.
    # Keeping the id lets `reduce_messages` replace the saved message in place.
    _id = last_message.tool_calls[0]['id']
    last_message.tool_calls = [
        {'name': 'tavily_search_results_json',
         'args': {'query': 'current weather in Louisiana'},
         'id': _id}
    ]

    abot.graph.update_state(thread, current_values.values)

    # Analyse the updated values
    print(abot.graph.get_state(thread))

    # Resume the graph from that point (None input = continue the paused run):
    for event in abot.graph.stream(None, thread):
        for v in event.values():
            print(v)


TIME-TRAVEL (going back to an earlier saved state and re-running the graph from that point is called time travel)

In [None]:
# == First Time-travel concept: Go back in time

# Collect every saved snapshot for this thread. LangGraph yields the newest
# checkpoint first, so `states` runs from newest to oldest.
states = []
for state in abot.graph.get_state_history(thread):
    print(state)
    print('--')
    states.append(state)

if len(states) < 3:
    raise RuntimeError(f"Time travel needs at least 3 checkpoints, found {len(states)}")

# Pick the third-OLDEST snapshot (index -3 counts from the end of a newest-first
# list). That is not "3 steps back from now". For this thread it is presumably
# the state right after the LLM's first reply, paused before "action". The code
# below reads its pending tool call; check `to_replay.next` to confirm.
to_replay = states[-3]
print(to_replay.next)

# Start again from that snapshot: streaming with its config replays from there.
for event in abot.graph.stream(None, to_replay.config):
    for k, v in event.items():
        print(v)




# == Second Time-travel concept: Go back in time and edit

print(to_replay)

# Edit the pending tool call in place (same id, new query)
_id = to_replay.values['messages'][-1].tool_calls[0]['id']
to_replay.values['messages'][-1].tool_calls = [{'name': 'tavily_search_results_json',
  'args': {'query': 'current weather in LA, accuweather'},
  'id': _id}]

# Save the edited values against the OLD checkpoint. The returned config points
# at the new, edited checkpoint (a branch off the original history).
branch_state = abot.graph.update_state(to_replay.config, to_replay.values)

# Re-run the graph from the branch
for event in abot.graph.stream(None, branch_state):
    for k, v in event.items():
        if k != "__end__":
            print(v)


# == Third Time-travel concept: Add a message to a state at a given time

# Get the state and the id of its pending tool call
print(to_replay)
_id = to_replay.values['messages'][-1].tool_calls[0]['id']

# Hand-written tool result answering that tool call
state_update = {"messages": [ToolMessage(
    tool_call_id=_id,
    name="tavily_search_results_json",
    content="54 degree celcius",
)]}

# as_node="action" records the update as if the "action" node had produced it,
# so the graph continues along the action -> llm edge.
branch_and_add = abot.graph.update_state(
    to_replay.config,
    state_update,
    as_node="action")

# Rerun the graph
for event in abot.graph.stream(None, branch_and_add):
    for k, v in event.items():
        print(v)

## FURTHER RESEARCH CONCEPTS ( LANGSMITH, LANGSERVE)