# Simple AI Agent with Web Search and Hugging Face Integration

This notebook implements a simple AI agent that can:
1. Search the internet for information using free tools
2. Use open-source Hugging Face models for text generation
3. Combine these capabilities to answer questions

## Setup and Dependencies

Let's install the required libraries first.

In [None]:
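# Install required packages
# (accelerate is needed for device_map="auto"; bitsandbytes is needed for 8-bit loading on GPU)
# !pip install transformers torch accelerate bitsandbytes requests beautifulsoup4 duckduckgo-search wikipedia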

In [None]:
# Import required libraries
import re
import time
from typing import Any, Dict, List, Tuple, Union

import requests
import torch
import wikipedia
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from transformers import BitsAndBytesConfig


## Web Search Tools

First, let's implement tools to search the web without requiring API keys.

In [None]:
class WebSearchTools:
    """Collection of web search tools that don't require API keys.

    Every tool is a static method and is best-effort: a failure (network
    error, unexpected response, missing page, ...) is reported with ``print``
    and an empty result is returned instead of raising.
    """

    @staticmethod
    def duckduckgo_search(query: str, num_results: int = 5) -> List[Dict[str, str]]:
        """Search the web using DuckDuckGo.

        Args:
            query: Search terms.
            num_results: Maximum number of results to request.

        Returns:
            A list of dicts with the keys ``'title'``, ``'link'`` and
            ``'snippet'``. Empty when the query is blank or the search fails.
        """
        if not query or not query.strip():
            return []

        try:
            with DDGS() as ddgs:
                # ``.get`` keeps a single incomplete hit from raising KeyError,
                # which the handler below would turn into "no results at all".
                return [
                    {
                        'title': hit.get('title', ''),
                        'link': hit.get('href', ''),
                        'snippet': hit.get('body', ''),
                    }
                    for hit in ddgs.text(query, max_results=num_results)
                ]
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
            return []

    @staticmethod
    def fetch_webpage_content(url: str, max_chars: int = 5000) -> str:
        """Fetch a web page and extract its visible text.

        Args:
            url: Address of the page to download.
            max_chars: Maximum number of characters of text to return.

        Returns:
            The page text with ``<script>``/``<style>`` content removed and
            blank fragments dropped, cut to ``max_chars`` characters. Empty
            when the URL is blank or the page cannot be fetched.
        """
        if not url or not url.strip():
            return ""

        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Scripts and stylesheets are not readable content.
            for tag in soup(["script", "style"]):
                tag.extract()

            text = soup.get_text(separator=' ', strip=True)

            # Split on line breaks and double spaces, drop empty fragments,
            # and put one fragment per line.
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)

            # Cap the length so one huge page cannot dominate the context.
            return text[:max_chars]
        except Exception as e:
            print(f"Error fetching webpage: {e}")
            return ""

    @staticmethod
    def wikipedia_search(query: str, sentences: int = 3) -> str:
        """Search Wikipedia and return a short summary of the best match.

        Args:
            query: Search terms.
            sentences: Number of summary sentences to request.

        Returns:
            ``"Wikipedia (<page title>): <summary>"``, or an empty string when
            the query is blank, nothing is found, or the lookup fails.
        """
        if not query or not query.strip():
            return ""

        try:
            search_results = wikipedia.search(query)
            if not search_results:
                return ""

            # The titles come straight from the search results, so they are
            # looked up as-is: with auto_suggest enabled the library may
            # substitute a "suggested" title and load a different page.
            try:
                page = wikipedia.page(search_results[0], auto_suggest=False)
            except wikipedia.DisambiguationError as e:
                # If there's a disambiguation page, take the first option
                page = wikipedia.page(e.options[0], auto_suggest=False)

            summary = wikipedia.summary(
                page.title, sentences=sentences, auto_suggest=False
            )
            return f"Wikipedia ({page.title}): {summary}"
        except Exception as e:
            print(f"Wikipedia search error: {e}")
            return ""

## Hugging Face Model Interface

Now, let's create a class to work with open-source Hugging Face models.

In [None]:
class HuggingFaceModel:
    """Interface for working with Hugging Face causal language models.

    Loads a tokenizer and model, wraps them in a ``text-generation`` pipeline
    and exposes two helpers: raw text generation and question answering with
    extra context. Prompts built here use the ``[INST] ... [/INST]``
    instruction format.
    """

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        """Initialize the model and tokenizer.

        Args:
            model_name: Hugging Face model ID
        """
        print(f"Loading model: {model_name}")

        # Check if CUDA is available
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        # 8-bit quantization is only configured when a GPU is available;
        # on CPU the model is loaded unquantized in float32.
        if self.device == "cuda":
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_enable_fp32_cpu_offload=True
            )
        else:
            quantization_config = None

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map="auto",
            quantization_config=quantization_config
        )

        # The model object above has already been placed via device_map, so
        # the pipeline receives it as-is without a second device_map.
        self.generator = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer
        )

    def generate_text(self, prompt: str, max_new_tokens: int = 512) -> str:
        """Generate text based on a prompt.

        Args:
            prompt: Input text to the model
            max_new_tokens: Maximum number of new tokens to generate

        Returns:
            The generated continuation with the prompt removed, or an empty
            string when the prompt is blank or generation fails (the error is
            printed, not raised).
        """
        if not prompt or not prompt.strip():
            return ""

        try:
            outputs = self.generator(
                prompt,
                do_sample=True,
                temperature=0.7,
                top_p=0.95,
                max_new_tokens=max_new_tokens,
                num_return_sequences=1
            )

            generated_text = outputs[0]['generated_text']

            # The pipeline output includes the prompt; keep only the new text.
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):].strip()

            return generated_text
        except Exception as e:
            print(f"Text generation error: {e}")
            return ""

    def generate_response_with_context(self, question: str, context: str, max_new_tokens: int = 1024) -> str:
        """Generate a response to a question given additional context.

        Args:
            question: User's question
            context: Additional context information
            max_new_tokens: Maximum number of new tokens to generate

        Returns:
            Generated answer
        """
        # Built from explicit pieces so that source-code indentation does not
        # leak into the prompt. No literal "<s>" is written: the tokenizer is
        # expected to add the beginning-of-sequence token itself, and writing
        # it here as well would duplicate it.
        prompt = (
            "[INST] I have the following information:\n\n"
            f"{context}\n\n"
            "Based on this information, please answer the following question:\n"
            f"{question} [/INST]"
        )
        return self.generate_text(prompt, max_new_tokens=max_new_tokens)

## Simple AI Agent

Now, let's integrate everything to create our simple AI agent.

In [None]:
class SimpleAIAgent:
    """A simple AI agent that can search the web and use Hugging Face models.

    The agent turns a question into a keyword query with the language model,
    gathers context from Wikipedia and DuckDuckGo, and asks the model to
    answer the question using that context.
    """

    # A keyword query is only a few terms, so query generation gets a small
    # token budget instead of the model wrapper's default.
    _QUERY_MAX_NEW_TOKENS = 64

    # Number of characters of page text / search summary kept for display
    # and for the model context.
    _PREVIEW_CHARS = 1500

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.3"):
        """Initialize the agent with search tools and a language model."""
        self.search_tools = WebSearchTools()
        self.llm = HuggingFaceModel(model_name)

    @staticmethod
    def _clean_search_query(raw_query: str) -> str:
        """Strip quotes and square brackets and collapse whitespace."""
        without_markup = re.sub(r'["\[\]]', '', raw_query)
        # split()/join also turns newlines into single spaces.
        return " ".join(without_markup.split())

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return ``text`` cut to ``limit`` chars, with "..." only if it was cut."""
        return text[:limit] + "..." if len(text) > limit else text

    def search(self, query: str) -> Tuple[str, Dict[str, Any]]:
        """Search for information using multiple sources.

        Args:
            query: Search query

        Returns:
            Tuple of (formatted_context, raw_results). ``raw_results`` has the
            keys ``"wikipedia"``, ``"duckduckgo"`` and ``"webpage_content"``
            (``None`` unless the first result's page could be fetched).
        """
        print(f"Searching for information about: {query}")

        wiki_result = self.search_tools.wikipedia_search(query, sentences=5)
        ddg_results = self.search_tools.duckduckgo_search(query, num_results=3)

        raw_results = {
            "wikipedia": wiki_result,
            "duckduckgo": ddg_results,
            "webpage_content": None
        }

        collected_info = []
        if wiki_result:
            collected_info.append(wiki_result)

        for position, result in enumerate(ddg_results):
            collected_info.append(f"Title: {result['title']}")
            collected_info.append(f"Snippet: {result['snippet']}")

            # Only the top-ranked result gets its full page fetched. The
            # position is checked (not dict equality) so a duplicate of the
            # first result cannot trigger a second download.
            if position != 0:
                continue

            try:
                content = self.search_tools.fetch_webpage_content(result['link'])
            except Exception as e:
                print(f"Error fetching content: {e}")
                continue

            if content:
                # Truncate to avoid very long context
                preview = self._truncate(content, self._PREVIEW_CHARS)
                collected_info.append(f"Content: {preview}")
                raw_results["webpage_content"] = {
                    "url": result['link'],
                    "content": content
                }

        return "\n\n".join(collected_info), raw_results

    def answer_question(self, question: str, max_new_tokens: int = 1024,
                        return_search_results: bool = False) -> Union[str, Dict[str, Any]]:
        """Answer a question by searching for information and generating a response.

        Args:
            question: User's question
            max_new_tokens: Maximum number of new tokens to generate
            return_search_results: Whether to return search results along with the answer

        Returns:
            If return_search_results is True, a dictionary with the keys
            ``"answer"``, ``"search_query"``, ``"formatted_context"`` and
            ``"raw_results"``. Otherwise just the answer string.
        """
        # No literal "<s>" and no source indentation in the prompt: the
        # tokenizer is expected to add the beginning-of-sequence token itself.
        search_query_prompt = (
            "[INST] Convert the following question into a search query with only keywords.\n"
            "Do not use special search operators, just extract 3-5 key terms:\n\n"
            f"Question: {question}\n\n"
            "Search query: [/INST]"
        )
        raw_query = self.llm.generate_text(
            search_query_prompt, max_new_tokens=self._QUERY_MAX_NEW_TOKENS
        )

        # Remove any extra formatting the model might have added
        search_query = self._clean_search_query(raw_query)
        if not search_query:
            # Query generation failed or produced nothing usable; searching
            # with the question itself beats searching with an empty string.
            search_query = question.strip()
        print(f"Generated search query: {search_query}")

        formatted_context, raw_results = self.search(search_query)

        answer = self.llm.generate_response_with_context(
            question, formatted_context, max_new_tokens=max_new_tokens
        )

        if return_search_results:
            return {
                "answer": answer,
                "search_query": search_query,
                "formatted_context": formatted_context,
                "raw_results": raw_results
            }
        return answer

    def compare_answers(self, question: str, max_new_tokens: int = 1024) -> dict:
        """Compare answers generated with and without web search results.

        Args:
            question: User's question
            max_new_tokens: Maximum number of new tokens to generate

        Returns:
            Dictionary with the keys ``"question"``, ``"with_search"``
            (answer, search_query, search_results_summary, time_taken) and
            ``"without_search"`` (answer, time_taken). Times are in seconds.
        """
        print(f"Comparing answers for question: {question}")

        print("Generating answer WITH search...")
        # perf_counter is a monotonic clock, so the measured durations cannot
        # be skewed by system clock adjustments.
        started = time.perf_counter()
        result_with_search = self.answer_question(question,
                                                  max_new_tokens=max_new_tokens,
                                                  return_search_results=True)
        time_with_search = time.perf_counter() - started

        print("Generating answer WITHOUT search...")
        started = time.perf_counter()

        # Direct question, no retrieved context.
        direct_prompt = (
            "[INST] Please answer the following question based on your knowledge:\n\n"
            f"{question} [/INST]"
        )
        answer_without_search = self.llm.generate_text(direct_prompt, max_new_tokens=max_new_tokens)
        time_without_search = time.perf_counter() - started

        return {
            "question": question,
            "with_search": {
                "answer": result_with_search["answer"],
                "search_query": result_with_search["search_query"],
                "search_results_summary": result_with_search["formatted_context"],
                "time_taken": time_with_search
            },
            "without_search": {
                "answer": answer_without_search,
                "time_taken": time_without_search
            }
        }

    def display_comparison(self, question: str, max_new_tokens: int = 1024):
        """Print a comparison of the answers with and without search.

        Args:
            question: User's question
            max_new_tokens: Maximum number of new tokens to generate

        Returns:
            The comparison dictionary produced by ``compare_answers``.
        """
        comparison = self.compare_answers(question, max_new_tokens)
        with_search = comparison['with_search']
        without_search = comparison['without_search']

        print("\n" + "=" * 80)
        print(f"QUESTION: {comparison['question']}")
        print("=" * 80)

        print("\n" + "-" * 35 + " WITH SEARCH " + "-" * 35)
        print(f"Search Query: {with_search['search_query']}")
        print(f"Time: {with_search['time_taken']:.2f} seconds")
        print("\nANSWER WITH SEARCH:")
        print(with_search['answer'])

        print("\n" + "-" * 35 + " WITHOUT SEARCH " + "-" * 35)
        print(f"Time: {without_search['time_taken']:.2f} seconds")
        print("\nANSWER WITHOUT SEARCH:")
        print(without_search['answer'])

        print("\n" + "-" * 35 + " SEARCH RESULTS " + "-" * 35)
        print(self._truncate(with_search['search_results_summary'], self._PREVIEW_CHARS))

        print("\n" + "=" * 80)

        return comparison

## Create and Test the Agent

Let's create our agent and test it with some questions.

In [None]:
# Build the agent once; constructing it loads the language model.
agent = SimpleAIAgent("mistralai/Mistral-7B-Instruct-v0.3")

In [None]:
# Ask one question and print only the generated answer text.
question = "Who is the current president of the united states?"
detailed_answer = agent.answer_question(
    question=question,
    max_new_tokens=2048,
)
print(f"Detailed Answer: {detailed_answer}")


In [None]:
# Ask the same question again, this time keeping the search details.
question = "Who is the current president of the united states?"
result = agent.answer_question(
    question,
    max_new_tokens=2048,
    return_search_results=True,
)

# Print each part of the result under its own banner.
for banner, field in (
    ("\n----- SEARCH QUERY -----", 'search_query'),
    ("\n----- SEARCH RESULTS -----", 'formatted_context'),
    ("\n----- ANSWER -----", 'answer'),
):
    print(banner)
    print(result[field])


In [None]:
# Print the with-search / without-search comparison and keep the returned data.
question = "Who is the current president of the united states?"
comparison = agent.display_comparison(question=question)