### **Deep Researcher**

Empower yourself with an intelligent agent capable of reasoning through extensive online resources and executing intricate, multi-stage research tasks.

In [None]:
!pip3 install nest_asyncio tavily-python

In [4]:
import nest_asyncio
nest_asyncio.apply()

This notebook implements an AI researcher that persistently searches for information based on a user query until it's confident all necessary details have been gathered. It leverages several services:

* [**Tavily:**](https://tavily.com/) To perform AI-optimized searches and to fetch and extract clean, LLM-friendly webpage content.
* [**OpenRouter:**](https://accounts.openrouter.ai/sign-in) To interact with an LLM for generating search queries, evaluating page relevance, and extracting context.

### Key Components

#### Tavily Search Integration

Tavily is dedicated to creating an advanced search layer that bridges large language models (LLMs) with the web, empowering AI agents with real-time, context-aware data. With its adaptable search functionalities, Tavily allows AI systems to refine search approaches, gather raw content for in-depth analysis, or obtain concise summaries for rapid understanding. Unlike traditional models limited by fixed training datasets, Tavily’s Search and Extract features integrate semantic, contextual, and keyword-based search methods to provide up-to-date, actionable insights, enhancing data-driven decision-making processes.

#### OpenRouter

The system uses OpenRouter to interact with a language model for various tasks such as query generation, relevance assessment, and context extraction. This allows for dynamic and intelligent information gathering based on the evolving understanding of the user's query.

In [124]:
import asyncio
import aiohttp
import json
import logging
from tavily import AsyncTavilyClient
from typing import Dict, List, Optional

OPENROUTER_API_KEY = "sk-XXXX" ## Get your key at https://accounts.openrouter.ai/sign-in
TAVILY_API_KEY = "tvly-XXX" ## Get your key at https://tavily.com/

# Endpoints
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
TAVILY_SEARCH_URL = "https://api.tavily.com/search"

# LLM model
DEFAULT_MODEL = "anthropic/claude-3.5-haiku"

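# Initialize the Tavily client. Only AsyncTavilyClient is imported, so it is used in both branches;
# without an explicit key the client is expected to read the TAVILY_API_KEY environment variable.
tavily_client = AsyncTavilyClient(api_key=TAVILY_API_KEY) if TAVILY_API_KEY else AsyncTavilyClient()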

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [135]:
async def call_openrouter_async(
    session: aiohttp.ClientSession,
    messages: List[Dict[str, str]],
    model: str = DEFAULT_MODEL,
    timeout: int = 15
) -> Optional[str]:
    """
    Asynchronously call the OpenRouter chat completion API with the provided messages and return the response.

    :param session: aiohttp ClientSession to use for the request.
    :param messages: List of messages to send to the API.
    :param model: Model to use for the completion.
    :param timeout: Timeout in seconds for the request.
    :return: The content of the assistant’s reply or None if an error occurs.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "X-Title": "OpenDeepResearcher",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages
    }

    try:
        # aiohttp expects a ClientTimeout object; passing a bare int is deprecated.
        request_timeout = aiohttp.ClientTimeout(total=timeout)
        async with session.post(OPENROUTER_URL, headers=headers, json=payload, timeout=request_timeout) as resp:
            if resp.status == 200:
                result = await resp.json()
                try:
                    return result['choices'][0]['message']['content']
                except (KeyError, IndexError, TypeError):
                    logger.error("Unexpected OpenRouter response structure: %s", result)
                    return None
            else:
                text = await resp.text()
                logger.error("OpenRouter API error: %d - %s", resp.status, text)
                return None
    except aiohttp.ClientError as e:
        logger.error("Error calling OpenRouter: %s", e)
        return None
    except asyncio.TimeoutError:
        logger.error("OpenRouter API request timed out")
        return None
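
The function above depends on the reply living at `choices[0].message.content` in the JSON body. As a minimal offline sketch of that lookup, the snippet below applies the same path to a hand-written sample dict. The helper name `extract_reply` and the sample data are illustrative assumptions, not part of the notebook or of OpenRouter's client.

```python
from typing import Any, Dict, Optional


def extract_reply(result: Dict[str, Any]) -> Optional[str]:
    """Hypothetical helper: pull the assistant text out of a chat-completion-shaped dict."""
    try:
        return result["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        # Missing keys, an empty "choices" list, or a non-dict level all mean "no reply".
        return None


# Made-up sample that mirrors the structure the notebook code reads.
sample = {"choices": [{"message": {"role": "assistant", "content": "['query one', 'query two']"}}]}
print(extract_reply(sample))
print(extract_reply({"error": "rate limited"}))
```

Keeping the lookup in one place like this makes it easy to check the parsing without making a network call.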

In [126]:
import ast
async def generate_search_queries_async(
    session: aiohttp.ClientSession,
    user_query: str,
    max_queries: int = 4
) -> List[str]:
    """
    Asynchronously generate up to `max_queries` precise search queries based on the user's query using an LLM.

    Args:
        session (aiohttp.ClientSession): The aiohttp session to use for the API call.
        user_query (str): The user's query to generate search queries from.
        max_queries (int, optional): Maximum number of search queries to generate. Defaults to 4.

    Returns:
        List[str]: A list of search queries generated by the LLM. Returns an empty list if an error occurs.

    Example:
        >>> queries = await generate_search_queries_async(session, "What is the impact of AI on healthcare?")
        >>> print(queries)
        ['impact of AI on healthcare', 'AI applications in healthcare', 'benefits of AI in medicine', 'AI in healthcare research']
    """
    prompt = (
        f"You are an expert research assistant. Given the user's query, generate up to {max_queries} distinct, "
        "precise search queries that would help gather comprehensive information on the topic. "
        "Return only a Python list of strings, for example: ['query1', 'query2', 'query3']. "
        f"Do not return more than {max_queries} queries."
    )
    messages = [
        {"role": "system", "content": "You are a helpful and precise research assistant."},
        {"role": "user", "content": f"User Query: {user_query}\n\n{prompt}"}
    ]

    # Call the OpenRouter API
    response = await call_openrouter_async(session, messages)
    if not response:
        logger.error("No response received from the LLM.")
        return []

    try:
        # Safely evaluate the response to ensure it's a valid Python list
        search_queries = ast.literal_eval(response)
        if isinstance(search_queries, list):
            # Ensure the list contains only strings and trim to max_queries
            search_queries = [query.strip() for query in search_queries if isinstance(query, str)]
            return search_queries[:max_queries]
        else:
            logger.error("LLM did not return a valid list. Response: %s", response)
            return []
    except (ValueError, SyntaxError) as e:
        logger.error("Error parsing search queries: %s. Response: %s", e, response)
        return []
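
`ast.literal_eval` fails if the model wraps its list in a Markdown code fence or adds a sentence before it. One possible way to tolerate that is sketched below: take the first bracketed span from the reply and parse only that. The helper `parse_query_list` is a hypothetical name, and whether a given model actually produces such wrappers is an assumption to check against real responses.

```python
import ast
import re
from typing import List


def parse_query_list(response: str, max_queries: int = 4) -> List[str]:
    """Hypothetical helper: parse an LLM reply that should contain a Python list of strings."""
    # Grab the outermost [...] span so code fences or stray prose around the list are ignored.
    match = re.search(r"\[.*\]", response, flags=re.DOTALL)
    if not match:
        return []
    try:
        parsed = ast.literal_eval(match.group(0))
    except (ValueError, SyntaxError, TypeError):
        return []
    if not isinstance(parsed, list):
        return []
    # Keep only non-empty strings and respect the caller's limit.
    queries = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
    return queries[:max_queries]


fenced_reply = "```python\n['texas manufacturing exemption', ' pumps sales tax ']\n```"
print(parse_query_list(fenced_reply))
```

The greedy match is a deliberate simplification: it assumes the reply contains a single list.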

In [127]:
async def perform_search_async(session, query):
    """
    Perform an asynchronous search using the Tavily API.
    More info: https://docs.tavily.com/api-reference/endpoint/search

    Args:
        session (aiohttp.ClientSession): The shared aiohttp session to use for the request.
        query (str): The search query.

    Returns:
        dict or None: The parsed JSON response, or None if the request fails.
    """
    payload = {
        "query": query,
        "topic": "general",
        "search_depth": "basic",
        "max_results": 10,  # Increase to go deeper
        "time_range": None,
        "days": 3,  # Remove this to get results from all time
        "include_answer": True,
        "include_raw_content": True,
        "include_images": False,
        "include_image_descriptions": False,
        "include_domains": [],
        "exclude_domains": []
    }
    headers = {
        "Authorization": f"Bearer {TAVILY_API_KEY}",
        "Content-Type": "application/json"
    }

    try:
        # Reuse the caller's session instead of opening a new one for every search.
        async with session.post(TAVILY_SEARCH_URL, json=payload, headers=headers) as resp:
            if resp.status == 200:
                return await resp.json()
            error_text = await resp.text()
            logger.error("Tavily API error: %d - %s", resp.status, error_text)
            return None
    except aiohttp.ClientError as e:
        logger.error("Error performing Tavily search: %s", e)
        return None
    except Exception as e:
        logger.error("Unexpected error during Tavily search: %s", e)
        return None
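
The search settings above are hard-coded inside the function. If they need to vary per call, one option is to build the payload in a small helper. The sketch below is an assumption-laden illustration: `build_search_payload` and its defaults are hypothetical, and the field names are simply copied from the payload in this cell rather than checked against the Tavily API reference.

```python
from typing import Any, Dict, Optional


def build_search_payload(query: str, max_results: int = 10, days: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical helper: assemble a search payload using the field names from this notebook."""
    payload: Dict[str, Any] = {
        "query": query,
        "topic": "general",
        "search_depth": "basic",
        "max_results": max_results,
        "include_answer": True,
        "include_raw_content": True,
    }
    # Only send a recency window when the caller asks for one.
    if days is not None:
        payload["days"] = days
    return payload


print(build_search_payload("texas manufacturing exemption pumps", max_results=5))
```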

In [128]:
async def is_page_useful_async(session, user_query, page_text):
    """
    Determines whether the provided webpage content is useful for answering the given user query.
    
    The function queries an LLM with a strict prompt that requires a "Yes" or "No" response.
    If the model returns an unexpected response, the function attempts to extract "Yes" or "No" from it.
    
    Args:
        session: The HTTP session used for making asynchronous API requests.
        user_query (str): The user's search query.
        page_text (str): The text content of the webpage (truncated to 20,000 characters for processing).
    
    Returns:
        str: Either "Yes" or "No" indicating whether the page is useful for answering the query.
    """
    prompt = (
        "You are an expert evaluator of research relevance. Analyze the user's query and the provided webpage content. "
        "Determine whether the webpage contains valuable information that directly helps answer the query. "
        "Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. "
        "Do not include any explanations, extra words, or formatting beyond 'Yes' or 'No'."
    )
    
    messages = [
        {"role": "system", "content": "You are a strict and concise evaluator of research relevance."},
        {"role": "user", "content": f"User Query: {user_query}\n\nWebpage Content (truncated to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    
    response = await call_openrouter_async(session, messages)
    
    if response:
        answer = response.strip()
        # Accept case and punctuation variants such as "yes" or "No."
        normalized = answer.strip(".!\"'").lower()
        if normalized in {"yes", "no"}:
            return normalized.capitalize()

        # Fallback: attempt to extract "Yes" or "No" if extra text is included.
        if "Yes" in answer:
            return "Yes"
        elif "No" in answer:
            return "No"

    return "No"  # Default fallback to "No" if response is empty or ambiguous

In [129]:
async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Extracts relevant information from a webpage based on the user's query and the search query used.
    
    The function prompts an LLM to extract and return only the relevant context from the webpage content.
    No extra commentary or formatting is included in the response.
    
    Args:
        session: The HTTP session used for making asynchronous API requests.
        user_query (str): The original user query.
        search_query (str): The search query that led to the webpage.
        page_text (str): The text content of the webpage (truncated to 20,000 characters for processing).
    
    Returns:
        str: Extracted relevant information as plain text, or an empty string if no relevant context is found.
    """
    prompt = (
        "You are an expert information extractor. Analyze the user's query, the search query that led to this page, "
        "and the webpage content. Extract all pieces of information that are directly relevant to answering the user's query. "
        "Return only the extracted relevant context as plain text without any additional commentary or formatting."
    )
    
    messages = [
        {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."},
        {"role": "user", "content": f"User Query: {user_query}\nSearch Query: {search_query}\n\nWebpage Content (truncated to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    
    response = await call_openrouter_async(session, messages)
    
    return response.strip() if response else ""

In [130]:
async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Determines whether additional search queries are needed based on the original query, past searches, and extracted contexts.
    
    If further research is needed, the function returns a Python list of up to four new search queries.
    If no further research is required, the function returns the string "<done>".
    
    Args:
        session: The HTTP session used for making asynchronous API requests.
        user_query (str): The original user query.
        previous_search_queries (list of str): A list of search queries previously used.
        all_contexts (list of str): A list of extracted relevant contexts from webpages.
    
    Returns:
        list[str] or str: A list of up to four new search queries if further research is needed, or "<done>" if no more searches are required.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are an analytical research assistant. Based on the original query, the search queries performed so far, "
        "and the extracted contexts from webpages, determine if further research is needed. "
        "If further research is needed, provide up to four new search queries as a Python list (for example, "
        "['new query1', 'new query2']). If you believe no further research is needed, respond with exactly <done>. "
        "\nOutput only a Python list or the token <done> without any additional text."
    )
    
    messages = [
        {"role": "system", "content": "You are a systematic research planner."},
        {"role": "user", "content": f"User Query: {user_query}\nPrevious Search Queries: {previous_search_queries}\n\nExtracted Relevant Contexts:\n{context_combined}\n\n{prompt}"}
    ]
    
    response = await call_openrouter_async(session, messages)
    
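    if response:
        cleaned = response.strip()
        if cleaned == "<done>":
            return "<done>"
        try:
            # ast.literal_eval (ast is imported in the query-generation cell above) only parses
            # Python literals, so it cannot execute code returned by the LLM the way eval() can.
            new_queries = ast.literal_eval(cleaned)
            if isinstance(new_queries, list):
                return [q.strip() for q in new_queries if isinstance(q, str)]
            else:
                logger.error("LLM did not return a list for new search queries. Response: %s", response)
                return []
        except (ValueError, SyntaxError) as e:
            logger.error("Error parsing new search queries: %s. Response: %s", e, response)
            return []

    return []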


In [131]:
async def generate_final_report_async(session, user_query, all_contexts):
    """
    Generates a comprehensive final report using all gathered contexts relevant to the user's query.
    
    The function prompts an LLM to synthesize a well-structured and detailed report, incorporating key insights,
    citations, and conclusions from the collected contexts.
    
    Args:
        session: The HTTP session used for making asynchronous API requests.
        user_query (str): The original user query.
        all_contexts (list of str): A list of extracted relevant contexts from various sources.
    
    Returns:
        str: A comprehensive report addressing the user's query, integrating all relevant insights and citations.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are an expert researcher and report writer. Based on the gathered contexts below and the original query, "
        "write a comprehensive, well-structured, and detailed report that thoroughly addresses the query. "
        "Incorporate all relevant insights, citation URLs, and conclusions without extraneous commentary. "
        "Ensure clarity, coherence, and factual accuracy."
    )
    
    messages = [
        {"role": "system", "content": "You are a skilled report writer."},
        {"role": "user", "content": f"User Query: {user_query}\n\nGathered Relevant Contexts:\n{context_combined}\n\n{prompt}"}
    ]
    
    report = await call_openrouter_async(session, messages)
    
    return report.strip() if report else ""
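
Each page is truncated to 20,000 characters before it is sent to the LLM, but the joined contexts passed to the report prompt have no such cap. If the combined text grows beyond what the chosen model accepts, a character budget is one simple guard. The sketch below is illustrative only: `join_with_budget` and the 60,000-character default are assumptions, not values taken from the notebook or from any model's documentation.

```python
from typing import List


def join_with_budget(contexts: List[str], max_chars: int = 60000, sep: str = "\n") -> str:
    """Hypothetical helper: join contexts in order, stopping before the budget is exceeded."""
    kept: List[str] = []
    used = 0
    for ctx in contexts:
        # The separator only costs characters once there is something to separate.
        cost = len(ctx) + (len(sep) if kept else 0)
        if used + cost > max_chars:
            break
        kept.append(ctx)
        used += cost
    return sep.join(kept)


print(join_with_budget(["aaa", "bbb", "ccc"], max_chars=7))
```

This keeps the earliest contexts and drops the rest; ranking contexts before trimming would be a reasonable refinement.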

In [132]:
async def process_link(session, link, user_query, link_data):
    """
    Processes a single link by evaluating its usefulness and extracting relevant context if applicable.
    
    This function uses the pre-fetched content from the `unique_links` dictionary to determine
    the relevance of the page to the user's query and extracts meaningful information if the page
    is deemed useful.
    
    Args:
        session: The HTTP session used for making asynchronous API requests.
        link (str): The URL of the webpage to process.
        user_query (str): The original user query.
        link_data (dict): A dictionary containing the following keys:
            - "query": The search query that led to the webpage.
            - "content": The extracted content from the webpage.
            - "raw_content": The raw content from the webpage.
    
    Returns:
        str or None: Extracted relevant context if the page is useful, otherwise None.
    """
    print(f"Processing link: {link}")
    
    # Extract content from link_data
    page_text = link_data.get("content")  # Use "content" or "raw_content" as needed
    search_query = link_data.get("query")
    
    if not page_text:
        print(f"No content available for {link}")
        return None
    
    # Evaluate the usefulness of the page
    usefulness = await is_page_useful_async(session, user_query, page_text)
    print(f"Page usefulness for {link}: {usefulness}")
    
    # If the page is useful, extract relevant context
    if usefulness == "Yes":
        context = await extract_relevant_context_async(session, user_query, search_query, page_text)
        if context:
            print(f"Extracted context from {link} (first 200 chars): {context[:200]}")
            return context
    
    return None
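
The main loop runs `process_link` for every link at once with `asyncio.gather`, and each call makes one or two LLM requests. If that volume runs into rate limits, one option is to cap how many run at the same time with `asyncio.Semaphore`. The following is a minimal sketch under that assumption; `gather_bounded` and its `limit` parameter are hypothetical names, not part of the notebook.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def gather_bounded(factories: List[Callable[[], Awaitable[Any]]], limit: int = 5) -> List[Any]:
    """Hypothetical helper: run coroutine factories concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[Any]]) -> Any:
        # Each task waits for a free slot before creating and awaiting its coroutine.
        async with semaphore:
            return await factory()

    # gather preserves input order, so results line up with `factories`.
    return await asyncio.gather(*(run(f) for f in factories))


async def _demo_job(i: int) -> int:
    await asyncio.sleep(0.01)
    return i * 2


if __name__ == "__main__":
    print(asyncio.run(gather_bounded([lambda i=i: _demo_job(i) for i in range(6)], limit=2)))
```

In the notebook this could be used by passing factories such as `lambda l=link: process_link(session, l, user_query, unique_links[l])` in place of the bare coroutine list.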


In [133]:
async def async_main():
    """
    Perform an iterative deep research process using LLM-generated search queries and Tavily search results.

    This function:
    1. Prompts for a user query and an optional iteration limit.
    2. Generates initial search queries using an LLM.
    3. Performs web searches for each query and aggregates unique links.
    4. Judges each result and extracts useful content from it.
    5. Iteratively asks the LLM if more searches are needed and generates new queries.
    6. Compiles a final report based on the aggregated research data.

    Example:
        >>> asyncio.run(async_main())
        Enter your deep research query/topic: What is the impact of AI on healthcare?
        Enter maximum number of iterations (default 10): 5
        === Iteration 1 ===
        Aggregated 15 unique links from this iteration.
        LLM provided new search queries: ['AI in healthcare research', 'benefits of AI in medicine']
        === Iteration 2 ===
        Aggregated 10 unique links from this iteration.
        LLM indicated that no further research is needed.
        Preparing final report...
        ==== DEEP RESEARCH REPORT ====
        The impact of AI on healthcare is significant...
    """
    # Get user input
    user_query = input("Enter your deep research query/topic: ").strip()
    iter_limit_input = input("Enter maximum number of iterations (default 10): ").strip()
    iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 10

    # Initialize research data
    aggregated_contexts: List[str] = []  # All useful contexts from every iteration
    all_search_queries: List[str] = []   # Every search query used across iterations
    iteration = 0

    async with aiohttp.ClientSession() as session:

        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            print("No search queries were generated by the LLM. Exiting.")
            return
        all_search_queries.extend(new_search_queries)

        while iteration < iteration_limit:
            print(f"\n=== Iteration {iteration + 1} ===")
            iteration_contexts: List[str] = []

            # For each search query, perform Tavily searches concurrently.
            search_tasks = [perform_search_async(session, query) for query in new_search_queries]
            search_results = await asyncio.gather(*search_tasks)
            
            # Aggregate all unique links from all search queries of this iteration.
            # Map each unique link to its associated data (query, content, raw_content).
            unique_links = {}
            for idx, result in enumerate(search_results):
                query = new_search_queries[idx]
                if result and "results" in result:  # Ensure the result is valid and contains "results"
                    for item in result["results"]:
                        link = item.get("url")
                        if link and link not in unique_links:  # Ensure the link is valid and not already processed
                            unique_links[link] = {
                                "query": query,  # The search query that produced this link
                                "content": item.get("content"),  # Extracted content
                                "raw_content": item.get("raw_content")  # Raw content
                            }
            
            print(f"Aggregated {len(unique_links)} unique links from this iteration.")

            # Process each link concurrently: judge usefulness and extract context
            # (the page content was already returned by the Tavily search).
            link_tasks = [
                process_link(session, link, user_query, unique_links[link])
                for link in unique_links
            ]
            link_results = await asyncio.gather(*link_tasks)
            
            # Collect non-None contexts
            iteration_contexts = [res for res in link_results if res]
            if iteration_contexts:
                aggregated_contexts.extend(iteration_contexts)
            else:
                print("No useful contexts were found in this iteration.")
            

            new_search_queries = await get_new_search_queries_async(
                session, user_query, all_search_queries, aggregated_contexts
            )
            if new_search_queries == "<done>":
                print("LLM indicated that no further research is needed.")
                break
            elif new_search_queries:
                print("LLM provided new search queries:", new_search_queries)
                all_search_queries.extend(new_search_queries)
            else:
                print("LLM did not provide any new search queries. Ending the loop.")
                break

            iteration += 1

        print("\nPreparing final report...")
        final_report = await generate_final_report_async(session, user_query, aggregated_contexts)
        print("\n==== DEEP RESEARCH REPORT ====\n")
        print(final_report)
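
The loop deduplicates links by exact URL string, so the same page can still be processed twice when it appears under both `http://www.` and `https://` forms, as the carelabs PDF does in the sample run below. A possible refinement is to deduplicate on a normalized key. The sketch below is an illustration, not the notebook's method: `normalize_url` and `aggregate_unique_links` are hypothetical helpers, and treating scheme and `www.` variants as the same page is an assumption that will not hold for every site.

```python
from typing import Any, Dict, List, Optional
from urllib.parse import urlsplit


def normalize_url(url: str) -> str:
    """Hypothetical helper: build a comparison key that ignores scheme, 'www.' and a trailing slash."""
    parts = urlsplit(url.strip())
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    key = host + parts.path.rstrip("/")
    if parts.query:
        key += "?" + parts.query
    return key


def aggregate_unique_links(
    queries: List[str], search_results: List[Optional[Dict[str, Any]]]
) -> Dict[str, Dict[str, Any]]:
    """Map the first-seen URL of each distinct page to its query and content."""
    unique: Dict[str, Dict[str, Any]] = {}
    seen = set()
    for query, result in zip(queries, search_results):
        if not result or "results" not in result:
            continue  # Skip failed searches
        for item in result["results"]:
            link = item.get("url")
            if not link:
                continue
            key = normalize_url(link)
            if key in seen:
                continue
            seen.add(key)
            unique[link] = {
                "query": query,
                "content": item.get("content"),
                "raw_content": item.get("raw_content"),
            }
    return unique
```

The returned dictionary has the same shape as `unique_links` in the loop above, so it could be passed to `process_link` unchanged.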

In [136]:
def main():
    asyncio.run(async_main())


if __name__ == "__main__":
    main()

Enter your deep research query/topic:  Does pipes , pumps and compressors qualify for manufacturing exemption in the state of texas , if yes then in what situations and if no why?
Enter maximum number of iterations (default 10):  3



=== Iteration 1 ===
Aggregated 23 unique links from this iteration.
Processing link: https://law.justia.com/codes/texas/tax-code/title-2/subtitle-e/chapter-151/subchapter-h/section-151-318/
Processing link: https://codes.findlaw.com/tx/tax-code/tax-sect-151-318/
Processing link: https://freemanlaw.com/texas-sales-and-use-tax-the-manufacturing-exemption/
Processing link: https://www.katyedc.org/business-retention-and-expansion/incentives/p/item/1608/texas-sales-and-use-tax-exemptions-manufacturing-machinery-&-equipment
Processing link: https://smarttaxusa.com/texas-state-manufacturing-exemptions/
Processing link: https://carelabs.com/uploads/3/4/5/0/34502433/manufacturing_-_tx_mfg_tax_exemptions.pdf
Processing link: http://www.carelabs.com/uploads/3/4/5/0/34502433/manufacturing_-_tx_mfg_tax_exemptions.pdf
Processing link: https://comptroller.texas.gov/taxes/publications/94-124.php
Processing link: https://taxconnections.com/texas-state-sales-and-use-tax-exemptions/
Processing link: htt