<a href="https://colab.research.google.com/github/Manish927/Algorithm/blob/patch1/SearchEngine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Outlining how Google search works and a Python implementation.
## Core components:
Web crawling: discover and fetch pages
Indexing: build an inverted index
Ranking: score and order results
Query processing: parse and match queries
Python implementation:

# How Google Search Works
Core components:
Web Crawling: Discovers and downloads web pages
Indexing: Builds an inverted index mapping words to documents
Ranking: Scores pages using algorithms like PageRank and TF-IDF
Query Processing: Parses queries and retrieves relevant results
A Python implementation:

In [1]:
# search_engine.py
import re
import math
from collections import defaultdict, Counter
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
import requests
from bs4 import BeautifulSoup
from typing import List, Dict, Set, Tuple
import json
import time

class WebCrawler:
    """Crawls web pages and extracts content"""

    def __init__(self, max_pages: int = 100, delay: float = 1.0):
        self.max_pages = max_pages
        self.delay = delay
        self.visited = set()
        self.robots_parser = {}
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SearchBot/1.0)'
        })

    def can_fetch(self, url: str) -> bool:
        """Check if URL can be fetched according to robots.txt"""
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        if base_url not in self.robots_parser:
            robots_url = urljoin(base_url, '/robots.txt')
            try:
                rp = RobotFileParser()
                rp.set_url(robots_url)
                rp.read()
                self.robots_parser[base_url] = rp
            except:
                self.robots_parser[base_url] = None

        parser = self.robots_parser[base_url]
        return parser is None or parser.can_fetch('*', url)

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Extract all links from HTML"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []

        for tag in soup.find_all('a', href=True):
            href = tag['href']
            absolute_url = urljoin(base_url, href)
            parsed = urlparse(absolute_url)

            # Only follow http/https links
            if parsed.scheme in ['http', 'https']:
                links.append(absolute_url)

        return links

    def extract_text(self, html: str) -> str:
        """Extract text content from HTML"""
        soup = BeautifulSoup(html, 'html.parser')

        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()

        # Get text
        text = soup.get_text()

        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = ' '.join(chunk for chunk in chunks if chunk)

        return text

    def crawl(self, start_url: str) -> List[Dict]:
        """Crawl web starting from start_url"""
        queue = [start_url]
        pages = []

        while queue and len(pages) < self.max_pages:
            url = queue.pop(0)

            if url in self.visited:
                continue

            if not self.can_fetch(url):
                continue

            try:
                print(f"Crawling: {url}")
                response = self.session.get(url, timeout=10)
                response.raise_for_status()

                if 'text/html' not in response.headers.get('Content-Type', ''):
                    continue

                html = response.text
                text = self.extract_text(html)

                if len(text) < 100:  # Skip pages with too little content
                    continue

                pages.append({
                    'url': url,
                    'title': self.extract_title(html),
                    'text': text,
                    'links': self.extract_links(html, url)
                })

                self.visited.add(url)

                # Add new links to queue
                for link in self.extract_links(html, url):
                    if link not in self.visited and len(queue) < 1000:
                        queue.append(link)

                time.sleep(self.delay)  # Be polite

            except Exception as e:
                print(f"Error crawling {url}: {e}")
                continue

        return pages

    def extract_title(self, html: str) -> str:
        """Extract page title"""
        soup = BeautifulSoup(html, 'html.parser')
        title_tag = soup.find('title')
        return title_tag.get_text() if title_tag else "Untitled"


class Indexer:
    """Builds inverted index from documents"""

    def __init__(self):
        self.inverted_index = defaultdict(dict)  # word -> {doc_id: tf}
        self.documents = {}  # doc_id -> document metadata
        self.doc_id_counter = 0
        self.idf_cache = {}

    def tokenize(self, text: str) -> List[str]:
        """Tokenize text into words"""
        # Convert to lowercase and split on non-word characters
        words = re.findall(r'\b[a-z]+\b', text.lower())
        return words

    def compute_tf(self, words: List[str]) -> Dict[str, float]:
        """Compute Term Frequency"""
        word_count = Counter(words)
        total_words = len(words)

        # Normalized TF
        tf = {word: count / total_words for word, count in word_count.items()}
        return tf

    def add_document(self, doc: Dict):
        """Add document to index"""
        doc_id = self.doc_id_counter
        self.doc_id_counter += 1

        text = doc.get('text', '')
        words = self.tokenize(text)

        if not words:
            return

        # Compute TF for this document
        tf = self.compute_tf(words)

        # Add to inverted index
        for word, tf_value in tf.items():
            self.inverted_index[word][doc_id] = tf_value

        # Store document metadata
        self.documents[doc_id] = {
            'url': doc['url'],
            'title': doc.get('title', 'Untitled'),
            'text': text[:500] + '...' if len(text) > 500 else text,  # Store snippet
            'word_count': len(words)
        }

    def compute_idf(self, word: str) -> float:
        """Compute Inverse Document Frequency"""
        if word in self.idf_cache:
            return self.idf_cache[word]

        doc_frequency = len(self.inverted_index[word])
        total_docs = len(self.documents)

        if doc_frequency == 0:
            idf = 0
        else:
            idf = math.log(total_docs / doc_frequency)

        self.idf_cache[word] = idf
        return idf

    def compute_tfidf(self, word: str, doc_id: int) -> float:
        """Compute TF-IDF score"""
        if doc_id not in self.inverted_index[word]:
            return 0

        tf = self.inverted_index[word][doc_id]
        idf = self.compute_idf(word)

        return tf * idf


class PageRank:
    """PageRank algorithm for ranking pages"""

    def __init__(self, damping_factor: float = 0.85, iterations: int = 10):
        self.damping_factor = damping_factor
        self.iterations = iterations

    def compute(self, documents: List[Dict]) -> Dict[str, float]:
        """Compute PageRank scores"""
        # Build link graph
        url_to_id = {doc['url']: i for i, doc in enumerate(documents)}
        id_to_url = {i: doc['url'] for i, doc in enumerate(documents)}

        # Build adjacency list
        graph = defaultdict(list)
        for i, doc in enumerate(documents):
            for link in doc.get('links', []):
                if link in url_to_id:
                    graph[i].append(url_to_id[link])

        # Initialize PageRank scores
        n = len(documents)
        pr = {i: 1.0 / n for i in range(n)}

        # Iterate PageRank algorithm
        for _ in range(self.iterations):
            new_pr = {}
            for i in range(n):
                # Damping factor contribution
                new_pr[i] = (1 - self.damping_factor) / n

                # Sum contributions from incoming links
                for j in range(n):
                    if i in graph[j]:
                        out_links = len(graph[j])
                        if out_links > 0:
                            new_pr[i] += self.damping_factor * pr[j] / out_links

            pr = new_pr

        # Convert to URL-based scores
        return {id_to_url[i]: score for i, score in pr.items()}


class SearchEngine:
    """Main search engine class"""

    def __init__(self):
        self.indexer = Indexer()
        self.page_rank = PageRank()
        self.page_rank_scores = {}

    def index_documents(self, documents: List[Dict]):
        """Index a collection of documents"""
        print(f"Indexing {len(documents)} documents...")

        for doc in documents:
            self.indexer.add_document(doc)

        # Compute PageRank scores
        print("Computing PageRank scores...")
        self.page_rank_scores = self.page_rank.compute(documents)

        print(f"Indexed {len(self.indexer.documents)} documents")
        print(f"Index contains {len(self.indexer.inverted_index)} unique words")

    def search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Search for documents matching query"""
        query_words = self.indexer.tokenize(query)

        if not query_words:
            return []

        # Score each document
        doc_scores = defaultdict(float)

        for word in query_words:
            if word not in self.indexer.inverted_index:
                continue

            idf = self.indexer.compute_idf(word)

            for doc_id in self.indexer.inverted_index[word]:
                tfidf = self.indexer.compute_tfidf(word, doc_id)
                doc_scores[doc_id] += tfidf

        # Combine TF-IDF with PageRank
        results = []
        for doc_id, tfidf_score in doc_scores.items():
            doc = self.indexer.documents[doc_id]
            url = doc['url']

            # Combine scores (weighted combination)
            pagerank_score = self.page_rank_scores.get(url, 0.0)
            combined_score = 0.7 * tfidf_score + 0.3 * pagerank_score

            results.append({
                'doc_id': doc_id,
                'url': url,
                'title': doc['title'],
                'snippet': doc['text'],
                'score': combined_score,
                'tfidf_score': tfidf_score,
                'pagerank_score': pagerank_score
            })

        # Sort by score and return top K
        results.sort(key=lambda x: x['score'], reverse=True)
        return results[:top_k]

    def save_index(self, filepath: str):
        """Save index to file"""
        data = {
            'inverted_index': {k: dict(v) for k, v in self.indexer.inverted_index.items()},
            'documents': self.indexer.documents,
            'page_rank_scores': self.page_rank_scores
        }

        with open(filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def load_index(self, filepath: str):
        """Load index from file"""
        with open(filepath, 'r') as f:
            data = json.load(f)

        self.indexer.inverted_index = defaultdict(dict, {
            k: defaultdict(float, v) for k, v in data['inverted_index'].items()
        })
        self.indexer.documents = data['documents']
        self.page_rank_scores = data['page_rank_scores']


# Example usage
if __name__ == "__main__":
    # Initialize components
    crawler = WebCrawler(max_pages=50, delay=1.0)
    search_engine = SearchEngine()

    # Crawl web pages (example: Wikipedia)
    print("Starting web crawl...")
    documents = crawler.crawl("https://en.wikipedia.org/wiki/Python_(programming_language)")

    # Index documents
    search_engine.index_documents(documents)

    # Perform searches
    print("\n" + "="*50)
    print("Search Results")
    print("="*50)

    queries = ["python programming", "web development", "data structures"]

    for query in queries:
        print(f"\nQuery: '{query}'")
        print("-" * 50)
        results = search_engine.search(query, top_k=5)

        for i, result in enumerate(results, 1):
            print(f"\n{i}. {result['title']}")
            print(f"   URL: {result['url']}")
            print(f"   Score: {result['score']:.4f}")
            print(f"   Snippet: {result['snippet'][:200]}...")

    # Save index for later use
    search_engine.save_index("search_index.json")
    print("\nIndex saved to search_index.json")

Starting web crawl...
Indexing 0 documents...
Computing PageRank scores...
Indexed 0 documents
Index contains 0 unique words

Search Results

Query: 'python programming'
--------------------------------------------------

Query: 'web development'
--------------------------------------------------

Query: 'data structures'
--------------------------------------------------

Index saved to search_index.json


# Key features:
WebCrawler: Respects robots.txt, extracts links and text
Indexer: Builds inverted index with TF-IDF
PageRank: Computes page importance from link graph
SearchEngine: Combines TF-IDF and PageRank for ranking
# How to use:

In [None]:
# requirements.txt
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=4.9.0

In [None]:
# Install dependencies
pip install requests beautifulsoup4 lxml

# Run the search engine
python search_engine.py

In [None]:
# Simplified version for local documents

# simple_search.py
import os
import re
from collections import defaultdict
from typing import List, Dict

class SimpleSearchEngine:
    """Simple search engine for local text files"""

    def __init__(self):
        self.index = defaultdict(set)  # word -> set of file paths
        self.documents = {}  # file_path -> content

    def tokenize(self, text: str) -> List[str]:
        """Extract words from text"""
        return re.findall(r'\b[a-z]+\b', text.lower())

    def index_file(self, filepath: str):
        """Index a text file"""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()

            self.documents[filepath] = content
            words = self.tokenize(content)

            for word in words:
                self.index[word].add(filepath)

            print(f"Indexed: {filepath} ({len(words)} words)")
        except Exception as e:
            print(f"Error indexing {filepath}: {e}")

    def index_directory(self, directory: str):
        """Index all .txt files in directory"""
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith('.txt'):
                    filepath = os.path.join(root, file)
                    self.index_file(filepath)

    def search(self, query: str) -> List[str]:
        """Search for files containing query words"""
        query_words = self.tokenize(query)

        if not query_words:
            return []

        # Find files containing all query words (AND search)
        result_files = None
        for word in query_words:
            if word in self.index:
                files = self.index[word]
                if result_files is None:
                    result_files = files.copy()
                else:
                    result_files &= files  # Intersection

        return list(result_files) if result_files else []

# Usage
if __name__ == "__main__":
    engine = SimpleSearchEngine()

    # Index a directory
    engine.index_directory("./documents")

    # Search
    results = engine.search("python programming")
    print(f"Found {len(results)} files:")
    for file in results:
        print(f"  - {file}")

It covers the core concepts. For production, add:
- Distributed crawling
- Database storage
- Caching
- Query expansion
- Spell correction
- Advanced ranking signals
Should I add any specific features or explain any part in more detail?