In [71]:
import os
import asyncio
import operator
import requests
import warnings
from typing import TypedDict, Annotated, List, Union
warnings.filterwarnings("ignore", category=DeprecationWarning)

from pydantic import BaseModel, Field
from IPython.display import display, Markdown
from pprint import pprint
from dotenv import load_dotenv

from langchain_groq import ChatGroq
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_community.tools import TavilySearchResults

In [72]:
load_dotenv()

True

In [73]:
api_key = os.getenv("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Autonomous Agent"

user_key = os.getenv("PUSHOVER_KEY")
api_token = os.getenv("PUSHOVER_TOKEN")
pushover_url = "https://api.pushover.net/1/messages.json"

In [74]:
#  CONFIGURATION & INSTRUCTIONS 
HOW_MANY_SEARCHES = 5

PLANNER_INSTRUCTIONS = "You are a helpful research assistant. Given a query, come up with a set of web searches to perform to best answer the query. Output {HOW_MANY_SEARCHES} terms to query for. Focus on recent and distinct queries."

SEARCH_INSTRUCTIONS = "You are a research assistant. Given a search term, you search the web for that term and produce a concise, factual summary of the results. The summary must be 2-3 paragraphs. Capture the main points, especially specific entity names, dates, and version numbers. Write clearly. Do not include any additional commentary other than the summary itself."

WRITER_INSTRUCTIONS = (
    "You are a senior researcher writing a comprehensive, in-depth professional report.\n\n"
    "CRITICAL REQUIREMENTS:\n"
    "1. LENGTH: Your report MUST be between 1500-2000 words. This is not optional.\n"
    "2. DEPTH: Provide extensive detail, multiple examples, comparisons, and analysis for each topic.\n"
    "3. STRUCTURE: Use clear markdown sections with proper headings (##, ###).\n"
    "4. ACCURACY: ONLY use information from the provided search results. Do NOT invent frameworks or data.\n"
    "5. SPECIFICITY: Include specific names, versions, dates, market data, and technical details found in search results.\n"
    "6. COMPARISONS: Provide detailed comparisons between frameworks, highlighting strengths/weaknesses.\n"
    "7. EXAMPLES: Include concrete use cases and implementation examples where available.\n\n"
    "If search data is missing on a topic, explicitly state: 'I could not find information on [topic]'.\n"
    "DO NOT mention 'TensorFlow Agents' or 'PyTorch Agents' unless they appear in the actual search results.\n\n"
    "Remember: The report must be COMPREHENSIVE (1500-2000 words) with substantial depth and detail."
)

PUSH_INSTRUCTIONS = """You are a member of a research team and will be provided with a short summary of a report.
When you receive the report summary, you send a push notification to the user using your tool, informing them that research is complete,
and including the report summary you receive"""

#### SCHEMA
##### This is How the Output will be Arranged

In [75]:
class WebSearchItem(BaseModel):
    reason: str = Field(description = " Your reasoning for why this search is import to the query")
    query: str = Field(description = "The search term to use for the web search.")

class WebSearchPlan(BaseModel):
    searches: List[WebSearchItem] = Field(description = "A list of web searches performed.")

class ReportData(BaseModel):
    short_summary: str = Field(description = "A short 2-3 sentence summary of the findings.")
    markdown_report: str = Field(description = "The final report.")
    follow_up_questions: str = Field(description = "Suggested topics to research further.")

#### TOOLS

In [76]:
@tool
def web_search_tool(query: str) -> str:
    """Search the web for the given term. Use this for research."""
    try:
        search = TavilySearchResults(max_results = 3)
        return str(search.invoke(query))
    except Exception as e:
        return f"Error performing search :{e}"

@tool
def push_notification_tool(message: str):
    """Send a push notification with this brief message"""
    if not user_key or not api_token:
        return "Error: PUSHOVER USER OR PUSHOVER TOKEN not found in the Environment"
    payload = {"user": user_key, "token": api_token, "message": message }
    pushover_url = "https://api.pushover.net/1/messages.json"

    try:
        response = requests.post(pushover_url, data= payload)
        if response.status_code == 200:
            return "Success"
        else:
            return f"Failed to send notification: {response.text}"
    except Exception as e:
        return f"Error sending notification: {e}"


In [77]:
web_search_tool

StructuredTool(name='web_search_tool', description='Search the web for the given term. Use this for research.', args_schema=<class 'langchain_core.utils.pydantic.web_search_tool'>, func=<function web_search_tool at 0x00000222A8FCCE00>)

In [78]:
push_notification_tool

StructuredTool(name='push_notification_tool', description='Send a push notification with this brief message', args_schema=<class 'langchain_core.utils.pydantic.push_notification_tool'>, func=<function push_notification_tool at 0x00000222A92B56C0>)

#### Create a StateGraph

In [79]:
#STATE DEFINITION
class ResearchState(TypedDict, total=False):
    messages: Annotated[List[BaseMessage], add_messages]
    query: str
    search_plan: List[WebSearchItem]
    search_results: List[str]
    report: ReportData

### LL Models

In [80]:
model_mini = ChatGroq(model= "llama-3.1-8b-instant")
model_large = ChatGroq(model= "llama-3.3-70b-versatile")

#### Create the Nodes

In [81]:
async def planner_node(state: ResearchState):
    """PlannerAgent: Logic to generate the search plan."""
    print("Planning the searches...ðŸ¤”")
    planner = model_mini.with_structured_output(WebSearchPlan)
    response = await planner.ainvoke([
        SystemMessage(content=PLANNER_INSTRUCTIONS),
        HumanMessage(content=f"Query: {state['query']}")
    ])
    print(f"Will search {len(response.searches)} searches ðŸ”Ž")
    return {
        "search_plan": response.searches,
        "messages": [AIMessage(content=f"Planned {len(response.searches)} searches.")]
    }
    

async def search_node(state: ResearchState):
    """SearchAgent: Executes processes with autonomous tool loops."""
    search_agent = model_mini.bind_tools([web_search_tool])
    
    print(f"Executing {len(state['search_plan'])} parallel searches...ðŸ”Ž")
    async def perform_single_search(item: WebSearchItem):
        initial_msg = [
            SystemMessage(content=SEARCH_INSTRUCTIONS),
            HumanMessage(content=f"Search term: {item.query}\nReason: {item.reason}")
        ]
        res1 = await search_agent.ainvoke(initial_msg)
        messages = list(initial_msg) + [res1]

        if res1.tool_calls:
            for tc in res1.tool_calls:
                if tc['name'] == 'web_search_tool':
                    out = web_search_tool.invoke(tc['args'])
                    messages.append(ToolMessage(content=str(out), tool_call_id=tc['id']))
            
            res2 = await search_agent.ainvoke(messages)
            return f"Summary for {item.query}: {res2.content}"
        
        return f"Summary for {item.query}: {res1.content}"

    results = await asyncio.gather(*[perform_single_search(i) for i in state["search_plan"]])
    
    print("Web research completed.")
    return {
        "search_results": results,
        "messages": [AIMessage(content="Web research completed.")]
    }


async def writer_node(state: ResearchState):
    """WriterAgent: Synthesize the final report."""
    print("Thinkning about the report...ðŸ¤”")

    writer = model_large.with_structured_output(ReportData)
    prompt = f"Original Query: {state['query']}\n\nResearch Results:\n" + "\n".join(state["search_results"])

    response = await writer.ainvoke([
        SystemMessage(content=WRITER_INSTRUCTIONS),
        HumanMessage(content=prompt)
    ])
    print("Finished writing report")
    return{
        "report": response,
        "messages": [AIMessage(content="Final Report Generated.")]
    }

async def push_node(state: ResearchState):
    """PushAgent: Autonomous Push notification"""
    print("Pushing Notification...ðŸ””")
    pusher = model_mini.bind_tools([push_notification_tool])
    summary = state["report"].short_summary

    messages = [
        SystemMessage(content=PUSH_INSTRUCTIONS),
        HumanMessage(content=summary)
    ]

    res1 = await pusher.ainvoke(messages)
    messages.append(res1)

    if res1.tool_calls:
        for tc in res1.tool_calls:
            if tc['name'] == 'push_notification_tool':
                out = push_notification_tool,invoke(tc['arg'])
                messages.append(ToolMessage(content=str(out), tool_calls_id=tc['id']))
    
        res2 = await pusher.ainvoke(messages)
        print("Push sent and work perfected")
        return{"messages": [AIMessage(content="Notification Pushed and Comfirmed.")]}

    return {"message": [AIMessage(content="Notification step completed (no call).")]}



#### GRAPH ASSEMBLY 

In [82]:
builder = StateGraph(ResearchState)

builder.add_node("planner", planner_node)
builder.add_node("researcher", search_node)
builder.add_node("writer", writer_node)
builder.add_node("notifier", push_node)

builder.add_edge(START, "planner")
builder.add_edge("planner", "researcher")
builder.add_edge("researcher", "writer")
builder.add_edge("writer", "notifier")
builder.add_edge("notifier", END)


memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

#### RUNTIME EXECUTION

In [83]:
async def run_workflow(user_query: str):
    inputs = {"query": user_query, "messages":[HumanMessage(content=user_query)]}
    config = {"configurable": {"thread_id": "user_1"}}

    async for event in graph.astream(inputs, config=config, stream_mode="values"):
        pass
    final_state = await graph.aget_state(config)
    report = final_state.values["report"]

    display(Markdown("# Final Research Report"))
    display(Markdown(report.markdown_report))
    print("\nFollow-up Questions:")
    for q in report.follow_up_questions:
        print(f" - {q}")


#### Launch

In [None]:
await run_workflow("What are the most trending topics in Artificial Intelligence and Machine Learning in 2026")

Planning the searches...ðŸ¤”
Will search 6 searches ðŸ”Ž
Executing 6 parallel searches...ðŸ”Ž
