In [180]:
# ==========================================
# complete BLS toolkit
# ==========================================

import os
import re
import unicodedata
import hashlib
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd


In [182]:
# ---------------------------
# Paths
# ---------------------------
DATA_DIR = "data/processed_data"   
OUT_DIR  = "data/processed_data" 
RAW_DIR = "data/raw"
os.makedirs(OUT_DIR, exist_ok=True)

In [184]:
# -------------------------------------------------------------------
# 0) Generic helpers
# -------------------------------------------------------------------

def save_csv_file(df: pd.DataFrame, folder: str, filename: str) -> str:
    """Save DataFrame to CSV and return full path."""
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, filename)
    df.to_csv(path, index=False)
    print(f"Saved: {path} | shape={df.shape}")
    return path

def _load_csv(name: str) -> pd.DataFrame:
    """Load CSV from DATA_DIR and normalize column names."""
    path = os.path.join(DATA_DIR, name)
    df = pd.read_csv(path)
    return _norm_colnames(df)

def clean_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize column names for BLS sheets."""
    cols = pd.Index(df.columns).map(str)
    cols = (
        cols.str.replace(r"\s+", " ", regex=True)
            .str.replace(r"\[.*?\]", "", regex=True)
            .str.replace(",", "", regex=True)
            .str.replace("—", "-", regex=False)
            .str.replace("–", "-", regex=False)
            .str.strip()
            .str.lower()
            .str.replace(" ", "_", regex=False)
    )
    out = df.copy()
    out.columns = cols
    return out

def _norm_colnames(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize column names for WEF/Kaggle-like CSVs."""
    cols = (pd.Index(df.columns).map(str)
            .str.normalize("NFKC")
            .str.replace(r"\s+", " ", regex=True)
            .str.strip()
            .str.lower()
            .str.replace(" ", "_", regex=False)
            .str.replace("%", "pct", regex=False)
            .str.replace(r"[()]", "", regex=True))
    out = df.copy()
    out.columns = cols
    return out


def normalize_dash_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Replace lone dash or em-dash placeholders with NaN in object columns ONLY."""
    obj_cols = df.select_dtypes(include="object").columns
    if not len(obj_cols):
        return df
    pat = r"\s*[—-]\s*"
    out = df.copy()
    for c in obj_cols:
        ser = out[c]
        mask = ser.astype(str).str.fullmatch(pat, na=False)
        out.loc[mask, c] = np.nan
    return out

# -------------- used for Kaggle dataset --------------
def safe_to_datetime(s: pd.Series) -> pd.Series:
    return pd.to_datetime(s, errors="coerce", utc=True).dt.tz_convert(None)

def md5_hash_text(x: str) -> str:
    return hashlib.md5((x or "").encode("utf-8")).hexdigest()


def normalize_title(title: Optional[str]) -> str:
    t = _norm_text(title)
    t = t.replace("sr.", "senior").replace("jr.", "junior")
    t = t.replace("&", " and ")
    t = re.sub(r"[^a-z0-9 +/#\-]", " ", t)
    t = re.sub(r"\s+", " ", t).strip()
    return t
    
# -------------- end used for Kaggle dataset --------------

def _norm_text(s: Optional[str]) -> str:
    """Unicode-safe, case-insensitive, whitespace-collapsing normalizer."""
    if not isinstance(s, str):
        return ""
    s = unicodedata.normalize("NFKC", s)
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    return s

def coerce_numeric_if_exists(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """Convert listed columns to numeric if present; leave others untouched."""
    out = df.copy()
    for c in cols:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")
    return out

def _pct_bounds(df: pd.DataFrame, cols: List[str], name: str):
    """QC: warn if percentage columns are outside [0,100]."""
    for c in cols:
        if c in df.columns:
            bad = df[c].notna() & ~df[c].between(0, 100)
            if bad.any():
                print(f"[QC WARN] {name}.{c} has values outside [0,100]:")
                print(df.loc[bad, [c]].head())

def drop_footer_rows(df: pd.DataFrame, title_col: str, max_tail_check: int = 8) -> pd.DataFrame:
    """Remove trailing BLS footers like 'Footnotes:', '[1] ...', 'Source: ...'."""
    if title_col not in df.columns:
        return df
    tail = df.tail(max_tail_check).copy()
    mask = (
        tail[title_col].astype(str).str.strip().str.lower()
        .str.startswith(("footnotes", "[1]", "source"))
    )
    return df.drop(index=tail.index[mask])


In [186]:
# -------------------------------------------------------------------
# 1) Tech classifier (SOC-gated)
# -------------------------------------------------------------------

# Always-keep titles (substring, lowercase)
MUST_HAVE_TITLES = [
    "software developers",
    "data scientists",
    "information security analysts",
    "computer systems analysts",
    "database administrators",
    "database architects",
    "computer network architects",
    "computer user support specialists",
    "web developers",
    "web and digital interface designers",
    "software quality assurance analysts and testers",
    "computer programmers",
    "computer and information systems managers"
]

# Editable include/exclude lists
INCLUDE_TITLES = [
    "computer and information systems managers",  # 11-3021
    "operations research analysts",               # 15-2031
    "statisticians", 
]
EXCLUDE_TITLES = [
    "security guard","guards","cashier","teller","bookkeeping",
    "postal","receptionist","customer service","building caretaker",
    "data entry keyer","data entry keyers",
    "telecommunications equipment installer","telecommunications equipment installers",
    "audiovisual equipment installer","audiovisual equipment installers",
    "broadcast technician","computer operator",
]

# SOC rules
SOC_ALWAYS   = {"15"}        # Computer & Mathematical (default keep)
SOC_ENGINEER = {"17"}        # Engineers → allow with computing/electrical/electronics/robotics wording
SOC_OPTIONAL = {"11","13"}   # Mgmt/Business → allow with strong tech evidence

# drop specific SOC-15 roles (lowercase substrings) - for future use if needed
SOC15_EXCLUDE = set([
    # e.g., "actuaries","mathematicians","mathematical science occupations, all other"
])

# ---------- Normalizers ----------
def _norm_code(code: Optional[str]) -> Optional[str]:
    """Normalize SOC code dashes to ASCII '-' and strip."""
    if not isinstance(code, str):
        return None
    c = unicodedata.normalize("NFKC", code).strip()
    c = c.replace("—", "-").replace("–", "-")
    return c if "-" in c else None

# Pre-normalize include/exclude lists for robust matching
NORM_INCLUDE = [_norm_text(x) for x in (INCLUDE_TITLES + MUST_HAVE_TITLES)]
NORM_EXCLUDE = [_norm_text(x) for x in EXCLUDE_TITLES]
NORM_SOC15_EXCLUDE = [_norm_text(x) for x in SOC15_EXCLUDE]

# Keyword patterns
STRONG_KEYWORDS = [
    r"\bsoftware\b", r"\bdeveloper\b", r"\bengineer\b", r"\bdevops\b",
    r"\bml\b", r"\bmachine learning\b", r"\bai\b", r"\bcloud\b",
    r"\bcyber(?:security)?\b", r"\binformation security\b",
    r"\bdata\b", r"\banalytics?\b", r"\bdatabase\b",
    r"\b(?:sre|site reliability)\b", r"\bnetwork(s)?\b",
    r"\bprogrammer\b", r"\bweb\b", r"\bui\b", r"\bux\b",
    r"\bcomputer\b",
]
WEAK_KEYWORDS = [
    r"\bite\b", r"\btech(?:nology)?\b", r"\bautomation\b", r"\brobot(?:ic|ics)\b",
]
STRONG_NO_BOUNDARY = [
    "software","developer","engineer","devops","ai","ml","cloud",
    "cyber","security","data","database","network","programmer","web","ui","ux","computer",
]
STRONG_RE = [re.compile(p, re.I) for p in STRONG_KEYWORDS]
WEAK_RE   = [re.compile(p, re.I) for p in WEAK_KEYWORDS]

BASE_THRESHOLD = 2
THRESHOLD_BY_SOC = {"15": 1, "17": 2, "11": 2, "13": 2}

def keyword_hits(title_norm: str) -> Tuple[int, int, int]:
    s = sum(1 for r in STRONG_RE if r.search(title_norm))
    w = sum(1 for r in WEAK_RE   if r.search(title_norm))
    l = 0 if s > 0 else sum(1 for sub in STRONG_NO_BOUNDARY if sub in title_norm)
    return s, w, l

def effective_threshold(major: Optional[str], base: int = BASE_THRESHOLD) -> int:
    return THRESHOLD_BY_SOC.get(major or "", base)

def is_tech(title: Optional[str], soc_code: Optional[str],
            *, base_threshold: int = BASE_THRESHOLD,
            return_reason: bool = False):
    """
    Keep order:
      1) hard excludes (normalized substring)
      2) MUST_HAVE / INCLUDE (normalized substring)
      3) SOC 15 default keep (unless normalized SOC15_EXCLUDE)
      4) otherwise → SOC gate + keyword score ≥ per-SOC threshold
    """
    t = _norm_text(title)
    code = _norm_code(soc_code)
    major = code.split("-")[0] if code else None

    # 1) hard excludes
    if any(x in t for x in NORM_EXCLUDE):
        out = (False, "hard_exclude_match")
        return out if return_reason else out[0]

    # 2) hard includes (must-haves + includes), robust to case/spacing
    if any(x in t for x in NORM_INCLUDE):
        out = (True, "hard_include_or_must_have")
        return out if return_reason else out[0]

    # 3) SOC 15 default keep (unless explicitly excluded)
    if major in SOC_ALWAYS:
        if any(x in t for x in NORM_SOC15_EXCLUDE):
            out = (False, "soc15_explicit_exclude")
            return out if return_reason else out[0]
        out = (True, "soc15_default_keep")
        return out if return_reason else out[0]

    # 4) scoring + SOC gate
    s, w, l = keyword_hits(t)
    score = s + 0.5 * w + (1 if l >= 2 else 0)

    if major in SOC_ENGINEER:
        soc_pass = any(k in t for k in ["computer", "electrical", "electronics", "robot"])
    elif major in SOC_OPTIONAL:
        soc_pass = (s >= 1) or (score >= 2.5)
    else:
        soc_pass = False

    thr = effective_threshold(major, base_threshold)
    keep = soc_pass and (score >= thr)

    reason = f"soc_gate={soc_pass}, soc={major}, thr={thr}, score={score:.1f} (strong={s}, weak={w}, glued={l})"
    out = (keep, reason)
    return out if return_reason else out[0]

def apply_is_tech(df: pd.DataFrame,
                  title_col: str = "2023_national_employment_matrix_title",
                  code_col: str  = "2023_national_employment_matrix_code",
                  base_threshold: int = BASE_THRESHOLD,
                  with_reason: bool = False) -> pd.DataFrame:
    """Add _is_tech (and _tech_reason) using the classifier above."""
    titles = df[title_col].tolist()
    codes  = df[code_col].tolist()
    results = [
        is_tech(t, c, base_threshold=base_threshold, return_reason=with_reason)
        for t, c in zip(titles, codes)
    ]
    out = df.copy()
    if with_reason:
        flags, reasons = zip(*results) if results else ([], [])
        out["_is_tech"] = list(flags)
        out["_tech_reason"] = list(reasons)
    else:
        out["_is_tech"] = results
    return out

# Kaggle-specific tech tagging
def is_tech_kaggle(title: Optional[str],
                   skills: Optional[str] = None,
                   description: Optional[str] = None,
                   *,
                   base_threshold: int = 1,
                   return_reason: bool = False):
    """Looser tech classifier for Kaggle: look in title + skills + description."""
    t = _norm_text(title)
    s_text = _norm_text(skills) if skills else ""
    d_text = _norm_text(description) if description else ""
    combined = f"{t} {s_text} {d_text}".strip()

    # hard excludes / includes reuse your global lists
    if any(x in combined for x in NORM_EXCLUDE):
        out = (False, "hard_exclude_match")
        return out if return_reason else out[0]
    if any(x in combined for x in NORM_INCLUDE):
        out = (True, "hard_include_or_must_have")
        return out if return_reason else out[0]

    # keyword score (looser threshold)
    s, w, l = keyword_hits(combined)
    score = s + 0.5*w + (1 if l >= 2 else 0)
    keep = score >= base_threshold
    reason = f"kaggle_score={score:.1f} (strong={s}, weak={w}, glued={l})"
    out = (keep, reason)
    return out if return_reason else out[0]


def apply_is_tech_kaggle(df: pd.DataFrame,
                         title_col: str = "job_title",
                         skills_col: Optional[str] = None,
                         desc_col: Optional[str] = "description",
                         *,
                         base_threshold: int = 1,
                         with_reason: bool = True) -> pd.DataFrame:
    """Apply Kaggle classifier over title + skills + description."""
    titles = df[title_col] if title_col in df.columns else pd.Series([""]*len(df), index=df.index)
    skills = df[skills_col] if (skills_col and skills_col in df.columns) else pd.Series([""]*len(df), index=df.index)
    descs  = df[desc_col]  if (desc_col  and desc_col  in df.columns)  else pd.Series([""]*len(df), index=df.index)

    results = [
        is_tech_kaggle(t, s, d, base_threshold=base_threshold, return_reason=with_reason)
        for t, s, d in zip(titles, skills, descs)
    ]
    out = df.copy()
    if with_reason:
        flags, reasons = zip(*results) if results else ([], [])
        out["_is_tech"] = list(flags)
        out["_tech_reason"] = list(reasons)
    else:
        out["_is_tech"] = results
    return out

In [188]:
    
# -------------------------------------------------------------------
# 2) Generic cleaner (works for 1.2, 1.3, 1.4, 1.10; skip tech for 1.11)
# -------------------------------------------------------------------

def clean_bls_table(
    xls_path: str,
    sheet_name: str,
    *,
    numeric_cols: List[str] | None = None,
    line_item_only: bool = True,
    drop_footer_on: str = "2023_national_employment_matrix_title",
    apply_tech_filter: bool = True,
    title_col: str = "2023_national_employment_matrix_title",
    code_col: str  = "2023_national_employment_matrix_code",
    base_threshold: int = 2,
    tag_only: bool = True,
) -> pd.DataFrame:
    """
    Generic BLS sheet cleaner:
      - load sheet, standardize columns
      - replace lone dash/em-dash → NaN (warning-free)
      - (opt) keep only 'Line item'
      - (opt) drop trailing footers using `drop_footer_on`
      - coerce numerics in `numeric_cols`
      - (opt) tag tech via `apply_is_tech`
      - returns tagged (all rows) if tag_only=True, else tech-only rows
    """
    df = pd.read_excel(xls_path, sheet_name=sheet_name, skiprows=1)
    df = clean_columns(df)
    df = normalize_dash_missing(df)

    if line_item_only and "occupation_type" in df.columns:
        df = df[df["occupation_type"].eq("Line item")].copy()

    if drop_footer_on in df.columns:
        df = drop_footer_rows(df, drop_footer_on)

    if numeric_cols:
        df = coerce_numeric_if_exists(df, numeric_cols)

    if apply_tech_filter:
        df = apply_is_tech(
            df,
            title_col=title_col,
            code_col=code_col,
            base_threshold=base_threshold,
            with_reason=True,
        )
        return df.reset_index(drop=True) if tag_only else df[df["_is_tech"]].reset_index(drop=True)

    return df.reset_index(drop=True)



In [190]:
# -------------------------------------------------------------------
# 3) Batch runner + QC for BLS
# -------------------------------------------------------------------

@dataclass
class SheetConfig:
    sheet_name: str
    file_stem: str                      # base filename to save (e.g., "bls_table_1_2")
    numeric_cols: List[str]
    line_item_only: bool = True
    drop_footer_on: str = "2023_national_employment_matrix_title"
    apply_tech_filter: bool = True
    title_col: str = "2023_national_employment_matrix_title"
    code_col: str  = "2023_national_employment_matrix_code"
    must_have_check: bool = True        # run must-have QC (only if apply_tech_filter)
    tag_only_save: bool = True       

def qc_bls_generic(tagged: pd.DataFrame,
                   tech: Optional[pd.DataFrame] = None,
                   *,
                   title_col: str = "2023_national_employment_matrix_title",
                   code_col: str  = "2023_national_employment_matrix_code",
                   must_have_titles: Optional[List[str]] = None) -> None:
    """Reusable QC for any BLS sheet with title + code."""
    print("==== Shapes ====")
    print("Tagged full:", tagged.shape)
    if tech is not None:
        print("Tech only  :", tech.shape)

    if title_col in tagged.columns and code_col in tagged.columns:
        print("\n==== Duplicates (tagged) ====")
        keys = [code_col, title_col]
        print(tagged.duplicated(keys).sum())
    if tech is not None and title_col in tech.columns and code_col in tech.columns:
        print("\n==== Duplicates (tech) ====")
        keys = [code_col, title_col]
        print(tech.duplicated(keys).sum())

    if tech is not None and code_col in tech.columns:
        print("\n==== SOC major distribution (tech) ====")
        soc = tech[code_col].astype(str).str.split("-").str[0]
        print(soc.value_counts())

        print("\n==== Missing SOC-15 (informational) ====")
        soc15_all = tagged[tagged[code_col].astype(str).str.startswith("15-")][[title_col, code_col]]
        soc15_tech = tech[tech[code_col].astype(str).str.startswith("15-")][[title_col, code_col]]
        missing = soc15_all.merge(soc15_tech, how="left", on=[title_col, code_col], indicator=True)\
                           .query("_merge=='left_only'")
        print(missing.shape[0])
        if not missing.empty:
            print(missing.head(10))

    if must_have_titles and tech is not None and title_col in tech.columns:
        norm = (tech[title_col].astype(str).str.normalize("NFKC").str.strip().str.lower())
        present = set(norm.tolist())
        must_norm = [m.lower().strip() for m in must_have_titles]
        missing_must = [m for m in must_norm if m not in present]
        hits = [m for m in must_norm if m in present]
        print("\n==== Must-have check ====")
        print("Present:", [h.title() for h in hits])
        print("Missing:", [m.title() for m in missing_must])

def batch_clean_bls(
    xls_path: str,
    out_dir: str,
    sheets: List[SheetConfig],
    *,
    base_threshold: int = 2,
    run_qc: bool = True,
) -> Dict[str, Tuple[pd.DataFrame, Optional[pd.DataFrame]]]:
    """
    Batch-clean multiple BLS sheets.
    Returns {file_stem: (tagged_df, tech_df_or_None)}
    Saves CSVs:
      - <file_stem>_tagged.csv (if tag_only_save=True)
      - <file_stem>.csv        (tech-only if apply_tech_filter=True; else cleaned sheet)
    """
    os.makedirs(out_dir, exist_ok=True)
    outputs: Dict[str, Tuple[pd.DataFrame, Optional[pd.DataFrame]]] = {}

    for cfg in sheets:
        print(f"\n=== Cleaning {cfg.sheet_name} → {cfg.file_stem} ===")

        # Tagged (keep-all)
        tagged = clean_bls_table(
            xls_path,
            cfg.sheet_name,
            numeric_cols=cfg.numeric_cols,
            line_item_only=cfg.line_item_only,
            drop_footer_on=cfg.drop_footer_on,
            apply_tech_filter=cfg.apply_tech_filter,
            title_col=cfg.title_col,
            code_col=cfg.code_col,
            base_threshold=base_threshold,
            tag_only=True,
        )
        tech_df: Optional[pd.DataFrame] = None

        if cfg.tag_only_save:
            save_csv_file(tagged, out_dir, f"{cfg.file_stem}_tagged.csv")

        if cfg.apply_tech_filter:
            tech_df = clean_bls_table(
                xls_path,
                cfg.sheet_name,
                numeric_cols=cfg.numeric_cols,
                line_item_only=cfg.line_item_only,
                drop_footer_on=cfg.drop_footer_on,
                apply_tech_filter=True,
                title_col=cfg.title_col,
                code_col=cfg.code_col,
                base_threshold=base_threshold,
                tag_only=False,
            )
            save_csv_file(tech_df, out_dir, f"{cfg.file_stem}.csv")
        else:
            save_csv_file(tagged, out_dir, f"{cfg.file_stem}.csv")

        if run_qc:
            if cfg.apply_tech_filter:
                qc_bls_generic(
                    tagged=tagged,
                    tech=tech_df,
                    title_col=cfg.title_col,
                    code_col=cfg.code_col,
                    must_have_titles=MUST_HAVE_TITLES if cfg.must_have_check else None,
                )
            else:
                qc_bls_generic(tagged=tagged, tech=None, title_col=cfg.title_col, code_col=cfg.code_col)

        outputs[cfg.file_stem] = (tagged, tech_df)

    return outputs



In [200]:
# -------------------------------------------------------------------
# WEF builders + QC
# -------------------------------------------------------------------

# ---------------------------
# Canonical skill mapping
# ---------------------------
# Keep skill wording consistent across WEF tables so joins are stable.

SKILL_CANON_MAP = {
    "ai and big data": "AI & Big Data",
    "networks and cybersecurity": "Networks & Cybersecurity",
    "technological literacy": "Technology Literacy",
    "technology literacy": "Technology Literacy",
    "programming": "Programming",
    "analytical thinking": "Analytical Thinking",
    "creative thinking": "Creative Thinking",
    "curiosity and lifelong learning": "Curiosity & Lifelong Learning",
    "leadership and social influence": "Leadership & Social Influence",
    "systems thinking": "Systems Thinking",
    "environmental stewardship": "Environmental Stewardship",
    "design and user experience": "Design & UX",
}
def canon_skill(x: str) -> str:
    key = _norm_text(x)
    return SKILL_CANON_MAP.get(key, x.strip())

def build_wef_skill_growth() -> pd.DataFrame:
    df = _load_csv("wef_core_skills_page_35.csv")
    if "net_increase_pct" not in df.columns:
        df = df.rename(columns={"net_increase_%": "net_increase_pct"})
    df["skill"] = df["skill"].map(canon_skill)
    df = coerce_numeric_if_exists(df, ["net_increase_pct"])
    _pct_bounds(df, ["net_increase_pct"], "wef_skill_growth")
    return df.sort_values("net_increase_pct", ascending=False).reset_index(drop=True)

def build_wef_skill_industry() -> pd.DataFrame:
    frames = []
    for fname, skill_label in [
        ("wef_ai_big_data_page_39.csv", "AI & Big Data"),
        ("wef_tech_literacy_page_39.csv", "Technology Literacy"),
        ("wef_networks_cyber_page_39.csv", "Networks & Cybersecurity"),
    ]:
        df = _load_csv(fname)
        df = df.rename(columns={"percentage_pct": "pct_increasing"})
        df["skill"] = skill_label
        frames.append(df[["skill", "industry", "pct_increasing"]])

    out = pd.concat(frames, ignore_index=True)
    out["industry"] = out["industry"].map(lambda s: s.strip() if isinstance(s, str) else s)
    out = coerce_numeric_if_exists(out, ["pct_increasing"])  # <-- fixed name
    _pct_bounds(out, ["pct_increasing"], "wef_skill_industry")
    return out.sort_values(["skill", "pct_increasing"], ascending=[True, False]).reset_index(drop=True)

def build_wef_genai_substitution() -> pd.DataFrame:
    df = _load_csv("wef_genai_substitution_page_b3_1_page_44.csv")
    df["skill_group"] = df["skill_group"].map(canon_skill)
    df = coerce_numeric_if_exists(df, ["very_low_capacity_pct","low_capacity_pct","moderate_capacity_pct","high_capacity_pct"])  # <-- fixed name
    _pct_bounds(df, ["very_low_capacity_pct","low_capacity_pct","moderate_capacity_pct","high_capacity_pct"], "wef_genai_substitution")
    row_sum = df[["very_low_capacity_pct","low_capacity_pct","moderate_capacity_pct","high_capacity_pct"]].sum(axis=1)
    off = (row_sum - 100).abs() > 5
    if off.any():
        print("[QC INFO] substitution rows not summing ~100% (tolerated due to chart eyeballing)")
        print(df.loc[off, ["skill_group","very_low_capacity_pct","low_capacity_pct","moderate_capacity_pct","high_capacity_pct"]].head())
    return df.rename(columns={"skill_group":"skill"})

def build_wef_training_completion() -> pd.DataFrame:
    df = _load_csv("wef_training_completion_page_46.csv")
    df = df.rename(columns={"training_completion_pct":"training_completion_2025_pct"})
    df["industry"] = df["industry"].map(lambda s: s.strip() if isinstance(s, str) else s)
    df = coerce_numeric_if_exists(df, ["training_completion_2025_pct"])
    _pct_bounds(df, ["training_completion_2025_pct"], "wef_training_completion")
    return df.sort_values("training_completion_2025_pct", ascending=False).reset_index(drop=True)

def load_wef_job_lists() -> Tuple[pd.DataFrame, pd.DataFrame]:
    growing = _load_csv("wef_fastest_growing_jobs_page_19.csv")
    declining = _load_csv("wef_fastest_declining_jobs_page_19.csv")
    growing = growing.rename(columns={"net_growth_%":"net_growth_pct"})
    declining = declining.rename(columns={"net_job_destruction_(millions)":"net_job_destruction_millions"})
    growing["job_title_norm"] = growing["job_title"].map(_norm_text)
    declining["job_title_norm"] = declining["job_title"].map(_norm_text)
    return growing, declining

def build_all_wef(save: bool = True) -> Dict[str, pd.DataFrame]:
    skill_growth   = build_wef_skill_growth()
    skill_industry = build_wef_skill_industry()
    genai_subst    = build_wef_genai_substitution()
    training       = build_wef_training_completion()
    out = {
        "wef_skill_growth": skill_growth,
        "wef_skill_industry": skill_industry,
        "wef_genai_substitution": genai_subst,
        "wef_training_completion": training,
    }
    if save:
        save_csv_file(skill_growth,   OUT_DIR, "wef_skill_growth_tidy.csv")    
        save_csv_file(skill_industry, OUT_DIR, "wef_skill_industry_tidy.csv")
        save_csv_file(genai_subst,    OUT_DIR, "wef_genai_substitution_tidy.csv")
        save_csv_file(training,       OUT_DIR, "wef_training_completion_tidy.csv")
    return out

def qc_wef_tables(dfs: Dict[str, pd.DataFrame]):
    sg = dfs["wef_skill_growth"]
    si = dfs["wef_skill_industry"]
    gs = dfs["wef_genai_substitution"]
    tr = dfs["wef_training_completion"]

    print("\n=== Skill growth (top 6) ===")
    print(sg.head(6))

    print("\n=== Skill × Industry (AI & Big Data) — top 6 ===")
    print(si[si["skill"].eq("AI & Big Data")].head(6))

    print("\n=== GenAI substitution (sample) ===")
    print(gs.head(6))

    print("\n=== Training completion by industry (top 6) ===")
    print(tr.head(6))

In [210]:
# -------------------------------------------------------------------
# Kaggle builders + QC
# -------------------------------------------------------------------

WEF_CANON_SKILLS = [
    "AI & Big Data",
    "Networks & Cybersecurity",
    "Technology Literacy",
    "Programming",
    "Design & UX",
]

CANON_RULES: Dict[str, List[re.Pattern]] = {
    "AI & Big Data": [
        re.compile(r"\b(ai|artificial intelligence|machine learning|ml|deep learning|pytorch|tensorflow|nlp|llm|prompt)\b", re.I),
        re.compile(r"\b(data(science| scientist| analyst| engineering)?|etl|warehouse|warehousing|spark|pyspark|hadoop|snowflake|databricks|airflow|kafka|hive)\b", re.I),
        re.compile(r"\b(statistics?|probability|pandas|numpy|matplotlib|seaborn)\b", re.I),
    ],
    "Programming": [
        re.compile(r"\b(program(ming|mer)?|software|developer|engineer|devops|sre|site reliability)\b", re.I),
        re.compile(r"\b(python|java|javascript|typescript|go|golang|rust|swift|kotlin|scala|c\+\+|c#|sql|bash|shell|linux|powershell|regex)\b", re.I),
        re.compile(r"\b(node|react|angular|vue|django|flask|spring|fastapi|.net|dotnet|pytest|junit)\b", re.I),
        re.compile(r"\b(ci/?cd|jenkins|github actions|gitlab ci)\b", re.I),
    ],
    "Networks & Cybersecurity": [
        re.compile(r"\b(cyber|information security|infosec|security analyst|siem|soc|iam|zero trust|pen(etration)? test|cissp|nist|iso 27001)\b", re.I),
        re.compile(r"\b(network|firewall|vpn|ids|ips)\b", re.I),
    ],
    "Design & UX": [
        re.compile(r"\b(ui|ux|user experience|product design|figma|sketch|wireframe|usability|user research|interaction design)\b", re.I),
    ],
    "Technology Literacy": [
        # keep broad/cloud/infra here
        re.compile(r"\b(aws|azure|gcp|cloud|kubernetes|docker|terraform|ansible|vmware)\b", re.I),
        re.compile(r"\b(service now|servicenow|itil|sre practices)\b", re.I),
    ],
}


JUNK_TOKENS = {
    "skills", "and", "etc", "na", "n/a",
    "bachelor", "master", "masters", "phd", "degree",
    "related to education:", "related to experience:", "experience-related skills:",
    "microsoft office", "documentation", "presentation",
    "communication", "written communication", "verbal communication",
    "strong written communication", "interpersonal skills",
    "project management", "portfolio analysis",
    "****"
}
# very short tokens like 'r', 'c' are often noise unless explicitly mapped
MIN_LEN = 3

def split_skill_field(val: Optional[str]) -> List[str]:
    if not isinstance(val, str) or not val.strip():
        return []
    v = re.sub(r"[A-Za-z\- ]*Skills:\s*", "", val, flags=re.I)
    parts = re.split(r"[;,|/]", v)
    out = []
    for p in parts:
        tok = _norm_text(p)
        if not tok or tok in JUNK_TOKENS:
            continue
        if len(tok) < MIN_LEN and tok not in {"r", "c"}:
            continue
        out.append(tok)
    return out

def map_raw_skill_to_canon(raw_skill: str) -> List[str]:
    s = _norm_text(raw_skill)
    if not s:
        return []
    matches = []
    for canon, patterns in CANON_RULES.items():
        if any(p.search(s) for p in patterns):
            matches.append(canon)
    return matches or ["Technology Literacy"]  # gentle fallback

# ---------- loaders ----------
def load_jobs_dataset_processed() -> pd.DataFrame:
    path = os.path.join(RAW_DIR, "jobs_dataset_processed.csv")
    df = pd.read_csv(path)
    return _norm_colnames(df)  # id, query, job_title, description, it_skills, soft_skills, education, experience,...

def load_ai_job_dataset() -> pd.DataFrame:
    path = os.path.join(RAW_DIR, "ai_job_dataset.csv")
    df = pd.read_csv(path)
    return _norm_colnames(df)  # job_id, job_title, salary_usd, required_skills, posting_date, ...

# ---------- cleaners ----------
def clean_jobs_dataset_processed(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["job_title_norm"] = out["job_title"].map(normalize_title)

    out["description_norm"] = out["description"].astype(str).fillna("").map(_norm_text)
    out["description_hash"] = out["description_norm"].map(md5_hash_text)

    skill_cols = [c for c in ["it_skills","soft_skills","education","experience"] if c in out.columns]
    out["skills_blob"] = out[skill_cols].astype(str).agg(" | ".join, axis=1) if skill_cols else ""

    out = apply_is_tech_kaggle(out,
                               title_col="job_title",
                               skills_col="skills_blob",
                               desc_col="description",
                               base_threshold=1,
                               with_reason=True)

    out = out.sort_values("id", na_position="last")
    out = out.drop_duplicates(subset=["job_title_norm","description_hash"], keep="first")
    return out

def clean_ai_job_dataset(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["job_title_norm"] = out["job_title"].map(normalize_title)

    if "posting_date" in out.columns:
        out["posting_date"] = safe_to_datetime(out["posting_date"])
        out["post_month"] = out["posting_date"].dt.to_period("M").astype(str)
    for c in ["salary_usd","benefits_score","years_experience","job_description_length","remote_ratio"]:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")

    skill_cols = [c for c in ["required_skills","education_required"] if c in out.columns]
    out["skills_blob"] = out[skill_cols].astype(str).agg(" | ".join, axis=1) if skill_cols else ""

    # pick whichever description column exists
    desc_col_name = "job_description" if "job_description" in out.columns else ("description" if "description" in out.columns else None)

    out = apply_is_tech_kaggle(out,
                               title_col="job_title",
                               skills_col="skills_blob",
                               desc_col=desc_col_name,
                               base_threshold=1,
                               with_reason=True)

    if {"company_name","post_month"}.issubset(out.columns):
        out = out.sort_values(["company_name","post_month"]).drop_duplicates(
            subset=["company_name","job_title_norm","post_month"], keep="first"
        )
    else:
        key = (out["job_title_norm"].fillna("") + "|" +
               out.get("industry","").astype(str).fillna("") + "|" +
               out.get("company_location","").astype(str).fillna(""))
        out["surrogate_hash"] = key.map(md5_hash_text)
        out = out.drop_duplicates(subset=["surrogate_hash"], keep="first")
    return out

# ---------- explode skills to tidy ----------
def explode_skills_from_jobs(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    skill_cols = [c for c in ["it_skills","soft_skills","education","experience"] if c in out.columns]
    rows = []
    for _, r in out.iterrows():
        pid = r.get("id"); title = r.get("job_title_norm")
        for sc in skill_cols:
            for tok in split_skill_field(r.get(sc)):
                rows.append(("jobs_dataset_processed", pid, title, tok))
    tidy = pd.DataFrame(rows, columns=["source","posting_id","job_title_norm","raw_skill"])
    records = []
    for _, rr in tidy.iterrows():
        for canon in map_raw_skill_to_canon(rr["raw_skill"]):
            records.append((rr["source"], rr["posting_id"], rr["job_title_norm"], rr["raw_skill"], canon))
    return pd.DataFrame(records, columns=["source","posting_id","job_title_norm","raw_skill","canon_skill"])

def explode_skills_from_ai(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    rows = []
    for _, r in out.iterrows():
        pid = r.get("job_id"); title = r.get("job_title_norm")
        for tok in split_skill_field(r.get("required_skills")):
            rows.append(("ai_job_dataset", pid, title, tok))
        for tok in split_skill_field(r.get("education_required")):
            rows.append(("ai_job_dataset", pid, title, tok))
    tidy = pd.DataFrame(rows, columns=["source","posting_id","job_title_norm","raw_skill"])
    records = []
    for _, rr in tidy.iterrows():
        for canon in map_raw_skill_to_canon(rr["raw_skill"]):
            records.append((rr["source"], rr["posting_id"], rr["job_title_norm"], rr["raw_skill"], canon))
    return pd.DataFrame(records, columns=["source","posting_id","job_title_norm","raw_skill","canon_skill"])

# ---------- aggregations ----------
def build_posting_metrics_ai(df_ai: pd.DataFrame) -> pd.DataFrame:
    if "post_month" not in df_ai.columns:
        return pd.DataFrame(columns=["job_title_norm","post_month","postings","median_salary_usd","remote_ratio_avg"])
    grp = df_ai.groupby(["job_title_norm","post_month"], dropna=False)
    met = grp.agg(
        postings=("job_title_norm","count"),
        median_salary_usd=("salary_usd","median"),
        remote_ratio_avg=("remote_ratio","mean"),
    ).reset_index()
    return met.sort_values(["job_title_norm","post_month"])

def build_skill_counts(tidy_skills: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    raw_counts = tidy_skills.groupby("raw_skill", dropna=False).size().reset_index(name="count")\
                            .sort_values("count", ascending=False)
    canon_counts = tidy_skills.groupby("canon_skill", dropna=False).size().reset_index(name="count")\
                              .sort_values("count", ascending=False)
    return raw_counts, canon_counts

def build_title_summary(df_all: pd.DataFrame) -> pd.DataFrame:
    grp = df_all.groupby("job_title_norm", dropna=False).size().reset_index(name="postings")
    return grp.sort_values("postings", ascending=False)

# ---------- orchestrator + QC ----------
def run_kaggle_clean(save: bool = True) -> Dict[str, pd.DataFrame]:
    jobs = load_jobs_dataset_processed()
    ai   = load_ai_job_dataset()

    jobs_c = clean_jobs_dataset_processed(jobs)
    ai_c   = clean_ai_job_dataset(ai)

    jobs_tech = jobs_c[jobs_c["_is_tech"]].copy()
    ai_tech   = ai_c[ai_c["_is_tech"]].copy()

    skills_jobs = explode_skills_from_jobs(jobs_tech)
    skills_ai   = explode_skills_from_ai(ai_tech)
    skills_all  = pd.concat([skills_jobs, skills_ai], ignore_index=True)

    ai_metrics                  = build_posting_metrics_ai(ai_tech)
    raw_counts, canon_counts    = build_skill_counts(skills_all)
    title_summary               = build_title_summary(pd.concat(
                                        [jobs_tech[["job_title_norm"]],
                                         ai_tech[["job_title_norm"]]],
                                        ignore_index=True))

    if save:
        save_csv_file(jobs_tech,     OUT_DIR, "kaggle_jobs_tech.csv")
        save_csv_file(ai_tech,       OUT_DIR, "kaggle_ai_tech.csv")
        save_csv_file(skills_all,    OUT_DIR, "kaggle_skills_tidy.csv")
        save_csv_file(ai_metrics,    OUT_DIR, "kaggle_ai_metrics_monthly.csv")
        save_csv_file(raw_counts,    OUT_DIR, "kaggle_skill_counts_raw.csv")
        save_csv_file(canon_counts,  OUT_DIR, "kaggle_skill_counts_canon.csv")
        save_csv_file(title_summary, OUT_DIR, "kaggle_title_summary.csv")

    return {
        "jobs_tech": jobs_tech,
        "ai_tech": ai_tech,
        "skills_all": skills_all,
        "ai_metrics": ai_metrics,
        "skill_counts_raw": raw_counts,
        "skill_counts_canon": canon_counts,
        "title_summary": title_summary,
    }

def qc_kaggle(dfs: Dict[str, pd.DataFrame]) -> None:
    print("\n=== Kaggle shapes ===")
    for k, v in dfs.items():
        if isinstance(v, pd.DataFrame):
            print(k, v.shape)

    if not dfs["ai_tech"].empty:
        print("\n=== Sample AI titles (salary + month) ===")
        print(dfs["ai_tech"][["job_title_norm","salary_usd","post_month"]].head(8))

    print("\n=== Top canonical skills ===")
    print(dfs["skill_counts_canon"].head(10))

    if not dfs["ai_metrics"].empty:
        print("\n=== Monthly postings (sample) ===")
        print(dfs["ai_metrics"].head(10))

    print("\n=== Top titles by postings (both datasets) ===")
    print(dfs["title_summary"].head(10))

def check_salary_coverage(df, title_col="job_title_norm", salary_col="median_salary_usd", min_pct=0.5):
    """
    Checks if each title has at least `min_pct` non-null salary coverage.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe containing job title and salary columns.
    title_col : str
        Column with normalized job titles.
    salary_col : str
        Column with salary values.
    min_pct : float
        Minimum required proportion of months with salary data.

    Returns
    -------
    pd.Series
        Titles with salary coverage below `min_pct`.
    """
    salary_cov = (
        df.groupby(title_col)[salary_col]
          .apply(lambda x: x.notna().mean())
    )

    low_cov = salary_cov[salary_cov < min_pct]

    if low_cov.empty:
        print(f"All titles have >= {min_pct*100:.0f}% salary coverage.")
    else:
        print(f"Titles with < {min_pct*100:.0f}% salary coverage:")
        print(low_cov)

    return low_cov

In [204]:
# -------------------------------------------------------------------
# Run BLS
# -------------------------------------------------------------------

NUMS_12 = [
    "employment_2023","employment_2033",
    "employment_distribution_percent_2023","employment_distribution_percent_2033",
    "employment_change_numeric_2023-33","employment_change_percent_2023-33",
    "percent_self_employed_2023","occupational_openings_2023-33_annual_average",
    "median_annual_wage_dollars_2024",
]
NUMS_13_14 = [
    "employment_2023","employment_2033",
    "employment_change_numeric_2023-33","employment_change_percent_2023-33",
    "median_annual_wage_dollars_2024",
]
NUMS_110 = [
    "employment_2023","employment_2033",
    "employment_change_numeric_2023-33","employment_change_percent_2023-33",
    "labor_force_exit_rate_2023-33_annual_average",
    "occupational_transfer_rate_2023-33_annual_average",
    "total_occupational_separations_rate_2023-33_annual_average",
    "labor_force_exits_2023-33_annual_average",
    "occupational_transfers_2023-33_annual_average",
    "total_occupational_separations_2023-33_annual_average",
    "occupational_openings_2023-33_annual_average",
]
NUMS_111 = [
    "employment_2023","employment_2033",
    "employment_change_numeric_2023-33","employment_change_percent_2023-33",
    "median_annual_wage_dollars_2024",
]

xls_path = "data/raw/bls_occupation.xlsx"
out_dir  = "data/processed_data"
os.makedirs(out_dir, exist_ok=True)

sheets_cfg = [
        SheetConfig("Table 1.2",  "bls_table_1_2",  NUMS_12,    line_item_only=True,  must_have_check=True),
        SheetConfig("Table 1.3",  "bls_table_1_3",  NUMS_13_14, line_item_only=False, must_have_check=True),
        SheetConfig("Table 1.4",  "bls_table_1_4",  NUMS_13_14, line_item_only=False, must_have_check=True),
        SheetConfig("Table 1.10", "bls_table_1_10", NUMS_110,   line_item_only=False, must_have_check=True),
        # Table 1.11 — different schema; keep all rows, no tech filter
        SheetConfig("Table 1.11", "bls_table_1_11", NUMS_111,   line_item_only=False,
                    apply_tech_filter=False, drop_footer_on="occupation_category", must_have_check=False),
]

outputs = batch_clean_bls(
        xls_path=xls_path,
        out_dir=out_dir,
        sheets=sheets_cfg,
        base_threshold=2,
        run_qc=True,
)
    


=== Cleaning Table 1.2 → bls_table_1_2 ===
Saved: data/processed_data/bls_table_1_2_tagged.csv | shape=(832, 18)
Saved: data/processed_data/bls_table_1_2.csv | shape=(22, 18)
==== Shapes ====
Tagged full: (832, 18)
Tech only  : (22, 18)

==== Duplicates (tagged) ====
0

==== Duplicates (tech) ====
0

==== SOC major distribution (tech) ====
2023_national_employment_matrix_code
15    21
11     1
Name: count, dtype: int64

==== Missing SOC-15 (informational) ====
0

==== Must-have check ====
Present: ['Software Developers', 'Data Scientists', 'Information Security Analysts', 'Computer Systems Analysts', 'Database Administrators', 'Database Architects', 'Computer Network Architects', 'Computer User Support Specialists', 'Web Developers', 'Web And Digital Interface Designers', 'Software Quality Assurance Analysts And Testers', 'Computer Programmers', 'Computer And Information Systems Managers']
Missing: []

=== Cleaning Table 1.3 → bls_table_1_3 ===
Saved: data/processed_data/bls_table_1_3

In [206]:
# -------------------------------------------------------------------
# Run WEF
# -------------------------------------------------------------------
wef = build_all_wef(save=True)
qc_wef_tables(wef)

[QC INFO] substitution rows not summing ~100% (tolerated due to chart eyeballing)
                     skill_group  very_low_capacity_pct  low_capacity_pct  \
0                  AI & Big Data                      0                 8   
1                    Programming                      0                35   
7  Curiosity & Lifelong Learning                      4                45   

   moderate_capacity_pct  high_capacity_pct  
0                     78                 20  
1                     45                 37  
7                     41                  3  
Saved: data/processed_data/wef_skill_growth_tidy.csv | shape=(12, 2)
Saved: data/processed_data/wef_skill_industry_tidy.csv | shape=(30, 3)
Saved: data/processed_data/wef_genai_substitution_tidy.csv | shape=(9, 5)
Saved: data/processed_data/wef_training_completion_tidy.csv | shape=(15, 2)

=== Skill growth (top 6) ===
                                 skill  net_increase_pct
0                        AI & Big Data          

In [196]:
# -------------------------------------------------------------------
# Run Kaggle
# -------------------------------------------------------------------

kaggle_out = run_kaggle_clean(save=True)
qc_kaggle(kaggle_out)

Saved: data/processed_data/kaggle_jobs_tech.csv | shape=(2393, 15)
Saved: data/processed_data/kaggle_ai_tech.csv | shape=(4666, 24)
Saved: data/processed_data/kaggle_skills_tidy.csv | shape=(74260, 5)
Saved: data/processed_data/kaggle_ai_metrics_monthly.csv | shape=(320, 5)
Saved: data/processed_data/kaggle_skill_counts_raw.csv | shape=(22246, 2)
Saved: data/processed_data/kaggle_skill_counts_canon.csv | shape=(5, 2)
Saved: data/processed_data/kaggle_title_summary.csv | shape=(1944, 2)

=== Kaggle shapes ===
jobs_tech (2393, 15)
ai_tech (4666, 24)
skills_all (74260, 5)
ai_metrics (320, 5)
skill_counts_raw (22246, 2)
skill_counts_canon (5, 2)
title_summary (1944, 2)

=== Sample AI titles (salary + month) ===
            job_title_norm  salary_usd post_month
435   ai software engineer      100678    2024-01
1132        data scientist       89495    2024-01
1419    ai product manager       68838    2024-01
1986         ai specialist       51821    2024-01
2215     robotics engineer       

In [212]:
low_cov_titles = check_salary_coverage(ai_metrics)

All titles have >= 50% salary coverage.
