# Production ETL Pipeline - US Multiple Myeloma Model

This notebook implements the ETL pipeline to ingest, clean, validate, and standardize CDC WONDER Multiple Myeloma incidence data (1999-2022).

**Objectives:**
1. **Ingest**: Load raw CDC CSV export.
2. **Clean**: Handle footer metadata, standard formatting, and numeric conversion.
3. **Validate**: Ensure integrity of years (1999-2022), sex, and counts.
4. **Transform**: Generate monthly incidence records (Year-Month-Age-Sex).
5. **Output**: Save clean annual and monthly datasets.

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

## 1. Configuration & Helper Functions

In [None]:
def load_and_clean_data(file_path):
    """
    Loads the CDC WONDER dataset and performs initial cleaning.
    """
    if not file_path.exists():
        # Try looking one directory up if running from a subdir or check alternate name
        raise FileNotFoundError(f"Input file not found: {file_path}")

    logger.info(f"Loading data from {file_path}...")
    
    # Read CSV
    df = pd.read_csv(file_path)
    
    # Strip whitespace from column names
    df.columns = df.columns.str.strip()
    
    # Clean 'Year' column and remove footer rows
    # Convert Year to numeric, coerce errors (footer text) to NaN
    df['Year'] = pd.to_numeric(df['Year'], errors='coerce')
    # Drop rows where Year is NaN
    df = df.dropna(subset=['Year'])
    # Convert Year to integer
    df['Year'] = df['Year'].astype(int)
    
    # Clean numeric columns (Count, Population)
    # Remove commas and convert to numeric
    for col in ['Count', 'Population']:
        if df[col].dtype == object:
            df[col] = df[col].str.replace(',', '', regex=False)
        df[col] = pd.to_numeric(df[col], errors='coerce')
        
    # Drop rows with missing Count or Population
    df = df.dropna(subset=['Count', 'Population'])
    
    # Ensure Count and Population are integers
    df['Count'] = df['Count'].astype(int)
    df['Population'] = df['Population'].astype(int)

    logger.info(f"Data loaded and cleaned. {len(df)} rows remaining.")
    return df

In [None]:
def validate_data(df):
    """
    Validates data integrity rules.
    """
    logger.info("Validating data integrity...")
    
    # 1. Confirm Year range 1999â€“2022
    min_year, max_year = df['Year'].min(), df['Year'].max()
    if min_year != 1999 or max_year != 2022:
        logger.warning(f"Year range mismatch! Expected 1999-2022, got {min_year}-{max_year}")
    else:
        logger.info(f"Year range verified: {min_year}-{max_year}")
        
    # 2. Check that both Male and Female exist
    sexes = df['Sex'].unique()
    if 'Male' not in sexes or 'Female' not in sexes:
        logger.error(f"Missing sex categories! Found: {sexes}")
        raise ValueError("Dataset must contain both Male and Female entries.")
    
    # 3. Ensure no negative counts
    if (df['Count'] < 0).any():
        logger.error("Negative counts found!")
        raise ValueError("Dataset contains negative counts.")
        
    # Print annual totals
    annual_counts = df.groupby('Year')['Count'].sum()
    print("\n--- Annual Totals ---")
    print(annual_counts)
    print("---------------------\n")
    
    return annual_counts

In [None]:
def generate_monthly_data(df):
    """
    Expands annual data into monthly data.
    """
    logger.info("Generating monthly incidence data...")
    
    # Vectorized expansion approach
    
    # Repeat the dataframe 12 times (one for each month)
    df_monthly = df.loc[df.index.repeat(12)].copy()
    
    # Assign Month numbers 1-12 tiling
    df_monthly['Month'] = np.tile(np.arange(1, 13), len(df))
    
    # Divide Annual Count by 12 to get Monthly Count
    df_monthly['Annual_Count'] = df_monthly['Count'] # Keep original annual reference
    df_monthly['Count'] = df_monthly['Count'] / 12.0
    
    # Create Date column formatted as YYYY-MM-01
    df_monthly['Date'] = pd.to_datetime(
        df_monthly['Year'].astype(str) + '-' + df_monthly['Month'].astype(str) + '-01'
    )
    
    # Sort for cleanliness
    df_monthly = df_monthly.sort_values(by=['Year', 'Month', 'Age Groups', 'Sex'])
    
    logger.info(f"Monthly data generated. {len(df_monthly)} rows.")
    return df_monthly

## 2. Execution
Define paths and run the pipeline.

In [None]:
# Setup Paths
# Note: When running in a notebook, '.' is usually the directory of the notebook.
base_dir = Path('.')
input_file_name = "United States and Puerto Rico Cancer Statistics, 1999-2022 Incidence.csv"
input_path = base_dir / input_file_name

# Fallback check for the filename mentioned in prompt if standard one is missing
if not input_path.exists():
    fallback_path = base_dir / "United States and Puerto Rico Cancer Statistics, 1999-2022 Incidence (1).csv"
    if fallback_path.exists():
        input_path = fallback_path

output_dir = base_dir / "epi outputs"
output_dir.mkdir(exist_ok=True)

print(f"Input Path: {input_path.resolve()}")
print(f"Output Dir: {output_dir.resolve()}")

In [None]:
try:
    # Step 1: Clean
    df_clean = load_and_clean_data(input_path)
    
    # Display first few rows of clean data
    print("Head of cleaned data:")
    display(df_clean.head())
    
    # Step 2: Validate
    annual_totals = validate_data(df_clean)
    
    # Step 3: Monthly Generation
    df_monthly = generate_monthly_data(df_clean)
    print("Head of monthly data:")
    display(df_monthly.head())
    
    # Step 4: Save Outputs
    logger.info("Saving outputs...")
    
    # A) Clean Data
    clean_path = output_dir / "uscs_myeloma_incidence_clean.csv"
    df_clean.to_csv(clean_path, index=False)
    logger.info(f"Saved clean data to {clean_path}")
    
    # B) Annual Totals
    annual_path = output_dir / "uscs_myeloma_incidence_annual_totals.csv"
    annual_totals.to_csv(annual_path)
    logger.info(f"Saved annual totals to {annual_path}")
    
    # C) Monthly Data
    monthly_path = output_dir / "uscs_myeloma_incidence_monthly.csv"
    df_monthly.to_csv(monthly_path, index=False)
    logger.info(f"Saved monthly data to {monthly_path}")
    
    logger.info("ETL Pipeline completed successfully.")
    
except Exception as e:
    logger.error(f"ETL Pipeline Failed: {e}")
    # Re-raise to show traceback in notebook
    raise