## Research Assistant

### Goal
Our goal is to build a lightweight, multi-agent system around chat models that customizes the research process.

Source Selection

- Users can choose any set of input sources for their research.

Planning

- Users provide a topic, and the system generates a team of AI analysts, each focusing on one sub-topic.
- Human-in-the-loop will be used to refine these sub-topics before research begins.

LLM Utilization

- Each analyst will conduct in-depth interviews with an expert AI using the selected sources.
- The interview will be a multi-turn conversation to extract detailed insights as shown in the STORM paper.
- These interviews will be captured in a using sub-graphs with their internal state.

Research Process

- Experts will gather information to answer analyst questions in parallel.
- And all interviews will be conducted simultaneously through map-reduce.

Output Format

- The gathered insights from each interview will be synthesized into a final report.
- We'll use customizable prompts for the report, allowing for a flexible output format.

### 1. Setup the environment

In [2]:
# connect with gemini api
import getpass
import os

if "GOOGLE_API_KEY" not in os.environ:
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your Google AI API key: ")

In [3]:
# initialize llm chat model
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-001",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    # other params...
)

In [10]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

In [11]:
from langfuse.callback import CallbackHandler

# make a simple trace to the langfuse
langfuse_handler = CallbackHandler(
    secret_key="sk-lf-b810fab9-0dc8-4675-a77c-fe3c9bf80bfc",
    public_key="pk-lf-a2d11315-f91c-44ba-a414-d8f335196820",
    host="https://cloud.langfuse.com"
    # host="https://us.cloud.langfuse.com", # 🇺🇸 US region
)
 
# Your Langchain code 
# Add Langfuse handler as callback (classic and LCEL)
llm.invoke( "What is name", config={"callbacks": [langfuse_handler]})

AIMessage(content="As a large language model, I don't have a name. You can just call me Bard.", additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.0-flash-001', 'safety_ratings': []}, id='run--e10070b6-9fdb-4bbf-a400-1e00b3ac80fe-0', usage_metadata={'input_tokens': 3, 'output_tokens': 22, 'total_tokens': 25, 'input_token_details': {'cache_read': 0}})

### 2. Generate team of sub agents

AI analysts, each focusing on one sub-topic.

In [2]:
from typing import List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

# class to get structured analyst from the llm
class Analyst(BaseModel):
    # the state or relation of being closely associated or affiliated with a particular Analyst.
    affiliation: str = Field(
        description="Primary affiliation of the analyst."
        )
    # name of the analyst
    name: str = Field(
        description="name of the analyst"
    )
    # role of the analyst in terms of topic
    role: str = Field(
        description="Description of the analyst focus, concerns, and motives."
    ) 
    # this method is callable like a property but behind is a function 
    @property
    def persona(self) -> str:
        return f"Name: {self.name}\nRole: {self.role}\nAffiliation: {self.affiliation}\nDescription: {self.description}\n"

# this class is to get list of Analysts form llm model  
class Perspectives(BaseModel):
    analysts: List[Analyst] = Field(
        description="Comprehensive list of analysts with their roles and affiliations.",
    )

# then make the internal state of the subgraph generate analysts
class GenerateAnalystsState(TypedDict):
    topic: str # reseach topic
    max_analysts: int # maximum number of analysts
    human_analyst_feedback: str # any human feedback to llm
    analysts: List[Analyst] # generated analysts

# Requirment analysis agent
Get requirment from the client and make questions and make a report about the requiment

In [6]:
from typing import List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import MessagesState

class GeneratedSubHeading(BaseModel):
    generated_sub_heading: str = Field(None, description="generated sub part for understand the user need.")

class AnalyseState(MessagesState):
    max_generated_queries: int # Number of generated queries
    human_analyst_feedback: str # Human feedback
    generated_queries: List[GeneratedSubHeading] # Generated questions

class Perspectives(BaseModel):
    sub_headings: List[GeneratedSubHeading] = Field(
        description="Comprehensive list of sub part headings that align with user need.",
    )


In [7]:
generate_query_instructions = """You are an caller agent user call for specific need.
                                 
                                 So you need do identify sub parts of the user need and generate sub part headings for get good understand about user need.

                                 You need to generate maximum {max_sub_parts} sub parts to get clear understand about user need.
                                 
                                 """

In [8]:
# test for question generation.
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# Make the system message
system_message = generate_query_instructions.format(max_sub_parts=5)

# Enforce structured output
structured_llm = llm.with_structured_output(Perspectives)

# Generate question 
queries = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="Generate sub part headings.")]+ [HumanMessage(content="I want to build a website")])

In [10]:
print(queries.sub_headings)

[GeneratedSubHeading(generated_sub_heading='Purpose of the website'), GeneratedSubHeading(generated_sub_heading='Target audience'), GeneratedSubHeading(generated_sub_heading='Key features'), GeneratedSubHeading(generated_sub_heading='Content strategy'), GeneratedSubHeading(generated_sub_heading='Technical requirements')]


In [None]:
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

def create_questions(state: AnalyseState):
    # define the variables
    max_questions=state["max_generated_queries"]

    # get the structed output
    system_message = generate_query_instructions.format(max_questions=max_questions)

    # Enforce structured output
    structured_llm = llm.with_structured_output(GeneratedQuery)

    # Generate question 
    queries = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="Generate the set questions.")])

    # update the state
    return {"generated_queries": queries.generated_query}

In [16]:
def should_continue(state: AnalyseState):
    """ Return the next node to execute """

    # Check if human feedback
    human_analyst_feedback=state.get('generated_queries', None)
    if human_analyst_feedback:
        return "generate_queries"
    
    # Otherwise end
    return END

In [17]:
# Add nodes and edges 
builder = StateGraph(AnalyseState)
builder.add_node("create_questions", create_questions)
builder.add_node("should_continue", should_continue)
builder.add_edge(START, "create_questions")
builder.add_edge("create_questions", "should_continue")
builder.add_conditional_edges("should_continue", should_continue, ["create_questions", END])

# Compile
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png(max_retries=5, retry_delay=2.0)))

KeyboardInterrupt: 

In [6]:
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
thread = {"configurable": {"thread_id": "1"}}
for event in graph.stream({"messages":[HumanMessage("Hi!, I need to build a website")]}, thread, stream_mode="values"):
    questions = event.get('generated_queries', '')
    if questions:
        for question in questions:
            # print the question
            print(f"question: {question}")

AttributeError: 'dict' object has no attribute 'generated_queries'

In [None]:
question_instructions = """You are an caller agent user call for specific need you need do identify the need of the user and give a summery of user need.
                            
                           Continue to ask questions to drill down and refine your understanding of the user need.
        
                           When you are satisfied with your understanding, complete the interview with: "Thank you so much for your help!"
                           
                           """


In [None]:
from typing import List
from pydantic import BaseModel, Field
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class GeneratedQuery(BaseModel):
    generated_query: str = Field(
        None,
        description="A question to clarify the user's needs."
    )

class AnalyseState(MessagesState):
    max_generated_queries: int = Field(
        ...,
        description="Maximum number of questions to generate."
    )
    human_analyst_feedback: str = Field(
        "",
        description="Free-form human feedback."
    )
    generated_queries: List[GeneratedQuery] = Field(
        default_factory=list,
        description="List of generated questions so far."
    )

def create_questions(state: AnalyseState):
    """
    Populate state.generated_queries up to state.max_generated_queries.
    This is just a stub—you'd replace the body with your question-generation logic.
    """
    # Example stub: generate placeholder questions until we hit max
    while len(state.generated_queries) < state.max_generated_queries:
        q_num = len(state.generated_queries) + 1
        state.generated_queries.append(
            GeneratedQuery(generated_query=f"What is detail #{q_num}?")
        )
    # After generating, you might send these back to your orchestrator,
    # and wait for human feedback to populate state.human_analyst_feedback.
    return state

def should_continue(state: AnalyseState):
    """
    If we have human feedback, loop back to create more questions;
    otherwise terminate the graph.
    """
    if state.human_analyst_feedback:
        return "create_questions"
    return END

# Build the state graph
builder = StateGraph(AnalyseState)
builder.add_node("create_questions", create_questions)
builder.add_node("should_continue", should_continue)

builder.add_edge(START, "create_questions")
builder.add_edge("create_questions", "should_continue")
builder.add_conditional_edges(
    "should_continue",
    should_continue,
    ["create_questions", END]
)

# Compile with a persistent memory
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# (Optional) visualize
from PIL import Image
display(Image(graph.get_graph().draw_mermaid_png()))

ValueError: Failed to reach https://mermaid.ink/ API while trying to render your graph after 1 retries. To resolve this issue:
1. Check your internet connection and try again
2. Try with higher retry settings: `draw_mermaid_png(..., max_retries=5, retry_delay=2.0)`
3. Use the Pyppeteer rendering method which will render your graph locally in a browser: `draw_mermaid_png(..., draw_method=MermaidDrawMethod.PYPPETEER)`

### Unit Testing for notebook

This ensure the written functions are giving correct outputs

In [11]:
import unittest
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

class TestFunctions(unittest.TestCase):

    def test_generated_sub_heading_type(self):
        
        # generate sub heading based on input 
        generate_query_instructions = """You are an caller agent user call for specific need.
                                 
                                 So you need do identify sub parts of the user need and generate sub part headings for get good understand about user need.

                                 You need to generate maximum {max_sub_parts} sub parts to get clear understand about user need.
                                 
                                 """
        
        # Make the system message
        system_message = generate_query_instructions.format(max_sub_parts=5)

        # Enforce structured output
        structured_llm = llm.with_structured_output(Perspectives)

        # Generate question 
        queries = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="Generate sub part headings.")]+ [HumanMessage(content="I want to build a website")])

        # check for the type
        self.assertIsInstance(queries.sub_headings,list)
        for item in queries.sub_headings:
            self.assertIsInstance(item, GeneratedSubHeading)


# run the unit tests
if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)


test_generated_sub_heading_type (__main__.TestFunctions.test_generated_sub_heading_type) ... ok

----------------------------------------------------------------------
Ran 1 test in 1.140s

OK
