In [3]:
# Cell 1: Setup
"""
02_data_cleaning.ipynb - Data Cleaning

Purpose:
    Apply cleaning transformations based on EDA findings:
    - Handle missing values
    - Fix data types
    - Remove duplicates
    - Handle outliers
    - Create clean dataset for feature engineering

Based on insights from 01_eda_platform.ipynb
"""

import pandas as pd
import numpy as np
from pathlib import Path
import mlflow
import yaml

print("Data Cleaning Environment Setup Complete")





Data Cleaning Environment Setup Complete


In [4]:
# Cell 2: Load Config and Data
"""
Load configuration and raw data
"""

# Load config
with open('../configs/config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Define paths
PROJECT_ROOT = Path.cwd().parent
INTERIM_PATH = PROJECT_ROOT / 'data' / 'interim'

# Load data
print("Loading data...")
beneficiaries = pd.read_parquet(INTERIM_PATH / 'beneficiaries.parquet')
inpatient = pd.read_parquet(INTERIM_PATH / 'inpatient_claims.parquet')
outpatient = pd.read_parquet(INTERIM_PATH / 'outpatient_claims.parquet')

print(f"‚úì Beneficiaries: {beneficiaries.shape}")
print(f"‚úì Inpatient: {inpatient.shape}")
print(f"‚úì Outpatient: {outpatient.shape}")

# Store original sizes
original_sizes = {
    'beneficiaries': len(beneficiaries),
    'inpatient': len(inpatient),
    'outpatient': len(outpatient)
}


Loading data...
‚úì Beneficiaries: (116352, 32)
‚úì Inpatient: (66773, 81)
‚úì Outpatient: (790790, 76)


In [8]:
# Cell 4: Handle Missing Values - Beneficiaries
"""
Strategy for missing values based on EDA findings
"""

print("=" * 70)
print("HANDLING MISSING VALUES - BENEFICIARIES")
print("=" * 70)

# Calculate missing before
missing_before = beneficiaries.isnull().sum().sum()
print(f"\nTotal missing values before: {missing_before:,}")

# Strategy 1: Death date missing = member is alive (not missing, it's intentional)
if 'BENE_DEATH_DT' in beneficiaries.columns:
    print("\nBENE_DEATH_DT: Missing means alive (no imputation needed)")
    print(f"  Missing: {beneficiaries['BENE_DEATH_DT'].isnull().sum():,}")

# Strategy 2: For chronic condition indicators, missing = 0 (no condition)
chronic_cols = [col for col in beneficiaries.columns if 'CHRONIC' in col or '_SP_' in col]

if chronic_cols:
    print(f"\nChronic condition columns: {len(chronic_cols)}")
    print("  Strategy: Fill missing with 0 (no condition)")
    
    for col in chronic_cols:
        beneficiaries[col] = beneficiaries[col].fillna(0)
    
    print("  ‚úì Filled chronic condition missing values")

# Strategy 3: Drop rows with missing critical fields (if any)
critical_fields = ['DESYNPUF_ID', 'BENE_BIRTH_DT']
missing_critical = beneficiaries[critical_fields].isnull().any(axis=1).sum()

if missing_critical > 0:
    print(f"\nDropping {missing_critical} rows with missing critical fields")
    beneficiaries = beneficiaries.dropna(subset=critical_fields)

# Calculate missing after
missing_after = beneficiaries.isnull().sum().sum()
print(f"\nTotal missing values after: {missing_after:,}")
print(f"Reduction: {missing_before - missing_after:,}")


HANDLING MISSING VALUES - BENEFICIARIES

Total missing values before: 110,891

BENE_DEATH_DT: Missing means alive (no imputation needed)
  Missing: 110,891

Total missing values after: 110,891
Reduction: 0


In [7]:
# Cell 3: Fix Data Types
"""
Convert columns to appropriate data types
"""

print("=" * 70)
print("FIXING DATA TYPES")
print("=" * 70)

# Date columns - convert to datetime
date_columns = ['BENE_BIRTH_DT', 'BENE_DEATH_DT']

for col in date_columns:
    if col in beneficiaries.columns:
        print(f"\nConverting {col} to datetime...")
        beneficiaries[col] = pd.to_datetime(
            beneficiaries[col], 
            format='%Y%m%d', 
            errors='coerce'
        )
        print(f" Converted. Missing after conversion: {beneficiaries[col].isnull().sum()}")

# Claims date columns
claim_date_cols = ['CLM_FROM_DT', 'CLM_THRU_DT', 'CLM_ADMSN_DT', 'NCH_BENE_DSCHRG_DT']

for df_name, df in [('Inpatient', inpatient), ('Outpatient', outpatient)]:
    print(f"\n{df_name} Claims:")
    for col in claim_date_cols:
        if col in df.columns:
            print(f"  Converting {col}...")
            df[col] = pd.to_datetime(df[col], format='%Y%m%d', errors='coerce')

print("\n Data type conversions complete")


FIXING DATA TYPES

Converting BENE_BIRTH_DT to datetime...
 Converted. Missing after conversion: 0

Converting BENE_DEATH_DT to datetime...
 Converted. Missing after conversion: 110891

Inpatient Claims:
  Converting CLM_FROM_DT...
  Converting CLM_THRU_DT...
  Converting CLM_ADMSN_DT...
  Converting NCH_BENE_DSCHRG_DT...

Outpatient Claims:
  Converting CLM_FROM_DT...
  Converting CLM_THRU_DT...

 Data type conversions complete


In [9]:
# Cell 4: Handle Missing Values - Beneficiaries
"""
Strategy for missing values based on EDA findings
"""

print("=" * 70)
print("HANDLING MISSING VALUES - BENEFICIARIES")
print("=" * 70)

# Calculate missing before
missing_before = beneficiaries.isnull().sum().sum()
print(f"\nTotal missing values before: {missing_before:,}")

# Strategy 1: Death date missing = member is alive (not missing, it's intentional)
if 'BENE_DEATH_DT' in beneficiaries.columns:
    print("\nBENE_DEATH_DT: Missing means alive (no imputation needed)")
    print(f"  Missing: {beneficiaries['BENE_DEATH_DT'].isnull().sum():,}")

# Strategy 2: For chronic condition indicators, missing = 0 (no condition)
chronic_cols = [col for col in beneficiaries.columns if 'CHRONIC' in col or '_SP_' in col]

if chronic_cols:
    print(f"\nChronic condition columns: {len(chronic_cols)}")
    print("  Strategy: Fill missing with 0 (no condition)")
    
    for col in chronic_cols:
        beneficiaries[col] = beneficiaries[col].fillna(0)
    
    print("  ‚úì Filled chronic condition missing values")

# Strategy 3: Drop rows with missing critical fields (if any)
critical_fields = ['DESYNPUF_ID', 'BENE_BIRTH_DT']
missing_critical = beneficiaries[critical_fields].isnull().any(axis=1).sum()

if missing_critical > 0:
    print(f"\nDropping {missing_critical} rows with missing critical fields")
    beneficiaries = beneficiaries.dropna(subset=critical_fields)

# Calculate missing after
missing_after = beneficiaries.isnull().sum().sum()
print(f"\nTotal missing values after: {missing_after:,}")
print(f"Reduction: {missing_before - missing_after:,}")


HANDLING MISSING VALUES - BENEFICIARIES

Total missing values before: 110,891

BENE_DEATH_DT: Missing means alive (no imputation needed)
  Missing: 110,891

Total missing values after: 110,891
Reduction: 0


In [10]:
# Cell 5: Remove Duplicates
"""
Check for and remove duplicate records
"""

print("=" * 70)
print("REMOVING DUPLICATES")
print("=" * 70)

# Beneficiaries - check for duplicate IDs
print("\nBeneficiaries:")
duplicates = beneficiaries['DESYNPUF_ID'].duplicated().sum()
print(f"  Duplicate member IDs: {duplicates}")

if duplicates > 0:
    print("  Removing duplicates (keeping last)...")
    beneficiaries = beneficiaries.drop_duplicates(subset='DESYNPUF_ID', keep='last')
    print("  ‚úì Duplicates removed")

# Claims - check for duplicate claim IDs
print("\nInpatient Claims:")
if 'CLM_ID' in inpatient.columns:
    inp_dups = inpatient['CLM_ID'].duplicated().sum()
    print(f"  Duplicate claim IDs: {inp_dups}")
    
    if inp_dups > 0:
        inpatient = inpatient.drop_duplicates(subset='CLM_ID', keep='first')
        print("  ‚úì Duplicates removed")

print("\nOutpatient Claims:")
if 'CLM_ID' in outpatient.columns:
    out_dups = outpatient['CLM_ID'].duplicated().sum()
    print(f"  Duplicate claim IDs: {out_dups}")
    
    if out_dups > 0:
        outpatient = outpatient.drop_duplicates(subset='CLM_ID', keep='first')
        print("  ‚úì Duplicates removed")


REMOVING DUPLICATES

Beneficiaries:
  Duplicate member IDs: 0

Inpatient Claims:
  Duplicate claim IDs: 68
  ‚úì Duplicates removed

Outpatient Claims:
  Duplicate claim IDs: 10975
  ‚úì Duplicates removed


In [14]:
# Cell 6: Handle Outliers
"""
Identify and handle extreme values in costs
"""

print("=" * 70)
print("HANDLING OUTLIERS")
print("=" * 70)

# Find payment columns
payment_cols_inp = [col for col in inpatient.columns if 'PMT' in col or 'PAYMENT' in col]
payment_cols_out = [col for col in outpatient.columns if 'PMT' in col or 'PAYMENT' in col]

def cap_outliers(df, columns, percentile=99):
    """
    Cap extreme values at specified percentile
    Strategy: Replace values above 99th percentile with 99th percentile value
    """
    for col in columns:
        if col in df.columns:
            threshold = df[col].quantile(percentile / 100)
            outliers = (df[col] > threshold).sum()
            
            if outliers > 0:
                print(f"  {col}: Capping {outliers} values above ${threshold:.2f}")
                df[col] = df[col].clip(upper=threshold)
    
    return df

if payment_cols_inp:
    print("\nInpatient Claims - Capping extreme costs at 99th percentile:")
    inpatient = cap_outliers(inpatient, payment_cols_inp, percentile=99)

if payment_cols_out:
    print("\nOutpatient Claims - Capping extreme costs at 99th percentile:")
    outpatient = cap_outliers(outpatient, payment_cols_out, percentile=99)

print("\n‚úì Outlier handling complete")


HANDLING OUTLIERS

Inpatient Claims - Capping extreme costs at 99th percentile:

Outpatient Claims - Capping extreme costs at 99th percentile:

‚úì Outlier handling complete


In [12]:
# Cell 7: Data Validation
"""
Validate cleaned data meets quality standards
"""

print("=" * 70)
print("DATA VALIDATION")
print("=" * 70)

# Check 1: No null values in critical fields
print("\n1. Critical Field Validation:")
critical_fields = ['DESYNPUF_ID']

for field in critical_fields:
    null_count = beneficiaries[field].isnull().sum()
    status = "‚úì PASS" if null_count == 0 else "‚úó FAIL"
    print(f"   {field}: {status} ({null_count} nulls)")

# Check 2: No duplicate IDs
print("\n2. Duplicate ID Check:")
dup_check = beneficiaries['DESYNPUF_ID'].duplicated().sum()
status = "‚úì PASS" if dup_check == 0 else "‚úó FAIL"
print(f"   Beneficiaries: {status} ({dup_check} duplicates)")

# Check 3: Data volume check
print("\n3. Data Volume Check:")
min_rows = config['data_loading']['min_rows']

for name, df in [('Beneficiaries', beneficiaries), ('Inpatient', inpatient), ('Outpatient', outpatient)]:
    status = "‚úì PASS" if len(df) >= min_rows else "‚úó FAIL"
    print(f"   {name}: {status} ({len(df):,} rows)")

# Check 4: Date range validation
print("\n4. Date Range Validation:")
if 'BENE_BIRTH_DT' in beneficiaries.columns:
    min_date = beneficiaries['BENE_BIRTH_DT'].min()
    max_date = beneficiaries['BENE_BIRTH_DT'].max()
    print(f"   Birth dates: {min_date.date()} to {max_date.date()}")

print("\n‚úì Validation complete")


DATA VALIDATION

1. Critical Field Validation:
   DESYNPUF_ID: ‚úì PASS (0 nulls)

2. Duplicate ID Check:
   Beneficiaries: ‚úì PASS (0 duplicates)

3. Data Volume Check:
   Beneficiaries: ‚úì PASS (116,352 rows)
   Inpatient: ‚úì PASS (66,705 rows)
   Outpatient: ‚úì PASS (779,815 rows)

4. Date Range Validation:
   Birth dates: 1909-01-01 to 1983-12-01

‚úì Validation complete


In [16]:
# Cell 8: Save Cleaned Data
"""
Save cleaned data back to interim/ folder (overwrite raw interim data)
"""

print("=" * 70)
print("SAVING CLEANED DATA")
print("=" * 70)

# Save with MLflow tracking
mlflow.set_tracking_uri(config['mlflow']['tracking_uri'])
mlflow.set_experiment("data_cleaning")

with mlflow.start_run(run_name="data_cleaning_pipeline"):
    
    # Log original sizes
    for key, value in original_sizes.items():
        mlflow.log_param(f"{key}_original", value)
    
    # Log cleaned sizes
    mlflow.log_param("beneficiaries_cleaned", len(beneficiaries))
    mlflow.log_param("inpatient_cleaned", len(inpatient))
    mlflow.log_param("outpatient_cleaned", len(outpatient))
    
    # Calculate retention rates
    ben_retention = len(beneficiaries) / original_sizes['beneficiaries'] * 100
    inp_retention = len(inpatient) / original_sizes['inpatient'] * 100
    out_retention = len(outpatient) / original_sizes['outpatient'] * 100
    
    mlflow.log_metric("beneficiaries_retention_pct", ben_retention)
    mlflow.log_metric("inpatient_retention_pct", inp_retention)
    mlflow.log_metric("outpatient_retention_pct", out_retention)
    
    # Save cleaned data
    print("\nSaving cleaned data to interim/...")
    beneficiaries.to_parquet(INTERIM_PATH / 'beneficiaries.parquet', index=False)
    inpatient.to_parquet(INTERIM_PATH / 'inpatient_claims.parquet', index=False)
    outpatient.to_parquet(INTERIM_PATH / 'outpatient_claims.parquet', index=False)
    
    print("‚úì Beneficiaries saved")
    print("‚úì Inpatient claims saved")
    print("‚úì Outpatient claims saved")

print("\n" + "=" * 70)
print("DATA CLEANING COMPLETE")
print("=" * 70)
print(f"Beneficiaries: {len(beneficiaries):,} ({ben_retention:.2f}% retained)")
print(f"Inpatient: {len(inpatient):,} ({inp_retention:.2f}% retained)")
print(f"Outpatient: {len(outpatient):,} ({out_retention:.2f}% retained)")
print("=" * 70)


SAVING CLEANED DATA


2026/01/08 08:57:19 INFO mlflow.tracking.fluent: Experiment with name 'data_cleaning' does not exist. Creating a new experiment.



Saving cleaned data to interim/...
‚úì Beneficiaries saved
‚úì Inpatient claims saved
‚úì Outpatient claims saved
üèÉ View run data_cleaning_pipeline at: http://localhost:5000/#/experiments/242354557831061142/runs/d34991eda6ef48f785f1c31490a85c89
üß™ View experiment at: http://localhost:5000/#/experiments/242354557831061142

DATA CLEANING COMPLETE
Beneficiaries: 116,352 (100.00% retained)
Inpatient: 66,705 (99.90% retained)
Outpatient: 779,815 (98.61% retained)
