In [1]:
#Libraries
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
import re

In [6]:
class DataCleaningPipeline:
    """Load a CSV file, clean it step by step, and write the result back out.

    The working DataFrame lives in ``self.df`` and is replaced/updated by each
    step. ``run_pipeline`` executes the steps in their intended order; the
    individual methods can also be called one at a time after ``load_data``.
    """

    def __init__(self, data_path):
        """Store the CSV location. Nothing is read until ``load_data`` runs.

        Args:
            data_path: Path (str or path-like) handed to ``pd.read_csv``.
        """
        self.data_path = data_path
        self.df = None
        # Columns standardised by encode_and_scale(). Raw-unit range checks
        # make no sense on them, so validate_data() skips them.
        self._scaled_columns = frozenset()

    # ------------------------------------------------------------------ helpers
    def _date_like_columns(self):
        """Return the columns whose name contains 'date' (case-insensitive)."""
        return [col for col in self.df.columns if 'date' in str(col).lower()]

    @staticmethod
    def _normalise_text(value):
        """Lowercase, trim, and strip every character that is not word/space."""
        return re.sub(r'[^\w\s]', '', str(value).lower().strip())

    def _check_value_ranges(self):
        """Raise ValueError if an unscaled numeric 'age' column has negatives.

        Non-numeric 'age' columns (e.g. categorical bins) and columns that
        were already standardised are skipped: the comparison is either
        undefined or meaningless for them.
        """
        scaled = getattr(self, '_scaled_columns', frozenset())
        if 'age' in self.df.columns and 'age' not in scaled:
            age = self.df['age']
            if pd.api.types.is_numeric_dtype(age) and (age < 0).any():
                raise ValueError("Age contains negative values.")

    # -------------------------------------------------------------------- steps
    def load_data(self):
        """Read ``self.data_path`` into ``self.df``.

        Raises:
            Exception: whatever ``pd.read_csv`` raised, after logging it.
        """
        try:
            self.df = pd.read_csv(self.data_path)
            # Freshly loaded data is in raw units again.
            self._scaled_columns = frozenset()
            print(f"Data loaded successfully with {self.df.shape[0]} rows and {self.df.shape[1]} columns.")
        except Exception as e:
            print(f"Error loading data: {e}")
            raise

    def schema_validation(self, expected_columns):
        """Check that every expected column is present.

        Args:
            expected_columns: Iterable of column names that must exist.

        Raises:
            ValueError: if one or more expected columns are absent.
        """
        missing_columns = set(expected_columns) - set(self.df.columns)
        if missing_columns:
            raise ValueError(f"Missing columns: {missing_columns}")
        print("validation passed.")

    def handle_missing_data(self):
        """Drop mostly-empty columns and mean-impute the numeric ones.

        A column is kept when at least half of its values are present.
        Non-numeric columns are not imputed here.
        """
        n_rows = self.df.shape[0]
        # ceil(n_rows / 2) as an int: same cut-off as "count >= n_rows * 0.5"
        # without handing a float to dropna(thresh=...).
        min_non_null = (n_rows + 1) // 2
        self.df = self.df.dropna(thresh=min_non_null, axis=1)

        numeric_cols = self.df.select_dtypes(include=np.number).columns
        # The imputer cannot be fitted on zero rows or zero columns.
        if n_rows > 0 and len(numeric_cols) > 0:
            imputer = SimpleImputer(strategy='mean')
            self.df[numeric_cols] = imputer.fit_transform(self.df[numeric_cols])
        # Count columns (not cells) so the number matches the message.
        print(f"Missing data handled. Remaining columns with NaNs: {self.df.isna().any().sum()}")

    def handle_duplicates(self):
        """Remove fully duplicated rows and report how many were dropped."""
        before = self.df.shape[0]
        self.df = self.df.drop_duplicates()
        after = self.df.shape[0]
        print(f"Removed {before - after} duplicate rows.")

    def detect_outliers(self, z_thresh=3):
        """Drop rows where any numeric column has |z-score| >= ``z_thresh``.

        Args:
            z_thresh: Rows are kept only if every numeric z-score is strictly
                below this value.

        Columns without a usable spread (zero or undefined standard
        deviation) and missing values cannot be outliers; they are treated as
        z = 0 instead of producing NaN comparisons that would discard rows.
        """
        numeric = self.df.select_dtypes(include=np.number)
        spread = numeric.std().replace(0, np.nan)
        z_scores = ((numeric - numeric.mean()) / spread).abs().fillna(0)
        keep = (z_scores < z_thresh).all(axis=1)
        self.df = self.df[keep]
        print(f"Outliers removed, {self.df.shape[0]} rows remaining.")

    def clean_strings(self):
        """Normalise text columns: trim, lowercase, remove special characters.

        Missing values are left missing rather than becoming the text 'nan'.
        Date-like columns are skipped so their separators survive until
        ``data_type_conversion`` parses them.
        """
        date_cols = set(self._date_like_columns())
        string_cols = [
            col for col in self.df.select_dtypes(include='object').columns
            if col not in date_cols
        ]
        for col in string_cols:
            # na_action='ignore' passes NaN/None through untouched.
            self.df[col] = self.df[col].map(self._normalise_text, na_action='ignore')
        print(f"String columns cleaned.")

    def data_type_conversion(self):
        """Parse date-like columns and turn remaining text into categories.

        Unparseable dates become NaT (``errors='coerce'``).
        """
        for col in self._date_like_columns():
            self.df[col] = pd.to_datetime(self.df[col], errors='coerce')

        # Date columns are datetime by now, so only true text columns remain.
        categorical_cols = self.df.select_dtypes(include='object').columns
        for col in categorical_cols:
            self.df[col] = self.df[col].astype('category')

        print(f"Data type conversion completed.")

    def encode_and_scale(self):
        """One-hot encode categorical columns and standardise numeric ones."""
        # Pick the numeric columns BEFORE encoding: the indicator columns
        # produced by get_dummies may themselves be of a numeric dtype and
        # must not be standardised.
        numeric_cols = self.df.select_dtypes(include=np.number).columns
        categorical_cols = self.df.select_dtypes(include='category').columns
        self.df = pd.get_dummies(self.df, columns=categorical_cols, drop_first=True)

        # The scaler cannot be fitted on zero rows or zero columns.
        if self.df.shape[0] > 0 and len(numeric_cols) > 0:
            scaler = StandardScaler()
            self.df[numeric_cols] = scaler.fit_transform(self.df[numeric_cols])
            self._scaled_columns = frozenset(numeric_cols)
        print(f"Data encoding and scaling completed.")

    def validate_data(self):
        """Final sanity checks on ``self.df``.

        Raises:
            ValueError: if any null remains, or if an unscaled numeric 'age'
                column contains negative values.
        """
        if self.df.isnull().sum().sum() > 0:
            raise ValueError("Data contains null values after cleaning.")

        # Ensure no negative or impossible values in specific columns
        self._check_value_ranges()

        print("Data validation passed.")

    def save_cleaned_data(self, output_path):
        """Write ``self.df`` to ``output_path`` as CSV without the index."""
        self.df.to_csv(output_path, index=False)
        print(f"Cleaned data saved to {output_path}.")

    def run_pipeline(self, expected_columns, output_path):
        """Run every step in order, from loading to saving.

        Args:
            expected_columns: Column names the input file must contain.
            output_path: Destination of the cleaned CSV.
        """
        self.load_data()
        self.schema_validation(expected_columns)
        self.handle_missing_data()
        self.handle_duplicates()
        self.detect_outliers()
        self.clean_strings()
        self.data_type_conversion()
        # Range checks are only meaningful in raw units, i.e. before scaling.
        self._check_value_ranges()
        self.encode_and_scale()
        self.validate_data()
        self.save_cleaned_data(output_path)

In [7]:
#usage
if __name__ == "__main__":
    # Local imports keep this cell self-contained.
    import os
    from pathlib import Path

    # Resolve the data directory at run time instead of hard-coding one
    # user's absolute path. Set HOSPITAL_DATA_DIR to point elsewhere; the
    # default is the current user's Downloads folder.
    data_dir = Path(os.environ.get("HOSPITAL_DATA_DIR", Path.home() / "Downloads"))
    data_path = data_dir / "hospital_readmissions.csv"
    output_path = data_dir / "cleaned_data.csv"

    # Columns the input file must contain; schema validation fails otherwise.
    expected_columns = ['age', 'time_in_hospital', 'n_lab_procedures', 'n_procedures',
       'n_medications', 'n_outpatient', 'n_inpatient', 'n_emergency',
       'medical_specialty', 'diag_1', 'diag_2', 'diag_3', 'glucose_test',
       'A1Ctest', 'change', 'diabetes_med', 'readmitted']

    pipeline = DataCleaningPipeline(data_path)
    pipeline.run_pipeline(expected_columns, output_path)

Data loaded successfully with 25000 rows and 17 columns.
validation passed.
Missing data handled. Remaining columns with NaNs: 0
Removed 0 duplicate rows.
Outliers removed, 23121 rows remaining.
String columns cleaned.
Data type conversion completed.
Data encoding and scaling completed.
Data validation passed.
Cleaned data saved to /Users/harikrishnans/Downloads/cleaned_data.csv.
