# UChicago MS in Applied Data Science Website - Content Processor

This notebook handles Phase 2 of the scraping process: processing the previously discovered links and converting their content to Markdown format.

In [None]:
# Import required libraries
import os
import re
import json
import requests
from bs4 import BeautifulSoup, NavigableString
from markdownify import markdownify as md
from urllib.parse import urlparse, urljoin
import time
import logging
import pandas as pd
from datetime import datetime
from pathlib import Path # For more robust path handling
import unicodedata # For slugify

In [None]:
# Trafilatura is an optional dependency. Record whether it could be imported so
# the extraction code can fall back to BeautifulSoup when it is missing.
try:
    import trafilatura
    TRAFILATURA_AVAILABLE = True
except ImportError:
    TRAFILATURA_AVAILABLE = False
    # Printed rather than logged: logging is configured in a later cell.
    print("Trafilatura library not found. Falling back to BeautifulSoup for content extraction. "
          "For potentially better results, consider installing it (`pip install trafilatura`).")

In [None]:
# --- Configuration -----------------------------------------------------------
# NOTE: DATA_DIR and LOGS_DIR are relative paths, so they resolve against the
# current working directory of the process running this notebook.

# Root URL of the site being processed
BASE_URL = "https://datascience.uchicago.edu/"
# Directory where data files (JSON, CSV) are stored/will be stored
DATA_DIR = "../data"
# Directory where Markdown files will be saved (one sub-directory per category)
MARKDOWN_DIR = os.path.join(DATA_DIR, "markdown_processed")
# Directory for log files
LOGS_DIR = "../logs"
# JSON file containing discovered links (output from link_discovery.ipynb)
LINKS_FILE = os.path.join(DATA_DIR, "discovered_links.json")
# JSON file to store processing status (successes and failures)
PROCESSED_FILE = os.path.join(DATA_DIR, "processed_links_status.json")
# Delay between requests (in seconds)
REQUEST_DELAY = 0.15
# Timeout for requests (in seconds)
REQUEST_TIMEOUT = 20 # Increased timeout for content fetching
# User-Agent header sent with every request made through the shared session
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 GenAICodeReviewBot/1.0 ContentProcessor/1.0'


In [None]:
# Make sure the output locations exist before any file or log handler opens
# something inside them; missing parents are created, existing dirs are kept.
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(LOGS_DIR, exist_ok=True)

In [None]:
# One log file per run, named with the start timestamp.
log_file = Path(LOGS_DIR) / f"content_processor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# NOTE(review): logging.basicConfig() is documented to do nothing when the root
# logger already has handlers, so re-running this cell in the same kernel would
# presumably keep writing to the first log file - confirm whether that is intended.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, encoding='utf-8'),
        logging.StreamHandler()  # Also log to console
    ]
)
# Logger shared by every function defined below.
logger = logging.getLogger(__name__)

In [None]:
# A single shared HTTP session: connections are reused across requests and
# every request carries the configured User-Agent header.
session = requests.Session()
session.headers['User-Agent'] = USER_AGENT

In [None]:
def slugify(value, allow_unicode=False):
    """
    Turn an arbitrary value into a URL/filename-friendly slug.

    The value is stringified and Unicode-normalized (non-ASCII characters are
    dropped unless `allow_unicode` is True), lowercased, stripped of anything
    that is not a word character, whitespace or hyphen, and finally runs of
    whitespace/hyphens are collapsed into a single hyphen. Leading and
    trailing hyphens/underscores are removed.
    Adapted from Django's slugify function.
    """
    text = str(value)
    if not allow_unicode:
        # Decompose accented characters, then discard whatever is not ASCII.
        decomposed = unicodedata.normalize('NFKD', text)
        text = decomposed.encode('ascii', 'ignore').decode('ascii')
    else:
        text = unicodedata.normalize('NFKC', text)

    without_punctuation = re.sub(r'[^\w\s-]', '', text.lower())
    hyphenated = re.sub(r'[-\s]+', '-', without_punctuation)
    return hyphenated.strip('-_')

In [None]:
def load_discovered_links(filepath):
    """
    Read the list of discovered links from a JSON file.

    The file is expected to hold a JSON object; its 'links' entry is returned
    (an empty list when that key is absent) and its 'discovery_date' entry is
    only used for logging. Any problem (missing file, invalid JSON, unexpected
    error) is logged and results in an empty list.
    """
    if not os.path.exists(filepath):
        logger.error(f"Links file not found: {filepath}")
        return []

    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)

        found_links = payload.get('links', [])
        found_on = payload.get('discovery_date', 'Unknown')
        logger.info(f"Loaded {len(found_links)} links, discovered on {found_on}, from {filepath}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {filepath}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error loading links from {filepath}: {e}", exc_info=True)
        return []
    else:
        return found_links

In [None]:
def get_page_html(url):
    """
    Fetch the HTML content of a page through the shared session.

    Args:
        url: Address to fetch. `mailto:` and `tel:` URLs are skipped.

    Returns:
        The decoded HTML as a string, or None when the URL is skipped, the
        response is not `text/html`, or any request error occurs. Errors are
        logged, never raised.
    """
    if url.startswith(('mailto:', 'tel:')):
        logger.info(f"Skipping non-HTTP(S) URL: {url}")
        return None

    try:
        response = session.get(url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise HTTPError for bad responses (4XX or 5XX)

        content_type = response.headers.get('Content-Type', '').lower()
        if 'text/html' not in content_type:
            logger.warning(f"Content-Type for {url} is not text/html ({content_type}). Skipping HTML processing.")
            return None

        # Try the detected encoding first, then fall back to utf-8.
        # - TypeError: detection returned None.
        # - LookupError: detection returned a name Python has no codec for.
        #   Without catching it the page would be dropped by the generic
        #   handler below instead of using the utf-8 fallback.
        try:
            html_content = response.content.decode(response.apparent_encoding)
        except (UnicodeDecodeError, LookupError, TypeError):
            logger.warning(f"Failed to decode with apparent_encoding for {url}, falling back to utf-8.")
            html_content = response.content.decode('utf-8', errors='replace')

        return html_content
    except requests.exceptions.Timeout:
        logger.error(f"Timeout while fetching {url}")
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error fetching {url}: {e.response.status_code} {e.response.reason}")
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Connection error fetching {url}: {e}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Generic request error fetching {url}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error fetching HTML for {url}: {e}", exc_info=True)
    return None

In [None]:
def get_title_from_soup(soup):
    """
    Pick the best available page title from a BeautifulSoup document.

    Sources are tried in order: the <title> tag, the
    <meta property="og:title"> content, then the first <h1>. When none of
    them yields text, the placeholder "No Title Found" is returned.
    """
    title_tag = soup.title
    if title_tag and title_tag.string:
        return title_tag.string.strip()

    og_meta = soup.find('meta', property='og:title')
    og_value = og_meta.get('content') if og_meta else None
    if og_value:
        return og_value.strip()

    heading = soup.find('h1')
    heading_text = heading.get_text(strip=True) if heading else ''
    return heading_text or "No Title Found"

In [None]:
def extract_main_content_with_trafilatura(html_content, url):
    """
    Run Trafilatura over a page and return its main content.

    Comments are excluded, tables are included and recall is favoured over
    precision. Returns the extracted text, or None when Trafilatura is not
    installed, finds nothing, or raises.
    """
    if not TRAFILATURA_AVAILABLE:
        return None

    try:
        result = trafilatura.extract(
            html_content,
            url=url,
            include_comments=False,
            include_tables=True,
            favor_recall=True,
        )
        if not result:
            logger.warning(f"Trafilatura returned no content for {url}")
            return None
        logger.info(f"Successfully extracted content using Trafilatura for {url}")
        return result
    except Exception as e:
        logger.error(f"Error using Trafilatura for {url}: {e}", exc_info=True)
        return None

In [None]:
def extract_main_content_with_bs(soup, url):
    """
    Heuristically locate the main content of a parsed page.

    A fixed list of CSS selectors is tried in order and the first matching
    element is returned as an HTML string. When nothing matches, the whole
    <body> is returned, or the whole document if there is no <body>.
    Used when Trafilatura is unavailable or yields nothing.
    """
    logger.info(f"Attempting main content extraction with BeautifulSoup for {url}")

    # Ordered from most to least preferred container.
    candidate_selectors = (
        'main',
        'article',
        'div.content',
        'div.entry-content',
        'div#content',
        'div.main-content',
        'div.td-post-content',
    )
    for css in candidate_selectors:
        match = soup.select_one(css)
        if match:
            logger.info(f"Found main content using selector '{css}' for {url}")
            return str(match)

    logger.warning(f"No specific main content element found for {url} using selectors. Falling back to soup.body.")
    body = soup.body
    if body:
        return str(body)
    return str(soup)

In [None]:
def extract_category_from_url(url):
    """
    Derive a coarse category name from a URL.

    The category is the first non-empty segment of the URL path. The site
    root (and bare index.html / index.php) maps to 'homepage'; a path with no
    usable segment maps to 'uncategorized'.
    """
    trimmed_path = urlparse(url).path.strip('/')

    # Root URL and its index-file aliases all count as the homepage.
    if trimmed_path in ("", "index.html", "index.php"):
        return 'homepage'

    # Repeated slashes (e.g. /news//article) produce empty segments; skip them.
    segments = [segment for segment in trimmed_path.split('/') if segment]
    return segments[0] if segments else 'uncategorized'

In [None]:
def enhance_soup_for_markdown(soup, base_url):
    """
    Prepare a BeautifulSoup tree (in place) for Markdown conversion.

    For every <img>:
    - a description label is built from the alt text, or, when that is empty,
      from the file name in `src`;
    - `src` is rewritten to an absolute URL relative to `base_url`;
    - when the label is meaningful, a text node of the form
      "[Image: ...] (Source: <url>)" is inserted right after the image so the
      description survives in plain-text output.

    Tables are left untouched; their conversion is delegated to markdownify.
    Returns the same soup object that was passed in.
    """
    for image in soup.find_all('img'):
        raw_src = image.get('src', '')
        label = image.get('alt', '').strip()

        if label:
            label = f"Image: {label}"
        elif raw_src:
            # No alt text: derive a readable label from the file name.
            stem = os.path.basename(urlparse(raw_src).path)
            stem = stem.replace('-', ' ').replace('_', ' ').rsplit('.', 1)[0]
            label = f"Image: {stem.capitalize()}" if stem else "Image"

        if raw_src:
            image['src'] = urljoin(base_url, raw_src)  # Ensure absolute URL

        # A missing or generic label adds no information, so nothing is inserted.
        if not label or label == "Image":
            continue

        description = f"[{label}]"
        if image.get('src'):
            description += f" (Source: {image['src']})"

        # A bare text node keeps the surrounding tag structure unchanged.
        image.insert_after(NavigableString(f"\n{description}\n"))

    return soup


In [None]:
def html_to_markdown_content(html_input, url, is_full_html_page=True):
    """
    Convert extracted content to cleaned-up Markdown.

    Args:
        html_input: HTML string (from the BeautifulSoup extractor) or already
            extracted text (from Trafilatura).
        url: Page URL; used to absolutize image sources and in log messages.
        is_full_html_page: When True, `html_input` is parsed as HTML, scripts
            and styles are removed, and images are enhanced before conversion.
            When False, `html_input` is handed to markdownify unchanged.

    Returns:
        The Markdown text, or a string starting with
        "Error during Markdown conversion:" if markdownify raises.
    """
    if is_full_html_page:
        soup = BeautifulSoup(html_input, 'html.parser')
        # markdownify's `strip` option only unwraps a tag and keeps its inner
        # text, so script/style source would leak into the Markdown. Remove
        # those elements entirely before converting.
        for unwanted in soup.find_all(['script', 'style']):
            unwanted.decompose()
        # Enhance soup (e.g., image descriptions, absolute URLs) before markdownify
        soup = enhance_soup_for_markdown(soup, url)
        html_to_convert = str(soup)
    else:
        # NOTE(review): Trafilatura output is presumably plain text; markdownify
        # will still parse it as HTML - confirm this round-trip is wanted.
        html_to_convert = html_input

    def _code_language(el):
        """Use the element's first CSS class (minus 'language-') as the code fence language."""
        classes = el.get('class')
        return classes[0].replace("language-", "") if classes else None

    options = {
        'strip': ['script', 'style'],  # Never emit these as Markdown constructs
        'heading_style': 'atx',        # Use '#' for headings
        'bullets': '-*+',              # Cycle through bullet styles for nested lists
        # markdownify expects the literal symbol here ('*' or '_'); the word
        # 'asterisk' would be emitted verbatim around bold/italic text.
        'strong_em_symbol': '*',
        'code_language_callback': _code_language,
    }

    try:
        markdown_text = md(html_to_convert, **options)
    except Exception as e:
        logger.error(f"Markdownify failed for {url}: {e}", exc_info=True)
        return f"Error during Markdown conversion: {e}" # Return error message in content

    # Basic Markdown cleanup
    markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text)  # Remove excess newlines
    # Images must be handled before empty links: '![](url)' contains '[](url)',
    # so the empty-link rule would otherwise turn it into '!url' first.
    markdown_text = re.sub(r'!\[\s*\]\(([^)]+)\)', r'[Image: \1](\1)', markdown_text)  # Ensure alt text for images if missing
    markdown_text = re.sub(r'\[\s*\]\(([^)]+)\)', r'\1', markdown_text)  # Remove empty links, keep URL if text was empty
    markdown_text = markdown_text.strip()

    return markdown_text

In [None]:
def save_markdown_file(title, markdown_content, url, category, base_dir):
    """
    Save markdown content to a file with YAML frontmatter.

    The file is written to `<base_dir>/<category>/<slug>.md`, where the slug is
    the slugified URL path ('index' for the site root or an empty slug).

    Args:
        title: Page title, written to the frontmatter as a double-quoted scalar.
        markdown_content: Markdown body placed after the frontmatter.
        url: Original page URL; determines the filename.
        category: Sub-directory name (also recorded in the frontmatter).
        base_dir: Root directory for all Markdown output.

    Returns:
        The path of the written file as a string, or None if the directory
        could not be created or the file could not be written.
    """
    ignored_names = ("index.html", "index.php")
    path_parts = [
        part
        for part in urlparse(url).path.strip('/').split('/')
        if part and part not in ignored_names
    ]

    # The full path is slugified (not just the last segment) to keep names unique.
    # slugify() can return '' (e.g. a path made only of punctuation), and the
    # site root has no parts at all; both fall back to 'index'.
    filename_base = slugify('-'.join(path_parts)) if path_parts else ''
    if not filename_base:
        filename_base = 'index'

    # The category is derived from the URL, so never let it act as a path
    # component that climbs out of, or nests below, the output directory.
    if category in ('.', '..') or '/' in category or '\\' in category:
        logger.warning(f"Unsafe category '{category}' for {url}; using 'uncategorized' instead.")
        category = 'uncategorized'

    filepath = Path(base_dir) / category / f"{filename_base}.md"

    # Build the escaped title outside the f-string: a backslash inside an
    # f-string replacement field is a SyntaxError before Python 3.12.
    # Whitespace is collapsed so a multi-line <title> stays on one YAML line,
    # and backslashes are escaped before quotes so the scalar stays valid.
    safe_title = ' '.join(str(title).split())
    safe_title = safe_title.replace('\\', '\\\\').replace('"', '\\"')
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    frontmatter = (
        "---\n"
        f"title: \"{safe_title}\"\n"
        f"original_url: {url}\n"
        f"category: {category}\n"
        f"processing_date: {timestamp}\n"
        "---\n\n"
    )

    full_content = frontmatter + markdown_content

    try:
        # Create the category directory if it doesn't exist.
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(full_content)
        logger.info(f"Successfully saved Markdown to: {filepath}")
        return str(filepath)
    except IOError as e:
        logger.error(f"Failed to save Markdown for {url} to {filepath}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving Markdown for {url} to {filepath}: {e}", exc_info=True)
    return None

In [None]:
def save_processing_summary(processed_details, failed_urls, summary_filepath):
    """
    Write a JSON report of the run to `summary_filepath`.

    The report contains a timestamp, success/failure counts, the list of
    successfully processed pages (dicts with url, filepath, title, category)
    and the list of failures (dicts with url, error). An I/O error is logged,
    not raised. The target path is returned in every case.
    """
    report = {}
    report['summary_date'] = datetime.now().isoformat()
    report['total_processed_successfully'] = len(processed_details)
    report['total_failed'] = len(failed_urls)
    report['processed_details'] = processed_details
    report['failed_urls'] = failed_urls

    try:
        with open(summary_filepath, 'w', encoding='utf-8') as out:
            json.dump(report, out, indent=2, ensure_ascii=False)
    except IOError as e:
        logger.error(f"Failed to save processing summary to {summary_filepath}: {e}")
    else:
        logger.info(f"Successfully saved processing summary to {summary_filepath}")
    return summary_filepath

In [None]:
def process_all_links():
    """
    Main function to process all discovered links.

    Loads the links from LINKS_FILE, fetches each page, extracts its main
    content (Trafilatura first, BeautifulSoup as fallback), converts it to
    Markdown and saves it under MARKDOWN_DIR. A summary is written to
    PROCESSED_FILE every 20 links and once more at the end.

    Returns:
        A `(processed_details, failed_urls)` tuple of lists. Both lists are
        empty when no links could be loaded, so callers can always unpack
        the result.
    """
    links_to_process = load_discovered_links(LINKS_FILE)
    if not links_to_process:
        logger.error("No links loaded to process. Exiting.")
        # Keep the return shape consistent: the caller unpacks two values.
        return [], []

    processed_details_list = []
    failed_urls_list = []
    total_links = len(links_to_process)

    logger.info(f"--- Starting Content Processing for {total_links} links ---")

    for i, url in enumerate(links_to_process):
        logger.info(f"Processing {i+1}/{total_links}: {url}")

        # Skip SVG files directly based on URL extension, as they won't be HTML.
        # No request is made, so no delay is needed and nothing is recorded as failed.
        if url.lower().endswith('.svg'):
            logger.info(f"Skipping SVG file based on URL extension: {url}")
            continue

        html_content = get_page_html(url)
        if not html_content:
            logger.warning(f"Failed to get HTML content for {url}. Adding to failed list.")
            failed_urls_list.append({"url": url, "error": "Failed to fetch HTML content"})
            time.sleep(REQUEST_DELAY) # Still delay even on failure
            continue

        page_title = "Untitled" # Default title
        markdown_output = ""
        category = "uncategorized"

        try:
            # Attempt extraction with Trafilatura first if available
            main_content_text = None
            if TRAFILATURA_AVAILABLE:
                main_content_text = extract_main_content_with_trafilatura(html_content, url)

            if main_content_text:
                # Trafilatura only returns the content, so the title still has
                # to come from the original HTML.
                temp_soup_for_title = BeautifulSoup(html_content, 'html.parser')
                page_title = get_title_from_soup(temp_soup_for_title)
                markdown_output = html_to_markdown_content(main_content_text, url, is_full_html_page=False)
            else:
                # Fallback to BeautifulSoup for content extraction and title
                logger.info(f"Trafilatura failed or unavailable for {url}. Using BeautifulSoup fallback.")
                soup = BeautifulSoup(html_content, 'html.parser')
                page_title = get_title_from_soup(soup)
                main_content_html_bs = extract_main_content_with_bs(soup, url)
                if main_content_html_bs:
                    markdown_output = html_to_markdown_content(main_content_html_bs, url, is_full_html_page=True)
                else:
                    logger.error(f"BeautifulSoup fallback also failed to extract main content for {url}")
                    markdown_output = "Error: Could not extract main content."

            category = extract_category_from_url(url)

            # Save the markdown content
            saved_filepath = save_markdown_file(page_title, markdown_output, url, category, MARKDOWN_DIR)

            if saved_filepath:
                processed_details_list.append({
                    "url": url,
                    "filepath": saved_filepath,
                    "title": page_title,
                    "category": category
                })
            else:
                failed_urls_list.append({"url": url, "error": "Failed to save Markdown file"})

        except Exception as e:
            # One bad page must not abort the whole run; record it and move on.
            logger.error(f"Critical error processing {url}: {e}", exc_info=True)
            failed_urls_list.append({"url": url, "error": str(e)})

        # Save progress periodically (e.g., every 20 pages)
        if (i + 1) % 20 == 0:
            logger.info(f"Processed {i+1} links. Saving intermediate summary...")
            save_processing_summary(processed_details_list, failed_urls_list, PROCESSED_FILE)

        time.sleep(REQUEST_DELAY)

    # Save final processing summary
    logger.info("Content processing loop finished. Saving final summary...")
    save_processing_summary(processed_details_list, failed_urls_list, PROCESSED_FILE)

    logger.info("--- Content Processing Completed ---")
    logger.info(f"Successfully processed: {len(processed_details_list)} pages")
    logger.info(f"Failed to process: {len(failed_urls_list)} pages")

    return processed_details_list, failed_urls_list

In [None]:
def generate_category_summary(processed_details, output_dir):
    """
    Count processed pages per category and write the counts to a CSV.

    The table (columns 'Category' and 'Count', most frequent first) is saved
    as 'processed_category_summary.csv' inside `output_dir` and also printed.
    Nothing is written when `processed_details` is empty; a failure while
    saving is logged rather than raised.
    """
    if not processed_details:
        logger.info("No processed details to generate category summary.")
        return

    # Tally pages per category, preserving first-seen order of the categories.
    tally = {}
    for entry in processed_details:
        name = entry.get('category', 'uncategorized')
        if name in tally:
            tally[name] += 1
        else:
            tally[name] = 1

    category_df = pd.DataFrame({
        'Category': list(tally.keys()),
        'Count': list(tally.values()),
    })
    category_df = category_df.sort_values('Count', ascending=False)
    category_df = category_df.reset_index(drop=True)

    try:
        summary_path = Path(output_dir) / 'processed_category_summary.csv'
        category_df.to_csv(summary_path, index=False, encoding='utf-8')
        logger.info(f"Category summary saved to {summary_path}")
        print("\nCategory Summary:")
        print(category_df.to_string())
    except Exception as e:
        logger.error(f"Failed to save category summary: {e}", exc_info=True)

In [None]:
def generate_word_count_summary(processed_details, output_dir):
    """
    Generate a CSV summary with word counts for each processed Markdown file.

    Each entry of `processed_details` is expected to carry 'filepath', 'title',
    'url' and 'category'. The YAML frontmatter is removed before counting.
    Files that cannot be read get a word count of -1; entries whose file is
    missing are skipped with a warning.

    The table is saved as 'processed_word_count_summary.csv' in `output_dir`
    (largest word count first) and the top 10 rows are printed. Nothing is
    written when there are no entries or none of the files exist.
    """
    if not processed_details:
        logger.info("No processed details to generate word count summary.")
        return

    # Frontmatter runs from the leading '---' line to the next line that
    # consists solely of '---'. Anchoring the closing marker to its own line
    # avoids stopping early at a '---' inside a value such as the title.
    frontmatter_pattern = re.compile(r'\A---[ \t]*\n.*?\n---[ \t]*(?:\n|\Z)', re.DOTALL)
    word_pattern = re.compile(r'\b\w+\b')

    word_counts_data = []
    for item in processed_details:
        filepath = item.get('filepath')
        if not (filepath and os.path.exists(filepath)):
            logger.warning(f"Filepath not found for word count: {filepath}")
            continue

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            content_body = frontmatter_pattern.sub('', content, count=1).strip()
            word_count = len(word_pattern.findall(content_body.lower()))  # Basic word count
        except Exception as e:
            logger.error(f"Error counting words for {filepath}: {e}", exc_info=True)
            word_count = -1  # Indicate error

        word_counts_data.append({
            'Filename': os.path.basename(filepath),
            'Title': item.get('title', 'N/A'),
            'URL': item.get('url', 'N/A'),
            'Category': item.get('category', 'uncategorized'),
            'Word Count': word_count
        })

    # An empty DataFrame has no 'Word Count' column, so sorting it would raise
    # KeyError; there is nothing to report in that case anyway.
    if not word_counts_data:
        logger.warning("No Markdown files were found on disk; skipping word count summary.")
        return

    word_count_df = pd.DataFrame(word_counts_data).sort_values('Word Count', ascending=False).reset_index(drop=True)

    try:
        summary_path = Path(output_dir) / 'processed_word_count_summary.csv'
        word_count_df.to_csv(summary_path, index=False, encoding='utf-8')
        logger.info(f"Word count summary saved to {summary_path}")
        print("\nTop 10 Pages by Word Count:")
        print(word_count_df.head(10).to_string())
    except Exception as e:
        logger.error(f"Failed to save word count summary: {e}", exc_info=True)

In [None]:
# Driver cell: run the whole pipeline and report how long it took.
processing_start_time = time.time()

# Process all links.
# process_all_links() can return None instead of a pair when no links could
# be loaded; fall back to two empty lists so the unpacking never fails.
processed_items, failed_items = process_all_links() or ([], [])

# Generate summaries if processing was successful for some items
if processed_items:
    generate_category_summary(processed_items, DATA_DIR)
    generate_word_count_summary(processed_items, DATA_DIR)
else:
    logger.info("No items were processed successfully, skipping summary generation.")

processing_end_time = time.time()
elapsed_time = processing_end_time - processing_start_time
logger.info(f"--- Total Content Processing Script finished in {elapsed_time:.2f} seconds ---")