In [None]:
# tools\__init__.py

from .tool_manager import ToolManager
from .search_manager import SearchManager, initialize_search_manager
from . import read_document
from . import fetch_latest_arxiv_papers
from . import foia_search
from . import get_yt_comments

# Names exported by a star-import of this package; every entry is bound by
# one of the import statements above.
__all__ = [
    'ToolManager',
    'SearchManager',
    'initialize_search_manager',
    'read_document',
    'fetch_latest_arxiv_papers',
    'foia_search',
    'get_yt_comments'
]


In [None]:
# tools\base\tool.py

"""
Base tool interface and implementation.
"""
from typing import Dict, Any, Optional
from abc import ABC, abstractmethod

class BaseTool(ABC):
    """Contract shared by every tool implementation.

    Concrete tools must provide ``execute``, ``tool_name`` and
    ``description``. Overriding ``required_params`` makes
    ``validate_params`` check for those keys.
    """

    def __init__(self, tool_config: Optional[Dict[str, Any]] = None):
        # Any falsy config (None or an empty mapping) becomes a fresh dict.
        self.tool_config = tool_config if tool_config else {}

    @abstractmethod
    def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the tool.

        Args:
            params: Parameters for this invocation

        Returns:
            Dictionary holding the outcome of the run
        """
        pass

    @property
    @abstractmethod
    def tool_name(self) -> str:
        """Name identifying this tool."""
        pass

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable description of this tool."""
        pass

    @property
    def required_params(self) -> list:
        """Parameter names that must be supplied; none by default."""
        return []

    def validate_params(self, params: Dict[str, Any]) -> bool:
        """
        Check that every required parameter name is present in ``params``.

        Args:
            params: Parameters to check

        Returns:
            True when no required name is missing, False otherwise
        """
        for name in self.required_params:
            if name not in params:
                return False
        return True


In [None]:
# tools\base_tool.py

"""Base tool class defining the interface for all tools."""
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from dataclasses import dataclass

@dataclass
class ToolResult:
    """Container for tool execution results."""
    # Whether the execution succeeded.
    success: bool
    # Payload produced by the tool; any type is accepted.
    result: Any
    # Optional error text; defaults to None when not supplied.
    error: Optional[str] = None

class BaseTool(ABC):
    """Abstract base class for all tools.

    Stores the descriptive metadata passed to the constructor and requires
    subclasses to implement ``execute`` and the ``parameters`` property.
    """
    
    def __init__(self, name: str, description: str, use_case: Optional[str] = None, operation: Optional[str] = None):
        """Initialize a tool.
        
        Args:
            name: Tool name
            description: Brief description of the tool's purpose
            use_case: Detailed description of when to use this tool
                (optional, defaults to None)
            operation: Technical description of how the tool works
                (optional, defaults to None)
        """
        # All four values are stored unchanged as public attributes.
        self.name = name
        self.description = description
        self.use_case = use_case
        self.operation = operation
        
    @abstractmethod
    def execute(self, **kwargs) -> ToolResult:
        """Execute the tool with the given parameters.
        
        Args:
            **kwargs: Tool-specific parameters
            
        Returns:
            ToolResult containing execution status and result
        """
        pass
    
    @property
    @abstractmethod
    def parameters(self) -> Dict[str, Dict[str, Any]]:
        """Get the tool's parameter specifications.
        
        Returns:
            Dictionary mapping parameter names to their specifications:
            {
                "param_name": {
                    "type": str,  # Parameter type
                    "description": str,  # Parameter description
                    "required": bool,  # Whether parameter is required
                    "default": Any  # Default value if any
                }
            }
        """
        pass


In [None]:
# tools\content_extractor.py

from bs4 import BeautifulSoup, Comment
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium_stealth import stealth
from newspaper import Article
import logging
from typing import Optional, Tuple
from fake_useragent import UserAgent
import html2text
from urllib.parse import urlparse
import re

# Configure logging
# NOTE(review): basicConfig() runs when this module is imported and configures
# the root logger; an application that sets up its own logging may not want a
# library module to do this — confirm this is intended.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

def fetch_article_text(url: str) -> Tuple[str, str, Optional[str], str]:
    """Download and parse one article, returning its main fields.

    Args:
        url (str): Address of the article to download

    Returns:
        Tuple[str, str, Optional[str], str]: title, the author names joined
        with ", ", the publish date reported by the parser, and the body text
    """
    parsed = Article(url)
    parsed.download()
    parsed.parse()

    # Fields are read in the same order they are returned.
    return (
        parsed.title,
        ', '.join(parsed.authors),
        parsed.publish_date,
        parsed.text,
    )

class WebContentExtractor:
    """Extract readable text from a URL using three strategies with retries.

    Strategies are tried in order: plain ``requests`` + BeautifulSoup,
    newspaper's ``Article``, then a headless Chrome driven by Selenium.
    One WebDriver is stored on the class and shared by all instances.
    """

    # Shared WebDriver; created on demand and reused across instances.
    _driver = None

    # Stripped text shorter than this is treated as a failed extraction.
    MIN_CONTENT_LENGTH = 200

    def __init__(self, max_retries: int = 3, timeout: int = 10):
        """Store retry/timeout settings and make sure a driver exists.

        Args:
            max_retries: Extra full passes over all strategies after the first
            timeout: Seconds used for the HTTP request and the Selenium wait
        """
        self.max_retries = max_retries
        self.timeout = timeout
        # Reuse the shared driver when one already exists; unconditionally
        # creating one here would overwrite (and never quit) the previous
        # Chrome instance each time the class is instantiated.
        self.get_driver()

    @classmethod
    def get_driver(cls):
        """Returns the shared WebDriver instance, creating it if needed."""
        if cls._driver is None:
            cls._initialize_driver()
        return cls._driver

    @classmethod
    def _initialize_driver(cls):
        """Initializes Chrome WebDriver with enhanced anti-detection measures."""
        chrome_options = ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        service = ChromeService(ChromeDriverManager().install())
        cls._driver = webdriver.Chrome(service=service, options=chrome_options)

        stealth(cls._driver,
                languages=["en-US", "en"],
                vendor="Google Inc.",
                platform="Win32",
                webgl_vendor="Intel Inc.",
                renderer="Intel Iris OpenGL Engine",
                fix_hairline=True)

    @classmethod
    def quit_driver(cls):
        """Quits the shared WebDriver, if any, and forgets it."""
        # Clear the reference first so a failing quit() cannot leave a dead
        # driver behind for the next get_driver() call.
        driver, cls._driver = cls._driver, None
        if driver is not None:
            driver.quit()

    def __del__(self):
        """Best-effort cleanup of the shared WebDriver.

        The driver is shared, so this also closes it for other live
        instances; they get a new one on their next ``get_driver()`` call.
        """
        try:
            self.quit_driver()
        except Exception:
            # A finalizer may run during interpreter shutdown, when raising
            # would only produce an "Exception ignored" message.
            pass

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Return True when ``url`` parses with both a scheme and a host."""
        try:
            parts = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # urlparse rejects malformed netlocs and unsupported input types.
            return False
        return bool(parts.scheme and parts.netloc)

    @staticmethod
    def _extract_content_from_soup(soup: "BeautifulSoup") -> str:
        """Strip non-content markup from ``soup`` and return collapsed text."""
        # Remove unwanted elements
        for element in soup.find_all(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()

        # Remove comments (``string=`` is the current name of the old
        # ``text=`` argument)
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Get text content
        text = soup.get_text(separator=' ', strip=True)

        # Collapse every run of whitespace to a single space
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _extract_with_requests(self, url: str) -> Optional[str]:
        """Fetch ``url`` over HTTP and extract text; None on any error."""
        try:
            headers = {'User-Agent': UserAgent().random}
            response = requests.get(url, headers=headers, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            return self._extract_content_from_soup(soup)
        except Exception as e:
            logger.error(f"Error extracting content with requests: {e}")
            return None

    def _extract_with_newspaper(self, url: str) -> Optional[str]:
        """Extract the article body with newspaper; None on any error."""
        try:
            article = Article(url)
            article.download()
            article.parse()
            return article.text.strip()
        except Exception as e:
            logger.error(f"Error extracting content with newspaper3k: {e}")
            return None

    def extract_with_selenium(self, url: str) -> Optional[str]:
        """Load ``url`` in the shared browser and extract text; None on error."""
        try:
            driver = self.get_driver()
            driver.get(url)

            # Wait for body to be present
            WebDriverWait(driver, self.timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            # Get page source and parse with BeautifulSoup
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            return self._extract_content_from_soup(soup)
        except Exception as e:
            logger.error(f"Error extracting content with Selenium: {e}")
            return None

    def extract_content(self, url: str, retry_count: int = 0) -> Optional[str]:
        """Extract content with automatic retry and fallback mechanisms.

        The extraction methods are tried in the following order:
        1. requests with BeautifulSoup (fastest, works for simple pages)
        2. newspaper3k (best for article content)
        3. Selenium (best for dynamic content, slowest)

        Args:
            url: Page to extract
            retry_count: Number of full passes already made; further passes
                run while this is below ``max_retries``

        Returns:
            The first result whose stripped length reaches
            ``MIN_CONTENT_LENGTH``, or None when the URL is invalid or every
            pass fails.
        """
        if not self.is_valid_url(url):
            logger.error(f"Invalid URL: {url}")
            return None

        extractors = (
            self._extract_with_requests,
            self._extract_with_newspaper,
            self.extract_with_selenium,
        )

        attempt = retry_count
        while True:
            for extractor in extractors:
                content = extractor(url)
                if content and len(content.strip()) >= self.MIN_CONTENT_LENGTH:
                    return content

            if attempt >= self.max_retries:
                break

            logger.info(f"Retrying content extraction for {url} (attempt {attempt + 1})")
            attempt += 1

        logger.error(f"All content extraction methods failed for {url}")
        return None


In [None]:
# tools\core\__init__.py



In [None]:
# tools\core\code_analysis_tool.py

"""
Code analysis tool implementation.
"""
from typing import Dict, Any, List
import ast
from ..base.tool import BaseTool

class CodeAnalysisTool(BaseTool):
    """Tool for analyzing Python code.

    Parses the supplied source with ``ast`` and reports simple counts plus
    a list of potential issues (bare ``except`` clauses, unused imports).
    """

    def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze Python code for various metrics and issues.

        Args:
            params: Must contain 'code' key with Python code to analyze

        Returns:
            ``{"metrics": ..., "issues": ...}`` on success, or
            ``{"error": message}`` when a required parameter is missing or
            the code cannot be parsed
        """
        if not self.validate_params(params):
            return {"error": "Missing required parameters"}

        code = params["code"]
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return {"error": f"Syntax error in code: {str(e)}"}
        except (TypeError, ValueError) as e:
            # Non-string input, or source containing NUL bytes on older
            # interpreters, is reported instead of escaping as an exception.
            return {"error": f"Invalid code input: {e}"}

        return {
            "metrics": self._compute_metrics(tree),
            "issues": self._find_issues(tree)
        }

    def _compute_metrics(self, tree: ast.AST) -> Dict[str, Any]:
        """Count functions (sync and async), classes and import statements."""
        num_functions = num_classes = num_imports = 0
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                num_functions += 1
            elif isinstance(node, ast.ClassDef):
                num_classes += 1
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                num_imports += 1

        return {
            "num_functions": num_functions,
            "num_classes": num_classes,
            "num_imports": num_imports,
            "complexity": self._compute_complexity(tree)
        }

    def _compute_complexity(self, tree: ast.AST) -> int:
        """Count branching nodes (if/while/for/try/except) in the tree."""
        branching = (ast.If, ast.While, ast.For, ast.Try, ast.ExceptHandler)
        return sum(1 for node in ast.walk(tree) if isinstance(node, branching))

    def _find_issues(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Find potential code issues.

        Reports bare ``except`` clauses first (in traversal order), followed
        by imported names that are never referenced, ordered by line.
        """
        issues = []
        imported = {}   # bound name -> line of the import statement
        referenced = set()

        for node in ast.walk(tree):
            if isinstance(node, ast.ExceptHandler) and node.type is None:
                issues.append({
                    "type": "bare_except",
                    "message": "Bare except clause found",
                    "line": node.lineno
                })
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    # "import a.b.c" binds only the top-level name "a".
                    bound = alias.asname or alias.name.split(".")[0]
                    imported[bound] = node.lineno
            elif isinstance(node, ast.ImportFrom):
                if node.module == "__future__":
                    continue  # compiler directives, never referenced by name
                for alias in node.names:
                    if alias.name != "*":
                        imported[alias.asname or alias.name] = node.lineno
            elif isinstance(node, ast.Name):
                referenced.add(node.id)
            elif isinstance(node, ast.Assign) and any(
                isinstance(target, ast.Name) and target.id == "__all__"
                for target in node.targets
            ):
                # Names listed in __all__ are re-exports, so count as used.
                referenced.update(
                    item.value
                    for item in ast.walk(node.value)
                    if isinstance(item, ast.Constant) and isinstance(item.value, str)
                )

        unused = (
            (line, name) for name, line in imported.items() if name not in referenced
        )
        for line, name in sorted(unused):
            issues.append({
                "type": "unused_import",
                "message": f"Imported name '{name}' is never used",
                "line": line
            })

        return issues

    @property
    def tool_name(self) -> str:
        return "code_analysis"

    @property
    def description(self) -> str:
        return "Analyze Python code for metrics and potential issues"

    @property
    def required_params(self) -> List[str]:
        return ["code"]


In [None]:
# tools\core\search_tool.py

"""
Search tool implementation.
"""
from typing import Dict, Any, List
import requests
from ..base.tool import BaseTool
from ...config import TAVILY_API_KEY

class SearchTool(BaseTool):
    """Tool for performing web searches through the Tavily HTTP API."""

    # Seconds to wait for the search API unless tool_config["timeout"] is set.
    DEFAULT_TIMEOUT = 30

    def __init__(self, tool_config: Dict[str, Any] = None):
        super().__init__(tool_config)
        self._api_key = TAVILY_API_KEY

    def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a web search.

        Args:
            params: Must contain 'query' key with search query; optional
                'max_results' (default 5)

        Returns:
            ``{"results": [...]}`` on success, otherwise ``{"error": message}``
            (missing parameters, transport/HTTP failure, or a response that
            is not JSON or has no "results" field)
        """
        if not self.validate_params(params):
            return {"error": "Missing required parameters"}

        query = params["query"]
        max_results = params.get("max_results", 5)
        # Without a timeout a stalled connection would block the caller forever.
        timeout = self.tool_config.get("timeout", self.DEFAULT_TIMEOUT)

        try:
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": self._api_key,
                    "query": query,
                    "max_results": max_results
                },
                timeout=timeout
            )
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            return {"error": f"Search failed: {str(e)}"}
        except ValueError as e:
            # Older requests versions raise a plain ValueError for bad JSON.
            return {"error": f"Search failed: invalid JSON response: {e}"}

        if not isinstance(payload, dict) or "results" not in payload:
            return {"error": "Search failed: response did not contain 'results'"}
        return {"results": payload["results"]}

    @property
    def tool_name(self) -> str:
        return "web_search"

    @property
    def description(self) -> str:
        return "Perform web searches to find relevant information"

    @property
    def required_params(self) -> List[str]:
        return ["query"]


In [None]:
# tools\extract_with_newspaper.py


from newspaper import Article

def fetch_article_text(url):
    """Download and parse the article at ``url``.

    Returns a 4-tuple: title, author names joined with ", ", publish date,
    and the article body text.
    """
    parsed = Article(url)
    parsed.download()
    parsed.parse()

    # Fields are read in the same order they are returned.
    return (
        parsed.title,
        ', '.join(parsed.authors),
        parsed.publish_date,
        parsed.text,
    )

if __name__ == '__main__':
    # Manual smoke run: fetch one fixed article and print each field.
    demo_url = 'https://www.aha.io/roadmapping/guide/ideation/templates'
    labels = ("Title", "Author", "Published Date", "Article Text")

    for label, value in zip(labels, fetch_article_text(demo_url)):
        print(f"{label}: {value}\n")

In [None]:
# tools\fetch_latest_arxiv_papers.py


import requests
import feedparser
def fetch_latest_arxiv_papers(topic: str, max_results: int = 5) -> list:
    """
    Fetch the latest arXiv results for the given topic.

    Parameters
    ----------
    topic : str
        The topic to search for. It is URL-encoded before being sent, so
        spaces and characters such as ``&`` are safe.
    max_results : int, optional
        Maximum number of papers to request (default 5).

    Returns
    -------
    list
        A list of dictionaries containing the title, authors, summary,
        published date and link of each paper, newest submission first.

    Raises
    ------
    requests.exceptions.RequestException
        If the request fails, times out, or returns an HTTP error status.
    """
    # Passing the query via ``params`` lets requests encode it; building the
    # URL by string interpolation broke on topics containing spaces or '&'.
    response = requests.get(
        'http://export.arxiv.org/api/query',
        params={
            'search_query': f'all:{topic}',
            'start': 0,
            'max_results': max_results,
            'sortBy': 'submittedDate',
            'sortOrder': 'descending',
        },
        timeout=30,
    )
    # Surface HTTP errors instead of parsing an error page into "no results".
    response.raise_for_status()
    feed = feedparser.parse(response.content)

    return [
        {
            'title': entry.title,
            # Entries without an author list yield an empty list.
            'authors': [author.name for author in entry.get('authors', [])],
            'summary': entry.summary,
            'published': entry.published,
            'link': entry.link
        }
        for entry in feed.entries
    ]

if __name__ == '__main__':
    # Interactive run: ask for a topic and print every returned paper.
    topic = input("Enter a topic to search for: ")
    separator = '-' * 80

    for paper in fetch_latest_arxiv_papers(topic):
        rows = (
            ("Title", paper['title']),
            ("Authors", ', '.join(paper['authors'])),
            ("Published", paper['published']),
            ("Summary", paper['summary']),
            ("Link", paper['link']),
        )
        for label, value in rows:
            print(f"{label}: {value}\n")
        print(separator)


In [None]:
# tools\foia_search.py

import requests
from bs4 import BeautifulSoup
import random
from typing import List
from search_manager import WebContentExtractor
import logging

logger = logging.getLogger(__name__)

def foia_search(query: str) -> List[str]:
    """Search FOIA.gov for ``query`` and return the text of each result page.

    Args:
        query: Free-text search terms

    Returns:
        Extracted content for every result link that yielded any; an empty
        list when the search request itself fails.
    """
    from urllib.parse import quote_plus

    # quote_plus turns spaces into '+' and also escapes '&', '#', etc., which
    # a plain space replacement would leave to corrupt the query string.
    url = f"https://search.foia.gov/search?utf8=%E2%9C%93&m=true&affiliate=foia.gov&query={quote_plus(query)}"
    web_content_extractor = WebContentExtractor()
    # NOTE(review): USER_AGENTS and TIMEOUT are assumed to exist on the
    # WebContentExtractor imported from search_manager — confirm.
    headers = {
        'User-Agent': random.choice(web_content_extractor.USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'DNT': '1',
    }
    try:
        response = requests.get(url, headers=headers, timeout=web_content_extractor.TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error searching FOIA.gov: {e}")
        return []

    # Tolerate stray invalid bytes rather than raising UnicodeDecodeError.
    html_content = response.content.decode('utf-8', errors='replace')
    soup = BeautifulSoup(html_content, 'html.parser')

    result_links = [a['href'] for a in soup.select('.result-title a') if a.has_attr('href')]

    content = []
    for link in result_links:
        try:
            # Call through the instance created above: this works whether
            # extract_content is an instance, class or static method, whereas
            # calling it on the class fails for an instance method.
            if extracted_content := web_content_extractor.extract_content(link):
                content.append(extracted_content)
        except Exception as e:
            logger.error(f"Error extracting content from {link}: {e}")

    return content

In [None]:
# tools\get_yt_comments.py

import os
import sys
import importlib
from youtube_comment_downloader import YoutubeCommentDownloader

# Get the directory of the current file
current_dir = os.path.dirname(os.path.abspath(__file__))

# Add the parent directory to the Python path so the top-level `models`
# module can be imported below; the statement order here matters.
parent_dir = os.path.dirname(current_dir)  # Get the parent directory
sys.path.insert(0, parent_dir)

import models  # Now import models

# Get the ModelManager class
ModelManager = models.ModelManager

# NOTE(review): this instance is created whenever the module is imported,
# and main() below builds its own ModelManager as well — confirm the
# module-level instance is still needed.
model_manager = ModelManager()  # Initialize *after* defining ModelManager
# NOTE(review): repeats the import already made near the top of this module.
from youtube_comment_downloader import YoutubeCommentDownloader
def fetch_comments(video_url):
    """Return the 'text' of every comment the downloader yields for the URL."""
    texts = []
    for entry in YoutubeCommentDownloader().get_comments_from_url(video_url):
        texts.append(entry['text'])
    return texts

def save_comments_to_file(comments, filename):
    """Write each comment to ``filename`` (UTF-8), one per line.

    Any existing file at ``filename`` is overwritten.
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.writelines(comment + '\n' for comment in comments)

def main():
    """Prompt for a video URL, save its comments, and print a model summary.

    Comments are written to 'comments.txt', then echoed to stdout followed
    by the text of the model's response.
    """
    comments = fetch_comments(input("Enter youtube URL: "))
    save_comments_to_file(comments, 'comments.txt')

    # The full list of comments is embedded in the prompt as-is.
    summarizer = ModelManager()
    response = summarizer.generate_response(
        f"Summarize and extract all insights from the following comment sections for a Youtube Video: \n {comments}"
    )

    for line in comments:
        print(line)
    print(response.text)


if __name__ == '__main__':
    main()


In [None]:
# tools\google_search.py

import pprint
from googleapiclient.discovery import build
import os
import dotenv
import sys
import os
# Get the parent directory
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
from ..config import GEMINI_API_KEY, GOOGLE_CUSTOM_SEARCH_API_KEY, GOOGLE_CUSTOM_SEARCH_ENGINE_ID

# Add the parent directory to sys.path
sys.path.insert(0, parent_dir)
# Load variables from a .env file (if present) before reading credentials.
dotenv.load_dotenv()
GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY = os.getenv('GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY')
GOOGLE_CUSTOM_SEARCH_ENGINE_ID = os.getenv('GOOGLE_CUSTOM_SEARCH_ENGINE_ID')
# The API key and engine id are deliberately not printed: writing
# credentials to stdout on import leaks them into logs and terminals.
# NOTE(review): these names differ from the GOOGLE_CUSTOM_SEARCH_API_KEY
# imported from config above — confirm which source is authoritative.
def search(query):
    """Run ``query`` against the Custom Search API, pretty-print and return it.

    Uses the module-level API key and search engine id read from the
    environment.
    """
    client = build("customsearch", "v1", developerKey=GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY)
    request = client.cse().list(q=query, cx=GOOGLE_CUSTOM_SEARCH_ENGINE_ID)
    res = request.execute()

    # The raw response is echoed to stdout before being handed back.
    pprint.pprint(res)
    return res

def main():
    """Run a fixed "lectures" query and pretty-print the raw response."""
    # Build a service object for interacting with the API. Visit
    # the Google APIs Console <http://code.google.com/apis/console>
    # to get an API key for your own application.
    service = build(
        "customsearch", "v1", developerKey=GOOGLE_CUSTOM_SEARCH_API_KEY
    )

    res = (
        service.cse()
        .list(
            q="lectures",
            cx="32e9bbeb5cbee467a:omuauf_lfve",
        )
        .execute()
    )
    pprint.pprint(res)


if __name__ == "__main__":
    # The sample search lives under the guard so that importing this module
    # performs no network request; running it as a script still executes
    # the sample search followed by main().
    search("lectures")
    main()

In [None]:
# tools\llm_tools.py

"""LLM-compatible tools using the @tool decorator pattern."""
from typing import List, Dict, Optional, Any
from langchain.tools import tool
from .specialized_search import FOIASearchProvider, ArXivSearchProvider
import asyncio
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SearchResult:
    """Standardized search result format for LLM consumption."""
    # Result title.
    title: str
    # Result URL.
    url: str
    # Short text associated with the result.
    snippet: str
    # Longer text body associated with the result.
    content: str
    # Additional key/value data; keys are strings, values are unconstrained.
    metadata: Dict[str, Any]

class AsyncRunner:
    """Helper class to run async functions in sync context."""

    @staticmethod
    def run(coro):
        """Run ``coro`` to completion and return its result.

        Works both from plain synchronous code and from code that is itself
        executing inside a running event loop.

        Args:
            coro: Coroutine object to execute

        Returns:
            Whatever the coroutine returns; its exceptions propagate.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run creates a fresh
            # loop, runs the coroutine and closes the loop afterwards.
            return asyncio.run(coro)

        # A loop is already running here, so run_until_complete would raise.
        # Execute the coroutine on its own loop in a worker thread and block
        # until it finishes.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

@tool
def search_foia(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """
    Search FOIA.gov for government records and documents.
    
    Args:
        query: The search query to find FOIA records
        max_results: Maximum number of results to return (default: 10)
    
    Returns:
        List of dictionaries containing:
        - title: Document title
        - url: Document URL
        - snippet: Brief description
        - content: Full document content if available
    """
    # The docstring above is left untouched because the @tool decorator
    # consumes it.
    hits = AsyncRunner.run(FOIASearchProvider().search(query, max_results))

    # Flatten each provider result into a plain dict of its four text fields.
    formatted = []
    for hit in hits:
        formatted.append({
            "title": hit.title,
            "url": hit.url,
            "snippet": hit.snippet,
            "content": hit.content
        })
    return formatted

@tool
def search_arxiv(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """
    Search arXiv for scientific papers.
    
    Args:
        query: The search query to find papers
        max_results: Maximum number of results to return (default: 10)
    
    Returns:
        List of dictionaries containing:
        - title: Paper title
        - url: arXiv URL
        - snippet: Abstract preview
        - content: Full paper details including abstract and metadata
    """
    # The docstring above is left untouched because the @tool decorator
    # consumes it.
    hits = AsyncRunner.run(ArXivSearchProvider().search(query, max_results))

    # Flatten each provider result into a plain dict of its four text fields.
    formatted = []
    for hit in hits:
        formatted.append({
            "title": hit.title,
            "url": hit.url,
            "snippet": hit.snippet,
            "content": hit.content
        })
    return formatted

def _parse_arxiv_content(content: str) -> Dict[str, Any]:
    """Turn a "Label: value" text block into a metadata dictionary.

    Recognized line prefixes: Authors, Published, Updated, Categories,
    Abstract and PDF URL. Once the Abstract label has been seen, every
    later non-blank line that carries no recognized label is appended to
    the abstract, each followed by a newline.
    """
    # NOTE(review): the layout of the provider's content string is not
    # visible here; the labels below are the ones this module already parsed.
    scalar_labels = (
        ('Published:', 'published_date'),
        ('Updated:', 'updated_date'),
        ('PDF URL:', 'pdf_url'),
    )
    list_labels = (
        ('Authors:', 'authors'),
        ('Categories:', 'categories'),
    )
    abstract_label = 'Abstract:'

    metadata: Dict[str, Any] = {}
    in_abstract = False

    for line in content.split('\n'):
        matched = False

        for label, key in list_labels:
            if line.startswith(label):
                # Slice off only the leading label; replacing the label text
                # everywhere would also damage values that contain it.
                metadata[key] = line[len(label):].strip().split(', ')
                matched = True
                break
        if matched:
            continue

        for label, key in scalar_labels:
            if line.startswith(label):
                metadata[key] = line[len(label):].strip()
                matched = True
                break
        if matched:
            continue

        if line.startswith(abstract_label):
            in_abstract = True
            # Keep any text that shares the line with the label instead of
            # discarding it.
            inline = line[len(abstract_label):].strip()
            metadata['abstract'] = inline + '\n' if inline else ''
        elif in_abstract and line.strip():
            metadata['abstract'] = metadata.get('abstract', '') + line + '\n'

    return metadata


@tool
def get_latest_arxiv_papers(
    category: Optional[str] = None,
    max_results: int = 10,
    days: int = 7
) -> List[Dict[str, Any]]:
    """
    Get the latest papers from arXiv, optionally filtered by category.
    
    Args:
        category: arXiv category (e.g., 'cs.AI', 'physics') (optional)
        max_results: Maximum number of results to return (default: 10)
        days: Number of past days to search (default: 7)
    
    Returns:
        List of dictionaries containing:
        - title: Paper title
        - url: arXiv URL
        - authors: List of author names
        - published_date: Publication date
        - updated_date: Last update date
        - categories: List of arXiv categories
        - abstract: Full paper abstract
        - pdf_url: Direct link to PDF
    """
    # The docstring above is left untouched because the @tool decorator
    # consumes it.
    provider = ArXivSearchProvider()
    results = AsyncRunner.run(provider.get_latest_papers(
        category=category,
        max_results=max_results,
        days=days
    ))

    # Title and URL come from the result object; everything else is parsed
    # out of its content string.
    return [
        {
            "title": r.title,
            "url": r.url,
            **_parse_arxiv_content(r.content)
        }
        for r in results
    ]

def get_llm_tools():
    """Return the three tool objects defined in this module as a new list."""
    llm_tools = [search_foia, search_arxiv, get_latest_arxiv_papers]
    return llm_tools


In [None]:
# tools\print_directory_structure.py

import os
import sys

def get_input_directory():
    """Return the directory whose structure should be printed.

    The first command-line argument is used when it names a directory.
    Otherwise the user is prompted until they enter a valid directory or
    press Enter, which selects the current working directory.

    Returns:
        str: Path of the chosen directory
    """
    # The command-line argument is consulted once only: re-reading it on
    # every pass would loop forever when it is not a directory.
    candidate = sys.argv[1] if len(sys.argv) > 1 else None

    while True:
        if candidate is None:
            user_input = input('Specify the directory for which you wish to print its structure or press Enter to use the current directory: ')
            if user_input == '':
                return os.getcwd()
            candidate = user_input

        # Validate the provided path
        if os.path.isdir(candidate):
            return candidate

        print("The specified path is not a directory. Please try again.")
        candidate = None  # fall back to prompting

def print_directory_structure(root_dir):
    """Print an indented tree of ``root_dir``: directories end with '/'.

    Each nesting level is indented by four spaces; a directory's files are
    listed directly beneath it. Entries are visited in sorted order.

    Args:
        root_dir: Directory to walk (a trailing path separator is accepted)
    """
    # Normalizing drops a trailing separator, which would otherwise produce
    # an empty root name and skew every depth calculation.
    root_dir = os.path.normpath(root_dir)

    for root, dirs, files in os.walk(root_dir):
        dirs.sort()  # in-place so os.walk descends in a stable order

        # Depth comes from the path relative to the root; text replacement
        # miscounts when the root's path text recurs further down the tree.
        rel = os.path.relpath(root, root_dir)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1

        indent = ' ' * 4 * level
        print('{}{}/'.format(indent, os.path.basename(root)))

        subindent = ' ' * 4 * (level + 1)
        for f in sorted(files):
            print('{}{}'.format(subindent, f))

if __name__ == "__main__":
    # Resolve the target directory, then print its tree.
    print_directory_structure(get_input_directory())

In [None]:
# tools\read_document.py

import os

def read_document(file_path: str) -> str:
    """Reads the content of a document file as UTF-8 text.
    
    Args:
        file_path (str): The path to the document file.
        
    Returns:
        str: The content of the document.
        
    Raises:
        FileNotFoundError: If the file does not exist.
        IOError: If the file cannot be read or is not valid UTF-8; the
            original exception is attached as ``__cause__``.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except (OSError, UnicodeDecodeError) as e:
        # Chain the original error so its traceback and type are preserved.
        raise IOError(f"Error reading file {file_path}: {e}") from e

In [None]:
# tools\research_summary.py

from tools.search_manager import SearchManager, initialize_search_manager
from tools.tool_manager import ToolManager
from models.model_manager import ModelManager
from config import GEMINI_API_KEY
from models.llm_providers import GeminiProvider
from agents.agent import AgentManager, Agent

import json
from typing import List, Dict, Optional
import logging
from dataclasses import dataclass
from datetime import datetime
import re
import os

logger = logging.getLogger(__name__)

# Record of one research iteration.
# NOTE(review): field meanings below are inferred from names and types only;
# the code that populates this class is not visible here — confirm.
@dataclass
class ResearchIteration:
    query: str                      # presumably the search query used
    search_results: str             # search output, held as a single string
    summary: str
    extracted_info: Dict[str, str]  # string-to-string mapping
    timestamp: datetime
    confidence_level: float
    relevance_score: float
    sources: List[Dict[str, str]]   # list of string-to-string mappings

class ResearchResponse:
    """Structured view of a raw text response.

    Parsing happens on construction and fills these attributes:

    - ``search_query`` / ``final_report``: text inside the braces of the
      ``SEARCH QUERY: {...}`` / ``FINAL REPORT: {...}`` sections, or None
    - ``extracted_info``: dict parsed from ``EXTRACTED INFO: {...}``
    - ``confidence_level`` / ``relevance_score``: floats from the
      ``CONFIDENCE:`` / ``RELEVANCE:`` lines, 0.0 when absent
    """

    def __init__(self, raw_response: str):
        self.raw_response = raw_response
        self._parse_response()

    def _parse_response(self):
        """Locate every known section and store its parsed value."""
        # Initialize default values
        self.search_query = None
        self.extracted_info = {}
        self.final_report = None
        self.confidence_level = 0.0
        self.relevance_score = 0.0

        # Braced sections end at the first '}' so they cannot contain
        # nested braces.
        patterns = {
            'search_query': r"SEARCH QUERY:\s*\{([^}]+)\}",
            'extracted_info': r"EXTRACTED INFO:\s*\{([^}]+)\}",
            'final_report': r"FINAL REPORT:\s*\{([^}]+)\}",
            'confidence': r"CONFIDENCE:\s*(\d*\.?\d+)",
            'relevance': r"RELEVANCE:\s*(\d*\.?\d+)"
        }

        for section, pattern in patterns.items():
            match = re.search(pattern, self.raw_response, re.DOTALL)
            if not match:
                continue

            captured = match.group(1)
            if section == 'extracted_info':
                self.extracted_info = self._parse_info_section(captured)
            elif section == 'confidence':
                self.confidence_level = float(captured)
            elif section == 'relevance':
                self.relevance_score = float(captured)
            else:
                # search_query and final_report are stored as stripped text.
                setattr(self, section, captured.strip())

    def _parse_info_section(self, info_text: str) -> Dict[str, str]:
        """Parse ``key: value`` lines into a dictionary.

        A line containing ':' starts a new entry (split at the first colon).
        Any other non-blank line continues the current entry and is joined
        to its value with a newline. Lines before the first key are ignored.

        Args:
            info_text: Text captured from inside the EXTRACTED INFO braces

        Returns:
            Mapping of stripped keys to their (possibly multi-line) values
        """
        info_dict = {}
        current_key = None
        current_value = []

        for raw_line in info_text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            if ':' in line:
                # Flush the entry in progress before starting the next one.
                if current_key:
                    info_dict[current_key] = '\n'.join(current_value)
                key, value = line.split(':', 1)
                current_key = key.strip()
                current_value = [value.strip()]
            elif current_key:
                # Continuation line: testing for leading whitespace on the
                # already-stripped line could never match, so these lines
                # used to be dropped.
                current_value.append(line)

        if current_key:
            info_dict[current_key] = '\n'.join(current_value)

        return info_dict

@dataclass
class AnalystFeedback:
    """Structured result of one analyst review of the research so far."""
    # Value parsed from the `relevance_score` field of the analyst response.
    relevance_assessment: float
    # Value parsed from the `direction_score` field of the analyst response.
    direction_assessment: float
    # Value parsed from the `coverage_score` field of the analyst response.
    coverage_assessment: float
    # Text of the analyst's DETAILED FEEDBACK section.
    feedback: str
    # Individual items taken from the analyst's `recommendations` section.
    recommendations: List[str]
    # When this feedback object was created.
    timestamp: datetime

@dataclass
class ResearchLog:
    """One researcher response paired with the analyst feedback for it."""
    # Raw text returned by the researcher agent.
    researcher_output: str
    # Analyst assessment recorded alongside that output.
    analyst_feedback: AnalystFeedback
    # Index of the research iteration this entry belongs to.
    iteration: int
    # When the log entry was created.
    timestamp: datetime

class ResearchAnalyst:
    """Wraps an LLM agent that critiques the progress of ongoing research.

    The analyst is prompted with the research log and the current knowledge
    state and must answer in the fixed format described by ``system_prompt``;
    that answer is parsed into an ``AnalystFeedback``.
    """

    def __init__(self, model_manager: ModelManager):
        """Store the model manager and create the underlying analyst agent."""
        self.model_manager = model_manager
        self.system_prompt = """You are an expert research analyst who evaluates the quality, relevance, and direction of ongoing research.
        
RESPONSE FORMAT INSTRUCTIONS:
Always structure your responses using these exact sections:

RESEARCH ASSESSMENT: {
    relevance_score: [0.0-1.0]  // How relevant is the collected information
    direction_score: [0.0-1.0]  // How well aligned with the research goal
    coverage_score: [0.0-1.0]   // How comprehensive is the coverage
    
    strengths: {
        - List key strengths of current research direction
        - Highlight particularly valuable findings
    }
    
    weaknesses: {
        - Identify gaps in coverage
        - Point out potential biases or oversights
        - Flag any irrelevant tangents
    }
    
    recommendations: {
        - Specific suggestions for next steps
        - Areas needing more focus
        - Topics to avoid or de-prioritize
    }
}

DETAILED FEEDBACK: {
    Provide specific, actionable feedback on:
    1. Information Quality
    2. Research Direction
    3. Knowledge Gaps
    4. Methodology
    5. Source Quality
}

Your task is to critically analyze research progress and provide guidance to keep the research focused and effective."""

        self.analyst = self._initialize_analyst()

    def _initialize_analyst(self) -> Agent:
        """Create the analyst agent; logs and re-raises any creation error."""
        try:
            agent_manager = AgentManager(tool_manager=None)  # Analyst doesn't need tools
            return agent_manager.create_agent(
                "analyst",
                "research_analyst",
                self.model_manager,
                None,
                instruction=self.system_prompt,
                model_config={
                    "temperature": 0.7,
                    "max_tokens": 2048,
                    "top_p": 0.95
                }
            )
        except Exception as e:
            logger.error(f"Failed to initialize analyst agent: {e}")
            raise

    def analyze_research(self, topic: str, research_log: Dict[str, List[ResearchLog]], 
                        current_knowledge: Dict[str, Dict]) -> AnalystFeedback:
        """Ask the analyst agent to assess the research and parse its answer.

        Args:
            topic: Research topic being investigated.
            research_log: Log entries grouped by iteration.
            current_knowledge: Knowledge state to show to the analyst.

        Returns:
            The parsed ``AnalystFeedback``.

        Raises:
            Exception: Any error from prompting or parsing is logged and
                re-raised unchanged.
        """
        try:
            analysis_prompt = self._prepare_analysis_prompt(topic, research_log, current_knowledge)
            response = self.analyst.generate_response(analysis_prompt)
            return self._parse_analyst_response(response)
        except Exception as e:
            logger.error(f"Research analysis failed: {e}")
            raise

    @staticmethod
    def _json_default(value: object) -> str:
        """``json.dumps`` fallback for values it cannot encode natively.

        The knowledge state carries ``datetime`` objects; without a fallback
        ``json.dumps`` raises ``TypeError`` on them. The output is only used
        as prompt text, so any other unknown value is simply stringified.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        return str(value)

    def _prepare_analysis_prompt(self, topic: str, research_log: Dict[str, List[ResearchLog]], 
                               current_knowledge: Dict[str, Dict]) -> str:
        """Build the analyst prompt from the topic, log and knowledge state."""
        history_text = self._format_research_log(research_log)
        knowledge_text = json.dumps(current_knowledge, indent=2, default=self._json_default)
        return f"""Analyze the current research progress on topic: {topic}

Research History:
{history_text}

Current Knowledge State:
{knowledge_text}

Provide a comprehensive analysis following the required response format."""

    def _format_research_log(self, research_log: Dict[str, List[ResearchLog]]) -> str:
        """Render the research log as plain text, one section per iteration."""
        formatted_log = []
        for iteration, logs in research_log.items():
            formatted_log.append(f"\nIteration {iteration}:")
            for log in logs:
                formatted_log.append(f"Researcher Output: {log.researcher_output}")
                formatted_log.append(f"Previous Analysis: {log.analyst_feedback.feedback}")
        return "\n".join(formatted_log)

    @staticmethod
    def _extract_score(response: str, field: str) -> Optional[float]:
        """Return the number following ``<field>:`` in *response*, or None."""
        match = re.search(rf"{re.escape(field)}:\s*(\d*\.?\d+)", response)
        return float(match.group(1)) if match else None

    @staticmethod
    def _extract_recommendations(response: str) -> List[str]:
        """Return the bullet items of the ``recommendations: {...}`` section.

        Items are split on bullet markers only (a marker at the start of a
        line, or a hyphen surrounded by whitespace), so hyphenated words such
        as "de-prioritize" stay intact. Returns an empty list when the section
        is missing.
        """
        match = re.search(r"recommendations:\s*\{([^}]+)\}", response, re.DOTALL)
        if not match:
            return []
        items = re.split(r"(?m)^\s*[-*•]\s*|\s+-\s+", match.group(1))
        return [item.strip() for item in items if item.strip()]

    def _parse_analyst_response(self, response: str) -> AnalystFeedback:
        """Parse the analyst's response into structured feedback.

        Model output is not guaranteed to follow the requested format, so a
        missing section is logged and replaced by a neutral default (score
        0.0, no recommendations, the raw response as feedback) instead of
        aborting the whole research run.
        """
        scores = {}
        for field in ("relevance_score", "direction_score", "coverage_score"):
            score = self._extract_score(response, field)
            if score is None:
                logger.warning(f"Analyst response is missing {field}; defaulting to 0.0")
                score = 0.0
            scores[field] = score

        feedback_match = re.search(r"DETAILED FEEDBACK:\s*\{([^}]+)\}", response, re.DOTALL)
        if feedback_match:
            feedback = feedback_match.group(1).strip()
        else:
            logger.warning("Analyst response is missing the DETAILED FEEDBACK section")
            feedback = response.strip()

        return AnalystFeedback(
            relevance_assessment=scores["relevance_score"],
            direction_assessment=scores["direction_score"],
            coverage_assessment=scores["coverage_score"],
            feedback=feedback,
            recommendations=self._extract_recommendations(response),
            timestamp=datetime.now()
        )

class ResearchAgent:
    """Runs an iterative research loop driven by an LLM researcher agent.

    Every iteration prompts the researcher, asks a ``ResearchAnalyst`` for
    feedback, optionally performs the web search the researcher requested and
    records the extracted information in ``knowledge_base``.
    """

    def __init__(self, model_manager: ModelManager, tool_manager: ToolManager):
        """Set up state, read ``agents.json`` and create both agents.

        Raises:
            OSError, KeyError, json.JSONDecodeError: if ``agents.json`` cannot
                be read or has no ``researcher`` entry.
        """
        self.model_manager = model_manager
        self.tool_manager = tool_manager
        self.research_history: List[ResearchIteration] = []
        self.knowledge_base: Dict[str, Dict] = {
            'facts': {},
            'sources': {},
            'topics': {},
            'uncertainties': {},
            'metadata': {
                'last_updated': None,
                'confidence_history': [],
                'relevance_history': []
            }
        }

        # Load agent configuration
        with open('agents.json', encoding='utf-8', errors='ignore') as f:
            agents_config = json.load(f)
        self.researcher_config = agents_config['researcher']

        self.system_prompt = """You are an expert research agent that conducts thorough investigations through iterative web searches.

RESPONSE FORMAT INSTRUCTIONS:
Always structure your responses using these exact sections:

SEARCH QUERY: {
    Write a specific, focused search query based on current knowledge gaps
    Format: "keyword1 keyword2 -exclude_term site:domain.com"
}

EXTRACTED INFO: {
    key_fact1: detailed description with source reference
    key_fact2: multi-line description
               with continuation and source
    source1: {url: link, credibility: score, date: timestamp}
    uncertainty1: specific areas needing clarification
}

CONFIDENCE: [0.0-1.0]
RELEVANCE: [0.0-1.0]

FINAL REPORT: {
    Only include this section when confidence >= 0.9 or max iterations reached
    Structure the report with:
    1. Executive Summary
    2. Key Findings
    3. Detailed Analysis
    4. Sources and Citations
    5. Reliability Assessment
    6. Further Research Needed
}

RESEARCH PROCESS:
1. Analyze current knowledge gaps
2. Formulate precise search queries using search operators
3. Extract & organize key information with source tracking
4. Assess information reliability and relevance
5. Generate comprehensive report when ready

EVALUATION CRITERIA:
- Information relevance and reliability (scored 0-1)
- Source credibility with explicit scoring
- Knowledge completeness with gap analysis
- Logical connections between facts
- Contradictions and uncertainties tracking

Your task is to build comprehensive understanding through iterative research while maintaining strict response formatting."""

        self.researcher = self._initialize_researcher()
        self.research_log = {}
        self.current_iteration = 0
        self.analyst = ResearchAnalyst(model_manager)

    def _initialize_researcher(self) -> Agent:
        """Create the researcher agent; logs and re-raises any creation error."""
        try:
            agent_manager = AgentManager(tool_manager=self.tool_manager)
            return agent_manager.create_agent(
                "researcher",
                self.researcher_config['agent_type'],
                self.model_manager,
                self.tool_manager,
                instruction=self.system_prompt,
                tools=["web_search"],
                model_config={
                    "temperature": 0.7,
                    "max_tokens": 2048,
                    "top_p": 0.95
                }
            )
        except Exception as e:
            logger.error(f"Failed to initialize researcher agent: {e}")
            raise

    @staticmethod
    def _json_default(value: object) -> str:
        """``json.dumps`` fallback for values it cannot encode natively.

        The knowledge base stores ``datetime`` objects; without a fallback
        ``json.dumps`` raises ``TypeError`` as soon as one entry has been
        recorded. The output is only used as prompt text, so any other unknown
        value is simply stringified.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        return str(value)

    def _knowledge_base_json(self) -> str:
        """Return the knowledge base as indented JSON text for prompts."""
        return json.dumps(self.knowledge_base, indent=2, default=self._json_default)

    def update_knowledge_base(self, extracted_info: Dict[str, str], relevance_score: float):
        """Record extracted information and update the metadata.

        Args:
            extracted_info: Key/value pairs taken from a researcher response.
            relevance_score: Relevance reported for that response; stored with
                every entry and appended to the relevance history.
        """
        current_time = datetime.now()

        metadata = self.knowledge_base['metadata']
        metadata['last_updated'] = current_time
        metadata['relevance_history'].append({
            'score': relevance_score,
            'timestamp': current_time
        })

        for key, value in extracted_info.items():
            category = self._categorize_info(key)
            if category:
                self.knowledge_base[category][key] = {
                    'content': value,
                    'added': current_time,
                    'relevance': relevance_score
                }

    def _categorize_info(self, key: str) -> Optional[str]:
        """Map an info key to a knowledge-base category name.

        Explicit prefixes win; otherwise the category is inferred from words
        contained in the key.
        """
        prefixes = {
            'fact_': 'facts',
            'source_': 'sources',
            'topic_': 'topics',
            'uncertainty_': 'uncertainties'
        }

        for prefix, category in prefixes.items():
            if key.startswith(prefix):
                return category

        return self._infer_category(key)

    def _infer_category(self, key: str) -> str:
        """Infer a category from keywords in *key*; defaults to ``'facts'``."""
        key_lower = key.lower()
        if any(word in key_lower for word in ['url', 'link', 'source', 'reference']):
            return 'sources'
        # 'uncertain' covers keys such as "uncertainty1", the form shown in the
        # researcher prompt, which has no underscore for the prefix table.
        elif any(word in key_lower for word in ['unknown', 'unclear', 'question', 'uncertain']):
            return 'uncertainties'
        elif any(word in key_lower for word in ['topic', 'subject', 'theme']):
            return 'topics'
        return 'facts'

    def conduct_research(self, topic: str, max_iterations: int = 5, results_per_query: int = 5) -> str:
        """Conduct iterative research on a topic with analyst feedback.

        Args:
            topic: Subject to research.
            max_iterations: Upper bound on researcher/analyst rounds.
            results_per_query: Number of results requested per web search.

        Returns:
            The final report text.

        Raises:
            Exception: Any failure outside the web-search step is logged and
                re-raised. A failed search is logged and that iteration is
                abandoned.
        """
        try:
            final_report = None

            # A for-loop guarantees progress: a search that keeps failing uses
            # up its iteration instead of retrying the same one forever.
            for iteration in range(max_iterations):
                research_prompt = self._prepare_research_prompt(topic, iteration)

                raw_response = self.researcher.generate_response(research_prompt)
                response = ResearchResponse(raw_response)

                analyst_feedback = self.analyst.analyze_research(
                    topic,
                    self.research_log,
                    self.knowledge_base
                )

                self.research_log.setdefault(iteration, []).append(ResearchLog(
                    researcher_output=raw_response,
                    analyst_feedback=analyst_feedback,
                    iteration=iteration,
                    timestamp=datetime.now()
                ))

                if response.search_query:
                    try:
                        search_results = self.tool_manager.web_search(
                            response.search_query,
                            results_per_query
                        )

                        self.update_knowledge_base(response.extracted_info, response.relevance_score)

                        self.research_history.append(ResearchIteration(
                            query=response.search_query,
                            search_results=search_results,
                            summary=raw_response,
                            extracted_info=response.extracted_info,
                            timestamp=datetime.now(),
                            confidence_level=response.confidence_level,
                            relevance_score=response.relevance_score,
                            sources=response.sources
                        ))

                    except Exception as e:
                        logger.error(f"Search failed: {e}")
                        continue

                # Stop early once the analyst rates coverage and relevance highly
                if analyst_feedback.coverage_assessment >= 0.9 and analyst_feedback.relevance_assessment >= 0.9:
                    final_report = self.generate_final_report(topic)
                    break

            if not final_report:
                final_report = self.generate_final_report(topic)

            return final_report

        except Exception as e:
            logger.error(f"Research process failed: {e}")
            raise

    def _prepare_research_prompt(self, topic: str, iteration: int) -> str:
        """Build the researcher prompt for the given iteration."""
        prompt = f"""Research Topic: {topic}

Current Knowledge Base:
{self._knowledge_base_json()}

Research History and Analysis:
{self._format_research_history(iteration)}

Analyze the current state of research and respond in the required format with either:
1. A new search query to fill knowledge gaps, or
2. A final report if confidence level is sufficient (>=0.9)"""

        return prompt

    def _format_research_history(self, current_iteration: int) -> str:
        """Render log entries of all iterations before *current_iteration*."""
        history = []
        for iteration in range(current_iteration):
            for log in self.research_log.get(iteration, []):
                history.append(f"\nIteration {iteration}:")
                history.append(f"Research Output: {log.researcher_output}")
                history.append(f"Analyst Feedback: {log.analyst_feedback.feedback}")
                history.append("Recommendations:")
                for rec in log.analyst_feedback.recommendations:
                    history.append(f"- {rec}")
        return "\n".join(history)

    def generate_final_report(self, topic: str) -> str:
        """Ask the researcher for a final report on the accumulated knowledge.

        Returns:
            The parsed FINAL REPORT section, or the string
            "Failed to generate final report" when none could be parsed.
        """
        history_summary = json.dumps([{
            'query': entry.query,
            'summary': entry.summary,
            'confidence': entry.confidence_level
        } for entry in self.research_history], indent=2, default=self._json_default)

        report_prompt = f"""Generate a comprehensive final report on '{topic}' using the accumulated knowledge:

Knowledge Base:
{self._knowledge_base_json()}

Research History:
{history_summary}

Provide a complete report in the FINAL REPORT format."""

        response = self.researcher.generate_response(report_prompt)
        parsed_response = ResearchResponse(response)
        return parsed_response.final_report or "Failed to generate final report"

def main():
    """Interactive entry point: build the agent, prompt for settings, print the report.

    The iteration count is clamped to 1-5 and the results per search to 1-10.
    Any error is logged and re-raised.
    """
    try:
        # Wire up the components the research agent depends on
        search_manager = initialize_search_manager()
        tool_manager = tools.ToolManager(search_manager)
        model_manager = ModelManager()
        researcher = ResearchAgent(model_manager, tool_manager)

        # Collect the run parameters from the user
        topic = input("Enter research topic: ")
        requested_iterations = int(input("Enter maximum research iterations (1-5): "))
        requested_results = int(input("Enter results per search (1-10): "))

        report = researcher.conduct_research(
            topic,
            max_iterations=max(1, min(requested_iterations, 5)),
            results_per_query=max(1, min(requested_results, 10)),
        )

        print("\nFinal Research Report:", "=" * 80, report, sep="\n")

    except Exception as e:
        logger.error(f"Research process failed: {e}")
        raise

if __name__ == "__main__":
    main()


In [None]:
# tools\scrape_current_window_url.py

import pygetwindow as gw
import pyperclip
import keyboard
import sys
import os

# Resolve this file's directory and its parent (the project root), then put
# the project root first on the module search path.
current_dir = os.path.split(os.path.abspath(__file__))[0]
parent_dir = os.path.split(current_dir)[0]
sys.path.insert(0, parent_dir)

from search_manager import WebContentExtractor, SearchManager, SearchProvider, SearchAPI, DuckDuckGoSearchProvider
import datetime
import os
import psutil

def is_browser_window(window_title):
    """Return True when the window title contains a known browser name."""
    for browser_name in ("Microsoft Edge", "Google Chrome", "Firefox", "Safari", "Opera"):
        if browser_name in window_title:
            return True
    return False

def setup_hotkey():
    """Register Ctrl+Alt+C to capture the current browser URL and save its content."""
    def capture_and_save():
        # Runs on every hotkey press: capture, extract, write to disk.
        url = capture_url()
        if not url:
            print("No valid URL found in current window")
            return
        extractor = WebContentExtractor()
        save_to_file(url, extractor.extract_content(url))
        print(f"Captured and saved content from: {url}")

    keyboard.add_hotkey('ctrl+alt+c', capture_and_save)
    print("Hotkey (Ctrl+Alt+C) registered for URL capture")


def get_edge_url_with_psutil():
    """Return the first ``https://`` command-line argument of a running Edge process.

    Scans all processes named ``msedge.exe`` and returns the first argument
    that starts with ``https://``. Returns None when no such argument exists.
    """
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        if proc.info['name'] != 'msedge.exe':
            continue
        # The command line may be unavailable (None) for processes that cannot
        # be inspected; treat that as "no arguments" instead of crashing.
        for arg in proc.info['cmdline'] or []:
            if arg.startswith('https://'):
                return arg
    return None

def capture_url():
    """Return a URL for the active Microsoft Edge window, or None.

    Sources are tried in order: the clipboard text, the first part of the
    window title, and finally the command line of a running Edge process.
    Returns None when the active window is not Edge or when an error occurs
    (the error is printed).
    """
    url_prefixes = ("http://", "https://")
    try:
        edge_window = gw.getActiveWindow()

        # No active window, or a window that is not Edge: nothing to capture.
        if edge_window is None or "Microsoft Edge" not in edge_window.title:
            return None

        url = pyperclip.paste().strip()

        if not url.startswith(url_prefixes):
            # If the clipboard doesn't contain a URL, try the Edge window title
            url = edge_window.title.split(" - ")[0]
        # Use the same scheme test for both checks so a valid http:// URL from
        # the clipboard is not thrown away in favour of the process lookup.
        if not url.startswith(url_prefixes):
            url = get_edge_url_with_psutil()

        return url
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
    
def save_to_file(url, content):
    """Save the URL and content to a new timestamped text file on the desktop.

    The file is named ``Scraped_URL_<YYYYmmdd-HHMMSS>.txt`` and holds the URL
    on the first line followed by the content. Nothing is written when *url*
    is empty.

    Args:
        url: URL the content was scraped from.
        content: Extracted text to store.
    """
    if not url:
        return

    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    # USERPROFILE only exists on Windows; fall back to the user's home
    # directory elsewhere instead of raising KeyError.
    home = os.environ.get('USERPROFILE') or os.path.expanduser('~')
    desktop = os.path.join(home, 'Desktop')
    os.makedirs(desktop, exist_ok=True)
    # Scraped pages routinely contain characters outside the platform default
    # code page, so the encoding is fixed to UTF-8.
    with open(os.path.join(desktop, f"Scraped_URL_{timestamp}.txt"), "w", encoding="utf-8") as f:
        f.write(f"{url}\n{content}")


# Script body: these statements run whenever the module is executed or
# imported (there is no `__main__` guard). They perform one capture
# immediately; setup_hotkey() is not called here.
url = capture_url()
if url:
    # Extract the page content, save it to the desktop and echo both values.
    content_extractor = WebContentExtractor()
    content = content_extractor.extract_content(url)
    save_to_file(url, content)
    print(f"Captured URL: {url}")
    print(f"Captured content: {content}")
# Wait for the user before the script ends.
input("Press Enter to exit...")


In [None]:
# tools\scrape_url_in_clipboard.py

import pyperclip
from ..agent_tools.search_manager import search_manager, WebContentExtractor    

def main():
    """Print the content extracted from the URL currently on the clipboard.

    NOTE(review): a second ``main`` defined later in this module rebinds the
    name, so this version is not the one run by the ``__main__`` guard.
    """
    url = pyperclip.paste()
    # Call extract_content on an instance: if it is a regular instance method,
    # calling it on the class would bind the URL to `self`.
    # NOTE(review): presumably WebContentExtractor takes no constructor
    # arguments - confirm against search_manager.
    content = WebContentExtractor().extract_content(url)
    # Print the URL to verify
    print(f"The content from: {url}\n{content}")
def scrape_url_from_clipboard():
    """
    Scrapes content from URL in clipboard and returns the extracted content.
    Returns:
        tuple: (url, content) where url is the URL that was scraped and content is the extracted text
    """
    url = pyperclip.paste()
    # Call extract_content on an instance: if it is a regular instance method,
    # calling it on the class would bind the URL to `self`.
    # NOTE(review): presumably WebContentExtractor takes no constructor
    # arguments - confirm against search_manager.
    content = WebContentExtractor().extract_content(url)
    return url, content

# Update main to use the new function
def main():
    """Scrape the clipboard URL, print the content and optionally save it.

    After printing, the user is asked whether to save; on "y" (any case) a
    file name is requested and the content is written to it as UTF-8.
    """
    url, content = scrape_url_from_clipboard()
    # Print the URL to verify
    print(f"The content from: {url}\n{content}")
    input("Press Enter to continue...")
    if input("Do you want to save the content to a file? (y/n): ").strip().lower() == "y":
        file_name = input("Enter the name of the file: ")
        # Web content often contains characters the platform default encoding
        # cannot represent, so write UTF-8 explicitly.
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(content)
        print(f"Content saved to {file_name}")

if __name__ == "__main__":
    main()
            
    #Usage:
    #1. Copy the URL to the clipboard
    #2. Run the script
    #3. The script will print the content of the URL to the console
    
    #Call from another script:
    #url, content = scrape_url_from_clipboard()
    #print(f"The content from: {url}\n{content}")


In [None]:
# tools\scrape_w_playwright_v2.py

import asyncio
import logging
from typing import List, Dict, Optional, NamedTuple
from tenacity import retry, stop_after_attempt, wait_exponential
from playwright.async_api import async_playwright, Error as PlaywrightError
import csv
from dataclasses import dataclass
import yaml
from aiohttp import ClientSession, ClientError
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import random
from functools import wraps
import time
import rate_limiter

# Module-level scraping settings.
retries = 3
timeout = 30
results_per_page = 5
pages_to_scrape = 5
rate_limit = 1.0

from urllib.parse import urlparse





# Configure root logging (INFO level, timestamped records) and create the
# logger used throughout this module.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

@dataclass
class ProxyConfig:
    """A single proxy endpoint plus the bookkeeping kept by ProxyManager."""
    # Proxy host name (load_config fills it from the parsed URL's hostname).
    server: str
    # Proxy port number.
    port: int
    # URL scheme of the proxy; used as the prefix when the proxy URL is built.
    type: str
    # Failure counter: incremented by mark_proxy_failure, reset on success.
    failures: int = 0
    # time.time() value recorded by mark_proxy_success; 0 until then.
    last_used: float = 0

@dataclass
class ScraperConfig:
    """Scraper settings as produced by load_config from the YAML file."""
    # Search query inserted into the search URL.
    query: str
    # Value of the `retries` config key.
    retries: int
    # Value of the `timeout` config key; forwarded to the page fetch.
    timeout: int
    # Proxies parsed from the `proxy_configs` config key.
    proxy_configs: List[ProxyConfig]
    # Number of search result pages to request.
    pages_to_scrape: int
    # Value of the `results_per_page` config key; passed to scrape_urls.
    results_per_page: int
    # Value of the `rate_limit` config key; passed to the rate limiter.
    rate_limit: float

class SearchResult(NamedTuple):
    """One search hit extracted from a results page."""
    # Text of the result's heading element.
    title: str
    # Link target (href) of the result.
    url: str

def singleton(cls):
    """Class decorator that creates the instance once and then reuses it.

    The first call constructs ``cls`` with the given arguments; every later
    call returns that same object and ignores its arguments.
    """
    created = {}

    @wraps(cls)
    def get_instance(*args, **kwargs):
        if cls in created:
            return created[cls]
        created[cls] = instance = cls(*args, **kwargs)
        return instance

    return get_instance

def load_config(file_path: str) -> ScraperConfig:
    """Load scraper settings from a YAML file.

    ``query``, ``retries``, ``timeout`` and ``proxy_configs`` are required
    keys. ``pages_to_scrape``, ``results_per_page`` and ``rate_limit`` default
    to 5, 5 and 1.0. Proxy entries that cannot be parsed into scheme, host and
    port are skipped with a warning.

    Args:
        file_path: Path of the YAML configuration file.

    Returns:
        The populated ``ScraperConfig``.

    Raises:
        FileNotFoundError: The file does not exist.
        yaml.YAMLError: The file is not valid YAML.
        KeyError: A required key is missing.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # An empty file parses to None; use an empty mapping so that case
            # is reported as a missing key like any other incomplete config.
            config = yaml.safe_load(f) or {}
        proxies = []
        for p in config['proxy_configs']:
            try:
                parsed_proxy = urlparse(p)
                # Reading .port raises ValueError for a non-numeric or
                # out-of-range port, so it belongs inside the try block.
                port = parsed_proxy.port
            except ValueError:
                logger.warning(f"Invalid proxy format: {p}. Skipping.")
                continue
            if parsed_proxy.scheme and parsed_proxy.hostname and port:
                proxies.append(ProxyConfig(server=parsed_proxy.hostname, port=port, type=parsed_proxy.scheme))
            else:
                logger.warning(f"Invalid proxy format: {p}. Skipping.")
        return ScraperConfig(
            query=config['query'],
            retries=config['retries'],
            timeout=config['timeout'],
            proxy_configs=proxies,
            pages_to_scrape=config.get('pages_to_scrape', 5),
            results_per_page=config.get('results_per_page', 5),
            rate_limit=config.get('rate_limit', 1.0)
        )
    except FileNotFoundError:
        logger.error(f"Config file not found: {file_path}")
        raise
    except yaml.YAMLError:
        logger.error(f"Invalid YAML in config file: {file_path}")
        raise
    except KeyError as e:
        logger.error(f"Missing required key in config: {str(e)}")
        raise
     
@singleton
class UserAgentRotator:
    """Holds a fixed set of user-agent strings and hands out a random one."""

    def __init__(self):
        self.ua = UserAgent()
        # Read one user-agent string per browser family, in this order.
        self.user_agents = [
            getattr(self.ua, family)
            for family in ("chrome", "firefox", "safari", "edge")
        ]

    def get_random_user_agent(self):
        """Return one of the stored user-agent strings, chosen at random."""
        return random.choice(self.user_agents)

ua_rotator = UserAgentRotator()

class ProxyManager:
    """Round-robin pool of proxies that drops entries after repeated failures."""

    def __init__(self, proxies: List[ProxyConfig]):
        """Store the proxy list (it is mutated in place when proxies are removed)."""
        self.proxies = proxies
        self.current_index = 0

    def get_next_proxy(self) -> ProxyConfig:
        """Return the next proxy in rotation.

        Raises:
            IndexError: The pool is empty.
        """
        if not self.proxies:
            raise IndexError("No proxies available")
        # The list may have shrunk since the index was last advanced, so wrap
        # it before use rather than only after.
        self.current_index %= len(self.proxies)
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy

    def mark_proxy_failure(self, proxy: ProxyConfig):
        """Count a failure and remove the proxy once it has failed more than 3 times."""
        proxy.failures += 1
        # The membership check makes repeated reports for an already removed
        # proxy harmless.
        if proxy.failures > 3 and proxy in self.proxies:
            removed_at = self.proxies.index(proxy)
            del self.proxies[removed_at]
            # Keep the rotation pointing at the same next proxy.
            if removed_at < self.current_index:
                self.current_index -= 1
            logger.warning(f"Removed failing proxy: {proxy.server}:{proxy.port}")

    def mark_proxy_success(self, proxy: ProxyConfig):
        """Reset the failure counter and record the time of use."""
        proxy.failures = 0
        proxy.last_used = time.time()

def fetch_with_proxy(ua_rotator: UserAgentRotator, proxy_manager: ProxyManager, rate_limiter: "AsyncLimiter", **kwargs) -> Optional[str]:
    """Fetch a page through a rotating proxy using a Playwright browser.

    Up to three proxies are tried in turn. A successful fetch marks the proxy
    as good and returns the page HTML; every failure is logged and counted
    against the proxy.

    Args:
        ua_rotator: Source of the user-agent string for the browser context.
        proxy_manager: Pool the proxies are taken from.
        rate_limiter: Context manager entered around every attempt.
            NOTE(review): it is entered with a plain ``with``, so it must
            support the synchronous context-manager protocol; the annotated
            name is not defined in this module and is therefore kept as a
            string - confirm the intended limiter type.
        **kwargs: Must contain ``url`` and ``timeout``.
            NOTE(review): ``timeout`` is passed to ``page.goto``, which
            Playwright documents as milliseconds - confirm the unit.

    Returns:
        The page content, or None when every attempt failed.
    """
    # This function is synchronous, so it needs Playwright's sync entry point;
    # the module itself only imports the async one.
    # NOTE(review): confirm that starting a sync Playwright session here works
    # when the caller already has one open in the same thread.
    from playwright.sync_api import sync_playwright

    url = kwargs['url']
    timeout = kwargs['timeout']

    for _ in range(3):  # Try up to 3 different proxies
        if not proxy_manager.proxies:
            # Every proxy has been removed; no point in asking for another.
            break
        proxy = proxy_manager.get_next_proxy()
        try:
            with rate_limiter:
                with sync_playwright() as p:
                    browser = p.chromium.launch(
                        proxy={"server": f"{proxy.type}://{proxy.server}:{proxy.port}"},
                        headless=random.choice([True, False])
                    )
                    try:
                        with browser.new_context(
                            user_agent=ua_rotator.get_random_user_agent(),
                            viewport={'width': random.randint(1024, 1920), 'height': random.randint(768, 1080)}
                        ) as context:
                            with context.new_page() as page:
                                simulate_human_behavior(page)
                                page.goto(url, timeout=timeout, wait_until='networkidle')
                                simulate_browsing(page)
                                content = page.content()
                                proxy_manager.mark_proxy_success(proxy)
                                return content
                    finally:
                        browser.close()
        except PlaywrightError as e:
            logger.error(f"Playwright error with proxy {proxy.server}: {str(e)}")
        except TimeoutError as e:
            logger.error(f"Timeout error with proxy {proxy.server}: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error with proxy {proxy.server}: {str(e)}")

        proxy_manager.mark_proxy_failure(proxy)

    logger.error(f"Failed to fetch {url} after trying multiple proxies")
    return None


def simulate_human_behavior(page, delay_scale: float = 1.0) -> None:
    """Move the mouse to a random position and pause for a moment.

    Errors are logged and swallowed; this step is best-effort.

    Args:
        page: Browser page exposing ``mouse.move(x, y)``.
        delay_scale: Non-negative multiplier applied to the 2-5 second pause
            (1.0 keeps the default pause; 0 disables it).
    """
    try:
        page.mouse.move(random.randint(0, 1920), random.randint(0, 1080))
        # This function is synchronous, so the pause must block; an un-awaited
        # asyncio.sleep() only creates a coroutine and does not wait at all.
        time.sleep(random.uniform(2, 5) * delay_scale)
    except Exception as e:
        logger.error(f"Error simulating human behavior: {str(e)}")

def simulate_browsing(page, delay_scale: float = 1.0) -> None:
    """Scroll the page a few times with pauses and occasional random clicks.

    After an initial pause the page is scrolled 2-5 times by a random part of
    the window height; each scroll is followed by a pause and, with 30%
    probability, a click at a random position. Errors are logged and
    swallowed; this step is best-effort.

    Args:
        page: Browser page exposing ``evaluate(script)`` and
            ``mouse.click(x, y)``.
        delay_scale: Non-negative multiplier applied to every pause
            (1.0 keeps the default pauses; 0 disables them).
    """
    try:
        # This function is synchronous, so the pauses must block; an
        # un-awaited asyncio.sleep() does not wait at all.
        time.sleep(random.uniform(3, 7) * delay_scale)
        for _ in range(random.randint(2, 5)):
            page.evaluate("window.scrollBy(0, Math.floor(Math.random() * window.innerHeight))")
            time.sleep(random.uniform(1, 3) * delay_scale)

            if random.random() < 0.3:
                page.mouse.click(random.randint(0, 1920), random.randint(0, 1080))
    except Exception as e:
        logger.error(f"Error simulating browsing: {str(e)}")

def parse_search_results(html_content: str) -> List[SearchResult]:
    """Extract (title, url) pairs from the HTML of a search results page.

    Each ``div.g`` element contributes one result when it has both an ``h3``
    heading and a ``div.yuRUbf > a`` link with non-empty text and href.
    Returns an empty list when parsing fails (the error is logged).
    """
    try:
        found = []
        document = BeautifulSoup(html_content, 'html.parser')
        for entry in document.select('div.g'):
            heading = entry.select_one('h3')
            link = entry.select_one('div.yuRUbf > a')
            if not (heading and link):
                continue
            title = heading.get_text()
            url = link.get('href')
            if not (title and url):
                continue
            found.append(SearchResult(title=title, url=url))
        return found
    except Exception as e:
        logger.error(f"Error parsing search results: {str(e)}")
        return []

def scrape_url(url: str, context) -> str:
    """Open *url* in a new page of *context* and return its HTML.

    Args:
        url: Address of the page to load.
        context: Browser context used to open the page.

    Returns:
        The page content, or an empty string when loading fails (the error is
        logged).
    """
    page = None
    try:
        page = context.new_page()
        page.goto(url, wait_until='networkidle')
        simulate_browsing(page)
        return page.content()
    except Exception as e:
        logger.error(f"Error scraping result page {url}: {str(e)}")
        return ""
    finally:
        # Close the page on failure as well, so that pages which failed to
        # load do not pile up in the shared context.
        if page is not None:
            try:
                page.close()
            except Exception as e:
                logger.error(f"Error closing page for {url}: {str(e)}")

def scrape_urls(content: str, context, results_per_page: int) -> List[Dict[str, str]]:
    """Scrape the pages linked from a search results page.

    Args:
        content: HTML of the search results page.
        context: Browser context used to open each result.
        results_per_page: Maximum number of results to follow.

    Returns:
        One dict per scraped result with ``title``, ``url`` and ``content``
        (the visible text of the page). Results that fail are logged and
        skipped.
    """
    results = parse_search_results(content)
    scraped_results = []
    # Honour the caller's limit; a fixed slice here made the parameter
    # ineffective.
    for result in results[:results_per_page]:
        try:
            page_content = scrape_url(result.url, context)
            scraped_text = extract_text_from_html(page_content)
            scraped_results.append({
                'title': result.title,
                'url': result.url,
                'content': scraped_text
            })
        except Exception as e:
            logger.error(f"Error extracting and scraping result {result.url}: {str(e)}")
    return scraped_results


def extract_text_from_html(html_content: str) -> str:
    """Return the text of an HTML document, pieces joined by single spaces.

    Returns an empty string when parsing fails (the error is logged).
    """
    try:
        document = BeautifulSoup(html_content, 'html.parser')
        text = document.get_text(separator=' ', strip=True)
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {str(e)}")
        return ""
    return text

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry_error_callback=lambda _: None
)
def fetch_with_retry(url: str, proxy_config: ProxyConfig, timeout: float) -> Optional[str]:
    """Fetch *url* through a proxy with aiohttp, retrying on failure.

    The request itself runs in a private event loop via ``asyncio.run``, so
    this function stays synchronous; it must not be called from code that is
    already running inside an event loop. Errors are logged and re-raised so
    the ``retry`` decorator can make up to three attempts; once they are
    exhausted the decorator's callback turns the failure into None.

    Args:
        url: Address to fetch.
        proxy_config: Proxy used for the request.
        timeout: Total request timeout in seconds.

    Returns:
        The response body as text, or None when all attempts failed.
    """
    from aiohttp import ClientTimeout

    proxy_url = f"{proxy_config.type}://{proxy_config.server}:{proxy_config.port}"
    headers = {
        'User-Agent': ua_rotator.get_random_user_agent(),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
    }

    async def _fetch() -> str:
        # aiohttp sessions and responses are async context managers and
        # response.text() is a coroutine, so they have to be awaited.
        async with ClientSession(timeout=ClientTimeout(total=timeout)) as session:
            async with session.get(url, proxy=proxy_url, headers=headers, allow_redirects=True) as response:
                logger.info(f"Response status: {response.status}")
                logger.info(f"Response headers: {response.headers}")
                await asyncio.sleep(random.uniform(1, 3))
                response.raise_for_status()
                return await response.text()

    try:
        return asyncio.run(_fetch())
    except ClientError as e:
        logger.error(f"Client error during fetch: {str(e)}")
        raise  # let the retry decorator schedule another attempt
    except asyncio.TimeoutError:
        logger.error(f"Timeout error during fetch for URL: {url}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during fetch: {str(e)}")
        raise


def scrape_pages(config: ScraperConfig, context, proxy_manager: ProxyManager, rate_limiter: AsyncLimiter):
    """Fetch Google result pages for ``config.query`` and scrape the linked results.

    Args:
        config: Scraper settings; ``query``, ``pages_to_scrape``, ``timeout``
            and ``results_per_page`` are read.
        context: Browser context forwarded unchanged to ``scrape_urls``.
        proxy_manager: Proxy source forwarded to ``fetch_with_proxy``.
        rate_limiter: Rate limiter forwarded to ``fetch_with_proxy``.

    Returns:
        A flat list with the results of every page that could be fetched;
        pages whose fetch returned nothing are skipped.
    """
    # Local import: only this function needs it.
    from urllib.parse import quote_plus

    # Encode the query once: spaces, '&', '#', '+' or non-ASCII characters
    # would otherwise corrupt the query string or be read as extra parameters.
    encoded_query = quote_plus(str(config.query))

    all_results = []
    for page_num in range(config.pages_to_scrape):
        # Google paginates with `start` in steps of ten results.
        url = f"https://www.google.com/search?q={encoded_query}&start={page_num * 10}"
        content = fetch_with_proxy(ua_rotator, proxy_manager, rate_limiter, url=url, timeout=config.timeout)
        if content:
            all_results.extend(scrape_urls(content, context, config.results_per_page))
    return all_results

def save_results_to_csv(results: List[Dict[str, str]], filename: str):
    """Write scraped results to ``filename`` as a UTF-8 CSV file.

    The file gets a header row followed by one row per result, using the
    columns ``title``, ``url`` and ``content``. An info message with the
    number of rows is logged once the file has been written.

    Args:
        results: Result dictionaries keyed by the three column names.
        filename: Destination path; an existing file is overwritten.
    """
    columns = ['title', 'url', 'content']
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=columns)
        csv_writer.writeheader()
        for row in results:
            csv_writer.writerow(row)
    logger.info(f"Scraping completed. {len(results)} results saved to {filename}")
    
    
def main():
    """Run one scraping session and save its results to a CSV file.

    Loads the configuration from 'scraper_config.yaml', builds the proxy
    manager and rate limiter, scrapes the result pages inside a Chromium
    browser context and writes everything to
    'search_results_with_content.csv'.

    Any exception is logged and swallowed; in that case nothing is saved,
    because the save call sits inside the same ``try`` block.
    """
    try:
        config = load_config('scraper_config.yaml')
        proxy_manager = ProxyManager(config.proxy_configs)
        # NOTE(review): scrape_pages annotates this argument as AsyncLimiter;
        # `Rate_limiter` is not defined in the visible part of the file -
        # confirm the name exists (a NameError would only be logged below).
        rate_limiter = Rate_limiter(1, config.rate_limit)
        
        # NOTE(review): async_playwright() is normally entered with
        # `async with` and its calls awaited; with a plain `with` this
        # presumably fails at runtime - confirm (or switch to sync_playwright).
        with async_playwright() as p:
            browser = p.chromium.launch(headless=True)
            # No explicit browser.close() here; cleanup is left to the
            # enclosing context managers.
            with browser.new_context(user_agent=ua_rotator.get_random_user_agent()) as context:
                all_results = scrape_pages(config, context, proxy_manager, rate_limiter)
        
        save_results_to_csv(all_results, 'search_results_with_content.csv')
    except Exception as e:
        logger.error(f"Error in main function: {str(e)}")

# Run the scraper only when this module is executed as a script.
if __name__ == "__main__":
    main()

In [None]:
# tools\search_api.py

import requests
from bs4 import BeautifulSoup, Comment
import time
import re
from urllib.parse import urlparse
from typing import List, Dict, Any, Optional, Union
import logging
from dotenv import load_dotenv
import os
from abc import ABC, abstractmethod
from fake_useragent import UserAgent
import html2text
from duckduckgo_search import DDGS
import random
from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium_stealth import stealth
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import gzip
#$end
from newspaper import Article
from functools import lru_cache
from datetime import datetime, timedelta


def fetch_article_text(url):
    """Download and parse the article at ``url`` with newspaper's ``Article``.

    Args:
        url: Address of the article.

    Returns:
        A 4-tuple ``(title, author, pub_date, article_text)`` where ``author``
        is the list of detected authors joined with ``', '``.
    """
    parsed = Article(url)
    parsed.download()
    parsed.parse()

    return (
        parsed.title,
        ', '.join(parsed.authors),
        parsed.publish_date,
        parsed.text,
    )

# Read the .env file (if any) before the credentials below are looked up.
load_dotenv()

# Search-API credentials; each value is None when its variable is not set.
GOOGLE_CUSTOM_SEARCH_ENGINE_ID = os.environ.get('GOOGLE_CUSTOM_SEARCH_ENGINE_ID')
GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY = os.environ.get('GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY')
BRAVE_SEARCH_API_KEY = os.environ.get('BRAVE_SEARCH_API_KEY')  # optional

# Logging setup: INFO level, timestamped messages, one logger for this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# --- [Improved] More descriptive error handling ---
def initialize_apis() -> List['SearchAPI']:
    """Create the SearchAPI objects for every configured search back end.

    Google is mandatory, Brave is added only when its key is configured, and
    the key-less DuckDuckGo endpoint is always appended last.

    Returns:
        List[SearchAPI]: The APIs in the order Google, (Brave), DuckDuckGo.

    Raises:
        ValueError: If the Google API key or search-engine ID is not set.
    """
    google_settings = (GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY, GOOGLE_CUSTOM_SEARCH_ENGINE_ID)
    if any(value is None for value in google_settings):
        raise ValueError("GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY and GOOGLE_CUSTOM_SEARCH_ENGINE_ID must be set in .env.")

    google_api = SearchAPI(
        name="Google",
        api_key=GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY,
        base_url="https://www.googleapis.com/customsearch/v1",
        params={"cx": GOOGLE_CUSTOM_SEARCH_ENGINE_ID},
        quota=100,
        results_path='items',
        rate_limit=1,
    )
    apis = [google_api]

    if BRAVE_SEARCH_API_KEY:
        brave_api = SearchAPI(
            name="Brave",
            api_key=BRAVE_SEARCH_API_KEY,
            base_url="https://api.search.brave.com/res/v1/web/search",
            params={},
            quota=2000,
            results_path='results',
            rate_limit=1,
        )
        apis.append(brave_api)

    # No key, unlimited quota and no enforced delay for DuckDuckGo.
    duckduckgo_api = SearchAPI(
        name="DuckDuckGo",
        api_key="",
        base_url="https://api.duckduckgo.com/",
        params={"format": "json"},
        quota=float('inf'),
        results_path='RelatedTopics',
        rate_limit=0,
    )
    apis.append(duckduckgo_api)

    return apis
# --- (end) ---


def configure_search_settings() -> Dict[str, Any]:
    """Ask the user on stdin whether search functionality should be enabled.

    The prompt repeats until the answer is ``Y`` or ``N`` (case-insensitive).
    If initializing the APIs fails after a ``Y`` answer, the error is printed
    and the prompt is shown again, so the user can retry or answer ``N``.

    When standard input is closed or unusable (for example a non-interactive
    run), search is disabled instead of prompting forever.

    Returns:
        Dict[str, Any]: ``{'search_enabled': False}`` when disabled; otherwise
        a dictionary with 'search_enabled' (True), 'all_search_result_data',
        'search_session_counter', 'search_session_id' and 'apis'.
    """
    while True:
        try:
            user_input = input("Do you want to enable search functionality? (Y/N): ").strip().lower()
        except (EOFError, OSError):
            # input() fails the same way on every retry once stdin is closed
            # or unreadable; looping here would never terminate.
            print("No input available. Search functionality is disabled.")
            return {'search_enabled': False}

        if user_input == 'n':
            return {'search_enabled': False}

        if user_input != 'y':
            print("Invalid input. Please enter Y or N.")
            continue

        try:
            apis = initialize_apis()
        except Exception as e:
            # Typically a missing API key; report it and ask again.
            print(f"An error occurred: {e}. Please try again.")
            continue

        return {
            'search_enabled': True,
            'all_search_result_data': {},
            'search_session_counter': 0,
            'search_session_id': 0,
            'apis': apis,
        }


class SearchProvider(ABC):
    """Interface that every search back end in this module implements."""

    @abstractmethod
    def search(self, query: str, num_results: int) -> List['SearchResult']:
        """Run a search for ``query``.

        Args:
            query: The search text.
            num_results: Number of results requested.

        Returns:
            The matches as SearchResult objects. This base implementation
            does nothing and returns ``None``; subclasses must override it.
        """


class SearchResult:
    """Plain container for one search hit.

    Attributes are set directly from the constructor arguments: ``title``,
    ``url``, ``snippet`` and ``content`` (empty string unless provided).
    """

    def __init__(self, title: str, url: str, snippet: str, content: str = ""):
        self.title, self.url = title, url
        self.snippet, self.content = snippet, content


class SearchAPI(SearchProvider):
    """Search provider backed by an HTTP JSON API, with quota and rate-limit bookkeeping."""

    def __init__(self, name: str, api_key: str, base_url: str, params: dict, quota: int, results_path: str,
                 rate_limit: int):
        """Store the API settings and reset the usage counters.

        Args:
            name: Display name; ``'Google'`` additionally caps ``num`` at 10.
            api_key: API key; when non-empty it is sent as the ``key`` query
                parameter with every request.
            base_url: Endpoint that is queried with GET.
            params: Extra query parameters sent with every request (copied).
            quota: Maximum number of successful requests (may be ``inf``).
            results_path: Top-level key of the JSON response holding the
                list of result items.
            rate_limit: Minimum number of seconds between two requests.
        """
        self.name = name
        self.api_key = api_key
        self.base_url = base_url
        self.params = params.copy()  # Create a copy to avoid modifying the original
        if api_key:
            self.params['key'] = api_key
        self.quota = quota
        self.used = 0  # number of successful requests so far
        self.results_path = results_path
        self.rate_limit = rate_limit
        self.last_request_time = 0  # time.time() of the most recent request
        self.user_agent_rotator = UserAgent()

    def is_within_quota(self) -> bool:
        """Return True while fewer than ``quota`` requests have succeeded."""
        return self.used < self.quota

    def respect_rate_limit(self):
        """Sleep until at least ``rate_limit`` seconds have passed since the last request."""
        time_since_last_request = time.time() - self.last_request_time
        if time_since_last_request < self.rate_limit:
            time.sleep(self.rate_limit - time_since_last_request)

    def search(self, query: str, num_results: int) -> List[SearchResult]:
        """Query the API and convert the response items to SearchResult objects.

        Args:
            query: The search text, sent as the ``q`` parameter.
            num_results: Number of results requested, sent as ``num``
                (capped at 10 for Google).

        Returns:
            The results that carry a URL; an empty list on any request,
            HTTP-status or JSON-decoding error (the error is logged).
        """
        self.respect_rate_limit()
        logger.info(f"Searching {self.name} for: {query}")
        params = self.params.copy()
        params['q'] = query

        # Google Custom Search has a max of 10 results per request
        params['num'] = min(num_results, 10) if self.name == 'Google' else num_results
        headers = {'User-Agent': self.user_agent_rotator.random}
        try:
            try:
                response = requests.get(self.base_url, params=params, headers=headers, timeout=10)
            finally:
                # Record the attempt even when it fails, so a failing endpoint
                # is not hit again without the rate-limit pause.
                self.last_request_time = time.time()
            response.raise_for_status()
            self.used += 1
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers invalid JSON on requests versions whose
            # JSON error is not a RequestException subclass.
            logger.error(f"Error during {self.name} search: {e}")
            return []

        items = data.get(self.results_path) if isinstance(data, dict) else None
        if not isinstance(items, list):
            return []

        results = []
        for item in items:
            if not isinstance(item, dict):
                continue
            url = item.get('link') or item.get('url')
            if not url:
                # A result without a URL cannot be fetched or shown; skip it.
                continue
            title = item.get('title') or "No title"
            snippet = item.get('snippet') or "No snippet"
            results.append(SearchResult(title, url, snippet))
        return results


class DuckDuckGoSearchProvider(SearchProvider):
    """Provides search functionality using DuckDuckGo (via ``DDGS``)."""

    def search(self, query: str, max_results: int) -> List[SearchResult]:
        """Search DuckDuckGo and return at most ``max_results`` results.

        The query is sanitized first. The search uses region ``wt-wt``,
        safesearch off and ``timelimit='y'`` (presumably "past year").

        Args:
            query: The search text.
            max_results: Upper bound on returned results; ``None`` means no
                bound.

        Returns:
            The hits that carry a URL, as SearchResult objects; an empty
            list when the sanitized query is empty, ``max_results`` is not
            positive, or any error occurs (the error is logged).
        """
        # Local import: only this method needs it.
        from itertools import islice

        try:
            sanitized_query = self._sanitize_query(query)
            if not sanitized_query:
                return []
            if max_results is not None and max_results <= 0:
                return []

            with DDGS() as ddgs:
                hits = ddgs.text(sanitized_query, region='wt-wt', safesearch='off', timelimit='y')
                # islice stops pulling hits once enough were collected and
                # works whether text() returns a list or a generator.
                raw_results = list(islice(hits, max_results))

            results = []
            for raw in raw_results:
                url = raw.get('href')
                if not url:
                    # One malformed hit must not discard the whole result set.
                    continue
                results.append(SearchResult(raw.get('title') or "No title", url,
                                            raw.get('body') or "No snippet"))
            return results
        except Exception as e:
            logger.error(f"Error searching DuckDuckGo: {e}")
            return []

    def _sanitize_query(self, query: str) -> str:
        """Strip punctuation, collapse whitespace and cap the query at 5000 characters."""
        query = re.sub(r'[^\w\s]', '', query)
        query = re.sub(r'\s+', ' ', query).strip()
        return query[:5000]


class WebContentExtractor:
    """Extracts the main readable text of a web page.

    ``extract_content`` tries three strategies in order: a plain ``requests``
    fetch, newspaper's ``Article`` parser and finally a headless Edge browser
    driven by Selenium. Every method is a class or static method, and the
    Selenium driver is shared at class level.
    """
    MAX_RETRIES = 2  # attempts made by _extract_with_requests
    TIMEOUT = 5  # passed to requests.get as its timeout
    MIN_CONTENT_LENGTH = 200  # shorter extractions count as a failed attempt
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/91.0.4472.80 Mobile/15E148 Safari/604.1',
        'Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/88.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36 OPR/78.0.4093.147',
    ]
    # CSS class names that usually wrap the main content of a page.
    _MAIN_CONTENT_CLASS_RE = re.compile(
        r'\b(content|main-content|post-content|entry-content|article-body|'
        r'product-description|the-content|post-entry|entry|sqs-block-content|'
        r'content-wrapper|post-body|rich-text-section|postArticle-content|'
        r'post-full-content|item-description|message-body|thread-content|'
        r'story-content|news-article-body)\b',
        re.IGNORECASE
    )
    # Class variable to hold the shared WebDriver instance
    _driver = None

    @classmethod
    def get_driver(cls):
        """Return the shared WebDriver, creating it on first use."""
        if cls._driver is None:
            cls._initialize_driver()
        return cls._driver

    @classmethod
    def _initialize_driver(cls):
        """Create the headless Edge WebDriver and apply selenium-stealth to it."""
        edge_options = Options()
        edge_options.add_argument("--headless=new")
        edge_options.add_argument("--disable-gpu")
        edge_options.add_argument("--no-sandbox")

        user_agent = random.choice(cls.USER_AGENTS)
        edge_options.add_argument(f"user-agent={user_agent}")

        cls._driver = webdriver.Edge(service=Service(EdgeChromiumDriverManager().install()), options=edge_options)

        stealth(cls._driver,
                languages=["en-US", "en"],
                vendor="Google Inc.",
                platform="Win32",
                webgl_vendor="Intel Inc.",
                renderer="Angle",
                fix_hairline=True,
        )

    @classmethod
    def quit_driver(cls):
        """Quit the shared WebDriver if it is running and forget it."""
        if cls._driver is None:
            return
        cls._driver.quit()
        cls._driver = None  # Reset so get_driver() creates a fresh instance

    @classmethod
    def extract_content(cls, url: str) -> str:
        """Extract the page text, falling back through the three strategies.

        Args:
            url: Address of the page.

        Returns:
            The first extraction with at least ``MIN_CONTENT_LENGTH``
            non-blank characters, or an empty string when the URL is invalid
            or every strategy fails.
        """
        if not cls.is_valid_url(url):
            logger.error(f"Invalid URL: {url}")
            return ""
        for extractor in (cls._extract_with_requests, cls._extract_with_newspaper, cls.extract_with_selenium):
            try:
                text = extractor(url)
            except Exception as e:
                # A failure of one strategy must not prevent trying the next.
                logger.warning(f"{getattr(extractor, '__name__', extractor)} failed for {url}: {e}")
                continue
            if text and len(text.strip()) >= cls.MIN_CONTENT_LENGTH:
                return text
        return ""

    @classmethod
    def _extract_with_requests(cls, url: str) -> str:
        """Fetch the page with ``requests`` and extract its main text.

        Request errors are retried up to ``MAX_RETRIES`` attempts with an
        exponentially growing pause. Non-HTML responses yield "".
        """
        for attempt in range(1, cls.MAX_RETRIES + 1):
            # Accept-Encoding is deliberately not set: requests advertises
            # exactly the encodings it can decode (a hard-coded 'br' yields an
            # undecodable body when no brotli decoder is installed).
            headers = {
                'User-Agent': random.choice(cls.USER_AGENTS),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Cache-Control': 'max-age=0',
                'DNT': '1',
            }
            try:
                response = requests.get(url, headers=headers, timeout=cls.TIMEOUT)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                if attempt < cls.MAX_RETRIES:
                    logger.warning(f"Error with requests for {url} (attempt {attempt}): {e}. Retrying...")
                    time.sleep(2 ** attempt)
                    continue
                logger.warning(f"Error with requests for {url} after {cls.MAX_RETRIES} attempts: {e}. Giving up.")
                return ""

            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' not in content_type:
                logger.warning(f"Non-HTML content returned for {url}: {content_type}")
                return ""

            # requests already decodes gzip/deflate bodies, so response.text is
            # used directly; decompressing the decoded body again always failed.
            soup = BeautifulSoup(response.text, 'html.parser')
            return cls._extract_content_from_soup(soup)
        # Only reached when MAX_RETRIES is configured below 1.
        return ""

    @classmethod
    def _extract_with_newspaper(cls, url: str) -> str:
        """Extract the article text with newspaper's ``Article``; "" on error."""
        try:
            article = Article(url)
            article.download()
            article.parse()
            return article.text
        except Exception as e:
            logger.warning(f"Newspaper error for {url}: {e}")
            return ""

    @classmethod
    def extract_with_selenium(cls, url: str) -> str:
        """Extract content with the shared Selenium driver (for dynamic pages).

        Waits up to 10 seconds for the ``body`` element, then takes the text
        of the first div/main/article whose class matches a known content
        class name, falling back to the whole body. Returns "" on any error.
        """
        try:
            driver = cls.get_driver()
            driver.get(url)
            # Wait for the body element to be present
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
            soup = BeautifulSoup(driver.page_source, 'html.parser')
            main_content = soup.find(
                ['div', 'main', 'article'],
                class_=cls._MAIN_CONTENT_CLASS_RE
            ) or soup.body
            main_text = main_content.get_text(separator=' ', strip=True) if main_content else ''
            return re.sub(r'\s+', ' ', main_text)
        except Exception as e:
            logger.error(f"Selenium extraction failed for {url}: {e}")
            return ""

    @staticmethod
    def _extract_content_from_soup(soup: 'BeautifulSoup') -> str:
        """Strip page chrome from ``soup`` and return its main text on one line.

        Navigation, header, footer, aside, script and style elements as well
        as HTML comments are removed (``soup`` is modified in place). The
        text comes from the first ``main``, ``article`` or content-like
        ``div``, else from the body, converted with html2text.
        """
        for element in soup(['nav', 'header', 'footer', 'aside', 'script', 'style']):
            element.decompose()

        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        content = soup.find('main') or soup.find('article') or soup.find(
            'div', class_=re.compile(r'content|main-content|post-content|body|main-body|body-content|main', re.IGNORECASE))

        if not content:
            content = soup.body

        if not content:
            return ""

        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        text = h.handle(str(content))

        # Collapse every whitespace run (newlines included) into one space.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Return True when ``url`` parses with both a scheme and a network location."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False


class SearchManager:
    """Manages searches across multiple APIs and providers, with a TTL-bounded result cache."""

    # Named APIs are tried in this order first; any other API follows in the
    # order it was supplied.
    _PREFERRED_API_ORDER = ("Google", "Brave", "DuckDuckGo")

    def __init__(self, apis: Optional[List['SearchAPI']] = None,
                 web_search_provider: Optional['SearchProvider'] = None,
                 max_content_length: int = 10000,
                 cache_size: int = 100,
                 cache_ttl: int = 3600):
        """Initialize SearchManager with flexible configuration.

        Args:
            apis: Optional list of SearchAPI instances
            web_search_provider: Optional SearchProvider used as fallback
                (DuckDuckGoSearchProvider when omitted)
            max_content_length: Maximum length of extracted content
            cache_size: Maximum number of cached results
            cache_ttl: Cache time-to-live in seconds
        """
        self.apis = apis or []
        self.web_search_provider = web_search_provider or DuckDuckGoSearchProvider()
        self.content_extractor = WebContentExtractor()
        self.max_content_length = max_content_length
        self.cache = {}  # cache key -> list of result dicts
        self.cache_timestamps = {}  # cache key -> datetime of insertion
        self.cache_size = cache_size
        self.cache_ttl = cache_ttl

    def search(self, query: str, num_results: int = 10) -> List[Dict[str, str]]:
        """
        Performs a cached search using available APIs and the web search provider.

        Args:
            query: The search query
            num_results: Maximum number of results to return

        Returns:
            List of dictionaries with 'title', 'url', 'snippet' and 'content';
            empty when the query is blank or nothing was found.
        """
        if not query or not query.strip():
            return []

        cache_key = f"{query}:{num_results}"
        cached_results = self._get_cached_results(cache_key)
        if cached_results:
            return cached_results

        detailed_results = self._try_api_search(query, num_results)

        if not detailed_results:
            logger.info(f"Falling back to DuckDuckGo for query: {query}")
            fallback_results = self.web_search_provider.search(query, num_results)
            # A provider returning None is treated like "no results".
            detailed_results = self._process_search_results(fallback_results or [])

        if detailed_results:
            self._cache_results(cache_key, detailed_results)

        return detailed_results

    def _get_cached_results(self, cache_key: str) -> Optional[List[Dict[str, str]]]:
        """Return the cached results for ``cache_key`` if present and not expired.

        An expired entry is removed so it does not keep occupying the cache.
        """
        if cache_key not in self.cache:
            return None

        timestamp = self.cache_timestamps.get(cache_key)
        if timestamp and (datetime.now() - timestamp) < timedelta(seconds=self.cache_ttl):
            logger.info(f"Returning cached results for key: {cache_key}")
            return self.cache[cache_key]

        self.cache.pop(cache_key, None)
        self.cache_timestamps.pop(cache_key, None)
        return None

    def _try_api_search(self, query: str, num_results: int) -> Optional[List[Dict[str, str]]]:
        """Try the configured APIs one by one until one yields results.

        Returns:
            The processed results of the first API that is within quota and
            returns anything, or None when no API produced results.
        """
        candidates = []
        for api_name in self._PREFERRED_API_ORDER:
            match = next((api for api in self.apis if api.name == api_name), None)
            if match is not None:
                candidates.append(match)
        # APIs with any other name used to be ignored silently; try them too.
        preferred_ids = {id(api) for api in candidates}
        candidates.extend(api for api in self.apis if id(api) not in preferred_ids)

        for api in candidates:
            if not api.is_within_quota():
                continue
            try:
                logger.info(f"Trying {api.name} for query: {query}")
                search_results = api.search(query, num_results)
                if search_results:
                    return self._process_search_results(search_results)
            except Exception as e:
                logger.error(f"Error searching {api.name}: {e}")
        return None

    def _process_search_results(self, search_results: List['SearchResult']) -> List[Dict[str, str]]:
        """Fetch page content for each result and build the result dictionaries.

        Results without a URL, and results whose processing raises, are
        skipped (the problem is logged).
        """
        detailed_results = []
        for result in search_results:
            url = getattr(result, 'url', None)
            if not url:
                logger.warning(f"Skipping search result without a URL: {getattr(result, 'title', '')}")
                continue
            try:
                content = self.content_extractor.extract_content(url)
                detailed_results.append({
                    'title': (result.title or "")[:500],  # Limit title length
                    'url': url[:1000],  # Limit URL length
                    'snippet': (result.snippet or "")[:1000],  # Limit snippet length
                    'content': content[:self.max_content_length] if content else ""
                })
            except Exception as e:
                logger.error(f"Error processing result {url}: {e}")
        return detailed_results

    def _cache_results(self, cache_key: str, results: List[Dict]):
        """Store ``results`` under ``cache_key`` and evict the oldest entries beyond ``cache_size``."""
        self.cache[cache_key] = results
        self.cache_timestamps[cache_key] = datetime.now()

        # Remove oldest entries if cache is full; the emptiness checks keep a
        # negative cache_size from calling min() on an empty mapping.
        limit = max(self.cache_size, 0)
        while self.cache_timestamps and len(self.cache) > limit:
            oldest_key = min(self.cache_timestamps.items(), key=lambda x: x[1])[0]
            self.cache.pop(oldest_key, None)
            self.cache_timestamps.pop(oldest_key, None)

    def clear_expired_cache(self):
        """Remove every cache entry older than ``cache_ttl`` seconds."""
        current_time = datetime.now()
        expired_keys = [
            key for key, timestamp in self.cache_timestamps.items()
            if (current_time - timestamp).total_seconds() > self.cache_ttl
        ]
        for key in expired_keys:
            self.cache.pop(key, None)
            self.cache_timestamps.pop(key, None)



def initialize_search_manager() -> SearchManager:
    """Build a SearchManager from whichever API credentials are configured.

    Google is added when both its key and engine ID are set, Brave when its
    key is set; DuckDuckGo is always installed as the fallback provider. If
    anything goes wrong, the error is logged and a DuckDuckGo-only manager
    with default settings is returned instead.

    Returns:
        SearchManager: The configured manager.
    """
    try:
        configured_apis = []

        if GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY and GOOGLE_CUSTOM_SEARCH_ENGINE_ID:
            google_api = SearchAPI(
                name="Google",
                api_key=GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY,
                base_url="https://www.googleapis.com/customsearch/v1",
                params={"cx": GOOGLE_CUSTOM_SEARCH_ENGINE_ID},
                quota=100,
                results_path='items',
                rate_limit=1,
            )
            configured_apis.append(google_api)

        if BRAVE_SEARCH_API_KEY:
            brave_api = SearchAPI(
                name="Brave",
                api_key=BRAVE_SEARCH_API_KEY,
                base_url="https://api.search.brave.com/res/v1/web/search",
                params={},
                quota=2000,
                results_path='results',
                rate_limit=1,
            )
            configured_apis.append(brave_api)

        # DuckDuckGo needs no credentials, so it can always act as fallback.
        fallback_provider = DuckDuckGoSearchProvider()

        manager = SearchManager(
            apis=configured_apis,
            web_search_provider=fallback_provider,
            max_content_length=10000,
            cache_size=100,
            cache_ttl=3600,
        )
    except Exception as exc:
        logger.error(f"Error initializing SearchManager: {exc}")
        # Degrade to a manager that only knows the DuckDuckGo fallback.
        manager = SearchManager(
            apis=[],
            web_search_provider=DuckDuckGoSearchProvider(),
        )
    return manager


# Example tool function: full-text search of FOIA.gov
def foia_search(query: str) -> List[str]:
    """Searches FOIA.gov for the given query and returns a list of relevant content.

    The search page is fetched, the links of the result titles
    (``.result-title a``) are collected and the text of each linked page is
    extracted with ``WebContentExtractor.extract_content``.

    Args:
        query (str): The search query.

    Returns:
        List[str]: Text extracted from the linked result pages, in result
        order; pages yielding no content are omitted. Empty when the search
        request itself fails (the error is logged).
    """
    # Local import: the module itself only imports urlparse.
    from urllib.parse import urljoin

    search_url = "https://search.foia.gov/search"
    # Passing the query via `params` lets requests URL-encode it, so '&', '#',
    # '+' or non-ASCII characters cannot corrupt the query string.
    params = {
        'utf8': '\u2713',
        'm': 'true',
        'affiliate': 'foia.gov',
        'query': query,
    }
    # Accept-Encoding is deliberately not set: requests advertises exactly the
    # encodings it can decode.
    headers = {
        'User-Agent': random.choice(WebContentExtractor.USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'DNT': '1',
    }
    try:
        response = requests.get(search_url, params=params, headers=headers,
                                timeout=WebContentExtractor.TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error searching FOIA.gov: {e}")
        return []

    # errors='replace' keeps a stray invalid byte from aborting the search.
    html_content = response.content.decode('utf-8', errors='replace')
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract links to actual FOIA results (not navigation links). Relative
    # links are resolved against the page URL; duplicates are fetched once.
    result_links = list(dict.fromkeys(
        urljoin(response.url, a['href'])
        for a in soup.select('.result-title a') if a.has_attr('href')
    ))

    content = []
    for link in result_links:
        try:
            extracted_content = WebContentExtractor.extract_content(link)
        except Exception as e:
            logger.error(f"Error extracting content from {link}: {e}")
            continue
        if extracted_content:
            content.append(extracted_content)

    return content

# Module-level default for the number of results requested.
num_results = 10

# Demo: run one search and print every result when executed as a script.
if __name__ == "__main__":
    search_manager = initialize_search_manager()
    query, num_results = "test", 15

    if not search_manager:
        print("Search functionality is disabled.")
    else:
        results = search_manager.search(query, num_results)
        for result in results:
            print(
                f"Title: {result['title']}",
                f"URL: {result['url']}",
                f"Snippet: {result['snippet']}",
                f"Content: {result['content'][:15000]}...",
                "---",
                sep="\n",
            )


In [None]:
# tools\search_manager.py

from certifi import contents
import requests
from bs4 import BeautifulSoup, Comment
import time
import re
from urllib.parse import urlparse
from typing import List, Dict, Any, Optional
import logging
from dotenv import load_dotenv
import os
from abc import ABC, abstractmethod
from fake_useragent import UserAgent
import html2text
from duckduckgo_search import DDGS
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import gzip
from utils import log_and_handle_error
from newspaper import Article
from functools import lru_cache
from datetime import datetime, timedelta
from selenium_stealth import stealth
import asyncio
from .web_search import SearchAPI, SearchResult, DuckDuckGoSearchProvider, initialize_apis
from .content_extractor import WebContentExtractor


def fetch_article_text(url):
    """Fetch one article with newspaper's ``Article`` and return its key fields.

    Args:
        url: Address of the article to download and parse.

    Returns:
        The tuple ``(title, author, pub_date, article_text)``; ``author`` is
        the detected author names joined with ``', '``.
    """
    doc = Article(url)
    doc.download()
    doc.parse()

    title, author = doc.title, ', '.join(doc.authors)
    pub_date, article_text = doc.publish_date, doc.text
    return title, author, pub_date, article_text

# Read the .env file (if any) before the credentials below are looked up.
load_dotenv()

# Search-API credentials; each value is None when its variable is not set.
GOOGLE_CUSTOM_SEARCH_ENGINE_ID = os.environ.get('GOOGLE_CUSTOM_SEARCH_ENGINE_ID')
GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY = os.environ.get('GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY')
BRAVE_SEARCH_API_KEY = os.environ.get('BRAVE_SEARCH_API_KEY')  # optional

# Logging setup: INFO level, timestamped messages, one logger for this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

class SearchManager:
    """Manages searches across multiple APIs and providers with enhanced caching.

    Each query is tried against the configured APIs in order; the web search
    provider is used as the fallback. Processed results are kept in an
    in-memory cache keyed by ``"{query}:{num_results}"`` and are considered
    valid for ``cache_ttl`` seconds.
    """

    def __init__(self, apis: Optional[List[SearchAPI]] = None,
                 web_search_provider: Optional[DuckDuckGoSearchProvider] = None,
                 max_content_length: int = 10000,
                 cache_size: int = 100,
                 cache_ttl: int = 3600):
        """Initialize SearchManager with flexible configuration.

        Args:
            apis: Search APIs to try, in order of preference. Defaults to none.
            web_search_provider: Fallback provider; a new
                ``DuckDuckGoSearchProvider`` is created when omitted.
            max_content_length: Extracted page content is truncated to this
                many characters.
            cache_size: Maximum number of cached queries.
            cache_ttl: Lifetime of a cache entry, in seconds.
        """
        self.apis = apis or []
        self.web_search_provider = web_search_provider or DuckDuckGoSearchProvider()
        self.content_extractor = WebContentExtractor()
        self.max_content_length = max_content_length
        self.cache = {}
        self.cache_timestamps = {}
        self.cache_size = cache_size
        self.cache_ttl = cache_ttl

    def _get_cached_results(self, cache_key: str) -> Optional[List[Dict]]:
        """Return the cached results for ``cache_key`` if present and not expired.

        An expired entry is removed on lookup so it does not occupy a cache
        slot until the next eviction.
        """
        if cache_key not in self.cache:
            return None

        timestamp = self.cache_timestamps.get(cache_key, 0)
        if time.time() - timestamp <= self.cache_ttl:
            return self.cache[cache_key]

        # Entry is stale: drop it from both maps.
        self.cache.pop(cache_key, None)
        self.cache_timestamps.pop(cache_key, None)
        return None

    async def _try_api_search(self, query: str, num_results: int) -> List[SearchResult]:
        """Try searching using available APIs in order of preference.

        An API that is over quota, returns nothing, or raises is skipped so
        the next API (and finally the web search provider) still gets a
        chance to answer the query.
        """
        for api in self.apis:
            try:
                if not await api.is_within_quota():
                    continue
                results = await api.search(query, num_results)
            except Exception as e:
                # A failing API must not abort the whole search; move on.
                logger.error(f"Search API {getattr(api, 'name', api)} failed: {e}")
                continue
            if results:
                return results

        # Fallback to DuckDuckGo if all APIs fail or are over quota
        return await self.web_search_provider.search(query, num_results)

    def _process_search_results(self, search_results: List[SearchResult]) -> List[Dict]:
        """Process search results and extract content safely.

        For each result the page content is extracted, truncated to
        ``max_content_length`` and stored on the result before it is
        converted with ``to_dict()``. A result whose processing raises is
        logged and left out of the returned list.
        """
        processed_results = []
        for result in search_results:
            try:
                content = self.content_extractor.extract_content(result.url)
                if content:
                    # Truncate content if it's too long
                    result.content = content[:self.max_content_length]
                processed_results.append(result.to_dict())
            except Exception as e:
                logger.error(f"Error processing result {result.url}: {e}")
        return processed_results

    def _cache_results(self, cache_key: str, results: List[Dict]):
        """Cache search results with timestamp, evicting the oldest entry when full."""
        self.cache[cache_key] = results
        self.cache_timestamps[cache_key] = time.time()

        # Remove oldest entries if cache is full
        if len(self.cache) > self.cache_size:
            oldest_key = min(self.cache_timestamps, key=self.cache_timestamps.get)
            self.cache.pop(oldest_key, None)
            self.cache_timestamps.pop(oldest_key, None)

    def clear_expired_cache(self):
        """Clear expired cache entries."""
        current_time = time.time()
        expired_keys = [k for k, v in self.cache_timestamps.items()
                        if current_time - v > self.cache_ttl]
        for key in expired_keys:
            # pop() tolerates the two maps having drifted out of sync.
            self.cache.pop(key, None)
            self.cache_timestamps.pop(key, None)

    async def search(self, query: str, num_results: int = 10) -> List[Dict]:
        """Performs a cached search using available APIs and the web search provider.

        Args:
            query: The search query.
            num_results: Number of results to request.

        Returns:
            The processed results as a list of dictionaries; served from the
            cache when a valid entry exists for the same query and count.
        """
        cache_key = f"{query}:{num_results}"

        # Try to get results from cache
        cached_results = self._get_cached_results(cache_key)
        if cached_results is not None:
            return cached_results

        # Perform new search
        search_results = await self._try_api_search(query, num_results)
        processed_results = self._process_search_results(search_results)

        # Cache the results
        self._cache_results(cache_key, processed_results)

        return processed_results

def configure_search_settings() -> Dict[str, Any]:
    """Prompts the user to enable/disable search functionality.

    Keeps asking until the user answers ``Y`` or ``N`` (case-insensitive).
    If standard input is closed or unreadable the prompt can never be
    answered, so search is reported as disabled instead of looping forever.

    Returns:
        ``{'search_enabled': False}`` when search is declined or no input is
        available; otherwise a dictionary with ``'search_enabled': True``,
        empty session bookkeeping fields and the APIs from
        ``initialize_apis()``.
    """
    while True:
        try:
            user_input = input("Do you want to enable search functionality? (Y/N): ").strip().lower()
        except (EOFError, OSError):
            # No usable stdin: retrying would spin forever.
            print("No input available. Search functionality is disabled.")
            return {'search_enabled': False}
        except Exception as e:
            print(f"An error occurred: {e}. Please try again.")
            continue

        if user_input == 'y':
            try:
                apis = initialize_apis()
            except Exception as e:
                print(f"An error occurred: {e}. Please try again.")
                continue
            return {
                'search_enabled': True,
                'all_search_result_data': {},
                'search_session_counter': 0,
                'search_session_id': 0,
                'apis': apis,
            }
        if user_input == 'n':
            return {'search_enabled': False}
        print("Invalid input. Please enter Y or N.")

async def initialize_search_manager() -> SearchManager:
    """Build a SearchManager from the API keys found in the environment.

    Google is registered when both its key and engine id are set, Brave when
    its key is set. DuckDuckGo is always the fallback provider. If anything
    goes wrong while building the manager, the error is logged and a
    DuckDuckGo-only manager is returned.
    """
    try:
        google_key = GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY
        google_engine = GOOGLE_CUSTOM_SEARCH_ENGINE_ID

        configured_apis = []
        if google_key and google_engine:
            google_api = SearchAPI(
                "Google",
                google_key,
                "https://www.googleapis.com/customsearch/v1",
                {"cx": google_engine},
                100,
                'items',
                1
            )
            configured_apis.append(google_api)

        if BRAVE_SEARCH_API_KEY:
            brave_api = SearchAPI(
                "Brave",
                BRAVE_SEARCH_API_KEY,
                "https://api.search.brave.com/res/v1/web/search",
                {},
                2000,
                'results',
                1
            )
            configured_apis.append(brave_api)

        # DuckDuckGo needs no key, so it always backs up the APIs above.
        fallback_provider = DuckDuckGoSearchProvider()
        return SearchManager(
            apis=configured_apis,
            web_search_provider=fallback_provider,
            max_content_length=10000,
            cache_size=100,
            cache_ttl=3600
        )
    except Exception as e:
        logger.error(f"Error initializing SearchManager: {e}")
        # Degrade to a manager that only knows the DuckDuckGo provider.
        return SearchManager(
            apis=[],
            web_search_provider=DuckDuckGoSearchProvider()
        )


# Example tool function: FOIA.gov search
async def foia_search(query: str) -> List[str]:
    """Searches FOIA.gov for the given query and returns a list of relevant content.

    The query is URL-encoded before being placed in the search URL, and the
    content of every result link is extracted through a ``WebContentExtractor``
    instance. A link whose extraction fails is logged and skipped.

    Args:
        query (str): The search query.

    Returns:
        List[str]: A list of text content extracted from relevant FOIA.gov search results.
        Empty when the search request itself fails.
    """
    # Local imports keep this fix self-contained.
    import inspect
    from urllib.parse import quote_plus

    # quote_plus turns spaces into '+' and escapes characters such as '&' or
    # '#' that would otherwise corrupt the query string.
    url = (
        "https://search.foia.gov/search?utf8=%E2%9C%93&m=true&affiliate=foia.gov"
        f"&query={quote_plus(query)}"
    )
    web_content_extractor = WebContentExtractor()
    headers = {
        'User-Agent': random.choice(web_content_extractor.USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'DNT': '1',
    }
    try:
        response = requests.get(url, headers=headers, timeout=web_content_extractor.TIMEOUT)
        response.raise_for_status()
        # Replace undecodable bytes instead of raising an uncaught error.
        html_content = response.content.decode('utf-8', errors='replace')
        soup = BeautifulSoup(html_content, 'html.parser')

        # Extract links to actual FOIA results (not navigation links)
        result_links = [a['href'] for a in soup.select('.result-title a') if a.has_attr('href')]

        content = []
        for link in result_links:
            try:
                # Call on the instance (as the rest of this file does); the
                # class-level call bound the URL as ``self``.
                extracted_content = web_content_extractor.extract_content(link)
                # Tolerate an async extractor as well as the sync one.
                if inspect.isawaitable(extracted_content):
                    extracted_content = await extracted_content
                if extracted_content:
                    content.append(extracted_content)
            except Exception as e:
                logger.error(f"Error extracting content from {link}: {e}")

        return content
    except requests.exceptions.RequestException as e:
        logger.error(f"Error searching FOIA.gov: {e}")
        return []

num_results = 10

# Example usage
async def main():
    """Run a sample search for "test" and print each result.

    Builds a SearchManager with ``initialize_search_manager()``, requests 15
    results and prints title, URL, snippet and up to 15000 characters of
    content for every result.
    """
    search_manager = await initialize_search_manager()
    query = "test"
    num_results = 15

    if search_manager:
        results = await search_manager.search(query, num_results)
        for result in results:
            print(f"Title: {result['title']}")
            print(f"URL: {result['url']}")
            print(f"Snippet: {result['snippet']}")
            # Content may be missing or None when extraction produced nothing.
            print(f"Content: {(result.get('content') or '')[:15000]}...")
            print("---")
    else:
        print("Search functionality is disabled.")


# Only run the demo when executed as a script; running it unconditionally
# performed a live search every time this module was imported.
if __name__ == "__main__":
    asyncio.run(main())


In [None]:
# tools\search_tool.py

"""Web search tool implementation."""
from typing import Dict, Any
from .base_tool import BaseTool, ToolResult
from ..config import TIMEOUT

class SearchTool(BaseTool):
    """Tool for performing web searches.

    Wraps a search manager object and exposes its ``search`` method through
    the tool interface, reporting the outcome as a ``ToolResult``.
    """

    def __init__(self, search_manager):
        """Store the search manager that will serve the queries.

        Args:
            search_manager: Object providing
                ``search(query, num_results=..., timeout=...)``.
        """
        super().__init__(
            name="web_search",
            description="Performs web searches and returns relevant results"
        )
        self.search_manager = search_manager

    @property
    def parameters(self) -> Dict[str, Dict[str, Any]]:
        """Describe the accepted parameters: query, num_results, timeout_seconds."""
        return {
            "query": {
                "type": "string",
                "description": "The search query",
                "required": True
            },
            "num_results": {
                "type": "integer",
                "description": "Maximum number of results to return",
                "default": 10,
                "minimum": 1
            },
            "timeout_seconds": {
                "type": "integer",
                "description": "Maximum time to wait for results",
                "default": TIMEOUT
            }
        }

    @staticmethod
    async def _resolve(awaitable):
        """Await ``awaitable`` and return its result."""
        return await awaitable

    def execute(self, **kwargs) -> ToolResult:
        """Run a web search.

        Args:
            **kwargs: ``query`` (required), ``num_results`` (default 10,
                must be at least 1) and ``timeout_seconds`` (default
                ``TIMEOUT``).

        Returns:
            ``ToolResult`` with ``success=True`` and the search results, or
            ``success=False`` and an error message when the input is invalid
            or the search raises.
        """
        # Local imports keep this fix self-contained.
        import asyncio
        import inspect

        try:
            query = kwargs.get("query")
            if not query:
                return ToolResult(
                    success=False,
                    result=None,
                    error="Query parameter is required"
                )

            num_results = kwargs.get("num_results", 10)
            # Enforce the "minimum": 1 advertised in `parameters`.
            if num_results < 1:
                return ToolResult(
                    success=False,
                    result=None,
                    error="num_results must be at least 1"
                )
            timeout_seconds = kwargs.get("timeout_seconds", TIMEOUT)

            results = self.search_manager.search(
                query,
                num_results=num_results,
                timeout=timeout_seconds
            )
            # An async search manager hands back an awaitable; resolve it so
            # callers receive actual results rather than a coroutine object.
            if inspect.isawaitable(results):
                results = asyncio.run(self._resolve(results))

            return ToolResult(
                success=True,
                result=results
            )

        except Exception as e:
            return ToolResult(
                success=False,
                result=None,
                error=str(e)
            )


In [None]:
# tools\search_tools.py

"""Tools for specialized search functionality."""
from typing import Dict, Any, List, Optional
from .base_tool import BaseTool, ToolResult
from .specialized_search import FOIASearchProvider, ArXivSearchProvider
import asyncio

class FOIASearchTool(BaseTool):
    """Tool that queries FOIA.gov records through ``FOIASearchProvider``."""

    def __init__(self):
        """Register the tool metadata and create the FOIA provider."""
        super().__init__(
            name="foia_search",
            description="Search FOIA.gov for government records",
            use_case="Use this tool to search for Freedom of Information Act (FOIA) records and documents.",
            operation="Searches FOIA.gov's database and returns relevant documents with their content."
        )
        self.provider = FOIASearchProvider()

    @property
    def parameters(self) -> Dict[str, Dict[str, Any]]:
        """Describe the accepted parameters: ``query`` and ``max_results``."""
        query_spec = {
            "type": str,
            "description": "The search query",
            "required": True
        }
        max_results_spec = {
            "type": int,
            "description": "Maximum number of results to return",
            "required": False,
            "default": 10
        }
        return {"query": query_spec, "max_results": max_results_spec}

    def execute(self, **kwargs) -> ToolResult:
        """Search FOIA.gov and return the hits as plain dictionaries.

        Args:
            **kwargs: ``query`` (required) and ``max_results`` (default 10).

        Returns:
            A successful ``ToolResult`` holding one dict per hit (title, url,
            snippet, content), or a failed one carrying the error text.
        """
        try:
            query = kwargs["query"]
            limit = kwargs.get("max_results", 10)

            # The provider is async; drive it to completion here.
            hits = asyncio.run(self.provider.search(query, limit))

            payload = [
                {
                    "title": hit.title,
                    "url": hit.url,
                    "snippet": hit.snippet,
                    "content": hit.content
                }
                for hit in hits
            ]
            return ToolResult(success=True, result=payload)
        except Exception as e:
            return ToolResult(
                success=False,
                result=None,
                error=f"FOIA search failed: {str(e)}"
            )

class ArXivSearchTool(BaseTool):
    """Tool that searches arXiv papers through ``ArXivSearchProvider``."""

    def __init__(self):
        """Register the tool metadata and create the arXiv provider."""
        super().__init__(
            name="arxiv_search",
            description="Search arXiv for scientific papers",
            use_case="Use this tool to search for scientific papers on arXiv.",
            operation="Searches arXiv's database and returns papers with their abstracts and metadata."
        )
        self.provider = ArXivSearchProvider()

    @property
    def parameters(self) -> Dict[str, Dict[str, Any]]:
        """Describe the accepted parameters: ``query`` and ``max_results``."""
        query_spec = {
            "type": str,
            "description": "The search query",
            "required": True
        }
        max_results_spec = {
            "type": int,
            "description": "Maximum number of results to return",
            "required": False,
            "default": 10
        }
        return {"query": query_spec, "max_results": max_results_spec}

    def execute(self, **kwargs) -> ToolResult:
        """Search arXiv and return the papers as plain dictionaries.

        Args:
            **kwargs: ``query`` (required) and ``max_results`` (default 10).

        Returns:
            A successful ``ToolResult`` holding one dict per paper (title,
            url, snippet, content), or a failed one carrying the error text.
        """
        try:
            query = kwargs["query"]
            limit = kwargs.get("max_results", 10)

            # The provider is async; drive it to completion here.
            papers = asyncio.run(self.provider.search(query, limit))

            payload = [
                {
                    "title": paper.title,
                    "url": paper.url,
                    "snippet": paper.snippet,
                    "content": paper.content
                }
                for paper in papers
            ]
            return ToolResult(success=True, result=payload)
        except Exception as e:
            return ToolResult(
                success=False,
                result=None,
                error=f"arXiv search failed: {str(e)}"
            )

class ArXivLatestTool(BaseTool):
    """Tool that fetches the most recent arXiv papers via ``ArXivSearchProvider``."""

    def __init__(self):
        """Register the tool metadata and create the arXiv provider."""
        super().__init__(
            name="arxiv_latest",
            description="Get latest papers from arXiv",
            use_case="Use this tool to get the most recent papers from arXiv, optionally filtered by category.",
            operation="Fetches the latest papers from arXiv within a specified timeframe and category."
        )
        self.provider = ArXivSearchProvider()

    @property
    def parameters(self) -> Dict[str, Dict[str, Any]]:
        """Describe the accepted parameters: ``category``, ``max_results``, ``days``."""
        category_spec = {
            "type": str,
            "description": "arXiv category (e.g., 'cs.AI', 'physics')",
            "required": False,
            "default": None
        }
        max_results_spec = {
            "type": int,
            "description": "Maximum number of results to return",
            "required": False,
            "default": 10
        }
        days_spec = {
            "type": int,
            "description": "Number of past days to search",
            "required": False,
            "default": 7
        }
        return {
            "category": category_spec,
            "max_results": max_results_spec,
            "days": days_spec
        }

    def execute(self, **kwargs) -> ToolResult:
        """Fetch the latest arXiv papers and return them as plain dictionaries.

        Args:
            **kwargs: ``category`` (optional), ``max_results`` (default 10)
                and ``days`` (default 7).

        Returns:
            A successful ``ToolResult`` holding one dict per paper (title,
            url, snippet, content), or a failed one carrying the error text.
        """
        try:
            category = kwargs.get("category")
            limit = kwargs.get("max_results", 10)
            lookback_days = kwargs.get("days", 7)

            # The provider is async; drive it to completion here.
            pending = self.provider.get_latest_papers(
                category=category,
                max_results=limit,
                days=lookback_days
            )
            papers = asyncio.run(pending)

            payload = [
                {
                    "title": paper.title,
                    "url": paper.url,
                    "snippet": paper.snippet,
                    "content": paper.content
                }
                for paper in papers
            ]
            return ToolResult(success=True, result=payload)
        except Exception as e:
            return ToolResult(
                success=False,
                result=None,
                error=f"arXiv latest papers fetch failed: {str(e)}"
            )


In [None]:
# tools\specialized_search.py

import requests
from bs4 import BeautifulSoup
import logging
from typing import List, Dict, Optional
import arxiv
from datetime import datetime, timedelta
from .content_extractor import WebContentExtractor
from .web_search import SearchResult

# Configure logging: INFO level, "timestamp - level - message" format.
# NOTE(review): basicConfig runs at import time and configures the root logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the providers below
logger = logging.getLogger(__name__)

class FOIASearchProvider:
    """Provider for searching FOIA.gov records."""

    BASE_URL = "https://www.foia.gov/api/search"
    # Seconds to wait for the FOIA.gov API before giving up; without a
    # timeout a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.content_extractor = WebContentExtractor()

    async def search(self, query: str, max_results: int = 10) -> List[SearchResult]:
        """Search FOIA.gov for records matching the query.

        Page content is fetched only for items that carry a non-empty URL. A
        failed extraction is logged and leaves that item's content empty
        rather than discarding every result.

        Args:
            query (str): The search query
            max_results (int): Maximum number of results to return

        Returns:
            List[SearchResult]: List of search results; empty if the request
            or response parsing fails.
        """
        try:
            # FOIA.gov API parameters
            params = {
                "q": query,
                "size": max_results,
                "from": 0,
                "sort": "relevance",
            }

            response = requests.get(
                self.BASE_URL,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()

            results = []
            # `or []` also covers a response whose "results" value is null.
            for item in (data.get("results") or [])[:max_results]:
                item_url = item.get("url")
                title = item.get("title", "No Title")
                url = item_url or f"https://www.foia.gov/request/{item.get('id')}"
                snippet = item.get("description", "No description available")

                # Try to extract content if URL is available
                content = ""
                if item_url:
                    try:
                        content = self.content_extractor.extract_content(item_url) or ""
                    except Exception as e:
                        logger.error(f"Error extracting content from {item_url}: {e}")

                results.append(SearchResult(
                    title=title,
                    url=url,
                    snippet=snippet,
                    content=content
                ))

            return results

        except Exception as e:
            logger.error(f"Error searching FOIA.gov: {e}")
            return []

class ArXivSearchProvider:
    """Provider for searching arXiv papers."""

    def __init__(self):
        self.client = arxiv.Client()

    @staticmethod
    def _paper_to_result(paper) -> SearchResult:
        """Convert one arXiv paper into a SearchResult with a metadata summary."""
        authors = ", ".join(author.name for author in paper.authors)
        # The DOI line is only included for papers that have one.
        doi_line = f"DOI: {paper.doi}\n" if paper.doi else ""

        content = (
            f"Title: {paper.title}\n\n"
            f"Authors: {authors}\n\n"
            f"Published: {paper.published}\n"
            f"Updated: {paper.updated}\n"
            f"{doi_line}"
            f"Primary Category: {paper.primary_category}\n"
            f"Categories: {', '.join(paper.categories)}\n\n"
            f"Abstract:\n{paper.summary}\n\n"
            f"PDF URL: {paper.pdf_url}\n"
        )

        return SearchResult(
            title=paper.title,
            url=paper.entry_id,
            snippet=paper.summary[:200] + "...",
            content=content
        )

    def _collect_results(self, search) -> List[SearchResult]:
        """Run ``search`` through the client and convert every paper returned."""
        return [self._paper_to_result(paper) for paper in self.client.results(search)]

    async def get_latest_papers(self, category: Optional[str] = None, max_results: int = 10,
                              days: int = 7) -> List[SearchResult]:
        """Get the latest arXiv papers, optionally filtered by category.

        Args:
            category (str, optional): arXiv category (e.g., 'cs.AI', 'physics')
            max_results (int): Maximum number of results to return
            days (int): Number of past days to search

        Returns:
            List[SearchResult]: List of search results, newest submissions
            first; empty if the query fails.
        """
        try:
            # Calculate date range
            end_date = datetime.now()
            start_date = end_date - timedelta(days=days)

            # Build search query
            # NOTE(review): confirm the wildcard date form against the arXiv
            # API's documented submittedDate range syntax.
            search_query = f"submittedDate:[{start_date.strftime('%Y%m%d')}* TO {end_date.strftime('%Y%m%d')}*]"
            if category:
                search_query += f" AND cat:{category}"

            # Create search
            search = arxiv.Search(
                query=search_query,
                max_results=max_results,
                sort_by=arxiv.SortCriterion.SubmittedDate,
                sort_order=arxiv.SortOrder.Descending
            )

            return self._collect_results(search)

        except Exception as e:
            logger.error(f"Error fetching arXiv papers: {e}")
            return []

    async def search(self, query: str, max_results: int = 10) -> List[SearchResult]:
        """Search arXiv papers by query.

        Args:
            query (str): The search query
            max_results (int): Maximum number of results to return

        Returns:
            List[SearchResult]: List of search results sorted by relevance;
            empty if the query fails.
        """
        try:
            search = arxiv.Search(
                query=query,
                max_results=max_results,
                sort_by=arxiv.SortCriterion.Relevance
            )

            return self._collect_results(search)

        except Exception as e:
            logger.error(f"Error searching arXiv: {e}")
            return []


In [None]:
# tools\tool_manager (2).py

"""Tool management functionality."""
import logging
from typing import Callable, Dict, Any, List
from .search_manager import SearchManager

logger = logging.getLogger(__name__)

class ToolManager:
    """Registry mapping tool names to callables."""

    def __init__(self, search_manager: SearchManager = None):
        """Start with an empty registry and remember the optional search manager."""
        self.tools = {}
        self.search_manager = search_manager

    def register_tool(self, name: str, tool_func: Callable):
        """Register ``tool_func`` under ``name``, replacing any previous entry."""
        self.tools[name] = tool_func

    def execute_tool(self, name: str, *args, **kwargs):
        """Call the tool registered as ``name`` with the given arguments.

        Raises:
            ValueError: If no tool is registered under ``name``.
        """
        try:
            handler = self.tools[name]
        except KeyError:
            raise ValueError(f"Tool '{name}' not found.") from None
        return handler(*args, **kwargs)

    def list_tools(self) -> List[str]:
        """Return the names of all registered tools."""
        return [*self.tools]

In [None]:
# tools\tool_manager.py

"""
Tool manager for creating and managing different tools.
"""
from typing import Dict, Any, List, Optional, Type, Union
from .base.tool import BaseTool
from .core.search_tool import SearchTool
from .core.code_analysis_tool import CodeAnalysisTool
from langchain.tools import BaseTool as LangChainTool
from .llm_tools import get_llm_tools

class ToolManager:
    """Manages the creation and lifecycle of tools.

    Two kinds of tools are tracked: "traditional" tool classes, registered by
    name and instantiated on demand, and LLM (LangChain) tool objects
    obtained from ``get_llm_tools()``.
    """

    def __init__(self):
        # Instances created through create_tool(), keyed by tool type
        self.tools: Dict[str, BaseTool] = {}
        # Registered traditional tool classes, keyed by tool type
        self._tool_classes: Dict[str, Type[BaseTool]] = {}
        # Registered LLM tools
        self._llm_tools: List[LangChainTool] = []
        self.register_tools()

    def register_tools(self):
        """Register all available tools."""
        from .search_tools import FOIASearchTool, ArXivSearchTool, ArXivLatestTool
        from .core.search_tool import SearchTool
        from .core.code_analysis_tool import CodeAnalysisTool

        # Register traditional tools
        self._tool_classes.update({
            "web_search": SearchTool,
            "code_analysis": CodeAnalysisTool,
            "foia_search": FOIASearchTool,
            "arxiv_search": ArXivSearchTool,
            "arxiv_latest": ArXivLatestTool
        })

        # Register LLM tools
        self._llm_tools.extend(get_llm_tools())

    def get_llm_tools(self) -> List[LangChainTool]:
        """Get all registered LLM tools."""
        return self._llm_tools

    def get_traditional_tools(self) -> Dict[str, Any]:
        """Get all registered traditional tools."""
        return self._tool_classes

    @staticmethod
    def _llm_tool_name(tool: Any) -> Optional[str]:
        """Return an LLM tool's name, or None when it exposes neither attribute."""
        # LangChain tool objects carry ``name``; plain functions only have
        # ``__name__``. Reading ``__name__`` on a tool instance would raise.
        return getattr(tool, "name", None) or getattr(tool, "__name__", None)

    def get_tool(self, tool_type: str) -> Optional[Union[Any, LangChainTool]]:
        """Get a tool by type.

        Returns a fresh instance for a traditional tool type, the matching
        LLM tool object otherwise, or None when nothing matches.
        """
        # First check traditional tools
        if tool_type in self._tool_classes:
            return self._tool_classes[tool_type]()

        # Then check LLM tools
        for tool in self._llm_tools:
            if self._llm_tool_name(tool) == tool_type:
                return tool

        return None

    def create_tool(
        self,
        tool_type: str,
        tool_config: Optional[Dict[str, Any]] = None
    ) -> BaseTool:
        """
        Create a new tool instance.

        Args:
            tool_type: Type of tool to create
            tool_config: Optional tool configuration

        Returns:
            Created tool instance

        Raises:
            ValueError: If ``tool_type`` is not a registered traditional tool
        """
        if tool_type not in self._tool_classes:
            raise ValueError(f"Unknown tool type: {tool_type}")

        tool_class = self._tool_classes[tool_type]
        tool = tool_class(tool_config)

        # Store tool instance for reuse
        self.tools[tool_type] = tool
        return tool

    def list_tools(self) -> Dict[str, str]:
        """List the tools instantiated so far, mapped to their descriptions."""
        return {name: tool.description for name, tool in self.tools.items()}

    def get_tools_for_agent(self, agent_type: str) -> List[str]:
        """Get list of tools available to a specific agent type."""
        # This could be expanded to include agent-specific tool restrictions
        return list(self._tool_classes.keys())

    def execute_tool(self, tool_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a tool with the given parameters.

        An instance previously built by ``create_tool`` is reused so its
        configuration takes effect; otherwise the tool is looked up with
        ``get_tool``.

        Args:
            tool_type: Type of tool to execute
            params: Tool parameters

        Returns:
            Tool execution results

        Raises:
            ValueError: If ``tool_type`` matches no registered tool
            TypeError: If the tool offers no way to be called
        """
        tool = self.tools.get(tool_type)
        if tool is None:
            tool = self.get_tool(tool_type)
        if tool is None:
            raise ValueError(f"Unknown tool type: {tool_type}")

        # Traditional tools implement execute(); LangChain tools are run
        # through invoke() (or the older run()).
        for method_name in ("execute", "invoke", "run"):
            method = getattr(tool, method_name, None)
            if callable(method):
                return method(params)

        raise TypeError(f"Tool '{tool_type}' cannot be executed")


In [None]:
# tools\tools.py

"""Module for defining and managing external tools for the AI assistant."""
from typing import List, Annotated, Optional
from search_manager import SearchManager
from agent_tools.fetch_latest_arxiv_papers import fetch_latest_arxiv_results
from langchain.tools import tool
from langchain_experimental.utilities import PythonREPL
import signal
from contextlib import contextmanager
import time

class TimeoutError(Exception):
    """Signals that an operation ran out of time.

    Defined here as a plain ``Exception`` subclass; within this module it
    shadows the builtin exception of the same name.
    """

@contextmanager
def timeout(seconds: float):
    """Context manager for timing out operations after specified seconds.

    Installs a temporary SIGALRM handler that raises ``TimeoutError`` and
    arms a real-time timer. On exit the timer is cancelled and the handler
    that was active before is put back, so nesting or other SIGALRM users
    are not left with this handler installed.

    Args:
        seconds: Time allowed for the body; fractional values are accepted.
            ``0`` disarms the timer, i.e. no timeout is applied.

    Raises:
        TimeoutError: Inside the ``with`` body, once the time has elapsed.
    """
    def signal_handler(signum, frame):
        raise TimeoutError("Operation timed out")

    # signal.signal returns the handler it replaces; keep it for restoration.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)

    try:
        # setitimer delivers SIGALRM like alarm() but accepts float seconds.
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        # Disable the timer first so it cannot fire during restoration
        signal.setitimer(signal.ITIMER_REAL, 0)
        # A handler not installed from Python is reported as None, which
        # signal.signal does not accept; fall back to the default action.
        if previous_handler is None:
            previous_handler = signal.SIG_DFL
        signal.signal(signal.SIGALRM, previous_handler)

# Module-level REPL instance; python_repl() below runs the submitted code through it.
repl = PythonREPL()

@tool
def web_search(
    search_manager: SearchManager, 
    query: str, 
    num_results: int = 10,
    timeout_seconds: int = 30
) -> str:
    """Performs a web search using the provided SearchManager.
            
    Args:
        search_manager: The SearchManager instance to use.
        query (str): The search query.
        num_results (int, optional): The maximum number of results to return. Defaults to 10.
        timeout_seconds (int, optional): Maximum time to wait for search results. Defaults to 30.

    Returns:
        str: A formatted string containing the search results.

    Raises:
        ValueError: If num_results is less than 1 or query is empty.
        TimeoutError: If the search operation times out.
    """
    # Reject unusable input before any search is attempted.
    if not query.strip():
        raise ValueError("Search query cannot be empty")
    if num_results < 1:
        raise ValueError("num_results must be at least 1")

    try:
        with timeout(timeout_seconds):
            hits = search_manager.search(query, num_results)
            if not hits:
                return "No results found for the given query."

            # One formatted section per hit; content is capped at 50000 chars.
            sections = []
            for hit in hits:
                sections.append(
                    f"**{hit['title']}** ({hit['url']})\n{hit['snippet']}\n{hit['content'][:50000]}"
                )
            return "\n\n".join(sections)
    except TimeoutError:
        return "Search operation timed out. Please try again or refine your query."
    except Exception as e:
        return f"Search failed: {str(e)}"

@tool
def fetch_recent_arxiv_papers_by_topic(
    topic: str,
    timeout_seconds: int = 30
) -> List[str]:
    """Fetches recent arXiv papers based on a given topic.
    
    Args:
        topic (str): The topic to search for papers.
        timeout_seconds (int, optional): Maximum time to wait for results. Defaults to 30.
        
    Returns:
        List[str]: List of paper information.
        
    Raises:
        ValueError: If topic is empty.
        TimeoutError: If the operation times out.
    """
    has_topic = bool(topic.strip())
    if not has_topic:
        raise ValueError("Topic cannot be empty")

    try:
        with timeout(timeout_seconds):
            papers = fetch_latest_arxiv_results(topic)
            return papers
    except TimeoutError:
        # Failures are reported as a single-element list, matching the return type.
        return ["Operation timed out while fetching arXiv papers. Please try again."]
    except Exception as e:
        return [f"Failed to fetch arXiv papers: {str(e)}"]

@tool
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
    timeout_seconds: int = 10,
    max_output_length: int = 10000
) -> str:
    """Executes Python code and returns the output.
    
    Args:
        code (str): The Python code to execute.
        timeout_seconds (int, optional): Maximum execution time in seconds. Defaults to 10.
        max_output_length (int, optional): Maximum length of output to return. Defaults to 10000.
        
    Returns:
        str: The execution result or error message.
        
    Raises:
        TimeoutError: If code execution exceeds timeout_seconds.
    """
    if not code.strip():
        return "No code provided to execute."

    try:
        with timeout(timeout_seconds):
            started_at = time.time()
            output = repl.run(code)
            elapsed = time.time() - started_at

            # Cap overly long output and mark that it was shortened.
            rendered = str(output)
            if len(rendered) > max_output_length:
                output = rendered[:max_output_length] + "... (output truncated)"

            report = (
                f"Successfully executed in {elapsed:.2f}s:\n"
                f"```python\n{code}\n```\n"
                f"Stdout: {output}"
            )
            return report
    except TimeoutError:
        return f"Code execution timed out after {timeout_seconds} seconds"
    except Exception as e:
        return f"Failed to execute. Error: {repr(e)}"




In [None]:
# tools\web_search.py

from typing import List, Dict, Any, Optional, Union
import requests
import time
import os
import logging
from abc import ABC, abstractmethod
from fake_useragent import UserAgent
from duckduckgo_search import DDGS
import asyncio
from dotenv import load_dotenv

# Load environment variables (the credentials below are expected in a .env file)
load_dotenv()

# Search provider credentials (Google Custom Search and Brave), read from the
# environment. os.getenv() yields None for any variable that is not set.
GOOGLE_CUSTOM_SEARCH_ENGINE_ID = os.getenv('GOOGLE_CUSTOM_SEARCH_ENGINE_ID')
GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY = os.getenv('GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY')
BRAVE_SEARCH_API_KEY = os.getenv('BRAVE_SEARCH_API_KEY')

# Configure logging at import time (INFO level, timestamped messages) and
# create the logger used throughout this module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SearchProvider(ABC):
    """Interface that every search backend implements."""

    @abstractmethod
    async def search(self, query: str, num_results: int) -> List['SearchResult']:
        """Run a search against the backend.

        Args:
            query: The search string.
            num_results: Number of results requested.

        Returns:
            A list of SearchResult objects.
        """

class SearchResult:
    """Represents a single search result.

    Attributes:
        title: Title of the result.
        url: Address the result points to.
        snippet: Short excerpt describing the result.
        content: Additional text attached to the result; empty by default.
    """

    def __init__(self, title: str, url: str, snippet: str, content: str = ""):
        self.title = title
        self.url = url
        self.snippet = snippet
        self.content = content

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> 'SearchResult':
        """Build a SearchResult from a dictionary.

        Args:
            data: Mapping with 'title', 'url' and 'snippet' keys, and an
                optional 'content' key.

        Returns:
            The corresponding SearchResult.

        Raises:
            KeyError: If 'title', 'url' or 'snippet' is missing.
        """
        # 'content' is optional in __init__, so a missing key falls back to the
        # same default instead of raising KeyError.
        return cls(data['title'], data['url'], data['snippet'], data.get('content', ''))

    def to_dict(self) -> Dict[str, str]:
        """Return the result as a dictionary accepted by from_dict."""
        return {
            'title': self.title,
            'url': self.url,
            'snippet': self.snippet,
            'content': self.content
        }

class SearchAPI(SearchProvider):
    """HTTP search API client that paces its requests and counts its usage.

    NOTE(review): search() does not itself consult is_within_quota(); callers
    are presumably expected to check the quota first — verify against callers.
    """

    def __init__(self, name: str, api_key: str, base_url: str, params: dict, quota: float, results_path: str,
                 rate_limit: float):
        """Store the API configuration.

        Args:
            name: Provider name, used in log messages and for per-provider rules.
            api_key: API key; when non-empty it is added to the query parameters.
            base_url: Endpoint that search requests are sent to.
            params: Query parameters sent with every request.
            quota: Maximum number of successful requests (may be float('inf')).
            results_path: Key in the JSON response that holds the result items.
            rate_limit: Minimum number of seconds between two requests.
        """
        self.name = name
        self.api_key = api_key
        self.base_url = base_url
        # Copy so that adding the key below never mutates the caller's dict.
        self.params = params.copy()
        if api_key:
            # NOTE(review): the key is always sent as a 'key' query parameter —
            # confirm that every configured provider accepts it in that form.
            self.params['key'] = api_key
        self.quota = quota
        self.used = 0
        self.results_path = results_path
        self.rate_limit = rate_limit
        self.last_request_time = 0
        self.user_agent_rotator = UserAgent()

    async def is_within_quota(self) -> bool:
        """Checks if the API is within its usage quota."""
        return self.used < self.quota

    async def respect_rate_limit(self):
        """Sleeps until rate_limit seconds have passed since the last request."""
        time_since_last_request = time.time() - self.last_request_time
        if time_since_last_request < self.rate_limit:
            await asyncio.sleep(self.rate_limit - time_since_last_request)

    async def search(self, query: str, num_results: int) -> List[SearchResult]:
        """Performs a search using the API.

        Args:
            query: The search string, sent as the 'q' parameter.
            num_results: Number of results to request, sent as 'num' (capped at
                10 for the provider named 'Google').

        Returns:
            The parsed results, or an empty list when the request fails or the
            response is not usable JSON.
        """
        await self.respect_rate_limit()
        logger.info("Searching %s for: %s", self.name, query)
        params = self.params.copy()
        params['q'] = query
        params['num'] = min(num_results, 10) if self.name == 'Google' else num_results
        headers = {'User-Agent': self.user_agent_rotator.random}

        try:
            # NOTE(review): requests.get blocks the event loop while it runs.
            # It is kept synchronous here because moving it to a thread would
            # let concurrent callers slip past respect_rate_limit().
            response = requests.get(self.base_url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            self.used += 1
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on requests
            # versions whose JSON error is not a RequestException.
            logger.error("Error searching %s: %s", self.name, e)
            return []
        finally:
            # Stamp every attempt, not only successful ones, so that a failing
            # endpoint is still subject to the rate limit on the next call.
            self.last_request_time = time.time()

        # The key may be absent or null, and the payload may not be an object.
        items = data.get(self.results_path) if isinstance(data, dict) else None
        return [
            SearchResult(
                item.get('title') or "No title",
                item.get('link') or item.get('url'),
                item.get('snippet') or "No snippet",
            )
            for item in items or []
            if isinstance(item, dict)  # skip malformed entries instead of crashing
        ]

class DuckDuckGoSearchProvider(SearchProvider):
    """Provides search functionality using DuckDuckGo."""

    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        """Searches DuckDuckGo and returns a list of SearchResult objects.

        Args:
            query: The search string; it is sanitized before being sent.
            max_results: Maximum number of results to request.

        Returns:
            The results found, or an empty list if the search raises.
        """
        try:
            with DDGS() as ddgs:
                hits = ddgs.text(self._sanitize_query(query), max_results=max_results)
                return [
                    SearchResult(
                        title=hit.get('title', ''),
                        # duckduckgo_search reports the URL under 'href';
                        # 'link' is kept as a fallback.
                        url=hit.get('href') or hit.get('link', ''),
                        snippet=hit.get('body', ''),
                    )
                    for hit in hits or []
                ]
        except Exception as e:
            # Best-effort provider: failures are logged and reported as no results.
            logger.error("DuckDuckGo search error: %s", e)
            return []

    def _sanitize_query(self, query: str) -> str:
        """Sanitizes the search query for DuckDuckGo by trimming whitespace."""
        return query.strip()

def initialize_apis() -> List[SearchAPI]:
    """Build the list of configured search APIs.

    Google is always first. Brave is included only when BRAVE_SEARCH_API_KEY
    is set to a non-empty value. DuckDuckGo is always last, with no API key,
    an unlimited quota and no delay between requests.

    Returns:
        The SearchAPI instances, in the order described above.

    Raises:
        ValueError: If either Google credential is None.
    """
    if None in (GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY, GOOGLE_CUSTOM_SEARCH_ENGINE_ID):
        raise ValueError("GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY and GOOGLE_CUSTOM_SEARCH_ENGINE_ID must be set in .env.")

    google = SearchAPI(
        name="Google",
        api_key=GOOGLE_CUSTOM_SEARCH_ENGINE_API_KEY,
        base_url="https://www.googleapis.com/customsearch/v1",
        params={"cx": GOOGLE_CUSTOM_SEARCH_ENGINE_ID},
        quota=100,
        results_path='items',
        rate_limit=1,
    )

    # Zero or one entries, depending on whether a Brave key is configured.
    optional_apis = []
    if BRAVE_SEARCH_API_KEY:
        optional_apis.append(
            SearchAPI(
                name="Brave",
                api_key=BRAVE_SEARCH_API_KEY,
                base_url="https://api.search.brave.com/res/v1/web/search",
                params={},
                quota=2000,
                results_path='results',
                rate_limit=1,
            )
        )

    duckduckgo = SearchAPI(
        name="DuckDuckGo",
        api_key="",
        base_url="https://api.duckduckgo.com/",
        params={"format": "json"},
        quota=float('inf'),
        results_path='RelatedTopics',
        rate_limit=0,
    )

    return [google, *optional_apis, duckduckgo]
