<a href="https://colab.research.google.com/github/ThomsenDrake/SearXNG-Researcher/blob/main/open_deep_researcher_searxng_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install runtime dependencies. `!` is an IPython shell escape, so this line
# only works inside Jupyter/Colab, not in a plain Python interpreter.
!pip install nest_asyncio python-dotenv bs4 aiohttp requests
import nest_asyncio
# Notebook cells already execute inside a running asyncio event loop; patching
# it to be re-entrant lets main() call loop.run_until_complete() from a cell.
nest_asyncio.apply()

In [None]:
import ast
import asyncio
import json
import os
import re
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import aiohttp
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Configuration Constants
# =======================
# Secrets and endpoints can be overridden with an environment variable of the
# same name; the literal fallback is the placeholder you may edit in place.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY")  # Replace with your Gemini API key

# Endpoints
GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions"
# Must include the http(s):// scheme: a bare "host:port" is not a requestable URL.
SEARXNG_URL = os.environ.get("SEARXNG_URL", "http://localhost:8080")  # Replace with your SearXNG URL

# Default LLM model
DEFAULT_MODEL = "gemini-2.0-flash"

# Add rate limiting constants
RATE_LIMIT_CALLS = 15  # Maximum calls per minute
RATE_LIMIT_PERIOD = 60  # Period in seconds
RATE_LIMIT_DELAY = RATE_LIMIT_PERIOD / RATE_LIMIT_CALLS  # ~4 seconds between calls
MAX_RETRIES = 3  # Maximum number of attempts for rate-limited requests

# Rate limiting queue: timestamps of the most recent API calls (oldest first).
call_timestamps = deque(maxlen=RATE_LIMIT_CALLS)

async def wait_for_rate_limit():
    """
    Wait until another API call fits in the sliding rate-limit window, then record it.

    At most RATE_LIMIT_CALLS calls are admitted per RATE_LIMIT_PERIOD seconds,
    tracked in the module-level ``call_timestamps`` deque.

    The "is there a free slot?" check and the append that claims the slot run
    with no ``await`` between them. Coroutines sharing the event loop (e.g.
    several LLM calls started by asyncio.gather) therefore cannot all claim the
    same slot. Each waiter re-checks after sleeping instead of assuming the slot
    is still free.
    """
    while True:
        # Monotonic clock: immune to wall-clock adjustments while we wait.
        now = time.monotonic()

        # Forget calls that have aged out of the window.
        while call_timestamps and now - call_timestamps[0] >= RATE_LIMIT_PERIOD:
            call_timestamps.popleft()

        if len(call_timestamps) < RATE_LIMIT_CALLS:
            # Record the real time of this call, not a pre-sleep timestamp.
            call_timestamps.append(now)
            return

        # Window is full: sleep until the oldest call expires, then re-check.
        # Positive, because the oldest entry survived the purge above.
        wait_time = call_timestamps[0] + RATE_LIMIT_PERIOD - now
        print(f"Rate limit approaching, waiting {wait_time:.1f}s")
        await asyncio.sleep(wait_time)

async def call_openrouter_async(session, messages, model=DEFAULT_MODEL, retry_count=0) -> Optional[str]:
    """
    Send a chat-completion request to the Gemini OpenAI-compatible endpoint.

    The function name is historical and kept for callers; it talks to GEMINI_URL.
    Every attempt first goes through wait_for_rate_limit(). An HTTP 429 reply
    triggers a linearly growing back-off and another attempt. This continues
    until MAX_RETRIES attempts (counting ``retry_count`` already spent) are used.

    Args:
        session: an open aiohttp.ClientSession.
        messages: list of chat messages ({"role": ..., "content": ...}).
        model: model name placed in the request payload.
        retry_count: attempts already consumed; callers normally leave it at 0.

    Returns:
        The assistant message content, or None on any failure (non-200/429
        status, malformed response, network error, or retries exhausted).
    """
    headers = {
        "Authorization": f"Bearer {GEMINI_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages
    }

    for attempt in range(retry_count, MAX_RETRIES):
        # Wait for rate limit before making call
        await wait_for_rate_limit()
        try:
            async with session.post(GEMINI_URL, headers=headers, json=payload) as resp:
                if resp.status == 200:
                    result = await resp.json()
                    try:
                        return result['choices'][0]['message']['content']
                    except (KeyError, IndexError, TypeError):
                        print("Unexpected Gemini response structure:", result)
                        return None
                if resp.status != 429:
                    text = await resp.text()
                    print(f"Gemini API error: {resp.status} - {text}")
                    return None
            # 429 only from here on. Leaving the `async with` has already released
            # the response, so no connection is held open during the back-off.
        except Exception as e:
            print("Error calling Gemini:", e)
            return None

        if attempt + 1 >= MAX_RETRIES:
            break  # no attempts left: don't sleep for a retry that won't happen
        wait_time = (attempt + 1) * RATE_LIMIT_DELAY * 2
        print(f"Rate limit hit, waiting {wait_time}s before retry {attempt + 1}/{MAX_RETRIES}")
        await asyncio.sleep(wait_time)

    print("Max retries reached for API call")
    return None

async def generate_search_queries_async(session, user_query):
    """
    Ask the LLM for up to four precise search queries for ``user_query``.

    The model is told to answer with a Python list literal. The reply is
    parsed with ast.literal_eval, never eval(), because it is untrusted text.

    Returns:
        A list of non-empty, stripped query strings, or [] if the call fails or
        the reply is not a list literal.
    """
    prompt = (
        "You are an expert research assistant. Given the user's query, generate up to four distinct, "
        "precise search queries that would help gather comprehensive information on the topic. "
        "Return only a Python list of strings without newlines or formatting, for example: ['query1', 'query2', 'query3']."
    )
    messages = [
        {"role": "system", "content": "You are a helpful and precise research assistant."},
        {"role": "user", "content": f"User Query: {user_query}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []

    # Strip Markdown code fences the model often wraps around its answer.
    cleaned_response = response.replace('```python', '').replace('```', '').strip()
    # Collapse newlines/indentation so a list spread across lines still parses.
    cleaned_response = ' '.join(cleaned_response.split())
    try:
        # literal_eval accepts only Python literals, so model output cannot run code.
        search_queries = ast.literal_eval(cleaned_response)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing search queries:", e, "\nResponse:", response)
        return []

    if not isinstance(search_queries, list):
        print("LLM did not return a list. Response:", response)
        return []
    # Non-string or blank entries would produce meaningless search requests.
    return [q.strip() for q in search_queries if isinstance(q, str) and q.strip()]

async def perform_search_async(session, query):
    """
    Search the SearXNG instance at SEARXNG_URL for ``query``.

    Requests the first page of results as JSON; the instance must have the
    "json" output format enabled. Prints diagnostic output along the way.

    Returns:
        Result URLs in the order SearXNG returned them, skipping entries whose
        "url" is missing or empty, or [] on any error.
    """
    # Note: you could add other parameters as needed.
    params = {
        "q": query,
        # NOTE(review): "!web" looks like SearXNG bang syntax, while the `engines`
        # parameter expects engine names — confirm this has the intended effect.
        "engines" : "!web",
        "format": "json",
        "language": "en",
        "safesearch": 0,
        "pageno": 1
    }
    # Built before the try so the error handler below can always report it.
    search_url = str(SEARXNG_URL).rstrip('/') + '/search'
    try:
        print(f"Searching with URL: {search_url}")
        print(f"Search query: {query}")
        print(f"Full request URL: {search_url}?{'&'.join(f'{k}={v}' for k,v in params.items())}")  # Debug full URL

        async with session.get(search_url, params=params) as resp:
            if resp.status != 200:
                text = await resp.text()
                print(f"SearXNG error {resp.status}: {text}")
                print(f"Request URL: {str(resp.url)}")
                return []

            try:
                # content_type=None: parse regardless of the Content-Type header,
                # so a non-JSON body raises JSONDecodeError (handled here) rather
                # than aiohttp's ContentTypeError.
                results = await resp.json(content_type=None)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                text = await resp.text()
                print(f"Raw response: {text[:500]}")
                return []

            print(f"Response status: {resp.status}")
            # An empty body parses to None, so check the type before indexing.
            if isinstance(results, dict) and "results" in results:
                links = [item.get("url") for item in results["results"] if item.get("url")]
                print(f"Found {len(links)} results")
                return links
            print("Response content:", results)
            return []
    except Exception as e:
        print(f"Error performing SearXNG search: {str(e)}")
        print(f"Request URL: {search_url}")
        return []

async def fetch_webpage_text_async(session, url):
    """
    Download ``url`` with the shared aiohttp session and return its visible text.

    <script> and <style> elements are removed, and the remaining text is
    collapsed into one space-separated string.

    Returns:
        The extracted text, or "" on any network, HTTP-status, or parsing error.
    """
    try:
        # Use the async session, not a blocking requests.get(). A blocking call
        # inside a coroutine stalls the whole event loop, so the pages in one
        # asyncio.gather batch would be downloaded one after another.
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            response.raise_for_status()
            # errors="replace": pages with a wrong/missing charset still yield text.
            html = await response.text(errors="replace")

        # Parse with BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')

        # Remove script and style elements
        for element in soup(["script", "style"]):
            element.decompose()

        # Clean up whitespace: strip each line, split on double spaces, drop blanks.
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        return ' '.join(chunk for chunk in chunks if chunk)
    except Exception as e:
        print(f"Error fetching webpage text: {e}")
        return ""

async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM whether ``page_text`` is useful for answering ``user_query``.

    Only the first 20,000 characters of the page are sent. The model is asked
    for exactly "Yes" or "No". Replies that differ in case or are wrapped in
    punctuation/markup ("yes.", "**No**") are normalised. Otherwise the first
    standalone yes/no word is used, preferring "Yes".

    Returns:
        "Yes" or "No"; "No" when the call fails or the reply contains neither.
    """
    prompt = (
        "You are a critical research evaluator. Given the user's query and the content of a webpage, "
        "determine if the webpage contains information relevant and useful for addressing the query. "
        "Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. Do not include any extra text."
    )
    messages = [
        {"role": "system", "content": "You are a strict and concise evaluator of research relevance."},
        {"role": "user", "content": f"User Query: {user_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return "No"

    answer = response.strip()
    normalized = answer.strip(" \t\n.!\"'*`").lower()
    if normalized in ("yes", "no"):
        return normalized.capitalize()

    # Fallback for chattier replies: match whole words, case-insensitively, so
    # "yes, it is" counts but "Yesterday" / "Nothing" do not.
    if re.search(r"\byes\b", answer, re.IGNORECASE):
        return "Yes"
    return "No"

async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Have the LLM pull out everything in ``page_text`` that bears on ``user_query``.

    The search query that surfaced the page is included as extra context, and
    only the first 20,000 characters of the page are sent.

    Returns:
        The model's extraction with surrounding whitespace removed, or "" if the
        call produced nothing.
    """
    instructions = (
        "You are an expert information extractor. Given the user's query, the search query that led to this page, "
        "and the webpage content, extract all pieces of information that are relevant to answering the user's query. "
        "Return only the relevant context as plain text without commentary."
    )
    excerpt = page_text[:20000]
    user_message = (
        f"User Query: {user_query}\nSearch Query: {search_query}\n\n"
        f"Webpage Content (first 20000 characters):\n{excerpt}\n\n{instructions}"
    )
    conversation = [
        {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."},
        {"role": "user", "content": user_message},
    ]
    reply = await call_openrouter_async(session, conversation)
    return reply.strip() if reply else ""

async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Ask the LLM whether more research is needed and, if so, for follow-up queries.

    Args:
        session: open aiohttp.ClientSession.
        user_query: the user's research question.
        previous_search_queries: every query searched so far (shown to the model).
        all_contexts: extracted context strings, joined with newlines into the prompt.

    Returns:
        "<done>" if the model says research is complete; otherwise a list of
        non-empty, stripped query strings. Returns [] if the call fails or the
        reply cannot be parsed as a list literal. The reply is parsed with
        ast.literal_eval, never eval(), because it is untrusted text.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are an analytical research assistant. Based on the original query, the search queries performed so far, "
        "and the extracted contexts from webpages, determine if further research is needed. "
        "If further research is needed, provide up to four new search queries as a Python list (for example, "
        "['new query1', 'new query2']). If you believe no further research is needed, respond with exactly <done>."
        "\nOutput only a Python list or the token <done> without any additional text."
    )
    messages = [
        {"role": "system", "content": "You are a systematic research planner."},
        {"role": "user", "content": f"User Query: {user_query}\nPrevious Search Queries: {previous_search_queries}\n\nExtracted Relevant Contexts:\n{context_combined}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []

    # Strip Markdown code fences; models often wrap both lists and <done> in them.
    cleaned = response.replace('```python', '').replace('```', '').strip()
    if cleaned == "<done>":
        return "<done>"
    # Collapse newlines/indentation so a list spread across lines still parses.
    cleaned = ' '.join(cleaned.split())
    try:
        new_queries = ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing new search queries:", e, "\nResponse:", response)
        return []

    if not isinstance(new_queries, list):
        print("LLM did not return a list for new search queries. Response:", response)
        return []
    return [q.strip() for q in new_queries if isinstance(q, str) and q.strip()]

async def generate_report_title(session, user_query, all_contexts):
    """
    Ask the LLM for a concise (at most 10 words) title for the report.

    Args:
        session: open aiohttp.ClientSession.
        user_query: the user's research question.
        all_contexts: the collected findings. This is either a list of context
            strings (what async_main passes) or a single string. Only the first
            1000 characters of the findings are sent.

    Returns:
        The title with surrounding whitespace, quotes, and leading Markdown
        heading marks removed, or "Research Report" if the call fails or leaves
        nothing usable.
    """
    # Slice characters, not list items: slicing the list itself would send up to
    # 1000 whole contexts, rendered as a Python list repr.
    if isinstance(all_contexts, str):
        findings = all_contexts
    else:
        findings = "\n\n".join(str(c) for c in (all_contexts or []))
    findings = findings[:1000]

    prompt = (
        "Based on the user's query and the research findings, generate a concise but descriptive title "
        "for the report. The title should be clear and professional, no more than 10 words. "
        "Return only the title text without quotes or formatting."
    )
    messages = [
        {"role": "system", "content": "You are a professional title writer."},
        {"role": "user", "content": f"Query: {user_query}\n\nFindings Summary:\n{findings}\n\n{prompt}"}
    ]
    title = await call_openrouter_async(session, messages)
    if not title:
        return "Research Report"
    # Models sometimes ignore "no quotes or formatting"; trim the common wrappers.
    cleaned = title.strip().lstrip("#").strip().strip('"\'*`').strip()
    return cleaned or "Research Report"

def save_report_as_markdown(title, report_content, user_query, reports_dir=None):
    """
    Save the report as a Markdown file and return the path written.

    Args:
        title: report title. It becomes the top-level heading (collapsed to a
            single line) and, sanitized and cut to 50 characters, part of the
            filename.
        report_content: body placed under "## Research Findings".
        user_query: text placed under "## Original Query".
        reports_dir: destination directory (str or Path). Defaults to
            ``reports/`` under the current working directory; created if missing.

    Returns:
        pathlib.Path of the written file, named "<YYYYmmdd_HHMMSS>_<title>.md"
        ("report" replaces a title with no filename-safe characters).
    """
    from pathlib import Path
    import re
    from datetime import datetime

    reports_dir = Path.cwd() / "reports" if reports_dir is None else Path(reports_dir)
    reports_dir.mkdir(parents=True, exist_ok=True)

    # A line break in the title would split the "# ..." heading; keep it on one line.
    title = " ".join(str(title).split()) or "Research Report"

    # Create a sanitized filename from the title
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    sanitized_title = re.sub(r'[^\w\s-]', '', title)
    sanitized_title = re.sub(r'[-\s]+', '-', sanitized_title).strip('-')
    # Truncation can leave a trailing '-'; an all-punctuation title leaves nothing.
    sanitized_title = sanitized_title[:50].rstrip('-') or "report"
    filepath = reports_dir / f"{timestamp}_{sanitized_title}.md"

    markdown_content = f"""# {title}

## Original Query
{user_query}

## Research Findings
{report_content}
"""

    filepath.write_text(markdown_content, encoding='utf-8')
    return filepath

async def process_link(session, link, user_query, search_query):
    """
    Run one URL through the fetch, relevance-check, and extract pipeline.

    Returns:
        The extracted context string. Returns None when the page is empty, is
        judged not useful, yields no context, or any step raises (the error is
        printed).
    """
    try:
        page_text = await fetch_webpage_text_async(session, link)
        if not page_text:
            return None

        verdict = await is_page_useful_async(session, user_query, page_text)
        if verdict != "Yes":
            return None

        context = await extract_relevant_context_async(session, user_query, search_query, page_text)
        if not context:
            return None

        print(f"Extracted context from {link} (first 200 chars): {context[:200]}")
        return context
    except Exception as e:
        print(f"Error processing link {link}: {e}")
    return None

async def process_batch_async(session, tasks, batch_size=3):
    """
    Await ``tasks`` in consecutive groups of ``batch_size`` to throttle API usage.

    The awaitables in each group run concurrently via asyncio.gather. A pause of
    2 * RATE_LIMIT_DELAY seconds separates groups; there is no pause after the
    last one. ``session`` is accepted for interface symmetry but unused.

    Returns:
        All results, in the same order as ``tasks``.
    """
    collected = []
    total = len(tasks)
    for start in range(0, total, batch_size):
        chunk = tasks[start:start + batch_size]
        collected += await asyncio.gather(*chunk)
        more_remaining = start + batch_size < total
        if more_remaining:
            await asyncio.sleep(RATE_LIMIT_DELAY * 2)
    return collected

async def process_links_batch(session, links, user_query, unique_links):
    """
    Evaluate every URL in ``links`` with process_link, three at a time.

    ``unique_links`` maps each URL to the search query that produced it; that
    query is forwarded to process_link. Results come back in ``links`` order
    (None for links that produced no context).
    """
    pending = []
    for url in links:
        origin_query = unique_links[url]
        pending.append(process_link(session, url, user_query, origin_query))
    return await process_batch_async(session, pending, batch_size=3)

# Final report assembly
async def generate_final_report_async(session, user_query, aggregated_contexts):
    """
    Placeholder report builder: concatenate the collected contexts (no LLM call).

    ``session`` and ``user_query`` are accepted for interface compatibility but
    are not used.

    Returns:
        "Final Report:" followed by the contexts separated by blank lines, or
        "No research findings." when there are no contexts.
    """
    if not aggregated_contexts:
        return "No research findings."
    body = "\n\n".join(aggregated_contexts)
    return f"Final Report:\n{body}"

# =========================
# Main Asynchronous Routine
# =========================

async def async_main():
    """
    Interactive entry point: iterative search, read, and extract research, then report.

    Prompts for a research query and an iteration limit (default 10), then:
      1. asks the LLM for initial search queries (exits if there are none);
      2. each iteration searches SearXNG for the current queries concurrently and
         processes every link not already processed in an earlier iteration. It
         collects the extracted contexts;
      3. asks the LLM for follow-up queries. The loop stops on "<done>", on an
         empty/unparseable reply, or when the iteration limit is reached;
      4. builds the final report (with retries), prints it, and saves it as
         Markdown. If that fails, it prints the raw collected contexts.
    """
    user_query = input("Enter your research query/topic: ").strip()
    iter_limit_input = input("Enter maximum number of iterations (default 10): ").strip()
    iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 10

    aggregated_contexts = []    # All useful contexts from every iteration
    all_search_queries = []     # Every search query used across iterations
    processed_links = set()     # Every link already evaluated, across iterations
    iteration = 0

    async with aiohttp.ClientSession() as session:
        # ----- INITIAL SEARCH QUERIES -----
        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            print("No search queries were generated by the LLM. Exiting.")
            return
        all_search_queries.extend(new_search_queries)

        # ----- ITERATIVE RESEARCH LOOP -----
        while iteration < iteration_limit:
            print(f"\n=== Iteration {iteration + 1} ===")

            # Run one SearXNG search per query, concurrently.
            search_tasks = [perform_search_async(session, query) for query in new_search_queries]
            search_results = await asyncio.gather(*search_tasks)

            # Map each link to the first query that produced it, skipping links
            # handled in earlier iterations. Re-processing a page would duplicate
            # its context and spend rate-limited LLM calls for nothing.
            unique_links = {}
            for query, links in zip(new_search_queries, search_results):
                for link in links:
                    if link not in unique_links and link not in processed_links:
                        unique_links[link] = query
            processed_links.update(unique_links)

            print(f"Aggregated {len(unique_links)} new unique links from this iteration.")

            # Process links three at a time to stay under the LLM rate limit.
            all_links = list(unique_links)
            link_results = []
            for i in range(0, len(all_links), 3):
                batch = all_links[i:i + 3]
                link_results.extend(await process_links_batch(session, batch, user_query, unique_links))

            # Collect non-None contexts.
            iteration_contexts = [res for res in link_results if res]
            if iteration_contexts:
                aggregated_contexts.extend(iteration_contexts)
            else:
                print("No useful contexts were found in this iteration.")

            # Queries proposed now could never be searched, so skip the LLM call.
            if iteration + 1 >= iteration_limit:
                print("Reached the maximum number of iterations.")
                break

            # ----- ASK THE LLM IF MORE SEARCHES ARE NEEDED -----
            new_search_queries = await get_new_search_queries_async(session, user_query, all_search_queries, aggregated_contexts)
            if new_search_queries == "<done>":
                print("LLM indicated that no further research is needed.")
                break
            elif new_search_queries:
                print("LLM provided new search queries:", new_search_queries)
                all_search_queries.extend(new_search_queries)
            else:
                print("LLM did not provide any new search queries. Ending the loop.")
                break
            iteration += 1

        # ----- FINAL REPORT -----
        print("\nGenerating final report...")
        max_report_retries = 3
        final_report = None

        for retry in range(max_report_retries):
            try:
                final_report = await generate_final_report_async(session, user_query, aggregated_contexts)
                if final_report:
                    break
                print(f"Empty report received, retrying ({retry + 1}/{max_report_retries})...")
                await asyncio.sleep(RATE_LIMIT_DELAY * 2)
            except Exception as e:
                print(f"Error generating report (attempt {retry + 1}): {e}")
                await asyncio.sleep(RATE_LIMIT_DELAY * 2)

        print("\n==== FINAL REPORT ====\n")
        if final_report:
            print(final_report)
            # Generate a report title and export report to a markdown file
            report_title = await generate_report_title(session, user_query, aggregated_contexts)
            file_path = save_report_as_markdown(report_title, final_report, user_query)
            print(f"Final report exported to: {file_path}")
        else:
            print("Failed to generate report after all retries.")
            print("\nCollected Contexts:")
            for idx, context in enumerate(aggregated_contexts, 1):
                print(f"\nContext {idx}:")
                print(context[:500] + "..." if len(context) > 500 else context)

def main():
    """
    Run async_main() to completion, from either a notebook or a plain script.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (e.g. `python script.py`). asyncio.run() creates
        # and closes one; asyncio.get_event_loop() is deprecated in this situation.
        asyncio.run(async_main())
    else:
        # Jupyter/Colab cells execute inside the kernel's running loop;
        # nest_asyncio.apply() in the first cell makes it re-entrant.
        loop.run_until_complete(async_main())

# Entry point. A notebook's own code runs as "__main__", so this guard still
# starts the research loop from a cell. It only prevents the interactive
# prompts from firing when this code is imported as a module.
if __name__ == "__main__":
    main()
