In [4]:
!pip install langchain-google-genai colorama

Collecting colorama
  Using cached colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Using cached colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Installing collected packages: colorama
Successfully installed colorama-0.4.6


In [1]:
import json
import os
import uuid
import platform
import logging
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import ToolMessage, HumanMessage, AIMessage
from langchain_core.runnables import RunnableLambda, RunnableConfig, Runnable
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.constants import END, START
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.graph.message import AnyMessage, add_messages
from pydantic import BaseModel
from typing import Annotated, Optional, Literal, List
from typing_extensions import TypedDict
import colorama
colorama.init()

In [2]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="\033[37m%(asctime)s - %(levelname)s - %(message)s\033[0m"
)
logger = logging.getLogger(__name__)

In [3]:
# Load environment variables
if not load_dotenv():
    logger.error("Failed to load .env file. Ensure it exists and is readable.")
    raise ValueError("Failed to load .env file.")

In [4]:
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

In [5]:
from dotenv import load_dotenv, find_dotenv

# Where is .env?
dotenv_path = find_dotenv()
print("Dotenv path:", repr(dotenv_path))

# Load (and override any existing vars)
load_dotenv(dotenv_path, override=True)

# Show what actually got loaded
print("GROQ_API_KEY:", os.getenv("GROQ_API_KEY"))
print("GEMINI_API_KEY:", os.getenv("GEMINI_API_KEY"))
print("TAVILY_API_KEY:", os.getenv("TAVILY_API_KEY"))

Dotenv path: '/Users/dharmanshusingh/Desktop/Kairon_assign/.env'
GROQ_API_KEY: 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
GEMINI_API_KEY: AIzaSyCihdVXQDYWiKO3lvR6rHD2faFkPuBKByg
TAVILY_API_KEY: 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


In [6]:
# Validate API keys
if not GEMINI_API_KEY:
    logger.error("GEMINI_API_KEY is not set.")
    raise ValueError("GEMINI_API_KEY is not set in the .env file.")
if not TAVILY_API_KEY:
    logger.error("TAVILY_API_KEY is not set.")
    raise ValueError("TAVILY_API_KEY is not set in the .env file.")

In [7]:
# Function to print events
def _print_event(event: dict, _printed: set, max_length: int = 1500) -> None:
    current_state = event.get("dialog_state", "None")
    logger.info(f"Current state: {current_state}")
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=False)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            logger.info(msg_repr)
            _printed.add(message.id)

In [8]:
# Prompt templates
research_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """
You are an expert research agent. Your task is to deeply research the user's query using the Tavily search tool. Gather accurate, current, and relevant data. If no relevant results are found, indicate this clearly and suggest alternative approaches. Avoid speculation and focus on precision and reliability.
        """),
        ("placeholder", "{messages}"),
    ]
)

draft_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """
You are a professional drafting agent. Your task is to take the research agent's response and rephrase it into a polished, professional format suitable for a formal report or presentation. Follow these guidelines:
- Use a formal tone, avoiding colloquial or informal language.
- Structure the response with clear headings, bullet points, or paragraphs as appropriate.
- Retain all key details (e.g., names, dates, technologies) but make the content concise and well-organized.
- Eliminate redundancy and ensure clarity for a professional audience.
- If the input lacks sufficient detail, note this professionally and suggest further research.
        """),
        ("placeholder", "{messages}"),
    ]
)

In [9]:
# Initialize LLM
def initialize_llm() -> ChatGoogleGenerativeAI:
    try:
        return ChatGoogleGenerativeAI(model="gemini-1.5-flash", api_key=GEMINI_API_KEY, temperature=0.7)
    except Exception as e:
        logger.error(f"Failed to initialize LLM: {e}")
        raise ValueError(f"Failed to initialize LLM: {e}")

In [10]:
# Initialize tools
def initialize_tools() -> List[TavilySearchResults]:
    try:
        return [TavilySearchResults(max_results=5, api_key=TAVILY_API_KEY)]
    except Exception as e:
        logger.error(f"Failed to initialize Tavily search tool: {e}")
        raise ValueError(f"Failed to initialize Tavily search tool: {e}")

In [11]:
# State definition
class State(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    dialog_state: Literal["Research Agent", "Draft Agent", None]

In [12]:
# Assistant agent class
class Assistant:
    def __init__(self, runnable: Runnable, node_name: str):
        self.runnable = runnable
        self.node_name = node_name  # Track the node name (e.g., "Research Agent", "Draft Agent")

    def __call__(self, state: State, config: RunnableConfig) -> dict:
        result = None
        max_retries = 3
        for _ in range(max_retries):
            try:
                result = self.runnable.invoke(state)
                if not result.content and not hasattr(result, "tool_calls"):
                    state["messages"].append(HumanMessage(content="Please provide a meaningful response."))
                else:
                    break
            except Exception as e:
                logger.error(f"Error in {self.node_name}: {e}")
                error_message = AIMessage(content=f"Error: {str(e)}. Please retry or rephrase your query.")
                return {"messages": [error_message], "dialog_state": self.node_name}
        if not result:
            logger.error(f"{self.node_name} failed to produce a valid response after retries.")
            return {"messages": [AIMessage(content="Failed to process request.")], "dialog_state": self.node_name}
        return {"messages": [result], "dialog_state": self.node_name}

In [13]:
# Handle tool errors
def handle_tool_error(state: dict) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    logger.error(f"Tool error: {error}")
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\nPlease try a different query or check the tool configuration.",
                tool_call_id=tc["id"],
            ) for tc in tool_calls
        ],
        "dialog_state": "Research Tools"
    }

In [14]:
# Create tool node with fallback
def create_tool_node_with_fallback(tools: list) -> ToolNode:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )

In [15]:
# Routing logic from Research Agent
def route_from_research_agent(state: State) -> str:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        logger.info("Routing to Research Tools due to tool calls.")
        return "Research Tools"
    logger.info("Routing to Draft Agent.")
    return "Draft Agent"

In [16]:
# Build and compile LangGraph
def build_graph() -> StateGraph:
    llm = initialize_llm()
    tools_search = initialize_tools()
    search_runnable = research_prompt | llm.bind_tools(tools_search)
    draft_runnable = draft_prompt | llm  # No tools for draft agent

    builder = StateGraph(State)
    builder.add_node("Research Agent", Assistant(search_runnable, "Research Agent"))
    builder.add_node("Draft Agent", Assistant(draft_runnable, "Draft Agent"))
    builder.add_node("Research Tools", create_tool_node_with_fallback(tools_search))

    builder.add_edge(START, "Research Agent")
    builder.add_conditional_edges("Research Agent", route_from_research_agent, ["Research Tools", "Draft Agent"])
    builder.add_edge("Research Tools", "Research Agent")
    builder.add_edge("Draft Agent", END)

    memory = MemorySaver()
    return builder.compile(checkpointer=memory)

In [17]:
# Generate graph visualization
def generate_graph_visualization(graph: StateGraph) -> None:
    try:
        graph_image = graph.get_graph(xray=True).draw_mermaid_png()
        image_path = "workflow_graph.png"
        with open(image_path, "wb") as f:
            f.write(graph_image)
        logger.info(f"Graph saved as {image_path}")
        system = platform.system()
        if system == "Darwin":
            os.system(f"open {image_path}")
        elif system == "Windows":
            os.startfile(image_path)
        else:
            os.system(f"xdg-open {image_path}")
    except Exception as e:
        logger.warning(f"Could not generate graph visualization: {e}")

In [18]:
# Stream output updates
def run_graph(user_input: str, graph: StateGraph, config: dict) -> None:
    _printed = set()
    try:
        events = graph.stream(
            {"messages": [HumanMessage(content=user_input)], "dialog_state": None},
            config=config,
            stream_mode="values"
        )
        for event in events:
            _print_event(event, _printed)
        # Log final state after streaming
        final_state = event.get("dialog_state", "None")
        logger.info(f"Workflow completed. Final state: {final_state}")
    except Exception as e:
        logger.error(f"Error during graph execution: {e}")
        print(f"Error: {e}")

In [19]:
# Main function
def main():
    print("Deep Research AI Agentic System is running. Type 'exit' to quit.")
    graph = build_graph()
    thread_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    # Generate graph visualization
    generate_graph_visualization(graph)

    while True:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        if not user_input.strip() or len(user_input) > 1000:
            print("Please enter a valid query (non-empty and less than 1000 characters).")
            continue
        run_graph(user_input, graph, config)

In [21]:
if __name__ == "__main__":
    main()

Deep Research AI Agentic System is running. Type 'exit' to quit.


2025-04-26 19:57:34,046 - INFO - Graph saved as workflow_graph.png
2025-04-26 19:57:48,489 - INFO - Current state: None

latest news about nvidia
2025-04-26 19:57:50,943 - INFO - Routing to Research Tools due to tool calls.
2025-04-26 19:57:50,947 - INFO - Current state: Research Agent
Tool Calls:
  tavily_search_results_json (cad740a7-99db-445e-b117-079346100390)
 Call ID: cad740a7-99db-445e-b117-079346100390
  Args:
    query: latest news about nvidia
2025-04-26 19:57:54,322 - INFO - Current state: Research Agent
Name: tavily_search_results_json

[{"title": "Latest News - NVIDIA Newsroom", "url": "https://nvidianews.nvidia.com/news/latest?c=21926", "content": "NVIDIA Blackwell Ultra DGX SuperPOD Delivers Out-of-the-Box AI Supercomputer for Enterprises to Build AI Factories\n\nNVIDIA Announces Major Release of Cosmos World Foundation Models and Physical AI Data Tools\n\nNVIDIA Launches Family of Open Reasoning AI Models for Developers and Enterprises to Build Agentic AI Platforms\n\nN

Goodbye!
