# In [1]:
############################
### DATA CLEANING SCRIPT ###
############################

import pandas as pd
import numpy as np
import os
from scipy.stats.mstats import winsorize
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer
import json

# ======================
# CONFIGURATION
# ======================
class Config:
    """Static settings shared by every step of the cleaning script."""

    # --- File locations -------------------------------------------------
    INPUT_PATH = r"C:\Users\soulf\Downloads\archive\HR_Employee_Attrition.csv"
    OUTPUT_PATH = r"C:\Users\soulf\Downloads\archive\HR_Employee_Cleaned.pkl"
    REPORT_PATH = r"C:\Users\soulf\Downloads\archive\cleaning_report.json"

    # --- Columns with a special role ------------------------------------
    TARGET = "Attrition"  # label column
    ID_COL = "EmployeeNumber"  # row identifier, dropped before cleaning
    CONSTANT_COLS = [
        "EmployeeCount",
        "StandardHours",
        "Over18",
    ]  # dropped together with the identifier
    DATE_COLS = []  # extend if temporal features exist

    # --- Ordinal features: labels listed from lowest to highest rank -----
    ORDINAL_FEATURES = {
        "Education": [
            "Below College",
            "College",
            "Bachelor",
            "Master",
            "Doctor",
        ],
        "JobSatisfaction": ["Low", "Medium", "High", "Very High"],
        "WorkLifeBalance": ["Bad", "Good", "Better", "Best"],
    }

    # (lower, upper) fraction of values capped at each tail; winsorizing
    # replaces extreme values with the nearest kept value, no rows are removed.
    WINSORIZE_LIMITS = (0.01, 0.01)

# ======================
# HELPER FUNCTIONS
# ======================
def load_data(path=None):
    """Load the raw CSV into a DataFrame.

    Args:
        path: Location of the CSV file. When omitted, ``Config.INPUT_PATH``
            is used, which keeps existing ``load_data()`` calls working.

    Returns:
        The DataFrame produced by ``pandas.read_csv``.

    Raises:
        FileNotFoundError: If no regular file exists at the resolved path
            (a directory at that path is rejected as well).
    """
    csv_path = Config.INPUT_PATH if path is None else path

    # isfile() rather than exists(): a directory at this location would
    # otherwise slip through and fail later inside read_csv.
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"Input file not found: {csv_path}")

    df = pd.read_csv(csv_path)
    print(f"✅ Raw data loaded. Shape: {df.shape}")
    return df

def _to_jsonable(value):
    """Convert NumPy scalars and arrays to plain Python objects for ``json``."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def generate_cleaning_report(original_shape, new_shape, missing_info, dupe_info,
                             outlier_info, constant_cols, report_path=None):
    """Write a JSON summary of the cleaning run.

    Args:
        original_shape: ``(rows, columns)`` of the data before cleaning.
        new_shape: ``(rows, columns)`` of the data after cleaning.
        missing_info: Missing-value details, stored under
            ``"missing_values_handled"``.
        dupe_info: Duplicate details, stored under ``"duplicates_removed"``.
        outlier_info: Outlier details, stored under ``"outliers_handled"``.
        constant_cols: Column names stored under
            ``"constant_columns_removed"``.
        report_path: Destination file. Defaults to ``Config.REPORT_PATH``.

    Raises:
        TypeError: If the report contains a value that is neither natively
            JSON serializable nor a NumPy scalar/array.
    """
    target_path = Config.REPORT_PATH if report_path is None else report_path

    # An empty input has lost nothing; avoid dividing by zero rows.
    original_rows = original_shape[0]
    kept_fraction = new_shape[0] / original_rows if original_rows else 1.0

    report = {
        "original_samples": original_shape[0],
        "original_features": original_shape[1],
        "final_samples": new_shape[0],
        "final_features": new_shape[1],
        "missing_values_handled": missing_info,
        "duplicates_removed": dupe_info,
        "outliers_handled": outlier_info,
        "constant_columns_removed": constant_cols,
        "data_loss_percentage": round((1 - kept_fraction) * 100, 2),
    }

    # Serialize before opening the file so a serialization error cannot
    # leave a truncated report on disk. NumPy integers (e.g. from
    # ``Series.sum()``) are not accepted by ``json`` without the converter.
    payload = json.dumps(report, indent=2, default=_to_jsonable)
    with open(target_path, "w", encoding="utf-8") as f:
        f.write(payload)
    print(f"⏩ Cleaning report saved to {target_path}")

# ======================
# MAIN CLEANING PIPELINE
# ======================
def _impute_missing(df, threshold):
    """Drop columns missing in more than *threshold* of rows, fill the rest.

    Returns a new frame and the list of dropped column names.
    """
    missing_share = df.isna().mean()
    dropped = missing_share[missing_share > threshold].index.tolist()
    df = df.drop(columns=dropped)  # new object; the caller's frame is untouched

    for col in df.columns:
        column = df[col]
        if not column.isna().any():
            continue  # nothing to fill, so no statistic is computed at all

        is_number = pd.api.types.is_numeric_dtype(column)
        if is_number and not pd.api.types.is_bool_dtype(column):
            fill_value = column.median()
        else:
            # Text / categorical columns have no median; use the most
            # frequent value instead.
            modes = column.mode(dropna=True)
            if modes.empty:
                continue
            fill_value = modes.iloc[0]
        df[col] = column.fillna(fill_value)

    return df, dropped


def _winsorize_numeric(df, limits):
    """Winsorize every numeric column of a copy of *df*.

    Returns the copy and a per-column dict of before/after standard deviation.
    """
    df = df.copy()
    stats = {}
    if df.empty:
        # winsorize() indexes into the sorted values and fails on no rows.
        return df, stats

    lower, upper = limits
    for col in df.select_dtypes(include=np.number).columns:
        original_std = df[col].std()
        capped = winsorize(df[col].to_numpy(), limits=limits)
        # winsorize returns a masked array; store its plain data so the
        # column keeps an ordinary NumPy dtype.
        df[col] = np.asarray(capped)
        stats[col] = {
            "original_std": float(original_std),
            "new_std": float(df[col].std()),
            "winsorized_percentage": (lower + upper) * 100,
        }
    return df, stats


def _encode_categoricals(df, ordinal_features, target):
    """Ordinal-encode the configured columns and one-hot encode the others.

    The *target* column is never encoded. Returns a new frame.
    """
    categorical_cols = df.select_dtypes(
        include=["category", "object"]
    ).columns.tolist()
    feature_cols = [col for col in categorical_cols if col != target]

    # List comprehensions (not set arithmetic) keep the original column
    # order, so the resulting dummy columns are ordered identically each run.
    ordinal_cols = [col for col in feature_cols if col in ordinal_features]
    nominal_cols = [col for col in feature_cols if col not in ordinal_features]

    df = df.copy()
    # NOTE(review): only columns stored as text/category labels reach this
    # loop; a configured ordinal column that is already numeric is left
    # as-is -- confirm that matches the input file.
    for col in ordinal_cols:
        encoder = OrdinalEncoder(categories=[ordinal_features[col]])
        df[col] = np.asarray(encoder.fit_transform(df[[col]])).ravel()

    if nominal_cols:
        df = pd.get_dummies(
            df, columns=nominal_cols, drop_first=False, dummy_na=False
        )
    return df


def clean_data(df):
    """Run the full cleaning pipeline and persist its results.

    Steps: drop the identifier/constant/date columns, drop or impute missing
    values, remove duplicate rows, winsorize numeric columns, then encode
    categorical feature columns. The target column is kept as a single
    categorical column and is not one-hot encoded.

    Side effects: pickles the cleaned frame to ``Config.OUTPUT_PATH`` and
    writes the JSON report through ``generate_cleaning_report``.

    Args:
        df: Raw DataFrame. It is not modified.

    Returns:
        The cleaned DataFrame.

    Raises:
        KeyError: If the target column is missing from *df*.
    """
    original_shape = df.shape

    # Phase 1: Initial Cleaning
    # --------------------------
    removable = [Config.ID_COL] + Config.CONSTANT_COLS + Config.DATE_COLS
    # Report only the constant columns that were actually present.
    constant_cols = [col for col in Config.CONSTANT_COLS if col in df.columns]
    df = df.drop(columns=removable, errors="ignore")

    if Config.TARGET not in df.columns:
        raise KeyError(f"Target column not found: {Config.TARGET}")
    df[Config.TARGET] = df[Config.TARGET].astype("category")

    # Phase 2: Handle Missing Values
    # ------------------------------
    missing_before = {col: int(n) for col, n in df.isna().sum().items()}
    df, high_missing_cols = _impute_missing(df, threshold=0.3)
    missing_info = {
        "before": missing_before,
        "after": {col: int(n) for col, n in df.isna().sum().items()},
        "high_missing_cols": high_missing_cols,
    }

    # Phase 3: Handle Duplicates
    # ---------------------------
    # int(): the pandas sum is a NumPy integer, which json cannot serialize.
    duplicates = int(df.duplicated().sum())
    df = df.drop_duplicates()
    dupe_info = {
        "duplicates_found": duplicates,
        "remaining_samples": int(df.shape[0]),
    }

    # Phase 4: Outlier Handling
    # --------------------------
    df, outlier_info = _winsorize_numeric(df, Config.WINSORIZE_LIMITS)

    # Phase 5: Categorical Encoding
    # ------------------------------
    df = _encode_categoricals(df, Config.ORDINAL_FEATURES, Config.TARGET)

    # Finalize
    df.to_pickle(Config.OUTPUT_PATH)
    generate_cleaning_report(original_shape, df.shape,
                             missing_info, dupe_info,
                             outlier_info, constant_cols)
    return df

if __name__ == "__main__":
    # Script entry point: load the raw CSV, run the cleaning pipeline (which
    # also writes the pickle and the JSON report), then print the final shape.
    raw_df = load_data()
    cleaned_df = clean_data(raw_df)
    print(f"🚀 Final cleaned data shape: {cleaned_df.shape}")

# Recorded cell output:
# FileNotFoundError: Input file not found: C:\Users\soulf\Downloads\archive\HR_Employee_Attrition.csv