# IMPORT

In [None]:
# Crawler
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import tldextract
import validators

# Cleaner
from bs4 import BeautifulSoup
import re

# embeddings
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import os
import pickle

# llm
from google import genai
from google.genai import types

# constants
from constants import (
    API_KEY,
    INDEX_PATH,
    EMBEDDNG_DIM,
    SYSTEM_PROMPT,
    GEMINI_MODEL,
    EMBEDDING_MODEL,
)


In [53]:
# !pip install tldextract validators beautifulsoup4 requests sentence-transformers faiss-cpu google-genai

# Crawler

In [107]:
class Crawler:
    
    def validate_url(self, url):
        if not validators.url(url):
            return False
        return True
    
    def fetch_content_from_url(self, url):
        is_valid_url = self.validate_url(url)
        if not is_valid_url:
            print(f"Invalid URL: {url}")
            return None
        try:
            resp = requests.get(url, timeout=5, headers={'User-Agent': 'rag-chatbot/5.0'})
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def is_same_domain(base_url, other_url):
        base = tldextract.extract(base_url)
        other = tldextract.extract(other_url)
        return (base.domain, base.suffix) == (other.domain, other.suffix)

    def extract_links(self, base_url, html_content):
        soup = BeautifulSoup(html_content, 'html.parser')
        links = set()
        for link in soup.find_all('a', href=True):
            absolute_link = urljoin(base_url, link['href'])
            parsed_link = urlparse(absolute_link)
            if parsed_link.scheme in ['http', 'https']:
                links.add(absolute_link)
        return links

    def crawl(self, url, max_pages=10, max_depth=5):
        visited = set()
        to_visit = [(url, 0)]
        all_content = []
        
        while to_visit and len(visited) < max_pages:
            current_url, depth = to_visit.pop(0)
            if current_url in visited or depth > max_depth:
                continue

            print(f"Crawling page {len(visited)+1}: {current_url} at depth {depth}")
            content = self.fetch_content_from_url(url=current_url)
            if content:
                all_content.append((current_url, content))
                visited.add(current_url)

                hyperlinks = self.extract_links(base_url=current_url, html_content=content)
                for link in hyperlinks:
                    if link not in visited:
                        to_visit.append((link, depth + 1))
        
        return all_content

In [108]:
crawler = Crawler()
data = crawler.crawl("https://en.wikipedia.org/wiki/Artificial_intelligence")
data

Crawling page 1: https://en.wikipedia.org/wiki/Artificial_intelligence at depth 0
Crawling page 2: https://en.wikipedia.org/wiki/Computer_architecture at depth 1
Crawling page 3: https://en.wikipedia.org/wiki/Artificial_intelligence#cite_note-79 at depth 1
Crawling page 4: https://en.wikipedia.org/wiki/Artificial_intelligence#CITEREFLenatGuha1989 at depth 1
Crawling page 5: https://en.wikipedia.org/wiki/Artificial_intelligence#cite_note-FOOTNOTEMcCorduck2004454–462-400 at depth 1
Invalid URL: https://en.wikipedia.org/wiki/Artificial_intelligence#cite_note-FOOTNOTEMcCorduck2004454–462-400
Crawling page 5: https://en.wikipedia.org/wiki/Artificial_intelligence#cite_ref-51 at depth 1
Crawling page 6: https://en.wikipedia.org/wiki/Naive_Bayes_classifier at depth 1
Crawling page 7: https://en.wikipedia.org/wiki/Energid_Technologies at depth 1
Crawling page 8: https://en.wikipedia.org/wiki/Frank_Cameron_Jackson at depth 1
Crawling page 9: https://en.wikipedia.org/wiki/Artificial_intelligence#

[('https://en.wikipedia.org/wiki/Artificial_intelligence',
 ('https://en.wikipedia.org/wiki/Computer_architecture',
  '<!DOCTYPE html>\n<html class="client-nojs vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled vector-feature-page-tools-pinned-disabled vector-feature-toc-pinned-clientpref-1 vector-feature-main-menu-pinned-disabled vector-feature-limited-width-clientpref-1 vector-feature-limited-width-content-enabled vector-feature-custom-font-size-clientpref-1 vector-feature-appearance-pinned-clientpref-1 vector-feature-night-mode-enabled skin-theme-clientpref-day vector-sticky-header-enabled vector-toc-available" lang="en" dir="ltr">\n<head>\n<meta charset="UTF-8">\n<title>Computer architecture - Wikipedia</title>\n<script>(function(){var className="client-js vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled vector-feature-page-tools-pinned-disabled vector-feature-toc-pinned-clientpref-1 vector-f

# Cleaner

In [110]:
class Cleaner:
    
    def clean_html(self, html):
        """Remove scripts/styles, nav/header/footer/aside tags and common ad elements.
        Return cleaned plain text.
        """
        soup = BeautifulSoup(html, "html.parser")

        # remove script/style/noscript
        for tag in soup(["script", "style", "noscript", "iframe"]):
            tag.decompose()

        # remove common layout blocks
        for selector in ["header", "footer", "nav", "aside", ".sidebar", ".advert", ".ads", ".cookie", "#header", "#footer"]:
            for t in soup.select(selector):
                t.decompose()

        # remove elements with ad-like ids/classes
        for t in soup.find_all(True, {"class": re.compile(r"(^|\\s)(ad|ads|advert|banner|promo|cookie)(\\s|$)", re.I)}):
            t.decompose()
        for t in soup.find_all(True, {"id": re.compile(r"(^|\\s)(ad|ads|advert|banner|promo|cookie)(\\s|$)", re.I)}):
            t.decompose()

        # focus on article, main, section, div
        texts = []
        candidates = soup.find_all(["article", "main", "section", "div", "p"])
        if not candidates:
            # fallback to body text
            body = soup.body
            if body:
                return ' '.join(body.stripped_strings)
            return ''

        seen = set()
        for c in candidates:
            text = ' '.join(c.stripped_strings)
            # drop very short or obviously non-text
            if len(text) < 50:
                continue
            # deduplicate
            if text in seen:
                continue
            seen.add(text)
            texts.append(text)

        # join preserving paragraph breaks
        return '\n\n'.join(texts)
    

In [111]:
cleaner = Cleaner()
cleaned_data = []
for url, html in data:
    cleaned_text = cleaner.clean_html(html)
    cleaned_data.append((url, cleaned_text))

In [112]:
# cleaned_data

# Embedder

In [113]:
import numpy as np
import faiss
import os
import pickle
from google import genai
from google.genai import types
from contants import API_KEY


class Embedder:
    def __init__(self, model_name):
        self.client = genai.Client(api_key=API_KEY)
        self.model_name = model_name

    def embed_texts(self, texts, batch_size=50):
        all_embeddings = []
        
        # Process the list in chunks of batch_size
        for i in range(0, len(texts), batch_size):
            batch = texts[i : i + batch_size]
            
            config = types.EmbedContentConfig(
                output_dimensionality=768,
                task_type="RETRIEVAL_DOCUMENT" # Recommended for RAG storage
            )
            
            result = self.client.models.embed_content(
                model=self.model_name,
                contents=batch,
                config=config
            )
            
            # Extract numerical values from the response
            for embedding in result.embeddings:
                all_embeddings.append(embedding.values)
                
        return np.array(all_embeddings, dtype=np.float32)


class FaissStore:
    def __init__(self, dim, index_path=None):
        self.dim = dim
        self.index = faiss.IndexFlatIP(dim)
        self.texts = []
        self.index_path = index_path

    def add(self, embeddings, texts):
        # normalize for cosine similarity
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)
        self.texts.extend(texts)

    def search(self, query_emb, k=5):
        faiss.normalize_L2(query_emb)
        D, I = self.index.search(query_emb, k)
        results = []
        for idx_list, score_list in zip(I, D):
            for idx, score in zip(idx_list, score_list):
                if idx < len(self.texts):
                    results.append((self.texts[idx], float(score)))
        return results

    def save(self, path=None):
        path = path or self.index_path
        if not path:
            raise ValueError("index path required to save")
        faiss.write_index(self.index, path + ".index")
        with open(path + ".texts.pkl", "wb") as f:
            pickle.dump(self.texts, f)

    def load(self, path):
        self.index = faiss.read_index(path + ".index")
        with open(path + ".texts.pkl", "rb") as f:
            self.texts = pickle.load(f)


In [115]:
embedder = Embedder(model_name=EMBEDDING_MODEL)
all_texts = [text for url, text in cleaned_data if text.strip()]
embeddings = embedder.embed_texts(all_texts)

In [116]:
embeddings.shape

(10, 768)

In [118]:
faiss_store = FaissStore(dim=embeddings.shape[1])
faiss_store.add(embeddings, all_texts)
faiss_store.save(INDEX_PATH)

# LLM

In [123]:
class QAEngine:
    def __init__(self):
        self.crawler = Crawler()
        self.cleaner = Cleaner()
        self.embedder = Embedder(model_name=EMBEDDING_MODEL)
        self.faiss_store = FaissStore(EMBEDDNG_DIM)
        self.client = genai.Client(api_key=API_KEY)

    def ingest(self, url, max_pages=10):
        # crawler raw html content
        pages = self.crawler.crawl(url, max_pages=max_pages)
        texts = []
        urls = []
        # clean html content
        for u, html in pages:
            parsed_text = self.cleaner.clean_html(html)
            if parsed_text and len(parsed_text) > 30:
                texts.append(parsed_text)
                urls.append(u)

        if not texts:
            raise ValueError("No usable content extracted from site")

        # chunk by simple paragraph chunks
        chunks = []
        for text in texts:
            paras = text.split("\n")
            for para in paras:
                if len(para) > 50:
                    # further split long paragraphs
                    if len(para) > 2000:
                        for i in range(0, len(para), 1500):
                            chunks.append(para[i : i + 1500])
                    else:
                        chunks.append(para)

        # create embedding
        embeddings = self.embedder.embed_texts(texts=chunks)

        # index and store in index_path
        self.faiss_store.add(embeddings=embeddings, texts=chunks)
        self.faiss_store.save(INDEX_PATH)
        return len(chunks)

    def fetch_context(self, question, k=5):
        self.faiss_store.load(INDEX_PATH)
        if not self.faiss_store.texts:
            raise ValueError("No index loaded. Run ingest first.")
        q_emb = self.embedder.embed_texts(texts=[question])
        results = self.faiss_store.search(query_emb=q_emb, k=k)
        # assemble answer: return top passages with scores
        answer = ""
        for paragraph, score in results:
            answer += f"[score={score:.3f}] {paragraph} \n\n"
        return answer

    def generate_response(self, question):
        context = self.fetch_context(question)
        prompt = f"Context: {context}\n\nQuestion: {question}"
        response = self.client.models.generate_content(
            model=GEMINI_MODEL,
            config=types.GenerateContentConfig(system_instruction=SYSTEM_PROMPT),
            contents=prompt,
        )
        return response.text


In [128]:
qaengine = QAEngine()
qaengine.ingest("https://www.geeksforgeeks.org/linux-unix/linux-tutorial/", max_pages=10)

Crawling page 1: https://www.geeksforgeeks.org/linux-unix/linux-tutorial/ at depth 0
Crawling page 2: https://www.geeksforgeeks.org/linux-unix/how-to-check-the-groups-a-user-belongs-to-in-linux/ at depth 1
Crawling page 3: https://www.geeksforgeeks.org/linux-unix/processes-in-linuxunix/ at depth 1
Crawling page 4: https://www.geeksforgeeks.org/linux-unix/bash-script-difference-between-bash-script-and-shell-script/ at depth 1
Crawling page 5: https://www.geeksforgeeks.org/linux-unix/boot-process-with-systemd-in-linux/ at depth 1
Crawling page 6: https://www.geeksforgeeks.org/linux-unix/bash-script-define-bash-variables-and-its-types/ at depth 1
Crawling page 7: https://www.geeksforgeeks.org/ethical-hacking/add-a-linux-firewall-ufw-or-iptables/ at depth 1
Crawling page 8: https://www.geeksforgeeks.org/linux-unix/linux-certifications/ at depth 1
Crawling page 9: https://www.geeksforgeeks.org/linux-unix/linux-file-hierarchy-structure/ at depth 1
Crawling page 10: https://www.geeksforgeeks.

703

In [126]:
qaengine.generate_response("what is bayes theorem")

"Bayes' theorem manipulates probabilities into a statement of probability in terms of likelihood, expressed by the formula: p ( C ∣ D ) = p ( C ) p ( D ∣ C ) p ( D ) {\\displaystyle p(C\\mid D)={\\frac {p(C)\\,p(D\\mid C)}{p(D)}}}."