In [5]:
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, List, Tuple

# Configuration for HCAHPS data processing
HCAHPS_CONFIG = {
    "release_period": "07_2024",
    "columns_to_drop": [
        'HCAHPS Question', 'HCAHPS Answer Description', 
        'Footnote', 'Start Date', 'End Date'
    ],
    "measure_prefixes": [
        'H_COMP_1', 'H_COMP_3', 'H_COMP_2', 'H_CLEAN_HSP',
        'H_QUIET_HSP', 'H_COMP_5', 'H_COMP_6', 'H_HSP_RATING',
        'H_RECMND', 'H_COMP_7'
    ],
    "suffix_categories": {
        'Top-box Percentage': ['_A_P', '_Y_P', '_SA', '_DY', '_9_10'],
        'Bottom-box Percentage': ['_SN_P', '_N_P', '_D_SD', '_DN', '_0_6'],
        'Middle-box Percentage': ['_U_P', '_A', '_PY', '_7_8']
    },
    "output_dir": "processed_data"
}

def process_hcahps_data(input_path: Path, output_path: Path) -> pd.DataFrame:
    """Process HCAHPS survey data from raw CSV to formatted analysis-ready format."""
    df = pd.read_csv(input_path)
    
    required_columns = {'HCAHPS Measure ID', 'HCAHPS Answer Percent'}
    missing = required_columns - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    df.insert(0, 'Release Period', HCAHPS_CONFIG["release_period"])
    filtered = df.drop(columns=HCAHPS_CONFIG["columns_to_drop"], errors="ignore")

    measure_pattern = f"^({'|'.join(HCAHPS_CONFIG['measure_prefixes'])})_"
    mask = filtered['HCAHPS Measure ID'].str.match(measure_pattern, na=False)
    filtered = filtered[mask].copy()

    filtered['Measure ID'] = filtered['HCAHPS Measure ID'].str.extract(measure_pattern)[0]
    
    category_conditions = []
    for category, suffixes in HCAHPS_CONFIG["suffix_categories"].items():
        pattern = f"({'|'.join(suffixes)})$"
        category_conditions.append(filtered['HCAHPS Measure ID'].str.contains(pattern, na=False))
    
    filtered['Response Category'] = np.select(
        category_conditions,
        list(HCAHPS_CONFIG["suffix_categories"].keys()),
        default=pd.NA
    )

    filtered['HCAHPS Answer Percent'] = pd.to_numeric(
        filtered['HCAHPS Answer Percent'], 
        errors='coerce'
    )

    pivot_index = ['Release Period', 'Measure ID']
    if 'State' in filtered.columns:
        pivot_index.insert(1, 'State')

    pivot_df = filtered.pivot_table(
        index=pivot_index,
        columns='Response Category',
        values='HCAHPS Answer Percent',
        aggfunc='mean',
        fill_value=0
    ).reset_index().rename_axis(columns=None)

    percent_columns = list(HCAHPS_CONFIG["suffix_categories"].keys())
    for col in percent_columns:
        if col in pivot_df.columns:
            pivot_df[col] = pivot_df[col].clip(0, 100).astype(int)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    pivot_df.to_csv(output_path, index=False)
    
    return pivot_df

# Configuration for data merging pipeline
PIPELINE_CONFIG = {
    "base_dir": None,  # To be set in main
    "data_paths": {
        "state_results": "maven_data/state_results.csv",
        "national_results": "maven_data/national_results.csv",
        "state_2024": "healps_data/processed_data/state_2024.csv",
        "national_2024": "healps_data/processed_data/national_2024.csv",
        "states": "maven_data/states.csv",
        "measures": "maven_data/measures.csv",
        "reports": "maven_data/reports.csv",
    },
    "merge_keys": {
        "state_merge": ["State"],
        "measure_merge": ["Measure ID"],
        "report_merge": ["Release Period"],
        "covid_merge": ["Year", "State"]
    },
    "output_file": "final_merged_dataset.csv"
}

class DataPipeline:
    """Data processing pipeline for merging multiple healthcare datasets"""
    
    def __init__(self):
        self.data = None
        self._validate_paths()
        
    def _validate_paths(self) -> None:
        """Validate existence of all required data files"""
        for key, rel_path in PIPELINE_CONFIG["data_paths"].items():
            path = PIPELINE_CONFIG["base_dir"] / rel_path
            if not path.exists():
                raise FileNotFoundError(f"Missing data file: {path}")
            PIPELINE_CONFIG["data_paths"][key] = path

    def load_data(self) -> Dict[str, pd.DataFrame]:
        """Load all required datasets with validation"""
        datasets = {}
        required_columns = {
            "state_results": ["State", "Measure ID", "Release Period"],
            "national_results": ["Measure ID", "Release Period"],
            "states": ["State", "State Name"],
            "measures": ["Measure ID", "Measure"]
        }

        for key, path in PIPELINE_CONFIG["data_paths"].items():
            datasets[key] = pd.read_csv(path)
            if key in required_columns:
                missing = set(required_columns[key]) - set(datasets[key].columns)
                if missing:
                    raise ValueError(f"Missing columns in {key}: {missing}")

        return datasets

    def _safe_concat(self, df1: pd.DataFrame, df2: pd.DataFrame, dataset: str) -> pd.DataFrame:
        """Helper function for safe dataframe concatenation"""
        if df1.empty:
            return df2.copy()
        if df2.empty:
            return df1.copy()
        
        if set(df1.columns) != set(df2.columns):
            raise ValueError(f"Column mismatch in {dataset} concatenation")
            
        return pd.concat([df1, df2], ignore_index=True)

    def merge_datasets(self, datasets: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Main data merging pipeline"""
        state_results = self._safe_concat(
            datasets["state_results"], 
            datasets["state_2024"],
            "state results"
        )
        
        national_results = self._safe_concat(
            datasets["national_results"], 
            datasets["national_2024"],
            "national results"
        )

        merged = state_results.merge(
            datasets["states"],
            on=PIPELINE_CONFIG["merge_keys"]["state_merge"],
            how="left",
            validate="m:1"
        ).merge(
            datasets["measures"],
            on=PIPELINE_CONFIG["merge_keys"]["measure_merge"],
            how="left",
            validate="m:1"
        ).merge(
            datasets["reports"],
            on=PIPELINE_CONFIG["merge_keys"]["report_merge"],
            how="left",
            validate="m:1"
        )

        merged = merged.merge(
            national_results,
            on=PIPELINE_CONFIG["merge_keys"]["report_merge"] + PIPELINE_CONFIG["merge_keys"]["measure_merge"],
            how="left",
            suffixes=('', '_national'),
            validate="m:1"
        )

        return merged

    def save_results(self, df: pd.DataFrame) -> None:
        """Save final merged dataset"""
        output_path = PIPELINE_CONFIG["base_dir"] / PIPELINE_CONFIG["output_file"]
        output_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(output_path, index=False)

    def run_pipeline(self) -> None:
        """Execute full data processing pipeline"""
        datasets = self.load_data()
        merged_data = self.merge_datasets(datasets)
        self.save_results(merged_data)

if __name__ == "__main__":
    try:
        base_dir = Path(__file__).resolve().parent.parent
    except NameError:
        base_dir = Path.cwd().parent

    PIPELINE_CONFIG["base_dir"] = base_dir

    # Process HCAHPS data
    input_files = [
        base_dir / "healps_data" / "HCAHPS-National.csv",
        base_dir / "healps_data" / "HCAHPS-State.csv"
    ]
    output_dir = base_dir / "healps_data" / HCAHPS_CONFIG["output_dir"]
    output_files = [
        output_dir / "national_2024.csv",
        output_dir / "state_2024.csv"
    ]

    for path in input_files:
        if not path.exists():
            raise FileNotFoundError(f"Missing input file: {path}")

    for input_path, output_path in zip(input_files, output_files):
        process_hcahps_data(input_path, output_path)

    # Run merging pipeline
    pipeline = DataPipeline()
    pipeline.run_pipeline()

FileNotFoundError: Missing input file: c:\Users\alnem\OneDrive - Institute for Community Alliances\Desktop\Al Nemrawi, Hassan-2-20240821T140221Z-001\portfolio\HCAHPS_Patient_Survey_Analysis\healps_data\HCAHPS-National.csv

In [20]:
import pandas as pd
import numpy as np
from pathlib import Path

CONFIG = {
    "release_period": "07_2024",
    "columns_to_drop": [
        'HCAHPS Question', 'HCAHPS Answer Description',
        'Footnote', 'Start Date', 'End Date'
    ],
    "measure_prefixes": [
        'H_COMP_1','H_COMP_2','H_COMP_3','H_CLEAN_HSP',
        'H_QUIET_HSP','H_COMP_5','H_COMP_6',
        'H_HSP_RATING','H_RECMND','H_COMP_7'
    ],
    "suffix_categories": {
        'Top-box Percentage':    ['_A_P','_Y_P','_SA','_DY','_9_10'],
        'Bottom-box Percentage': ['_SN_P','_N_P','_D_SD','_DN','_0_6'],
        'Middle-box Percentage': ['_U_P','_A','_PY','_7_8']
    },
    "raw_data_dir": Path("../data/hcahps_data"),
    "processed_data_dir": Path("../data/hcahps_data/processed_data"),
    "processed_national": "national_2024.csv",
    "processed_state": "state_2024.csv",
    "maven_data_dir": Path("../data/maven_data"),
    "final_merged_file": Path("../data/final_merged_dataset.csv"),
    "merge_keys": {
        "state_merge": ["State"],
        "measure_merge": ["Measure ID"],
        "report_merge": ["Release Period"]
    }
}

def process_hcahps_data(input_path: Path, output_path: Path) -> pd.DataFrame:
    df = pd.read_csv(input_path)
    df.insert(0, 'Release Period', CONFIG["release_period"])
    df.drop(columns=CONFIG["columns_to_drop"], errors='ignore', inplace=True)
    measure_regex = f"^({'|'.join(CONFIG['measure_prefixes'])})_"
    df = df[df['HCAHPS Measure ID'].str.match(measure_regex, na=False)].copy()
    df['Measure ID'] = df['HCAHPS Measure ID'].str.extract(measure_regex)[0]

    cat_conditions = []
    for cat, suffixes in CONFIG["suffix_categories"].items():
        cat_conditions.append(df['HCAHPS Measure ID'].str.contains(f"(?:{'|'.join(suffixes)})$", na=False))
        
    df['Response Category'] = np.select(cat_conditions, CONFIG["suffix_categories"].keys(), default=pd.NA)
    df['HCAHPS Answer Percent'] = pd.to_numeric(df['HCAHPS Answer Percent'], errors='coerce')

    pivot_index = ['Release Period','Measure ID']
    if 'State' in df.columns:
        pivot_index.insert(1, 'State')

    pivot_df = df.pivot_table(
        index=pivot_index,
        columns='Response Category',
        values='HCAHPS Answer Percent',
        aggfunc='mean',
        fill_value=0
    ).reset_index()
    pivot_df.columns.name = None

    for cat in CONFIG["suffix_categories"].keys():
        if cat in pivot_df.columns:
            pivot_df[cat] = pivot_df[cat].clip(0, 100).astype(int)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    pivot_df.to_csv(output_path, index=False)
    return pivot_df

def safe_concat(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    if df1.empty:
        return df2.copy()
    if df2.empty:
        return df1.copy()
    if set(df1.columns) != set(df2.columns):
        raise ValueError("Cannot concatenate; columns differ.")
    return pd.concat([df1, df2], ignore_index=True)

def merge_datasets(maven_data: dict, new_state: pd.DataFrame, new_national: pd.DataFrame) -> pd.DataFrame:
    state_results = safe_concat(maven_data["state_results"], new_state)
    national_results = safe_concat(maven_data["national_results"], new_national)
    merged = (
        state_results
        .merge(maven_data["states"],   on=CONFIG["merge_keys"]["state_merge"],   how="left", validate="m:1")
        .merge(maven_data["measures"], on=CONFIG["merge_keys"]["measure_merge"], how="left", validate="m:1")
        .merge(maven_data["reports"],  on=CONFIG["merge_keys"]["report_merge"],  how="left", validate="m:1")
    )
    final = merged.merge(
        national_results,
        on=CONFIG["merge_keys"]["report_merge"] + CONFIG["merge_keys"]["measure_merge"],
        how="left",
        suffixes=('', '_national'),
        validate="m:1"
    )
    return final

def run_pipeline():
    national_input = CONFIG["raw_data_dir"] / "HCAHPS-National.csv"
    state_input    = CONFIG["raw_data_dir"] / "HCAHPS-State.csv"
    national_out   = CONFIG["processed_data_dir"] / CONFIG["processed_national"]
    state_out      = CONFIG["processed_data_dir"] / CONFIG["processed_state"]

    national_df = process_hcahps_data(national_input, national_out)
    state_df    = process_hcahps_data(state_input,    state_out)

    maven_dir = CONFIG["maven_data_dir"]
    maven_data = {
        "state_results":    pd.read_csv(maven_dir / "state_results.csv"),
        "national_results": pd.read_csv(maven_dir / "national_results.csv"),
        "states":           pd.read_csv(maven_dir / "states.csv"),
        "measures":         pd.read_csv(maven_dir / "measures.csv"),
        "reports":          pd.read_csv(maven_dir / "reports.csv")
    }

    final_merged = merge_datasets(maven_data, state_df, national_df)
    final_out = CONFIG["final_merged_file"]
    final_out.parent.mkdir(parents=True, exist_ok=True)
    final_merged.to_csv(final_out, index=False)

if __name__ == "__main__":
    run_pipeline()
