# Create Enriched Master Dataset

This notebook reads the cleaned yearly CSV files and adds supplemental columns needed for analysis.

**What this does:**
- Reads all `cleaned_*_all.csv` files from `../data/cleaned/`
- Adds supplemental columns (SEVIS_ID, IS_STEM, geographic mappings, etc.)
- Outputs a single enriched master Parquet file

**Supporting data required:**
- DHS STEM CIP code list (2024)
- CIP code to NSF subject field mapping
- ZIP code to LMA mapping (quarterly, 2010-2024)
- ZIP code to county mapping (quarterly, 2010-2024)
- Working population by county (yearly, 2004-2023)

**Run this once** after cleaning the data and before creating staging tables.

**Expected runtime:** 10-30 minutes depending on your machine and data size.

In [1]:
import os
import shutil
from pathlib import Path

import duckdb

# Open (or create) the scratch DuckDB database file shared by every cell in this notebook
con = duckdb.connect('tmp_pipeline.db')

# Engine settings for large datasets, tuned for an M3 Mac with 18GB RAM; applied in order
for setting_sql in (
    "SET memory_limit='14GB'",             # leaves ~4GB for the OS and other processes
    "SET threads=6",                       # the M3 handles this many threads efficiently
    "SET preserve_insertion_order=false",  # turned off to save memory
):
    con.execute(setting_sql)

print("Connected to DuckDB with optimized settings for M3 Mac (18GB RAM)")

Connected to DuckDB with optimized settings for M3 Mac (18GB RAM)


## Configuration

In [2]:
# Input glob, supporting-data folder and output file (all relative to the notebook's directory)
RAW_DATA_PATH = '../data/cleaned/cleaned_*_all.csv'
SUPPORTING_DATA_DIR = '../data/supporting'
OUTPUT_PATH = '../data/sevis_f1_enriched_master.parquet'

# Supporting lookup files, all located directly under SUPPORTING_DATA_DIR
DHS_STEM_LIST = SUPPORTING_DATA_DIR + '/dhs_stem_cip_code_list_July2024.csv'
CIP_TO_NSF_MAPPING = SUPPORTING_DATA_DIR + '/cip_code_to_nsf_subject_field_mapping.csv'
ZIP_LMA_MAPPING = SUPPORTING_DATA_DIR + '/zip_county_lma_quarterly.csv'
ZIP_COUNTY_CROSSWALK = SUPPORTING_DATA_DIR + '/HUD_zip_code_to_county_crosswalk_2010-2024.csv'
WORKING_POP_BY_COUNTY = SUPPORTING_DATA_DIR + '/working_pop_by_county_fips_2004-2023.csv'

# Echo the two paths a reader is most likely to want to double-check
print("✓ Configuration loaded")
print("  Input: " + RAW_DATA_PATH)
print("  Output: " + OUTPUT_PATH)

✓ Configuration loaded
  Input: ../data/cleaned/cleaned_*_all.csv
  Output: ../data/sevis_f1_enriched_master.parquet


## Step 1: Create Base Table with SEVIS_ID

In [3]:
print("Loading base data from CSV files...")

# Columns forced to VARCHAR so that invalid/unparseable dates and CIP codes are
# loaded as text instead of breaking type detection.
varchar_columns = [
    'FIRST_ENTRY_DATE',
    'LAST_ENTRY_DATE',
    'LAST_DEPARTURE_DATE',
    'VISA_ISSUE_DATE',
    'VISA_EXPIRATION_DATE',
    'PROGRAM_START_DATE',
    'PROGRAM_END_DATE',
    'AUTHORIZATION_START_DATE',
    'AUTHORIZATION_END_DATE',
    'OPT_AUTHORIZATION_START_DATE',
    'OPT_AUTHORIZATION_END_DATE',
    'OPT_EMPLOYER_START_DATE',
    'OPT_EMPLOYER_END_DATE',
    'Major_1_CIP_Code',
    'Major_2_CIP_Code',
    'Minor_CIP_Code',
]
type_overrides = ", ".join(f"'{name}': 'VARCHAR'" for name in varchar_columns)

# Read from RAW_DATA_PATH (set in the configuration cell) rather than a second,
# hard-coded copy of the glob, so changing the configuration actually takes effect.
# SEVIS_ID is the Year followed by Individual_Key.
# NOTE(review): CONCAT presumably skips NULL arguments, so a row without an
# Individual_Key would get a SEVIS_ID made of the year alone -- confirm acceptable.
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE base_data AS
    SELECT
      *,
      CONCAT(CAST(Year AS VARCHAR), Individual_Key) AS SEVIS_ID
    FROM read_csv_auto('{RAW_DATA_PATH}',
                       union_by_name=true,
                       types={{{type_overrides}}})
""")

# Display row count
total_rows = con.execute("SELECT COUNT(*) FROM base_data").fetchone()[0]
print(f"✓ Loaded {total_rows:,} rows")
print("✓ Added SEVIS_ID column (Year + Individual_Key)")

Loading base data from CSV files...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

✓ Loaded 70,230,345 rows
✓ Added SEVIS_ID column (Year + Individual_Key)


## Step 2: Add IS_STEM Column

In [4]:
print("Adding IS_STEM column...")

# Load the DHS STEM list (override type to VARCHAR since CIP codes are categorical identifiers)
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE dhs_stem_list AS
    SELECT * FROM read_csv_auto('{DHS_STEM_LIST}',
                                types={{'2020_cip_code': 'VARCHAR'}})
""")

# One row per distinct lower-cased STEM CIP code. Because the lookup is unique,
# the LEFT JOINs below can never multiply student rows.
con.execute("""
    CREATE OR REPLACE TEMP TABLE dhs_stem_codes AS
    SELECT DISTINCT LOWER("2020_cip_code") AS cip
    FROM dhs_stem_list
    WHERE "2020_cip_code" IS NOT NULL
""")

# IS_STEM is NULL when all three CIP fields are missing/blank, otherwise TRUE if
# any of them appears in the DHS list and FALSE if none does.
# The previous version used a correlated EXISTS over UNNEST([...]) per row; on the
# full table that query exhausted the temp directory (Out of Memory Error). Three
# equality joins against the small lookup give the same answer.
con.execute("""
    CREATE OR REPLACE TEMP TABLE with_stem AS
    SELECT
      b.*,
      CASE
        -- If all CIP fields are missing/blank, keep unknown as NULL
        WHEN NULLIF(b.Major_1_CIP_Code, '') IS NULL
         AND NULLIF(b.Major_2_CIP_Code, '') IS NULL
         AND NULLIF(b.Minor_CIP_Code, '') IS NULL
        THEN NULL
        -- Otherwise TRUE if any CIP matches the DHS STEM list
        ELSE (major1.cip IS NOT NULL
           OR major2.cip IS NOT NULL
           OR minor.cip IS NOT NULL)
      END AS IS_STEM
    FROM base_data b
    LEFT JOIN dhs_stem_codes major1 ON major1.cip = LOWER(b.Major_1_CIP_Code)
    LEFT JOIN dhs_stem_codes major2 ON major2.cip = LOWER(b.Major_2_CIP_Code)
    LEFT JOIN dhs_stem_codes minor  ON minor.cip  = LOWER(b.Minor_CIP_Code)
""")

# Display STEM statistics (COUNT ... FILTER keeps the results as plain integers)
total, stem_count, unknown_count = con.execute("""
    SELECT
      COUNT(*) AS total,
      COUNT(*) FILTER (WHERE IS_STEM = TRUE) AS stem_count,
      COUNT(*) FILTER (WHERE IS_STEM IS NULL) AS unknown_count
    FROM with_stem
""").fetchone()

print("✓ IS_STEM column added")
print(f"  STEM students: {stem_count:,}")
print(f"  Unknown: {unknown_count:,}")

Adding IS_STEM column...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

OutOfMemoryException: Out of Memory Error: failed to offload data block of size 96.0 KiB (74.7 GiB/74.7 GiB used).
This limit was set by the 'max_temp_directory_size' setting.
By default, this setting utilizes the available disk space on the drive where the 'temp_directory' is located.
You can adjust this setting, by using (for example) PRAGMA max_temp_directory_size='10GiB'

Possible solutions:
* Reducing the number of threads (SET threads=X)
* Disabling insertion-order preservation (SET preserve_insertion_order=false)
* Increasing the memory limit (SET memory_limit='...GB')

See also https://duckdb.org/docs/stable/guides/performance/how_to_tune_workloads

## Step 3: Add NSF_SUBJ_FIELD_BROAD Column

In [None]:
print("Adding NSF subject field mappings...")

# Both sides of the join are normalized to MM.mmmm (two-digit series, four-digit
# detail) so that e.g. '1.01' and '01.0100' compare equal.
# The lookup keeps exactly ONE row per normalized CIP code. A plain SELECT DISTINCT
# over (cip, field) pairs could leave several rows for one code, and the LEFT JOIN
# would then silently duplicate student rows. Ties are broken deterministically by
# taking the alphabetically first non-NULL NSF_BROAD_FIELD.
# CIPCODE_ORIGINAL is cast to VARCHAR first in case CSV type detection reads it as
# a number.
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE with_nsf AS
    SELECT
      w.*,
      m.NSF_BROAD_FIELD AS NSF_SUBJ_FIELD_BROAD
    FROM with_stem w
    LEFT JOIN (
      SELECT cip_normalized, NSF_BROAD_FIELD
      FROM (
        SELECT
          -- Normalize CIP code to MM.mmmm format
          CONCAT(
            LPAD(REGEXP_EXTRACT(CAST(CIPCODE_ORIGINAL AS VARCHAR), '(\\d+)', 1), 2, '0'),
            '.',
            RPAD(COALESCE(REGEXP_EXTRACT(CAST(CIPCODE_ORIGINAL AS VARCHAR), '\\d+\\.(\\d+)$', 1), ''), 4, '0')
          ) AS cip_normalized,
          NSF_BROAD_FIELD
        FROM read_csv_auto('{CIP_TO_NSF_MAPPING}')
      )
      QUALIFY ROW_NUMBER() OVER (
        PARTITION BY cip_normalized
        ORDER BY NSF_BROAD_FIELD NULLS LAST
      ) = 1
    ) m
      ON CONCAT(
           LPAD(REGEXP_EXTRACT(w.Major_1_CIP_Code, '(\\d+)', 1), 2, '0'),
           '.',
           RPAD(COALESCE(REGEXP_EXTRACT(w.Major_1_CIP_Code, '\\d+\\.(\\d+)$', 1), ''), 4, '0')
         ) = m.cip_normalized
""")

# Display mapping statistics
total, mapped_count = con.execute("""
    SELECT
      COUNT(*) AS total,
      COUNT(NSF_SUBJ_FIELD_BROAD) AS mapped_count
    FROM with_nsf
""").fetchone()

print("✓ NSF subject field column added")
print(f"  Mapped: {mapped_count:,} of {total:,}")

## Step 4: Add Geographic Columns (LMA and County)

In [None]:
print("Creating geographic lookup tables...")

# ZIP normalization used by both lookups:
#   * CAST to VARCHAR first, in case CSV type detection read the ZIP as an integer
#   * strip ALL non-digits ('g' = global; without it only the first match is removed)
#   * keep the first five digits (drops a ZIP+4 suffix)
#   * left-pad with zeros so ZIPs that lost their leading zeros are restored
# Blank values normalize to '00000' and are removed by the filters below.

# Create ZIP→LMA lookup: one row per (year, quarter, zip5). When a ZIP maps to
# several LMAs in a quarter, the alphabetically first lma_name is kept.
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE zip_lma_slim AS
    WITH cleaned AS (
      SELECT
        CAST(YEAR AS INTEGER) AS year,
        CAST(QUARTER AS INTEGER) AS quarter,
        LPAD(SUBSTR(REGEXP_REPLACE(TRIM(CAST(zip5 AS VARCHAR)), '[^0-9]', '', 'g'), 1, 5), 5, '0') AS zip5_norm,
        lma_name
      FROM read_csv_auto('{ZIP_LMA_MAPPING}')
    )
    SELECT year, quarter, zip5_norm, lma_name
    FROM cleaned
    WHERE year IS NOT NULL
      AND quarter BETWEEN 1 AND 4
      AND LENGTH(zip5_norm) = 5
      AND zip5_norm <> '00000'
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY year, quarter, zip5_norm
      ORDER BY lma_name
    ) = 1
""")

print("✓ ZIP→LMA lookup created")

# Create ZIP→County lookup: crosswalk rows are joined to the working-population
# file on a 5-character, zero-padded county code to pick up the county label.
# Both sides build county5 the same way (numeric value → zero-padded text), so the
# join no longer depends on how each CSV column happened to be type-detected.
# NOTE(review): when a ZIP spans several counties the alphabetically first county
# name wins; if the crosswalk carries an allocation ratio, ordering by it would
# probably be a better tie-break -- confirm against the file's columns.
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE zip_county_slim AS
    WITH cw AS (
      SELECT
        CAST(YEAR AS INTEGER) AS year,
        CAST(QUARTER AS INTEGER) AS quarter,
        LPAD(SUBSTR(REGEXP_REPLACE(TRIM(CAST(ZIP AS VARCHAR)), '[^0-9]', '', 'g'), 1, 5), 5, '0') AS zip5_norm,
        LPAD(CAST(TRY_CAST(TRIM(CAST(COUNTY AS VARCHAR)) AS INTEGER) AS VARCHAR), 5, '0') AS county5
      FROM read_csv_auto('{ZIP_COUNTY_CROSSWALK}')
    ),
    wp AS (
      SELECT
        LPAD(CAST(CAST(TRIM(CAST(County_LAUS_areacode AS VARCHAR)) AS INTEGER) AS VARCHAR), 5, '0') AS county5,
        County_Name_State_Abbreviation
      FROM read_csv_auto('{WORKING_POP_BY_COUNTY}')
    )
    SELECT
      cw.year,
      cw.quarter,
      cw.zip5_norm,
      wp.County_Name_State_Abbreviation
    FROM cw
    JOIN wp USING (county5)
    WHERE cw.year IS NOT NULL
      AND cw.quarter BETWEEN 1 AND 4
      AND LENGTH(cw.zip5_norm) = 5
      AND cw.zip5_norm <> '00000'
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY cw.year, cw.quarter, cw.zip5_norm
      ORDER BY wp.County_Name_State_Abbreviation
    ) = 1
""")

print("✓ ZIP→County lookup created")

In [None]:
print("Joining geographic data...")

# The lookup tables are keyed on (year, quarter, 5-digit ZIP string). The join keys
# are derived once per row in the inner SELECT:
#   * year/quarter come from Program_End_Date (TRY_CAST yields NULL, i.e. no match,
#     for values that do not parse as a DATE)
#   * campus/employer ZIPs get the same normalization the lookups use (text, digits
#     only, first five, zero-padded). Comparing the raw columns would miss ZIP+4
#     values, ZIPs read as integers, and ZIPs with stray whitespace.
# The helper columns are removed again with EXCLUDE, so with_geo has the with_nsf
# columns plus the four new ones.
# NOTE(review): employer lookups are also keyed on Program_End_Date, whereas the
# working-population step uses Authorization_Start_Date -- confirm this is intended.
con.execute("""
    CREATE OR REPLACE TEMP TABLE with_geo AS
    SELECT
      k.* EXCLUDE (__geo_year, __geo_quarter, __campus_zip5, __employer_zip5),
      lma_campus.lma_name AS CAMPUS_LMA,
      lma_employer.lma_name AS EMPLOYER_LMA,
      county_campus.County_Name_State_Abbreviation AS CAMPUS_COUNTY,
      county_employer.County_Name_State_Abbreviation AS EMPLOYER_COUNTY
    FROM (
      SELECT
        n.*,
        EXTRACT(YEAR FROM TRY_CAST(n.Program_End_Date AS DATE)) AS __geo_year,
        EXTRACT(QUARTER FROM TRY_CAST(n.Program_End_Date AS DATE)) AS __geo_quarter,
        LPAD(SUBSTR(REGEXP_REPLACE(TRIM(CAST(n.Campus_Zip_Code AS VARCHAR)), '[^0-9]', '', 'g'), 1, 5), 5, '0') AS __campus_zip5,
        LPAD(SUBSTR(REGEXP_REPLACE(TRIM(CAST(n.Employer_Zip_Code AS VARCHAR)), '[^0-9]', '', 'g'), 1, 5), 5, '0') AS __employer_zip5
      FROM with_nsf n
    ) k
    LEFT JOIN zip_lma_slim lma_campus
      ON k.__geo_year = lma_campus.year
     AND k.__geo_quarter = lma_campus.quarter
     AND k.__campus_zip5 = lma_campus.zip5_norm
    LEFT JOIN zip_lma_slim lma_employer
      ON k.__geo_year = lma_employer.year
     AND k.__geo_quarter = lma_employer.quarter
     AND k.__employer_zip5 = lma_employer.zip5_norm
    LEFT JOIN zip_county_slim county_campus
      ON k.__geo_year = county_campus.year
     AND k.__geo_quarter = county_campus.quarter
     AND k.__campus_zip5 = county_campus.zip5_norm
    LEFT JOIN zip_county_slim county_employer
      ON k.__geo_year = county_employer.year
     AND k.__geo_quarter = county_employer.quarter
     AND k.__employer_zip5 = county_employer.zip5_norm
""")

# Display mapping statistics
total, campus_lma_count, employer_lma_count = con.execute("""
    SELECT
      COUNT(*) AS total,
      COUNT(CAMPUS_LMA) AS campus_lma_count,
      COUNT(EMPLOYER_LMA) AS employer_lma_count
    FROM with_geo
""").fetchone()

print("✓ Geographic columns added")
print(f"  Campus LMA mapped: {campus_lma_count:,}")
print(f"  Employer LMA mapped: {employer_lma_count:,}")

## Step 5: Add Working Population Columns

In [None]:
print("Creating working population lookup tables...")

# Discover the yearly columns (LMA_WORKING_POP_<yyyy>) from the file itself instead
# of hard-coding 2004..2024: the query would fail outright if any listed year were
# absent from the CSV, and new years would otherwise need a manual edit here.
workpop_prefix = 'LMA_WORKING_POP_'
workpop_columns = [
    row[0]  # first field of each DESCRIBE row is the column name
    for row in con.execute(
        f"DESCRIBE SELECT * FROM read_csv_auto('{WORKING_POP_BY_COUNTY}')"
    ).fetchall()
]
year_columns = sorted(
    name for name in workpop_columns
    if name.upper().startswith(workpop_prefix)
    and len(name) == len(workpop_prefix) + 4
    and name[len(workpop_prefix):].isdigit()
)
if not year_columns:
    raise ValueError(
        f"No '{workpop_prefix}<year>' columns found in {WORKING_POP_BY_COUNTY}"
    )

# Two parallel SQL lists: year labels and the matching column values. Values are
# cast to VARCHAR so numeric columns and text columns (e.g. 'Missing Data',
# '1,234') can share one list and go through the same clean-up below.
year_list_sql = ", ".join(f"'{name[len(workpop_prefix):]}'" for name in year_columns)
value_list_sql = ", ".join(f'CAST("{name}" AS VARCHAR)' for name in year_columns)

# Unpivot working population data: one row per (county, year). 'Missing Data' and
# blank cells become NULL; thousands separators are removed before the cast.
con.execute(f"""
    CREATE OR REPLACE TEMP TABLE workpop_long AS
    SELECT
      County_Name_State_Abbreviation,
      state_name,
      CAST(year_col AS INTEGER) AS year,
      CASE
        WHEN LOWER(TRIM(pop_value)) IN ('missing data', '') THEN NULL
        ELSE CAST(REPLACE(TRIM(pop_value), ',', '') AS INTEGER)
      END AS working_pop
    FROM (
      SELECT
        County_Name_State_Abbreviation,
        state_name,
        UNNEST([{year_list_sql}]) AS year_col,
        UNNEST([{value_list_sql}]) AS pop_value
      FROM read_csv_auto('{WORKING_POP_BY_COUNTY}')
    )
""")

print("✓ Working population by county unpivoted")

# Create state-level aggregates: sum of county values per state and year
# (NULL county values are ignored by SUM)
con.execute("""
    CREATE OR REPLACE TEMP TABLE workpop_state AS
    SELECT
      state_name,
      year,
      SUM(working_pop) AS state_working_pop
    FROM workpop_long
    WHERE state_name IS NOT NULL
    GROUP BY state_name, year
""")

print("✓ State-level working population aggregated")

In [None]:
print("Adding working population columns...")

# Create final enriched master table.
# County-level and state-level working population are looked up for the calendar
# year of Authorization_Start_Date (TRY_CAST yields NULL, i.e. no match, when the
# value does not parse as a DATE).
con.execute("""
    CREATE OR REPLACE TEMP TABLE enriched_master AS
    SELECT 
      g.*,
      wp_lma.working_pop AS EMPLOYER_LMA_WORKPOP_YR,
      wp_state.state_working_pop AS EMPLOYER_STATE_WORKPOP_YR
    FROM with_geo g
    LEFT JOIN workpop_long wp_lma
      ON g.EMPLOYER_COUNTY = wp_lma.County_Name_State_Abbreviation
     AND EXTRACT(YEAR FROM TRY_CAST(g.Authorization_Start_Date AS DATE)) = wp_lma.year
    LEFT JOIN workpop_state wp_state
      ON g.Employer_State = wp_state.state_name
     AND EXTRACT(YEAR FROM TRY_CAST(g.Authorization_Start_Date AS DATE)) = wp_state.year
""")

# Display final statistics
total, lma_workpop_count, state_workpop_count = con.execute("""
    SELECT 
      COUNT(*) AS total,
      COUNT(EMPLOYER_LMA_WORKPOP_YR) AS lma_workpop_count,
      COUNT(EMPLOYER_STATE_WORKPOP_YR) AS state_workpop_count
    FROM enriched_master
""").fetchone()

# A LEFT JOIN must not change the number of rows. If it did, a lookup table has
# more than one row for some (county, year) or (state, year) key and records were
# duplicated -- fail loudly rather than export a corrupted master file.
source_rows = con.execute("SELECT COUNT(*) FROM with_geo").fetchone()[0]
if total != source_rows:
    raise RuntimeError(
        f"Row count changed during working-population join: "
        f"{source_rows:,} rows in with_geo vs {total:,} in enriched_master "
        f"(duplicate keys in workpop_long or workpop_state?)"
    )

print("✓ Working population columns added")
print(f"  LMA working pop mapped: {lma_workpop_count:,}")
print(f"  State working pop mapped: {state_workpop_count:,}")

## Step 6: Export Enriched Master Dataset

In [None]:
print("Exporting to Parquet...")

# Make sure the folder that will hold the Parquet file is present
output_dir = os.path.dirname(OUTPUT_PATH)
os.makedirs(output_dir, exist_ok=True)

# Write the whole enriched_master table as ZSTD-compressed Parquet
export_sql = f"""
    COPY (
      SELECT * FROM enriched_master
    ) TO '{OUTPUT_PATH}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
"""
con.execute(export_sql)

# Report where the file landed and how large it is (bytes converted to GiB)
size_in_gb = os.path.getsize(OUTPUT_PATH) / (1024**3)
print("\n✓ Enriched master dataset created!")
print(f"  Output: {OUTPUT_PATH}")
print(f"  Size: {size_in_gb:.2f} GB")
print("\n✓ Next step: Run create_staging_tables.ipynb")

In [None]:
# Clean up: close connection and remove temporary database
con.close()

# Remove the database file plus the artifacts DuckDB may leave next to it after an
# interrupted run (such as an out-of-memory abort).
# NOTE(review): '<db>.wal' (write-ahead log) and '<db>.tmp' (spill directory) are
# DuckDB's default names per its documentation -- adjust if temp_directory is
# ever configured explicitly.
removed_any = False
for leftover in ('tmp_pipeline.db', 'tmp_pipeline.db.wal', 'tmp_pipeline.db.tmp'):
    if os.path.isdir(leftover):
        shutil.rmtree(leftover, ignore_errors=True)
        removed_any = True
    elif os.path.exists(leftover):
        os.remove(leftover)
        removed_any = True

if removed_any:
    print("✓ Temporary database cleaned up")