# Research Analyzer - Multi-Analyst News Research System

## Phase 1: Setup & Environment

In [None]:
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langchain_community langchain_core tavily-python python-dotenv pydantic

In [None]:
import os, getpass
def _set_env(var: str):
    """Make sure the environment variable ``var`` has a value.

    When ``os.environ`` already holds a non-empty value for ``var`` nothing
    happens. Otherwise the value is requested with ``getpass`` (prompt text
    ``"<var>: "``) and stored in ``os.environ``.
    """
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")
# Ask for the OpenAI API key unless the environment already provides one.
_set_env("OPENAI_API_KEY")

In [None]:
# Ask for the Tavily API key unless the environment already provides one.
_set_env("TAVILY_API_KEY")

In [None]:
from langchain_openai import ChatOpenAI
# Single chat-model instance shared by every LLM-calling function in this
# notebook; model name and temperature=0 are fixed here.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

## Phase 2: Data Models

In [None]:
from typing import List
from pydantic import BaseModel, Field

class NewsAnalyst(BaseModel):
    # One analyst persona. The four text fields carry descriptions via Field();
    # `persona` renders them as a prompt-ready block.
    # (Comments are used instead of a class docstring so the model's schema
    # description stays exactly as it was.)
    affiliation: str = Field(
        description="Primary affiliation of the analyst.",
    )
    name: str = Field(
        description="Name of the analyst.",
    )
    role: str = Field(
        description="Role of the analyst.",
    )
    description: str = Field(
        description="Description of the analyst's focus and expertise.",
    )

    @property
    def persona(self) -> str:
        # Four labelled lines (name, role, affiliation, description), each
        # terminated by a newline.
        return (
            f"Name: {self.name}\n"
            f"Role: {self.role}\n"
            f"Affiliation: {self.affiliation}\n"
            f"Description: {self.description}\n"
        )

class AnalystTeam(BaseModel):
    # Container model for the generated analysts; create_analysts passes this
    # class to llm.with_structured_output() and returns its `analysts` list.
    analysts: List[NewsAnalyst] = Field(description="Team of news analysts.")

class SearchQuery(BaseModel):
    # Structured-output schema for the query-generation call in search_news.
    # NOTE(review): the field is annotated `str` but its default is None, so an
    # instance may end up with search_query=None — confirm consumers cope.
    search_query: str = Field(None, description="Search query for news retrieval.")

## Phase 3: State Definitions

In [None]:
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import MessagesState

class GenerateAnalystsState(TypedDict):
    # State shape used to annotate the analyst-generation functions
    # (create_analysts, human_feedback, should_continue).
    topic: str  # formatted into ANALYST_INSTRUCTIONS by create_analysts
    max_analysts: int  # formatted into ANALYST_INSTRUCTIONS by create_analysts
    human_analyst_feedback: str  # read with .get(); may be absent
    analysts: List[NewsAnalyst]  # written by create_analysts

class AnalysisState(MessagesState):
    # State of the per-analyst analysis subgraph. Extends MessagesState; the
    # node functions read the conversation from the `messages` key.
    max_num_turns: int  # read by route_messages, which falls back to 2 if absent
    context: Annotated[list, operator.add]  # search results from search_news; operator.add is attached as the reducer annotation
    analyst: NewsAnalyst  # persona whose questions drive this run
    analysis: str  # conversation transcript written by save_analysis
    sections: Annotated[list, operator.add]  # section text written by write_section; operator.add reducer annotation

class ResearchGraphState(TypedDict):
    # State of the top-level research graph (analyst creation, per-analyst
    # analyses, report writing).
    topic: str  # research topic supplied when the graph is started
    max_analysts: int  # requested team size
    human_analyst_feedback: str  # read with .get() by initiate_all_analyses; may be absent
    analysts: List[NewsAnalyst]  # written by create_analysts
    sections: Annotated[list, operator.add]  # analyst sections; operator.add is attached as the reducer annotation
    introduction: str  # written by write_introduction
    content: str  # report body written by write_report
    conclusion: str  # written by write_conclusion
    final_report: str  # assembled by finalize_report

## Phase 4: Prompt Templates

In [None]:
# System prompt for create_analysts.
# Placeholders: {topic}, {human_analyst_feedback}, {max_analysts}.
ANALYST_INSTRUCTIONS = """You are tasked with creating a team of specialized news analysts.
1. Review the topic: {topic}
2. Examine any editorial feedback: {human_analyst_feedback}
3. Determine the most important perspectives for comprehensive news analysis.
4. Pick the top {max_analysts} perspectives.
5. Assign one analyst to each perspective with relevant expertise."""

# System prompt for generate_question. Placeholders: {topic}, {goals}.
# route_messages looks for the substring "Analysis complete" to end the loop.
QUESTION_INSTRUCTIONS = """You are a news analyst conducting research on {topic}.
Your goal is to gather specific, actionable insights. Your analytical focus: {goals}
Begin by introducing yourself, then pose your analytical questions.
When satisfied, conclude with: \"Analysis complete!\""""

# System prompt for search_news; used as-is (no placeholders).
SEARCH_INSTRUCTIONS = """Generate a search query for recent news and information.
Focus on the latest developments relevant to the conversation."""

# System prompt for generate_answer. Placeholders: {goals}, {context}.
ANSWER_INSTRUCTIONS = """You are a news information expert.
Analyst focus: {goals}
Answer using this context: {context}
Guidelines: Use only provided context, include specific data, cite sources [1], [2], etc."""

# System prompt for write_section. Placeholder: {focus}.
SECTION_WRITER_INSTRUCTIONS = """You are a news report writer.
Create a concise section based on analyst research.
Structure: ## {focus} (title), ### Key Findings, ### Analysis, ### Sources
Maximum 300 words. Use numbered sources."""

# System prompt for write_report. Placeholders: {topic}, {context}.
# finalize_report relies on the "## News Analysis" and "## Sources" headings
# requested here.
REPORT_WRITER_INSTRUCTIONS = """You are creating a comprehensive news report on: {topic}
Task: Review all analyst sections, identify key insights, synthesize into cohesive narrative.
Format: Use markdown, start with ## News Analysis, preserve citations, create ## Sources section.
Analyst sections: {context}"""

# Prompt for write_introduction. Placeholders: {topic}, {formatted_str_sections}.
INTRODUCTION_INSTRUCTIONS = """Write a compelling introduction for an analysis report on {topic}.
Target 100 words. Use markdown. Create # title, then ## Introduction section.
Report sections: {formatted_str_sections}"""

# Prompt for write_conclusion. Placeholders: {topic}, {formatted_str_sections}.
CONCLUSION_INSTRUCTIONS = """Write a conclusion for an analysis report on {topic}.
Target 100 words. Use markdown. Use ## Conclusion header.
Report sections: {formatted_str_sections}"""

## Phase 5: Analyst Generation

In [None]:
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

def create_analysts(state: GenerateAnalystsState):
    """Ask the LLM for a team of analysts covering the topic in ``state``.

    Reads ``topic`` and ``max_analysts`` (required) plus
    ``human_analyst_feedback`` (optional, ``''`` when missing), fills
    ANALYST_INSTRUCTIONS with them and requests an ``AnalystTeam`` through
    structured output.

    Returns:
        dict: ``{"analysts": ...}`` holding the ``analysts`` list of the
        returned ``AnalystTeam``.
    """
    prompt = ANALYST_INSTRUCTIONS.format(
        topic=state['topic'],
        human_analyst_feedback=state.get('human_analyst_feedback', ''),
        max_analysts=state['max_analysts'],
    )
    team_llm = llm.with_structured_output(AnalystTeam)
    conversation = [
        SystemMessage(content=prompt),
        HumanMessage(content="Generate the analyst team."),
    ]
    team = team_llm.invoke(conversation)
    return {"analysts": team.analysts}

def human_feedback(state: GenerateAnalystsState):
    """No-op node: ignores ``state`` and returns ``None`` (no state update).

    It only provides a named step in the graph between analyst creation and
    the routing decision that follows.
    """
    return None

def should_continue(state: GenerateAnalystsState):
    """Route back to analyst creation while feedback is pending.

    Returns:
        ``"create_analysts"`` when ``human_analyst_feedback`` is present and
        truthy, otherwise ``END``.
    """
    feedback = state.get('human_analyst_feedback', None)
    return "create_analysts" if feedback else END

## Phase 6: Analysis Workflow

Create the search, question, and answer nodes for analyst research.

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import get_buffer_string

# Search tool instance invoked by search_news; constructed with max_results=3.
tavily_search = TavilySearchResults(max_results=3)

def generate_question(state: AnalysisState):
    """Let the analyst persona ask its next question.

    The topic is recovered from the first message of the conversation, which
    initiate_all_analyses builds as ``"Analyze {topic} from your perspective."``.
    When the first message does not start with ``"Analyze "`` (or there are no
    messages) the placeholder ``"the topic"`` is used instead.

    Args:
        state: Analysis state; reads ``analyst`` and ``messages``.

    Returns:
        dict: ``{"messages": [question]}`` with the LLM's reply.
    """
    analyst = state["analyst"]
    messages = state["messages"]

    # Trim only the exact leading/trailing wrapper text. Chained str.replace()
    # would also delete "Analyze " or " from your perspective." wherever they
    # appear inside the topic itself.
    prefix = "Analyze "
    suffix = " from your perspective."
    topic = "the topic"
    first_msg = messages[0].content if messages else ""
    if isinstance(first_msg, str) and first_msg.startswith(prefix):
        extracted = first_msg[len(prefix):]
        if extracted.endswith(suffix):
            extracted = extracted[:-len(suffix)]
        # An empty remainder falls back to the placeholder.
        topic = extracted or topic

    system_message = QUESTION_INSTRUCTIONS.format(topic=topic, goals=analyst.persona)
    question = llm.invoke([SystemMessage(content=system_message)] + messages)
    return {"messages": [question]}

# Status line printed when this cell finishes executing.
print("✓ generate_question defined!")

In [None]:
def search_news(state: AnalysisState):
    """Turn the conversation into a search query and fetch news context.

    The LLM produces a ``SearchQuery`` from the messages; the query is run
    through ``tavily_search`` and the hits are rendered as ``<Document>``
    blocks joined by ``---`` separators. The search is best-effort: a missing
    query, a failing search or unusable results yield an empty context string
    rather than an exception.

    Args:
        state: Analysis state; reads ``messages``.

    Returns:
        dict: ``{"context": [formatted_docs]}`` where ``formatted_docs`` is a
        single string (``""`` when nothing usable was found).
    """
    import logging  # local import: keeps this cell self-contained

    log = logging.getLogger(__name__)

    structured_llm = llm.with_structured_output(SearchQuery)
    search_query = structured_llm.invoke([SystemMessage(content=SEARCH_INSTRUCTIONS)] + state['messages'])

    # SearchQuery.search_query defaults to None, so skip the call when there is no query.
    query = getattr(search_query, "search_query", None)
    search_docs = []
    if query:
        try:
            search_docs = tavily_search.invoke(query)
        except Exception:
            # Deliberately best-effort, but leave a trace instead of failing silently.
            log.warning("News search failed for query %r", query, exc_info=True)
            search_docs = []

    # The tool may hand back something other than a list of result dicts
    # (for instance an error string); indexing that with ["url"] would raise.
    if not isinstance(search_docs, list):
        log.warning("Unexpected search result type: %s", type(search_docs).__name__)
        search_docs = []
    usable_docs = [
        doc for doc in search_docs
        if isinstance(doc, dict) and "url" in doc and "content" in doc
    ]

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document href="{doc["url"]}"/>\n{doc["content"]}\n</Document>' for doc in usable_docs
    )
    return {"context": [formatted_search_docs]}

# Status line printed when this cell finishes executing.
print("✓ search_news defined!")

In [None]:
def generate_answer(state: AnalysisState):
    """Answer the analyst's latest question from the gathered context.

    Fills ANSWER_INSTRUCTIONS with the analyst persona and the ``context``
    list (``[]`` when absent), sends it with the conversation to the LLM and
    tags the reply with ``name = "expert"``, the name route_messages counts.

    Returns:
        dict: ``{"messages": [answer]}``.
    """
    persona = state["analyst"].persona
    history = state["messages"]
    prompt = ANSWER_INSTRUCTIONS.format(goals=persona, context=state.get("context", []))
    reply = llm.invoke([SystemMessage(content=prompt)] + history)
    reply.name = "expert"
    return {"messages": [reply]}

def save_analysis(state: AnalysisState):
    """Flatten the conversation into one string via ``get_buffer_string``.

    Returns:
        dict: ``{"analysis": <transcript string>}``.
    """
    return {"analysis": get_buffer_string(state["messages"])}

def route_messages(state: AnalysisState, name: str = "expert"):
    """Decide whether the question/answer loop continues.

    Args:
        state: Analysis state; reads ``messages`` and ``max_num_turns``
            (2 when absent).
        name: Message name identifying the answers to count.

    Returns:
        ``'save_analysis'`` once the number of ``AIMessage`` entries named
        ``name`` reaches ``max_num_turns`` or the second-to-last message
        contains ``"Analysis complete"``; otherwise ``"ask_question"``.
    """
    messages = state["messages"]
    turn_limit = state.get('max_num_turns', 2)
    answered = sum(
        1 for msg in messages if isinstance(msg, AIMessage) and msg.name == name
    )
    # messages[-2] is only inspected when the turn limit has not been hit;
    # in the subgraph it is the question that preceded the latest answer.
    finished = answered >= turn_limit or "Analysis complete" in messages[-2].content
    return 'save_analysis' if finished else "ask_question"

# Status line printed when this cell finishes executing.
print("✓ generate_answer, save_analysis, route_messages defined!")

In [None]:
def write_section(state: AnalysisState):
    """Write one report section from the analyst's collected research.

    SECTION_WRITER_INSTRUCTIONS is filled with the analyst's ``description``
    and sent as the system message; the ``context`` list (``[]`` when absent)
    is embedded in the human message.

    Returns:
        dict: ``{"sections": [<section text>]}``.
    """
    context = state.get("context", [])
    focus = state["analyst"].description
    conversation = [
        SystemMessage(content=SECTION_WRITER_INSTRUCTIONS.format(focus=focus)),
        HumanMessage(content=f"Use this research: {context}"),
    ]
    section = llm.invoke(conversation)
    return {"sections": [section.content]}

# Status line printed when this cell finishes executing.
print("✓ write_section defined!")

In [None]:
# Build analysis subgraph
analysis_builder = StateGraph(AnalysisState)

# Nodes: one question/search/answer cycle plus the two wrap-up steps.
analysis_builder.add_node("ask_question", generate_question)
analysis_builder.add_node("search_news", search_news)
analysis_builder.add_node("answer_question", generate_answer)
analysis_builder.add_node("save_analysis", save_analysis)
analysis_builder.add_node("write_section", write_section)

# Edges: ask -> search -> answer, then route_messages either loops back to
# ask_question or moves on to save_analysis -> write_section -> END.
analysis_builder.add_edge(START, "ask_question")
analysis_builder.add_edge("ask_question", "search_news")
analysis_builder.add_edge("search_news", "answer_question")
analysis_builder.add_conditional_edges("answer_question", route_messages, ['ask_question', 'save_analysis'])
analysis_builder.add_edge("save_analysis", "write_section")
analysis_builder.add_edge("write_section", END)

print("✓ Analysis subgraph built!")
# Compiled here only to render the diagram.
display(Image(analysis_builder.compile().get_graph().draw_mermaid_png()))

## Phase 7: Report Writing

Create report writing nodes (introduction, body, conclusion).

In [None]:
from langgraph.types import Send

def initiate_all_analyses(state: ResearchGraphState):
    """Route after the feedback step: regenerate analysts or fan out.

    Returns:
        ``"create_analysts"`` when ``human_analyst_feedback`` is truthy;
        otherwise a list with one ``Send("conduct_analysis", ...)`` per
        analyst, each seeded with the analyst and a fresh
        ``"Analyze {topic} from your perspective."`` human message.
    """
    if state.get('human_analyst_feedback'):
        return "create_analysts"

    topic = state["topic"]
    dispatches = []
    for analyst in state["analysts"]:
        kickoff = HumanMessage(content=f"Analyze {topic} from your perspective.")
        dispatches.append(
            Send("conduct_analysis", {"analyst": analyst, "messages": [kickoff]})
        )
    return dispatches

# Status line printed when this cell finishes executing.
print("✓ initiate_all_analyses defined!")

In [None]:
def write_report(state: ResearchGraphState):
    """Synthesize the analyst sections into the report body.

    The sections are joined with blank lines, placed into
    REPORT_WRITER_INSTRUCTIONS together with the topic, and sent to the LLM.

    Returns:
        dict: ``{"content": <report body text>}``.
    """
    sections = state["sections"]
    topic = state["topic"]
    joined_sections = "\n\n".join(f"{section}" for section in sections)
    conversation = [
        SystemMessage(content=REPORT_WRITER_INSTRUCTIONS.format(topic=topic, context=joined_sections)),
        HumanMessage(content="Write the news analysis report."),
    ]
    report = llm.invoke(conversation)
    return {"content": report.content}

def write_introduction(state: ResearchGraphState):
    """Draft the report introduction from the collected analyst sections.

    Args:
        state: Research state; reads ``sections`` and ``topic``.

    Returns:
        dict: ``{"introduction": <introduction text>}``.
    """
    sections = state["sections"]
    topic = state["topic"]
    formatted_str_sections = "\n\n".join([f"{section}" for section in sections])
    instructions = INTRODUCTION_INSTRUCTIONS.format(topic=topic, formatted_str_sections=formatted_str_sections)
    # Send the instructions with the system role, matching write_report.
    # A bare string in the message list is not a system message.
    intro = llm.invoke([SystemMessage(content=instructions), HumanMessage(content="Write the report introduction")])
    return {"introduction": intro.content}

def write_conclusion(state: ResearchGraphState):
    """Draft the report conclusion from the collected analyst sections.

    Args:
        state: Research state; reads ``sections`` and ``topic``.

    Returns:
        dict: ``{"conclusion": <conclusion text>}``.
    """
    sections = state["sections"]
    topic = state["topic"]
    formatted_str_sections = "\n\n".join([f"{section}" for section in sections])
    instructions = CONCLUSION_INSTRUCTIONS.format(topic=topic, formatted_str_sections=formatted_str_sections)
    # Send the instructions with the system role, matching write_report.
    # A bare string in the message list is not a system message.
    conclusion = llm.invoke([SystemMessage(content=instructions), HumanMessage(content="Write the report conclusion")])
    return {"conclusion": conclusion.content}

# Status line printed when this cell finishes executing.
print("✓ write_report, write_introduction, write_conclusion defined!")

In [None]:
def finalize_report(state: ResearchGraphState):
    """Assemble introduction, body, conclusion and sources into one report.

    The body's leading ``## News Analysis`` heading is dropped. If the body
    contains a ``## Sources`` heading on its own line, the text after the last
    such heading is moved to the very end of the report, after the conclusion.

    Args:
        state: Research state; reads ``content``, ``introduction`` and
            ``conclusion``.

    Returns:
        dict: ``{"final_report": <assembled markdown>}``.
    """
    content = state["content"]

    heading = "## News Analysis"
    if content.startswith(heading):
        # Slice off exactly the heading. str.strip(heading) would treat the
        # argument as a set of characters and trim matching letters from both
        # ends of the body text.
        content = content[len(heading):]

    # Separate the sources at the last standalone "## Sources" heading. If the
    # separator is absent, the body stays whole and no sources are appended.
    body, separator, sources = content.rpartition("\n## Sources\n")
    if separator:
        content = body
    else:
        sources = None

    final_report = state["introduction"] + "\n\n---\n\n" + content + "\n\n---\n\n" + state["conclusion"]
    if sources is not None:
        final_report += "\n\n## Sources\n" + sources
    return {"final_report": final_report}

# Status line printed when this cell finishes executing.
print("✓ finalize_report defined!")

## Phase 8: Full Pipeline

Assemble the complete research graph and run a demo.

In [None]:
# Build the main research graph
builder = StateGraph(ResearchGraphState)

# Nodes: analyst creation, the (no-op) feedback step, the compiled analysis
# subgraph, and the report-writing steps.
builder.add_node("create_analysts", create_analysts)
builder.add_node("human_feedback", human_feedback)
builder.add_node("conduct_analysis", analysis_builder.compile())
builder.add_node("write_report", write_report)
builder.add_node("write_introduction", write_introduction)
builder.add_node("write_conclusion", write_conclusion)
builder.add_node("finalize_report", finalize_report)

# Edges: after human_feedback, initiate_all_analyses either returns to
# create_analysts or dispatches one conduct_analysis run per analyst.
builder.add_edge(START, "create_analysts")
builder.add_edge("create_analysts", "human_feedback")
builder.add_conditional_edges("human_feedback", initiate_all_analyses, ["create_analysts", "conduct_analysis"])
# The three writers all follow conduct_analysis; finalize_report is attached
# to all three of them through the list form of add_edge.
builder.add_edge("conduct_analysis", "write_report")
builder.add_edge("conduct_analysis", "write_introduction")
builder.add_edge("conduct_analysis", "write_conclusion")
builder.add_edge(["write_conclusion", "write_report", "write_introduction"], "finalize_report")
builder.add_edge("finalize_report", END)

# Compiled with an in-memory checkpointer only; no other compile options are set.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

print("✓ Full research graph compiled!")
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Run the complete research pipeline
from IPython.display import Markdown

# Demo inputs and the thread id under which the checkpointer stores this run.
max_analysts = 3
topic = "SpaceX Starship Development"
thread = {"configurable": {"thread_id": "1"}}

final_report = None
print(f"🔍 Starting research on: {topic}")
print(f"📊 Using {max_analysts} analysts\n")

# Stream per-node updates; keep the report once finalize_report has emitted it.
for event in graph.stream({"topic": topic, "max_analysts": max_analysts}, thread, stream_mode="updates"):
    print(f"  ✓ Processing: {list(event.keys())}")
    if "finalize_report" in event:
        final_report = event["finalize_report"]["final_report"]

print("\n✅ Research complete!\n")
if final_report:
    display(Markdown(final_report))