## Let's build a sample graph first

#### Define Sample State

In [5]:
from typing import TypedDict, Optional
from pydantic import BaseModel

class AgentState(BaseModel):
    # State object passed between the nodes of the sample graph.

    # Counter incremented by each demo node that runs.
    step: int = 0
    # Human-readable trace entries appended by the demo nodes.
    # NOTE(review): mutable default; pydantic is expected to copy it per instance -- confirm.
    log: list[str] = []

#### Create demo nodes

In [18]:
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import Literal
import random

def llm_call(state: AgentState):
    """Stand-in for an LLM step in the sample graph.

    Advances ``state.step`` by one, records a trace entry in ``state.log``
    and hands the same (mutated) state object back.
    """
    state.step = state.step + 1
    trace = state.log
    trace.append("LLM call executed")
    return state

def should_continue(state: AgentState) -> Literal['tool_node', 'compress_research']:
    """Randomly pick the node that follows ``llm_call`` in the sample graph.

    This function is registered through ``add_conditional_edges`` together
    with a path map whose keys are the plain strings ``"tool_node"`` and
    ``"compress_research"``. It therefore returns one of those keys directly
    rather than a ``Command`` object, which is not a key of that map.

    Args:
        state: Current graph state; a trace entry is appended to ``state.log``.

    Returns:
        ``"tool_node"`` when the random draw is greater than 5, otherwise
        ``"compress_research"``.
    """
    decision = random.randint(0, 10)
    # NOTE(review): this mutates the state object handed to the router; whether
    # LangGraph persists changes made outside a node is not visible here.
    state.log.append(f"Think node decided with random={decision}")
    return "tool_node" if decision > 5 else "compress_research"

def tool_node(state: AgentState):
    """Stand-in for a tool execution step in the sample graph.

    Advances ``state.step`` by one, records a trace entry in ``state.log``
    and hands the same (mutated) state object back.
    """
    state.step = state.step + 1
    trace = state.log
    trace.append("Tool executed")
    return state

def compress_research(state: AgentState):
    """Stand-in for the research compression step in the sample graph.

    Advances ``state.step`` by one, records a trace entry in ``state.log``
    and hands the same (mutated) state object back.
    """
    state.step = state.step + 1
    trace = state.log
    trace.append("Research compressed")
    return state


# Build the sample graph on top of AgentState.
graph_builder = StateGraph(AgentState)

# Register the three demo nodes under the names used by the edges below.
graph_builder.add_node('llm_call', llm_call)
graph_builder.add_node('tool_node', tool_node)
graph_builder.add_node('compress_research', compress_research)

# Entry point: the graph always starts with the LLM step.
graph_builder.add_edge(START, "llm_call")
graph_builder.add_edge("tool_node", "llm_call")  # loop back to the LLM after a tool run

# After llm_call, should_continue chooses the next node; the mapping below
# lists the two possible destinations.
graph_builder.add_conditional_edges(
    'llm_call',
    should_continue,
    {
        "tool_node" : "tool_node",
        "compress_research" : "compress_research"
    }
)
# Compression is the last step before the graph ends.
graph_builder.add_edge("compress_research", END)

graph = graph_builder.compile()


In [19]:
from IPython.display import Image, display

# Render the compiled graph as Mermaid diagram source and print it.
data = graph.get_graph().draw_mermaid()
print(data)

---
config:
  flowchart:
    curve: linear
---
graph TD;
	__start__([<p>__start__</p>]):::first
	llm_call(llm_call)
	tool_node(tool_node)
	compress_research(compress_research)
	__end__([<p>__end__</p>]):::last
	__start__ --> llm_call;
	llm_call -.-> compress_research;
	llm_call -.-> tool_node;
	tool_node --> llm_call;
	compress_research --> __end__;
	classDef default fill:#f2f0ff,line-height:1.2
	classDef first fill-opacity:0
	classDef last fill:#bfb6fc



#### Now that we have a proper graph flow, let's design the original ResearchState and the original graph

### Define the States for the research

In [1]:
%%writefile ../src/deep_research/research_scope.py

"""
This module includes the Research State and other states required by the Research Agent
"""

from typing_extensions import TypedDict, Sequence, Annotated, List
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field
import operator

class ResearchState(TypedDict):
    # State schema for the research agent graph.

    # Flat message history. `add_messages` is attached as the reducer, so the
    # element type is a single message, not a list of messages.
    researcher_messages : Annotated[Sequence[BaseMessage], add_messages]
    # Declared counter for tool-call rounds.
    tool_call_iterations : int
    # Topic being researched.
    research_topic : str
    # Final compressed write-up of the research.
    compressed_research : str
    # Raw notes; `operator.add` is attached as the reducer, i.e. list concatenation.
    raw_notes : Annotated[List[str], operator.add]

class ResearchOutput(TypedDict):
    # Output schema of the research agent graph. Comments are used instead of a
    # docstring so that any schema derived from this class keeps its current text.

    # Final compressed write-up of the research.
    compressed_research : str
    # Raw notes; `operator.add` is attached as the reducer, i.e. list concatenation.
    raw_notes: Annotated[List[str], operator.add]
    # Flat message history. `add_messages` is attached as the reducer, so the
    # element type is a single message, not a list of messages.
    researcher_messages : Annotated[Sequence[BaseMessage], add_messages]

# `Optional` is used by LLMOuput but is not among the names imported at the top
# of this module, so it is brought into scope here.
from typing import Optional


class LLMOuput(BaseModel):
    # Structured result expected from the research LLM. Comments are used instead
    # of a docstring so that the schema derived from this class keeps its current text.

    # Names of the tools the model wants to call.
    tool_calls : List[str] = Field(description="Which tools to call from the available tools")
    # Free-text message from the model. The explicit None default lets the model
    # omit the field entirely instead of having to send an explicit null.
    research_message : Optional[str] = Field(default=None, description="Research message")
    

class Summary(BaseModel):
    """Schema for webpage content summarization."""
    # NOTE(review): the docstring and Field descriptions are presumably surfaced to
    # the model when this class is used for structured output -- treat them as
    # runtime text and change them deliberately.
    summary: str = Field(description="Concise summary of the webpage content")
    key_excerpts: str = Field(description="Important quotes and excerpts from the content")

Writing ../src/deep_research/research_scope.py


#### Now let's build the Tavily tool

In [2]:
%%writefile ../src/deep_research/tavily.py

from datetime import datetime
from os import getenv

from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool, InjectedToolArg
from tavily import TavilyClient
from typing_extensions import Annotated, List, Literal

from deep_research.openrouter import init_chat_model
from deep_research.prompts import summarize_webpage_prompt
from deep_research.research_scope import Summary
# Load environment variables (presumably from a local .env file) before the
# API keys are read with getenv further down.
load_dotenv()

def get_today_str():
    """Return today's local date as a ``YYYY-MM-DD`` string.

    Only the ``%Y``, ``%m`` and ``%d`` directives are used, so the same format
    string works on every operating system. The previous format string
    contained stray spaces and produced values such as ``2025 -01 -31``.
    """
    return datetime.now().strftime("%Y-%m-%d")

# Chat model intended for webpage summarization; the API key is read from the
# OPENROUTER_API_KEY environment variable.
summary_model = init_chat_model(model='openai/gpt-oss-120b:free', temperature=0.3, api_key=getenv('OPENROUTER_API_KEY'))

# Tavily client shared by the search helpers below; the API key is read from
# the TAVILY_API_KEY environment variable.
tavily_client = TavilyClient(api_key=getenv('TAVILY_API_KEY'))

def tavily_search_multiple(
    search_queries : List[str],
    max_results: int = 3,
    topic : Literal['general', 'finance', 'news', ] = 'general',
    include_raw_content: bool =  True
):
    """Run one Tavily search per query and collect the responses in order.

    Args:
        search_queries: Queries to send, one search call each
        max_results: Upper bound on results requested per query
        topic: Topic filter forwarded to the search call
        include_raw_content: Forwarded flag asking for raw webpage content

    Returns:
        List with one search response per query, in input order
    """
    return [
        tavily_client.search(
            query=single_query,
            max_results=max_results,
            topic=topic,
            include_raw_content=include_raw_content,
        )
        for single_query in search_queries
    ]

def summarize_webpage_content(webpage_content: str) -> str:
    """Summarize webpage content using the configured summarization model.

    The module-level model is named ``summary_model``. The previous body
    referenced ``summarization_model``, which does not exist; the resulting
    NameError was swallowed by the broad ``except`` below, so every call
    silently returned the truncated fallback.

    Args:
        webpage_content: Raw webpage content to summarize

    Returns:
        ``<summary>`` / ``<key_excerpts>`` formatted text on success. On any
        failure, the original content, cut to 1000 characters plus ``"..."``
        when it is longer than that.
    """
    try:
        # Ask the model for output shaped like the Summary schema.
        structured_model = summary_model.with_structured_output(Summary)

        summary = structured_model.invoke([
            HumanMessage(content=summarize_webpage_prompt.format(
                webpage_content=webpage_content,
                date=get_today_str()
            ))
        ])

        # Format summary with clear structure
        return (
            f"<summary>\n{summary.summary}\n</summary>\n\n"
            f"<key_excerpts>\n{summary.key_excerpts}\n</key_excerpts>"
        )

    except Exception as e:
        # Deliberate best-effort: a failed summary must not abort the search.
        # The catch is broad, so the printed message is the only sign of a
        # programming error in the block above.
        print(f"Failed to summarize webpage: {str(e)}")
        if len(webpage_content) > 1000:
            return webpage_content[:1000] + "..."
        return webpage_content

def deduplicate_search_results(search_results: List[dict]) -> dict:
    """Collapse search responses into one entry per URL, keeping the first hit.

    Args:
        search_results: Search responses, each with a ``'results'`` list whose
            items carry a ``'url'`` key

    Returns:
        Dictionary mapping each URL to the first result seen for it
    """
    first_hit_by_url = {}
    for response in search_results:
        for hit in response['results']:
            # setdefault leaves an existing entry alone, so the first hit wins.
            first_hit_by_url.setdefault(hit['url'], hit)
    return first_hit_by_url

def process_search_results(unique_results: dict) -> dict:
    """Reduce each unique search result to a title plus usable content.

    Args:
        unique_results: Dictionary mapping URLs to search results

    Returns:
        Dictionary mapping each URL to ``{'title', 'content'}``, where content
        is a summary of the raw page when raw content is present and the
        result's own ``'content'`` otherwise
    """
    processed = {}

    for url, hit in unique_results.items():
        raw_text = hit.get("raw_content")
        if raw_text:
            # Raw page text is available: condense it.
            body = summarize_webpage_content(raw_text)
        else:
            # Missing or empty raw content: use the content already on the result.
            body = hit['content']

        processed[url] = {'title': hit['title'], 'content': body}

    return processed

def format_search_output(summarized_results: dict) -> str:
    """Render processed search results as one text report.

    Args:
        summarized_results: Dictionary mapping URLs to ``{'title', 'content'}``

    Returns:
        A fixed "no results" message when the input is empty; otherwise a
        header followed by one numbered section per source, each closed by an
        80-dash divider line
    """
    if not summarized_results:
        return "No valid search results found. Please try different search queries or use a different search API."

    divider = "-" * 80 + "\n"
    pieces = ["Search results: \n\n"]

    for index, (url, entry) in enumerate(summarized_results.items(), start=1):
        pieces.append(f"\n\n--- SOURCE {index}: {entry['title']} ---\n")
        pieces.append(f"URL: {url}\n\n")
        pieces.append(f"SUMMARY:\n{entry['content']}\n\n")
        pieces.append(divider)

    return "".join(pieces)

# ===== RESEARCH TOOLS =====

# NOTE(review): `parse_docstring=True` is passed to `@tool`, so the docstring
# below is presumably parsed at runtime to describe the tool and its arguments.
# Treat it as runtime text and keep its Args section in sync with the signature.
@tool(parse_docstring=True)
def tavily_search(
    query: str,
    max_results: Annotated[int, InjectedToolArg] = 3,
    topic: Annotated[Literal["general", "news", "finance"], InjectedToolArg] = "general",
) -> str:
    """Fetch results from Tavily search API with content summarization.

    Args:
        query: A single search query to execute
        max_results: Maximum number of results to return
        topic: Topic to filter results by ('general', 'news', 'finance')

    Returns:
        Formatted string of search results with summaries
    """
    # Step 1: run the search; the single query is wrapped in a list because
    # tavily_search_multiple takes a list of queries.
    search_results = tavily_search_multiple(
        [query],  # Convert single query to list for the internal function
        max_results=max_results,
        topic=topic,
        include_raw_content=True,
    )

    # Step 2: keep one result per URL to avoid processing duplicate content.
    unique_results = deduplicate_search_results(search_results)

    # Step 3: reduce every result to a title plus summarized content.
    summarized_results = process_search_results(unique_results)

    # Step 4: render the results as a single string for the caller.
    return format_search_output(summarized_results)


Writing ../src/deep_research/tavily.py


#### Once we have the states defined, we now proceed to building the research agent

In [6]:
from deep_research.openrouter import init_chat_model
from langchain_core.messages import SystemMessage
from deep_research.prompts import research_agent_prompt, compress_research_human_message, compress_research_system_prompt
from langchain_core.messages import AIMessage, ToolMessage, SystemMessage, HumanMessage
from deep_research.research_state import ResearchState, ResearchOutput, LLMOuput, Summary
from deep_research.tavily import tavily_search
from typing_extensions import Literal
from langgraph.graph import StateGraph, START, END

# `getenv` is called below but is not imported in this cell.
from os import getenv

# Model used by llm_call; the API key is read from OPENROUTER_API_KEY.
model = init_chat_model(model='openai/gpt-oss-120b:free', temperature=0.3, api_key=getenv('OPENROUTER_API_KEY'))
# Model used by compress_research, configured to emit ResearchOutput-shaped output.
# NOTE(review): ResearchOutput also contains message-history fields; confirm that
# this schema is really what the compression model should be asked to produce.
compress_model = init_chat_model(model='x-ai/grok-4-fast:free', temperature=0.3, api_key=getenv('OPENROUTER_API_KEY')).with_structured_output(ResearchOutput)
# The search tool imported in this cell is `tavily_search`; the name `tavily`
# used previously is never defined here.
tools = [tavily_search]
# Lookup table used by tool_node to resolve a tool call's name to the tool object.
tools_by_name = {tool.name : tool for tool in tools}

def get_today_str():
    """Return today's local date as a ``YYYY-MM-DD`` string.

    Only the ``%Y``, ``%m`` and ``%d`` directives are used, so the same format
    string works on every operating system. The previous format string
    contained stray spaces and produced values such as ``2025 -01 -31``.
    """
    # Imported locally because `datetime` is not imported in this cell.
    from datetime import datetime

    return datetime.now().strftime("%Y-%m-%d")

def llm_call(state : ResearchState):
    """
    Analyze current state and decide on next actions.

    The model analyzes the current conversation state and decides whether to:
    1. Call search tools to gather more information
    2. Provide a final answer based on gathered information

    Args:
        state: Current research state; ``state['researcher_messages']`` is
            sent to the model after the system prompt.

    Returns:
        A state update ``{"researcher_messages": [ai_message]}`` holding the
        model's response.
    """
    structured_model = model.with_structured_output(LLMOuput)

    # The system prompt has to be its own list element, followed by the
    # history. The previous expression added the SystemMessage to the history
    # *inside* a one-element list, so the model never received a flat list.
    prompt_messages = [
        SystemMessage(content=research_agent_prompt.format(date=get_today_str()))
    ] + list(state['researcher_messages'])
    result = structured_model.invoke(prompt_messages)

    # NOTE(review): LLMOuput.tool_calls is a list of tool *names* (strings), while
    # tool_node reads `.tool_calls` entries as mappings with 'name', 'args' and
    # 'id'. Storing the names under additional_kwargs does not bridge that gap;
    # confirm the intended hand-off format.
    ai_message = AIMessage(
        content=result.research_message or "",
        additional_kwargs={
            # `result` is the structured output; `llm_result` was never defined.
            "tool_calls": result.tool_calls
        }
    )
    return {
        "researcher_messages" : [ai_message]
    }

def tool_node(state : ResearchState):
    """Execute the tool calls requested by the most recent message.

    All requested tools are invoked first, in order; only then is each
    observation wrapped in a ToolMessage carrying the call's name and id.

    Returns:
        A state update ``{'researcher_messages': [...ToolMessage]}``.
    """
    requested_calls = state['researcher_messages'][-1].tool_calls

    # Phase 1: run every requested tool with its arguments.
    results = [
        tools_by_name[call['name']].invoke(call['args'])
        for call in requested_calls
    ]

    # Phase 2: pair each observation with the call that produced it.
    tool_outputs = []
    for result, call in zip(results, requested_calls):
        tool_outputs.append(
            ToolMessage(
                content=result,
                name=call['name'],
                tool_call_id=call['id']
            )
        )

    return {'researcher_messages' : tool_outputs}

def should_continue(state : ResearchState) -> Literal['tool_node', 'compress_research']:
    """Route after ``llm_call``: run tools if any were requested, else compress.

    The returned value must be one of the keys of the path map passed to
    ``add_conditional_edges`` (``"tool_node"`` / ``"compress_research"``).
    The previous body returned ``'tool_calls'``, which is not such a key, and
    read the misspelled state key ``'researcher_messsages'``.

    Args:
        state: Current research state; only the last entry of
            ``state['researcher_messages']`` is inspected.

    Returns:
        ``'tool_node'`` when the last message carries tool calls, otherwise
        ``'compress_research'``.
    """
    last_message = state['researcher_messages'][-1]
    if last_message.tool_calls:
        return 'tool_node'
    return 'compress_research'

def compress_research(state : ResearchState) -> dict:
    """Compress the accumulated research conversation into a final write-up.

    Sends the system prompt, the full message history and the closing human
    instruction to ``compress_model``, and collects the content of all tool
    and AI messages as raw notes.

    Args:
        state: Current research state; ``researcher_messages`` is read and
            treated as empty when the key is missing.

    Returns:
        A state update with ``compressed_research`` (string) and ``raw_notes``
        (a one-element list holding the newline-joined notes).
    """
    # Imported locally because `filter_messages` is not imported in this cell.
    from langchain_core.messages import filter_messages

    # Read the history once so the prompt and the raw notes use the same list.
    history = list(state.get("researcher_messages", []))

    system_message = compress_research_system_prompt.format(date=get_today_str())
    messages = (
        [SystemMessage(content=system_message)]
        + history
        + [HumanMessage(content=compress_research_human_message)]
    )
    response = compress_model.invoke(messages)

    # compress_model is built with .with_structured_output(ResearchOutput), and
    # ResearchOutput is a TypedDict. NOTE(review): such a model presumably returns
    # a plain dict rather than a message object, so both shapes are accepted here.
    if isinstance(response, dict):
        compressed = response.get("compressed_research", "")
    else:
        compressed = response.content

    raw_notes = [
        str(m.content) for m in filter_messages(
            history,
            include_types=["tool", "ai"]
        )
    ]

    return {
        "compressed_research": str(compressed),
        "raw_notes": ["\n".join(raw_notes)]
    }

# Build the research graph: ResearchState is the working state, ResearchOutput
# the schema passed as the graph's output.
graph_builder = StateGraph(ResearchState, output_schema=ResearchOutput)

# Register the three nodes under the names used by the edges below.
graph_builder.add_node('llm_call', llm_call)
graph_builder.add_node('tool_node', tool_node)
graph_builder.add_node('compress_research', compress_research)

# Entry point: the graph always starts with the LLM step.
graph_builder.add_edge(START, "llm_call")
graph_builder.add_edge("tool_node", "llm_call")  # loop back to the LLM after a tool run

# After llm_call, should_continue chooses the next node.
# NOTE(review): the router's return value presumably has to be one of the keys
# of the mapping below -- confirm against should_continue.
graph_builder.add_conditional_edges(
    'llm_call',
    should_continue,
    {
        "tool_node" : "tool_node",
        "compress_research" : "compress_research"
    }
)
# Compression is the last step before the graph ends.
graph_builder.add_edge("compress_research", END)

graph = graph_builder.compile()
## Work in progress: we stopped before finishing the tool node and will continue from Monday, when we build the full agent

ImportError: cannot import name 'compress_research_system_prompt' from 'deep_research.prompts' (C:\FOG\deep_research_opensource\src\deep_research\prompts.py)