<a href="https://colab.research.google.com/github/mshumer/OpenDeepResearcher/blob/main/open_deep_researcher.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Notebook-only setup cell. The leading "!" is an IPython shell escape, so this
# line is not valid in a plain Python script.
!pip install nest_asyncio
import nest_asyncio
# NOTE(review): presumably needed because the notebook kernel already runs an
# event loop and main() below calls asyncio.run() — confirm against the
# nest_asyncio documentation.
nest_asyncio.apply()



In [16]:
import ast
import asyncio
import json
import os
import time
from collections import deque
from urllib.parse import urlparse

import aiohttp
import trafilatura

# =======================
# Rate-Limiting State
# =======================
TPM_LIMIT = 60  # Maximum number of LLM requests allowed per 60-second window
LOCK = asyncio.Lock()  # Async lock guarding token_bucket
token_bucket = deque(maxlen=TPM_LIMIT * 2)  # Timestamps of recent requests (buffer sized with headroom)

# =======================
# Configuration Constants
# =======================
# SECURITY: never commit a real API key to source control. The key is read from
# the DEEPSEEK_API_KEY environment variable; the fallback is only a placeholder.
# Any key that was previously hard-coded here should be treated as leaked and revoked.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "REPLACE_WITH_YOUR_DEEPSEEK_API_KEY")


# Endpoints
#DEEPSEEK_URL = "https://api.siliconflow.cn/v1/chat/completions"
DEEPSEEK_URL = "https://api.lkeap.cloud.tencent.com/v1/chat/completions"

# Pick a reachable SearXNG instance (see https://searx.space/) or deploy one locally.
# The host port must match the URL below, e.g.:
#   docker run -d --name searxng -p 18888:8080 searxng/searxng
SEARXNG_INSTANCE = "http://localhost:18888"

# Default LLM model (can be changed if desired)
#DEFAULT_MODEL = "deepseek-ai/DeepSeek-V3"
DEFAULT_MODEL = "deepseek-v3"

# ============================
# Asynchronous Helper Functions
# ============================

def _purge_expired_tokens(now):
    """Drop request timestamps older than 60 seconds from the shared bucket."""
    while token_bucket and token_bucket[0] < now - 60:
        token_bucket.popleft()


async def call_openrouter_async(session, messages, model=DEFAULT_MODEL):
    """
    Send a chat-completion request to DEEPSEEK_URL, rate-limited to TPM_LIMIT
    requests per rolling 60-second window.

    Args:
        session: HTTP client session exposing an aiohttp-style ``post``.
        messages: List of chat messages forwarded as the ``messages`` payload field.
        model: Model identifier forwarded as the ``model`` payload field.

    Returns:
        The text of the first choice's message on HTTP 200, or None when the
        request fails, returns a non-retryable status, or all attempts are used up.
    """
    max_attempts = 3

    # The whole check-wait-append sequence runs under the lock so that
    # concurrent callers cannot both claim the last free slot in the window.
    async with LOCK:
        current_time = time.time()
        _purge_expired_tokens(current_time)

        # Wait until the oldest recorded request leaves the 60-second window.
        while len(token_bucket) >= TPM_LIMIT:
            wait_time = token_bucket[0] + 60 - current_time
            await asyncio.sleep(max(0, wait_time))
            current_time = time.time()
            _purge_expired_tokens(current_time)

        token_bucket.append(current_time)

    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {"model": model, "messages": messages}

    for retry in range(max_attempts):
        try:
            async with session.post(DEEPSEEK_URL, headers=headers, json=payload) as resp:
                if resp.status == 200:
                    result = await resp.json()
                    return result['choices'][0]['message']['content']
                if resp.status != 429:
                    # Any status other than 200/429 is treated as non-retryable.
                    print(f"API Error {resp.status}: {await resp.text()}")
                    return None
                # HTTP 429: back off exponentially (1s, 2s, ...) before retrying.
                backoff = 2 ** retry
        except Exception as e:
            # Broad on purpose: covers transport errors as well as malformed
            # response bodies (e.g. missing 'choices'); both are retried.
            print(f"Request failed: {str(e)}")
            backoff = 1

        # Sleeping after the final attempt would only delay the None result.
        if retry < max_attempts - 1:
            await asyncio.sleep(backoff)

    return None


async def generate_search_queries_async(session, user_query):
    """
    Ask the LLM to produce up to four precise search queries (in Python list format)
    based on the user's query.

    Args:
        session: HTTP client session passed through to call_openrouter_async.
        user_query: The research question entered by the user.

    Returns:
        The list parsed from the LLM reply, or an empty list when there is no
        reply, the reply cannot be parsed, or it is not a list.
    """
    prompt = (
        "You are an expert research assistant. Given the user's query, generate up to four distinct, "
        "precise search queries that would help gather comprehensive information on the topic. "
        "Return only a Python list of strings, for example: ['query1', 'query2', 'query3']."
    )
    messages = [
        {"role": "system", "content": "You are a helpful and precise research assistant."},
        {"role": "user", "content": f"User Query: {user_query}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []

    try:
        # The reply is untrusted model output: literal_eval only accepts Python
        # literals, so it cannot execute code the way eval() would.
        search_queries = ast.literal_eval(response.strip())
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing search queries:", e, "\nResponse:", response)
        return []

    if isinstance(search_queries, list):
        return search_queries
    print("LLM did not return a list. Response:", response)
    return []


async def perform_search_async(session, query):
    """
    Run one search against the configured SearXNG instance.

    Sends a GET to ``<SEARXNG_INSTANCE>/search`` asking for JSON output and
    returns the ``url`` field of every entry under ``results``. Any non-200
    status or raised exception is reported via print and yields an empty list.
    """
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
    }
    search_params = {"q": query, "format": "json"}
    try:
        async with session.get(SEARXNG_INSTANCE + "/search", params=search_params, headers=browser_headers) as resp:
            if resp.status != 200:
                print(f"SearXNG error: {resp.status}")
                return []
            body = await resp.json()
            found_urls = []
            for entry in body.get("results", []):
                found_urls.append(entry["url"])
            return found_urls
    except Exception as e:
        print("Error performing SearXNG search:", e)
        return []

async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM if the provided webpage content is useful for answering the user's query.
    The LLM is instructed to reply with exactly "Yes" or "No".

    Args:
        session: HTTP client session passed through to call_openrouter_async.
        user_query: The research question entered by the user.
        page_text: Extracted page text; only the first 20000 characters are sent.

    Returns:
        "Yes" when the reply contains the standalone word "yes" (any letter
        case); "No" in every other case, including a missing reply.
    """
    prompt = (
        "You are a critical research evaluator. Given the user's query and the content of a webpage, "
        "determine if the webpage contains information relevant and useful for addressing the query. "
        "Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. Do not include any extra text."
    )
    messages = [
        {"role": "system", "content": "You are a strict and concise evaluator of research relevance."},
        {"role": "user", "content": f"User Query: {user_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return "No"

    # Split the reply into lowercase ASCII words so that "yes", "YES." and
    # "Yes, it is" all count, while "Yesterday" or "Note" do not match by accident.
    lowered = response.strip().casefold()
    words = "".join(ch if "a" <= ch <= "z" else " " for ch in lowered).split()
    if "yes" in words:
        return "Yes"
    # Explicit "no", or anything unrecognized: treat the page as not useful.
    return "No"


async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Have the LLM pull out the parts of a page that help answer the user's query.

    The request carries the original query, the search query that surfaced the
    page, and the first 20000 characters of the page text. Returns the stripped
    LLM reply, or an empty string when no reply was obtained.
    """
    instructions = (
        "You are an expert information extractor. Given the user's query, the search query that led to this page, "
        "and the webpage content, extract all pieces of information that are relevant to answering the user's query. "
        "Return only the relevant context as plain text without commentary."
    )
    page_excerpt = page_text[:20000]
    system_message = {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."}
    user_message = {
        "role": "user",
        "content": f"User Query: {user_query}\nSearch Query: {search_query}\n\nWebpage Content (first 20000 characters):\n{page_excerpt}\n\n{instructions}",
    }
    reply = await call_openrouter_async(session, [system_message, user_message])
    return reply.strip() if reply else ""


async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Based on the original query, the previously used search queries, and all the extracted contexts,
    ask the LLM whether additional search queries are needed.

    Args:
        session: HTTP client session passed through to call_openrouter_async.
        user_query: The research question entered by the user.
        previous_search_queries: Search queries already used; embedded in the prompt.
        all_contexts: Iterable of context strings, joined with newlines for the prompt.

    Returns:
        The string "<done>" when the LLM replies with exactly that token, the
        parsed list of new queries when the reply is a Python list literal, or
        an empty list when there is no reply or it cannot be parsed as a list.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are an analytical research assistant. Based on the original query, the search queries performed so far, "
        "and the extracted contexts from webpages, determine if further research is needed. "
        "If further research is needed, provide up to four new search queries as a Python list (for example, "
        "['new query1', 'new query2']). If you believe no further research is needed, respond with exactly <done>."
        "\nOutput only a Python list or the token <done> without any additional text."
    )
    messages = [
        {"role": "system", "content": "You are a systematic research planner."},
        {"role": "user", "content": f"User Query: {user_query}\nPrevious Search Queries: {previous_search_queries}\n\nExtracted Relevant Contexts:\n{context_combined}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []

    cleaned = response.strip()
    if cleaned == "<done>":
        return "<done>"

    try:
        # The reply is untrusted model output: literal_eval only accepts Python
        # literals, so it cannot execute code the way eval() would.
        new_queries = ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing new search queries:", e, "\nResponse:", response)
        return []

    if isinstance(new_queries, list):
        return new_queries
    print("LLM did not return a list for new search queries. Response:", response)
    return []


async def generate_final_report_async(session, user_query, all_contexts):
    """
    Produce the final report from every context gathered during research.

    The contexts are joined with newlines and sent to the LLM together with the
    original query. Whatever call_openrouter_async returns is passed back unchanged.
    """
    instructions = (
        "You are an expert researcher and report writer. Based on the gathered contexts below and the original query, "
        "write a comprehensive, well-structured, and detailed report that addresses the query thoroughly. "
        "Include all relevant insights and conclusions without extraneous commentary."
    )
    gathered = "\n".join(all_contexts)
    user_content = f"User Query: {user_query}\n\nGathered Relevant Contexts:\n{gathered}\n\n{instructions}"
    conversation = [
        {"role": "system", "content": "You are a skilled report writer."},
        {"role": "user", "content": user_content},
    ]
    return await call_openrouter_async(session, conversation)


async def fetch_webpage_text_async(link):
    """
    Download a page and extract its main content with trafilatura.

    Both trafilatura calls are handed to the loop's default executor so they
    are awaited rather than called directly from the coroutine.

    Args:
        link: URL of the page to fetch.

    Returns:
        The result of ``trafilatura.extract`` for the downloaded page, or None
        when ``trafilatura.fetch_url`` returned a falsy value.
    """
    # get_running_loop() is the recommended way to obtain the loop from inside
    # a coroutine; get_event_loop() has policy-dependent behavior.
    loop = asyncio.get_running_loop()
    downloaded = await loop.run_in_executor(None, trafilatura.fetch_url, link)
    if not downloaded:
        return None
    return await loop.run_in_executor(None, trafilatura.extract, downloaded)

async def process_link(session, link, user_query, search_query):
    """
    Process a single link: fetch its content, judge whether it is useful, and
    extract the relevant context.

    Links whose URL path ends with .pdf or .xls are skipped without fetching.

    Args:
        session: HTTP client session passed through to the LLM helpers.
        link: URL of the page to process.
        user_query: The research question entered by the user.
        search_query: The search query that produced this link.

    Returns:
        The extracted context string, or None when the link is skipped, the
        fetch fails or yields no text, the page is judged not useful, or no
        context is extracted.
    """
    # Test the extension on the URL path only, so a query string or fragment
    # ("report.pdf?download=1") does not hide the file type.
    try:
        url_path = urlparse(link).path
    except ValueError:
        # Malformed URL: fall back to checking the raw string.
        url_path = link
    if url_path.lower().endswith(('.pdf', '.xls')):
        print(f"Skipping link as it is a PDF or XLS file: {link}")
        return None

    print(f"Fetching content from: {link}")
    try:
        page_text = await fetch_webpage_text_async(link)
    except Exception as e:
        # One failing page must not abort the caller's whole batch of links.
        print(f"Error fetching content from {link}: {e}")
        return None
    if not page_text:
        print(f"Failed to extract content from: {link}")
        return None

    usefulness = await is_page_useful_async(session, user_query, page_text)
    print(f"Page usefulness for {link}: {usefulness}")
    if usefulness != "Yes":
        return None

    context = await extract_relevant_context_async(session, user_query, search_query, page_text)
    if not context:
        return None
    print(f"Extracted context from {link} (first 200 chars): {context[:200]}")
    return context

    
# =========================
# Main Asynchronous Routine
# =========================

async def async_main():
    """
    Interactive research loop.

    Reads the query and an iteration limit from stdin, asks the LLM for search
    queries, then repeatedly searches, processes the resulting links, and asks
    the LLM whether more searching is needed. Finishes by printing a final
    report built from every context gathered.
    """
    user_query = input("Enter your research query/topic: ").strip()
    iter_limit_input = input("Enter maximum number of iterations (default 10): ").strip()
    iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 10

    aggregated_contexts = []    # All useful contexts from every iteration
    all_search_queries = []     # Every search query used across iterations
    seen_links = set()          # Links already processed in any iteration
    iteration = 0

    async with aiohttp.ClientSession() as session:
        # ----- INITIAL SEARCH QUERIES -----
        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            print("No search queries were generated by the LLM. Exiting.")
            return
        print("返回搜索词:", new_search_queries)
        all_search_queries.extend(new_search_queries)

        # ----- ITERATIVE RESEARCH LOOP -----
        while iteration < iteration_limit:
            print(f"\n=== Iteration {iteration + 1} ===")

            # Run the SearXNG search for every query concurrently.
            search_tasks = [perform_search_async(session, query) for query in new_search_queries]
            search_results = await asyncio.gather(*search_tasks)

            # Map each not-yet-processed link to the first query that produced it.
            # Links seen in earlier iterations are skipped so that pages are not
            # re-fetched and their context is not appended a second time.
            unique_links = {}
            for query, links in zip(new_search_queries, search_results):
                for link in links:
                    if link not in seen_links:
                        seen_links.add(link)
                        unique_links[link] = query

            print(f"Aggregated {len(unique_links)} unique links from this iteration.")

            # Process each link concurrently: fetch, judge, and extract context.
            link_tasks = [
                process_link(session, link, user_query, query)
                for link, query in unique_links.items()
            ]
            link_results = await asyncio.gather(*link_tasks)

            # Keep only links that produced a context.
            iteration_contexts = [res for res in link_results if res]
            if iteration_contexts:
                aggregated_contexts.extend(iteration_contexts)
            else:
                print("No useful contexts were found in this iteration.")

            # ----- ASK THE LLM IF MORE SEARCHES ARE NEEDED -----
            new_search_queries = await get_new_search_queries_async(session, user_query, all_search_queries, aggregated_contexts)
            if new_search_queries == "<done>":
                print("LLM indicated that no further research is needed.")
                break
            elif new_search_queries:
                print("LLM provided new search queries:", new_search_queries)
                all_search_queries.extend(new_search_queries)
            else:
                print("LLM did not provide any new search queries. Ending the loop.")
                break

            iteration += 1

        # ----- FINAL REPORT -----
        print("\nGenerating final report...")
        final_report = await generate_final_report_async(session, user_query, aggregated_contexts)
        print("\n==== FINAL REPORT ====\n")
        print(final_report)


def main():
    """Synchronous entry point: run async_main() to completion via asyncio.run."""
    research_coroutine = async_main()
    asyncio.run(research_coroutine)


# Start the interactive research session only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()


Enter your research query/topic:  量化交易的前世今生
Enter maximum number of iterations (default 10):   3


返回搜索词: ['量化交易的历史发展', '量化交易的技术演变', '量化交易的现状与趋势', '量化交易的成功案例分析']

=== Iteration 1 ===
Aggregated 124 unique links from this iteration.
Fetching content from: https://xueqiu.com/8185159194/277790290
Fetching content from: https://zhuanlan.zhihu.com/p/146118079
Fetching content from: https://www.incredibuild.cn/blog/quant-trading-strategies-definitions-and-importance
Fetching content from: https://xueqiu.com/1160805449/132961443
Fetching content from: https://www.gelonghui.com/p/253940
Fetching content from: https://www.investor.org.cn/learning_center/investors_classroom/hot_topic/online/essay_competition_2894/202301/t20230104_627035.shtml
Fetching content from: https://blog.csdn.net/itcast_cn/article/details/121496459
Fetching content from: https://www.cnblogs.com/lizhihang/p/12610617.html
Fetching content from: https://www.bilibili.com/opus/513834399647919696
Fetching content from: https://zhuanlan.zhihu.com/p/443296363
Fetching content from: https://zhuanlan.zhihu.com/p/681428318
Fetchi