# In [None]:
import os
import requests
from bs4 import BeautifulSoup
from typing import Type, List
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools import BaseTool
from pydantic import BaseModel, Field
from langchain.agents import initialize_agent
from langchain.agents.agent_types import AgentType
from langchain_community.utilities.duckduckgo_search import DuckDuckGoSearchAPIWrapper

# OpenAI credential taken from the environment; None when the variable is unset.
api_key = os.getenv("OPENAI_API_KEY")

# Chat model shared by the agent built further down in this file.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    api_key=api_key,
)

# Argument schema used by TopicSearchTool: a single free-text search query.
class TopicSearchToolArgsSchema(BaseModel):
    # Search phrase forwarded to the search tool's _run method.
    query: str = Field(
        description="The query you will search for. Example query: XZ backdoor"
    )

class TopicSearchTool(BaseTool):
    """Tool that looks a query up on DuckDuckGo and returns the result links."""

    name: str = "TopicSearchTool"
    description: str = """
    Use this tool to return the specified URL list.
    It takes a query as an argument.
    """
    args_schema: Type[TopicSearchToolArgsSchema] = TopicSearchToolArgsSchema

    def _run(self, query):
        """Search for ``query`` and collect the ``link`` of each result.

        Args:
            query: Search phrase to submit.

        Returns:
            A list with the ``'link'`` value of every result returned
            (the search is asked for at most two results).
        """
        search = DuckDuckGoSearchAPIWrapper()
        links = []
        for hit in search.results(query, max_results=2):
            links.append(hit['link'])
        return links

# Argument schema used by WebScraperTool: the pages to download.
class WebScraperToolArgsSchema(BaseModel):
    # URLs that the scraper tool iterates over, one HTTP GET per entry.
    urls: List[str] = Field(description="List of URLs to scrape")

class WebScraperTool(BaseTool):
    """Tool that downloads pages and extracts the text of their ``<main>`` element.

    Scraping is best-effort: a URL that cannot be fetched, answers with an
    HTTP error status, or has no ``<main>`` element contributes nothing to
    the result instead of aborting the whole call.
    """

    name: str = "WebScraperTool"
    description: str = "Scrape content from main tags in provided URLs."
    args_schema: Type[WebScraperToolArgsSchema] = WebScraperToolArgsSchema
    # Seconds to wait on each HTTP request; without a timeout a single
    # unresponsive host would block the tool (and the agent) indefinitely.
    request_timeout: float = 10.0

    def _run(self, urls):
        """Fetch every URL and return the text found inside its ``<main>`` tag.

        Args:
            urls: Iterable of URL strings to download.

        Returns:
            A list of strings, one per URL that was fetched successfully and
            contained a ``<main>`` element, in the order the URLs were given.
            Failures are reported with ``print`` and skipped.
        """
        all_content = []
        for url in urls:
            try:
                response = requests.get(url, timeout=self.request_timeout)
                # Treat 4xx/5xx answers as failures so that error pages are
                # not saved as if they were the requested article.
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                main_content = soup.find('main')
                if main_content is not None:
                    all_content.append(main_content.get_text(strip=True))
            except Exception as e:
                # Deliberately broad: this is the tool boundary, and one bad
                # page must not abort scraping of the remaining URLs.
                print(f"Error scraping {url}: {e}")
        return all_content

# Argument schema used by ContentSaverTool: what to write and where.
class ContentSaverToolArgsSchema(BaseModel):
    # Text chunks; the saver tool joins them with a blank line between each.
    content: List[str] = Field(description="Content to save to file")
    # Path handed to open() by the saver tool; the file is opened in "w" mode.
    filename: str = Field(description="Output filename")

class ContentSaverTool(BaseTool):
    """Tool that writes a list of text chunks to a UTF-8 text file."""

    name: str = "ContentSaverTool"
    description: str = "Save content to a text file."
    args_schema: Type[ContentSaverToolArgsSchema] = ContentSaverToolArgsSchema

    def _run(self, content, filename):
        """Write ``content`` to ``filename`` and return a confirmation message.

        Args:
            content: Strings to store; they are joined with a blank line
                between consecutive entries.
            filename: Path of the file to create or overwrite.

        Returns:
            A string stating which file the content was saved to.
        """
        separator = "\n\n"
        with open(filename, mode="w", encoding="utf-8") as out_file:
            out_file.write(separator.join(content))
        confirmation = f"Content saved to {filename}"
        return confirmation

# Tools made available to the agent, in the order the prompt refers to them.
research_tools = [TopicSearchTool(), WebScraperTool(), ContentSaverTool()]

# System prompt describing the search -> scrape -> save workflow.
# The whitespace inside the literal is part of the prompt text.
researcher_prompt = SystemMessage(
    content="""
            You are a researcher. Follow these steps:

            1. Use TopicSearchTool to find relevant URLs
            2. Use WebScraperTool to scrape their content
            3. Use ContentSaverTool to analyze the content and save the research about the query to a txt file
        """
)

# OpenAI-functions agent wired to the shared chat model and the three tools.
agent = initialize_agent(
    tools=research_tools,
    llm=llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    agent_kwargs={"system_message": researcher_prompt},
    handle_parsing_errors=True,
    verbose=True,
)

# Research topic passed to the agent as its input.
query = "Research about the XZ backdoor"
result = agent.invoke(query)

# Prefix every "$" in the answer with a backslash before printing
# (presumably so a notebook renderer does not read "$...$" as math - confirm).
# "\\$" spells the backslash explicitly; the previous "\$" produced the same
# characters but relied on an invalid escape sequence, which Python reports
# with a warning and may reject in a future version.
print(result["output"].replace("$", "\\$"))