# LFS Harmonizer v8 Notebook
Refactors the LFS harmonizer script into reusable notebook cells for single-file and batch processing.

Dependencies:
- pandas
- numpy (imported directly; normally installed together with pandas)
- pyarrow (for parquet)

If needed:

```
pip install pandas pyarrow
```

## 1. Load Configuration and Paths
Set input/output paths, batch size, and runtime options.

In [1]:
from pathlib import Path

# Folder scanned for *.csv / *.CSV inputs when running in batch mode.
input_dir = Path("./raw")
# Folder that receives the parquet outputs, metadata.json and the log file.
output_dir = Path("./output_v8")

# Batch mode: number of per-file parquet outputs combined per merge step.
batch_size = 5

# Path to one CSV to process on its own; leave empty ("") to run batch mode
# over input_dir instead.
single_file = ""

# Basic runtime options
# When True, the run cell displays the first rows of the harmonized output.
show_samples = True

## 2. Define Column Priority and Output Schema
Standardize field selection and ordering.

In [2]:
# Maps each harmonized target column to the raw column names that may carry it,
# in priority order. The column getters in this notebook walk each list from
# left to right, match names case-insensitively, and take the first alias that
# is present in the file AND has at least one non-empty value.
COLUMN_PRIORITY = {
    # Survey identifiers
    "PUFREG": ["PUFREG", "CREG", "REG"],
    "PUFSVYYR": ["PUFSVYYR", "SVYYR", "CYEAR"],
    "PUFSVYMO": ["PUFSVYMO", "SVYMO", "CMONTH"],
    "PUFHHNUM": ["PUFHHNUM", "HHNUM"],
    # NOTE(review): "STRATUM" is listed as a fallback for the PSU column -
    # confirm a stratum code is an acceptable stand-in for a PSU number.
    "PUFPSU": ["PUFPSU", "PSU", "PSU_NO", "STRATUM"],
    # NOTE(review): "HHID" looks like a household identifier rather than a
    # household size - confirm this alias is intended.
    "PUFHHSIZE": ["PUFHHSIZE", "HHID"],
    "PUFRPL": ["PUFRPL", "CRPM"],

    # Weight
    "PUFPWGTPRV": ["PUFPWGTPRV", "PUFPWGT", "PUFPWGTFIN", "CFWGT", "FWGT", "PWGT"],

    # Demographics
    "PUFC01_LNO": ["PUFC01_LNO", "C101_LNO", "CC101_LNO", "C04_LNO", "A01_LNO"],
    # NOTE(review): the *_NEWMEM aliases are named differently from the *_REL
    # ones - confirm they carry the same kind of code.
    "PUFC03_REL": ["PUFC03_REL", "C05_REL", "CC05_REL", "C03_NEWMEM", "CC03_NEWMEM"],
    "PUFC04_SEX": ["PUFC04_SEX", "C06_SEX", "CC06_SEX"],
    "PUFC05_AGE": ["PUFC05_AGE", "C07_AGE", "CC07_AGE"],
    "PUFC06_MSTAT": ["PUFC06_MSTAT", "C08_MSTAT", "C08_MS", "CC08_MSTAT", "CC08_MS"],
    "PUFC07_GRADE": ["PUFC07_GRADE", "J12C09_GRADE", "C09_GRD", "C09_GRADE", "CC09_GRADE"],
    "PUFC08_CURSCH": ["PUFC08_CURSCH", "A02_CURSCH", "A02_CSCH"],
    "PUFC09_GRADTECH": ["PUFC09_GRADTECH", "J12C11_GRADTECH", "J12C11COURSE"],

    # Employment status
    "PUFC10_CONWR": ["PUFC10_CONWR", "PUFC08_CONWR", "C10_CONWR", "C10_CNWR", "CC10_CONWR"],
    "PUFC11_WORK": ["PUFC11_WORK", "PUFC09_WORK", "C13_WORK", "CC13_WORK", "CC01_WORK", "B01_WORK"],
    "PUFC12_JOB": ["PUFC12_JOB", "PUFC10_JOB", "C14_JOB", "CC14_JOB", "CC02_JOB", "B02_JOB"],
    "PUFNEWEMPSTAT": ["PUFNEWEMPSTAT", "NEWEMPSTAT", "CEMPST1", "CEMPST2", "NEWEMPST"],

    # Occupation and industry
    "PUFC14_PROCC": ["PUFC14_PROCC", "PUFC13_PROCC", "C16_PROCC", "C16_PROC", "CC16_PROCC",
                      "C16F2_PROCC", "C16L2_PROCC", "CC12_USOCC", "J01_USOCC", "J01_USOC"],
    "PUFC16_PKB": ["PUFC16_PKB", "PUFC15_PKB", "C18_PKB", "CC18_PKB",
                    "C18F2_PKB", "C18L2_PKB", "CC06_IND", "J03_OKB"],
    "PUFC17_NATEM": ["PUFC17_NATEM", "PUFC16_NATEM", "C20_NATEM", "C20_NTEM", "CC20_NATEM"],

    # Working hours
    "PUFC18_PNWHRS": ["PUFC18_PNWHRS", "PUFC17_PNWHRS", "C21_PNWHRS", "C21_PWHR", "CC21_PNWHRS", "CC18_PNWHRS"],
    "PUFC19_PHOURS": ["PUFC19_PHOURS", "PUFC18_PHOURS", "C22_PHOURS", "C22_PHRS", "CC22_PHOURS"],

    # Underemployment
    "PUFC20_PWMORE": ["PUFC20_PWMORE", "PUFC19_PWMORE", "C23_PWMORE", "C23_PWMR", "CC23_PWMORE"],
    "PUFC21_PLADDW": ["PUFC21_PLADDW", "PUFC20_PLADDW", "C24_PLADDW", "C24_PLAW", "CC24_PLADDW"],
    "PUFC22_PFWRK": ["PUFC22_PFWRK", "PUFC20B_FTWORK", "C25_PFWRK", "C25_PFWK", "CC25_PFWRK"],
    "PUFC23_PCLASS": ["PUFC23_PCLASS", "PUFC21_PCLASS", "C19_PCLASS", "C19PCLAS", "CC19_PCLASS"],

    # Pay
    "PUFC24_PBASIS": ["PUFC24_PBASIS", "C26_PBASIS", "C26_PBIS", "CC26_PBASIS"],
    # NOTE(review): the C36_O* aliases are named differently from the C27_P*
    # ones - confirm they describe the same job before relying on them.
    "PUFC25_PBASIC": ["PUFC25_PBASIC", "C27_PBASIC", "C27_PBSC", "CC27_PBASIC", "C36_OBASIC", "C36_OBIC"],

    # Other job
    "PUFC26_OJOB": ["PUFC26_OJOB", "PUFC22_OJOB", "C28_OJOB", "CC28_OJOB"],

    # Multiple jobs
    "PUFC27_NJOBS": ["PUFC27_NJOBS", "A03_JOBS"],
    "PUFC28_THOURS": ["PUFC28_THOURS", "PUFC23_THOURS", "A04_THOURS", "A04_THRS"],
    "PUFC29_WWM48H": ["PUFC29_WWM48H", "PUFC24_WWM48H", "A05_RWM48H", "A05_R48H"],

    # Job search
    "PUFC30_LOOKW": ["PUFC30_LOOKW", "PUFC25_LOOKW", "C38_LOOKW", "C38_LOKW", "CC38_LOOKW", "CC30_LOOKW"],
    "PUFC31_FLWRK": ["PUFC31_FLWRK", "PUFC25B_FTWORK", "C41_FLWRK", "C41_FLWK", "CC41_FLWRK"],
    "PUFC32_JOBSM": ["PUFC32_JOBSM", "C39_JOBSM", "C39_JBSM", "CC39_JOBSM", "CC32_JOBSM"],
    "PUFC33_WEEKS": ["PUFC33_WEEKS", "C40_WEEKS", "C40_WKS", "CC40_WEEKS", "CC33_WEEKS"],
    "PUFC34_WYNOT": ["PUFC34_WYNOT", "PUFC26_WYNOT", "C42_WYNOT", "C42_WYNT", "CC42_WYNOT"],
    "PUFC35_LTLOOKW": ["PUFC35_LTLOOKW", "A06_LTLOOKW", "A06_LLKW", "CC35_LTLOOKW"],
    "PUFC36_AVAIL": ["PUFC36_AVAIL", "PUFC27_AVAIL", "C37_AVAIL", "C37_AVIL", "CC37_AVAIL", "CC36_AVAIL"],
    "PUFC37_WILLING": ["PUFC37_WILLING", "A07_WILLING", "A07_WLNG"],

    # Previous work
    "PUFC38_PREVJOB": ["PUFC38_PREVJOB", "PUFC28_PREVJOB", "C43_LBEF", "CC43_LBEF"],
    "PUFC39_YEAR": ["PUFC39_YEAR", "PUFC29_YEAR"],
    "PUFC39_MONTH": ["PUFC39_MONTH", "PUFC29_MONTH"],
    "PUFC41_POCC": ["PUFC41_POCC", "PUFC40_POCC", "PUFC31_POCC", "C45_POCC", "CC45_POCC",
                     "C45F2_POCC", "C45L2_POCC", "CC10_POCC"],
    "PUFC43_QKB": ["PUFC43_QKB", "PUFC33_QKB", "A09_PQKB", "A09F2_PQKB", "A09L2_PQKB", "PQKB", "QKB"],
}

# Ordered list of the harmonized output columns (48 names). Every processed
# file is emitted with exactly these columns in exactly this order; targets
# with no usable source column are filled with NaN.
OUTPUT_SCHEMA = [
    "PUFREG", "PUFSVYYR", "PUFSVYMO", "PUFHHNUM", "PUFPSU", "PUFHHSIZE", "PUFRPL",
    "PUFPWGTPRV",
    "PUFC01_LNO", "PUFC03_REL", "PUFC04_SEX", "PUFC05_AGE", "PUFC06_MSTAT",
    "PUFC07_GRADE", "PUFC08_CURSCH", "PUFC09_GRADTECH",
    "PUFC10_CONWR", "PUFC11_WORK", "PUFC12_JOB", "PUFNEWEMPSTAT",
    "PUFC14_PROCC", "PUFC16_PKB", "PUFC17_NATEM",
    "PUFC18_PNWHRS", "PUFC19_PHOURS",
    "PUFC20_PWMORE", "PUFC21_PLADDW", "PUFC22_PFWRK", "PUFC23_PCLASS",
    "PUFC24_PBASIS", "PUFC25_PBASIC",
    "PUFC26_OJOB",
    "PUFC27_NJOBS", "PUFC28_THOURS", "PUFC29_WWM48H",
    "PUFC30_LOOKW", "PUFC31_FLWRK", "PUFC32_JOBSM", "PUFC33_WEEKS",
    "PUFC34_WYNOT", "PUFC35_LTLOOKW", "PUFC36_AVAIL", "PUFC37_WILLING",
    "PUFC38_PREVJOB", "PUFC39_YEAR", "PUFC39_MONTH", "PUFC41_POCC", "PUFC43_QKB",
]

## 3. Implement Utility and Translation Functions
Helpers for parsing, column selection, and year-specific code translations.

In [3]:
import pandas as pd
import numpy as np
import json
import re
import gc
from datetime import datetime
import warnings

warnings.filterwarnings("ignore")


def safe_numeric(x):
    """Coerce a scalar to a float, returning ``np.nan`` when it is not numeric.

    Args:
        x: ``None``, an int/float (Python or numpy), or a string.

    Returns:
        A float for ints, floats and parseable strings; ``np.nan`` for ``None``,
        NaN, blank strings, the markers "nan"/"na"/"."/"none" (case-insensitive),
        unparseable strings and any other type.
    """
    if x is None:
        return np.nan
    if isinstance(x, (int, np.integer)):
        return float(x)
    if isinstance(x, (float, np.floating)):
        return x if not np.isnan(x) else np.nan
    if isinstance(x, str):
        text = x.strip()
        if text == "" or text.lower() in ("nan", "na", ".", "none"):
            return np.nan
        try:
            return float(text)
        except (ValueError, OverflowError):
            return np.nan
    return np.nan


def safe_int(x):
    """Coerce a scalar to a Python ``int``, or ``np.nan`` when that is impossible.

    Args:
        x: Any value accepted by :func:`safe_numeric`.

    Returns:
        ``int(value)`` (truncating toward zero) for finite numbers, otherwise
        ``np.nan``. Infinite values are treated as missing: ``int(inf)`` raises
        ``OverflowError``, which previously aborted processing of a whole file.
    """
    val = safe_numeric(x)
    # np.isfinite is False for NaN as well as for +/-inf.
    if not np.isfinite(val):
        return np.nan
    return int(val)


def clean_column(series):
    """Convert a column to numeric values; entries that cannot be parsed become NaN."""
    numeric_series = pd.to_numeric(series, errors="coerce")
    return numeric_series


def get_column(df, target, col_map_upper):
    """Return a copy of the first usable source column for ``target``.

    Args:
        df: Raw DataFrame.
        target: Harmonized column name; its aliases come from COLUMN_PRIORITY
            (the target name itself is used when it has no entry).
        col_map_upper: Mapping of upper-cased column name -> actual column name
            in ``df``.

    Returns:
        A copy of the first alias column that exists and holds at least one
        non-null value (for object columns: at least one value that is not
        blank after stripping), or ``None`` when no alias qualifies.
    """
    for alias in COLUMN_PRIORITY.get(target, [target]):
        key = alias.upper()
        if key not in col_map_upper:
            continue

        candidate = df[col_map_upper[key]].copy()
        present = candidate.dropna()
        if len(present) == 0:
            continue

        # Non-object columns only need one non-null value to qualify.
        if candidate.dtype != object:
            return candidate

        # Object columns must contain something other than whitespace.
        if (present.astype(str).str.strip() != "").any():
            return candidate
    return None


def translate_mstat(code, year):
    """Recode a PUFC06_MSTAT value using the table for the survey year.

    Three recode tables apply (years <= 2010, 2011-2014 and 2015-2023); codes
    absent from the applicable table become NaN. For later years the integer
    code is returned unchanged. Unparseable input yields NaN.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan

    if year <= 2010:
        recode = {1: 1, 2: 2, 3: 4, 4: 6, 5: 8}
    elif year <= 2014:
        recode = {1: 1, 2: 2, 3: 4, 4: 6, 5: 8, 6: 7}
    elif year <= 2023:
        recode = {1: 1, 2: 2, 3: 4, 4: 6, 5: 7, 6: 8}
    else:
        return value
    return recode.get(value, np.nan)


# Lookup tables used by translate_grade. "EXACT" maps single raw codes to a
# level; "RANGES" holds inclusive (low, high, level) spans.

# Era 1 (<=2011): old LFS codes.
_GRADE_PRE_K12_EXACT = {
    0: 0,    # No grade completed
    1: 1,    # Elementary undergrad -> Primary
    2: 1,    # Elementary graduate -> Primary
    3: 2,    # High school undergrad -> Lower Secondary
    4: 3,    # High school graduate -> Upper Secondary
    5: 5,    # Post-secondary / vocational -> Short-Cycle Tertiary
    78: 7,   # Post-baccalaureate -> Master's
}
_GRADE_PRE_K12_RANGES = (
    (60, 68, 5),    # College undergrad -> Short-Cycle Tertiary
    (70, 76, 6),    # College graduate -> Bachelor's
)

# Era 2 (2012-2018): K-12 transition 3-digit codes.
# NOTE: In this coding scheme, 801-899 are additional bachelor's
# field-of-study codes, NOT doctoral programs. The 3-digit era has
# no explicit doctoral code - only master's (710-760).
_GRADE_K12_EXACT = {
    0: 0,     # No grade completed
    1: 0,     # Pre-school / Kindergarten
    2: 0,     # Pre-school / Kindergarten
    10: 0,    # Pre-school / Kindergarten
    280: 3,   # HS Graduate (old 4-year system)
}
_GRADE_K12_RANGES = (
    (110, 192, 1),    # Elementary Gr1-6, grad, ALS primary
    (210, 270, 2),    # JHS / HS undergrad (old 4-yr HS)
    (310, 350, 3),    # SHS grades / SHS Graduate
    (410, 499, 4),    # Post-secondary non-tertiary
    (500, 559, 5),    # Short-cycle tertiary / TVET
    (560, 599, 5),    # College undergrad (pursuing bachelor's)
    (601, 699, 6),    # Bachelor's degree (field set A)
    (710, 760, 7),    # Master's degree
    (801, 899, 6),    # Bachelor's degree (field set B)
    # 900-940: Other/unspecified education -> NaN
)

# Era 3 (2019+): special placeholder codes that are not 5-digit PSCED codes.
_GRADE_PSCED_EXACT = {
    0: 0,       # No grade completed
    1000: 0,    # Early childhood / Kindergarten
    2000: 1,    # Basic education placeholder -> Primary
}


def _grade_lookup(value, exact, ranges):
    """Resolve ``value`` via the exact table, then the inclusive ranges, else NaN."""
    if value in exact:
        return exact[value]
    for low, high, level in ranges:
        if low <= value <= high:
            return level
    return np.nan


def translate_grade(code, year):
    """Map education grade codes to PSCED 2017 levels 0-8.

    PSCED 2017 Levels:
        0 = Early Childhood Education / No Grade Completed
        1 = Primary Education
        2 = Lower Secondary Education
        3 = Upper Secondary Education
        4 = Post-Secondary Non-Tertiary Education
        5 = Short-Cycle Tertiary Education or Equivalent
        6 = Bachelor Level Education or Equivalent
        7 = Master Level Education or Equivalent
        8 = Doctoral Level Education or Equivalent

    Raw code schemes vary by era:
        <=2011  : Old LFS codes 0-78
        2012-2018: K-12 transition 3-digit codes 0-940
            NOTE: 801-899 are bachelor's degree field codes (NOT doctoral).
            The 3-digit coding uses 6xx AND 8xx for bachelor's fields.
        2019+   : Full PSCED 5-digit codes (e.g. 10011, 60413)
            Here 8xxxx ARE doctoral (PSCED Level 8).

    Returns the level as an int, or NaN when the input is missing or the code
    is not recognised for the era selected by ``year``.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan

    # ---- Era 1: Pre-K12 system (<=2011) ----
    if year <= 2011:
        return _grade_lookup(value, _GRADE_PRE_K12_EXACT, _GRADE_PRE_K12_RANGES)

    # ---- Era 2: K-12 transition 3-digit codes (2012-2018) ----
    if year <= 2018:
        return _grade_lookup(value, _GRADE_K12_EXACT, _GRADE_K12_RANGES)

    # ---- Era 3: Full PSCED 5-digit codes (2019+) ----
    if value in _GRADE_PSCED_EXACT:
        return _GRADE_PSCED_EXACT[value]

    # The leading digit of a 5-digit code (1xxxx .. 8xxxx) is the PSCED level.
    if 10000 <= value <= 89999:
        return value // 10000
    return np.nan

def translate_pclass(code, year):
    """Keep PUFC23_PCLASS codes 0-6 and turn anything else into NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    if value in {0, 1, 2, 3, 4, 5, 6}:
        return value
    return np.nan


def translate_natem(code, year):
    """Keep PUFC17_NATEM codes 1-3 and turn anything else into NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    if value in {1, 2, 3}:
        return value
    return np.nan


def translate_pbasis(code, year):
    """Keep PUFC24_PBASIS codes 0-7 and turn anything else into NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    if value in {0, 1, 2, 3, 4, 5, 6, 7}:
        return value
    return np.nan


def translate_wynot(code, year):
    """Recode a PUFC34_WYNOT value.

    For survey years before 2021, code 6 becomes 61, codes 0-5 and 7-9 pass
    through, and everything else becomes NaN. From 2021 onward the integer
    code is returned unchanged. Unparseable input yields NaN.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan

    if not (year < 2021):
        return value

    if value == 6:
        return 61
    if value in {0, 1, 2, 3, 4, 5, 7, 8, 9}:
        return value
    return np.nan


def translate_conwr(code, year):
    """Keep PUFC10_CONWR codes 1-5 and turn anything else into NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    if value in {1, 2, 3, 4, 5}:
        return value
    return np.nan


def translate_yesno(code, year):
    """Keep two-valued codes 1 and 2 and turn anything else into NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    if value in {1, 2}:
        return value
    return np.nan


def translate_rel(code, year):
    """Keep PUFC03_REL codes in the inclusive range 1-26; anything else is NaN.

    ``year`` is accepted so all translators share one signature; it is unused.
    """
    value = safe_int(code)
    if pd.isna(value):
        return np.nan
    in_range = 1 <= value <= 26
    return value if in_range else np.nan


# Harmonized column -> translator function, each called as fn(raw_code, year).
# Targets that are not listed here are copied through after numeric cleaning
# only, with no code validation or recoding.
TRANSLATION_MAP = {
    "PUFC03_REL": translate_rel,
    "PUFC06_MSTAT": translate_mstat,
    "PUFC07_GRADE": translate_grade,
    "PUFC10_CONWR": translate_conwr,
    "PUFC17_NATEM": translate_natem,
    "PUFC20_PWMORE": translate_yesno,
    "PUFC21_PLADDW": translate_yesno,
    "PUFC22_PFWRK": translate_yesno,
    "PUFC23_PCLASS": translate_pclass,
    "PUFC24_PBASIS": translate_pbasis,
    "PUFC26_OJOB": translate_yesno,
    "PUFC30_LOOKW": translate_yesno,
    "PUFC31_FLWRK": translate_yesno,
    "PUFC34_WYNOT": translate_wynot,
    "PUFC36_AVAIL": translate_yesno,
    "PUFC37_WILLING": translate_yesno,
    "PUFC38_PREVJOB": translate_yesno,
    "PUFC11_WORK": translate_yesno,
    "PUFC12_JOB": translate_yesno,
}

## 4. Process a Single CSV File
Read one CSV, detect year/month, map columns, apply translations, and return a harmonized DataFrame plus per-file diagnostics.

In [4]:
from collections import Counter

def extract_year_month(filepath):
    """Infer the survey year and month from a file name.

    The file stem is upper-cased. The year is the first match of a 4-digit
    number of the form 20xx or 199x. The month is the first three-letter
    abbreviation, checked in calendar order (JAN..DEC), that occurs anywhere in
    the stem. Matching is by plain substring, so an abbreviation embedded in a
    longer word also counts (e.g. "MAR" inside "SUMMARY").

    Returns:
        ``(year, month)`` where either element is ``None`` when not found.
    """
    stem = Path(filepath).stem.upper()

    found_year = re.search(r"(20\d{2}|199\d)", stem)
    year = None if found_year is None else int(found_year.group(1))

    abbreviations = (
        "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
        "JUL", "AUG", "SEP", "OCT", "NOV", "DEC",
    )
    month = None
    for number, abbreviation in enumerate(abbreviations, start=1):
        if abbreviation in stem:
            month = number
            break

    return year, month


def read_csv(filepath):
    """Read a CSV into a DataFrame, trying several encodings and parsers.

    Encodings are tried in the order utf-8, cp1252, latin-1. cp1252 has to come
    before latin-1: latin-1 can decode any byte sequence, so an encoding listed
    after it is never reached. For each encoding the default parser is tried
    first; on a ``ParserError`` the slower python engine is used with malformed
    lines skipped and reported as warnings.

    Args:
        filepath: Path of the CSV file.

    Returns:
        The parsed DataFrame, with the blank/"."/"NA"-style markers listed in
        ``na_vals`` read as missing values.

    Raises:
        Exception: whatever the last-resort utf-8 / python-engine attempt
            raises when every encoding failed (e.g. missing or empty file).
    """
    na_vals = ["", "\t", " ", "  ", "   ", ".", "NA", "nan", "NaN", "N/A"]

    # The notebook silences warnings globally. Parser warnings are re-enabled
    # locally so that rows skipped by on_bad_lines="warn" are not lost silently.
    with warnings.catch_warnings():
        warnings.simplefilter("always", category=pd.errors.ParserWarning)

        for enc in ("utf-8", "cp1252", "latin-1"):
            try:
                return pd.read_csv(filepath, encoding=enc, low_memory=False, na_values=na_vals)
            except pd.errors.ParserError:
                # Structural problem rather than an encoding one: retry with the
                # tolerant python engine before moving to the next encoding.
                try:
                    return pd.read_csv(
                        filepath,
                        encoding=enc,
                        na_values=na_vals,
                        engine="python",
                        on_bad_lines="warn",
                    )
                except Exception:
                    continue
            except Exception:
                # Best effort: typically a decode error; try the next encoding.
                continue

        # Last resort; uses the same missing-value markers as every other
        # attempt so results do not depend on which path succeeded. Any
        # exception raised here propagates to the caller.
        return pd.read_csv(
            filepath,
            encoding="utf-8",
            na_values=na_vals,
            engine="python",
            on_bad_lines="warn",
        )


def get_column_with_source(df, target, col_map_upper):
    """Find the first usable source column for ``target`` and report its name.

    Aliases come from COLUMN_PRIORITY (the target name itself when it has no
    entry) and are tried in order, matched case-insensitively through
    ``col_map_upper`` (upper-cased name -> actual column name in ``df``).

    Returns:
        ``(column_copy, actual_column_name)`` for the first alias that exists
        and holds at least one non-null value (for object columns: at least one
        value that is not blank after stripping), else ``(None, None)``.
    """
    for alias in COLUMN_PRIORITY.get(target, [target]):
        key = alias.upper()
        if key not in col_map_upper:
            continue

        actual_name = col_map_upper[key]
        candidate = df[actual_name].copy()
        present = candidate.dropna()
        if len(present) == 0:
            continue

        # Non-object columns only need one non-null value to qualify.
        if candidate.dtype != object:
            return candidate, actual_name

        # Object columns must contain something other than whitespace.
        if (present.astype(str).str.strip() != "").any():
            return candidate, actual_name
    return None, None


def process_file(filepath, log_messages):
    """Process a single CSV and return (DataFrame, diagnostics_dict).

    Steps: read the CSV, detect the survey year/month, pull each OUTPUT_SCHEMA
    column from its highest-priority available source, coerce it to numeric,
    apply the column's translator (if any) and collect per-file diagnostics.

    Args:
        filepath: Path of the CSV file to harmonize.
        log_messages: List that every log line is appended to (lines are also
            printed).

    Returns:
        ``(out_df, diagnostics)`` where ``out_df`` has exactly the OUTPUT_SCHEMA
        columns. Returns ``(None, {"file_name", "error"})`` on read failure.
    """
    fname = Path(filepath).name

    def log(msg):
        print(msg)
        log_messages.append(msg)

    def code_key(value):
        """Render a numeric code as a dict key without losing information.

        Whole numbers become "3" rather than "3.0". Non-integer or non-finite
        values keep their float text, so they neither raise (``int(inf)``) nor
        collide with a neighbouring whole number.
        """
        number = float(value)
        return str(int(number)) if number.is_integer() else str(number)

    log(f"Processing: {fname}")

    try:
        df = read_csv(filepath)
    except Exception as exc:
        log(f"  ERROR reading file: {exc}")
        return None, {"file_name": fname, "error": str(exc)}

    year, month = extract_year_month(filepath)
    col_map_upper = {c.upper(): c for c in df.columns}

    if year is None:
        # Fall back to the most frequent value of a survey-year column.
        for col in ["SVYYR", "CYEAR", "PUFSVYYR"]:
            if col.upper() in col_map_upper:
                try:
                    year = int(pd.to_numeric(df[col_map_upper[col.upper()]], errors="coerce").mode().iloc[0])
                except Exception:
                    # Column has no usable numeric value; try the next candidate.
                    pass
                if year:
                    break
        if year is None:
            log("  WARNING: Could not detect year, defaulting to 2020")
            year = 2020

    log(f"  Year: {year}, Month: {month}, Rows: {len(df):,}, Cols: {len(df.columns)}")

    # --- Build harmonized output with diagnostics ---
    out_df = pd.DataFrame(index=df.index)
    missing_cols = []
    source_column_used = {}
    translation_drops = {}
    dropped_code_values = {}

    # Every raw column name that appears in any COLUMN_PRIORITY list.
    all_known_sources_upper = {
        source.upper() for sources in COLUMN_PRIORITY.values() for source in sources
    }

    for target in OUTPUT_SCHEMA:
        col_data, src_name = get_column_with_source(df, target, col_map_upper)
        if col_data is None:
            out_df[target] = np.nan
            missing_cols.append(target)
            source_column_used[target] = None
            continue

        source_column_used[target] = src_name
        col_data = clean_column(col_data)

        translate = TRANSLATION_MAP.get(target)
        if translate is None:
            out_df[target] = col_data
            continue

        # Count non-null values before and after translation.
        pre_valid = col_data.notna().sum()
        translated = col_data.apply(lambda x: translate(x, year))
        post_valid = translated.notna().sum()
        drops = int(pre_valid - post_valid)
        translation_drops[target] = drops

        # Capture the most frequent raw codes that the translator rejected.
        if drops > 0:
            mask_dropped = col_data.notna() & translated.isna()
            dropped_vals = col_data[mask_dropped].dropna()
            if len(dropped_vals) > 0:
                vc = dropped_vals.value_counts().head(10)
                dropped_code_values[target] = {code_key(k): int(v) for k, v in vc.items()}

        out_df[target] = translated

    # Log all missing columns (not truncated)
    if missing_cols:
        log(f"  Missing columns ({len(missing_cols)}): {', '.join(missing_cols)}")

    # The detected year always overrides the data column; the month only does
    # so when one was found in the file name.
    out_df["PUFSVYYR"] = year
    if month:
        out_df["PUFSVYMO"] = month

    out_df = out_df[OUTPUT_SCHEMA]

    # --- Compute diagnostics ---
    # Unmapped raw columns: columns in the CSV that are NOT in any COLUMN_PRIORITY list
    unmapped_raw = [c for c in df.columns if c.upper() not in all_known_sources_upper]

    # Null rates (percent of rows that are NaN, per output column)
    total = len(out_df)
    null_rates = {}
    for col in OUTPUT_SCHEMA:
        nulls = int(out_df[col].isna().sum())
        null_rates[col] = round(nulls / total * 100, 2) if total > 0 else 0.0

    # Value ranges for columns that have at least one value
    value_ranges = {}
    for col in OUTPUT_SCHEMA:
        s = out_df[col].dropna()
        if len(s) > 0:
            value_ranges[col] = {
                "min": float(s.min()),
                "max": float(s.max()),
                "n_unique": int(s.nunique()),
            }

    # Weight statistics
    weight_stats = {}
    if "PUFPWGTPRV" in out_df.columns:
        w = out_df["PUFPWGTPRV"].dropna()
        if len(w) > 0:
            weight_stats = {
                "min": float(w.min()),
                "max": float(w.max()),
                "mean": round(float(w.mean()), 2),
                "median": float(w.median()),
                "zero_count": int((w == 0).sum()),
                "negative_count": int((w < 0).sum()),
                "sum": float(w.sum()),
            }

    # Sex distribution
    sex_dist = {}
    if "PUFC04_SEX" in out_df.columns:
        vc = out_df["PUFC04_SEX"].dropna().value_counts()
        sex_dist = {code_key(k): int(v) for k, v in vc.items()}

    # Age statistics
    age_stats = {}
    if "PUFC05_AGE" in out_df.columns:
        ages = out_df["PUFC05_AGE"].dropna()
        if len(ages) > 0:
            age_stats = {
                "min": float(ages.min()),
                "max": float(ages.max()),
                "mean": round(float(ages.mean()), 2),
                "implausible_negative": int((ages < 0).sum()),
                "implausible_over_120": int((ages > 120).sum()),
            }

    # Translation drop summary for log
    if translation_drops:
        drops_nonzero = {k: v for k, v in translation_drops.items() if v > 0}
        if drops_nonzero:
            log(f"  Translation drops: {drops_nonzero}")

    diagnostics = {
        "file_name": fname,
        "year": year,
        "month": month,
        "raw_rows": len(df),
        "raw_cols": len(df.columns),
        "harmonized_rows": len(out_df),
        "mapped_count": len(OUTPUT_SCHEMA) - len(missing_cols),
        "missing_count": len(missing_cols),
        "source_column_used": source_column_used,
        "unmapped_raw_columns": unmapped_raw,
        "missing_target_columns": missing_cols,
        "null_rates": null_rates,
        "translation_drops": translation_drops,
        "dropped_code_values": dropped_code_values,
        "value_ranges": value_ranges,
        "weight_stats": weight_stats,
        "sex_distribution": sex_dist,
        "age_stats": age_stats,
    }

    log(f"  Output: {len(out_df.columns)} columns, {len(out_df):,} rows [OK]")
    return out_df, diagnostics

## 5. Batch Process Directory and Write Parquet Outputs
Iterate over input files, write per-file outputs, and merge in batches.

In [5]:
def process_all_batched(input_dir, output_dir, batch_size=10):
    """Harmonize every CSV in ``input_dir`` and write the parquet outputs.

    Each file is processed on its own and written to
    ``<output_dir>/individual_files/<stem>_harmonized.parquet``. The per-file
    outputs are then read back ``batch_size`` files at a time, concatenated,
    sorted by year/month and written once to
    ``<output_dir>/lfs_harmonized_2024codes.parquet``. ``metadata.json``,
    ``harmonization_log.txt`` and ``file_diagnostics.json`` are written as well.

    Args:
        input_dir: Folder containing ``*.csv`` / ``*.CSV`` inputs.
        output_dir: Folder for all outputs (created if missing).
        batch_size: Number of per-file parquet outputs read per merge step;
            values below 1 are treated as 1.

    Returns:
        ``(combined_file, meta, log_path, all_diagnostics)``; ``combined_file``
        is ``None`` when no file was processed successfully.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    individual_dir = output_path / "individual_files"
    individual_dir.mkdir(exist_ok=True)

    # range() below needs a positive step.
    batch_size = max(1, int(batch_size))

    # On case-insensitive file systems both globs return the same files, so
    # de-duplicate on the lower-cased name.
    all_files = list(input_path.glob("*.csv")) + list(input_path.glob("*.CSV"))
    seen = set()
    files = []
    for candidate in sorted(all_files, key=lambda x: x.name.lower()):
        name_lower = candidate.name.lower()
        if name_lower not in seen:
            seen.add(name_lower)
            files.append(candidate)

    log_messages = []
    errors = []
    all_parquet_files = []
    all_diagnostics = []
    total_rows = 0

    def log(msg):
        print(msg)
        log_messages.append(msg)

    log("=" * 60)
    # Report the real schema width instead of a hard-coded column count.
    log(f"LFS HARMONIZER v8 - {len(OUTPUT_SCHEMA)} CORE COLUMNS (with validation)")
    log(f"Started: {datetime.now().isoformat()}")
    log(f"Files found: {len(files)}")
    log(f"Batch size: {batch_size}")
    log("=" * 60)

    for i, csv_path in enumerate(files, 1):
        log(f"\n[{i}/{len(files)}]")
        try:
            df, diag = process_file(str(csv_path), log_messages)
            all_diagnostics.append(diag)
            if df is None:
                # process_file reports read failures through its diagnostics
                # instead of raising; count them as errors too.
                errors.append((csv_path.name, str(diag.get("error", "unknown error"))))
                continue
            out_file = individual_dir / f"{csv_path.stem}_harmonized.parquet"
            df.to_parquet(out_file, index=False, compression="snappy")
            all_parquet_files.append(out_file)
            total_rows += len(df)
            del df
            gc.collect()
        except Exception as exc:
            log(f"  FATAL ERROR: {exc}")
            errors.append((csv_path.name, str(exc)))

    log(f"\n{'=' * 60}")
    log("INDIVIDUAL FILES COMPLETE")
    log(f"  Processed: {len(all_parquet_files)} files")
    log(f"  Errors: {len(errors)} files")
    log(f"  Total rows: {total_rows:,}")
    log("=" * 60)

    combined_file = None
    final_rows = 0
    final_columns = []
    if all_parquet_files:
        log(f"\nCombining files in batches of {batch_size}...")
        combined_file = output_path / "lfs_harmonized_2024codes.parquet"

        # Collect one concatenated frame per batch and write the combined file
        # once at the end. Re-reading and rewriting the growing combined file
        # for every batch costs quadratic I/O and saves no memory, because the
        # final sort needs the full table in memory anyway.
        batch_frames = []
        for batch_start in range(0, len(all_parquet_files), batch_size):
            batch_end = min(batch_start + batch_size, len(all_parquet_files))
            batch_files = all_parquet_files[batch_start:batch_end]

            log(f"  Batch {batch_start // batch_size + 1}: files {batch_start + 1}-{batch_end}")

            batch_frames.append(
                pd.concat([pd.read_parquet(p) for p in batch_files], ignore_index=True)
            )

        final_df = pd.concat(batch_frames, ignore_index=True)
        del batch_frames
        gc.collect()

        log("\nFinal sorting by year/month...")
        if "PUFSVYYR" in final_df.columns:
            final_df = final_df.sort_values(["PUFSVYYR", "PUFSVYMO"]).reset_index(drop=True)
        final_df.to_parquet(combined_file, index=False, compression="snappy")

        log(f"\n{'=' * 60}")
        log("HARMONIZATION COMPLETE!")
        log("=" * 60)
        log(f"Output: {combined_file}")
        log(f"Total rows: {len(final_df):,}")
        log(f"Columns: {len(final_df.columns)}")
        if "PUFSVYYR" in final_df.columns:
            log(f"Years: {int(final_df['PUFSVYYR'].min())} - {int(final_df['PUFSVYYR'].max())}")

        final_rows = len(final_df)
        final_columns = list(final_df.columns)

        del final_df
        gc.collect()

    # Same metadata layout whether or not anything was produced.
    meta = {
        "created": datetime.now().isoformat(),
        "version": "v8-notebook",
        "files_processed": len(all_parquet_files),
        "files_with_errors": len(errors),
        "total_rows": final_rows,
        "columns": final_columns,
        "column_count": len(final_columns),
        "errors": errors[:20],
    }
    with open(output_path / "metadata.json", "w", encoding="utf-8") as fh:
        json.dump(meta, fh, indent=2)

    log_path = output_path / "harmonization_log.txt"
    with open(log_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(log_messages))

    # Save raw diagnostics as JSON for the validation report
    diag_path = output_path / "file_diagnostics.json"
    with open(diag_path, "w", encoding="utf-8") as fh:
        json.dump(all_diagnostics, fh, indent=2, default=str)

    return combined_file, meta, log_path, all_diagnostics

## 6. Summarize Metadata and Logs
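Run the harmonizer in single-file mode when `single_file` is set, otherwise in batch mode over `input_dir`. Then write the metadata and log, and print a short summary of the result.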

In [None]:
# Driver cell: harmonize either one file (single_file set) or the whole input
# folder, then print the metadata and a short summary of the output.
output_dir.mkdir(parents=True, exist_ok=True)

log_messages = []
combined_file = None
meta = {}
all_diagnostics = []
log_path = output_dir / "harmonization_log.txt"

if single_file:
    df, diag = process_file(single_file, log_messages)
    all_diagnostics.append(diag)
    succeeded = df is not None

    if succeeded:
        out_file = output_dir / f"{Path(single_file).stem}_harmonized.parquet"
        df.to_parquet(out_file, index=False, compression="snappy")
        combined_file = out_file

    # Record failures too, so metadata.json is never an empty object.
    meta = {
        "created": datetime.now().isoformat(),
        "version": "v8-notebook",
        "files_processed": 1 if succeeded else 0,
        "files_with_errors": 0 if succeeded else 1,
        "total_rows": len(df) if succeeded else 0,
        "columns": list(df.columns) if succeeded else [],
        "column_count": len(df.columns) if succeeded else 0,
        "errors": [] if succeeded else [
            [Path(single_file).name, str(diag.get("error", "unknown error"))]
        ],
    }

    if succeeded and show_samples:
        display(df.head())

    with open(output_dir / "metadata.json", "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)

    with open(log_path, "w", encoding="utf-8") as f:
        f.write("\n".join(log_messages) if log_messages else "")
else:
    combined_file, meta, log_path, all_diagnostics = process_all_batched(
        input_dir, output_dir, batch_size=batch_size
    )

print("Metadata:")
print(json.dumps(meta, indent=2))
print(f"Log saved to: {log_path}")

if combined_file and Path(combined_file).exists():
    try:
        # Read only the year column for the range summary.
        cols = ["PUFSVYYR"] if "PUFSVYYR" in meta.get("columns", []) else None
        if cols:
            years = pd.read_parquet(combined_file, columns=cols)["PUFSVYYR"].dropna()
            if not years.empty:
                print(f"Year range: {int(years.min())} - {int(years.max())}")
        # In single-file mode the sample was already displayed above.
        if show_samples and not single_file:
            sample_df = pd.read_parquet(combined_file).head()
            display(sample_df)
    except Exception as exc:
        print(f"Could not summarize parquet: {exc}")
else:
    print("No output parquet produced.")

print(f"\nDiagnostics collected for {len(all_diagnostics)} files.")

LFS HARMONIZER v8 - 47 CORE COLUMNS (with validation)
Started: 2026-02-12T07:26:13.290356
Files found: 105
Batch size: 5

[1/105]
Processing: 2005-01JAN.CSV
  Year: 2005, Month: 1, Rows: 211,888, Cols: 85


## 7. Validation Report
Generate comprehensive validation outputs from the per-file diagnostics collected during harmonization. Produces:
- **Per-file summary table** (`per_file_diagnostics.csv`)
- **Unmapped variables list** (`unmapped_variables.csv`)
- **Missing columns matrix** (`missing_columns_matrix.csv`)
- **Translation drop details** (`translation_drops.csv`)
- **Human-readable validation report** (`validation_report.txt`)
- **Machine-readable report** (`validation_report.json`)

In [None]:
def generate_validation_report(diagnostics, output_path, output_schema):
    """Generate comprehensive validation reports from per-file diagnostics.

    Writes the following files into ``output_path`` (created if missing):
    ``per_file_diagnostics.csv``, ``unmapped_variables.csv``,
    ``missing_columns_matrix.csv``, ``translation_drops.csv``,
    ``validation_report.txt`` and ``validation_report.json``.

    Args:
        diagnostics: List of per-file diagnostic dicts. Entries containing a
            ``"raw_rows"`` key are treated as successfully processed files;
            entries containing an ``"error"`` key are counted as failures.
            Successful entries must provide ``file_name``, ``year``, ``month``,
            ``raw_rows``, ``raw_cols``, ``harmonized_rows``, ``mapped_count``,
            ``missing_count``, ``null_rates`` and ``translation_drops``.
            ``weight_stats``, ``unmapped_raw_columns``,
            ``missing_target_columns``, ``dropped_code_values``,
            ``sex_distribution``, ``age_stats`` and ``source_column_used`` are
            optional.
        output_path: Directory (str or Path) that receives the report files.
        output_schema: Ordered sequence of target column names.

    Returns:
        Tuple ``(report_lines, flags)``: the lines of the human-readable report
        and the list of consistency-flag messages raised in section E.
    """
    output_path = Path(output_path)
    # Make the function usable on its own, even if the harmonization step
    # has not created the output directory yet.
    output_path.mkdir(parents=True, exist_ok=True)
    report_lines = []

    def rpt(line=""):
        report_lines.append(line)

    # Filter to successful files only (those with 'raw_rows' key)
    valid_diags = [d for d in diagnostics if "raw_rows" in d]
    error_diags = [d for d in diagnostics if "error" in d]

    rpt("=" * 70)
    rpt("LFS HARMONIZATION - VALIDATION REPORT")
    rpt(f"Generated: {datetime.now().isoformat()}")
    rpt(f"Files analyzed: {len(valid_diags)} successful, {len(error_diags)} errors")
    rpt("=" * 70)

    # ====================================================================
    # A. PER-FILE SUMMARY TABLE
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("A. PER-FILE SUMMARY")
    rpt("=" * 70)

    summary_rows = []
    for d in valid_diags:
        # Unweighted mean of the per-column null rates for this file.
        overall_null = np.mean(list(d["null_rates"].values())) if d["null_rates"] else 0
        total_trans_drops = sum(d["translation_drops"].values()) if d["translation_drops"] else 0
        wt_zeros = d.get("weight_stats", {}).get("zero_count", 0)
        wt_sum = d.get("weight_stats", {}).get("sum", 0)

        summary_rows.append({
            "file": d["file_name"],
            "year": d["year"],
            "month": d["month"],
            "raw_rows": d["raw_rows"],
            "raw_cols": d["raw_cols"],
            "harmonized_rows": d["harmonized_rows"],
            "mapped_cols": d["mapped_count"],
            "missing_cols": d["missing_count"],
            "avg_null_pct": round(overall_null, 2),
            "weight_zeros": wt_zeros,
            "weight_sum": round(wt_sum, 0),
            "translation_drops": total_trans_drops,
        })

    summary_df = pd.DataFrame(summary_rows)
    summary_df.to_csv(output_path / "per_file_diagnostics.csv", index=False)

    rpt(f"\nSaved: per_file_diagnostics.csv ({len(summary_df)} rows)")
    rpt(f"\n{'file':<30s} {'year':>4s} {'mo':>2s} {'raw_rows':>10s} {'mapped':>6s} "
        f"{'miss':>4s} {'null%':>6s} {'t_drops':>7s}")
    rpt("-" * 75)
    for _, r in summary_df.iterrows():
        # Month may be absent for a file; render it as blank rather than failing.
        mo_str = str(int(r['month'])) if pd.notna(r['month']) else ''
        rpt(f"{r['file']:<30s} {int(r['year']):>4d} {mo_str:>2s} "
            f"{int(r['raw_rows']):>10,d} {int(r['mapped_cols']):>6d} "
            f"{int(r['missing_cols']):>4d} {r['avg_null_pct']:>6.1f} "
            f"{int(r['translation_drops']):>7d}")

    # ====================================================================
    # B. UNMAPPED VARIABLES REPORT
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("B. UNMAPPED RAW VARIABLES")
    rpt("   (columns in raw CSVs not used by any COLUMN_PRIORITY mapping)")
    rpt("=" * 70)

    unmapped_counter = Counter()
    unmapped_by_year = {}
    for d in valid_diags:
        for col in d.get("unmapped_raw_columns", []):
            # Names are upper-cased so case variants of a column are counted together.
            col_upper = col.upper()
            unmapped_counter[col_upper] += 1
            unmapped_by_year.setdefault(col_upper, set()).add(d["year"])

    unmapped_rows = []
    for var, count in unmapped_counter.most_common():
        years = sorted(unmapped_by_year.get(var, []))
        year_range = f"{years[0]}-{years[-1]}" if years else ""
        unmapped_rows.append({
            "variable": var,
            "file_count": count,
            "year_range": year_range,
            "years": ",".join(str(y) for y in years),
        })

    unmapped_df = pd.DataFrame(unmapped_rows)
    unmapped_df.to_csv(output_path / "unmapped_variables.csv", index=False)

    rpt(f"\nTotal unmapped variables: {len(unmapped_df)}")
    rpt(f"Saved: unmapped_variables.csv")
    rpt(f"\n{'variable':<30s} {'files':>5s} {'year_range':<12s}")
    rpt("-" * 50)
    # Only the 30 most frequent variables go into the text report; the CSV has all of them.
    for _, r in unmapped_df.head(30).iterrows():
        rpt(f"{r['variable']:<30s} {int(r['file_count']):>5d} {r['year_range']:<12s}")
    if len(unmapped_df) > 30:
        rpt(f"  ... and {len(unmapped_df) - 30} more (see unmapped_variables.csv)")

    # ====================================================================
    # C. MISSING TARGET COLUMNS MATRIX
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("C. MISSING TARGET COLUMNS MATRIX")
    rpt("   (1 = missing from that file, 0 = present)")
    rpt("=" * 70)

    matrix_rows = []
    for d in valid_diags:
        row = {"file": d["file_name"], "year": d["year"], "month": d["month"]}
        missing_set = set(d.get("missing_target_columns", []))
        for col in output_schema:
            row[col] = 1 if col in missing_set else 0
        matrix_rows.append(row)

    matrix_df = pd.DataFrame(matrix_rows)
    matrix_df.to_csv(output_path / "missing_columns_matrix.csv", index=False)
    rpt(f"Saved: missing_columns_matrix.csv ({len(matrix_df)} files x {len(output_schema)} columns)")

    # Summarize: which columns are most frequently missing.
    # Totals are taken from the row dicts rather than from matrix_df so that a
    # run with no successful files (empty frame, no columns) does not raise
    # KeyError on matrix_df[col].
    miss_totals = {
        col: int(sum(row[col] for row in matrix_rows)) for col in output_schema
    }
    n_matrix_files = len(matrix_df)

    rpt(f"\n{'target_column':<25s} {'missing_from':>12s} {'pct':>6s}")
    rpt("-" * 45)
    for col in output_schema:
        cnt = miss_totals[col]
        if cnt > 0:
            pct = round(cnt / n_matrix_files * 100, 1)
            rpt(f"{col:<25s} {cnt:>8d}/{n_matrix_files:<3d} {pct:>5.1f}%")

    # ====================================================================
    # D. TRANSLATION DROP REPORT
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("D. TRANSLATION DROP REPORT")
    rpt("   (values that were non-null in raw but became NaN after translation)")
    rpt("=" * 70)

    trans_rows = []
    all_dropped_codes = {}  # target -> Counter of dropped codes across all files

    for d in valid_diags:
        for target, drops in d.get("translation_drops", {}).items():
            if drops > 0:
                trans_rows.append({
                    "file": d["file_name"],
                    "year": d["year"],
                    "month": d["month"],
                    "target_column": target,
                    "values_dropped": drops,
                    "raw_rows": d["raw_rows"],
                    "drop_pct": round(drops / d["raw_rows"] * 100, 2) if d["raw_rows"] > 0 else 0,
                })
        # Aggregate dropped code values exactly once per file. This must sit at
        # the file level, not inside the per-target loop above, otherwise every
        # count is added once per translation_drops entry and totals are inflated.
        for target_col, code_dict in d.get("dropped_code_values", {}).items():
            code_counter = all_dropped_codes.setdefault(target_col, Counter())
            for code_val, count in code_dict.items():
                code_counter[code_val] += count

    trans_df = pd.DataFrame(trans_rows)
    if len(trans_df) > 0:
        trans_df = trans_df.sort_values(["target_column", "year", "month"])
    trans_df.to_csv(output_path / "translation_drops.csv", index=False)
    rpt(f"Saved: translation_drops.csv ({len(trans_df)} records)")

    if len(trans_df) > 0:
        # Summary by target column
        rpt(f"\n{'target_column':<25s} {'total_drops':>12s} {'files_affected':>15s}")
        rpt("-" * 55)
        for target in sorted(trans_df["target_column"].unique()):
            subset = trans_df[trans_df["target_column"] == target]
            rpt(f"{target:<25s} {int(subset['values_dropped'].sum()):>12,d} {len(subset):>15d}")

        # Show the actual dropped code values
        rpt("\nDropped code values (top 10 per variable across all files):")
        for target in sorted(all_dropped_codes.keys()):
            codes = all_dropped_codes[target].most_common(10)
            code_str = ", ".join(f"{c}(n={n:,})" for c, n in codes)
            rpt(f"  {target}: {code_str}")
    else:
        rpt("\nNo translation drops detected.")

    # ====================================================================
    # E. CROSS-FILE CONSISTENCY CHECKS
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("E. CROSS-FILE CONSISTENCY CHECKS")
    rpt("=" * 70)

    flags = []

    # E1. Row count outliers (separate quarterly vs monthly)
    if len(summary_df) > 0:
        # Determine if file is quarterly (Jan/Apr/Jul/Oct) or monthly
        summary_df["freq"] = summary_df["month"].apply(
            lambda m: "quarterly" if m in [1, 4, 7, 10] else "monthly"
        )
        for freq_type in ["quarterly", "monthly"]:
            subset = summary_df[summary_df["freq"] == freq_type]
            # A median over one or two files is not a meaningful baseline.
            if len(subset) > 2:
                median_rows = subset["raw_rows"].median()
                for _, r in subset.iterrows():
                    ratio = r["raw_rows"] / median_rows if median_rows > 0 else 0
                    if ratio < 0.5 or ratio > 1.5:
                        flag = (f"ROW_COUNT_OUTLIER: {r['file']} has {int(r['raw_rows']):,} rows "
                                f"({freq_type} median={int(median_rows):,}, ratio={ratio:.2f})")
                        flags.append(flag)
                        rpt(f"  [FLAG] {flag}")

    # E2. Weight sum jumps (>30% change between adjacent periods)
    if len(summary_df) > 1:
        sorted_summary = summary_df.sort_values(["year", "month"]).reset_index(drop=True)
        for i in range(1, len(sorted_summary)):
            prev_wt = sorted_summary.iloc[i - 1]["weight_sum"]
            curr_wt = sorted_summary.iloc[i]["weight_sum"]
            # Files without a usable weight sum (0) are skipped on either side.
            if prev_wt > 0 and curr_wt > 0:
                change = abs(curr_wt - prev_wt) / prev_wt
                if change > 0.3:
                    flag = (f"WEIGHT_JUMP: {sorted_summary.iloc[i]['file']} weight sum changed "
                            f"{change:.0%} from previous period")
                    flags.append(flag)
                    rpt(f"  [FLAG] {flag}")

    # E3. Sex ratio check
    sex_ratios = []
    for d in valid_diags:
        sd = d.get("sex_distribution", {})
        # NOTE(review): assumes the distribution is keyed by the strings "1"
        # (male) and "2" (female); confirm against the diagnostics producer.
        male = sd.get("1", 0)
        female = sd.get("2", 0)
        if male + female > 0:
            ratio = male / (male + female)
            sex_ratios.append((d["file_name"], ratio))

    if sex_ratios:
        median_sex_ratio = np.median([r for _, r in sex_ratios])
        for fname, ratio in sex_ratios:
            # Flag files whose male share is more than 10 points from the median.
            if abs(ratio - median_sex_ratio) > 0.10:
                flag = (f"SEX_RATIO: {fname} male ratio={ratio:.3f} "
                        f"(median={median_sex_ratio:.3f})")
                flags.append(flag)
                rpt(f"  [FLAG] {flag}")

    # E4. Age implausible values
    for d in valid_diags:
        astats = d.get("age_stats", {})
        neg = astats.get("implausible_negative", 0)
        over120 = astats.get("implausible_over_120", 0)
        if neg > 0 or over120 > 0:
            flag = f"AGE_IMPLAUSIBLE: {d['file_name']} has {neg} negative, {over120} over-120 ages"
            flags.append(flag)
            rpt(f"  [FLAG] {flag}")

    if not flags:
        rpt("  No consistency flags raised.")

    rpt(f"\nTotal flags: {len(flags)}")

    # ====================================================================
    # F. AGGREGATE STATISTICS
    # ====================================================================
    rpt("\n" + "=" * 70)
    rpt("F. AGGREGATE STATISTICS")
    rpt("=" * 70)

    total_rows = sum(d["harmonized_rows"] for d in valid_diags)
    rpt(f"\nTotal harmonized rows: {total_rows:,}")
    rpt(f"Files processed: {len(valid_diags)}")
    rpt(f"Files with errors: {len(error_diags)}")

    # Overall null rate per column
    rpt(f"\n{'column':<25s} {'avg_null%':>10s} {'min_null%':>10s} {'max_null%':>10s}")
    rpt("-" * 57)
    for col in output_schema:
        # A column with no recorded null rate in a file counts as 100% null there.
        rates = [d["null_rates"].get(col, 100.0) for d in valid_diags]
        if rates:
            rpt(f"{col:<25s} {np.mean(rates):>10.2f} {min(rates):>10.2f} {max(rates):>10.2f}")

    # Column mapping audit trail: which source was used for each target, by year
    rpt("\n" + "-" * 70)
    rpt("COLUMN MAPPING AUDIT TRAIL (which raw column was used per target)")
    rpt("-" * 70)
    for target in output_schema:
        sources_by_year = {}
        for d in valid_diags:
            src = d.get("source_column_used", {}).get(target)
            sources_by_year.setdefault(d["year"], set()).add(src if src else "MISSING")

        # Group years by the combination of sources used in that year. The key
        # is the sorted, "/"-joined source names, which is stable regardless of
        # set iteration order.
        source_to_years = {}
        for yr, srcs in sorted(sources_by_year.items()):
            source_to_years.setdefault("/".join(sorted(srcs)), []).append(yr)

        # Compress if many years have same source
        if len(source_to_years) <= 3:
            parts = []
            for src, yrs in source_to_years.items():
                if len(yrs) > 2:
                    parts.append(f"{src} ({min(yrs)}-{max(yrs)})")
                else:
                    parts.append(f"{src} ({','.join(str(y) for y in yrs)})")
            rpt(f"  {target:<25s} -> {' | '.join(parts)}")
        else:
            rpt(f"  {target:<25s} -> (varies, see per_file_diagnostics.csv)")

    # ====================================================================
    # SAVE OUTPUTS
    # ====================================================================
    # Human-readable report
    report_path = output_path / "validation_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(report_lines))

    # Machine-readable JSON report
    json_report = {
        "generated": datetime.now().isoformat(),
        "files_analyzed": len(valid_diags),
        "files_with_errors": len(error_diags),
        "total_harmonized_rows": total_rows,
        "flags": flags,
        "unmapped_variable_count": len(unmapped_df),
        "missing_column_summary": miss_totals,
        "translation_drop_summary": {
            target: int(trans_df[trans_df["target_column"] == target]["values_dropped"].sum())
            for target in trans_df["target_column"].unique()
        } if len(trans_df) > 0 else {},
        "dropped_code_values": {k: dict(v.most_common(10)) for k, v in all_dropped_codes.items()},
    }
    with open(output_path / "validation_report.json", "w", encoding="utf-8") as f:
        # default=str covers values json cannot serialize natively (e.g. numpy scalars).
        json.dump(json_report, f, indent=2, default=str)

    print(f"Validation report saved to: {report_path}")
    print(f"JSON report saved to: {output_path / 'validation_report.json'}")
    print(f"Per-file diagnostics: {output_path / 'per_file_diagnostics.csv'}")
    print(f"Unmapped variables: {output_path / 'unmapped_variables.csv'}")
    print(f"Missing columns matrix: {output_path / 'missing_columns_matrix.csv'}")
    print(f"Translation drops: {output_path / 'translation_drops.csv'}")
    print(f"\nTotal flags raised: {len(flags)}")

    return report_lines, flags


# Build the validation report from the diagnostics gathered in Section 6.
if not all_diagnostics:
    # Nothing was collected, so point the reader back to the harmonization step.
    print("No diagnostics available. Run the harmonization first (Section 6).")
else:
    report_lines, flags = generate_validation_report(
        all_diagnostics,
        output_dir,
        OUTPUT_SCHEMA,
    )
    # Echo the human-readable report inline in the notebook.
    print("\n".join(report_lines))

Validation report saved to: output_v8\validation_report.txt
JSON report saved to: output_v8\validation_report.json
Per-file diagnostics: output_v8\per_file_diagnostics.csv
Unmapped variables: output_v8\unmapped_variables.csv
Missing columns matrix: output_v8\missing_columns_matrix.csv
Translation drops: output_v8\translation_drops.csv

Total flags raised: 4
LFS HARMONIZATION - VALIDATION REPORT
Generated: 2026-02-12T04:27:00.054828
Files analyzed: 105 successful, 0 errors

A. PER-FILE SUMMARY

Saved: per_file_diagnostics.csv (105 rows)

file                           year mo   raw_rows mapped miss  null% t_drops
---------------------------------------------------------------------------
2005-01JAN.CSV                 2005  1    211,888     35   13   58.2       0
2005-04APR.CSV                 2005  4    213,475     43    5   56.4       0
2005-07JUL.CSV                 2005  7    206,467     43    5   56.3       2
2005-10OCT.CSV                 2005 10    204,459     43    5   56.2     