#Import Libraries

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from scipy import stats


#Data Ingestion

In [None]:
def ingest_data(file_path):
    """Read a CSV file into a DataFrame.

    Args:
        file_path: Path (or any source accepted by ``pd.read_csv``) to load.

    Returns:
        The loaded DataFrame, or ``None`` when loading fails. Failures are
        reported on stdout rather than raised.
    """
    try:
        frame = pd.read_csv(file_path)
        n_rows, n_cols = frame.shape
        print(f"Data loaded successfully with {n_rows} rows and {n_cols} columns.")
    except Exception as e:
        # Best-effort loader: report the problem and signal failure with None.
        print(f"Error loading data: {e}")
        frame = None
    return frame


#Initial Data Exploration

In [None]:
def initial_exploration(data):
    """Print a quick overview of a DataFrame and count its missing values.

    Prints the first five rows, the ``DataFrame.info()`` summary and the
    ``DataFrame.describe()`` statistics to stdout.

    Args:
        data: DataFrame to inspect.

    Returns:
        A Series with the number of null values per column.
    """
    import io  # local import: only needed to capture DataFrame.info() output

    print("First 5 Rows:\n", data.head())

    # DataFrame.info() writes to stdout itself and returns None, so printing
    # its return value would show "None" after the label. Capture the text in
    # a buffer and print it under the label instead.
    info_buffer = io.StringIO()
    data.info(buf=info_buffer)
    print("Data Info:\n", info_buffer.getvalue())

    print("Statistical Summary:\n", data.describe())
    return data.isnull().sum()


#Data Validation

In [None]:
def validate_data(data, schema):
    """Report columns whose dtype differs from the expected schema.

    Columns named in ``schema`` but absent from ``data`` are skipped without
    a message. Mismatches are printed; nothing is raised or returned.

    Args:
        data: DataFrame to check.
        schema: Mapping of column name to expected dtype (e.g. ``"int64"``).
    """
    for name, expected in schema.items():
        if name not in data.columns:
            continue
        actual = data[name].dtype
        if actual == expected:
            continue
        print(f"Invalid data type in column {name}. Expected {expected}, got {actual}.")
    print("Data validation completed.")


#Handling Missing Values

In [None]:
# def handle_missing_values(data, strategy="mean"):
#     imputer = SimpleImputer(strategy=strategy)
#     data_imputed = pd.DataFrame(imputer.fit_transform(data), columns=data.columns)
#     return data_imputed

def handle_missing_values(data, strategy="mean"):
    """Impute missing values in the numeric columns of a DataFrame.

    Non-numeric columns are passed through untouched. In the result the
    numeric columns come first, followed by the non-numeric ones, and the
    original row index is preserved.

    Args:
        data: DataFrame to impute.
        strategy: Strategy name forwarded to ``SimpleImputer``.

    Returns:
        A new DataFrame with imputed numeric columns. When there is nothing
        to impute (no numeric columns or no rows) a copy of ``data`` is
        returned.
    """
    numeric_data = data.select_dtypes(include=[np.number])
    non_numeric_data = data.select_dtypes(exclude=[np.number])

    # SimpleImputer cannot be fitted on an empty matrix; nothing to do anyway.
    if numeric_data.shape[0] == 0 or numeric_data.shape[1] == 0:
        return data.copy()

    imputer = SimpleImputer(strategy=strategy)
    imputed_values = imputer.fit_transform(numeric_data)

    columns = numeric_data.columns
    if imputed_values.shape[1] != len(columns):
        # SimpleImputer discards features whose fitted statistic is NaN
        # (columns without any observed value); keep the labels in sync.
        columns = columns[~pd.isna(imputer.statistics_)]

    # Reuse the input index: a fresh RangeIndex would make the column-wise
    # concat below align rows by label and pair values with the wrong rows
    # (or introduce NaN rows) whenever data.index is not 0..n-1.
    numeric_data_imputed = pd.DataFrame(imputed_values, columns=columns, index=data.index)

    return pd.concat([numeric_data_imputed, non_numeric_data], axis=1)


#Data Manipulation (Feature Transformation, Deriving New Features)

In [None]:
# def manipulate_data(data):
#     # Example: Add a new column as a transformation of existing data
#     data["new_feature"] = data["feature1"] * data["feature2"]
#     return data


def manipulate_data(data):
    """Add the derived column ``new_feature`` when its inputs are available.

    ``new_feature`` is the element-wise product of ``feature1`` and
    ``feature2``. The frame is modified in place and also returned; if either
    input column is missing the frame is returned unchanged.
    """
    required_columns = ("feature1", "feature2")
    if all(name in data.columns for name in required_columns):
        data["new_feature"] = data["feature1"] * data["feature2"]
    return data


#Outlier Detection and Treatment

In [None]:
# def treat_outliers(data, threshold=3):
#     z_scores = np.abs(stats.zscore(data.select_dtypes(include=[np.number])))
#     data_no_outliers = data[(z_scores < threshold).all(axis=1)]
#     print(f"Outliers removed. New data shape: {data_no_outliers.shape}")
#     return data_no_outliers


def treat_outliers(data, threshold=3):
    """Drop rows that contain a numeric outlier according to the z-score.

    A row is removed when the absolute z-score of any of its numeric values
    is greater than or equal to ``threshold``. Z-scores are computed per
    column, ignoring missing values.

    Args:
        data: DataFrame to filter.
        threshold: Absolute z-score at or above which a value is an outlier.

    Returns:
        A new DataFrame without the outlier rows; the surviving rows keep
        their original index labels.
    """
    numeric_data = data.select_dtypes(include=[np.number])

    if numeric_data.shape[0] == 0 or numeric_data.shape[1] == 0:
        # Nothing to score: keep every row.
        data_no_outliers = data.copy()
    else:
        values = numeric_data.to_numpy(dtype=float, na_value=np.nan)
        with np.errstate(divide="ignore", invalid="ignore"):
            z_scores = np.abs(np.asarray(stats.zscore(values, axis=0, nan_policy="omit")))
        # A NaN z-score comes from a missing value or a zero-variance column.
        # Neither is evidence of an outlier, and a plain `z < threshold` test
        # would evaluate to False for them and discard every affected row.
        within_threshold = np.isnan(z_scores) | (z_scores < threshold)
        data_no_outliers = data[within_threshold.all(axis=1)]

    print(f"Outliers removed. New data shape: {data_no_outliers.shape}")
    return data_no_outliers


#Data Encoding

In [None]:
def encode_data(data):
    """One-hot encode the object-dtype columns of a DataFrame.

    The first category of every encoded column is dropped (``drop='first'``).
    Encoded columns are appended after the remaining original columns and the
    original row index is preserved.

    Args:
        data: DataFrame to encode.

    Returns:
        A new DataFrame in which object columns are replaced by their one-hot
        encoding. When there are no object columns (or no rows) a copy of
        ``data`` is returned.
    """
    categorical = data.select_dtypes(include=["object"])

    # The encoder cannot be fitted on an empty matrix; nothing to encode.
    if categorical.shape[0] == 0 or categorical.shape[1] == 0:
        return data.copy()

    # Do not pass `sparse=False`: that keyword was renamed to `sparse_output`
    # and later removed from scikit-learn. Densifying the default sparse
    # result works regardless of the installed version.
    encoder = OneHotEncoder(drop='first')
    encoded = encoder.fit_transform(categorical)
    if hasattr(encoded, "toarray"):
        encoded = encoded.toarray()

    # Reuse the input index so the column-wise concat pairs each encoded row
    # with its source row even after earlier steps removed rows.
    categorical_data = pd.DataFrame(
        encoded,
        columns=encoder.get_feature_names_out(),
        index=data.index,
    )
    remaining = data.drop(columns=categorical.columns)
    return pd.concat([remaining, categorical_data], axis=1)


#Data Transformation and Normalization

In [None]:
# def transform_data(data, scaler_type="standard"):
#     scaler = StandardScaler() if scaler_type == "standard" else MinMaxScaler()
#     data_transformed = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)
#     return data_transformed


def transform_data(data, scaler_type="standard"):
    """Scale the numeric columns of a DataFrame.

    Non-numeric columns are passed through untouched. In the result the
    scaled numeric columns come first, followed by the non-numeric ones, and
    the original row index is preserved.

    Args:
        data: DataFrame to scale.
        scaler_type: ``"standard"`` selects ``StandardScaler``; any other
            value selects ``MinMaxScaler``.

    Returns:
        A new DataFrame with scaled numeric columns. When there is nothing to
        scale (no numeric columns or no rows) a copy of ``data`` is returned.
    """
    numeric_data = data.select_dtypes(include=[np.number])

    # Scalers cannot be fitted on an empty matrix; nothing to scale anyway.
    if numeric_data.shape[0] == 0 or numeric_data.shape[1] == 0:
        return data.copy()

    scaler = StandardScaler() if scaler_type == "standard" else MinMaxScaler()

    # Reuse the input index: a fresh RangeIndex would make the concat below
    # misalign rows whenever data.index is not 0..n-1 (e.g. after rows were
    # filtered out by an earlier step).
    scaled_data = pd.DataFrame(
        scaler.fit_transform(numeric_data),
        columns=numeric_data.columns,
        index=data.index,
    )
    return pd.concat([scaled_data, data.select_dtypes(exclude=[np.number])], axis=1)


#Feature Engineering

In [None]:
# def feature_engineering(data):
#     # Example: Dimensionality reduction using PCA
#     pca = PCA(n_components=5)
#     principal_components = pca.fit_transform(data)
#     data_pca = pd.DataFrame(principal_components, columns=[f"PCA_{i}" for i in range(1, 6)])
#     return pd.concat([data, data_pca], axis=1)


def feature_engineering(data):
    """Append principal components of the numeric columns to a DataFrame.

    Up to five components are computed and added as columns ``PCA_1``,
    ``PCA_2``, ... after the existing columns. The original row index is
    preserved.

    Args:
        data: DataFrame whose numeric columns are fed to PCA.

    Returns:
        A new DataFrame with the PCA columns appended. When no component can
        be computed (no numeric columns or no rows) a copy of ``data`` is
        returned.
    """
    numeric_data = data.select_dtypes(include=[np.number])

    # PCA cannot extract more components than there are samples or features.
    n_samples, n_features = numeric_data.shape
    n_components = min(5, n_samples, n_features)
    if n_components == 0:
        return data.copy()

    pca = PCA(n_components=n_components)
    principal_components = pca.fit_transform(numeric_data)

    # Reuse the input index so the concat pairs every component row with its
    # source row instead of aligning against a fresh 0..n-1 index.
    data_pca = pd.DataFrame(
        principal_components,
        columns=[f"PCA_{i}" for i in range(1, principal_components.shape[1] + 1)],
        index=data.index,
    )
    return pd.concat([data, data_pca], axis=1)


#Data Deduplication

In [None]:
def deduplicate_data(data):
    """Return ``data`` without fully duplicated rows.

    The first occurrence of each row is kept and the resulting shape is
    printed. The input frame itself is not modified.
    """
    unique_rows = data.drop_duplicates()
    shape_after = unique_rows.shape
    print(f"Duplicates removed. New data shape: {shape_after}")
    return unique_rows


#Data Export

In [None]:
def export_data(data, file_path):
    """Write a DataFrame to a CSV file without the index column.

    Success or failure is reported on stdout; errors are not raised and the
    function always returns ``None``.

    Args:
        data: DataFrame to write.
        file_path: Destination path of the CSV file.
    """
    try:
        data.to_csv(file_path, index=False)
        status = f"Data exported successfully to {file_path}."
    except Exception as e:
        # Best-effort export: report the problem instead of raising.
        status = f"Error exporting data: {e}"
    print(status)


#Error Handling and Logging

In [None]:
import logging

# Configure the root logger at import time: records at INFO level and above
# are written to the file 'data_cleaning.log'.
logging.basicConfig(filename='data_cleaning.log', level=logging.INFO)

def log_and_handle_errors(func):
    """Decorate ``func`` so that exceptions are logged instead of raised.

    The returned wrapper forwards all arguments to ``func``. If ``func``
    raises an ``Exception``, the error is logged (with traceback) through the
    ``logging`` module, echoed on stdout, and ``None`` is returned instead.

    Args:
        func: Callable to wrap.

    Returns:
        The wrapping callable, carrying the name and docstring of ``func``.
    """
    from functools import wraps  # local import keeps this block self-contained

    # Without wraps() every decorated function would be named "wrapper",
    # which hides the real function name from introspection and debugging.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # logging.exception logs at ERROR level and appends the traceback.
            logging.exception("Error in %s: %s", func.__name__, e)
            print(f"Error in {func.__name__}: {e}")
            return None
    return wrapper


In [None]:
# Rebind every pipeline step to its error-handling wrapper. From here on each
# of these names logs and prints any exception raised by the step and returns
# None instead of propagating it, so callers must check for a None result.
ingest_data = log_and_handle_errors(ingest_data)
initial_exploration = log_and_handle_errors(initial_exploration)
validate_data = log_and_handle_errors(validate_data)
handle_missing_values = log_and_handle_errors(handle_missing_values)
manipulate_data = log_and_handle_errors(manipulate_data)
treat_outliers = log_and_handle_errors(treat_outliers)
encode_data = log_and_handle_errors(encode_data)
transform_data = log_and_handle_errors(transform_data)
feature_engineering = log_and_handle_errors(feature_engineering)
deduplicate_data = log_and_handle_errors(deduplicate_data)
export_data = log_and_handle_errors(export_data)


#Main Pipeline Function

In [None]:
def data_cleaning_pipeline(file_path, schema, export_path):
    """Run the full cleaning pipeline from a raw CSV to a cleaned CSV.

    Steps, in order: ingest, explore, validate, impute missing values, derive
    features, remove outliers, one-hot encode, scale, add PCA components,
    deduplicate and export.

    Args:
        file_path: Path of the raw CSV file to read.
        schema: Mapping of column name to expected dtype, used for validation.
        export_path: Path of the cleaned CSV file to write.

    Returns:
        None. The pipeline stops early, with a message on stdout, when
        ingestion or any transformation step fails.
    """
    data = ingest_data(file_path)
    if data is None:
        return

    # Exploration and validation only report; their results are not needed.
    initial_exploration(data)
    validate_data(data, schema)

    # Step labels are spelled out so the abort message stays meaningful even
    # when the callables are wrappers that do not carry the original name.
    steps = [
        ("handle_missing_values", handle_missing_values),
        ("manipulate_data", manipulate_data),
        ("treat_outliers", treat_outliers),
        ("encode_data", encode_data),
        ("transform_data", lambda frame: transform_data(frame, scaler_type="standard")),
        ("feature_engineering", feature_engineering),
        ("deduplicate_data", deduplicate_data),
    ]
    for step_name, step in steps:
        data = step(data)
        if data is None:
            # The steps are wrapped by log_and_handle_errors, which returns
            # None on failure. Feeding None into the next step would only
            # cascade errors and end with a misleading success message.
            print(f"Data cleaning pipeline aborted: step '{step_name}' failed.")
            return

    # export_data reports its own errors on stdout and returns None in every
    # case, so an export failure cannot be detected from its return value.
    export_data(data, export_path)
    print("Data cleaning pipeline completed successfully.")


In [None]:
# Expected dtype for each column checked during validation.
schema = dict(
    feature1="float64",
    feature2="int64",
    category_feature="object",
)

# Run the whole pipeline: raw CSV in, cleaned CSV out.
data_cleaning_pipeline(
    file_path="healthcare_dataset_raw.csv",
    schema=schema,
    export_path="cleaned_healthcare_dataset.csv",
)
