In [1]:
"""cps_pipeline.py — Population counts by State × Occupation × Nativity × Employment
====================================================================================
This script builds CPS-style population estimates by
  • State (FIPS)             – GESTFIPS
  • Occupation (2018 SOC, primary job) – PTIO1OCD
  • Foreign-born, not a U.S. citizen   – derived from PRCITSHP
  • Employment status (Employed vs. Unemployed) – PREMPNOT
for civilians age ≥ 16 who are **in** the civilian labor force.

This version is simplified for use in a Google Colab or Jupyter notebook.
Set the file paths in the "Analysis Execution" section at the bottom of the
script, then run the entire script.

Key features
------------
* **Case-insensitive import** – Reads your CSV without assuming upper- or lower-case
  headers; all names are coerced to UPPER-CASE right after ingest, so the script
  works no matter what your extraction software produced.
* **Listwise deletion** before any transformations (on the analysis variable set)
  to avoid weight distortions from missing values.
* Outputs three CSVs: a variable dictionary, a long table, and a wide table.

Dependencies: pandas ≥ 1.5.
"""
import pathlib
import sys
import pandas as pd

# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# These are the raw variable names required from the input CSV.
RAW_COLS = [
    "GESTFIPS", "PRPERTYP", "PRTAGE", "PWCMPWGT", "PWORWGT", "PTIO1OCD",
    "PRCITSHP", "PENATVTY", "PRINUYER", "PEMLR", "PRCIVLF", "PREMPNOT",
]

# This map provides human-readable names for the raw CPS variables.
RENAME_MAP = {
    "GESTFIPS": "state_fips",
    "PRPERTYP": "person_type",
    "PRTAGE": "age",
    "PWCMPWGT": "weight_cmp",
    "PWORWGT": "weight_or",
    "PTIO1OCD": "occ2018",
    "PRCITSHP": "citizenship",
    "PENATVTY": "nativity_country",
    "PRINUYER": "year_of_entry",
    "PEMLR": "mlr_status",
    "PRCIVLF": "in_civ_lf",
    "PREMPNOT": "emp_recode",
}

# This list provides content for the output variable dictionary CSV.
DICTIONARY_ROWS = [
    ("state_fips", "GESTFIPS", "FIPS state code (household geography)"),
    ("person_type", "PRPERTYP", "Person record type (should be 2 for civilians)"),
    ("age", "PRTAGE", "Age in years"),
    ("weight_cmp", "PWCMPWGT", "Composited person weight (4 implied decimals)"),
    ("weight_or", "PWORWGT", "Outgoing-rotation weight (earnings analyses)"),
    ("occ2018", "PTIO1OCD", "Primary job occupation — 2018 SOC cross-walked"),
    ("citizenship", "PRCITSHP", "Citizenship/nativity recode"),
    ("nativity_country", "PENATVTY", "Country of birth (nativity)"),
    ("year_of_entry", "PRINUYER", "Year of entry into the U.S."),
    ("mlr_status", "PEMLR", "Monthly labor-force recode (7 categories)"),
    ("in_civ_lf", "PRCIVLF", "Civilian labor force flag (1=in LF, 2=not in LF)"),
    ("emp_recode", "PREMPNOT", "Employed/Unemployed/NILF recode"),
]

# These are the variables used in the final analysis. Any record with a missing
# value in one of these columns will be dropped (listwise deletion).
ANALYSIS_VARS = [
    "state_fips", "occ2018", "age", "citizenship", "emp_recode",
    "in_civ_lf", "weight_cmp",
]

# -----------------------------------------------------------------------------
# Helper functions
# -----------------------------------------------------------------------------

def read_cps(path: pathlib.Path) -> pd.DataFrame:
    """Read CPS CSV and force all column names to upper-case."""
    print("→ Reading CPS extract…", file=sys.stderr)
    try:
        df = pd.read_csv(path, low_memory=False)
    except FileNotFoundError:
        print(f"Error: Input file not found at {path}", file=sys.stderr)
        # In a notebook, it's better to raise an error than to exit.
        raise

    # Standardise header case to prevent case sensitivity issues.
    df.columns = df.columns.str.upper()
    print(f"  Loaded {len(df):,} rows × {df.shape[1]} cols", file=sys.stderr)
    # Check that all required columns are present in the dataframe.
    missing = sorted(set(RAW_COLS) - set(df.columns))
    if missing:
        raise KeyError(
            f"Input file lacks required column(s): {', '.join(missing)}.\n"
            "Ensure your extract includes those variables."
        )
    # Subset to only the raw variables needed for the pipeline.
    return df[RAW_COLS].copy()


def rename_vars(df: pd.DataFrame) -> pd.DataFrame:
    """Rename raw CPS variables to short, human-readable names."""
    return df.rename(columns=RENAME_MAP)


def write_dictionary(outdir: pathlib.Path):
    """Save variable dictionary as a CSV file."""
    outdir.mkdir(parents=True, exist_ok=True)
    dict_df = pd.DataFrame(DICTIONARY_ROWS, columns=["name", "source", "description"])
    dict_path = outdir / "variable_dictionary.csv"
    dict_df.to_csv(dict_path, index=False)
    print(f"✓ Wrote {dict_path}")


def derive_flags(
    df: pd.DataFrame,
    *,
    narrow: bool = False,          # ← set to True for the “non-citizen only” split
) -> pd.DataFrame:
    """
    Add nativity & employment flags.

    Parameters
    ----------
    df : DataFrame
        CPS micro-data after renaming.
    narrow : bool, default False
        • False → 'nativity_flag' = 1 if PRCITSHP in {4, 5}  (foreign-born, BLS Table A-7)
        • True  → 'nativity_flag' = 1 if PRCITSHP     == 5   (foreign-born non-citizen)

    Columns created
    ---------------
    foreign_born      (1 = PRCITSHP in {4, 5})
    non_cit_foreign   (1 = PRCITSHP == 5)
    nativity_flag     (whichever of the above is selected by *narrow*)
    emp_status        ('EMPLOYED' / 'UNEMPLOYED')
    """
    # ── citizenship flags ────────────────────────────────────────────────
    cit_num = pd.to_numeric(df["citizenship"], errors="coerce")

    df["foreign_born"]    = cit_num.isin([4, 5]).astype(int)
    df["non_cit_foreign"] = (cit_num == 5).astype(int)

    # master flag used in the later collapse
    df["nativity_flag"] = (
        df["non_cit_foreign"] if narrow else df["foreign_born"]
    )

    # ── employment-status flag ───────────────────────────────────────────
    emp_num = pd.to_numeric(df["emp_recode"], errors="coerce")
    df["emp_status"] = emp_num.map({1: "EMPLOYED", 2: "UNEMPLOYED"})

    # keep only records that mapped successfully (drop NILF 3/4 & miscoded)
    return df[~df["emp_status"].isna()].copy()



def listwise_delete(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """Drop any record with a missing value in *any* of the given columns."""
    before = len(df)
    df2 = df.dropna(subset=cols)
    after = len(df2)
    if (before - after) > 0:
        print(f"→ Listwise deletion: dropped {before - after:,} rows with missing values", file=sys.stderr)
    return df2


def filter_to_universe(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only civilians aged 16+ who are in the civilian labor force."""
    # --- Debugging ---
    # Show the unique values in the column used for filtering to help diagnose data issues.
    print("--- Debug Info ---", file=sys.stderr)
    print("Unique values in 'in_civ_lf' column before filtering:", df['in_civ_lf'].unique(), file=sys.stderr)
    print("--------------------", file=sys.stderr)
    # -----------------

    # The universe is defined by age and civilian labor force status (PRCIVLF == 1).
    # Using pd.to_numeric is more robust and handles cases where the data might
    # be a float (1.0) or string ('1') instead of just an integer.
    mask = (df["age"] >= 16) & (pd.to_numeric(df["in_civ_lf"], errors='coerce') == 1)
    filtered = df[mask].copy()
    print(f"→ Subset to civilians 16+ in LF: {len(filtered):,} rows remain", file=sys.stderr)
    return filtered


def get_weighted_counts(
    df: pd.DataFrame,
    *,
    weight_col: str = "weight_cmp",     # ← choose "weight_or", "weight_cmp", etc.
    implied_decimals: int = 4,          # 4 for PWCMPWGT / PWORWGT, 0 for pre-scaled
) -> pd.DataFrame:
    """
    Collapse the person-level file to weighted counts by
    state × occupation × nativity × employment.

    Parameters
    ----------
    df : DataFrame
        Person-level CPS data after derive_flags() and universe filtering.
    weight_col : str, default "weight_cmp"
        Column in *df* that contains the raw weight.
    implied_decimals : int, default 4
        Number of implied decimals in the stored weight.  The function will
        divide by 10**implied_decimals to obtain the real-scale weight.

    Returns
    -------
    DataFrame
        One row per (state_fips, occ2018, nativity_flag, emp_status)
        with a 'population' column of weighted counts.
    """
    # ------------------------------------------------------------------
    if weight_col not in df.columns:
        raise KeyError(f"{weight_col!r} not found in DataFrame columns.")

    scale = 10 ** implied_decimals
    df["_wt"] = df[weight_col] / scale

    group_cols = ["state_fips", "occ2018", "nativity_flag", "emp_status"]

    out = (
        df.groupby(group_cols, dropna=False)["_wt"]
          .sum()
          .reset_index(name="population")
    )

    print("→ Collapsed to weighted counts", file=sys.stderr)
    return out



def pivot_wide(df: pd.DataFrame) -> pd.DataFrame:
    """Convert long EMPLOYED/UNEMPLOYED rows into wide columns."""
    wide = df.pivot_table(
        index=["state_fips", "occ2018", "nativity_flag"],
        columns="emp_status",
        values="population",
        fill_value=0,
    ).reset_index()
    wide.columns.name = None
    return wide

def pad_occ_codes(df: pd.DataFrame, col: str = "occ2018") -> pd.DataFrame:
    """
    Ensure that the occupation codes are zero-padded, 4-character strings.

    Examples
    --------
    10     -> '0010'
    '7110' -> '7110'
    NaN    -> NaN  (left untouched to preserve missingness)
    """
    df[col] = (
        df[col]                       # raw column
          .apply(pd.to_numeric, errors="coerce")   # make sure we’re numeric
          .astype("Int64")            # Pandas nullable integer → keeps NaNs
          .astype(str)                # → string
          .str.zfill(4)               # zero-pad to width 4
    )
    return df




In [2]:

# -----------------------------------------------------------------------------
# Analysis Execution
# -----------------------------------------------------------------------------
# TODO: Set your file paths here
# -----------------------------------------------------------------------------
# Path to your input CPS CSV file.
# For Colab, you might upload this or access it from Google Drive.
# Example: cps_csv_path = pathlib.Path('/content/cps_data.csv')
cps_csv_path = pathlib.Path(r"/content/drive/MyDrive/cps_occup_data/basic_cps.csv")

# Path to the directory where you want to save the output files.
# Example: output_dir = pathlib.Path('/content/results/')
output_dir = pathlib.Path("/content/drive/MyDrive/cps_occup_data/results/")
# -----------------------------------------------------------------------------

# Create the output directory if it doesn't exist.
output_dir.mkdir(parents=True, exist_ok=True)

# --- Run the pipeline by calling the functions in order ---
df_raw = read_cps(cps_csv_path)
df_renamed = rename_vars(df_raw)

# Pad occupation codes (must come *before* listwise_delete)
df_renamed = pad_occ_codes(df_renamed)

# Save variable dictionary (once per run).
write_dictionary(output_dir)

# Listwise deletion on analysis variables (before LF filters).
df_clean = listwise_delete(df_renamed, ANALYSIS_VARS)

# Explicitly create a copy after filtering to prevent SettingWithCopyWarning.
# This ensures that derive_flags operates on a clean, independent DataFrame.
df_derived = derive_flags(df_clean.copy())
df_lf = filter_to_universe(df_derived)

# Save raw data - cleaned and filtered
df_lf_path = output_dir / "cps_state_occ_nat_emp_raw_non_collapsed.csv"
df_lf.to_csv(df_lf_path, index=False)
print(f"✓ Wrote {df_lf_path}")

# Collapse to a long table of weighted counts.
long_tbl = get_weighted_counts(df_lf, weight_col="weight_cmp")
long_path = output_dir / "cps_state_occ_nat_emp_long.csv"
long_tbl.to_csv(long_path, index=False)
print(f"✓ Wrote {long_path}")

# Pivot the long table into a wide format.
wide_tbl = pivot_wide(long_tbl)
wide_path = output_dir / "cps_state_occ_nat_emp_wide.csv"
wide_tbl.to_csv(wide_path, index=False)
print(f"✓ Wrote {wide_path}")

print("\nPipeline finished successfully.", file=sys.stderr)

→ Reading CPS extract…
  Loaded 123,461 rows × 393 cols


✓ Wrote /content/drive/MyDrive/cps_occup_data/results/variable_dictionary.csv


→ Listwise deletion: dropped 28,434 rows with missing values
--- Debug Info ---
Unique values in 'in_civ_lf' column before filtering: [1.]
--------------------
→ Subset to civilians 16+ in LF: 46,511 rows remain


✓ Wrote /content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_raw_non_collapsed.csv


→ Collapsed to weighted counts


✓ Wrote /content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_long.csv
✓ Wrote /content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_wide.csv



Pipeline finished successfully.


In [5]:
import pandas as pd
from pathlib import Path

raw_path = pathlib.Path("/content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_raw_non_collapsed.csv")
long_tbl = pd.read_csv(raw_path)
print(long_tbl.shape)
long_tbl.head()

long_path = pathlib.Path("/content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_long.csv")
long_tbl = pd.read_csv(long_path)
print(long_tbl.shape)
long_tbl.head()

wide_path = pathlib.Path("/content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_wide.csv")
wide_tbl = pd.read_csv(wide_path)
print(wide_tbl.shape)
wide_tbl.head()

# Make sure that the occupation codes are zero-padded, 4-character strings.
df_lf["occ2018"]   = df_lf["occ2018"].astype(str).str.zfill(4)
wide_tbl["occ2018"] = wide_tbl["occ2018"].astype(str).str.zfill(4)

# ── FINAL COMPLETENESS CHECK ────────────────────────────────────────────────
# 1. Unique dimension counts in the *pre-collapse* universe
u_states = df_lf["state_fips"].nunique()
u_occ    = df_lf["occ2018"].nunique()
u_cit    = df_lf["non_cit_foreign"].nunique()   # should be 2 (0 / 1)

theoretical_rows = u_states * u_occ * u_cit
actual_rows      = wide_tbl.shape[0]

print(f"Unique states        : {u_states}")
print(f"Unique occupations   : {u_occ}")
print(f"Citizenship flags    : {u_cit}")
print(f"Theoretical row count: {theoretical_rows:,}")
print(f"Actual wide rows     : {actual_rows:,}")

# 2. Quick pass / warn
if actual_rows == theoretical_rows:
    print("✓  Wide table has every possible State × Occupation × Citizenship cell.")
else:
    print("⚠  Wide table is missing",
          theoretical_rows - actual_rows, "cells "
          f"({actual_rows / theoretical_rows:.1%} coverage). Note: Sparcity is expected.")

    # 3. (Optional) list first few missing combinations
    #    Build the full Cartesian product, outer-merge, and flag empties
    import itertools as it
    import pandas as pd

    full_index = pd.MultiIndex.from_product(
        [df_lf["state_fips"].unique(),
         df_lf["occ2018"].unique(),
         df_lf["nativity_flag"].unique()],
        names=["state_fips", "occ2018", "nativity_flag"]
    ).to_frame(index=False)

    missing = (
        pd.MultiIndex.from_product(
            [df_lf["state_fips"].unique(),
            df_lf["occ2018"].unique(),
            df_lf["nativity_flag"].unique()],
            names=["state_fips", "occ2018", "nativity_flag"]
        ).to_frame(index=False)
          .merge(wide_tbl[["state_fips","occ2018","nativity_flag"]],
                on=["state_fips","occ2018","nativity_flag"],
                how="left", indicator=True)
          .query("_merge == 'left_only'")
          .drop(columns="_merge")
    )

    print("\nFirst few missing cells:")
    display(missing.head())


(46511, 16)
(16085, 5)
(15042, 5)
Unique states        : 51
Unique occupations   : 523
Citizenship flags    : 2
Theoretical row count: 53,346
Actual wide rows     : 15,042
⚠  Wide table is missing 38304 cells (28.2% coverage).

First few missing cells:


Unnamed: 0,state_fips,occ2018,nativity_flag
1,1,5400,1
3,1,4720,1
5,1,5840,1
7,1,4920,1
9,1,3870,1


In [6]:
# ─────────────────────────────────────────────────────────────────────────────
# sanity_checks.py  – run after wide_tbl is created
# ─────────────────────────────────────────────────────────────────────────────
import numpy as np

# ❶  Counts BEFORE collapse (df_lf = final person-level universe)
states_before = df_lf["state_fips"].nunique()
occ_before    = df_lf["occ2018"].nunique()
weight_before = (df_lf["weight_cmp"] / 1e4).sum()        # composited weight

# ❷  Counts AFTER wide pivot
states_after = wide_tbl["state_fips"].nunique()
occ_after    = wide_tbl["occ2018"].nunique()
weight_after = wide_tbl[["EMPLOYED", "UNEMPLOYED"]].sum().sum()

# ❸  Assertions
assert states_before == states_after, \
    f"State count changed!  before {states_before}  after {states_after}"

assert occ_before == occ_after, \
    f"Occupation count changed!  before {occ_before}  after {occ_after}"

tol = 1e-6 * weight_before           # tolerance for float rounding
assert np.isclose(weight_before, weight_after, atol=tol), \
    f"Person-weight total drifted!  before {weight_before:.2f}  after {weight_after:.2f}"

print("✓ Sanity checks passed:",
      f"{states_after} states,",
      f"{occ_after} occupations,",
      f"person-weight total {weight_after:,.2f}")
# ─────────────────────────────────────────────────────────────────────────────


✓ Sanity checks passed: 51 states, 523 occupations, person-weight total 170,215,952.86


In [7]:
# curiosity check ────────────────────────────────────────────────────────────
# df_renamed is the DataFrame produced by rename_vars(df_raw)
occ_unique_pre_delete = (
    pd.to_numeric(df_renamed["occ2018"], errors="coerce")
      .dropna()                     # ignore blanks / NIU
      .nunique()
)

print(f"Unique occupation codes BEFORE any row-wise deletion: {occ_unique_pre_delete}")
# ────────────────────────────────────────────────────────────────────────────


Unique occupation codes BEFORE any row-wise deletion: 524


In [9]:
# national_totals.py  – sanity-check unemployment rate
import pandas as pd, pathlib

wide_tbl = pd.read_csv(pathlib.Path("/content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_wide.csv"))

nat = wide_tbl[["EMPLOYED", "UNEMPLOYED"]].sum()      # collapse to U.S. total
unemp_rate = nat["UNEMPLOYED"] / nat.sum()            # rate = U / (E+U)

print(f"Employed persons   : {nat['EMPLOYED']:,.0f}")
print(f"Unemployed persons : {nat['UNEMPLOYED']:,.0f}")
print(f"Unemployment rate  : {unemp_rate:.4%}")


Employed persons   : 163,400,882
Unemployed persons : 6,815,071
Unemployment rate  : 4.0038%


In [10]:
import pandas as pd

# --- 1.  Load the wide file -------------------------------------------------
wide_path = "/content/drive/MyDrive/cps_occup_data/results/cps_state_occ_nat_emp_wide.csv"   # adjust if you saved elsewhere
wide_tbl  = pd.read_csv(wide_path)

# --- 2.  Collapse to native vs foreign --------------------------------------
summary = (
    wide_tbl
      .groupby("nativity_flag")[["EMPLOYED", "UNEMPLOYED"]]
      .sum()
      .rename(index={0: "Native-born", 1: "Foreign, non-citizen"})
)

summary["Unemployment_rate"] = summary["UNEMPLOYED"] / (summary["EMPLOYED"] + summary["UNEMPLOYED"])

# --- 3.  View the result ----------------------------------------------------
def format_table(df):
    return df.style.format({
        "EMPLOYED": "{:,}",
        "UNEMPLOYED": "{:,}",
        "Unemployment_rate": "{:.2%}"
    }).set_caption("Employment Summary by Nativity")

format_table(summary)



Unnamed: 0_level_0,EMPLOYED,UNEMPLOYED,Unemployment_rate
nativity_flag,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Native-born,131857251.5483,5688127.2236,4.14%
"Foreign, non-citizen",31543630.7106,1126943.3784,3.45%


In [11]:
# --- pad missing State×Occupation×Citizenship cells with zeros ---------------
full_index = pd.MultiIndex.from_product(
    [
        df_lf["state_fips"].unique(),         # 51 states
        df_lf["occ2018"].unique(),            # 523 occ codes in the filtered file
        [0, 1],                               # citizenship flags: native / non-cit-foreign
    ],
    names=["state_fips", "occ2018", "nativity_flag"]
)

wide_complete = (
    wide_tbl
      .set_index(["state_fips","occ2018","nativity_flag"])
      .reindex(full_index, fill_value=0)      # inject zeros
      .reset_index()
)

# save or use wide_complete instead of wide_tbl
wide_complete.to_csv(output_dir / "cps_state_occ_nat_emp_wide_full.csv", index=False)
print("✓ Wrote padded wide file with zeros for missing cells")


✓ Wrote padded wide file with zeros for missing cells


In [14]:
!python /content/drive/MyDrive/cps_occup_data/state_labor_force_table.py

✓ Wrote results/cps_state_labor_force_totals.csv


In [15]:
!python /content/drive/MyDrive/cps_occup_data/occupation_labor_force_table.py

✓ Wrote results/cps_occ_labor_force_totals.csv
