In [1]:
import copy
import hashlib
import json
import logging
import os
import random
import re
import sys
import time
from collections import deque
from datetime import date
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader

ModuleNotFoundError: No module named 'PyPDF2'

In [2]:
# Storage backend selector read by save_page_content() and process_pdf_content():
#   "chroma"  -> documents are written to the ChromaDB collection
#   "locally" -> documents are written as JSON files on disk
# Both functions branch only on these two exact strings.
save_to = "chroma"  # either "chroma" or "locally"

In [3]:
# Configure logging for the whole notebook: INFO and above, timestamped records,
# written to stdout (instead of the default stderr).
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    stream=sys.stdout,
)
# Logger shared by every function defined in this notebook.
log = logging.getLogger("notebook")

In [None]:
import chromadb

# Persistent ChromaDB client rooted at ./chroma_storage (relative to the working directory).
chroma_client = chromadb.PersistentClient(
    path="./chroma_storage",
)

# Collection that save_to_chromadb() writes into; it is fetched if present and created otherwise.
# NOTE(review): "created" is set to today's date on every run of this cell - confirm how
# Chroma treats this metadata when the collection already exists.
collection = chroma_client.get_or_create_collection(
    name="hdm_website",
    metadata={
        "description": "vectorstore containing hdm website content",
        "created": str(date.today()),
    },
)


In [5]:
# Output directory for the JSON files produced from HTML pages (also swept by cleanup_website_data).
webpage_directory = "website_data"
# Output directory for the JSON files holding text extracted from PDFs.
extracted_pdf_directory = "pdf_data"
# Output directory for the downloaded PDF files themselves.
pdf_directory = "pdfs"
# Seed URL intended to be passed to crawl_website().
start_url = "https://www.hdm-stuttgart.de"
# Host suffixes the crawler may follow; extract_links() keeps a link only if its host
# ends with one of these entries. The two explicit subdomains are therefore already
# covered by "hdm-stuttgart.de".
allowed_domains = {
    "hdm-stuttgart.de",
    "hdm-weiterbildung.de",
    "vs-hdm.de",
    "pmt.hdm-stuttgart.de",
    "omm.hdm-stuttgart.de",
}

In [6]:
# URL patterns the crawler must not fetch, evaluated by is_allowed().
# Entries containing "*" are treated as wildcard patterns ("*" = any run of characters);
# every other entry is matched as a plain substring of the full URL.
# NOTE(review): the list looks like it mirrors the site's robots.txt - confirm the source.
DISALLOWED_PATHS = [
    "/studienfuehrer/vorlesungsverzeichnis/",
    "/studienfuehrer/Studiengaenge/",
    "/studienfuehrer/dozentenplaene/",
    "/studienfuehrer/raumbelegung/",
    "*/manage",
    "*/manage_main",
    "*/manage_workspace",
    "/pdm/pdm_deutsch/",
    "/pdm/pdm_englisch/",
    "/pdm/pdm_spanisch/",
    "*/html2pdf",
    "*/htmltopdf",
    "*printview=1",
    "/pmm/studiengang/team/mitarbeiter/lindig/",
    "/hochschule/neubau/webcams/tag*",
    "/ifak/startseite/redaktionzukunft/beitrag.html?beitrag_ID=1817&stars=2",
    "/*beitrag.html?beitrag_ID=1817",
    "*view_fotostrecke*",
    "*hdmnewsmail_simple*",
    "/vwif/",
]


def is_allowed(url: str, disallowed_paths: list[str] = None) -> bool:
    """Check whether a URL may be crawled, based on a list of disallowed patterns.

    A pattern containing ``*`` is a wildcard pattern: ``*`` stands for any run of
    characters and every other character is matched literally. A pattern without
    ``*`` blocks the URL if it occurs anywhere in it as a substring.

    Args:
        url (str): The URL to check.
        disallowed_paths (list[str], optional): Patterns to test against.
            Defaults to the module-level ``DISALLOWED_PATHS``.

    Returns:
        bool: True if allowed, False if disallowed.
    """
    patterns = DISALLOWED_PATHS if disallowed_paths is None else disallowed_paths
    for path in patterns:
        if "*" in path:
            # Escape the literal pieces so that characters such as "?" and "." in a
            # pattern are not interpreted as regex operators; only "*" is a wildcard.
            regex_path = ".*".join(re.escape(part) for part in path.split("*"))
            if re.search(regex_path, url):
                return False
        elif path in url:
            return False
    return True

In [7]:
def extract_domain_part(url: str) -> str:
    """Build a filesystem-safe name from a URL's host, path and query.

    The host contributes everything between an optional leading ``www.`` and the
    trailing top-level domain (``www.hdm-stuttgart.de`` -> ``hdm-stuttgart``).
    Path separators and the query delimiters ``&`` / ``=`` become underscores.

    Args:
        url (str): The URL of the page being parsed.

    Returns:
        str: The URL in a form that is usable as a file name, or ``"default"``
        if nothing usable could be derived.
    """
    try:
        parsed_url = urlparse(url)
        # The suffix must be the LAST label of the host (optionally followed by a
        # port). Without this anchor a subdomain label that merely starts with
        # "de", "com", ... (e.g. "media.design.hdm-stuttgart.de") cut the name
        # short and made different hosts collide on the same file name.
        domain_match = re.fullmatch(
            r"(?:www\.)?(.*?)\.(?:de|com|org|net|pdf)(?::\d*)?", parsed_url.netloc
        )
        base_domain = domain_match.group(1) if domain_match else parsed_url.netloc
        # Path after the host, flattened into a single name component
        path = parsed_url.path.strip("/").replace("/", "_")
        # Query parameters after "?", flattened the same way
        query = parsed_url.query.replace("&", "_").replace("=", "_") if parsed_url.query else ""

        # Combine components
        filename = f"{base_domain}"
        if path:
            filename += f"_{path}"
        if query:
            filename += f"_{query}"

        # Replace characters that are not allowed in file names on common filesystems
        filename = re.sub(r'[<>:"/\\|?*]', "_", filename)
        return filename or "default"  # Fallback if filename is empty
    except Exception as e:
        log.error(f"Error generating filename from URL {url}: {e}")
        return "default"

In [8]:
def extract_relevant_content(soup: BeautifulSoup) -> str:
    """Extract the text of the page's main content section.

    The first matching container from a prioritized list of known layouts is used.
    Navigation, sidebars, scripts and styles inside it are dropped before the text
    is extracted. The passed-in ``soup`` is left untouched so callers can keep
    using it afterwards (e.g. for link extraction).

    Args:
        soup (BeautifulSoup): The BeautifulSoup object of the page content.

    Returns:
        str: The cleaned text, with text fragments separated by " \\n ", or an
        empty string if no known container was found or extraction failed.
    """
    try:
        # List of potential tags to search for in priority order
        potential_main_tags = [
            {"name": "main", "attrs": {"id": "content_wrapper"}},
            {"name": "main", "attrs": {"id": "site-content"}},
            {"name": "div", "attrs": {"id": "main-body"}},
            {"name": "section", "attrs": {"id": "sp-main-body"}},
            {"name": "div", "attrs": {"id": "main"}},
            {"name": "div", "attrs": {"class": "sc-gsTCUz bhdLno"}},
        ]

        # Take the first container that exists on this page
        main_content = None
        for tag in potential_main_tags:
            main_content = soup.find(tag["name"], tag["attrs"])
            if main_content:
                break

        if not main_content:
            log.warning("No relevant content section found.")
            return ""

        # Work on a detached copy: decompose() below destroys elements, and doing
        # that on the caller's tree would also remove every link located inside
        # <nav>/<aside> before the crawler gets to extract links from the page.
        main_content = copy.copy(main_content)

        # Remove unwanted tags together with their content
        for tag in main_content.find_all(["nav", "aside", "script", "style"]):
            tag.decompose()

        return main_content.get_text(separator=" \n ", strip=True)
    except Exception as e:
        log.error(f"Error extracting relevant content: {e}")
        return ""

In [9]:
def save_to_chromadb(url, title, content, doc_type):
    """Store a document and its metadata in the ChromaDB collection.

    The document ID is derived from the document type and the SHA-1 of the URL, so
    the same URL always maps to the same ID. The document is upserted: crawling a
    URL again replaces the stored content instead of leaving the old version in place.

    Errors are logged and not re-raised.

    Args:
        url (str): The URL of the document.
        title (str): The title of the document.
        content (str): The content of the document.
        doc_type (str): The type of document (e.g., 'webpage', 'pdf').
    """
    try:
        metadata = {
            "title": title,
            "accessed": str(date.today()),
            "type": doc_type,
            "url": url,
        }

        # Stable across runs and processes (unlike the built-in hash()).
        doc_id = f"{doc_type}-{hashlib.sha1(url.encode()).hexdigest()}"

        # upsert rather than add: the ID is deterministic, so a re-crawl must
        # refresh the existing entry.
        collection.upsert(
            documents=[content],
            metadatas=[metadata],
            ids=[doc_id],
        )

        log.info(f"Document added to ChromaDB: {doc_id}")

    except Exception as e:
        log.error(f"Failed to save to ChromaDB for URL {url}: {e}")

In [10]:
def save_content_to_file(url: str, title: str, content: str, file_path: str) -> None:
    """Write one crawled document to disk as a UTF-8 JSON file.

    The file holds the keys ``url``, ``title``, ``accessed`` (today's date) and
    ``content``. Failures are logged and not re-raised.

    Args:
        url (str): The URL of the document.
        title (str): The title of the document.
        content (str): The content of the document.
        file_path (str): The path to save the JSON file.
    """
    try:
        record = dict(
            url=url,
            title=title,
            accessed=str(date.today()),
            content=content,
        )
        with open(file_path, "w", encoding="utf-8") as out:
            # Keep non-ASCII characters readable instead of escaping them
            json.dump(record, out, indent=4, ensure_ascii=False)
        log.info(f"Saved content to {file_path}")
    except Exception as e:
        log.error(f"Failed to save content to {file_path}: {e}")

In [11]:
def save_page_content(soup: BeautifulSoup, url: str) -> None:
    """Extract a page's main text and persist it to the configured backend.

    The text is lower-cased before storing. Pages without extractable content are
    skipped. Depending on ``save_to`` the result goes to ChromaDB or into a JSON
    file inside ``webpage_directory``. Failures are logged and not re-raised.

    Args:
        soup (BeautifulSoup): The BeautifulSoup object of the page content.
        url (str): The URL of the page being parsed.
    """
    try:
        log.info(f"Saving content from {url}")
        page_text = extract_relevant_content(soup).lower()
        if page_text == "":
            log.warning("Not saving into file")
            return

        if soup.title:
            page_title = soup.title.get_text(strip=True)
        else:
            page_title = "No Title"

        json_path = os.path.join(webpage_directory, extract_domain_part(url) + ".json")

        if save_to == "locally":
            save_content_to_file(url=url, title=page_title, content=page_text, file_path=json_path)
        elif save_to == "chroma":
            save_to_chromadb(url, page_title, page_text, "webpage")

    except Exception as e:
        log.error(f"Failed to save content from {url}: {e}")

In [12]:
def process_pdf_content(response, url: str) -> None:
    """Store a downloaded PDF and persist its extracted text.

    The raw PDF is written to ``pdf_directory``. Its text is extracted page by
    page, lower-cased and saved to the backend selected by ``save_to``. PDFs
    without any extractable text are kept on disk, but no content entry is stored
    for them. Failures are logged and not re-raised.

    Args:
        response: The HTTP response containing the PDF content.
        url (str): The URL of the PDF.
    """
    try:
        # Derive the local file names from the URL
        sanitized_url = extract_domain_part(url)
        if not sanitized_url.endswith(".pdf"):
            sanitized_url += ".pdf"
        pdf_filename = os.path.join(pdf_directory, sanitized_url)
        # Swap only the trailing extension; ".pdf" may also occur inside the name
        json_filename = os.path.join(
            extracted_pdf_directory, sanitized_url[: -len(".pdf")] + ".json"
        )

        with open(pdf_filename, "wb") as file:
            file.write(response.content)

        # Extract text from the PDF; pages without a text layer may yield None
        reader = PdfReader(pdf_filename)
        pdf_text = "".join(page.extract_text() or "" for page in reader.pages)
        pdf_text = pdf_text.replace("\n", " \n ").lower()

        if not pdf_text.strip():
            log.warning(f"No text could be extracted from PDF {url}, not saving content.")
            return

        # Extract title from metadata, fallback to filename
        metadata_title = reader.metadata.get("/Title", None) if reader.metadata else None
        if not isinstance(metadata_title, str):
            metadata_title = ""
        parsed_url = urlparse(url)
        filename_title = os.path.basename(parsed_url.path).replace(".pdf", "")
        title = metadata_title or filename_title or "Untitled PDF"

        # Persist the extracted text (previously an undefined name was passed here,
        # so every PDF ended in the except branch and nothing was stored).
        if save_to == "chroma":
            save_to_chromadb(url, title, pdf_text, "pdf")
        elif save_to == "locally":
            save_content_to_file(url=url, title=title, content=pdf_text, file_path=json_filename)

        log.info(f"Saved PDF and extracted content: {pdf_filename}, {json_filename}")
    except Exception as e:
        log.error(f"Failed to process PDF {url}: {e}")

In [13]:
def extract_links(soup: BeautifulSoup, url: str, visited: set[str], to_visit: set[str]) -> set[str]:
    """Collect the crawlable links of a page that are not known yet.

    Each ``<a href>`` is resolved against ``url`` and stripped of its fragment.
    A link is kept only if it is an http(s) URL, contains "hdm", does not point to
    an image or zip file, and its host is one of ``allowed_domains`` or a subdomain
    of one. Malformed hrefs are skipped individually.

    Args:
        soup (BeautifulSoup): The BeautifulSoup object of the page content.
        url (str): The URL of the page being parsed; base for relative links.
        visited (set[str]): URLs that were already processed.
        to_visit (set[str]): URLs that are already queued.

    Returns:
        set[str]: Links from the page that are neither visited nor queued.
    """
    filtered_links = set()
    for a in soup.find_all("a", href=True):
        href = a["href"].strip()
        try:
            # A fragment only addresses a position inside a page, so drop it and
            # keep the page itself instead of discarding the link altogether.
            full_url = urljoin(url, href).partition("#")[0]
            hostname = urlparse(full_url).hostname  # lower-cased, without port
        except ValueError as e:
            # e.g. broken IPv6 literals; one bad href must not cost the whole page
            log.warning(f"Skipping malformed link {href!r} on {url}: {e}")
            continue

        if (
            not full_url.startswith(("http://", "https://"))  # also excludes mailto:
            or "hdm" not in full_url
            or full_url.lower().endswith((".jpg", ".png", ".gif", ".zip"))
        ):
            continue

        # Match whole host labels: a bare endswith() would also accept look-alike
        # hosts such as "evil-hdm-stuttgart.de".
        if not hostname or not any(
            hostname == allowed or hostname.endswith("." + allowed)
            for allowed in allowed_domains
        ):
            continue

        if full_url not in visited and full_url not in to_visit:
            filtered_links.add(full_url)

    return filtered_links

In [14]:
def save_to_visit(to_visit, filename: str = "to_visit.json") -> None:
    """Save the pending URLs to a JSON file as a list.

    A deque (or any other ordered iterable) keeps its order; a set is written in
    sorted order so the output is deterministic. Errors are printed and not re-raised.

    Args:
        to_visit (deque or set): The collection of URLs to save.
        filename (str): The file where the URLs will be stored.
    """
    try:
        # JSON has no set type, so everything is converted to a list up front,
        # before the target file is opened and truncated.
        if isinstance(to_visit, (set, frozenset)):
            urls = sorted(to_visit)
        else:
            urls = list(to_visit)

        with open(filename, "w", encoding="utf-8") as file:
            json.dump(urls, file, indent=4, ensure_ascii=False)

        print(f"Saved {len(urls)} URLs to {filename}.")
    except Exception as e:
        print(f"Error saving to_visit to {filename}: {e}")

In [15]:
def crawl_website(start_url: str, page_depth: int = None) -> None:
    """Crawl breadth-first from a start URL, storing every HTML page and PDF found.

    Each URL is fetched at most once: it is marked as visited as soon as it is
    taken from the queue, whether it turns out to be HTML, a PDF, an unsupported
    content type, disallowed, or a failed request. After every fetch the crawler
    sleeps for a random 0.5-2 s. When the crawl ends, the URLs still queued are
    written out via ``save_to_visit``.

    Args:
        start_url (str): The URL to begin crawling from.
        page_depth (int, optional): Maximum number of URLs to process (a page
            budget, not a link depth). ``None`` means no limit.
    """
    visited = set()
    to_visit = deque([start_url])
    to_visit_set = {start_url}  # mirrors `to_visit` for fast membership checks
    processed = 0

    for directory in (webpage_directory, extracted_pdf_directory, pdf_directory):
        if not os.path.isdir(directory):
            log.info(f"Directory {directory} doesn't exist, is being created.")
            os.makedirs(directory, exist_ok=True)

    # The context manager releases the session's pooled connections at the end.
    with requests.Session() as session:
        while to_visit:
            iteration_start_time = time.time()  # Start timing the iteration
            current_url = to_visit.popleft()
            to_visit_set.discard(current_url)

            if current_url in visited:
                continue

            if not is_allowed(current_url):
                log.warning(f"Skipping disallowed URL: {current_url}")
                visited.add(current_url)  # keeps it from being queued again
                continue

            log.info(f"Links visited: {len(visited)}, Links left to visit: {len(to_visit)}")
            log.info(f"Visiting: {current_url}")
            # Mark before fetching so that PDFs, unsupported types and failing URLs
            # are not re-queued by every later page that links to them.
            visited.add(current_url)

            try:
                # Request the website
                start_request = time.time()
                response = session.get(current_url, timeout=10)
                end_request = time.time()
                log.info(f"Fetching {current_url} took {end_request - start_request:.4f} seconds")

                response.raise_for_status()

                # Dispatch on the content type reported by the server
                content_type = response.headers.get("Content-Type", "").lower()
                log.info(f"Content type: {content_type}")
                if "text/html" in content_type:
                    soup = BeautifulSoup(response.content, "html.parser")

                    # Get the content and save it
                    save_page_content(soup, current_url)

                    # After a redirect the final URL is the correct base for
                    # relative links, and it must not be fetched a second time.
                    visited.add(response.url)
                    new_links = extract_links(
                        soup=soup, url=response.url, visited=visited, to_visit=to_visit_set
                    )
                    for link in new_links:
                        if link not in visited and link not in to_visit_set:
                            to_visit.append(link)
                            to_visit_set.add(link)

                elif "application/pdf" in content_type:
                    log.info(f"Detected PDF: {current_url}")
                    process_pdf_content(response, current_url)

                else:
                    log.error(
                        f"Current URL {current_url} is of unsupported content type {content_type}."
                    )

            except Exception as e:
                log.error(f"Failed to process URL {current_url}: {e}")

            # Every fetched URL counts against the budget and is followed by the
            # politeness delay, regardless of its content type.
            processed += 1
            if page_depth is not None and processed >= page_depth:
                log.info("Reached maximum page depth, stopping.")
                break

            elapsed_time = time.time() - iteration_start_time
            log.info(f"Iteration {processed} completed in {elapsed_time:.4f} seconds")

            delay = random.uniform(0.5, 2)
            log.info(f"Sleeping for {delay:.2f} seconds")
            time.sleep(delay)

    save_to_visit(to_visit)
    log.info("Crawling complete")

In [None]:
# crawl_website(start_url)
# crawl_website(start_url, page_depth=50)

In [17]:
# test_url = "mailto:friese@hdm-stuttgart.de"
# test_url = "https://curdt.home.hdm-stuttgart.de/PDF/Vogler.pdf"
# test_url = "https://www.hdm-stuttgart.de/hochschule/organisation/rektorat"
# crawl_website(test_url, 1)

In [18]:
def cleanup_website_data():
    """Delete the JSON files in ``webpage_directory`` whose "content" field is empty.

    A file counts as empty when "content" is missing, null, or a string that is
    blank after stripping whitespace. Files that cannot be read or parsed, or that
    do not contain a JSON object, are reported and left in place; they do not
    stop the sweep.
    """
    try:
        if not os.path.exists(webpage_directory):
            log.warning(f"Directory {webpage_directory} does not exist.")
            return

        for filename in os.listdir(webpage_directory):
            # Only process JSON files
            if not filename.endswith(".json"):
                continue

            file_path = os.path.join(webpage_directory, filename)
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    data = json.load(file)

                if not isinstance(data, dict):
                    log.error(f"Error processing file {file_path}: expected a JSON object")
                    continue

                content = data.get("content", "")
                is_empty = content is None or (isinstance(content, str) and not content.strip())
                if is_empty:
                    log.info(f"Deleting {file_path} due to empty content.")
                    os.remove(file_path)
            except (ValueError, OSError) as e:
                # ValueError covers invalid JSON and undecodable bytes; OSError covers
                # missing files and permission problems.
                log.error(f"Error processing file {file_path}: {e}")

    except Exception as e:
        log.error(f"Failed to clean up website data: {e}")