# üöÄ AI-Powered Multi-Step Agentic Workflow with LangGraph & OpenAI GPT4o:   Plan-and-Execute

This notebook implements an **AI-powered multi-step execution system** using **LangGraph**, **LangChain**, and **OpenAI's GPT-4o**. The system follows a **structured planning, execution, and replanning process**, leveraging **ReAct (Reasoning + Acting)** methodology to dynamically break down tasks, retrieve relevant knowledge, and refine answers.

---

## üìå Overview

### üîπ What This Notebook Does:
- **ReAct Agent**: Implements an AI agent using **GPT-4o-mini** and integrates **Tavily API** for real-time web searches.
- **Planning**: Generates a structured **step-by-step** execution plan using an **LLM-powered planner**.
- **Execution**: Executes each planned step iteratively, refining responses.
- **Replanning**: Dynamically updates the plan based on execution results.
- **LangGraph Workflow**: Implements an **agentic execution graph** to handle decision-making.

### üèóÔ∏è Core Components:
- ‚úÖ **LangChain & OpenAI** ‚Üí AI-powered reasoning & execution.
- ‚úÖ **Tavily API** ‚Üí Real-time web search for external knowledge retrieval.
- ‚úÖ **LangGraph** ‚Üí Orchestrates planning, execution, and adaptive refinement.
- ‚úÖ **Asynchronous Execution** ‚Üí Optimizes performance using `asyncio`.

---

## üõ†Ô∏è Installation & Setup

Before running the notebook, install the required dependencies:

```sh
pip install -U langgraph langchain-community langchain-openai tavily-python python-dotenv


In [None]:
%pip install -U langgraph langchain-community langchain-openai tavily-python

In [None]:
%pip install python-dotenv




In [None]:
from IPython.display import Markdown, display

In [None]:
import os
from dotenv import load_dotenv
load_dotenv()
def _set_env(key: str):
    if not os.environ.get(key):
        os.environ[key] = os.getenv(key)

In [None]:
OPENAI_API_KEY="sk-proj-B"
TAVILY_API_KEY="tvly-"
# _set_env(OPENAI_API_KEY)
# _set_env(TAVILY_API_KEY)
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults

tools = [TavilySearchResults(max_results=5)]

In [None]:
from langchain_openai import ChatOpenAI

from langgraph.prebuilt import create_react_agent

llm = ChatOpenAI(model = "gpt-4o-mini")
prompt = "Be a helpful assistance and assist in the given query!!"

agent_executor = create_react_agent(llm, tools, prompt = prompt)

In [2]:
agent_executor.invoke({"messages": [("user", "show me a tabulary results of different llm models by comparing them with several benchmark test.")]})

In [None]:
import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict

class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str


In [None]:
from pydantic import BaseModel, Field

class Plan(BaseModel):
    steps: List[str] = Field(description="different steps to follow, and must be in sorted order")

In [None]:
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system", """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps."""
        ),
        ("placeholder", "{messages}")
    ]
)
planner = planner_prompt | ChatOpenAI(
    model = "gpt-4o",
    temperature=0
).with_structured_output(Plan)

In [3]:
planner.invoke(
    {
        "messages": [
            ("user", "List of all LLM models and the years they were released in increasing order")
        ]
    }
)

In [None]:
from typing import Union


class Response(BaseModel):
    """Response to user."""

    response: str


class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )


replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)


replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)

In [None]:
from typing import Literal
from langgraph.graph import END
import asyncio

async def execute_step(state: PlanExecute):
    # print(f"states: {state}")
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    await asyncio.sleep(1)
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    # Update 'past_steps' correctly
    past_steps = state.get("past_steps", [])  # Get existing past_steps or initialize an empty list
    past_steps.append((task, agent_response["messages"][-1].content))  # Append the new step
    return {
        "past_steps": past_steps, # Return the updated past_steps
    }

async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    # Include 'past_steps' in the returned dictionary, initialized as an empty list if not present
    return {"plan": plan.steps, "past_steps": state.get("past_steps", [])}


async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response, "past_steps": state.get("past_steps", [])}
    else:
        return {
            "plan": output.action.steps,
            "past_steps": state.get("past_steps", []),
        }


def should_end(state: PlanExecute):
    if "response" in state and state["response"]:
        return END
    else:
        return "agent"

In [None]:
from langgraph.graph import StateGraph, START

workflow = StateGraph(PlanExecute)


# Add the plan node
workflow.add_node("planner", plan_step)

# Add the execution step
workflow.add_node("agent", execute_step)

# Add a replan node
workflow.add_node("replan", replan_step)

workflow.add_edge(START, "planner")

# From plan we go to agent
workflow.add_edge("planner", "agent")

# From agent, we replan
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    # Next, we pass in the function that will determine which node is called next.
    should_end,
    ["agent", END],
)

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()


In [4]:
from IPython.display import Image, display

display(Image(app.get_graph(xray=True).draw_mermaid_png()))

## Alignment with the Image

| **Workflow Step**         | **Code**                                                                                          | **Graph Alignment**                                                                 |
|----------------------------|--------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------|
| **Start ‚Üí Planner**        | `workflow.add_edge(START, "planner")`                                                           | The workflow starts at `__start__` and moves to `planner`.                         |
| **Planner ‚Üí Agent**        | `workflow.add_edge("planner", "agent")`                                                        | Solid edge connects `planner` to `agent`.                                          |
| **Agent ‚Üí Replan**         | `workflow.add_edge("agent", "replan")`                                                         | Solid edge connects `agent` to `replan`.                                           |
| **Replan ‚Üí Conditional**   | `workflow.add_conditional_edges("replan", should_end, ["agent", END])`                          | Dashed edges connect `replan` to both `agent` (loop back) and `__end__` (terminate).|
| **Agent ‚Üí End (optional)** | The `should_end` condition allows skipping `replan` and terminating the workflow directly at `END`. | Dashed edge connects `agent` to `__end__` for early termination.                   |

---

## Key Takeaways

1. **Dynamic Flow**: The code uses conditional edges (`should_end`) to control whether the workflow loops back to `agent` or terminates.
2. **Alignment with Graph**:
   - Nodes (`planner`, `agent`, `replan`, `END`) and edges directly correspond to the diagram.
   - Dashed edges represent the dynamic conditional transitions in the workflow.
3. **Early Termination**: The `END` node can be reached either:
   - Directly after `agent` (if execution succeeds without replanning).
   - After `replan` (if no further steps are required).


In [5]:
config = {"recursion_limit": 50}
inputs = {
    "input": "List of all LLM models and the years they were released in increasing order after 2021 upto latest 5",
    "past_steps": []
    }
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k == "response":
            # Print the final response as Markdown
            print(f"```markdown\n{v}\n```")
        elif k != "__end__":
            print(v)