In [65]:
!pip install langchain langgraph openai pydantic serpapi tiktoken python-dotenv requests
!pip install langchain-community
!pip install google-search-results



In [66]:
from pydantic import BaseModel, Field
from typing import List

# Schema for a single step of a research plan. Every field is required
# (`...` as the Field default means "no default value").
class ResearchPlanningStep(BaseModel):
    # Name of the subtopic this step covers.
    subtopic: str = Field(..., description="Subtopic to research")
    # Search terms for the subtopic; search_sources() joins them with spaces
    # to form the search query.
    keywords: List[str] = Field(..., description="Keywords")
    # Free-text statement of what researching this subtopic should achieve.
    intent: str = Field(..., description="Intent of research")

# Schema for the summary of one source. Every field is required.
# NOTE(review): in the visible cells summarize_sources() returns plain dicts
# with these same three keys and this model is never instantiated — confirm
# whether it is meant to be used there.
class SourceSummary(BaseModel):
    source_url: str = Field(..., description="Source URL")
    summary: str = Field(..., description="Extracted Summary")
    citation: str = Field(..., description="Source citation")

# Schema for the finished research brief returned by synthesize_brief().
# Every field is required.
class FinalBrief(BaseModel):
    topic: str = Field(..., description="Research topic")
    depth: int = Field(..., description="Depth level")
    # synthesize_brief() currently stores the whole LLM reply as a single entry.
    sections: List[str] = Field(..., description="Brief sections")
    # Filled from the "citation" value of each source summary.
    references: List[str] = Field(..., description="Reference list")


In [67]:
from collections import defaultdict

class ContextStore:
    """In-memory record of previously generated briefs, keyed by user id.

    Nothing is persisted: the history lives only as long as this object.
    """

    def __init__(self):
        # user_id -> list of brief dicts, in the order they were added.
        self.user_history = defaultdict(list)

    def add_brief(self, user_id: str, brief: dict):
        """Append ``brief`` to the history kept for ``user_id``."""
        self.user_history[user_id].append(brief)

    def get_history(self, user_id: str) -> list:
        """Return the briefs stored for ``user_id``, oldest first.

        A new list is returned on every call, so callers may keep or modify
        the result without affecting the store (and later ``add_brief`` calls
        do not silently change a previously returned history). Looking up an
        unknown user returns ``[]`` and does not create an entry for them.
        """
        # .get() bypasses the defaultdict factory, so reads never insert keys.
        return list(self.user_history.get(user_id, []))

context_store = ContextStore()


In [68]:
# NOTE(review): google-search-results is already installed by the first cell,
# so this cell looks redundant — confirm and remove.
!pip install google-search-results



In [69]:
import os
from getpass import getpass

from langchain_openai import ChatOpenAI # Changed import path for ChatOpenAI
from langchain_community.utilities import SerpAPIWrapper

# Credentials come from the environment (e.g. set with %env or a secrets
# manager). Only keys that are missing are prompted for, so a real key that is
# already configured is never overwritten and no secret is stored in the notebook.
for _key_name in ("OPENAI_API_KEY", "SERPAPI_API_KEY"):
    if not os.environ.get(_key_name):
        os.environ[_key_name] = getpass(f"Enter {_key_name}: ")

# Shared clients used by the planning, search, summarisation and synthesis cells.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
search = SerpAPIWrapper()

In [70]:
from langchain.output_parsers import PydanticOutputParser
from typing import List
from pydantic import BaseModel, ValidationError # Import ValidationError
import json # Import the json library
import re # Import regular expression module

# Wrapper model for the planner's reply: a JSON object whose single "plans" key
# holds the list of research steps. plan_research() hands this model to
# PydanticOutputParser and returns the parsed `plans` list.
class ResearchPlanList(BaseModel):
    plans: List[ResearchPlanningStep]

def _extract_json_payload(raw_output: str) -> str:
    """Return the JSON text of an LLM reply, unwrapping a Markdown code fence if present."""
    # Accept ```json, ```JSON or a bare ``` fence, with any whitespace/newline style.
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", raw_output, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()
    return raw_output.strip()


def _coerce_plans(json_data) -> List[ResearchPlanningStep]:
    """Validate decoded JSON item by item, keeping only well-formed research steps."""
    # The prompt asks for {"plans": [...]}, but a bare top-level array is accepted too.
    if isinstance(json_data, dict):
        json_data = json_data.get("plans")
    if not isinstance(json_data, list):
        print("Fallback JSON did not match expected structure.")
        return []

    valid_plans = []
    for item in json_data:
        try:
            valid_plans.append(ResearchPlanningStep(**item))
        except (ValidationError, TypeError) as item_e:
            # TypeError covers items that are not JSON objects; one bad item
            # is skipped instead of discarding the whole list.
            print(f"Validation error for plan item: {item_e} - {item}")
    print("Successfully parsed with fallback JSON loading.")
    return valid_plans


def plan_research(topic: str, context: List[dict], depth: int) -> List[ResearchPlanningStep]:
    """Ask the LLM for a research plan and parse it into validated steps.

    Args:
        topic: Research topic inserted into the prompt.
        context: Prior briefs for the user (may be empty); inserted into the prompt.
        depth: Requested depth level; inserted into the prompt.

    Returns:
        The research steps parsed from the reply. Parsing is best-effort: if
        the reply cannot be parsed at all an empty list is returned and a
        warning is printed; no parsing error is raised to the caller.
    """
    parser = PydanticOutputParser(pydantic_object=ResearchPlanList)
    prompt = f"""
    You are to generate a list of research plans for topic "{topic}" with depth {depth}. Prior context: {context}.
    Output JSON array matching: subtopic (str), keywords (list[str]), intent (str).
    Format the output as a JSON object with a single key "plans" which is a list of the research steps.
    OUTPUT ONLY THE JSON OBJECT AND NOTHING ELSE. Do not include markdown code blocks.

    {parser.get_format_instructions()}
    """
    # invoke() replaces the deprecated predict(); the reply text is in .content.
    output = llm.invoke(prompt).content
    print("Raw LLM output:", output)

    json_string = _extract_json_payload(output)

    parsed_plans = []
    try:
        parsed_plans = parser.parse(json_string).plans
        print("Successfully parsed with Pydantic.")
    except Exception as e:
        # The parser reports malformed JSON and schema violations as its own
        # OutputParserException rather than pydantic's ValidationError, so the
        # fallback has to run for any parser failure to be reachable at all.
        print(f"Pydantic validation error: {e}")
        try:
            print("Attempting fallback JSON loading...")
            parsed_plans = _coerce_plans(json.loads(json_string))
        except json.JSONDecodeError as json_e:
            print(f"Fallback JSON decode error: {json_e}")
        except Exception as fallback_e:
            print(f"Unexpected fallback error: {fallback_e}")

    if not parsed_plans:
        print("Warning: No valid research plans were parsed.")

    return parsed_plans

In [71]:
def search_sources(plans: List[ResearchPlanningStep]):
    """Run one web search per research step and collect the raw results.

    Args:
        plans: Research steps; each step's keywords form its search query.

    Returns:
        One dict per step with keys ``"query"`` (the query sent),
        ``"content"`` (whatever ``search.run`` returned) and ``"url"``.
        The URL is a fixed placeholder, not the real source address.
    """
    results = []
    for plan in plans:
        # An LLM-generated step can come back with no usable keywords; an empty
        # query is not a meaningful search, so fall back to the subtopic itself.
        query = " ".join(plan.keywords).strip() or plan.subtopic
        search_results = search.run(query)
        # For demo: one result per plan, normally multiple processed
        results.append({"query": query, "content": search_results, "url": "example_source_url"})
    return results


In [72]:
def summarize_sources(sources):
    """Summarise each source with the LLM.

    Args:
        sources: Iterable of dicts that each provide ``"content"`` (text to
            summarise) and ``"url"``.

    Returns:
        One dict per source, in input order, with keys ``"source_url"``,
        ``"summary"`` and ``"citation"``; the citation is the source URL.
    """
    summaries = []
    for src in sources:
        prompt = f"Summarize this content:\n{src['content']}"
        # invoke() replaces the deprecated predict(); the reply text is in .content.
        summary_text = llm.invoke(prompt).content
        summaries.append({
            "source_url": src["url"],
            "summary": summary_text,
            "citation": src["url"]
        })
    return summaries


In [73]:
def synthesize_brief(topic, summaries, context, depth):
    """Ask the LLM to compile the source summaries into a final brief.

    Args:
        topic: Research topic; inserted into the prompt and stored on the brief.
        summaries: Dicts providing ``"summary"`` and ``"citation"``.
        context: Prior research context; inserted into the prompt.
        depth: Depth level; inserted into the prompt and stored on the brief.

    Returns:
        A ``FinalBrief`` whose ``sections`` holds the whole LLM reply as one
        entry and whose ``references`` lists each distinct citation once, in
        first-seen order.
    """
    combined_text = "\n\n".join([s["summary"] for s in summaries])
    prompt = f"""
    You are a research assistant compiling a final brief on topic '{topic}' with depth {depth}.
    Prior research context: {context}
    Source summaries:
    {combined_text}

    Provide a structured brief with sections and references.
    """
    # invoke() replaces the deprecated predict(); the reply text is in .content.
    final_output = llm.invoke(prompt).content
    # Several summaries can cite the same source; dict.fromkeys drops the
    # repeats while keeping the original order.
    references = list(dict.fromkeys(s["citation"] for s in summaries))
    # Minimal parse: the reply is not split into sections yet.
    return FinalBrief(
        topic=topic,
        depth=depth,
        sections=[final_output],
        references=references
    )


In [74]:
from langgraph.graph import StateGraph
from typing import TypedDict, List, Any # Import TypedDict and Any
from pydantic import BaseModel # Import BaseModel

# State schema shared by every node of the workflow assembled in build_graph().
# The run cell supplies only user_id, follow_up, topic and depth; the remaining
# keys are added by the nodes as the pipeline advances.
class GraphState(TypedDict):
    user_id: str # key used to look up prior briefs in context_store
    follow_up: bool # when true, prior briefs are loaded as context; otherwise context is []
    topic: str # research topic passed to planning and synthesis
    depth: int # depth level passed to planning and synthesis
    context: List[dict] # set by the "context_summarization" node
    plan: List[ResearchPlanningStep] # set by the "planning" node
    sources: List[dict] # set by the "search" node
    summaries: List[dict] # set by the "summarize" node
    brief: FinalBrief # set by the "synthesis" node
    final: FinalBrief # set by the "post_processing" node (the same object as brief)


def build_graph():
    """Assemble the research workflow as a linear, uncompiled ``StateGraph``.

    Stages run in this order: context_summarization -> planning -> search ->
    summarize -> synthesis -> post_processing. Each stage returns a copy of
    the incoming state with exactly one key added or replaced.

    Returns:
        The ``StateGraph`` with nodes, edges and entry point registered. The
        caller is responsible for compiling it.
    """

    def load_context(state):
        # Prior briefs are only pulled in for follow-up requests.
        if state.get("follow_up", False):
            history = context_store.get_history(state["user_id"])
        else:
            history = []
        return {**state, "context": history}

    def make_plan(state):
        steps = plan_research(state["topic"], state["context"], state["depth"])
        return {**state, "plan": steps}

    def run_search(state):
        found = search_sources(state["plan"])
        return {**state, "sources": found}

    def condense(state):
        digests = summarize_sources(state["sources"])
        return {**state, "summaries": digests}

    def compose(state):
        draft = synthesize_brief(
            state["topic"], state["summaries"], state["context"], state["depth"]
        )
        return {**state, "brief": draft}

    def finalize(state):
        # No post-processing yet: the brief is passed through unchanged.
        return {**state, "final": state["brief"]}

    # Node name -> handler, listed in execution order.
    pipeline = [
        ("context_summarization", load_context),
        ("planning", make_plan),
        ("search", run_search),
        ("summarize", condense),
        ("synthesis", compose),
        ("post_processing", finalize),
    ]

    workflow = StateGraph(GraphState)
    for stage_name, handler in pipeline:
        workflow.add_node(stage_name, handler)

    # Chain each stage to the one that follows it.
    for (upstream, _), (downstream, _) in zip(pipeline, pipeline[1:]):
        workflow.add_edge(upstream, downstream)

    workflow.set_entry_point("context_summarization")
    return workflow

# Uncompiled workflow definition; the run cell calls graph.compile() before invoking it.
graph = build_graph()

In [78]:
# Request for one research brief. Only the inputs are given here; the workflow
# nodes add context, plan, sources, summaries, brief and final as they run.
state = {
    "topic": "Quantum Computing applications",
    "depth": 2,
    "follow_up": False,
    "user_id": "user123"
}

# Compile the graph before running
compiled_graph = graph.compile()

result = compiled_graph.invoke(state)
final_brief = result["final"]

# Store the brief under the same user id the request was made with, so the
# saved history cannot drift from the request if the id above is edited.
context_store.add_brief(state["user_id"], final_brief.model_dump())

print(final_brief.model_dump_json(indent=2))

Raw LLM output: {"plans":[{"subtopic":"Quantum Algorithms","keywords":["Shor's Algorithm","Grover's Algorithm","Quantum Speedup"],"intent":"Explore the efficiency of quantum algorithms compared to classical algorithms."},{"subtopic":"Quantum Cryptography","keywords":["Quantum Key Distribution","BB84 Protocol","Post-Quantum Cryptography"],"intent":"Investigate the security advantages of quantum cryptography over traditional methods."},{"subtopic":"Quantum Machine Learning","keywords":["Quantum Neural Networks","Quantum Support Vector Machines","Data Classification"],"intent":"Examine how quantum computing can enhance machine learning techniques."},{"subtopic":"Quantum Simulation","keywords":["Molecular Simulation","Material Science","Quantum Chemistry"],"intent":"Assess the potential of quantum computers in simulating complex quantum systems."},{"subtopic":"Quantum Networking","keywords":["Quantum Internet","Entanglement Distribution","Quantum Repeaters"],"intent":"Research the developm