# Wrapper to Check for MCAR, MNAR, MAR, Skewed, Multimodal, Outliers

In [None]:
import numpy as np
from scipy.stats import normaltest
import statsmodels.api as sm
from sklearn.linear_model import LogisticRegression

# Categorize columns by data type.
# Each list holds the names of the `data_cate` columns whose dtype compares
# equal to the given dtype name; columns of any other dtype end up in no list.
int64_cols = [name for name in data_cate.columns if data_cate[name].dtype == 'int64']
float64_cols = [name for name in data_cate.columns if data_cate[name].dtype == 'float64']
object_cols = [name for name in data_cate.columns if data_cate[name].dtype == 'object']
category_cols = [name for name in data_cate.columns if data_cate[name].dtype == 'category']

# Function to check for normality using the D'Agostino and Pearson's test
def check_normality(column_data):
    """Return True when the D'Agostino-Pearson test does not reject normality.

    Parameters
    ----------
    column_data : pandas.Series
        Numeric values; missing entries are dropped before testing.

    Returns
    -------
    bool
        True if the test's p-value is above 0.05. False otherwise, including
        when fewer than 8 non-missing values are available.
    """
    observed = column_data.dropna()

    # The skewness component of normaltest is only defined for n >= 8.
    # Depending on the SciPy version, smaller samples either raise or yield a
    # NaN p-value, so short columns are reported as "not normal" up front.
    if len(observed) < 8:
        return False

    _, p = normaltest(observed)
    return bool(p > 0.05)  # p > 0.05 indicates the data is likely normal

# Function to determine missing value mechanism using statistical tests
def determine_missing_mechanism(df, column_name):
    """Heuristically label the missingness mechanism of one column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``column_name`` and the candidate predictor columns.
    column_name : str
        Column whose missing values are being characterised.

    Returns
    -------
    str
        One of "MNAR (all values missing)",
        "MCAR (missing completely at random)", "MAR (missing at random)" or
        "MNAR (missing not at random)" (the fallback).

    Errors raised by either statistical step are printed and that step is
    skipped, so the function always returns a label.
    """
    column = df[column_name]
    is_missing = column.isnull()

    # If all values are missing, the mechanism is MNAR
    if is_missing.all():
        return "MNAR (all values missing)"

    # NOTE(review): this step is labelled as Little's MCAR test but actually
    # runs a Lilliefors normality test on the observed values, which says
    # nothing about missingness. It is kept so existing "MCAR" labels do not
    # change; consider replacing it with a real MCAR test.
    try:
        _, p_value = sm.stats.diagnostic.lilliefors(column.dropna(), dist="norm")
        if p_value > 0.05:
            return "MCAR (missing completely at random)"
    except Exception as e:
        print(f"Error in MCAR test for column {column_name}: {e}")

    # Check for MAR using logistic regression
    try:
        # The target is the missingness indicator itself, so rows where the
        # column is missing must stay in the design matrix; dropping them
        # leaves a single class and the model can never be fitted.
        X = df.drop([column_name], axis=1).select_dtypes(include=["number"]).fillna(0)
        y = is_missing.astype(int)

        if len(np.unique(y)) > 1:  # Ensure the target has more than one class
            model = LogisticRegression(max_iter=1000).fit(X, y)
            # NOTE(review): training accuracy above 0.5 is a weak criterion
            # for imbalanced indicators; a majority-class baseline may be a
            # better threshold.
            if model.score(X, y) > 0.5:
                return "MAR (missing at random)"
    except Exception as e:
        print(f"Error in MAR logistic regression for column {column_name}: {e}")

    # Default to MNAR if other mechanisms cannot be established
    return "MNAR (missing not at random)"

# Function to analyze float64 columns with detailed statistics
def analyze_float64_column(col, column_data):
    """Print a diagnostic report for one float64 column.

    Always prints the value counts (NaN included), the number of missing
    values and the min/max range. When at least one value is missing it also
    prints the missingness mechanism (computed against the module-level
    ``data_cate`` frame), a normality verdict with a matching spread summary,
    and skewness / multimodality / outlier flags.

    Parameters
    ----------
    col : str
        Column name, used for display and for the mechanism lookup.
    column_data : pandas.Series
        The column's values.
    """
    print(f"\nColumn: {col}")

    # Frequency table; NaN is counted as its own entry
    print(f"Unique values and counts:\n{column_data.value_counts(dropna=False)}\n")

    n_missing = column_data.isnull().sum()
    print(f"Number of missing values: {n_missing}")

    if n_missing > 0:
        mechanism = determine_missing_mechanism(data_cate, col)
        print(f"Missing value mechanism: {mechanism}")

        # Normal data is summarised by mean +/- 2 std, anything else by quartiles
        if check_normality(column_data):
            centre = column_data.mean()
            spread = column_data.std()
            print("Data distribution: Normal")
            print(f"2-standard deviation range: {centre - 2 * spread} to {centre + 2 * spread}")
        else:
            print("Data distribution: Not Normal")
            print(f"Interquartile range (IQR): {column_data.quantile(0.25)} to {column_data.quantile(0.75)}")

        # Skewed when |skew| > 1; "multimodal" is a simplistic check for more
        # than two distinct non-missing values
        skew_value = column_data.skew()
        many_values = column_data.value_counts().size > 2
        print(f"Skewness: {'Yes' if abs(skew_value) > 1 else 'No'}")
        print(f"Multimodal: {'Yes' if many_values else 'No'}")

        # Outliers by the 1.5 * IQR fence rule
        first_q = column_data.quantile(0.25)
        third_q = column_data.quantile(0.75)
        fence = 1.5 * (third_q - first_q)
        beyond_fence = (column_data < first_q - fence) | (column_data > third_q + fence)
        print(f"Outliers: {'Yes' if beyond_fence.any() else 'No'}")

    # Range of values
    print(f"Range of values: {column_data.min(skipna=True)} to {column_data.max(skipna=True)}")
    print("-" * 40)

# Function to analyze all columns in a category
def analyze_columns(columns, dtype_name):
    """Print a report for every column name in ``columns``.

    Columns are read from the module-level ``data_cate`` frame. A column gets
    the detailed ``analyze_float64_column`` report when ``dtype_name`` is
    "float64" and it has at least one missing value; every other column gets
    the short default report (value counts, missing count and, for
    "int64"/"float64", the min/max range).

    Parameters
    ----------
    columns : iterable of str
        Column names to report on.
    dtype_name : str
        Label of the dtype group; shown in the banner and used for routing.
    """
    rule = "=" * 10
    print(f"\n{rule} Analyzing {dtype_name} Columns {rule}\n")

    show_range = dtype_name in ("int64", "float64")
    for col in columns:
        series = data_cate[col]
        n_missing = series.isnull().sum()

        # Float columns with gaps get the detailed report instead
        if dtype_name == "float64" and n_missing > 0:
            analyze_float64_column(col, series)
            continue

        # Default analysis for other types
        print(f"Column: {col}")
        print(f"Unique values and counts:\n{series.value_counts(dropna=False)}\n")
        print(f"Number of missing values: {n_missing}")
        if show_range:
            print(f"Range of values: {series.min(skipna=True)} to {series.max(skipna=True)}")
        print("-" * 40)

    print(f"\n{'=' * 30}\n")

# Analyze each category
# One report per dtype bucket. The second argument is the label shown in the
# banner; it also decides whether the float64-specific report and the min/max
# range line are used inside analyze_columns.
analyze_columns(int64_cols, "int64")
analyze_columns(float64_cols, "float64")
analyze_columns(object_cols, "object")
analyze_columns(category_cols, "category")


Wrapper for Imputation Analysis with Statistical Map

In [None]:
import pandas as pd
import numpy as np
from scipy.stats import normaltest, pearsonr
from sklearn.impute import KNNImputer
from sklearn.linear_model import LinearRegression

# 1. Helper Functions

# Function to determine sparsity
def determine_sparsity(column_data):
    """Label a column "Sparse" when it has few distinct values for its length.

    The ratio is (number of distinct non-missing values) / (total number of
    rows, missing included); a ratio below 0.1 gives "Sparse", anything else
    "Not Sparse".
    """
    distinct_count = len(column_data.dropna().unique())
    ratio = distinct_count / len(column_data)
    if ratio < 0.1:
        return "Sparse"
    return "Not Sparse"

# Function to check for missingness patterns
def check_missingness_pattern(data, col):
    """Tell whether ``col``'s missingness tracks the presence of other columns.

    For every other column, the 0/1 "is missing" indicator of ``col`` is
    correlated with that column's 0/1 "is present" indicator. Returns
    "Dependent" if any absolute correlation exceeds 0.5, else "Independent".
    NaN correlations never exceed the threshold, so they count as no
    dependence.
    """
    target_missing = data[col].isnull().astype(int)

    # All correlations are computed before any threshold check
    strengths = [
        abs(target_missing.corr(data[other].notnull().astype(int)))
        for other in data.columns
        if other != col
    ]

    if any(strength > 0.5 for strength in strengths):
        return "Dependent"
    return "Independent"

# Function to analyze feature relationships (correlation for numeric)
def check_feature_relationship(data, col):
    """Report whether ``col`` is strongly correlated with any other column.

    Only pairs where both columns are float64 or int64 are considered. Each
    pair is reduced to the rows where both values are present, and Pearson's
    r is computed when at least two such rows remain. Returns
    "High Correlation" if any |r| exceeds 0.7, otherwise "Low Correlation"
    (also when no pair qualifies).
    """
    numeric_kinds = ["float64", "int64"]
    target = data[col]
    coefficients = []

    # A non-numeric target can never form a qualifying pair
    if target.dtype in numeric_kinds:
        for other in data.columns:
            if other == col or data[other].dtype not in numeric_kinds:
                continue
            # Keep only rows where both columns are present
            paired = pd.concat([target, data[other]], axis=1).dropna()
            if len(paired) <= 1:
                continue
            r_value, _ = pearsonr(paired.iloc[:, 0], paired.iloc[:, 1])
            coefficients.append(r_value)

    if any(abs(r_value) > 0.7 for r_value in coefficients):
        return "High Correlation"
    return "Low Correlation"

# Function to check skewness
def check_skewness(column_data):
    """Return "Yes" when the column's absolute skewness exceeds 1, else "No"."""
    if abs(column_data.skew()) > 1:
        return "Yes"
    return "No"

# Function to check for outliers using IQR
def check_outliers(column_data):
    """Return "Yes" if any value lies outside the 1.5 * IQR fences, else "No".

    The fences are Q1 - 1.5 * IQR and Q3 + 1.5 * IQR, with the quartiles taken
    from ``column_data.quantile``.
    """
    first_q = column_data.quantile(0.25)
    third_q = column_data.quantile(0.75)
    spread = third_q - first_q

    too_low = column_data < first_q - 1.5 * spread
    too_high = column_data > third_q + 1.5 * spread
    return "Yes" if (too_low | too_high).any() else "No"

# Function to check for normality using the D'Agostino and Pearson's test
def check_normality(column_data):
    """Return True when the D'Agostino-Pearson test does not reject normality.

    Parameters
    ----------
    column_data : pandas.Series
        Numeric values; missing entries are dropped before testing.

    Returns
    -------
    bool
        True if the test's p-value is above 0.05. False otherwise, including
        when fewer than 8 non-missing values are available.
    """
    observed = column_data.dropna()

    # The skewness component of normaltest is only defined for n >= 8.
    # Depending on the SciPy version, smaller samples either raise or yield a
    # NaN p-value, so short columns are reported as "not normal" up front.
    if len(observed) < 8:
        return False

    _, p = normaltest(observed)
    return bool(p > 0.05)  # p > 0.05 indicates the data is likely normal

# 2. Analysis Functions for Each Data Type

# Module-level accumulators filled by analyze_categorical_column: every column
# it analyses is appended to exactly one of these two lists.
# NOTE: nothing in the visible code clears them, so running the analysis again
# without re-executing these two lines appends the same names a second time.
categorical_data_mode_imputation = []
categorical_data_multivariate_imputation = []

def analyze_float64_column(col, column_data, data):
    """Print diagnostics and an imputation recommendation for a float64 column.

    Parameters
    ----------
    col : str
        Column name.
    column_data : pandas.Series
        The column's values.
    data : pandas.DataFrame
        Full frame, used for the missingness-pattern and correlation checks.

    The sparsity / pattern / relationship / skew / outlier lines are printed
    only when the column has missing values. The normality check and the
    recommendation ("Mean" if normal, otherwise "Median") are always printed.
    """
    print(f"\nColumn: {col} (float64)")

    # General Stats
    n_missing = column_data.isnull().sum()
    print(f"Missing values: {n_missing}")

    if n_missing > 0:
        print(f"Sparsity: {determine_sparsity(column_data)}")
        print(f"Missingness Pattern: {check_missingness_pattern(data, col)}")
        print(f"Relationship with other features: {check_feature_relationship(data, col)}")

        # Both flags are computed before either is printed
        skew_flag = check_skewness(column_data)
        outlier_flag = check_outliers(column_data)
        print(f"Skewed: {skew_flag}")
        print(f"Outliers: {outlier_flag}")

    # Normal data -> mean with a 2-sigma band; otherwise -> median with quartiles
    if check_normality(column_data):
        centre = column_data.mean()
        spread = column_data.std()
        print("Data distribution: Normal")
        print(f"2-standard deviation range: {centre - 2 * spread} to {centre + 2 * spread}")
        suggestion = "Mean"
    else:
        lower_q = column_data.quantile(0.25)
        upper_q = column_data.quantile(0.75)
        print("Data distribution: Not Normal")
        print(f"Interquartile range (IQR): {lower_q} to {upper_q}")
        suggestion = "Median"

    # Imputation Recommendation based on table
    print(f"Imputation Recommendation: {suggestion}")

def analyze_int64_column(col, column_data, data):
    """Print diagnostics and the imputation recommendation for an int64 column.

    Parameters
    ----------
    col : str
        Column name.
    column_data : pandas.Series
        The column's values.
    data : pandas.DataFrame
        Full frame, used for the missingness-pattern and correlation checks.

    The sparsity, missingness-pattern and relationship lines are printed only
    when the column has missing values. The recommendation is always "Median".
    """
    print(f"\nColumn: {col} (int64)")

    # General Stats
    n_missing = column_data.isnull().sum()
    print(f"Missing values: {n_missing}")

    if n_missing > 0:
        print(f"Sparsity: {determine_sparsity(column_data)}")
        print(f"Missingness Pattern: {check_missingness_pattern(data, col)}")
        print(f"Relationship with other features: {check_feature_relationship(data, col)}")

    # Imputation Recommendation
    print("Imputation Recommendation: Median")

def analyze_categorical_column(col, column_data, data):
    """Print diagnostics for an object/category column and record its strategy.

    Parameters
    ----------
    col : str
        Column name.
    column_data : pandas.Series
        The column's values.
    data : pandas.DataFrame
        Full frame, used for the missingness-pattern check.

    Side effect: ``col`` is appended to the module-level list
    ``categorical_data_multivariate_imputation`` when the column has missing
    values, its missingness is "Dependent", it is "Sparse" and it has exactly
    two distinct values. In every other case it is appended to
    ``categorical_data_mode_imputation``.
    """
    print(f"\nColumn: {col} (object/category)")

    # General Stats
    n_missing = column_data.isnull().sum()
    print(f"Missing values: {n_missing}")

    needs_multivariate = False
    if n_missing > 0:
        sparsity = determine_sparsity(column_data)
        print(f"Sparsity: {sparsity}")

        pattern = check_missingness_pattern(data, col)
        print(f"Missingness Pattern: {pattern}")

        # Mode and Cardinality
        modes = column_data.mode()
        most_common = "N/A" if modes.empty else modes.iloc[0]
        distinct = column_data.nunique()
        print(f"Mode: {most_common}, Cardinality: {distinct}")

        # Only binary, sparse columns with dependent missingness are routed to
        # the multivariate strategy
        needs_multivariate = (
            pattern == "Dependent" and sparsity == "Sparse" and distinct == 2
        )

    if needs_multivariate:
        suggestion = "KNN Imputation or Multiple Imputation"
        categorical_data_multivariate_imputation.append(col)
    else:
        suggestion = "Mode"
        categorical_data_mode_imputation.append(col)

    # Imputation Recommendation
    print(f"Imputation Recommendation: {suggestion}")

# 3. Full Analysis Wrapper

def analyze_dataset(data):
    """Run the per-dtype column analysis over every supported column of ``data``.

    Columns are grouped by dtype (int64, float64, then object followed by
    category) and passed to the matching ``analyze_*_column`` function.
    Columns of any other dtype are skipped. Finally the two module-level
    categorical imputation lists are printed.
    """
    print("Starting analysis...\n")

    def names_with_dtype(dtype_name):
        # Column names whose dtype compares equal to dtype_name, in frame order
        return [name for name in data.columns if data[name].dtype == dtype_name]

    integer_names = names_with_dtype("int64")
    float_names = names_with_dtype("float64")
    categorical_names = names_with_dtype("object") + names_with_dtype("category")

    print("------ Int64 dtype -----------\n")
    for name in integer_names:
        analyze_int64_column(name, data[name], data)

    print("\n------ Float64 dtype -----------\n")
    for name in float_names:
        analyze_float64_column(name, data[name], data)

    print("\n------ Object/Category dtype -----------\n")
    for name in categorical_names:
        analyze_categorical_column(name, data[name], data)

    print("Analysis completed.")
    print("\nCategorical Data Mode Imputation Columns:", categorical_data_mode_imputation)
    print("Categorical Data Multivariate Imputation Columns:", categorical_data_multivariate_imputation)

# Example usage (Replace `data` with your dataframe):
# Here the analysis runs on `data_cate`; as a side effect it fills the two
# module-level categorical imputation lists.
analyze_dataset(data_cate)

Train-test split before Imputation Implementation

In [None]:
from sklearn.model_selection import train_test_split

# Stratified split
# 70/30 split of `data_cate`, with "asthma_combined" as the target. The split
# happens before any imputer is fitted, so the imputation pipeline below can
# be fitted on X_train alone.
X_train, X_test, y_train, y_test = train_test_split(
    data_cate.drop("asthma_combined", axis=1),  # features
    data_cate["asthma_combined"],               # target
    test_size=0.3,                              # test set proportion
    random_state=0,                             # reproducibility
    stratify=data_cate["asthma_combined"]       # stratify by target
)

# Quick check of the resulting feature-matrix sizes
print(X_train.shape, X_test.shape)

Imputation

In [None]:
import json
# Load the lists back
# NOTE(review): no visible cell writes "imputation_lists.json" or builds
# `float_columns_to_impute`; presumably a cell outside this excerpt dumps
# them. Confirm before relying on this file.
with open("imputation_lists.json", "r") as f:
    imputation_lists = json.load(f)

# These assignments replace the in-memory lists of the same names that
# analyze_categorical_column appends to.
categorical_data_mode_imputation = imputation_lists["categorical_data_mode_imputation"]
categorical_data_multivariate_imputation = imputation_lists["categorical_data_multivariate_imputation"]
float_columns_to_impute = imputation_lists["float_columns_to_impute"]

print("Loaded lists:", categorical_data_mode_imputation, categorical_data_multivariate_imputation, float_columns_to_impute)

Global Asthma Network pipeline

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# for the model
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score

# for feature engineering
from sklearn.preprocessing import StandardScaler
from feature_engine import imputation as mdi
from feature_engine import discretisation as dsc
from feature_engine import encoding as ce

In [None]:
#removing "asthma_combined" from the "categorical_data_multivariate_imputation" list as "asthma_combined" is the target column.

# Remove 'asthma_combined' from the list.
# Guarded so the cell can be re-run safely: list.remove raises ValueError when
# the value is no longer (or never was) in the list.
if "asthma_combined" in categorical_data_multivariate_imputation:
    categorical_data_multivariate_imputation.remove("asthma_combined")

# Print the updated list
print(categorical_data_multivariate_imputation)

In [None]:
# Imputation pipeline: three feature_engine imputers run in sequence, each
# restricted to its own list of columns.
gan_pipe = Pipeline(    [
        # 1) float columns: fill with the column median
        (
            "median_imputer",
            mdi.MeanMedianImputer(
                imputation_method="median", variables=float_columns_to_impute
            ),
        ),
        # 2) categorical columns flagged for mode imputation: fill with the
        #    most frequent category
        (
            'categorical_imputer',
             mdi.CategoricalImputer(
                 imputation_method='frequent',
                 variables=categorical_data_mode_imputation
             ),
        ),

        # 3) remaining categorical columns: fill with the constant label 'Missing'.
        #    NOTE(review): the list name says "multivariate", but this step is a
        #    constant fill, not KNN / multiple imputation. Confirm this is intended.
        (
            'arbitrary_categorical_imputer',
             mdi.CategoricalImputer(
                 imputation_method='missing',
                 fill_value='Missing',
                 variables=categorical_data_multivariate_imputation
             ),
        ),        
    ]
)

# source = https://feature-engine.trainindata.com/en/1.8.x/api_doc/imputation/CategoricalImputer.html#feature_engine.imputation.CategoricalImputer
# source = https://feature-engine.trainindata.com/en/1.8.x/api_doc/imputation/index.html

In [None]:
# Inspect the median-imputer step of the pipeline.
# NOTE(review): in the visible cell order the pipeline is fitted only in a
# later cell, so this shows the configured step, not learned parameters.
# The learned medians (presumably exposed as `imputer_dict_`) exist only
# after `gan_pipe.fit`.
gan_pipe.named_steps["median_imputer"]

Fit the imputation pipeline to Train features (Target excluded)

In [None]:
# Learn the imputation values from the training features only; in the visible
# cells X_test is never passed to fit.
gan_pipe.fit(X_train)

In [None]:
# transform the data sets
# Both splits are imputed with the values learned from X_train.

X_train_impu = gan_pipe.transform(X_train)
X_test_impu = gan_pipe.transform(X_test)

# Preview the imputed training features
X_train_impu.head()

In [None]:
#Saving DataFrames as .pkl Files

# Save DataFrames and Series to pickle files
# NOTE(review): files are written to the current working directory, while the
# encoding cell reads them from a '70-30(imputed)/' folder. Move the files or
# align the paths.
X_train_impu.to_pickle('X_train_impu70.pkl')
X_test_impu.to_pickle('X_test_impu30.pkl')

# The targets are read back later as y_train70.pkl / y_test30.pkl, so they are
# persisted alongside the features.
y_train.to_pickle('y_train70.pkl')
y_test.to_pickle('y_test30.pkl')

print("Imputed Data saved as .pkl files")
print(X_train_impu.shape, X_test_impu.shape)
print(y_train.shape, y_test.shape)

Drop these columns since they are 98% missing:
school_india, pincode_india, pincode_poll, latitude_poll, longitude_poll

# Encoding

In [None]:
# Load the pickle files
# Restores the imputed feature frames and the target Series from the
# '70-30(imputed)/' folder, replacing any in-memory variables of the same names.
# NOTE: only unpickle files you produced yourself; pickle can execute code on load.
import pandas as pd
X_train_impu = pd.read_pickle('70-30(imputed)/X_train_impu70.pkl')
X_test_impu = pd.read_pickle('70-30(imputed)/X_test_impu30.pkl')
y_train = pd.read_pickle('70-30(imputed)/y_train70.pkl')
y_test = pd.read_pickle('70-30(imputed)/y_test30.pkl')

We must encode the categorical data attributes

In [None]:
# Object/category columns of the imputed training features
categorical_cols = X_train_impu.select_dtypes(include=['object', 'category']).columns

# Check unique values in each categorical column.
# dtype and unique count are both read from X_train_impu so each printed line
# describes a single frame.
for col in categorical_cols:
    print(f"Column: {col}, dtype: {X_train_impu[col].dtype},   Unique Values: {X_train_impu[col].nunique()}")

In [None]:
# checking each datatype

#medpill , medpil1b, medpil2b, medpil3b, medpil4b are not in object column. lets check

# Categorize each column of X_train_impu into one of the dtypes below.

# One list per dtype. Every list the loop can append to is created here, so a
# bool / datetime / timedelta / unsigned-int column cannot raise NameError.
int64_cols = []
float64_cols = []
object_cols = []
bool_cols = []
datetime64_cols = []
timedelta_cols = []
category_cols = []
int32_cols = []
float32_cols = []
int16_cols = []
float16_cols = []
uint8_cols = []
uint16_cols = []
uint32_cols = []
uint64_cols = []

# dtype name -> list that collects the matching column names
cols_by_dtype = {
    'int64': int64_cols,
    'float64': float64_cols,
    'object': object_cols,
    'bool': bool_cols,
    'datetime64[ns]': datetime64_cols,
    'timedelta64[ns]': timedelta_cols,
    'category': category_cols,
    'int32': int32_cols,
    'float32': float32_cols,
    'int16': int16_cols,
    'float16': float16_cols,
    'uint8': uint8_cols,
    'uint16': uint16_cols,
    'uint32': uint32_cols,
    'uint64': uint64_cols,
}

# Iterate through columns and categorize them based on dtype.
# The dtype is compared with `==` against each name (first match wins);
# columns of any other dtype are left uncategorized.
for column in X_train_impu.columns:
    dtype = X_train_impu[column].dtype
    for dtype_name, bucket in cols_by_dtype.items():
        if dtype == dtype_name:
            bucket.append(column)
            break

# Print the columns categorized into each data type
print("int64_cols:", int64_cols)
print("\nfloat64_cols:", float64_cols)
print("\nobject_cols:", object_cols)

# print("\nbool_cols:", bool_cols)
# print("\ndatetime64_cols:", datetime64_cols)
# print("\ntimedelta_cols:", timedelta_cols)

print("\ncategory_cols:", category_cols)
print("\n int32_cols:", int32_cols)
print("\n float32_cols:", float32_cols)
print("\n int16_cols:", int16_cols)
print("\n float16_cols:", float16_cols)
# print("\n uint8_cols:", uint8_cols)
# print("\n uint16_cols:", uint16_cols)
# print("\n uint32_cols:", uint32_cols)
# print("\n uint64_cols:", uint64_cols)

In [None]:
# All categorical feature names: category-dtype columns first, then object-dtype.
# This rebinds `categorical_cols` from a pandas Index to a plain list.
categorical_cols = category_cols + object_cols

Extracted this encoding recommendation from the propriority wrapper

In [None]:
# Encoding strategy -> list of column names. Only the 'OneHotEncoding' and
# 'LabelEncoding' entries are populated; the remaining strategies are empty.
encoding_recommendations={'OneHotEncoding': ['milkynga', 'medpil1b', 'medpil2b', 'medpil3b', 'medpil4b'], 
                          'LabelEncoding': ['sex', 'whezev', 'whezage', 'whez12', 'nwhez12', 'awake12', 'speech12', 
                            'medpuff', 'sabafreq', 'labafreq', 'icsfreq', 'combfreq', 'medpill', 'docbrt12', 'erbrth12', 
                            'hosbrt12', 'exwhez12', 'cough12', 'pnoseev', 'pnoseage', 'pnose12', 'iitch12', 'ieyes12', 'iactiv12', 
                            'hfeverev', 'hfevdoc', 'rashev', 'mparaprg', 'msmokprg', 'mpcar01', 'mpcar02', 'mpcar03', 'mpcar04', 
                            'chprem', 'brstfed', 'nbrstfed', 'nbrstexc', 'parayng', 'nchstyng', 'antibiot', 'nantibiot', 'antibioch', 'sheepyng', 
                            'catyng', 'dogyng', 'aniyng', 'wheezyng', 'medyng', 'medyng1', 'medyng2', 'medyng3', 'medyng4', 'medyng5', 'medyng6', 'medyng7', 
                            'chcaryng', 'chcarold', 'exercise', 'televis', 'computer', 'pneumon', 'twin', 'cntrybir', 'chhmchng', 'trucfreq', 'meat',
                            'seafood', 'fruit', 'vegecook', 'vegeraw', 'pulses', 'cereals', 'bread', 'pasta', 'rice', 'margarin', 'butter', 'oliveoil', 
                            'milk', 'dairyoth', 'eggs', 'nuts', 'potato', 'sugar', 'burger', 'fastfood', 'softdrnk', 'catnow', 'dognow', 'paranow', 
                            'chflr01_1', 'chflr01_2', 'chflr01_3', 'chflr01_4', 'chflr02_1', 'chflr02_2', 'chflr02_3', 'chflr02_4', 'chflr03_1', 'chflr03_2', 
                            'chflr03_3', 'chflr03_4', 'chflr04_1', 'chflr04_2', 'chflr04_3', 'chflr04_4', 'nebumed_india', 'nebulasthalevo_india', 'nebuduo_india', 
                            'nebubude_india', 'banana_india', 'guava_india', 'curdyog_india', 'colddrinks_india', 'icecreams_india', 'cakepastries_india', 
                            'packcrunchy_india', 'medpil1a', 'medpil2a', 'medpil3a', 'medpil4a', 'cbiroth'], 
                          'BinaryEncoding': [], 'TargetEncoding': [], 'CountEncoding': [], 'UnrecognizedColumns': []}

Encoding Pipeline

In [None]:
from feature_engine.encoding import OneHotEncoder, OrdinalEncoder
from sklearn.pipeline import Pipeline


# Extract specific recommendations
one_hot_recommendation = encoding_recommendations['OneHotEncoding']
label_recommendation = encoding_recommendations['LabelEncoding']

# Define the pipeline: one-hot encode the first group (dropping the last dummy
# of each variable), then map the second group to arbitrary integer codes.
# NOTE(review): feature_engine encoders expect object/category variables by
# default. Confirm the listed columns have those dtypes in X_train_impu.
gan_encd_pipe = Pipeline(
    [
        ("ohe", OneHotEncoder(variables=one_hot_recommendation, drop_last=True)),
        ("labelencoding", OrdinalEncoder(encoding_method="arbitrary", variables=label_recommendation)),
    ]
)

# Fit the pipeline on the training features only
gan_encd_pipe.fit(X_train_impu)

# Apply the fitted encoders to both splits. X_train_enc is the frame inspected
# by the dtype check that follows.
X_train_enc = gan_encd_pipe.transform(X_train_impu)
X_test_enc = gan_encd_pipe.transform(X_test_impu)

Checking the datatypes in the dataset

In [None]:
# dtype name -> columns of X_train_enc with that dtype. The keys are fixed up
# front so dtypes with no columns still appear in the report (as 'None').
dtype_categories = {
    name: []
    for name in (
        'int64',
        'float64',
        'object',
        'bool',
        'datetime64[ns]',
        'timedelta64[ns]',
        'category',
        'int32',
        'float32',
        'int16',
        'float16',
        'uint8',
        'uint16',
        'uint32',
        'uint64',
    )
}

# File every column under the string form of its dtype; anything outside the
# expected set is reported instead of stored.
for column in X_train_enc.columns:
    dtype = str(X_train_enc[column].dtype)
    try:
        dtype_categories[dtype].append(column)
    except KeyError:
        print(f"Warning: Column '{column}' has an unexpected dtype '{dtype}'")

# Print categorized columns
for dtype, columns in dtype_categories.items():
    print(f"{dtype}: {columns or 'None'}")


# END OF FEATURE ENGINEERING