# Step 1 - CRSS Data Extraction & Preprocessing


This is the code for the first step of the CRSS data pipeline.

In summary:
* Load the 3 CRSS data files (Person, Vehicle, Accident)
* Join them at person level (most granular)
* Delete duplicate features
* Replace "Not reported" or "unknown" codes with null values
* Create FATALITY indicator based on MAX_SEV

Missing code handling is based on the CRSS Analytical User's Manual. Different features use different codes for "unknown" or "not reported" (e.g., 99, 999, 9999). These are mapped to null values.

Note: This code is optimized for CRSS 2016-2023 data. Earlier years may have different missing value codes.


In [158]:
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import polars as pl

### How to run the code:
Run all the sections in order (top to bottom)

##### CRSS field-specific missing codes (based on CRSS Analytical User's Manual)


In [159]:
# CRSS field-specific missing codes (based on CRSS Analytical User's Manual 2016-2023)
FIELD_SPECIFIC_MISSING = {
    
    # Demographics
    "AGE": [998, 999],
    "SEX": [8, 9],
    "RACE": [97, 98, 99],
    "HISPANIC": [8, 9, 98, 99],
    
    # Temporal
    "HOUR": [99, 9999],  # CRSS uses both
    "MINUTE": [99, 999],
    "DAY": [99],
    "MONTH": [99],
    "DAY_WEEK": [9],
    
    # Injury/Outcome
    "INJ_SEV": [9],
    "MAX_SEV": [8,9],  # Important for CRSS!
    "DEATH_HR": [9999],  # Not 88, 99
    "DEATH_MN": [999],   # Not 88, 99
    
    # Behaviors - Substance Use
    "DRINKING": [8, 9, 98, 99],
    "DR_DRINK": [8, 9, 98, 99],  # Added
    "DRUGS": [95, 96, 97, 98, 99],  # Not just [8, 9]
    "DRUG_DET": [5, 6, 8, 9, 95, 96, 97, 98, 99],  # Not just [8]
    "DRUGRES1": [95, 96, 97, 98, 99],  # Not [95, 999]
    "DRUGRES2": [95, 96, 97, 98, 99],
    "DRUGRES3": [95, 96, 97, 98, 99],
    # DO NOT include "ALC_RES" - mixed variable, handled separately!
    "ALC_STATUS": [8, 9],  # Added
    "ALC_DET": [8, 9],     # Added
    
    # Behaviors - Safety Equipment
    "REST_USE": [8, 9, 98, 99],  # Not [20, 96, 98, 99]
    "REST_MIS": [8, 9, 98, 99],  # Not just [7, 8]
    "AIR_BAG": [8, 9, 98, 99],   # Not just [98, 99]
    "HELM_USE": [8, 9],
    "HELM_MIS": [8, 9], 
    "SPEEDREL": [9],
    
    # Behaviors - Ejection
    "EJECTION": [8, 9],
    "EJ_PATH": [8, 9],
    "EXTRICAT": [8, 9],
    
    # Vehicle
    "BODY_TYP": [78, 79, 97, 98, 99],
    "MOD_YEAR": [9999],     
    "GVWR": [9],            
    "V_CONFIG": [9],        
    "CARGO_BT": [9],        
    "HAZ_INV": [9],         
    "BUS_USE": [9],         
    "SPEC_USE": [9],        
    "EMER_USE": [9],   
    "ROLLOVER": [9],
    "ROLINLOC": [9], 
    "IMPACT1": [98, 99],
    "FIRE_EXP": [8, 9],
    "FIRE": [8, 9],
    "DEFORMED": [9],
    "VEH_SEV": [9],
    "TOWED": [9],
    "M_HARM": [98, 99],     # Most harmful event (vehicle level)
    "VEH_CF1": [98, 99],    # Vehicle contributing factors
    "VEH_CF2": [98, 99],
    "NUMOCCS": [99],        # Number of occupants
    
    # Environmental
    "WEATHER": [98, 99],
    "WEATHER1": [10, 98, 99],  # Note: 10 added for CRSS vs FARS
    "WEATHER2": [98, 99],
    "LGT_COND": [9],        # Not [8, 9]
    
    # Location
    "RUR_URB": [9],         # Not [8, 9]
    "FUNC_SYS": [9],        # Not [98, 99]
    "LATITUDE": [77.7777, 88.8888, 99.9999],
    "LONGITUD": [777.7777, 888.8888, 999.9999],
    
    # Roadway
    "HARM_EV": [98, 99],    # Correct
    "MAN_COLL": [9],        # Not [98, 99]
    "RELJCT1": [98, 99],    # Not [8, 9]
    "RELJCT2": [98, 99],    # Not just [98, 99]
    "TYP_INT": [99],        # Not [98, 99]
    "WRK_ZONE": [9],
    "REL_ROAD": [9],        # Not [98, 99]
    "VTRAFWAY": [9],        # Not [8, 9]
    "VNUM_LAN": [9],        # Not [8, 9]
    "VSPD_LIM": [98, 99],
    "VALIGN": [9],
    "VPROFILE": [9],        # Not [8, 9]
    "VPAVETYP": [9],        # Not [8, 9]
    "VSURCOND": [9],        # Not [98, 99]
    "VTRAFCON": [98, 99],   # Not [97, 99]
    
    # Person-specific
    "PER_TYP": [19],     # Not just [19]
    "SEAT_POS": [98, 99],
    "SEATING": [98, 99],    # Removed in recent CRSS
    
    # Notification/arrival times
    "NOT_HOUR": [99, 9999],   # Not [88, 99]
    "NOT_MIN": [99, 999],     # Not [88, 98, 99]
    "ARR_HOUR": [99, 9999],   # Not [88, 99]
    "ARR_MIN": [99, 999],     # Not [88, 98, 99]
    "HOSP_HR": [99, 9999],    # Not [88, 99]
    "HOSP_MN": [99, 999],
}

##### Numeric columns (common)

In [160]:
# Default numeric columns if not specified
NUMERIC_COLUMNS = [
    # ========================================
    # IDs and Keys
    # ========================================
    #"CASENUM",      # CRSS case number (primary key)
    "ST_CASE",      # State case number
    "VEH_NO",       # Vehicle number
    "PER_NO",       # Person number
    "STATE",        # State code
    "COUNTY",       # County code
    
    # ========================================
    # Survey Design Variables (CRITICAL!)
    # ========================================
    "PSU",          # Primary Sampling Unit
    "PJ",           # Jackknife replicate
    "REGION",       # Region code
    "STRATUM",      # Stratum code
    "PSUSTRAT",     # PSU within stratum
    "PSU_VAR",      # PSU for variance estimation
    
    # ========================================
    # Weights (ESSENTIAL FOR ANALYSIS!)
    # ========================================
    #"WEIGHT",       # Survey weight - represents population
    "RATWGT",       # Ratio adjustment weight
    
    # ========================================
    # Demographics
    # ========================================
    "AGE",          # Person age (0-120, 998-999 = missing)
    "SEX",          # Sex (1=Male, 2=Female, 8-9=Unknown)
    "RACE",         # Race code
    "HISPANIC",     # Hispanic origin
    
    # ========================================
    # Temporal
    # ========================================
    "YEAR",         # Crash year
    "MONTH",        # Month (1-12)
    "DAY",          # Day of month
    "DAY_WEEK",     # Day of week (1-7)
    "HOUR",         # Hour (0-23, 99/9999=Unknown)
    "MINUTE",       # Minute (0-59, 99/999=Unknown)
    
    # ========================================
    # Outcomes (Person Level)
    # ========================================
    "INJ_SEV",      # Injury severity (person)
    "MAX_SEV",      # Maximum severity in crash (CRITICAL!)
    "DEATH_HR",     # Hours to death (9999=Unknown)
    "DEATH_MN",     # Minutes to death (999=Unknown)
    
    # ========================================
    # Person Behavior
    # ========================================
    "PER_TYP",      # Person type (driver, passenger, pedestrian, etc.)
    "SEAT_POS",     # Seating position
    "REST_USE",     # Restraint use (seatbelt type)
    "REST_MIS",     # Restraint misuse
    "AIR_BAG",      # Air bag deployment
    "EJECTION",     # Ejection status
    "EJ_PATH",      # Ejection path
    "EXTRICAT",     # Extrication required
    "HELM_USE",     # Helmet use (motorcycles)
    "HELM_MIS",     # Helmet misuse
    
    # ========================================
    # Substance Use
    # ========================================
    "DRINKING",     # Police reported alcohol involvement
    "DR_DRINK",     # Driver drinking
    "ALC_RES",      # Alcohol test result (MIXED VARIABLE!)
    "ALC_STATUS",   # Alcohol test status
    "ALC_DET",      # Alcohol detection method
    "DRUGS",        # Police reported drug involvement
    "DRUG_DET",     # Drug test method
    "DRUGRES1",     # Drug test result 1
    "DRUGRES2",     # Drug test result 2
    "DRUGRES3",     # Drug test result 3
    
    # ========================================
    # Vehicle Characteristics
    # ========================================
    "BODY_TYP",     # Vehicle body type
    "MOD_YEAR",     # Model year (1900-2100, 9999=Unknown)
    "MAKE",         # Vehicle make
    "MODEL",        # Vehicle model
    "MAK_MOD",      # Combined make/model code
    "GVWR",         # Gross vehicle weight rating
    "V_CONFIG",     # Vehicle configuration
    "BUS_USE",      # Bus use
    "SPEC_USE",     # Special use
    "EMER_USE",     # Emergency use
    "CARGO_BT",     # Cargo body type
    "HAZ_INV",      # Hazmat involved
    "NUMOCCS",      # Number of occupants in vehicle
    
    # ========================================
    # Crash Dynamics
    # ========================================
    "ROLLOVER",     # Rollover occurrence
    "ROLINLOC",     # Rollover location
    "IMPACT1",      # Initial impact point
    "FIRE_EXP",     # Fire/explosion (old name)
    "FIRE",         # Fire occurrence (CRSS name)
    "DEFORMED",     # Extent of damage (CRSS)
    "VEH_SEV",      # Vehicle severity (CRSS)
    "TOWED",        # Vehicle towed (replaces TOW_VEH)
    "M_HARM",       # Most harmful event (vehicle level)
    "VEH_CF1",      # Vehicle contributing factor 1
    "VEH_CF2",      # Vehicle contributing factor 2
    "SPEEDREL",     # Speed related
    
    # ========================================
    # Environmental
    # ========================================
    "WEATHER",      # Weather condition
    "WEATHER1",     # Weather condition 1
    "WEATHER2",     # Weather condition 2
    "LGT_COND",     # Light condition
    
    # ========================================
    # Location/Geography
    # ========================================
    "RUR_URB",      # Rural/Urban
    "FUNC_SYS",     # Functional system
    "LATITUDE",     # Latitude coordinate
    "LONGITUD",     # Longitude coordinate
    "URBANICITY",   # Urban/rural classification (some CRSS years)
    
    # ========================================
    # Crash/Roadway Characteristics
    # ========================================
    "HARM_EV",      # First harmful event (crash level)
    "MAN_COLL",     # Manner of collision
    "RELJCT1",      # Junction relation 1
    "RELJCT2",      # Junction relation 2
    "TYP_INT",      # Intersection type
    "WRK_ZONE",     # Work zone
    "REL_ROAD",     # Relation to roadway
    "VTRAFWAY",     # Trafficway description
    "VNUM_LAN",     # Number of travel lanes
    "VALIGN",       # Roadway alignment
    "VPROFILE",     # Roadway profile
    "VPAVETYP",     # Pavement type
    "VSURCOND",     # Surface condition
    "VTRAFCON",     # Traffic control device
    
    # ========================================
    # Notification/Response Times
    # ========================================
    "NOT_HOUR",     # Notification hour
    "NOT_MIN",      # Notification minute
    "ARR_HOUR",     # EMS arrival hour
    "ARR_MIN",      # EMS arrival minute
    "HOSP_HR",      # Hospital arrival hour
    "HOSP_MN",      # Hospital arrival minute
]


### Step 1.1: Data Loading
Load CRSS CSV files with columns as strings to prevent conversion issues.


In [161]:
def load_crss_csv(filename: str, data_dir: Path) -> pl.DataFrame:
    path = Path(data_dir) / filename
    
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")
    
    # Load with no schema inference
    df = pl.read_csv(path, infer_schema_length=0)
    
    print(f"Loaded: {filename}")
    print(f"Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")
    
    return df

### Step 1.2: Key Validation

Remove duplicates and review null keys

In [162]:
def remove_duplicate_keys(
    df: pl.DataFrame,
    key_columns: List[str]
) -> pl.DataFrame:
    # Check for NULL keys
    for col in key_columns:
        if col not in df.columns:
            continue
        
        null_count = df.filter(pl.col(col).is_null()).height
        if null_count > 0:
            print(f"NULL values in {col}: {null_count:,}")
    
    # Remove duplicates
    original_count = df.height
    df_unique = df.unique(subset=key_columns, maintain_order=True)
    duplicates = original_count - df_unique.height
    
    if duplicates > 0:
        print(f"Removed {duplicates:,} duplicate rows")
    else: 
        print(f"No duplicates found")
    
    return df_unique

### Step 1.3: Data Type Conversion

Convert the specified columns from string to numeric

In [163]:
def convert_to_numeric(
    df: pl.DataFrame,
    columns: List[str]
) -> pl.DataFrame:

    converted = 0
    errors = 0
    
    for col in columns:
        if col not in df.columns:
            continue
        
        # Skip if already numeric
        if df[col].dtype in [pl.Int8, pl.Int16, pl.Int32, pl.Int64,
                             pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64,
                             pl.Float32, pl.Float64]:
            converted += 1
            continue
        
        try:
            df = df.with_columns(
                pl.col(col).cast(pl.Int32, strict=False).alias(col)
            )
            converted += 1
        except Exception as e:
            errors += 1
    
    print(f"Converted {converted} columns to numeric")
    if errors > 0:
        print(f"Errors in {errors} columns")
    
    return df

### Step 1.4: Missing Code Handling
Use the field-specific missing codes dictionary to apply the missing code handling

In [164]:
def replace_field_specific_missing_codes(df: pl.DataFrame) -> pl.DataFrame:
    total_replaced = 0
    columns_modified = 0
    
    for col in df.columns:
        # Get field-specific missing codes
        if col in FIELD_SPECIFIC_MISSING:
            missing_codes = FIELD_SPECIFIC_MISSING[col]
        else:
            # Skip columns not in numeric list and not explicitly defined
            if col not in NUMERIC_COLUMNS:
                continue
        
        try:
            col_dtype = df[col].dtype
            
            if col_dtype == pl.Utf8:
                # String column
                missing_code_strs = [str(c) for c in missing_codes if isinstance(c, int)]
                if "" in missing_codes:
                    missing_code_strs.append("")
                
                count_before = df.filter(pl.col(col).is_in(missing_code_strs)).height
                # if the code is one of the missing codes, it is converted to None to prevent influencing the analysis results
                if count_before > 0:
                    df = df.with_columns(
                        pl.when(pl.col(col).is_in(missing_code_strs))
                        .then(None)
                        .otherwise(pl.col(col))
                        .alias(col)
                    )
                    total_replaced += count_before
                    columns_modified += 1
            
            elif col_dtype in [pl.Int8, pl.Int16, pl.Int32, pl.Int64,
                               pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64,
                               pl.Float32, pl.Float64]:
                # Numeric column
                numeric_codes = [c for c in missing_codes if isinstance(c, (int, float))]
                
                count_before = df.filter(pl.col(col).is_in(numeric_codes)).height
                
                if count_before > 0:
                    df = df.with_columns(
                        pl.when(pl.col(col).is_in(numeric_codes))
                        .then(None)
                        .otherwise(pl.col(col))
                        .alias(col)
                    )
                    total_replaced += count_before
                    columns_modified += 1
        
        except Exception:
            continue
    
    print(f" Replaced missing codes in {columns_modified} columns")
    print(f" Total replacements: {total_replaced:,}")
    
    return df


### Step 1.5: Dataset Integration

* Integrate person, vehicle, and accident datasets into person-level data

* Join hierarchy: person (base) → vehicle → accident

* Overlapped columns are dropped

In [165]:
def join_person_vehicle_accident(
    person: pl.DataFrame,
    vehicle: pl.DataFrame,
    accident: pl.DataFrame
) -> pl.DataFrame:

    # Identify overlapping columns
    vehicle_cols = set(vehicle.columns)
    person_cols = set(person.columns)
    accident_cols = set(accident.columns)
    
    # Drop overlapping columns from vehicle (keep join keys)
    vehicle_person_overlap = (vehicle_cols & person_cols) - {"CASE_NUM", "VEH_NO"}
    if vehicle_person_overlap:
        vehicle = vehicle.drop(list(vehicle_person_overlap))
    
    # Drop overlapping columns from accident (keep join keys)
    accident_person_overlap = (accident_cols & person_cols) - {"CASE_NUM"}
    if accident_person_overlap:
        accident = accident.drop(list(accident_person_overlap))
    
    # Join person → vehicle
    per_veh = person.join(vehicle, on=["CASE_NUM", "VEH_NO"], how="left")
    
    # Join person-vehicle → accident
    per_full = per_veh.join(accident, on="CASE_NUM", how="left")
    
    return per_full


### Create pipeline of data processing functions

In [178]:
def run_step1_pipeline_CRSS(
    data_dir: Path,
    output_file: Optional[Path] = None,
    numeric_columns: Optional[List[str]] = None
) -> pl.DataFrame:
    """
    CRSS-only pipeline: reads CRSS accident/vehicle/person CSVs,
    keeps CRSS-native column names, and integrates to person level.
    Adds accident-level fatality flag: FATALITY = 1 if MAX_SEV indicates fatal injury, else 0.
    Keys used:
      - Accident: CASENUM
      - Vehicle:  CASENUM, VEH_NO
      - Person:   CASENUM, VEH_NO, PER_NO
    """
    # default numeric columns fallback
    if numeric_columns is None:
        numeric_columns = NUMERIC_COLUMNS

    accident = load_crss_csv("accident.csv", data_dir)
    vehicle  = load_crss_csv("vehicle.csv",  data_dir)
    person   = load_crss_csv("person.csv",   data_dir)


    # Validate keys 
    for name, df, keys in [
        ("accident", accident, ["CASENUM"]),
        ("vehicle",  vehicle,  ["CASENUM", "VEH_NO"]),
        ("person",   person,   ["CASENUM", "VEH_NO", "PER_NO"]),
    ]:
        missing = [k for k in keys if k not in df.columns]
        if missing:
            raise KeyError(f"{name} missing expected key(s): {missing}")

    # Drop duplicate rows by keys
    accident = remove_duplicate_keys(accident, ["CASENUM"])
    vehicle  = remove_duplicate_keys(vehicle,  ["CASENUM", "VEH_NO"])
    person   = remove_duplicate_keys(person,   ["CASENUM", "VEH_NO", "PER_NO"])


    # Numeric conversions
    accident = convert_to_numeric(accident, numeric_columns)
    vehicle  = convert_to_numeric(vehicle,  numeric_columns)
    person   = convert_to_numeric(person,   numeric_columns)

    # Keep only vehicle occupants in person table
    person = person.filter(pl.col("PER_TYP").is_in([1, 2, 3, 4, 9]))
    print(f"  per_typ unique values: {sorted(person['PER_TYP'].unique())}")

    # Missing-code handling
    accident = replace_field_specific_missing_codes(accident)
    vehicle  = replace_field_specific_missing_codes(vehicle)
    person   = replace_field_specific_missing_codes(person)
    
    # Create accident-level fatality flag (cast MAX_SEV first to avoid type errors)
    if "MAX_SEV" in accident.columns:
        accident = accident.with_columns(
            pl.when(pl.col("MAX_SEV").cast(pl.Int32, strict=False) == 4)
              .then(1)
              .otherwise(0)
              .alias("FATALITY")
        )
       
    else:
        accident = accident.with_columns(pl.lit(0).alias("FATALITY"))

    # Integrate to person level with CRSS keys
    df_integrated = (
        person.join(vehicle, on=["CASENUM", "VEH_NO"], how="left", suffix="_veh")
              .join(accident, on=["CASENUM"], how="left", suffix="_acc")
    )

    # 7) Save (optional)
    if output_file is not None:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        df_integrated.write_parquet(output_path)
        print(f"Saved parquet to: {output_path}")

    print(f"Final CRSS dataset: {df_integrated.shape[0]:,} rows × {df_integrated.shape[1]} columns")
    return df_integrated

### Run pipeline

In [None]:
# Configuration
DATA_DIR = Path("data/CRSS2023CSV")
OUTPUT_FILE = Path("Dataset_Regression/person_level_integrated_CRSS.parquet")

# Run complete pipeline
df = run_step1_pipeline_CRSS(
    data_dir=DATA_DIR,
    output_file=OUTPUT_FILE
)

Loaded: accident.csv
Shape: 50,103 rows × 80 columns
Loaded: vehicle.csv
Shape: 87,461 rows × 169 columns
Loaded: person.csv
Shape: 122,388 rows × 112 columns
No duplicates found
No duplicates found
No duplicates found
Converted 22 columns to numeric
Converted 38 columns to numeric
Converted 38 columns to numeric
  per_typ unique values: [1, 2, 3, 4, 9]
 Replaced missing codes in 9 columns
 Total replacements: 3,946
 Replaced missing codes in 21 columns
 Total replacements: 34,636
 Replaced missing codes in 19 columns
 Total replacements: 152,108
Saved parquet to: data/step1_preprocessed_final/person_level_integrated_CRSS.parquet
Final CRSS dataset: 117,033 rows × 359 columns
