# SCT Table Clean Extraction (No LLM)

Deterministic, step-by-step extraction of Summary Compensation Tables (SCT) from HTML into clean CSV/Parquet.

What this notebook does:
- Discover HTML tables and select the best SCT candidate via keyword scoring.
- Detect the header row, flatten headers, and normalize header names (drop footnotes, NBSP, extra spaces).
- Drop placeholder columns (currency symbols, em-dashes) and deduplicate repeated columns.
- Map columns to canonical fields (salary, bonus, stock_awards, option_awards, non_equity_incentive, pension_value, all_other_comp, total, year, name_position).
- Coerce numbers (parentheses→negative, strip $ and commas), split name/position, and set dtypes.
- Validate row totals and add quality flags (total_calc/total_diff/total_ok).
- Save CSV and Parquet to the corresponding `extracted` folder.

Optional at the end:
- Tidy long format (one row per executive×year) as a separate cell you can enable.


In [13]:
import re
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import pandas as pd
pd.set_option('display.max_colwidth', 200)
pd.set_option('display.max_columns', 200)

# lxml HTML parser for XPath selection (match Table_format.ipynb approach)
from lxml import html

# Ensure Parquet support (required as per your preference)
import pyarrow  # noqa: F401  # should be installed; used by DataFrame.to_parquet


## Parameters
- Choose a ticker and form folder.
- The notebook will list HTML files found and let you pick by index, or you can process all.


In [14]:
BASE_DIR = Path('data')
TICKER = 'ABCP'
FORM = 'DEF_14A'

# Default file (you can change this or select by index from HTML_FILES below)
DEFAULT_HTML = BASE_DIR / TICKER / FORM / '2013-04-01_DEF_14A.html'

# Discover all HTML files under the chosen ticker/form
FORM_DIR = BASE_DIR / TICKER / FORM
HTML_FILES: List[Path] = sorted(FORM_DIR.glob('*.html')) if FORM_DIR.exists() else []
print(f'Found {len(HTML_FILES)} HTML files in {FORM_DIR}')
for i, p in enumerate(HTML_FILES[:25]):
    print(f'  [{i}]', p.name)

# Select by index if you want; -1 means use DEFAULT_HTML
SELECT_INDEX = -1
HTML_PATH = (HTML_FILES[SELECT_INDEX] if (0 <= SELECT_INDEX < len(HTML_FILES)) else DEFAULT_HTML)
print('Using HTML_PATH =', HTML_PATH)


Found 0 HTML files in data/ABCP/DEF_14A
Using HTML_PATH = data/ABCP/DEF_14A/2013-04-01_DEF_14A.html


## Canonical field mapping and helpers
We define canonical output fields and token sets to map messy headers to the canonical names.


In [15]:
CANON_ORDER = [
    'executive_name', 'position', 'year',
    'salary', 'bonus', 'stock_awards', 'option_awards',
    'non_equity_incentive', 'pension_value', 'all_other_comp', 'total',
]

KEY_TOKENS: Dict[str, List[str]] = {
    'salary': ['salary'],
    'bonus': ['bonus'],
    'stock_awards': ['stock awards', 'stock-awards'],
    'option_awards': ['option awards', 'option-awards'],
    'non_equity_incentive': ['non-equity incentive', 'non equity incentive'],
    'pension_value': ['change in pension', 'pension value', 'deferred compensation earnings'],
    'all_other_comp': ['all other compensation'],
    'total': ['total'],
    'year': ['year', 'fiscal year'],
    'name_position': ['name and principal position', 'name & principal position', 'principal position', 'name'],
}

PLACEHOLDER_HEADERS = {'', '$', '—', '–', '-'}

def normalize_header(h: str) -> str:
    s = re.sub(r'\s+', ' ', str(h)).strip()
    s = re.sub(r'\([^)]*\)', '', s)  # drop footnotes like (c)(2), ($)
    s = s.replace(' ', ' ')        # NBSP
    s = re.sub(r'\s+', ' ', s).strip().lower()
    return s

def canonical_for(col: str) -> Optional[str]:
    c = normalize_header(col)
    # strong check for name+position composites
    if 'name' in c and 'position' in c:
        return 'name_position'
    for key, toks in KEY_TOKENS.items():
        for t in toks:
            if t in c:
                return key
    return None

def is_placeholder_col(sr: pd.Series) -> bool:
    vals = sr.dropna().astype(str).str.strip().str.replace(' ', ' ', regex=False)
    if vals.empty:
        return True
    return vals.str.fullmatch(r'(\$)?|—|–|-').all()

def to_number(x):
    s = str(x).strip().replace(' ', ' ')
    if s in ('', '-', '–', '—'):
        return pd.NA
    s = re.sub(r'[,$]', '', s)
    m = re.fullmatch(r'\((.*)\)', s)
    if m:
        s = '-' + m.group(1)
    try:
        v = float(s)
        return int(v) if v.is_integer() else v
    except Exception:
        return pd.NA

def report_date_from_filename(p: Path) -> Optional[str]:
    m = re.match(r'(\d{4}-\d{2}-\d{2})_', p.stem)
    return m.group(1) if m else None


## Select SCT table via XPath (same logic as Table_format.ipynb)
Locate the header row containing 'name', 'principal', and 'position', ascend to its table, then parse that table.


In [16]:
def find_sct_table_element(html_path: Path):
    content = html_path.read_text(encoding='utf-8', errors='ignore')
    tree = html.fromstring(content)
    xpath_expr = """
//tr[
  .//text()[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'name')]
  and .//text()[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'principal')]
  and .//text()[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), 'position')]
]
"""
    tr_nodes = tree.xpath(xpath_expr)
    if not tr_nodes:
        return None
    table = tr_nodes[0].getparent()
    while table is not None and getattr(table, 'tag', None) != 'table':
        table = table.getparent()
    return table

tbl_el = find_sct_table_element(HTML_PATH)
print('Found SCT table element?' , tbl_el is not None)
if tbl_el is not None:
    preview = html.tostring(tbl_el)[:500]
    print('Table preview (first 500 bytes):')
    print(preview)
    df_raw = pd.read_html(html.tostring(tbl_el))[0]
    display(df_raw.head(8))
else:
    raise RuntimeError('No SCT header row found via XPath; adjust selection or pick another file.')


FileNotFoundError: [Errno 2] No such file or directory: 'data/ABCP/DEF_14A/2013-04-01_DEF_14A.html'

## Detect header row and flatten columns
Choose a header row within the first few rows, then flatten MultiIndex headers if any.


In [None]:
def detect_header_row(df: pd.DataFrame, max_rows: int = 6) -> pd.DataFrame:
    for i in range(min(max_rows, len(df))):
        row_low = df.iloc[i].astype(str).str.lower()
        hits = 0
        joined = ' '.join(list(row_low))
        for toks in KEY_TOKENS.values():
            if any(tok in joined for tok in toks):
                hits += 1
        if hits >= 2:
            df2 = df.copy()
            df2.columns = df2.iloc[i].astype(str).tolist()
            df2 = df2.iloc[i+1:].reset_index(drop=True)
            return df2
    return df

def flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
    if isinstance(df.columns, pd.MultiIndex):
        cols = [
            ' '.join(str(x) for x in tup if (str(x) and 'unnamed' not in str(x).lower())).strip()
            for tup in df.columns.to_list()
        ]
        df.columns = cols
    else:
        df.columns = [str(c) for c in df.columns]
    return df

step_df = detect_header_row(df_raw)
step_df = flatten_columns(step_df)
print('Columns after header detection/flatten (first 20):', list(step_df.columns)[:20])
step_df.head(8)


## Normalize headers and drop placeholder columns
Lowercase, strip footnotes and NBSPs, drop currency/emdash columns, deduplicate exact duplicates.


In [None]:
def normalize_headers_and_drop_placeholders(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
    df2 = df.dropna(how='all').reset_index(drop=True).copy()
    original_cols = list(df2.columns)
    df2.columns = [normalize_header(c) for c in df2.columns]
    dropped: List[str] = []
    for c in list(df2.columns):
        if c in PLACEHOLDER_HEADERS:
            dropped.append(c)
            continue
        if is_placeholder_col(df2[c]):
            dropped.append(c)
    df2 = df2.drop(columns=list(set(dropped)), errors='ignore')
    # de-duplicate exact duplicates
    df2 = df2.loc[:, ~df2.columns.duplicated()]
    return df2, dropped

norm_df, dropped_cols = normalize_headers_and_drop_placeholders(step_df)
print('Dropped placeholder columns:', dropped_cols)
print('Columns after normalization (first 20):', list(norm_df.columns)[:20])
norm_df.head(8)


## Select best columns per canonical metric
Map headers to canonical fields and keep one column per metric (the one with the most numeric-looking values).


In [None]:
def numeric_score(sr: pd.Series) -> int:
    return pd.to_numeric(sr.astype(str)
                         .str.replace(r'[\$,]', '', regex=True)
                         .str.replace(r'\s', '', regex=True)
                         .str.replace(r'^\((.*)\)$', r'-\1', regex=True),
                         errors='coerce').notna().sum()

def select_best_columns(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, str]]:
    groups: Dict[str, List[str]] = {}
    for c in df.columns:
        key = canonical_for(c)
        if key:
            groups.setdefault(key, []).append(c)
    keep: Dict[str, str] = {}  # canonical -> original column name
    for key, cols in groups.items():
        if key in ('executive_name', 'position'):
            continue
        best = max(cols, key=lambda x: numeric_score(df[x])) if cols else None
        if best:
            keep[key] = best
    if 'year' in groups:
        keep['year'] = max(groups['year'], key=lambda x: numeric_score(df[x]))
    if 'name_position' in groups:
        keep['name_position'] = groups['name_position'][0]
    sel = df[list(keep.values())].copy() if keep else df.copy()
    sel.columns = list(keep.keys()) if keep else sel.columns
    return sel, keep

sel_df, kept_map = select_best_columns(norm_df)
print('Kept columns (canonical -> original):')
for k, v in kept_map.items():
    print(f'  {k:22s} <- {v}')
sel_df.head(8)


## Coerce numbers, split name/position, order columns
Convert numeric fields, split the composite name/position, and re-order columns.


In [None]:
def finalize_fields(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # split name/position
    if 'name_position' in out.columns:
        parts = out['name_position'].astype(str).str.split(',', n=1)
        out['executive_name'] = parts.str[0].str.strip()
        out['position'] = parts.str[1].str.strip() if parts.apply(lambda x: len(x) > 1).any() else ''
        out = out.drop(columns=['name_position'])
    # year
    if 'year' in out.columns:
        out['year'] = pd.to_numeric(out['year'], errors='coerce').astype('Int64')
    # numerics
    for k in ['salary','bonus','stock_awards','option_awards','non_equity_incentive','pension_value','all_other_comp','total']:
        if k in out.columns:
            out[k] = out[k].map(to_number)
    # order
    cols = [c for c in CANON_ORDER if c in out.columns]
    cols += [c for c in out.columns if c not in cols]
    return out[cols]

final_df = finalize_fields(sel_df)
print(final_df.dtypes)
final_df.head(8)


## Quality checks (totals)
Compute total_calc and validate against provided total (tolerance=5).


In [None]:
def add_quality_flags(df: pd.DataFrame) -> pd.DataFrame:
    required = ['salary','bonus','stock_awards','option_awards','non_equity_incentive','pension_value','all_other_comp']
    out = df.copy()
    if 'total' in out.columns:
        for k in required:
            if k not in out.columns:
                out[k] = 0
        base_num = out[required].apply(pd.to_numeric, errors='coerce').fillna(0)
        out['total_calc'] = base_num.sum(axis=1)
        out['total_diff'] = (pd.to_numeric(out.get('total'), errors='coerce') - out['total_calc']).abs()
        out['total_ok'] = (out['total_diff'] <= 5)
    return out

checked_df = add_quality_flags(final_df)
checked_df.head(8)


### Rows with total mismatch (if any)
Helps identify residual misalignment.


In [None]:
mismatch = checked_df[checked_df.get('total_ok') == False]  # noqa: E712
mismatch.head(20)


## Save clean outputs (CSV + Parquet)
Files will be written next to the source under the `extracted` folder with suffix `_SCT.clean`.


In [None]:
ticker = TICKER
date = report_date_from_filename(HTML_PATH) or 'UNKDATE'
out_dir = (HTML_PATH.parent / 'extracted')
out_dir.mkdir(parents=True, exist_ok=True)
csv_path = out_dir / f'{ticker}_{date}_SCT.clean.csv'
pq_path = out_dir / f'{ticker}_{date}_SCT.clean.parquet'

checked_df.to_csv(csv_path, index=False)
checked_df.to_parquet(pq_path, engine='pyarrow', index=False)

print('Wrote:')
print('  CSV   ', csv_path)
print('  Parquet', pq_path)


## (Optional) Batch process all HTML files for this ticker/form
Process all files in the folder using the same cleaning pipeline and write clean CSV/Parquet for each.


In [None]:
def process_one(html_path: Path) -> Tuple[Optional[Path], Optional[Path]]:
    try:
        tbl_el = find_sct_table_element(html_path)
        if tbl_el is None:
            return None, None
        df_raw = pd.read_html(html.tostring(tbl_el))[0]
        step_df = flatten_columns(detect_header_row(df_raw))
        norm_df, _ = normalize_headers_and_drop_placeholders(step_df)
        sel_df, _ = select_best_columns(norm_df)
        final_df = finalize_fields(sel_df)
        checked_df = add_quality_flags(final_df)
        t = html_path.parent.parent.name  # ticker from path data/<TICKER>/<FORM>/file.html
        d = report_date_from_filename(html_path) or 'UNKDATE'
        out_dir = html_path.parent / 'extracted'
        out_dir.mkdir(parents=True, exist_ok=True)
        csv_path = out_dir / f'{t}_{d}_SCT.clean.csv'
        pq_path = out_dir / f'{t}_{d}_SCT.clean.parquet'
        checked_df.to_csv(csv_path, index=False)
        checked_df.to_parquet(pq_path, engine='pyarrow', index=False)
        return csv_path, pq_path
    except Exception as e:
        print('Failed:', html_path.name, '->', e)
        return None, None

# Set RUN_BATCH = True to process all HTML files listed earlier
RUN_BATCH = False
if RUN_BATCH:
    ok = 0; total = 0
    for p in HTML_FILES:
        total += 1
        csvp, pqp = process_one(p)
        if csvp is not None and pqp is not None:
            ok += 1
    print(f'Batch complete: {ok}/{total} files cleaned.')
else:
    print('Batch disabled. Set RUN_BATCH = True to enable.')


## (Optional) Tidy long format (one row per executive×year)
Set `RUN_TIDY = True` to generate a long-format view and optionally save it.


In [None]:
RUN_TIDY = False
if RUN_TIDY:
    df0 = checked_df.copy()
    id_cols = [c for c in ['executive_name','position'] if c in df0.columns]
    val_cols = [c for c in ['salary','bonus','stock_awards','option_awards','non_equity_incentive','pension_value','all_other_comp','total'] if c in df0.columns]
    if 'year' in df0.columns:
        # Already per-year rows in most SCTs; this just ensures tidy format
        tidy = df0[id_cols + ['year'] + val_cols].copy()
    else:
        tidy = df0[id_cols + val_cols].copy()
    display(tidy.head(10))
    tidy_csv = out_dir / f'{ticker}_{date}_SCT.clean.tidy.csv'
    tidy_pq = out_dir / f'{ticker}_{date}_SCT.clean.tidy.parquet'
    tidy.to_csv(tidy_csv, index=False)
    tidy.to_parquet(tidy_pq, engine='pyarrow', index=False)
    print('Wrote tidy:')
    print('  CSV   ', tidy_csv)
    print('  Parquet', tidy_pq)
else:
    print('Tidy step disabled. Set RUN_TIDY = True to enable.')
