<a href="https://colab.research.google.com/github/Han529/Fund-Scraper-Assessment/blob/main/Junior_Data_Analytics_Assessment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

13F Filings Scraper with Post-Fetch Caps (100 Managers / 3,284 Quarter Links)  
**Note:** The caps are applied AFTER all links have been fetched.







In [1]:
# ==============================================================================
#                            Imports and Setup
# ==============================================================================
import requests
import pandas as pd
from bs4 import BeautifulSoup
import time
import random
import re
import numpy as np
import os
from datetime import datetime
from urllib.parse import urljoin
import json
import logging
from typing import List, Dict, Optional, Tuple, Any, Set
from concurrent.futures import ThreadPoolExecutor, as_completed # For Parallelism
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_exception # For Backoff

# --- Configuration ---

# Logging Setup
# Configures the root logger once at import time. The thread name is part of the
# format so lines emitted from ThreadPoolExecutor workers can be told apart.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)-8s - %(threadName)-10s - %(message)s', # Added threadName
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Suppress overly verbose logs from underlying libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)

# --- Constants ---
# Site root; relative links scraped from pages are resolved against it with urljoin.
BASE_URL: str = "https://13f.info"
# Value sent as the User-Agent header on every request made by the scrape functions.
DEFAULT_USER_AGENT: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
# Presumably the output file name for the final CSV -- it is not referenced in
# this part of the file; confirm against the export step.
CSV_FILENAME: str = "Fund Manager Shares Analysis.csv"

# Define the exact columns desired in the final output, in order
TARGET_FINAL_COLUMNS: List[str] = [
    'fund_name', 'filing_date', 'quarter', 'stock symbol', 'cl',
    'value_usd_000', 'shares', 'change', 'pct_change', 'inferred_transaction_type'
]

# Parallelism Configuration
# Passed as max_workers to every ThreadPoolExecutor created by the execution stages.
MAX_WORKERS: int = 15 # Adjust based on your system/network and risk tolerance (start lower, e.g., 10)
# Backoff Configuration for Tenacity
# These three values feed stop_after_attempt / wait_exponential in retry_config.
MAX_RETRIES: int = 4 # Max attempts per request (1 initial + 3 retries)
INITIAL_BACKOFF_DELAY: float = 1.0 # Seconds for first retry delay base
MAX_BACKOFF_DELAY: float = 10.0 # Max seconds to wait between retries

# Record the effective configuration at startup so it shows up in the run log.
logging.info("Script started. Libraries imported, logging and constants configured.")
logging.info(f"Parallelism enabled with MAX_WORKERS={MAX_WORKERS}")
logging.info(f"Exponential backoff configured: MAX_RETRIES={MAX_RETRIES}, INITIAL_DELAY={INITIAL_BACKOFF_DELAY}s, MAX_DELAY={MAX_BACKOFF_DELAY}s")

# ==============================================================================
#              Request Function with Exponential Backoff (using Tenacity)
# ==============================================================================

# Define what network/connection exceptions tenacity should retry on.
# This tuple is handed to retry_if_exception_type when the retry decorator is built.
RETRYABLE_NETWORK_EXCEPTIONS = (
    requests.exceptions.Timeout,
    requests.exceptions.ConnectionError,
    requests.exceptions.ChunkedEncodingError # Can happen on intermittent network issues
)

# Define specific HTTP status codes to retry on (server errors and rate limiting).
# Checked by _should_retry_http_error against the status code of an HTTPError's response.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

# Custom retry condition check for HTTP errors
def _should_retry_http_error(exception: BaseException) -> bool:
    """
    Decide whether an exception is an HTTP error worth retrying.

    Args:
        exception (BaseException): The exception raised by a request attempt.

    Returns:
        bool: True only for a requests HTTPError that carries a response whose
        status code is listed in RETRYABLE_STATUS_CODES; False otherwise.
    """
    if not isinstance(exception, requests.exceptions.HTTPError):
        return False
    # An HTTPError may lack a response (or carry None); treat both as non-retryable.
    attached_response = getattr(exception, 'response', None)
    if attached_response is None:
        return False
    return attached_response.status_code in RETRYABLE_STATUS_CODES

def _log_retry_attempt(retry_state: Any) -> None:
    """
    Log a warning just before tenacity sleeps between two attempts.

    Args:
        retry_state: The tenacity call state for the wrapped call. Its positional
            and keyword arguments, last outcome, attempt number and upcoming
            sleep time are read to build the message.
    """
    # The URL is normally the first positional argument, but fall back to the
    # keyword form so a keyword-style call cannot crash the logging hook.
    if retry_state.args:
        target = retry_state.args[0]
    else:
        target = retry_state.kwargs.get('url', '<unknown url>')
    error = retry_state.outcome.exception()
    logging.warning(
        f"Retrying request for {target} "
        f"due to {error.__class__.__name__}: {error} "
        f"(Attempt {retry_state.attempt_number}/{MAX_RETRIES}). Waiting {retry_state.next_action.sleep:.2f}s..."
    )


# Define the retry decorator
retry_config = retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, min=INITIAL_BACKOFF_DELAY, max=MAX_BACKOFF_DELAY),
    # Retry on specific network exceptions OR specific HTTP error codes
    retry=(retry_if_exception_type(RETRYABLE_NETWORK_EXCEPTIONS) | retry_if_exception(_should_retry_http_error)),
    # Log before sleeping on retry
    before_sleep=_log_retry_attempt,
    # Surface the last underlying exception (e.g. the HTTPError/Timeout) once the
    # attempts are exhausted instead of wrapping it in tenacity.RetryError, so
    # callers and their log messages see the real failure.
    reraise=True,
)

@retry_config
def make_request_with_retries(url: str, headers: Dict[str, str], params: Optional[Dict[str, Any]] = None, timeout: int = 45) -> requests.Response:
    """
    Perform a single HTTP GET; retrying is handled by the `retry_config` decorator.

    Every failure is re-raised from here so that the decorator's retry policy
    can decide whether another attempt is made.

    Args:
        url (str): The URL to request.
        headers (Dict[str, str]): Request headers.
        params (Optional[Dict[str, Any]]): URL query parameters.
        timeout (int): Request timeout in seconds.

    Returns:
        requests.Response: The response, once `raise_for_status()` has passed.

    Raises:
        Exception: Any error from the request is logged and re-raised; what the
            caller finally sees after the retry budget is spent depends on how
            `retry_config` is set up.
    """
    try:
        reply = requests.get(url, headers=headers, params=params, timeout=timeout)
        reply.raise_for_status()  # Turns 4xx/5xx responses into HTTPError
    except requests.exceptions.HTTPError as status_error:
        # Retryable statuses are reported by the retry hook; only log the permanent ones here.
        is_permanent = not _should_retry_http_error(status_error)
        if is_permanent:
            logging.error(f"Request to {url} failed permanently with status {status_error.response.status_code}. Error: {status_error}")
        raise
    except requests.exceptions.RequestException as transport_error:
        # Connection-level / timeout problems: log, then let the decorator decide.
        logging.error(f"Request to {url} failed with connection/timeout error: {transport_error}")
        raise
    except Exception as unexpected_error:
        logging.error(f"Unexpected error during request to {url}: {unexpected_error}", exc_info=True)
        raise
    else:
        return reply

# ==============================================================================
#                        Scraping Function Definitions
# ==============================================================================
# --- Helper Functions ---
def _clean_header(header_text: str) -> str:
    """Helper function to consistently clean table header text."""
    if not isinstance(header_text, str): return ""
    text = header_text.lower().strip()
    text = text.replace(' ', '_')
    text = text.replace('($000)', 'usd_000')
    text = text.replace('%', 'pct')
    text = re.sub(r'[^\w_]+', '', text) # Keep underscores
    return text

def _extract_headers(table_tag: BeautifulSoup) -> List[str]:
    """
    Collect cleaned header names from a table element.

    Lookup order: the first row inside <thead>, then any <th> inside <thead>,
    then the first <tr> of the table. Header texts are passed through
    `_clean_header`, and names that clean down to an empty string are dropped.
    Returns an empty list (after logging a warning) when no header cell is found.
    """
    cells = []
    head_section = table_tag.find('thead')
    if head_section:
        top_row = head_section.find('tr')
        if top_row:
            cells = top_row.find_all(['th', 'td'])
        # A <thead> without a usable row may still hold bare <th> cells.
        cells = cells or head_section.find_all('th')
    if not cells:
        leading_row = table_tag.find('tr')
        if leading_row:
            cells = leading_row.find_all(['th', 'td'])
    if not cells:
        logging.warning("Could not find any header cells (th/td) using multiple strategies.")
        return []
    cleaned_names = (_clean_header(cell.get_text()) for cell in cells)
    return [name for name in cleaned_names if name]

# --- Fallback BS4 Parsing ---
def parse_tables_directly_bs4(soup_or_table_tag: Any, url: str, single_table: bool = False) -> List[pd.DataFrame]:
    """
    Build DataFrames straight from HTML table markup (fallback parser).

    Args:
        soup_or_table_tag: A parsed document, or a single table element when
            `single_table` is True.
        url: Source page URL; stored in a 'SourceURL' column and used in log messages.
        single_table: Treat the first argument as the one table to parse instead
            of searching it for tables whose class matches the word 'table'.

    Returns:
        One DataFrame per table that has headers and at least one row whose cell
        count matches the header count. Blank/whitespace-only cells become NaN.
    """
    if not soup_or_table_tag:
        return []
    if single_table:
        candidates = [soup_or_table_tag]
    else:
        candidates = soup_or_table_tag.find_all('table', {'class': re.compile(r'\btable\b')})
    if not candidates:
        return []

    frames = []
    for position, table in enumerate(candidates):
        column_names = _extract_headers(table)
        if not column_names:
            continue

        body = table.find('tbody')
        if body:
            body_rows = body.find_all('tr')
        else:
            # No <tbody>: every row after the first (assumed header) row is data.
            body_rows = table.find_all('tr')[1:]
        if not body_rows:
            continue

        wanted_width = len(column_names)
        records = []
        mismatch_reported = False
        for row_number, row in enumerate(body_rows):
            cells = row.find_all('td')
            width = len(cells)
            if width != wanted_width:
                # Report the first mismatching row only, then skip silently.
                if not mismatch_reported:
                    logging.warning(f"[Direct BS4] Table {position+1} on {url}: Row {row_number} has {width} cells, expected {wanted_width}. Skipping mismatched rows for this table.")
                    mismatch_reported = True
                continue
            records.append([cell.get_text(strip=True) if cell else None for cell in cells])

        if not records:
            continue
        try:
            frame = pd.DataFrame(records, columns=column_names)
            frame['SourceURL'] = url
            frame = frame.replace(r'^\s*$', np.nan, regex=True)
            frames.append(frame)
        except Exception as e:
            logging.error(f"[Direct BS4] Error creating DataFrame for table {position+1} on {url}: {e}", exc_info=True)
    return frames

# --- Primary Scraping Functions (using make_request_with_retries) ---

def scrape_az_index_links(managers_index_page_url: str) -> List[str]:
    """
    Fetch the managers listing page and return its single-character index links.

    A link qualifies when its href starts with '/managers/' and its last path
    segment is exactly one character long. Links are looked for inside the
    'mb-12' div, or via a generic list selector when that div is missing.

    Returns:
        The qualifying relative hrefs, sorted; an empty list when none are found
        or when fetching/parsing fails (the error is logged).
    """
    logging.info(f"Scraping A-Z index links from: {managers_index_page_url}")
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}
    try:
        page = make_request_with_retries(managers_index_page_url, headers=request_headers, timeout=30)
        document = BeautifulSoup(page.text, 'html.parser')
        container = document.find('div', class_='mb-12')
        if container:
            anchors = container.find_all('a', href=True)
        else:
            anchors = document.select('ul > li > a[href]')  # Fallback selector

        candidate_hrefs = (anchor.get('href') for anchor in anchors)
        found = {
            href
            for href in candidate_hrefs
            if isinstance(href, str)
            and href.startswith('/managers/')
            and len(href.split('/')[-1]) == 1
        }

        if not found:
            logging.warning(f"No valid A-Z index links extracted from {managers_index_page_url}")
            return []

        ordered = sorted(found)
        logging.info(f"Found {len(found)} A-Z index links: {ordered}")
        return ordered

    except Exception as e:  # Request failure or any parsing error
        logging.error(f"Failed to scrape or parse A-Z index page {managers_index_page_url} after retries. Error: {e}")
        return []


def scrape_managers_from_letter_page(letter_page_relative_url: str, base_url: str) -> Set[str]:
    """
    Collect the manager hrefs listed on one index page.

    An href is kept when it has the shape '/manager/<segment>' with exactly two
    path parts and a second part longer than one character.

    Returns:
        The set of matching relative hrefs; an empty set when the page cannot be
        fetched or parsed (the error is logged).
    """
    page_url = urljoin(base_url, letter_page_relative_url)
    logging.info(f"Scraping manager links from letter page: {page_url}")
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}

    def looks_like_manager_link(candidate: Any) -> bool:
        # Accept only '/manager/<something longer than one char>' with nothing after it.
        if not (isinstance(candidate, str) and candidate.startswith('/manager/')):
            return False
        segments = candidate.strip('/').split('/')
        return len(segments) == 2 and segments[0] == 'manager' and len(segments[1]) > 1

    try:
        page = make_request_with_retries(page_url, headers=request_headers, timeout=45)
        document = BeautifulSoup(page.text, 'html.parser')
        hrefs = (anchor.get('href') for anchor in document.find_all('a', href=True))
        return {href for href in hrefs if looks_like_manager_link(href)}
    except Exception as e:
        logging.error(f"Failed to scrape or parse letter page {page_url} after retries. Error: {e}")
        return set()


def get_all_manager_links_via_az_index(base_url: str, executor: ThreadPoolExecutor) -> Tuple[List[str], Dict[str, int]]:
    """
    Gather every manager link by scraping all index pages through `executor`.

    Args:
        base_url: Site root against which '/managers' and the index pages are resolved.
        executor: Pool on which one scrape task per index page is submitted.

    Returns:
        A tuple of (sorted unique manager hrefs, mapping of index-page href to the
        number of links found there; 0 when that page's task raised). Both are
        empty when the index links themselves could not be retrieved.
    """
    managers_main_page_url = urljoin(base_url, "/managers")

    logging.info("Starting A-Z index link scraping...")
    letter_pages = scrape_az_index_links(managers_main_page_url)
    if not letter_pages:
        logging.critical("Could not retrieve A-Z index links. Cannot proceed.")
        return [], {}

    page_total = len(letter_pages)
    logging.info(f"Found {page_total} A-Z index pages to process in parallel.")
    print(f"Found {page_total} A-Z index pages. Processing in parallel...")

    # Fan out: one task per index page, remembering which page each future belongs to.
    pending = {}
    for letter_page in letter_pages:
        pending[executor.submit(scrape_managers_from_letter_page, letter_page, base_url)] = letter_page

    unique_links: Set[str] = set()
    per_page_counts: Dict[str, int] = {}
    for finished_so_far, finished in enumerate(as_completed(pending), start=1):
        page = pending[finished]
        letter = page.split('/')[-1].upper()
        try:
            page_links = finished.result()
            found_here = len(page_links)
            per_page_counts[page] = found_here
            unique_links.update(page_links)
            logging.info(f"[{finished_so_far}/{page_total}] Index '{letter}' completed. Found {found_here} links. Total unique: {len(unique_links)}")
            print(f"  Processed index '{letter}'. Found {found_here} links. Total unique: {len(unique_links)}")
        except Exception as exc:
            logging.error(f"Index page {page} generated an exception: {exc}")
            per_page_counts[page] = 0  # Record failure

    grand_total = len(unique_links)
    logging.info(f"Finished parallel A-Z index scraping. Total unique manager links found: {grand_total}")
    print(f"\nFinished A-Z index scraping. Found {grand_total} unique manager links.")
    return sorted(unique_links), per_page_counts


def scrape_quarter_links_task(rel_link: str, base_url: str) -> Tuple[str, List[str]]:
    """Executor-friendly wrapper: return the manager link together with its quarter links."""
    return rel_link, scrape_quarter_links_from_manager(rel_link, base_url)

def scrape_quarter_links_from_manager(relative_manager_link: str, base_url: str) -> List[str]:
    """
    Return absolute filing URLs taken from the 'Quarter' column of a manager page.

    The first table whose class matches the word 'table' is used. Its header is
    searched for a cell whose text equals 'quarter' (case-insensitive); the link
    found in that column of each data row is resolved against `base_url`.

    Returns:
        The collected URLs in page order; an empty list when the table, the
        column or the rows are missing, or when fetching/parsing fails (logged).
    """
    manager_url = urljoin(base_url, relative_manager_link)
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}
    try:
        page = make_request_with_retries(manager_url, headers=request_headers, timeout=30)
        document = BeautifulSoup(page.text, 'html.parser')
        filings_table = document.find('table', {'class': re.compile(r'\btable\b')})
        if not filings_table:
            return []

        heading = filings_table.find('thead') or filings_table.find('tr')
        if not heading:
            return []

        # Position of the first header cell labelled "quarter", or -1 when absent.
        quarter_position = next(
            (
                position
                for position, cell in enumerate(heading.find_all(['th', 'td']))
                if cell.get_text(strip=True).lower() == "quarter"
            ),
            -1,
        )
        if quarter_position == -1:
            return []

        body = filings_table.find('tbody')
        # Without a <tbody>, skip the first row (taken to be the header).
        rows = body.find_all('tr') if body else filings_table.find_all('tr')[1:]
        if not rows:
            return []

        collected = []
        for row in rows:
            cells = row.find_all('td')
            if len(cells) <= quarter_position:
                continue
            anchor = cells[quarter_position].find('a', href=True)
            target = anchor.get('href') if anchor else None
            if isinstance(target, str) and target.strip():
                collected.append(urljoin(base_url, target))
        return collected
    except Exception as e:
        logging.error(f"Failed scrape_quarter_links_from_manager for {manager_url}. Error: {e}")
        return []  # Empty list on any failure

# ---
def _fund_name_from_url(manager_page_url: str) -> Optional[str]:
    """Derive a title-cased name from a '.../manager/<prefix>-<slug>' URL, or None if it does not fit."""
    path_parts = manager_page_url.strip('/').split('/')
    if len(path_parts) <= 3 or path_parts[-2] != 'manager':
        return None
    url_name_part = path_parts[-1]
    first_dash_index = url_name_part.find('-')
    if first_dash_index == -1:
        return None
    # Everything after the first dash is the slug: 'some-fund-name' -> 'Some Fund Name'.
    parsed_name = url_name_part[first_dash_index:].strip('-').replace('-', ' ').title()
    return parsed_name or None


def scrape_fund_name(manager_page_url: str) -> str:
    """
    Scrape the fund name from a manager's main page.

    The text of the first <h1> is used, with anything from ' CIK#' onwards cut
    off. When the page has no <h1>, the name is derived from the URL slug.

    Args:
        manager_page_url (str): Absolute URL of the manager page.

    Returns:
        str: The fund name, or "Unknown Fund Name" when the request fails, the
        heading is empty, or the URL cannot be parsed. This function does not raise.
    """
    headers = {'User-Agent': DEFAULT_USER_AGENT}
    default_name = "Unknown Fund Name"
    try:
        response = make_request_with_retries(manager_page_url, headers=headers, timeout=25)
        soup = BeautifulSoup(response.text, 'lxml')
        h1_tag = soup.find('h1')
        if h1_tag:
            name = h1_tag.get_text(strip=True)
            if ' CIK#' in name:
                name = name.split(' CIK#')[0].strip()
            return name if name else default_name

        # Fallback to URL parsing. The helper is pure string handling, so no
        # blanket try/except is needed to keep this path from failing silently.
        logging.warning(f"H1 tag not found on {manager_page_url}. Attempting URL parsing.")
        return _fund_name_from_url(manager_page_url) or default_name
    except Exception as e:
        logging.error(f"Failed scrape_fund_name for {manager_page_url} after retries. Error: {e}")
        return default_name

# ---
def scrape_table_data_and_metadata(url: str) -> Dict[str, Any]:
    """
    Scrapes table data and metadata from a filing URL (using retries).

    Steps, in order:
      1. Fetch the page; on failure return the empty result.
      2. Parse the filing date and quarter heading from the HTML (best effort).
      3. Locate the table with id 'filingAggregated'. If it is missing, parse
         every matching table on the page with parse_tables_directly_bs4.
      4. If the table has a 'data-url' attribute, fetch that endpoint as JSON and
         build one DataFrame from its 'data' rows.
      5. Otherwise, or when step 4 yields nothing, parse the table's HTML rows.

    Args:
        url (str): URL of the page to scrape.

    Returns:
        Dict[str, Any]: A dict with keys 'dataframes' (list of DataFrames, possibly
        empty), 'filing_date' and 'quarter' (strings, or None when not found).
        Errors are logged rather than raised.
    """
    # logging.info(f"Scraping table data & metadata from: {url}") # Noisy
    # The same AJAX-style headers are sent for both the page request and the data request.
    headers = {'User-Agent': DEFAULT_USER_AGENT, 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest'}
    result: Dict[str, Any] = {'dataframes': [], 'filing_date': None, 'quarter': None}
    html_content = None
    soup = None
    try:
        response_page = make_request_with_retries(url, headers=headers, timeout=45)
        html_content = response_page.text
    except Exception as e:
        logging.error(f"Failed to retrieve page {url} after retries. Error: {e}")
        return result # Cannot proceed

    try:
        soup = BeautifulSoup(html_content, 'lxml')
        # Extract Metadata
        # Filing date: text of the <dd> that follows a <dt> containing 'Date filed'.
        date_dt = soup.find('dt', string=lambda t: t and 'Date filed' in t.strip())
        if date_dt and date_dt.find_next_sibling('dd'): result['filing_date'] = date_dt.find_next_sibling('dd').get_text(strip=True)
        # Quarter: text of the first h1/h2/h3 mentioning '13F Holdings' or 'Quarter'.
        qtr_h = soup.find(['h1', 'h2', 'h3'], string=lambda t: t and ('13F Holdings' in t or 'Quarter' in t))
        if qtr_h: result['quarter'] = qtr_h.get_text(strip=True)
    # Metadata is optional: a failure here is logged and table scraping still proceeds.
    # If BeautifulSoup itself raised, soup stays None and is guarded against below.
    except Exception as e: logging.error(f"Error parsing metadata from {url}: {e}")

    target_table = soup.find('table', id='filingAggregated') if soup else None
    if not target_table:
        # No dedicated table: fall back to parsing every matching table on the page.
        # logging.warning(f"Target table not found. Parsing all tables on page: {url}")
        if soup: result['dataframes'] = parse_tables_directly_bs4(soup, url)
        return result

    headers_list = _extract_headers(target_table)
    if not headers_list:
        logging.error(f"CRITICAL: Failed to extract HTML headers from target table on {url}.")
        # NOTE(review): parse_tables_directly_bs4 calls _extract_headers on this same
        # table and skips it when no headers come back, so this fallback likely
        # returns an empty list -- confirm whether that is intended.
        result['dataframes'] = parse_tables_directly_bs4(target_table, url, single_table=True)
        return result

    # A 'data-url' attribute points at an endpoint that returns the rows as JSON.
    data_url_path = target_table.get('data-url')
    ajax_succeeded = False
    if data_url_path:
        ajax_url = urljoin(BASE_URL, data_url_path)
        # logging.info(f"AJAX endpoint detected. Attempting fetch from: {ajax_url}") # Noisy
        try:
            response_ajax = make_request_with_retries(ajax_url, headers=headers, timeout=45)
            ajax_data = response_ajax.json()
            table_rows_data = ajax_data.get('data')
            if table_rows_data and isinstance(table_rows_data, list):
                expected_cols = len(headers_list)
                cleaned_rows = []
                for row in table_rows_data:
                    # Only list rows are used; anything else is skipped without logging.
                    if isinstance(row, list):
                        actual_cols = len(row)
                        # Rows shorter than the header list are dropped; longer rows
                        # are truncated to the header count.
                        if actual_cols >= expected_cols:
                            processed_row = row[:expected_cols]
                            # String cells are run through BeautifulSoup to keep only
                            # their text; non-string cells are kept as they are.
                            cleaned_cells = [BeautifulSoup(str(c), 'lxml').get_text(strip=True) if isinstance(c, str) else c for c in processed_row]
                            cleaned_rows.append(cleaned_cells)
                if cleaned_rows:
                    df = pd.DataFrame(cleaned_rows, columns=headers_list)
                    df['SourceURL'] = url
                    result['dataframes'].append(df)
                    ajax_succeeded = True
                    # logging.info(f"Created DataFrame (Shape: {df.shape}) from AJAX.") # Noisy
            else: logging.warning(f"AJAX 'data' key missing or not list for {ajax_url}.")
        # Request, JSON decoding and DataFrame construction errors all land here.
        except Exception as e: logging.error(f"AJAX request/processing failed for {ajax_url} after retries. Error: {e}")
        if ajax_succeeded: return result
        else: logging.warning(f"AJAX did not yield DataFrame for {url}.")

    # Fallback if no data-url or AJAX failed
    # logging.info(f"Falling back to direct HTML parsing for target table on: {url}") # Noisy
    result['dataframes'] = parse_tables_directly_bs4(target_table, url, single_table=True)
    return result


# --- Wrapper function for parallel data scraping task ---
def scrape_filing_data_task(data_url: str, fund_name: str) -> List[pd.DataFrame]:
    """
    Scrape one filing URL and tag each resulting table with its metadata.

    Every DataFrame returned by `scrape_table_data_and_metadata` gets
    'fund_name', 'filing_date' and 'quarter' columns (modified in place).

    Returns:
        The tagged DataFrames, in the order they were scraped.
    """
    outcome = scrape_table_data_and_metadata(data_url)
    filed_on = outcome.get('filing_date')
    period_label = outcome.get('quarter')

    tagged_frames = []
    for frame in outcome.get('dataframes', []):
        frame['fund_name'] = fund_name
        frame['filing_date'] = filed_on
        frame['quarter'] = period_label
        tagged_frames.append(frame)
    return tagged_frames


# ==============================================================================
#                        Execution Stage 1: Get All Manager Links (A-Z Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 1: Scraping All Manager Links via A-Z Index (Parallel)")
print("="*70)

start_time_stage1 = time.time()
all_manager_links = []
manager_counts = {}

# Use ThreadPoolExecutor for parallel execution
# The 'with' statement ensures threads are cleaned up properly
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='AZScrape') as executor:
    all_manager_links, manager_counts = get_all_manager_links_via_az_index(BASE_URL, executor)

end_time_stage1 = time.time()
print("\n--- Manager Link Scraping Summary ---")
print(f"Total unique manager links found via A-Z Index: {len(all_manager_links)}")
if manager_counts:
    print("\nCounts per Index Page:")
    # Sort counts by index page URL for consistent output
    for page_url, count in sorted(manager_counts.items()):
        index_char = page_url.split('/')[-1].upper()
        print(f"  Index '{index_char}' ({page_url}): {count} managers")
print(f"Stage 1 Duration: {end_time_stage1 - start_time_stage1:.2f} seconds")

if not all_manager_links:
    print("\nCRITICAL: No manager links were found. Cannot proceed.")
    # exit() is a helper installed by the `site` module for interactive shells and
    # reports success (status 0). Raise SystemExit directly with a non-zero status
    # so the run stops here and the failure is visible to whatever launched it.
    raise SystemExit(1)

# Optional: Limit managers for testing (apply AFTER getting all links)
test_limit_stage1 = 100 # Set a reasonable limit for testing
if len(all_manager_links) > test_limit_stage1:
    logging.info(f"TESTING: Limiting run to first {test_limit_stage1} managers found.")
    print(f"\n*** LIMITING MANAGERS TO {test_limit_stage1} FOR TESTING/ASSESSMENT ***\n")
    manager_links_relative = all_manager_links[:test_limit_stage1]
else:
    manager_links_relative = all_manager_links # Process all if less than limit

time.sleep(random.uniform(0.5, 1.0)) # Small delay


# ==============================================================================
#                 Execution Stage 2: Get Quarter Links (Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 2: Scraping Quarter Links for Each Selected Manager (Parallel)")
print("="*70)

start_time_stage2 = time.time()
# Maps each relative manager link to the list of quarter URLs found for it.
# Every selected manager gets an entry; the list is empty when nothing was found.
manager_quarter_links: Dict[str, List[str]] = {}
num_managers_to_process = len(manager_links_relative)
processed_count_stage2 = 0

with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='QuarterLinkScrape') as executor:
    # One task per manager; the dict lets a finished future be traced back to its manager link.
    future_to_rel_link = {executor.submit(scrape_quarter_links_task, rel_link, BASE_URL): rel_link for rel_link in manager_links_relative}

    # Results are handled in completion order, not submission order.
    for future in as_completed(future_to_rel_link):
        rel_link = future_to_rel_link[future]
        processed_count_stage2 += 1
        try:
            # The task echoes the manager link back; only the links are needed here.
            _rel_link_result, links = future.result() # Unpack tuple
            manager_quarter_links[rel_link] = links
            # Log progress periodically or based on count
            if processed_count_stage2 % 50 == 0 or processed_count_stage2 == num_managers_to_process:
                 logging.info(f"Quarter links progress: {processed_count_stage2}/{num_managers_to_process} managers processed.")
                 print(f"  Processed quarter links for manager {processed_count_stage2}/{num_managers_to_process}...")
            # An empty list covers both "no links on the page" and a scrape failure,
            # because scrape_quarter_links_from_manager returns [] in either case.
            if not links:
                 logging.warning(f"No quarter links found or error occurred for {rel_link}")
        except Exception as exc:
            logging.error(f"Getting quarter links for {rel_link} generated an exception: {exc}")
            manager_quarter_links[rel_link] = [] # Ensure entry exists even on failure

end_time_stage2 = time.time()
print(f"\nFinished getting quarter links stage for {len(manager_quarter_links)} managers.")
print(f"Stage 2 Duration: {end_time_stage2 - start_time_stage2:.2f} seconds")

# ==============================================================================
#                 Execution Stage 3: Scrape Filing Data (Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 3: Scraping Filing Data for Each Manager/Quarter (Parallel)")
print("="*70)

start_time_stage3 = time.time()
all_scraped_dataframes: List[pd.DataFrame] = []
fund_name_map: Dict[str, str] = {} # Cache fund names

# Prepare list of tasks: (data_url, fund_name)
tasks_to_submit = []
print("Preparing filing data scrape tasks...")
for i, (manager_link, quarter_links) in enumerate(manager_quarter_links.items()):
    # Get or Cache Fund Name (fetched sequentially, one request per manager)
    if manager_link not in fund_name_map:
        manager_page_url = urljoin(BASE_URL, manager_link)
        fund_name_map[manager_link] = scrape_fund_name(manager_page_url)
        # Brief pause after name scrape, before launching quarter scrapes
        time.sleep(random.uniform(0.1, 0.4))

    current_fund_name = fund_name_map[manager_link]

    # Determine URLs to scrape for this manager; with no quarter links the
    # manager page itself is scraped instead.
    urls_to_scrape = quarter_links if quarter_links else [urljoin(BASE_URL, manager_link)]
    if not quarter_links:
        logging.warning(f"No specific quarter links for {manager_link}, will attempt manager page scrape.")

    for data_url in urls_to_scrape:
        tasks_to_submit.append((data_url, current_fund_name))

total_filing_urls = len(tasks_to_submit)
print(f"Prepared {total_filing_urls} total filing URLs to scrape in parallel.")
logging.info(f"Submitting {total_filing_urls} filing data scraping tasks to executor.")

processed_count_stage3 = 0
# Use ThreadPoolExecutor for parallel data scraping
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='FilingDataScrape') as executor:
    future_to_task_info = {executor.submit(scrape_filing_data_task, url, name): (url, name) for url, name in tasks_to_submit}

    for future in as_completed(future_to_task_info):
        url, name = future_to_task_info[future]
        processed_count_stage3 += 1
        try:
            list_of_dfs = future.result() # Result is a list of DFs for that URL
            if list_of_dfs:
                all_scraped_dataframes.extend(list_of_dfs)
            # No dataframes for this URL: already logged inside the scrape function.
        except Exception as exc:
            logging.error(f"Scraping task for URL {url} (Fund: {name}) generated an exception: {exc}")

        # Log progress periodically. This sits outside the success branch so the
        # report still appears when the 100th or the final URL produced no tables
        # or raised; otherwise the closing progress line could be skipped.
        if processed_count_stage3 % 100 == 0 or processed_count_stage3 == total_filing_urls:
            logging.info(f"Filing data progress: {processed_count_stage3}/{total_filing_urls} URLs processed. Total DFs collected: {len(all_scraped_dataframes)}")
            print(f"  Processed filing URL {processed_count_stage3}/{total_filing_urls}...")

end_time_stage3 = time.time()
print("\n--- Finished Scraping All Filing Data ---")
print(f"Stage 3 Duration: {end_time_stage3 - start_time_stage3:.2f} seconds")


# ==============================================================================
#               Execution Stage 4: Data Consolidation & Processing
# ==============================================================================
print("\n" + "="*70)
print(" Stage 4: Consolidating and Processing All Scraped Data")
print("="*70)

start_time_stage4 = time.time()

# Bind processed_df up front. The processing step that follows tests
# `processed_df.empty`, which raised NameError when nothing had been scraped
# because only the concatenation branch used to assign it.
processed_df = pd.DataFrame()

if not all_scraped_dataframes:
    logging.warning("No dataframes were scraped in Stage 3. Cannot proceed with processing.")
    final_processed_df = pd.DataFrame() # Ensure variable exists but is empty
else:
    # --- Combine all scraped dataframes ---
    logging.info(f"Combining {len(all_scraped_dataframes)} scraped dataframes...")
    print(f"Combining {len(all_scraped_dataframes)} scraped tables...")
    try:
        combined_df = pd.concat(all_scraped_dataframes, ignore_index=True, sort=False)
        logging.info(f"Initial combined DataFrame shape: {combined_df.shape}")
        print(f"Combined DataFrame shape: {combined_df.shape}.")
        # It's crucial to work on a copy for significant processing
        processed_df = combined_df.copy()
        del combined_df, all_scraped_dataframes # Free up memory
    except Exception as e:
        logging.critical(f"CRITICAL Error during DataFrame concatenation: {e}", exc_info=True)
        processed_df = pd.DataFrame() # Assign empty df on critical error

# --- Process the combined dataframe (only if concatenation was successful) ---
if not processed_df.empty:
    logging.info("Starting data processing steps...")
    print("\nProcessing combined data...")

    # --- Standardize Column Names ---
    processed_df.columns = [_clean_header(col) for col in processed_df.columns]
    logging.info(f"Standardized column names: {processed_df.columns.tolist()}")

    # --- Filter for Common Stock ('COM') ---
    if 'cl' in processed_df.columns:
        original_rows = len(processed_df)
        processed_df = processed_df[processed_df['cl'].astype(str).str.upper() == 'COM']
        logging.info(f"Filtered for 'cl' == 'COM'. Rows reduced from {original_rows} to {len(processed_df)}.")
        print(f"Filtered for 'COM' stock. Rows remaining: {len(processed_df)}.")
    else: logging.warning("'cl' column not found.")

    # --- Data Type Conversion and Cleaning ---
    logging.info("Cleaning and converting data types...")
    # Shares
    if 'shares' in processed_df.columns:
        processed_df['shares'] = processed_df['shares'].astype(str).str.replace(',', '', regex=False)
        processed_df['shares_numeric'] = pd.to_numeric(processed_df['shares'], errors='coerce')
        if processed_df['shares_numeric'].isna().any(): logging.warning("Non-numeric shares coerced to NaN.")
    else: processed_df['shares_numeric'] = np.nan
    # Value
    value_col = 'value_usd_000'
    if value_col in processed_df.columns:
         processed_df[value_col] = processed_df[value_col].astype(str).str.replace(',', '', regex=False)
         processed_df[value_col] = pd.to_numeric(processed_df[value_col], errors='coerce')
         if processed_df[value_col].isna().any(): logging.warning(f"Non-numeric '{value_col}' coerced to NaN.")
    else: logging.warning(f"Value column '{value_col}' not found.")
    # Quarter Period
    if 'quarter' in processed_df.columns:
        def parse_quarter_to_period(q_str: Optional[str]) -> Optional[pd.Period]:
            if not isinstance(q_str, str): return pd.NaT
            match = re.search(r'(Q[1-4])\s*(\d{4})', q_str, re.IGNORECASE)
            if match:
                q_num, year = match.group(1).upper(), match.group(2)
                try: return pd.Period(f"{year}{q_num}", freq='Q')
                except ValueError: return pd.NaT
            return pd.NaT
        processed_df['quarter_period'] = processed_df['quarter'].apply(parse_quarter_to_period)
        if processed_df['quarter_period'].isna().any(): logging.warning("Some quarters could not be parsed.")
    else: processed_df['quarter_period'] = pd.NaT

    # --- Identify Stock Identifier ---
    stock_id_col = 'sym' if 'sym' in processed_df.columns else ('stock_symbol' if 'stock_symbol' in processed_df.columns else None)
    if not stock_id_col: logging.error("CRITICAL: No stock identifier found.")

    # --- Calculate Changes ---
    can_calculate_changes = stock_id_col and ('shares_numeric' in processed_df.columns) and ('quarter_period' in processed_df.columns) and processed_df['quarter_period'].notna().any()
    if can_calculate_changes:
        logging.info(f"Calculating changes grouped by 'fund_name', '{stock_id_col}'...")
        print(f"Calculating quarterly changes grouped by fund and '{stock_id_col}'...")
        essential_cols = ['fund_name', stock_id_col, 'quarter_period', 'shares_numeric']
        processed_df.dropna(subset=[col for col in essential_cols if col in processed_df.columns], inplace=True)
        processed_df = processed_df.sort_values(by=['fund_name', stock_id_col, 'quarter_period'], ascending=True)
        group_cols = ['fund_name', stock_id_col]
        for col in ['change', 'pct_change', 'inferred_transaction_type']:
             if col not in processed_df.columns: processed_df[col] = pd.NA
        processed_df['change'] = processed_df.groupby(group_cols, observed=True)['shares_numeric'].diff().fillna(0)
        pct_change_raw = processed_df.groupby(group_cols, observed=True)['shares_numeric'].pct_change()
        processed_df['pct_change'] = pct_change_raw.replace([np.inf, -np.inf], np.nan).fillna(0)
        change_numeric = pd.to_numeric(processed_df['change'], errors='coerce')
        conditions = [change_numeric < 0, change_numeric == 0, change_numeric > 0]
        choices = ['Sell', 'Hold', 'Buy']
        processed_df['inferred_transaction_type'] = np.select(conditions, choices, default='Unknown')
        logging.info("Change calculations complete.")
        print("Quarterly change calculations complete.")
        try: processed_df['change'] = processed_df['change'].astype(pd.Int64Dtype())
        except TypeError: pass
    else:
        logging.warning("Skipping quarterly change calculations.")
        print("Skipping quarterly change calculations.")
        for col in ['change', 'pct_change', 'inferred_transaction_type']:
             if col not in processed_df.columns: processed_df[col] = pd.NA

    # --- Final Column Formatting ---
    logging.info("Formatting final columns...")
    print("\nFormatting final columns for output...")
    if stock_id_col == 'sym' and 'sym' in processed_df.columns:
        processed_df.rename(columns={'sym': 'stock symbol'}, inplace=True)
        logging.info("Renamed 'sym' column to 'stock symbol'.")
    elif 'stock symbol' not in processed_df.columns and 'stock symbol' in TARGET_FINAL_COLUMNS:
         logging.warning("Final target 'stock symbol' column not found. Creating with NaNs.")
         processed_df['stock symbol'] = np.nan

    final_columns_existing = [col for col in TARGET_FINAL_COLUMNS if col in processed_df.columns]
    missing_target_cols = [col for col in TARGET_FINAL_COLUMNS if col not in final_columns_existing]
    if missing_target_cols: logging.warning(f"Final target columns missing: {missing_target_cols}")

    final_processed_df = processed_df[final_columns_existing]
    logging.info(f"Final selected columns: {final_processed_df.columns.tolist()}")
    print(f"Final DataFrame columns: {final_processed_df.columns.tolist()}")
    final_processed_df = final_processed_df.drop(columns=['shares_numeric', 'quarter_period'], errors='ignore')

else: # combined_df was empty or processing failed
    logging.warning("Combined DataFrame was empty or processing failed.")
    final_processed_df = pd.DataFrame()

end_time_stage4 = time.time()
print(f"\nStage 4 Duration: {end_time_stage4 - start_time_stage4:.2f} seconds")


# ==============================================================================
#                        Execution Stage 5: Final Output
# ==============================================================================
# Shows a preview of the final DataFrame, writes it to CSV_FILENAME and prints
# the overall runtime.
print("\n" + "="*70)
print(" Stage 5: Final Output")
print("="*70)

if final_processed_df.empty:
    print("\nFinal DataFrame is empty or was not created due to errors. No CSV file saved.")
else:
    # --- Preview ---
    print(f"\n--- First 5 rows of the final DataFrame (Shape: {final_processed_df.shape}) ---")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 200)
    pd.set_option('display.max_colwidth', 70)
    preview_text = final_processed_df.head().to_string()
    print(preview_text)
    pd.reset_option('display.max_colwidth')

    # --- Persist to CSV ---
    print(f"\n--- Saving final DataFrame to '{CSV_FILENAME}' ---")
    try:
        final_processed_df.to_csv(CSV_FILENAME, index=False, encoding='utf-8-sig')
    except Exception as e:
        print(f"CRITICAL ERROR: Failed to save the final DataFrame to CSV '{CSV_FILENAME}'. Error: {e}")
    else:
        print(f"Successfully saved data ({len(final_processed_df)} rows) to '{CSV_FILENAME}'")

# --- Total runtime ---
total_end_time = time.time()
# Prefer the wall-clock span since Stage 1 started; if that start time was
# never captured, fall back to adding up the individual stage durations.
if 'start_time_stage1' not in locals():
    total_runtime = (
        (end_time_stage1 - start_time_stage1 if 'start_time_stage1' in locals() else 0)
        + (end_time_stage2 - start_time_stage2 if 'start_time_stage2' in locals() else 0)
        + (end_time_stage3 - start_time_stage3 if 'start_time_stage3' in locals() else 0)
        + (end_time_stage4 - start_time_stage4 if 'start_time_stage4' in locals() else 0)
    )
    if total_runtime > 0:
        print(f"\nTotal Script Runtime (Sum of Stages): {total_runtime:.2f} seconds ({total_runtime/60:.2f} minutes)")
else:
    total_runtime = total_end_time - start_time_stage1
    print(f"\nTotal Script Runtime: {total_runtime:.2f} seconds ({total_runtime/60:.2f} minutes)")


print("\n--- Script execution finished ---")


 Stage 1: Scraping All Manager Links via A-Z Index (Parallel)
Found 27 A-Z index pages. Processing in parallel...
  Processed index '0'. Found 72 links. Total unique: 72
  Processed index 'J'. Found 170 links. Total unique: 242
  Processed index 'K'. Found 309 links. Total unique: 551
  Processed index 'D'. Found 346 links. Total unique: 897
  Processed index 'N'. Found 363 links. Total unique: 1260
  Processed index 'Q'. Found 73 links. Total unique: 1333
  Processed index 'I'. Found 315 links. Total unique: 1648
  Processed index 'O'. Found 250 links. Total unique: 1898
  Processed index 'G'. Found 484 links. Total unique: 2382
  Processed index 'H'. Found 458 links. Total unique: 2840
  Processed index 'L'. Found 444 links. Total unique: 3284
  Processed index 'A'. Found 1000 links. Total unique: 4284
  Processed index 'E'. Found 369 links. Total unique: 4653
  Processed index 'F'. Found 579 links. Total unique: 5232
  Processed index 'B'. Found 777 links. Total unique: 6009
  Proc



  Processed filing URL 200/3284...
  Processed filing URL 300/3284...
  Processed filing URL 400/3284...
  Processed filing URL 500/3284...
  Processed filing URL 600/3284...
  Processed filing URL 700/3284...
  Processed filing URL 800/3284...
  Processed filing URL 900/3284...
  Processed filing URL 1000/3284...
  Processed filing URL 1100/3284...
  Processed filing URL 1200/3284...
  Processed filing URL 1300/3284...
  Processed filing URL 1400/3284...
  Processed filing URL 1500/3284...
  Processed filing URL 1600/3284...
  Processed filing URL 1700/3284...
  Processed filing URL 1800/3284...
  Processed filing URL 1900/3284...
  Processed filing URL 2000/3284...
  Processed filing URL 2100/3284...
  Processed filing URL 2200/3284...
  Processed filing URL 2300/3284...
  Processed filing URL 2400/3284...
  Processed filing URL 2500/3284...
  Processed filing URL 2600/3284...
  Processed filing URL 2700/3284...
  Processed filing URL 2800/3284...




  Processed filing URL 2900/3284...
  Processed filing URL 3000/3284...
  Processed filing URL 3100/3284...
  Processed filing URL 3200/3284...
  Processed filing URL 3284/3284...

--- Finished Scraping All Filing Data ---
Stage 3 Duration: 4083.96 seconds

 Stage 4: Consolidating and Processing All Scraped Data
Combining 3283 scraped tables...


  combined_df = pd.concat(all_scraped_dataframes, ignore_index=True, sort=False)


Combined DataFrame shape: (3194491, 13).

Processing combined data...
Filtered for 'COM' stock. Rows remaining: 1329679.




Calculating quarterly changes grouped by fund and 'sym'...
Quarterly change calculations complete.

Formatting final columns for output...
Final DataFrame columns: ['fund_name', 'filing_date', 'quarter', 'stock symbol', 'cl', 'value_usd_000', 'shares', 'change', 'pct_change', 'inferred_transaction_type']

Stage 4 Duration: 70.25 seconds

 Stage 5: Final Output

--- First 5 rows of the final DataFrame (Shape: (1299736, 10)) ---
                      fund_name filing_date               quarter stock symbol   cl  value_usd_000    shares  change  pct_change inferred_transaction_type
2691865  300 NORTH CAPITAL, LLC   2/14/2014  Q4 2013 13F Holdings          ABC  COM           6016   85598.0       0    0.000000                      Hold
2691761  300 NORTH CAPITAL, LLC   5/12/2014  Q1 2014 13F Holdings          ABC  COM           6035   92000.0    6402    0.074791                       Buy
2691646  300 NORTH CAPITAL, LLC    8/5/2014  Q2 2014 13F Holdings          ABC  COM           7059   971

**Code for Full-Scale 13F Filings Scraper (All Managers, No Caps)**

In [None]:
# ==============================================================================
#                            Imports and Setup
# ==============================================================================
import requests
import pandas as pd
from bs4 import BeautifulSoup
import time
import random
import re
import numpy as np
import os
from datetime import datetime
from urllib.parse import urljoin
import json
import logging
from typing import List, Dict, Optional, Tuple, Any, Set
from concurrent.futures import ThreadPoolExecutor, as_completed # For Parallelism
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_exception # For Backoff

# --- Configuration ---

# Root logger: timestamped records that also carry the emitting thread's name
logging.basicConfig(
    format='%(asctime)s - %(levelname)-8s - %(threadName)-10s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
# Keep the HTTP libraries quiet unless something is actually wrong
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# --- Constants ---
BASE_URL: str = "https://13f.info"
DEFAULT_USER_AGENT: str = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/91.0.4472.124 Safari/537.36'
)
CSV_FILENAME: str = "Fund Manager Shares Analysis.csv"

# Columns of the final output, in the order they are written
TARGET_FINAL_COLUMNS: List[str] = [
    'fund_name',
    'filing_date',
    'quarter',
    'stock symbol',
    'cl',
    'value_usd_000',
    'shares',
    'change',
    'pct_change',
    'inferred_transaction_type',
]

# Thread-pool size for the parallel scraping stages; tune to your
# system/network and risk tolerance (start lower, e.g., 10)
MAX_WORKERS: int = 15
# Tenacity backoff settings
MAX_RETRIES: int = 4  # Attempts per request (1 initial + 3 retries)
INITIAL_BACKOFF_DELAY: float = 1.0  # Lower bound, in seconds, of the retry wait
MAX_BACKOFF_DELAY: float = 10.0  # Upper bound, in seconds, of the retry wait

logging.info("Script started. Libraries imported, logging and constants configured.")
logging.info(f"Parallelism enabled with MAX_WORKERS={MAX_WORKERS}")
logging.info(
    f"Exponential backoff configured: MAX_RETRIES={MAX_RETRIES}, "
    f"INITIAL_DELAY={INITIAL_BACKOFF_DELAY}s, MAX_DELAY={MAX_BACKOFF_DELAY}s"
)

# ==============================================================================
#              Request Function with Exponential Backoff (using Tenacity)
# ==============================================================================

# Transport-level failures that are worth another attempt
RETRYABLE_NETWORK_EXCEPTIONS = (
    requests.exceptions.Timeout,
    requests.exceptions.ConnectionError,
    requests.exceptions.ChunkedEncodingError,  # Seen on intermittent network issues
)

# HTTP statuses that are worth another attempt (rate limiting and server errors)
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

# Retry predicate for HTTP errors
def _should_retry_http_error(exception: BaseException) -> bool:
    """Tell whether *exception* is an HTTPError whose status code is in RETRYABLE_STATUS_CODES."""
    if not isinstance(exception, requests.exceptions.HTTPError):
        return False
    # An HTTPError can exist without an attached response; treat that as final
    attached_response = getattr(exception, 'response', None)
    if attached_response is None:
        return False
    return attached_response.status_code in RETRYABLE_STATUS_CODES

# Define the retry decorator
def _log_before_retry(retry_state) -> None:
    """Log the failed attempt and the upcoming wait just before tenacity sleeps."""
    # The URL is normally the first positional argument; fall back to the keyword
    # form so a call such as make_request_with_retries(url=...) cannot crash the logger.
    target = retry_state.args[0] if retry_state.args else retry_state.kwargs.get('url', '<unknown url>')
    error = retry_state.outcome.exception()
    logging.warning(
        f"Retrying request for {target} "
        f"due to {error.__class__.__name__}: {error} "
        f"(Attempt {retry_state.attempt_number}/{MAX_RETRIES}). Waiting {retry_state.next_action.sleep:.2f}s..."
    )


retry_config = retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, min=INITIAL_BACKOFF_DELAY, max=MAX_BACKOFF_DELAY),
    # Retry on specific network exceptions OR specific HTTP error codes
    retry=(retry_if_exception_type(RETRYABLE_NETWORK_EXCEPTIONS) | retry_if_exception(_should_retry_http_error)),
    # Log before sleeping on retry
    before_sleep=_log_before_retry,
    # Once attempts are exhausted, surface the last underlying exception instead
    # of tenacity.RetryError so callers see and log the real failure.
    reraise=True,
)

@retry_config
def make_request_with_retries(url: str, headers: Dict[str, str], params: Optional[Dict[str, Any]] = None, timeout: int = 45) -> requests.Response:
    """
    Perform a GET request; the `retry_config` decorator re-invokes this
    function when a raised exception matches its retry conditions.

    Args:
        url (str): The URL to request.
        headers (Dict[str, str]): Request headers.
        params (Optional[Dict[str, Any]]): URL query parameters.
        timeout (int): Request timeout in seconds.

    Returns:
        requests.Response: The response, returned only when the status check passes.

    Raises:
        Any exception from the request or status check is logged and re-raised
        unchanged; `retry_config` then decides whether to retry or give up.
    """
    try:
        reply = requests.get(url, headers=headers, params=params, timeout=timeout)
        reply.raise_for_status()  # Turns 4xx/5xx statuses into HTTPError
    except requests.exceptions.HTTPError as status_error:
        # Only statuses that will not be retried are logged as permanent failures
        will_be_retried = _should_retry_http_error(status_error)
        if not will_be_retried:
            logging.error(f"Request to {url} failed permanently with status {status_error.response.status_code}. Error: {status_error}")
        raise
    except requests.exceptions.RequestException as transport_error:
        logging.error(f"Request to {url} failed with connection/timeout error: {transport_error}")
        raise
    except Exception as unexpected_error:
        logging.error(f"Unexpected error during request to {url}: {unexpected_error}", exc_info=True)
        raise
    else:
        return reply

# ==============================================================================
#                        Scraping Function Definitions
# ==============================================================================
# --- Helper Functions ---
def _clean_header(header_text: str) -> str:
    """Helper function to consistently clean table header text."""
    if not isinstance(header_text, str): return ""
    text = header_text.lower().strip()
    text = text.replace(' ', '_')
    text = text.replace('($000)', 'usd_000')
    text = text.replace('%', 'pct')
    text = re.sub(r'[^\w_]+', '', text) # Keep underscores
    return text

def _extract_headers(table_tag: BeautifulSoup) -> List[str]:
    """Return the cleaned, non-empty header names of a table tag ([] if none are found)."""
    cells = []

    # Strategy 1: first row of <thead>, then any <th> inside <thead>
    head_section = table_tag.find('thead')
    if head_section:
        top_row = head_section.find('tr')
        if top_row:
            cells = top_row.find_all(['th', 'td'])
        if not cells:
            cells = head_section.find_all('th')

    # Strategy 2: the very first row of the table
    if not cells:
        leading_row = table_tag.find('tr')
        if leading_row:
            cells = leading_row.find_all(['th', 'td'])

    if not cells:
        logging.warning("Could not find any header cells (th/td) using multiple strategies.")
        return []

    cleaned_names = (_clean_header(cell.get_text()) for cell in cells)
    # Headers that clean down to an empty string are dropped
    return [name for name in cleaned_names if name]

# --- Fallback BS4 Parsing ---
def parse_tables_directly_bs4(soup_or_table_tag: Any, url: str, single_table: bool = False) -> List[pd.DataFrame]:
    """Build DataFrames straight from HTML table markup (fallback parser).

    With ``single_table=True`` the first argument is treated as the table itself;
    otherwise every ``<table>`` whose class contains the word ``table`` is parsed.
    Each resulting frame gets a ``SourceURL`` column set to *url*.
    """
    if not soup_or_table_tag:
        return []

    if single_table:
        candidates = [soup_or_table_tag]
    else:
        candidates = soup_or_table_tag.find_all('table', {'class': re.compile(r'\btable\b')})
    if not candidates:
        return []

    frames = []
    for table_no, table in enumerate(candidates, start=1):
        column_names = _extract_headers(table)
        if not column_names:
            continue

        # Prefer <tbody> rows; without one, everything after the first row is data
        body = table.find('tbody')
        if body:
            body_rows = body.find_all('tr')
        else:
            every_row = table.find_all('tr')
            body_rows = every_row[1:] if len(every_row) > 1 else []
        if not body_rows:
            continue

        width = len(column_names)
        kept_rows = []
        mismatch_reported = False
        for row_no, table_row in enumerate(body_rows):
            tds = table_row.find_all('td')
            if len(tds) != width:
                # Rows with the wrong cell count are skipped; warn once per table
                if not mismatch_reported:
                    logging.warning(f"[Direct BS4] Table {table_no} on {url}: Row {row_no} has {len(tds)} cells, expected {width}. Skipping mismatched rows for this table.")
                    mismatch_reported = True
                continue
            kept_rows.append([td.get_text(strip=True) if td else None for td in tds])

        if not kept_rows:
            continue
        try:
            frame = pd.DataFrame(kept_rows, columns=column_names)
            frame['SourceURL'] = url
            # Blank / whitespace-only cells become NaN
            frame = frame.replace(r'^\s*$', np.nan, regex=True)
            frames.append(frame)
        except Exception as e:
            logging.error(f"[Direct BS4] Error creating DataFrame for table {table_no} on {url}: {e}", exc_info=True)
    return frames

# --- Primary Scraping Functions (using make_request_with_retries) ---

def scrape_az_index_links(managers_index_page_url: str) -> List[str]:
    """Collect the single-character index links ('/managers/<char>') from the managers listing page.

    Returns the links sorted, or an empty list when none are found or the page
    cannot be fetched or parsed.
    """
    logging.info(f"Scraping A-Z index links from: {managers_index_page_url}")
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}
    found = set()
    try:
        page = make_request_with_retries(managers_index_page_url, headers=request_headers, timeout=30)
        soup = BeautifulSoup(page.text, 'html.parser')

        # Look inside the index container first; otherwise fall back to list links
        container = soup.find('div', class_='mb-12')
        if container:
            anchors = container.find_all('a', href=True)
        else:
            anchors = soup.select('ul > li > a[href]')

        for anchor in anchors:
            target = anchor.get('href')
            if not isinstance(target, str):
                continue
            # An index link ends in exactly one character, e.g. '/managers/a'
            if target.startswith('/managers/') and len(target.split('/')[-1]) == 1:
                found.add(target)

        if not found:
            logging.warning(f"No valid A-Z index links extracted from {managers_index_page_url}")
            return []

        ordered = sorted(found)
        logging.info(f"Found {len(found)} A-Z index links: {ordered}")
        return ordered

    except Exception as e:  # Request failures and parsing errors alike
        logging.error(f"Failed to scrape or parse A-Z index page {managers_index_page_url} after retries. Error: {e}")
        return []


def scrape_managers_from_letter_page(letter_page_relative_url: str, base_url: str) -> Set[str]:
    """Return the set of '/manager/<id>' hrefs found on one index page (empty set on failure)."""
    full_url = urljoin(base_url, letter_page_relative_url)
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}

    def is_manager_profile(href: Any) -> bool:
        # Accept exactly two path segments: 'manager' plus an id longer than one character
        if not (isinstance(href, str) and href.startswith('/manager/')):
            return False
        segments = href.strip('/').split('/')
        return len(segments) == 2 and segments[0] == 'manager' and len(segments[1]) > 1

    try:
        page = make_request_with_retries(full_url, headers=request_headers, timeout=45)
        soup = BeautifulSoup(page.text, 'html.parser')
        candidate_hrefs = (anchor.get('href') for anchor in soup.find_all('a', href=True))
        return {href for href in candidate_hrefs if is_manager_profile(href)}
    except Exception as e:
        logging.error(f"Failed to scrape or parse letter page {full_url} after retries. Error: {e}")
        return set()


def get_all_manager_links_via_az_index(base_url: str, executor: ThreadPoolExecutor) -> Tuple[List[str], Dict[str, int]]:
    """
    Gather manager links from every A-Z index page, one executor task per page.

    Returns a tuple of (sorted unique manager links, link count per index page);
    an index page whose task raised is recorded with a count of 0. Both parts
    are empty when the index links themselves cannot be retrieved.
    """
    collected_links: Set[str] = set()
    links_per_index_page: Dict[str, int] = {}
    listing_url = urljoin(base_url, "/managers")

    logging.info("Starting A-Z index link scraping...")
    index_pages = scrape_az_index_links(listing_url)
    if not index_pages:
        logging.critical("Could not retrieve A-Z index links. Cannot proceed.")
        return [], {}

    page_total = len(index_pages)
    logging.info(f"Found {page_total} A-Z index pages to process in parallel.")
    print(f"Found {page_total} A-Z index pages. Processing in parallel...")

    # One task per index page; remember which page each future belongs to
    pending: Dict[Any, str] = {}
    for index_page in index_pages:
        task = executor.submit(scrape_managers_from_letter_page, index_page, base_url)
        pending[task] = index_page

    for done_count, finished in enumerate(as_completed(pending), start=1):
        page_path = pending[finished]
        index_label = page_path.split('/')[-1].upper()
        try:
            page_links = finished.result()
            link_count = len(page_links)
            links_per_index_page[page_path] = link_count
            collected_links.update(page_links)
            logging.info(f"[{done_count}/{page_total}] Index '{index_label}' completed. Found {link_count} links. Total unique: {len(collected_links)}")
            print(f"  Processed index '{index_label}'. Found {link_count} links. Total unique: {len(collected_links)}")
        except Exception as exc:
            logging.error(f"Index page {page_path} generated an exception: {exc}")
            links_per_index_page[page_path] = 0  # Mark the page as failed

    unique_total = len(collected_links)
    logging.info(f"Finished parallel A-Z index scraping. Total unique manager links found: {unique_total}")
    print(f"\nFinished A-Z index scraping. Found {unique_total} unique manager links.")
    return sorted(collected_links), links_per_index_page


def scrape_quarter_links_task(rel_link: str, base_url: str) -> Tuple[str, List[str]]:
    """Executor-friendly wrapper: pair a manager link with the quarter links scraped for it."""
    return rel_link, scrape_quarter_links_from_manager(rel_link, base_url)

def scrape_quarter_links_from_manager(relative_manager_link: str, base_url: str) -> List[str]:
    """Return absolute filing URLs taken from the 'Quarter' column of a manager page's table.

    An empty list is returned when the table, its header, the 'Quarter' column or
    the data rows are missing, or when the request/parsing fails.
    """
    full_url = urljoin(base_url, relative_manager_link)
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}
    try:
        page = make_request_with_retries(full_url, headers=request_headers, timeout=30)
        soup = BeautifulSoup(page.text, 'html.parser')

        filings_table = soup.find('table', {'class': re.compile(r'\btable\b')})
        if not filings_table:
            return []
        header_section = filings_table.find('thead') or filings_table.find('tr')
        if not header_section:
            return []

        # Position of the column whose header text is exactly "quarter" (case-insensitive)
        header_cells = header_section.find_all(['th', 'td'])
        quarter_position = next(
            (pos for pos, cell in enumerate(header_cells)
             if cell.get_text(strip=True).lower() == "quarter"),
            -1,
        )
        if quarter_position == -1:
            return []

        body = filings_table.find('tbody')
        if body:
            rows = body.find_all('tr')
        else:
            every_row = filings_table.find_all('tr')
            rows = every_row[1:] if len(every_row) > 1 else []
        if not rows:
            return []

        filing_urls = []
        for table_row in rows:
            tds = table_row.find_all('td')
            if len(tds) <= quarter_position:
                continue
            anchor = tds[quarter_position].find('a', href=True)
            target = anchor.get('href') if anchor else None
            if isinstance(target, str) and target.strip():
                filing_urls.append(urljoin(base_url, target))
        return filing_urls
    except Exception as e:
        logging.error(f"Failed scrape_quarter_links_from_manager for {full_url}. Error: {e}")
        return []  # Best effort: a failed manager page contributes no links

# ---
def scrape_fund_name(manager_page_url: str) -> str:
    """Return the fund name shown on a manager page, or "Unknown Fund Name".

    The page's <h1> text is used (cut at ' CIK#' when present). Without an <h1>,
    the name is derived from the last URL path segment after its first dash.
    """
    request_headers = {'User-Agent': DEFAULT_USER_AGENT}
    fallback_name = "Unknown Fund Name"
    try:
        page = make_request_with_retries(manager_page_url, headers=request_headers, timeout=25)
        heading = BeautifulSoup(page.text, 'lxml').find('h1')
        if heading:
            title = heading.get_text(strip=True)
            if ' CIK#' in title:
                title = title.split(' CIK#')[0].strip()
            return title or fallback_name

        # No heading on the page: try to rebuild the name from the URL slug
        logging.warning(f"H1 tag not found on {manager_page_url}. Attempting URL parsing.")
        try:
            segments = manager_page_url.strip('/').split('/')
            if len(segments) > 3 and segments[-2] == 'manager':
                slug = segments[-1]
                dash_at = slug.find('-')
                if dash_at != -1:
                    derived = slug[dash_at:].strip('-').replace('-', ' ').title()
                    if derived:
                        return derived
        except Exception:
            pass  # URL parsing is best effort; the fallback name is used below
        return fallback_name
    except Exception as e:
        logging.error(f"Failed scrape_fund_name for {manager_page_url} after retries. Error: {e}")
        return fallback_name

# ---
def scrape_table_data_and_metadata(url: str) -> Dict[str, Any]:
    """Fetch a filing page and collect its table data plus metadata.

    The returned dict always has the keys 'dataframes' (a list of
    DataFrames), 'filing_date' and 'quarter' (text, or None if not found).

    Table extraction order:
      1. No table with id 'filingAggregated': parse every table on the page.
      2. Target table found but headers cannot be extracted: parse that
         table directly from the HTML.
      3. Target table has a 'data-url' attribute: request that endpoint and
         build a DataFrame from the JSON 'data' rows.
      4. Otherwise, or if step 3 produced nothing: parse the target table
         directly from the HTML.
    """
    request_headers = {
        'User-Agent': DEFAULT_USER_AGENT,
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'X-Requested-With': 'XMLHttpRequest',
    }
    result: Dict[str, Any] = {'dataframes': [], 'filing_date': None, 'quarter': None}

    # --- Download the page; without it there is nothing to parse ---
    try:
        html_content = make_request_with_retries(url, headers=request_headers, timeout=45).text
    except Exception as e:
        logging.error(f"Failed to retrieve page {url} after retries. Error: {e}")
        return result

    # --- Parse the page and pull out metadata (failures are logged, not fatal) ---
    soup = None
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        date_label = soup.find('dt', string=lambda t: t and 'Date filed' in t.strip())
        if date_label and date_label.find_next_sibling('dd'):
            result['filing_date'] = date_label.find_next_sibling('dd').get_text(strip=True)
        quarter_heading = soup.find(
            ['h1', 'h2', 'h3'],
            string=lambda t: t and ('13F Holdings' in t or 'Quarter' in t),
        )
        if quarter_heading:
            result['quarter'] = quarter_heading.get_text(strip=True)
    except Exception as e:
        logging.error(f"Error parsing metadata from {url}: {e}")

    # --- Locate the main table; without it, fall back to all tables on the page ---
    target_table = soup.find('table', id='filingAggregated') if soup else None
    if not target_table:
        if soup:
            result['dataframes'] = parse_tables_directly_bs4(soup, url)
        return result

    headers_list = _extract_headers(target_table)
    if not headers_list:
        logging.error(f"CRITICAL: Failed to extract HTML headers from target table on {url}.")
        result['dataframes'] = parse_tables_directly_bs4(target_table, url, single_table=True)
        return result

    # --- Preferred path: rows served by the table's data-url endpoint ---
    data_url_path = target_table.get('data-url')
    if data_url_path:
        ajax_url = urljoin(BASE_URL, data_url_path)
        ajax_succeeded = False
        try:
            payload = make_request_with_retries(ajax_url, headers=request_headers, timeout=45).json()
            raw_rows = payload.get('data')
            if raw_rows and isinstance(raw_rows, list):
                width = len(headers_list)
                # Keep only list rows with at least `width` cells, truncate
                # them to `width`, and reduce string cells to their text.
                cleaned_rows = [
                    [
                        BeautifulSoup(str(cell), 'lxml').get_text(strip=True) if isinstance(cell, str) else cell
                        for cell in raw_row[:width]
                    ]
                    for raw_row in raw_rows
                    if isinstance(raw_row, list) and len(raw_row) >= width
                ]
                if cleaned_rows:
                    df = pd.DataFrame(cleaned_rows, columns=headers_list)
                    df['SourceURL'] = url
                    result['dataframes'].append(df)
                    ajax_succeeded = True
            else:
                logging.warning(f"AJAX 'data' key missing or not list for {ajax_url}.")
        except Exception as e:
            logging.error(f"AJAX request/processing failed for {ajax_url} after retries. Error: {e}")

        if ajax_succeeded:
            return result
        logging.warning(f"AJAX did not yield DataFrame for {url}.")

    # --- Last resort: read the rows straight from the HTML of the target table ---
    result['dataframes'] = parse_tables_directly_bs4(target_table, url, single_table=True)
    return result


# --- Wrapper function for parallel data scraping task ---
def scrape_filing_data_task(data_url: str, fund_name: str) -> List[pd.DataFrame]:
    """Scrape one filing URL and tag every resulting table.

    Each DataFrame returned by scrape_table_data_and_metadata gets three
    extra columns, set in place: 'fund_name' (the given name), plus
    'filing_date' and 'quarter' taken from the scrape result.

    Returns:
        The tagged DataFrames; an empty list if the scrape yielded none.
    """
    scraped = scrape_table_data_and_metadata(data_url)
    filing_date = scraped.get('filing_date')
    quarter = scraped.get('quarter')

    tagged_frames = []
    for frame in scraped.get('dataframes', []):
        frame['fund_name'] = fund_name
        frame['filing_date'] = filing_date
        frame['quarter'] = quarter
        tagged_frames.append(frame)
    return tagged_frames


# ==============================================================================
#                        Execution Stage 1: Get All Manager Links (A-Z Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 1: Scraping All Manager Links via A-Z Index (Parallel)")
print("="*70)

start_time_stage1 = time.time()

# Use ThreadPoolExecutor for parallel execution.
# The 'with' statement shuts the pool down before the script moves on.
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='AZScrape') as executor:
    all_manager_links, manager_counts = get_all_manager_links_via_az_index(BASE_URL, executor)

end_time_stage1 = time.time()
print("\n--- Manager Link Scraping Summary ---")
print(f"Total unique manager links found via A-Z Index: {len(all_manager_links)}")
if manager_counts:
    print("\nCounts per Index Page:")
    # Sort counts by index page URL for consistent output
    for page_url, count in sorted(manager_counts.items()):
        index_char = page_url.split('/')[-1].upper()
        print(f"  Index '{index_char}' ({page_url}): {count} managers")
print(f"Stage 1 Duration: {end_time_stage1 - start_time_stage1:.2f} seconds")

if not all_manager_links:
    print("\nCRITICAL: No manager links were found. Cannot proceed.")
    # Raise SystemExit instead of calling exit(): exit() is a helper injected
    # by the 'site' module for interactive sessions and is not meant for
    # programs. A non-zero status marks the run as failed.
    raise SystemExit(1)

# No cap is applied: every manager link found above is processed.
# For a quick test run, slice the list here (e.g. all_manager_links[:100]).
manager_links_relative = all_manager_links
print(f"\nProcessing ALL {len(manager_links_relative)} unique managers found...")

time.sleep(random.uniform(0.5, 1.0)) # Small delay


# ==============================================================================
#                 Execution Stage 2: Get Quarter Links (Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 2: Scraping Quarter Links for Each Selected Manager (Parallel)")
print("="*70)

start_time_stage2 = time.time()
num_managers_to_process = len(manager_links_relative)
# Maps each manager link to the list of quarter links found for it
# (an empty list when none were found or the task failed).
manager_quarter_links: Dict[str, List[str]] = {}
processed_count_stage2 = 0
print(f"Fetching quarter links for {num_managers_to_process} managers...")

with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='QuarterLinkScrape') as executor:
    # One task per manager; remember which link each future belongs to.
    future_to_rel_link = {}
    for rel_link in manager_links_relative:
        pending = executor.submit(scrape_quarter_links_task, rel_link, BASE_URL)
        future_to_rel_link[pending] = rel_link

    # Consume results in completion order, counting as we go.
    for processed_count_stage2, future in enumerate(as_completed(future_to_rel_link), start=1):
        rel_link = future_to_rel_link[future]
        try:
            # The task returns a (link, quarter_links) pair; only the list is needed here.
            _echoed_link, links = future.result()
            manager_quarter_links[rel_link] = links

            # Progress line on every 100th manager and on the last one
            at_milestone = processed_count_stage2 % 100 == 0
            at_end = processed_count_stage2 == num_managers_to_process
            if at_milestone or at_end:
                logging.info(f"Quarter links progress: {processed_count_stage2}/{num_managers_to_process} managers processed.")
                print(f"  Processed quarter links for manager {processed_count_stage2}/{num_managers_to_process}...")

            if not links:
                logging.warning(f"No quarter links found or error occurred for {rel_link}")
        except Exception as exc:
            logging.error(f"Getting quarter links for {rel_link} generated an exception: {exc}")
            # Keep an entry for this manager even though the task failed
            manager_quarter_links[rel_link] = []

end_time_stage2 = time.time()
print(f"\nFinished getting quarter links stage for {len(manager_quarter_links)} managers.")
print(f"Stage 2 Duration: {end_time_stage2 - start_time_stage2:.2f} seconds")

# ==============================================================================
#                 Execution Stage 3: Scrape Filing Data (Parallel)
# ==============================================================================
print("\n" + "="*70)
print(" Stage 3: Scraping Filing Data for Each Manager/Quarter (Parallel)")
print("="*70)

start_time_stage3 = time.time()
all_scraped_dataframes: List[pd.DataFrame] = []
fund_name_map: Dict[str, str] = {} # Cache fund names, keyed by manager link

# Prepare list of tasks: (data_url, fund_name)
tasks_to_submit: List[Tuple[str, str]] = []
print("Preparing filing data scrape tasks...")
for i, manager_link in enumerate(manager_links_relative):
    # Fund names are fetched sequentially here, before the parallel data
    # scrape, with a small random pause after each request.
    if manager_link not in fund_name_map:
        manager_page_url = urljoin(BASE_URL, manager_link)
        fund_name_map[manager_link] = scrape_fund_name(manager_page_url)
        logging.info(f"Fetched name for {manager_link}: {fund_name_map[manager_link]}")
        if (i + 1) % 50 == 0: # Print progress for name scraping
            print(f"  Fetched names for {i+1}/{num_managers_to_process} managers...")
        time.sleep(random.uniform(0.1, 0.5)) # Small delay after name scrape

    current_fund_name = fund_name_map[manager_link]
    quarter_links = manager_quarter_links.get(manager_link, []) # Links fetched in Stage 2

    # With no quarter links, fall back to scraping the manager page itself.
    urls_to_scrape = quarter_links if quarter_links else [urljoin(BASE_URL, manager_link)]
    if not quarter_links:
        logging.warning(f"No specific quarter links for {manager_link}, will attempt manager page scrape.")

    tasks_to_submit.extend((data_url, current_fund_name) for data_url in urls_to_scrape)

total_filing_urls = len(tasks_to_submit)
print(f"\nPrepared {total_filing_urls} total filing URLs to scrape in parallel.")
logging.info(f"Submitting {total_filing_urls} filing data scraping tasks to executor.")

processed_count_stage3 = 0
# Use ThreadPoolExecutor for parallel data scraping
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='FilingDataScrape') as executor:
    future_to_task_info = {executor.submit(scrape_filing_data_task, url, name): (url, name) for url, name in tasks_to_submit}

    for future in as_completed(future_to_task_info):
        url, name = future_to_task_info[future]
        processed_count_stage3 += 1
        try:
            list_of_dfs = future.result() # Result is a list of DFs for that URL
            if list_of_dfs:
                all_scraped_dataframes.extend(list_of_dfs)
        except Exception as exc:
            logging.error(f"Scraping task for URL {url} (Fund: {name}) generated an exception: {exc}")

        # Report every 200th URL and the final one. This sits outside the
        # try/if above so a milestone is still reported when that particular
        # URL produced no tables or raised.
        if processed_count_stage3 % 200 == 0 or processed_count_stage3 == total_filing_urls:
            logging.info(f"Filing data progress: {processed_count_stage3}/{total_filing_urls} URLs processed. Total DFs collected: {len(all_scraped_dataframes)}")
            print(f"  Processed filing URL {processed_count_stage3}/{total_filing_urls}...")

end_time_stage3 = time.time()
print("\n--- Finished Scraping All Filing Data ---")
print(f"Collected data resulting in {len(all_scraped_dataframes)} total DataFrames (pre-concat).")
print(f"Stage 3 Duration: {end_time_stage3 - start_time_stage3:.2f} seconds")


# ==============================================================================
#               Execution Stage 4: Data Consolidation & Processing
# ==============================================================================
print("\n" + "="*70)
print(" Stage 4: Consolidating and Processing All Scraped Data")
print("="*70)

start_time_stage4 = time.time()

# Bind processed_df up front: the processing guard further down reads it on
# every path, including when Stage 3 produced nothing or concatenation fails.
processed_df = pd.DataFrame()

if not all_scraped_dataframes:
    logging.warning("No dataframes were scraped in Stage 3. Cannot proceed with processing.")
else:
    # --- Combine all scraped dataframes ---
    logging.info(f"Combining {len(all_scraped_dataframes)} scraped dataframes...")
    print(f"Combining {len(all_scraped_dataframes)} scraped tables...")
    try:
        # pd.concat already returns a new frame, so no extra copy is taken.
        processed_df = pd.concat(all_scraped_dataframes, ignore_index=True, sort=False)
        logging.info(f"Initial combined DataFrame shape: {processed_df.shape}")
        print(f"Combined DataFrame shape: {processed_df.shape}.")
        del all_scraped_dataframes # Drop this reference to the per-URL frames
    except Exception as e:
        logging.critical(f"CRITICAL Error during DataFrame concatenation: {e}", exc_info=True)
        processed_df = pd.DataFrame() # Assign empty df on critical error

# --- Process the combined dataframe (only if concatenation was successful) ---
if not processed_df.empty:
    logging.info("Starting data processing steps...")
    print("\nProcessing combined data...")

    # --- Standardize Column Names ---
    processed_df.columns = [_clean_header(col) for col in processed_df.columns]
    logging.info(f"Standardized column names: {processed_df.columns.tolist()}")

    # --- Filter for Common Stock ('COM') ---
    if 'cl' in processed_df.columns:
        original_rows = len(processed_df)
        # .copy() so the column assignments below act on an independent
        # frame rather than on a slice of the unfiltered one.
        processed_df = processed_df[processed_df['cl'].astype(str).str.upper() == 'COM'].copy()
        logging.info(f"Filtered for 'cl' == 'COM'. Rows reduced from {original_rows} to {len(processed_df)}.")
        print(f"Filtered for 'COM' stock. Rows remaining: {len(processed_df)}.")
    else:
        logging.warning("'cl' column not found.")

    # --- Data Type Conversion and Cleaning ---
    logging.info("Cleaning and converting data types...")
    # Shares: 'shares' keeps the comma-free text, 'shares_numeric' is the
    # numeric helper used for the change calculations.
    if 'shares' in processed_df.columns:
        processed_df['shares'] = processed_df['shares'].astype(str).str.replace(',', '', regex=False)
        processed_df['shares_numeric'] = pd.to_numeric(processed_df['shares'], errors='coerce')
        if processed_df['shares_numeric'].isna().any():
            logging.warning("Non-numeric shares coerced to NaN.")
    else:
        processed_df['shares_numeric'] = np.nan
    # Value
    value_col = 'value_usd_000'
    if value_col in processed_df.columns:
        processed_df[value_col] = processed_df[value_col].astype(str).str.replace(',', '', regex=False)
        processed_df[value_col] = pd.to_numeric(processed_df[value_col], errors='coerce')
        if processed_df[value_col].isna().any():
            logging.warning(f"Non-numeric '{value_col}' coerced to NaN.")
    else:
        logging.warning(f"Value column '{value_col}' not found.")
    # Quarter Period
    if 'quarter' in processed_df.columns:
        def parse_quarter_to_period(q_str: Optional[str]) -> Any:
            """Return a quarterly pd.Period parsed from text like 'Q1 2023', else pd.NaT."""
            if not isinstance(q_str, str):
                return pd.NaT
            match = re.search(r'(Q[1-4])\s*(\d{4})', q_str, re.IGNORECASE)
            if not match:
                return pd.NaT
            q_num, year = match.group(1).upper(), match.group(2)
            try:
                return pd.Period(f"{year}{q_num}", freq='Q')
            except ValueError:
                return pd.NaT
        processed_df['quarter_period'] = processed_df['quarter'].apply(parse_quarter_to_period)
        if processed_df['quarter_period'].isna().any():
            logging.warning("Some quarters could not be parsed.")
    else:
        processed_df['quarter_period'] = pd.NaT

    # --- Identify Stock Identifier ---
    stock_id_col = 'sym' if 'sym' in processed_df.columns else ('stock_symbol' if 'stock_symbol' in processed_df.columns else None)
    if not stock_id_col:
        logging.error("CRITICAL: No stock identifier found.")

    # --- Calculate Changes ---
    can_calculate_changes = stock_id_col and ('shares_numeric' in processed_df.columns) and ('quarter_period' in processed_df.columns) and processed_df['quarter_period'].notna().any()
    if can_calculate_changes:
        logging.info(f"Calculating changes grouped by 'fund_name', '{stock_id_col}'...")
        print(f"Calculating quarterly changes grouped by fund and '{stock_id_col}'...")
        essential_cols = ['fund_name', stock_id_col, 'quarter_period', 'shares_numeric']
        # Rows missing any grouping/ordering/value field cannot take part in the diff.
        processed_df = processed_df.dropna(subset=[col for col in essential_cols if col in processed_df.columns])
        processed_df = processed_df.sort_values(by=['fund_name', stock_id_col, 'quarter_period'], ascending=True)
        group_cols = ['fund_name', stock_id_col]
        # Within each (fund, stock) group, compare each row with the previous
        # quarter's row; the first row of a group has no predecessor and gets 0.
        processed_df['change'] = processed_df.groupby(group_cols, observed=True)['shares_numeric'].diff().fillna(0)
        pct_change_raw = processed_df.groupby(group_cols, observed=True)['shares_numeric'].pct_change()
        # A previous value of 0 yields +/-inf; report those (and missing values) as 0.
        processed_df['pct_change'] = pct_change_raw.replace([np.inf, -np.inf], np.nan).fillna(0)
        change_numeric = pd.to_numeric(processed_df['change'], errors='coerce')
        conditions = [change_numeric < 0, change_numeric == 0, change_numeric > 0]
        choices = ['Sell', 'Hold', 'Buy']
        processed_df['inferred_transaction_type'] = np.select(conditions, choices, default='Unknown')
        logging.info("Change calculations complete.")
        print("Quarterly change calculations complete.")
        # Best effort: store 'change' as nullable integers; keep floats if the cast is refused.
        try:
            processed_df['change'] = processed_df['change'].astype(pd.Int64Dtype())
        except (TypeError, ValueError):
            pass
    else:
        logging.warning("Skipping quarterly change calculations.")
        print("Skipping quarterly change calculations.")
        for col in ['change', 'pct_change', 'inferred_transaction_type']:
            if col not in processed_df.columns:
                processed_df[col] = pd.NA

    # --- Final Column Formatting ---
    logging.info("Formatting final columns...")
    print("\nFormatting final columns for output...")
    if stock_id_col and stock_id_col in processed_df.columns:
        # Rename whichever identifier column was detected ('sym' or
        # 'stock_symbol') so its values reach the output column.
        processed_df = processed_df.rename(columns={stock_id_col: 'stock symbol'})
        logging.info(f"Renamed '{stock_id_col}' column to 'stock symbol'.")
    elif 'stock symbol' not in processed_df.columns and 'stock symbol' in TARGET_FINAL_COLUMNS:
        logging.warning("Final target 'stock symbol' column not found. Creating with NaNs.")
        processed_df['stock symbol'] = np.nan

    final_columns_existing = [col for col in TARGET_FINAL_COLUMNS if col in processed_df.columns]
    missing_target_cols = [col for col in TARGET_FINAL_COLUMNS if col not in final_columns_existing]
    if missing_target_cols:
        logging.warning(f"Final target columns missing: {missing_target_cols}")

    final_processed_df = processed_df[final_columns_existing]
    logging.info(f"Final selected columns: {final_processed_df.columns.tolist()}")
    print(f"Final DataFrame columns: {final_processed_df.columns.tolist()}")
    # Helper columns must never reach the output, even if listed as targets.
    final_processed_df = final_processed_df.drop(columns=['shares_numeric', 'quarter_period'], errors='ignore')

else: # nothing was scraped, or concatenation failed
    logging.warning("Combined DataFrame was empty or processing failed.")
    final_processed_df = pd.DataFrame()

end_time_stage4 = time.time()
print(f"\nStage 4 Duration: {end_time_stage4 - start_time_stage4:.2f} seconds")


# ==============================================================================
#                        Execution Stage 5: Final Output
# ==============================================================================
print("\n" + "="*70)
print(" Stage 5: Final Output")
print("="*70)

if not final_processed_df.empty:
    # --- Print Head ---
    print(f"\n--- First 5 rows of the final DataFrame (Shape: {final_processed_df.shape}) ---")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 200)
    pd.set_option('display.max_colwidth', 70)
    print(final_processed_df.head().to_string())
    pd.reset_option('display.max_colwidth')

    # --- Save to CSV ---
    print(f"\n--- Saving final DataFrame to '{CSV_FILENAME}' ---")
    try:
        # 'utf-8-sig' writes a byte-order mark at the start of the file.
        final_processed_df.to_csv(CSV_FILENAME, index=False, encoding='utf-8-sig')
        print(f"Successfully saved data ({len(final_processed_df)} rows) to '{CSV_FILENAME}'")
    except Exception as e:
        # Report the failure and continue so the runtime summary still prints.
        print(f"CRITICAL ERROR: Failed to save the final DataFrame to CSV '{CSV_FILENAME}'. Error: {e}")

else:
    print("\nFinal DataFrame is empty or was not created due to errors. No CSV file saved.")

# --- Calculate and Print Total Runtime ---
# start_time_stage1 is assigned unconditionally at the start of Stage 1, so
# the total is measured directly from it; no per-stage fallback is needed.
total_end_time = time.time()
total_runtime = total_end_time - start_time_stage1
print(f"\nTotal Script Runtime: {total_runtime:.2f} seconds ({total_runtime/60:.2f} minutes)")


print("\n--- Script execution finished ---")