In [23]:
import os
import re
import requests
import pdfplumber
from bs4 import BeautifulSoup
from tqdm import tqdm
from pathlib import Path
from urllib.parse import urlparse, urljoin
from collections import deque
from dotenv import load_dotenv

# Helper functions

text cleaning

In [24]:
def clean_text(text):
    """Normalize raw text for downstream chunking.

    Lowercases the text, removes bracketed numeric markers such as ``[12]``,
    collapses every run of whitespace into a single space and trims the ends.

    Args:
        text: The raw string, or a falsy value (``None`` / ``""``).

    Returns:
        The normalized string; ``""`` when the input is falsy.
    """
    if text:
        lowered = text.lower()
        without_markers = re.sub(r'\[\d+\]', '', lowered)
        single_spaced = re.sub(r'\s+', ' ', without_markers)
        return single_spaced.strip()
    return ""

Check whether a URL is valid for crawling (it must have both a scheme and a host)

In [25]:
def is_valid_url(url):
    """Return True when ``url`` carries both a scheme and a network location.

    Relative paths, scheme-only URLs such as ``mailto:...`` and
    protocol-relative URLs (``//host/path``) are all reported as invalid.
    """
    parts = urlparse(url)
    if not parts.netloc:
        return False
    return bool(parts.scheme)

this keeps only valid urls, removes external links so we stay within same domain, and removes unwanted urls like login/signup etc

In [26]:
def filter_links(links, base_url):
    """Resolve, validate and filter the links found on a page.

    Each link is made absolute against ``base_url`` and kept only if it
    (1) has a scheme and a host, (2) is on the same host as ``base_url`` and
    (3) does not contain any of the excluded substrings (auth pages, wiki
    namespaces, tracking parameters, fragments, ...). The substring match is
    case-insensitive and deliberately coarse.

    Args:
        links: Iterable of raw ``href`` values (absolute or relative).
        base_url: URL of the page the links were found on.

    Returns:
        List of absolute URLs in first-seen order, without duplicates.
    """
    # Built once per call instead of once per link.
    excluded_patterns = (
        'login', 'signin', 'signup', 'register', 'logout', 'account', 'profile',
        'edit', 'delete', 'create', 'preferences', 'settings',
        'mailto:', 'javascript:', 'tel:', 'sms:', '#', 'print', 'share',
        'oc_lang=', 'lang=', 'translate=', 'setlang=', 'language=', 'translation',
        'search', 'contact', 'feedback', 'help', 'faq',
        'sessionid=', 'trackid=', 'utm_', 'campaign=',
        'action=', 'do=', 'mode=', 'type=',
        'special:', 'talk:', 'user:', 'wikipedia:', 'file:', 'mediawiki:',
        'template:', 'help:', 'category:', 'portal:', 'draft:',
        'index.php?', 'action=edit', 'action=history', 'oldid=', 'diff=',
        'printable=yes', 'mobileaction=', 'title=special:', 'redlink=1',
        'site-footer', 'footer-widgets', 'media', 'events', 'news',
    )
    base_domain = urlparse(base_url).netloc
    filtered_links = []
    seen = set()  # absolute URLs already examined, accepted or not
    for link in links:
        absolute_link = urljoin(base_url, link)
        if absolute_link in seen:
            # Pages commonly repeat the same href (menus, breadcrumbs).
            continue
        seen.add(absolute_link)
        parsed_link = urlparse(absolute_link)
        # Same rule as is_valid_url(), applied to the already-parsed URL.
        if not (parsed_link.scheme and parsed_link.netloc):
            continue
        if parsed_link.netloc != base_domain:
            continue
        lowered = absolute_link.lower()
        if any(pattern in lowered for pattern in excluded_patterns):
            continue
        filtered_links.append(absolute_link)
    return filtered_links

Extract the visible text from HTML content using BeautifulSoup

In [27]:
def extract_text_from_html(html_content):
    """Extract cleaned, de-duplicated text from an HTML document.

    Script, style, header, footer and nav elements are removed first. Text is
    then taken from paragraph, heading, list-item, span and div elements.

    Only the outermost matching elements are read. Collecting every match
    would emit the text of a ``<p>`` again for each enclosing ``<div>``, so
    the same sentence would appear once per nesting level.

    Args:
        html_content: HTML markup as a string.

    Returns:
        The extracted text after ``clean_text`` normalization.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    for unwanted in soup(["script", "style", "header", "footer", "nav"]):
        unwanted.extract()
    content_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div']
    outermost = [
        element
        for element in soup.find_all(content_tags)
        if element.find_parent(content_tags) is None
    ]
    # A separator keeps words from sibling elements apart now that nested
    # elements are no longer joined individually.
    text = ' '.join(element.get_text(separator=' ') for element in outermost)
    return clean_text(text)

extract text from PDF file using pdfplumber

In [28]:
def extract_text_from_pdf(pdf_path):
    """Read every page of a PDF and return its cleaned text.

    Pages for which no text is extracted contribute an empty string. Any
    exception raised while opening or reading the file is reported on stdout
    and results in an empty string (best-effort behaviour).

    Args:
        pdf_path: Location of the PDF, passed straight to ``pdfplumber.open``.

    Returns:
        The ``clean_text``-normalized text of all pages, or ``""`` on failure.
    """
    try:
        with pdfplumber.open(pdf_path) as document:
            page_texts = [
                (page.extract_text() or "") + " "
                for page in document.pages
            ]
        return clean_text("".join(page_texts))
    except Exception as err:
        print(f"Error extracting text from PDF {pdf_path}: {err}")
        return ""

Split text into chunks on word boundaries, each roughly `chunk_size` characters long, so that every chunk stays within the model's context limit

In [29]:
def chunk_text(text, chunk_size=15000):
    """Split text into space-joined chunks of at most ``chunk_size`` characters.

    Words are never split. A single word longer than ``chunk_size`` becomes a
    chunk of its own, which is the only case where a chunk exceeds the limit.

    Args:
        text: The text to split; any whitespace separates words.
        chunk_size: Maximum chunk length in characters, counting the single
            spaces between words.

    Returns:
        List of non-empty chunk strings; ``[]`` when ``text`` has no words.
    """
    chunks = []
    current_chunk = []
    current_length = 0  # always equals len(' '.join(current_chunk))
    for word in text.split():
        if not current_chunk:
            # Never flush an empty chunk: start one, even with an oversized word.
            current_chunk = [word]
            current_length = len(word)
            continue
        projected_length = current_length + 1 + len(word)  # +1 for the space
        if projected_length > chunk_size:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_length = len(word)
        else:
            current_chunk.append(word)
            current_length = projected_length
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks

Generate Q&A pairs using a locally served LLM (`llama3.1:8b` at `http://localhost:11434/api/generate`)

   - template to be used in the final prompt

In [30]:
# Instruction header prepended to every text chunk sent to the model.
# A plain (non-f) string: it has no placeholders, and a literal "{" or "}"
# added to the instructions later must not be treated as a format field.
template = """
You are a Q&A generator. Given the text below, produce concise factual question-answer pairs in the format:

Q: <question>
A: <answer>

Generate informative Q&A pairs that directly reference specific facts or information in the text. 
Make sure answers are brief and directly supported by the text.

Text:
"""

In [31]:
def generate_qa(text_chunk, model="llama3.1:8b",
                api_url="http://localhost:11434/api/generate", timeout=300):
    """Ask the local generation endpoint for Q&A pairs about one text chunk.

    The prompt is the module-level ``template`` followed directly by the
    chunk. The template already ends with a ``Text:`` label, so no second
    label is added.

    Args:
        text_chunk: The source text to generate questions about.
        model: Model name sent in the request payload.
        api_url: URL of the generate endpoint.
        timeout: Seconds to wait for the HTTP response, so a stalled server
            cannot hang the whole run.

    Returns:
        The generated Q&A text with exactly one blank line after each answer
        line that is followed by more content.

    Raises:
        requests.RequestException: On connection failure, timeout or a
            non-2xx response (via ``raise_for_status``).
    """
    prompt = f"{template.rstrip()}\n{text_chunk}"

    payload = {
        "model": model,
        "prompt": prompt,
        "system": "You generate factual Q&A pairs from provided text.",
        "stream": False,
        "options": {
            "temperature": 0.5,
            "num_predict": 1000,
        },
    }

    response = requests.post(api_url, json=payload, timeout=timeout)
    response.raise_for_status()

    qa_text = response.json().get("response", "").strip()
    # `\n+` consumes blank lines the model already emitted, so the result has
    # one blank line between pairs rather than a growing gap.
    return re.sub(r'(\nA:[^\n]+)\n+(?!$)', r'\1\n\n', qa_text)

download pdf files if they exist within a url search

In [32]:
def download_file(url, timeout=30):
    """Fetch ``url`` and store the response body under ``downloaded_files/``.

    The file name is the last segment of the URL path, falling back to
    ``downloaded_file.pdf`` when the path has none.

    Args:
        url: Address of the file to download.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        Path of the saved file, or ``None`` when the status code is not 200
        or any exception occurs (both cases are reported on stdout).
    """
    try:
        reply = requests.get(url, timeout=timeout)
        if reply.status_code != 200:
            print(f"Failed to download {url}: Status code {reply.status_code}")
            return None

        name = os.path.basename(urlparse(url).path)
        if not name:
            name = "downloaded_file.pdf"
        os.makedirs("downloaded_files", exist_ok=True)
        destination = os.path.join("downloaded_files", name)
        with open(destination, "wb") as out:
            out.write(reply.content)
        return destination
    except Exception as err:
        print(f"Error downloading {url}: {err}")
        return None

Convert the domain to a safe filename (replace bad characters) and include part of the URL path to differentiate it from other URLs on the same domain

In [33]:
def safe_domain_name(url):
    """Build a filesystem-safe identifier from a URL's host and path.

    Every character that is not a word character or ``-`` is replaced by
    ``_`` in both the host and the path. The path previously passed through
    with only ``/`` replaced, so characters such as ``:``, ``*``, ``%`` or
    parentheses could reach the output filename.

    Args:
        url: The URL to derive the name from.

    Returns:
        ``<host>`` or ``<host>_<path>``, with the path part limited to 50
        characters to avoid overly long filenames.
    """
    parsed = urlparse(url)
    safe_name = re.sub(r'[^\w\-]', '_', parsed.netloc)
    # "/" is among the replaced characters, so segments end up "_"-joined.
    path = re.sub(r'[^\w\-]', '_', parsed.path.strip('/'))
    if path:
        safe_name += f"_{path[:50]}"
    return safe_name

Perform BFS crawling on a seed URL
- params:
    - seed_url: URL to start crawling from
    - max_depth: max depth for BFS
    - max_pages: max number of pages to crawl

In [34]:
def bfs_crawl(seed_url, max_depth=2, max_pages=20):
    """Crawl breadth-first from ``seed_url`` and collect the text found.

    HTML pages contribute their extracted text and, below ``max_depth``,
    their filtered same-domain links. PDF responses are downloaded and their
    text is added to the same result. Other content types are skipped.
    Errors on one URL are printed and do not stop the crawl.

    Args:
        seed_url: URL to start crawling from (depth 0).
        max_depth: Links are followed only from pages shallower than this.
        max_pages: Maximum number of URLs to attempt; failed fetches count.

    Returns:
        The text of all processed pages joined by newlines, stripped.
    """
    queue = deque([(seed_url, 0)])  # (url, depth)
    # Every URL ever queued. Checking at enqueue time keeps the same link,
    # repeated across many pages, from filling the queue with duplicates.
    seen = {seed_url}
    text_parts = []
    pages_visited = 0
    print(f"Starting BFS on {seed_url} (max depth: {max_depth}, max pages: {max_pages})")
    while queue and pages_visited < max_pages:
        current_url, depth = queue.popleft()
        pages_visited += 1
        print(f"Visiting [{pages_visited}/{max_pages}]: {current_url} (depth {depth})")
        try:
            response = requests.get(current_url, timeout=15)
            if response.status_code != 200:
                print(f"  Failed to fetch {current_url}: Status code {response.status_code}")
                continue
            content_type = response.headers.get("Content-Type", "").lower()
            # process HTML pages
            if "text/html" in content_type:
                text_parts.append(extract_text_from_html(response.text))

                # continue BFS if not at max depth
                if depth < max_depth:
                    soup = BeautifulSoup(response.text, "html.parser")
                    links = [a.get("href") for a in soup.find_all("a", href=True)]
                    for link in filter_links(links, current_url):
                        if link not in seen:
                            seen.add(link)
                            queue.append((link, depth + 1))
            # process PDF files -- included in the same source as the website
            elif "application/pdf" in content_type:
                print(f"  Found PDF: {current_url}")
                # NOTE: download_file() requests the URL again to save it to disk.
                pdf_path = download_file(current_url)
                if pdf_path:
                    text_parts.append(extract_text_from_pdf(pdf_path))
            # other file types - skip or handle as needed
            else:
                print(f"  Skipping unsupported content type: {content_type}")
        except Exception as e:
            print(f"  Error processing {current_url}: {e}")
    print(f"BFS completed for {seed_url} - visited {pages_visited} pages")
    return "\n".join(text_parts).strip()

Save qa results to a text file 

In [35]:
def save_qa_result(identifier, qa_text, output_dir="qa_outputs"):
    """Write generated Q&A text to ``<output_dir>/QA_<identifier>.txt``.

    The directory is created when missing and an existing file of the same
    name is overwritten. The file is written as UTF-8.

    Args:
        identifier: Name fragment that distinguishes this output file.
        qa_text: The Q&A text to store.
        output_dir: Destination directory.
    """
    os.makedirs(output_dir, exist_ok=True)
    destination = os.path.join(output_dir, f"QA_{identifier}.txt")
    with open(destination, "w", encoding="utf-8") as handle:
        handle.write(qa_text)
    print(f"Saved Q&A results to {destination}")

process a single PDF file and generate qa pairs

In [36]:
def process_pdf_and_generate_qa(pdf_path):
    """Generate Q&A pairs for one PDF and save them to a text file.

    The PDF text is extracted, split into chunks, and each chunk is sent to
    ``generate_qa``. Non-empty results are joined with blank lines and saved
    under the PDF's base name (without extension). Nothing is saved when no
    text could be extracted.

    Args:
        pdf_path: Path of the PDF file to process.
    """
    print(f"Processing PDF: {pdf_path}")
    stem, _extension = os.path.splitext(os.path.basename(pdf_path))
    pdf_text = extract_text_from_pdf(pdf_path)
    if not pdf_text:
        print(f"  No text extracted from {pdf_path}")
        return

    pieces = chunk_text(pdf_text)
    total = len(pieces)
    print(f"  Split into {total} chunks")

    answers = []
    for number, piece in enumerate(pieces, start=1):
        print(f"  Generating Q&A for chunk {number}/{total}")
        generated = generate_qa(piece)
        if generated:
            answers.append(generated)

    save_qa_result(stem, "\n\n".join(answers))

Process a single seed URL (crawling from it) and generate Q&A pairs

In [37]:
def process_url_and_generate_qa(url, max_depth=2, max_pages=20):
    """Crawl from a seed URL, generate Q&A pairs and save them to a text file.

    The crawled text is split into chunks and each chunk is sent to
    ``generate_qa``. Non-empty results are joined with blank lines and saved
    under a name derived from the URL. Nothing is saved when the crawl
    yields no text.

    Args:
        url: Seed URL to crawl.
        max_depth: Maximum BFS depth passed to ``bfs_crawl``.
        max_pages: Maximum page count passed to ``bfs_crawl``.
    """
    print(f"\nProcessing URL: {url}")
    output_name = safe_domain_name(url)
    site_text = bfs_crawl(url, max_depth=max_depth, max_pages=max_pages)
    if not site_text:
        print(f"  No text extracted from {url}")
        return

    pieces = chunk_text(site_text)
    total = len(pieces)
    print(f"  Split into {total} chunks")

    answers = []
    for number, piece in enumerate(pieces, start=1):
        print(f"  Generating Q&A for chunk {number}/{total}")
        generated = generate_qa(piece)
        if generated:
            answers.append(generated)

    save_qa_result(output_name, "\n\n".join(answers))

# env setup

In [None]:
# Load settings from a local .env file, if present.
# NOTE(review): no code visible in this notebook reads an environment
# variable (the model endpoint is hard-coded) -- confirm this is still needed.
load_dotenv()

# Generating Q&A for PDF files

In [40]:
# Make sure the output directory exists before any PDF is processed
# (save_qa_result also creates it, so this is only a safeguard).
os.makedirs("qa_outputs", exist_ok=True)

In [41]:
# Directory scanned for input PDF files (relative to the working directory).
pdf_dir = "data_sources/files"

In [None]:
# Generate Q&A for every PDF in pdf_dir.
# isdir() rather than exists(): a regular file at that path would make
# os.listdir() raise. The names are sorted because os.listdir() returns them
# in arbitrary order and runs should be reproducible.
if os.path.isdir(pdf_dir):
    pdf_files = sorted(f for f in os.listdir(pdf_dir) if f.lower().endswith('.pdf'))
    print(f"Found {len(pdf_files)} PDF files in {pdf_dir}")

    for pdf_file in tqdm(pdf_files, desc="Processing PDFs"):
        pdf_path = os.path.join(pdf_dir, pdf_file)
        process_pdf_and_generate_qa(pdf_path)
else:
    print(f"Directory {pdf_dir} does not exist")

# Generating Q&A for URLs

In [42]:
# Make sure the output directory exists before any URL is processed
# (save_qa_result also creates it, so this is only a safeguard).
os.makedirs("qa_outputs", exist_ok=True)

In [43]:
# Text file listing the seed URLs, one per line (blank lines are ignored).
urls_file = "data_sources/urls/urls.txt"

In [None]:
# Generate Q&A for every seed URL listed in urls_file.
# isfile() rather than exists(): a directory at that path would make open()
# raise. The encoding is explicit so the file reads the same on every
# platform.
if os.path.isfile(urls_file):
    with open(urls_file, "r", encoding="utf-8") as f:
        urls = [line.strip() for line in f if line.strip()]
    print(f"Found {len(urls)} seed URLs in {urls_file}")

    for url in urls:
        process_url_and_generate_qa(
            url,
            max_depth=1,  # bfs depth
            max_pages=1,  # total pages per domain
        )
else:
    print(f"URLs file {urls_file} does not exist")