In [1]:
pip install missingno statsmodels pytorch-lightning torch torchvision matplotlib numpy

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.5.5-py3-none-any.whl.metadata (20 kB)
Collecting torchmetrics>0.7.0 (from pytorch-lightning)
  Downloading torchmetrics-1.8.2-py3-none-any.whl.metadata (22 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch-lightning)
  Downloading lightning_utilities-0.15.2-py3-none-any.whl.metadata (5.7 kB)
Downloading pytorch_lightning-2.5.5-py3-none-any.whl (832 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m832.4/832.4 kB[0m [31m21.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.15.2-py3-none-any.whl (29 kB)
Downloading torchmetrics-1.8.2-py3-none-any.whl (983 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m983.2/983.2 kB[0m [31m40.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lightning-utilities, torchmetrics, pytorch-lightning
Successfully installed lightning-utilities-0.15.2 pytorch-lightning-2.5.5 torchmetrics-1.8.2


# **Imports**

In [2]:
import os
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import chi2_contingency
from scipy.stats import median_abs_deviation
from scipy.special import expit
import statsmodels.api as sm
import missingno as msno
from scipy.stats import chi2_contingency, median_abs_deviation, boxcox, skew
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from scipy.stats import boxcox, skew
from scipy.cluster.hierarchy import linkage, fcluster

# **Dataset**

In [3]:
# minimum no. of records
n = 30000
np.random.seed(42)

In [4]:
# generating unique patient IDs
cnic_numbers = set()
while len(cnic_numbers) < n:
    # Generate a random 13-digit number
    cnic = np.random.randint(1000000000000, 9999999999999, dtype=np.int64)
    cnic_numbers.add(cnic)

# Convert to CNIC format (XXXXX-XXXXXXX-X)
patient_id = [f"{str(cnic)[:5]}-{str(cnic)[5:12]}-{str(cnic)[12]}" for cnic in cnic_numbers]

In [5]:
#Required -> age: Patient age (some missing values)
# applied normal distribution around mean 50, std 15, with 5% missing
age = np.random.normal(50, 15, n)
age = np.clip(age, 15, 100)
age = age.round(0)
age[np.random.choice(n, size=int(0.02 * n), replace=False)] = np.nan

In [6]:
# Required -> gender: Male/Female/Other (with inconsistent formats)
# random choice from Male/Female/Other with inconsistent formats
gender_options = ['Male', 'Female', 'Other', 'M', 'F', 'male', 'female', 'other', 'MALE', 'FEMALE']
gender_probs = [0.34, 0.34, 0.05, 0.1, 0.1, 0.02, 0.02, 0.01, 0.01, 0.01]
gender = np.random.choice(gender_options, n, p=gender_probs)

In [7]:
#Required -> Body Mass Index (contains outliers and missing values)
# applied Normal distribution around mean 27, std 5, with 10% missing and outliers
# A BMI below 18.5 is considered underweight, while a BMI of 25.0 to 29.9 is overweight, and 30.0 or higher is classified as obese.
bmi_base = np.random.normal(0, 4, n)  # Noise (reduced std for stronger dep)
bmi = 27 + 0.15 * (age - 50) + bmi_base # ~0.15 coeff for moderate correlation
bmi = np.clip(bmi, 15, 50)
bmi = bmi.round(1)
bmi[np.random.choice(n, size=int(0.1 * n), replace=False)] = np.nan
# choosing non-nan values of bmi and then 1% of them are replaced with random outliers range between 50 and 70.
outlier_idx = np.random.choice([i for i, x in enumerate(bmi) if not np.isnan(x)], size=int(0.05 * n), replace=False)
bmi[outlier_idx] = np.random.uniform(50, 70, len(outlier_idx))

In [8]:
# Required -> Systolic BP (some impossible values like 0, 300+)
# normal range is around 120, std 20
syst_base = np.random.normal(0, 15, n)
bp_systolic = 120 + 0.4 * (age - 50) + 0.6 * (bmi - 27) + syst_base
bp_systolic = np.clip(bp_systolic, 50, 170)
bp_systolic = bp_systolic.round(0)
# Add impossible values (2% of data)
impossible_bp_idx = np.random.choice(n, int(0.02 * n), replace=False)
high_values = np.random.uniform(300, 350, len(impossible_bp_idx))
high_values = high_values.round(0)
low_values = np.zeros(len(impossible_bp_idx))
is_high = np.random.choice([0, 1], len(impossible_bp_idx))
bp_systolic[impossible_bp_idx] = np.where(is_high, high_values, low_values)

In [9]:
# Required -> non mentioned in assignment
# Blood Pressure Diastolic: Normal around 80, std 10
diast_base = np.random.normal(0, 8, n)
bp_diastolic = 80 + 0.6 * (bp_systolic - 120) + 0.2 * (age - 50) + diast_base
bp_diastolic = np.clip(bp_diastolic, 40, 120)
bp_diastolic = bp_diastolic.round(0)

In [10]:
# Required -> Total cholesterol (mg/dL)
# normal range around 200, std 40
chol_base = np.random.normal(0, 30, n)
cholesterol = 200 + 0.5 * (age - 50) + 1.2 * (bmi - 27) + chol_base
cholesterol = np.clip(cholesterol, 150, 300)
cholesterol = cholesterol.round(0)

In [11]:
# Required -> Diabetes Type 0/1/2 (many missing values) with 20% missing
diabetes_options = [0, 1, 2, np.nan]
diabetes_probs = [0.6, 0.1, 0.1, 0.2]
diabetes_type = np.random.choice(diabetes_options, n, p=diabetes_probs)

In [12]:
# Required -> Smoking Status: Inconsistent capitalization
smoking_options = ['Never', 'Former', 'Current', 'never', 'former', 'current', 'NEVER', 'FORMER', 'CURRENT']
smoking_probs = [0.5, 0.3, 0.11, 0.02, 0.02, 0.02, 0.01, 0.01, 0.01]
smoking_status = np.random.choice(smoking_options, n, p=smoking_probs)

In [13]:
# Required -> Medication Count: Poisson distribution
# Medication count: poisson base, positive dep on age, BMI, cholesterol
med_base = np.random.poisson(1, n)
# Create mask for non-NaN rows in age and bmi
valid_mask = ~np.isnan(age) & ~np.isnan(bmi)
# Initialize medication_count with NaN
medication_count = np.full(n, np.nan)
# Calculate only for valid rows
medication_count[valid_mask] = np.round(2 + 0.06 * (age[valid_mask] - 50) +
                                       0.12 * (bmi[valid_mask] - 27) +
                                       0.006 * (cholesterol[valid_mask] - 200) +
                                       med_base[valid_mask]).astype(int)
medication_count = np.clip(medication_count, 0, 10)

In [14]:
# Required -> Family History: Binary (0/1)
# Family history: binary, higher prob with age and BMI
# Create mask for non-NaN rows
valid_mask_family = ~np.isnan(age) & ~np.isnan(bmi)
# Initialize family_history with NaN
family_history = np.full(n, np.nan)
# Calculate probability only for valid rows
family_prob = 0.3 + 0.003 * (age[valid_mask_family] - 50) + 0.005 * (bmi[valid_mask_family] - 27)
family_prob = np.clip(family_prob, 0, 1)
family_history[valid_mask_family] = np.random.binomial(1, family_prob)

In [15]:
# Required -> Exercise Hours Weekly: Gamma distribution for right skew
# Exercise hours: gamma base, negative dep on BMI
exercise_base = np.random.gamma(2, 1, n)
exercise_hours_weekly = 5 - 0.25 * (bmi - 27) + exercise_base
exercise_hours_weekly = np.clip(exercise_hours_weekly, 0, 40)
exercise_hours_weekly = exercise_hours_weekly.round(0)

In [16]:
# Required -> Heart Disease: Binary (0/1), slightly imbalanced
# Heart disease: binary, higher prob with age, BMI, cholesterol, systolic, smoking, family, diabetes, medication; lower with exercise
# Handle NaN with defaults
# Assign 15,000 records to each class (0 and 1)
heart_disease = np.concatenate([np.zeros(15000), np.ones(15000)])
np.random.shuffle(heart_disease)
age_for_prob = np.where(np.isnan(age), 50, age)  # Default to mean age
bmi_for_prob = np.where(np.isnan(bmi), 27, bmi)  # Default to mean bmi
chol_for_prob = cholesterol
syst_for_prob = bp_systolic
med_for_prob = np.where(np.isnan(medication_count), 2, medication_count)  # Default to mean med count
exer_for_prob = exercise_hours_weekly
fam_for_prob = np.where(np.isnan(family_history), 0, family_history)  # Default to 0 if NaN
diabetes_for_prob = np.nan_to_num(diabetes_type, nan=0)
smoking_current = (np.char.lower(smoking_status) == 'current').astype(int)

# Calculate logit and prob
logit = -5 + 0.07 * age_for_prob + 0.12 * bmi_for_prob + 0.01 * chol_for_prob + 0.025 * syst_for_prob + \
        1.2 * smoking_current + 1.6 * fam_for_prob + 0.9 * (diabetes_for_prob > 0) + \
        0.06 * med_for_prob - 0.12 * exer_for_prob
prob = 1 / (1 + np.exp(-logit))
# Initialize heart_disease with NaN and assign only to valid rows
#heart_disease = np.full(n, np.nan)
valid_mask_heart = valid_mask_family & ~np.isnan(prob) # Ensure prob is valid
heart_disease[valid_mask_heart] = np.random.binomial(1, prob[valid_mask_heart], size=valid_mask_heart.sum())
# Ensure exact balance by resetting if needed for balanced distribution of 0s and 1s
heart_disease_count = np.bincount(heart_disease.astype(int))
if heart_disease_count[1] > 15000:
    excess_ones = heart_disease_count[1] - 15000
    idx_to_flip = np.random.choice(np.where(heart_disease == 1)[0], excess_ones, replace=False)
    heart_disease[idx_to_flip] = 0
elif heart_disease_count[1] < 15000:
    deficit_ones = 15000 - heart_disease_count[1]
    idx_to_flip = np.random.choice(np.where(heart_disease == 0)[0], deficit_ones, replace=False)
    heart_disease[idx_to_flip] = 1

In [17]:
# Required -> Annual Income: Lognormal for heavy skew, in thousands
# Set random seed for reproducibility

# Parameters
mean_log = np.log(25000)  # Target median ~25,000
sigma_log = 0.5  # Heavy skew

# Generate lognormal distribution
annual_income = np.random.lognormal(mean=mean_log, sigma=sigma_log, size=n)

# Clip to 10,000-50,000 range
annual_income = np.clip(annual_income, 10000, 50000)

# Round to nearest integer (no decimals)
annual_income = np.round(annual_income, 0).astype(int)

In [18]:

# Create DataFrame
data = {
    'patient_id': patient_id,
    'age': age,
    'gender': gender,
    'bmi': bmi,
    'blood_pressure_systolic': bp_systolic,
    'blood_pressure_diastolic': bp_diastolic,
    'cholesterol_level': cholesterol,
    'diabetes_type': diabetes_type,
    'heart_disease': heart_disease,
    'annual_income': annual_income,
    'exercise_hours_weekly': exercise_hours_weekly,
    'smoking_status': smoking_status,
    'family_history': family_history,
    'medication_count': medication_count
}
df = pd.DataFrame(data)

# Create and Save data into a CSV file
df.to_csv('patient_dataset.csv', index=False)


#**Task 1.1: Data Architecture Analysis**
**1. Data loading and initial inspection**

In [19]:
df = pd.read_csv('patient_dataset.csv')

print("First 5 rows of the dataset:")
print(df.head())
print("\nDataset Information:")
print(df.info())

print("\nDescriptive Statistics (including all columns):")
print(df.describe(include='all'))

First 5 rows of the dataset:
        patient_id   age  gender   bmi  blood_pressure_systolic  \
0  20536-4264960-1  48.0    Male  23.5                    123.0   
1  90908-4275507-4  34.0  Female  25.7                    113.0   
2  38570-7592909-3  39.0    Male  26.9                    128.0   
3  38842-2461031-2  33.0       F  15.5                     94.0   
4  56192-3070363-2  64.0  Female  21.9                    113.0   

   blood_pressure_diastolic  cholesterol_level  diabetes_type  heart_disease  \
0                      76.0              233.0            0.0            1.0   
1                      68.0              188.0            NaN            0.0   
2                      80.0              219.0            NaN            0.0   
3                      56.0              167.0            0.0            0.0   
4                      74.0              187.0            0.0            0.0   

   annual_income  exercise_hours_weekly smoking_status  family_history  \
0          50

**2. Attribute type classification
Categorizing each attribute:**
- Nominal: Categorical without order (gender, smoking_status)
- Ordinal: Categorical with order (none here, but diabetes_type could be debated; treating as nominal)
- Binary: Special case of nominal with two values (0/1)
- Numeric: Interval (no true zero) or Ratio (true zero), and Discrete (countable) or Continuous (measurable)

In [20]:
attribute_types = {
    'patient_id': 'Nominal',
    'age': 'Ordinal, Ratio, Discrete',
    'gender': 'Nominal',
    'bmi': 'Ordinal, Ratio, Continuous',
    'blood_pressure_systolic': 'Ratio, Continuous',
    'blood_pressure_diastolic': 'Ratio, Continuous',
    'cholesterol_level': 'Ratio, Continuous',
    'diabetes_type': 'Nominal',
    'heart_disease': 'Binary',
    'annual_income': 'Ratio, Continuous',
    'exercise_hours_weekly': 'Ratio, Continuous',
    'smoking_status': 'Nominal',
    'family_history': 'Binary',
    'medication_count': 'Ratio, Discrete'
}

print("\nAttribute Types:")
for attr, typ in attribute_types.items():
    print(f"{attr}: {typ}")


Attribute Types:
patient_id: Nominal
age: Ordinal, Ratio, Discrete
gender: Nominal
bmi: Ordinal, Ratio, Continuous
blood_pressure_systolic: Ratio, Continuous
blood_pressure_diastolic: Ratio, Continuous
cholesterol_level: Ratio, Continuous
diabetes_type: Nominal
heart_disease: Binary
annual_income: Ratio, Continuous
exercise_hours_weekly: Ratio, Continuous
smoking_status: Nominal
family_history: Binary
medication_count: Ratio, Discrete


**3. Dataset dimensionality, sparsity, and density**

In [21]:
rows, cols = df.shape
total_elements = rows * cols
missing_values = df.isna().sum().sum()
sparsity = missing_values / total_elements
density = 1 - sparsity

print("\nDataset Dimensionality and Characteristics:")
print(f"Number of rows (instances): {rows}")
print(f"Number of columns (attributes): {cols}")
print(f"Total elements: {total_elements}")
print(f"Missing values: {missing_values}")
print(f"Sparsity: {sparsity:.4f}")
print(f"Density: {density:.4f}")


Dataset Dimensionality and Characteristics:
Number of rows (instances): 30000
Number of columns (attributes): 14
Total elements: 420000
Missing values: 31263
Sparsity: 0.0744
Density: 0.9256


**4. Comprehensive data quality report**

 Including missing value patterns, unique counts, value distributions, and potential issues

In [22]:
# Missing value patterns analysis
missing_per_column = df.isna().sum()
missing_percentage = (missing_per_column / rows) * 100
rows_with_missing = df.isna().any(axis=1).sum()
percentage_rows_with_missing = (rows_with_missing / rows) * 100

print("\nMissing Values Analysis:")
print("Missing values per column:")
print(missing_per_column)
print("\nPercentage of missing values per column:")
print(missing_percentage.round(2))
print(f"\nNumber of rows with at least one missing value: {rows_with_missing} ({percentage_rows_with_missing:.2f}%)")


Missing Values Analysis:
Missing values per column:
patient_id                     0
age                          600
gender                         0
bmi                         3546
blood_pressure_systolic     3478
blood_pressure_diastolic    3489
cholesterol_level           3546
diabetes_type               5966
heart_disease                  0
annual_income                  0
exercise_hours_weekly       3546
smoking_status                 0
family_history              3546
medication_count            3546
dtype: int64

Percentage of missing values per column:
patient_id                   0.00
age                          2.00
gender                       0.00
bmi                         11.82
blood_pressure_systolic     11.59
blood_pressure_diastolic    11.63
cholesterol_level           11.82
diabetes_type               19.89
heart_disease                0.00
annual_income                0.00
exercise_hours_weekly       11.82
smoking_status               0.00
family_history        

In [23]:
# Additional data quality metrics
print("\nUnique Values per Column:")
print(df.nunique())


Unique Values per Column:
patient_id                  30000
age                            86
gender                         10
bmi                          1775
blood_pressure_systolic       165
blood_pressure_diastolic       81
cholesterol_level             151
diabetes_type                   3
heart_disease                   2
annual_income               18470
exercise_hours_weekly          20
smoking_status                  9
family_history                  2
medication_count               11
dtype: int64


In [24]:
# Value counts for categorical/nominal/binary columns
categorical_cols = ['gender', 'diabetes_type', 'heart_disease', 'smoking_status', 'family_history']
print("\nValue Counts for Categorical Columns:")
for col in categorical_cols:
    print(f"\n{col}:")
    print(df[col].value_counts(dropna=False))


Value Counts for Categorical Columns:

gender:
gender
Male      10265
Female    10201
F          2991
M          2948
Other      1532
male        623
female      576
MALE        307
FEMALE      286
other       271
Name: count, dtype: int64

diabetes_type:
diabetes_type
0.0    17971
NaN     5966
1.0     3047
2.0     3016
Name: count, dtype: int64

heart_disease:
heart_disease
1.0    15000
0.0    15000
Name: count, dtype: int64

smoking_status:
smoking_status
Never      14932
Former      9125
Current     3282
never        605
current      593
former       576
NEVER        323
FORMER       287
CURRENT      277
Name: count, dtype: int64

family_history:
family_history
0.0    18306
1.0     8148
NaN     3546
Name: count, dtype: int64


In [25]:
# Potential issues: Outliers and invalid values (based on domain knowledge)
print("\nPotential Data Quality Issues:")

# BMI outliers (e.g., >50 or <15, though clipped in generation, but checking)
bmi_outliers = df[(df['bmi'] > 50) | (df['bmi'] < 15)]['bmi'].count()
print(f"Number of BMI outliers (>50 or <15): {bmi_outliers}")

# Blood pressure systolic invalid (0 or >300)
bp_systolic_invalid = df[(df['blood_pressure_systolic'] <= 0) | (df['blood_pressure_systolic'] > 300)]['blood_pressure_systolic'].count()
print(f"Number of invalid systolic BP values (<=0 or >300): {bp_systolic_invalid}")

# Inconsistent formats in gender and smoking_status (count unique)
print(f"Number of unique genders (inconsistent formats): {df['gender'].nunique()}")
print(f"Number of unique smoking statuses (inconsistent capitalization): {df['smoking_status'].nunique()}")

# Skewness for numerical columns (indicating distribution issues)
numerical_cols = df.select_dtypes(include=['float64', 'int64']).columns
skewness = df[numerical_cols].skew()
print("\nSkewness of Numerical Columns (high values indicate skew):")
print(skewness)


Potential Data Quality Issues:
Number of BMI outliers (>50 or <15): 1500
Number of invalid systolic BP values (<=0 or >300): 599
Number of unique genders (inconsistent formats): 10
Number of unique smoking statuses (inconsistent capitalization): 9

Skewness of Numerical Columns (high values indicate skew):
age                         0.051868
bmi                         2.495121
blood_pressure_systolic     2.632829
blood_pressure_diastolic   -0.003314
cholesterol_level           0.310716
diabetes_type               1.552361
heart_disease               0.000000
annual_income               0.519836
exercise_hours_weekly      -0.561322
family_history              0.831784
medication_count            0.662358
dtype: float64


# **Task 1.2: Statistical Measures Implementation (12 Points)**

In [26]:
numeric_attrs = [
    'age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
    'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count'
]


# Dictionary to store results
stats_results = {attr: {'mean': np.nan, 'median': np.nan, 'mode': np.nan}
                 for attr in numeric_attrs}

**1. Central Tendency: Mean, Median, Mode for all numeric attributes (functions only)**

In [27]:
def calculate_mean(data):
    """Calculate mean of a list, ignoring NaN."""
    valid_data = [x for x in data if not np.isnan(x)]
    if not valid_data:
        return np.nan
    return sum(valid_data) / len(valid_data)

def calculate_median(data):
    """Calculate median of a list, ignoring NaN."""
    valid_data = [x for x in data if not np.isnan(x)]
    if not valid_data:
        return np.nan
    sorted_data = sorted(valid_data)
    n = len(sorted_data)
    mid = n // 2
    if n % 2 == 0:
        return (sorted_data[mid - 1] + sorted_data[mid]) / 2
    return sorted_data[mid]

def calculate_mode(data):
    """Calculate mode of a list, ignoring NaN."""
    valid_data = [x for x in data if not np.isnan(x)]
    if not valid_data:
        return np.nan
    freq = {}
    for x in valid_data:
        # Round continuous data to 2 decimals for meaningful mode
        x_rounded = round(x, 2) if isinstance(x, float) else x
        freq[x_rounded] = freq.get(x_rounded, 0) + 1
    max_freq = max(freq.values())
    modes = [k for k, v in freq.items() if v == max_freq]
    return modes[0] if modes else np.nan



**2. Variation Measures: Calculate variance and standard deviation**

In [28]:
def calculate_variance(data):
    """Calculate variance of a list, ignoring NaN."""
    valid_data = [x for x in data if not np.isnan(x)]
    if len(valid_data) < 2:
        return np.nan
    mean = calculate_mean(valid_data)
    squared_diff_sum = sum((x - mean) ** 2 for x in valid_data)
    return squared_diff_sum / (len(valid_data) - 1)  # Sample variance

def calculate_std(data):
    """Calculate standard deviation of a list."""
    variance = calculate_variance(data)
    return np.sqrt(variance) if not np.isnan(variance) else np.nan

**RESULTS:**

In [29]:
for attr in numeric_attrs:
    data = df[attr].values
    stats_results[attr]['mean'] = calculate_mean(data)
    stats_results[attr]['median'] = calculate_median(data)
    stats_results[attr]['mode'] = calculate_mode(data)
    stats_results[attr]['variance'] = calculate_variance(data)
    stats_results[attr]['std'] = calculate_std(data)

# Printing
print("Statistical Measures for Numeric Attributes:")
for attr in numeric_attrs:
    print(f"\n{attr}:")
    print(f"  Mean: {stats_results[attr]['mean']:.2f}")
    print(f"  Median: {stats_results[attr]['median']:.2f}")
    print(f"  Mode: {stats_results[attr]['mode']:.2f}")
    print(f"  Variance: {stats_results[attr]['variance']:.2f}")
    print(f"  Standard Deviation: {stats_results[attr]['std']:.2f}")

Statistical Measures for Numeric Attributes:

age:
  Mean: 50.06
  Median: 50.00
  Mode: 52.00
  Variance: 221.85
  Standard Deviation: 14.89

bmi:
  Mean: 28.90
  Median: 27.40
  Mode: 26.00
  Variance: 80.14
  Standard Deviation: 8.95

blood_pressure_systolic:
  Mean: 122.00
  Median: 121.00
  Mode: 118.00
  Variance: 942.13
  Standard Deviation: 30.69

blood_pressure_diastolic:
  Mean: 80.62
  Median: 81.00
  Mode: 82.00
  Variance: 236.77
  Standard Deviation: 15.39

cholesterol_level:
  Mean: 202.89
  Median: 202.00
  Mode: 150.00
  Variance: 1005.32
  Standard Deviation: 31.71

annual_income:
  Mean: 27262.18
  Median: 25018.50
  Mode: 50000.00
  Variance: 138875558.14
  Standard Deviation: 11784.55

exercise_hours_weekly:
  Mean: 6.61
  Median: 7.00
  Mode: 7.00
  Variance: 5.56
  Standard Deviation: 2.36

medication_count:
  Mean: 3.25
  Median: 3.00
  Mode: 3.00
  Variance: 3.67
  Standard Deviation: 1.92


**3. Distribution Analysis: Identify symmetric vs. skewed distributions**

In [30]:
def calculate_skewness(data):
    """Calculate skewness of a list, ignoring NaN."""
    valid_data = [x for x in data if not np.isnan(x)]
    if len(valid_data) < 3:
        return np.nan
    mean = calculate_mean(valid_data)
    std = calculate_std(valid_data)
    if std == 0:
        return np.nan
    n = len(valid_data)
    skewness = sum((x - mean) ** 3 for x in valid_data) / n
    skewness /= (std ** 3) if std != 0 else 1
    return skewness

for attr in numeric_attrs:
    data = df[attr].values
    stats_results[attr]['skewness'] = calculate_skewness(data)

print("Statistical Measures for Numeric Attributes:")
for attr in numeric_attrs:
    print(f"\n{attr}:")
    print(f"  Skewness: {stats_results[attr]['skewness']:.2f}")

# Distribution Analysis
print("\nDistribution Analysis:")
for attr in numeric_attrs:
    skewness = stats_results[attr]['skewness']
    distribution_type = "Symmetric" if abs(skewness) < 0.5 else "Skewed (Positive)" if skewness > 0.5 else "Skewed (Negative)"
    print(f"{attr}: {distribution_type} (Skewness = {skewness:.2f})")

Statistical Measures for Numeric Attributes:

age:
  Skewness: 0.05

bmi:
  Skewness: 2.49

blood_pressure_systolic:
  Skewness: 2.63

blood_pressure_diastolic:
  Skewness: -0.00

cholesterol_level:
  Skewness: 0.31

annual_income:
  Skewness: 0.52

exercise_hours_weekly:
  Skewness: -0.56

medication_count:
  Skewness: 0.66

Distribution Analysis:
age: Symmetric (Skewness = 0.05)
bmi: Skewed (Positive) (Skewness = 2.49)
blood_pressure_systolic: Skewed (Positive) (Skewness = 2.63)
blood_pressure_diastolic: Symmetric (Skewness = -0.00)
cholesterol_level: Symmetric (Skewness = 0.31)
annual_income: Skewed (Positive) (Skewness = 0.52)
exercise_hours_weekly: Skewed (Negative) (Skewness = -0.56)
medication_count: Skewed (Positive) (Skewness = 0.66)


**4. Probability Distribution Fitting: Determine if any attributes follow Normal, Uniform, or Skewed distributions**

In [31]:
# Probability Distribution Fitting (using visual inspection and basic statistical checks)
print("\nProbability Distribution Fitting:")
for attr in numeric_attrs:
    data = df[attr].dropna().values
    skewness = stats_results[attr]['skewness']
    # Check for Normal distribution (skewness close to 0, visual inspection)
    if abs(skewness) < 0.5:
        # Fit normal distribution parameters
        mu = calculate_mean(data)
        sigma = calculate_std(data)
        print(f"{attr}: Likely Normal (μ={mu:.2f}, σ={sigma:.2f})")
    # Check for Uniform distribution (check range and flatness via histogram)
    elif len(set(data)) > 0.9 * len(data) and abs(max(data) - min(data)) > 0:
        print(f"{attr}: Possibly Uniform (range [{min(data):.2f}, {max(data):.2f}])")
    else:
        # Check for skewed distributions (lognormal, gamma, etc.)
        print(f"{attr}: Likely Skewed (Skewness = {skewness:.2f})")




Probability Distribution Fitting:
age: Likely Normal (μ=50.06, σ=14.89)
bmi: Likely Skewed (Skewness = 2.49)
blood_pressure_systolic: Likely Skewed (Skewness = 2.63)
blood_pressure_diastolic: Likely Normal (μ=80.62, σ=15.39)
cholesterol_level: Likely Normal (μ=202.89, σ=31.71)
annual_income: Likely Skewed (Skewness = 0.52)
exercise_hours_weekly: Likely Skewed (Skewness = -0.56)
medication_count: Likely Skewed (Skewness = 0.66)


**Report for all the results in a txt file.**

In [32]:
with open("statistical_measures_report.txt", "w", encoding='utf-8') as f:
    f.write("Statistical Measures for Numeric Attributes:\n")
    for attr in numeric_attrs:
        f.write(f"\n{attr}:\n")
        f.write(f"  Mean: {stats_results[attr]['mean']:.2f}\n")
        f.write(f"  Median: {stats_results[attr]['median']:.2f}\n")
        f.write(f"  Mode: {stats_results[attr]['mode']:.2f}\n")
        f.write(f"  Variance: {stats_results[attr]['variance']:.2f}\n")
        f.write(f"  Standard Deviation: {stats_results[attr]['std']:.2f}\n")
        f.write(f"  Skewness: {stats_results[attr]['skewness']:.2f}\n")
    f.write("\nDistribution Analysis:\n")
    for attr in numeric_attrs:
        skewness = stats_results[attr]['skewness']
        distribution_type = "Symmetric" if abs(skewness) < 0.5 else "Skewed (Positive)" if skewness > 0.5 else "Skewed (Negative)"
        f.write(f"{attr}: {distribution_type} (Skewness = {skewness:.2f})\n")
    f.write("\nProbability Distribution Fitting:\n")
    for attr in numeric_attrs:
        data = df[attr].dropna().values
        skewness = stats_results[attr]['skewness']
        if abs(skewness) < 0.5:
            mu = calculate_mean(data)
            sigma = calculate_std(data)
            f.write(f"{attr}: Likely Normal (μ={mu:.2f}, σ={sigma:.2f})\n")
        elif len(set(data)) > 0.9 * len(data) and abs(max(data) - min(data)) > 0:
            f.write(f"{attr}: Possibly Uniform (range [{min(data):.2f}, {max(data):.2f}])\n")
        else:
            f.write(f"{attr}: Likely Skewed (Skewness = {skewness:.2f})\n")

# **Task 1.3: Advanced Visualization Suite (10 Points)**

**1. Histograms for all numeric attributes (identify distribution types)**

In [33]:
# folder for saving histogram plots
os.makedirs("1.3.1_Histograms_distributions", exist_ok=True)

# Plot histograms, identify distributions
for attr in numeric_attrs:
    data = df[attr].values
    skewness = calculate_skewness(data)
    dist_type = "Symmetric (likely Normal)" if abs(skewness) < 0.5 else "Positively Skewed" if skewness > 0.5 else "Negatively Skewed"

    plt.figure(figsize=(8, 4))
    sns.histplot(df[attr].dropna(), kde=True, bins=50)
    plt.title(f"Histogram of {attr}\nDistribution: {dist_type} (Skewness = {skewness:.2f})")
    plt.xlabel(attr)
    plt.ylabel("Frequency")
    plt.savefig(f"1.3.1_Histograms_distributions/{attr}_histogram.png")
    plt.close()

print("Histograms saved as <attribute>_histogram.png in '1.3.1_Histograms_distributions' folder. Distribution types identified based on skewness.")

Histograms saved as <attribute>_histogram.png in '1.3.1_Histograms_distributions' folder. Distribution types identified based on skewness.


**2. Box plots with manual calculation of 5-number summary, IQR, and outlier detection**

In [34]:
# Functions for manual calculations
def calculate_percentile(data, p):
    valid_data = sorted([x for x in data if not np.isnan(x)])
    if not valid_data:
        return np.nan
    n = len(valid_data)
    pos = p * (n + 1)
    if pos == int(pos):
        return valid_data[int(pos) - 1]
    else:
        lower = valid_data[int(pos) - 1]
        upper = valid_data[int(pos)]
        return lower + (upper - lower) * (pos - int(pos))

# Create folder for saving boxplot plots
os.makedirs("1.3.2_BoxPlots", exist_ok=True)

# Calculate for each attribute
for attr in numeric_attrs:
    data = df[attr].values
    min_val = np.nanmin(data)
    q1 = calculate_percentile(data, 0.25)
    median = calculate_percentile(data, 0.5)
    q3 = calculate_percentile(data, 0.75)
    max_val = np.nanmax(data)
    iqr = q3 - q1
    lower_whisker = max(min_val, q1 - 1.5 * iqr)
    upper_whisker = min(max_val, q3 + 1.5 * iqr)
    outliers = [x for x in data if not np.isnan(x) and (x < lower_whisker or x > upper_whisker)]

    print(f"\n{attr} 5-Number Summary:")
    print(f"Min: {min_val:.2f}, Q1: {q1:.2f}, Median: {median:.2f}, Q3: {q3:.2f}, Max: {max_val:.2f}")
    print(f"IQR: {iqr:.2f}")
    print(f"Outliers detected: {len(outliers)}")

    # Plot boxplot
    plt.figure(figsize=(6, 4))
    plt.boxplot(df[attr].dropna(), vert=False)
    plt.title(f"Box Plot of {attr}")
    # Save plot in 1.3.1_Histograms_distributions folder
    plt.savefig(f"1.3.2_BoxPlots/{attr}_boxplot.png")
    plt.close()

print("Box plots saved as <attribute>_boxplot.png in '1.3.2_BoxPlots' folder.")


age 5-Number Summary:
Min: 15.00, Q1: 40.00, Median: 50.00, Q3: 60.00, Max: 100.00
IQR: 20.00
Outliers detected: 100

bmi 5-Number Summary:
Min: 15.00, Q1: 24.10, Median: 27.40, Q3: 30.80, Max: 69.98
IQR: 6.70
Outliers detected: 1524

blood_pressure_systolic 5-Number Summary:
Min: 0.00, Q1: 109.00, Median: 121.00, Q3: 133.00, Max: 350.00
IQR: 24.00
Outliers detected: 747

blood_pressure_diastolic 5-Number Summary:
Min: 40.00, Q1: 71.00, Median: 81.00, Q3: 91.00, Max: 120.00
IQR: 20.00
Outliers detected: 344

cholesterol_level 5-Number Summary:
Min: 150.00, Q1: 179.00, Median: 202.00, Q3: 224.00, Max: 300.00
IQR: 45.00
Outliers detected: 140

annual_income 5-Number Summary:
Min: 10000.00, Q1: 17816.25, Median: 25018.50, Q3: 35126.50, Max: 50000.00
IQR: 17310.25
Outliers detected: 0

exercise_hours_weekly 5-Number Summary:
Min: 0.00, Q1: 6.00, Median: 7.00, Q3: 8.00, Max: 19.00
IQR: 2.00
Outliers detected: 1868

medication_count 5-Number Summary:
Min: 0.00, Q1: 2.00, Median: 3.00, Q3: 4

**3. Scatter plots with Pearson correlation analysis**

In [35]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Create directory for scatter plots
os.makedirs("ScatterPlots", exist_ok=True)

# Function to calculate Pearson correlation from scratch
def calculate_pearson(x, y):
    valid = [(a, b) for a, b in zip(x, y) if not np.isnan(a) and not np.isnan(b)]
    if len(valid) < 2:
        return np.nan
    x_vals, y_vals = zip(*valid)
    n = len(x_vals)
    mean_x = sum(x_vals) / n
    mean_y = sum(y_vals) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in valid) / (n - 1)
    std_x = np.sqrt(sum((a - mean_x) ** 2 for a in x_vals) / (n - 1))
    std_y = np.sqrt(sum((b - mean_y) ** 2 for b in y_vals) / (n - 1))
    if std_x == 0 or std_y == 0:
        return np.nan
    return cov / (std_x * std_y)

# Assume numeric_attrs is defined (e.g., from your dataset generation)
# numeric_attrs = ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
#                  'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']

# Handle missing values (fill with median)
for col in numeric_attrs:
    df[col] = df[col].fillna(df[col].median())

# Handle outliers (using IQR method)
for col in numeric_attrs:
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    df[col] = np.clip(df[col], lower_bound, upper_bound)

# Calculate 8x8 Pearson correlation matrix
corr_matrix = pd.DataFrame(index=numeric_attrs, columns=numeric_attrs)
for col1 in numeric_attrs:
    for col2 in numeric_attrs:
        corr_matrix.loc[col1, col2] = calculate_pearson(df[col1].values, df[col2].values)
corr_matrix = corr_matrix.astype(float)

# Plot and save correlation matrix heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1, center=0)
plt.title('8x8 Pearson Correlation Matrix')
plt.tight_layout()
plt.savefig('correlation_matrix_8x8.png')
plt.close()

# Generate scatter plots for pairs with positive correlation
positive_pairs = []
for i, col1 in enumerate(numeric_attrs):
    for col2 in numeric_attrs[i+1:]:  # Avoid duplicates and self-pairs
        corr = corr_matrix.loc[col1, col2]
        if corr > 0:
            positive_pairs.append((col1, col2, corr))

print("Generating scatter plots for pairs with positive correlations:")
for col1, col2, corr in positive_pairs:
    print(f"Plotting {col1} vs {col2} (Correlation: {corr:.2f})")
    plt.figure(figsize=(8, 6))
    sns.scatterplot(x=df[col1], y=df[col2])
    sns.regplot(x=df[col1], y=df[col2], scatter=False, color='red')
    plt.title(f"Scatter Plot: {col1} vs {col2}\nPearson Correlation: {corr:.2f}")
    plt.xlabel(col1)
    plt.ylabel(col2)
    plt.savefig(f"ScatterPlots/{col1}_vs_{col2}_scatter.png")
    plt.close()

print(f"Scatter plots saved in 'ScatterPlots' folder for {len(positive_pairs)} pairs with positive correlations.")

Generating scatter plots for pairs with positive correlations:
Plotting age vs bmi (Correlation: 0.38)
Plotting age vs blood_pressure_systolic (Correlation: 0.37)
Plotting age vs blood_pressure_diastolic (Correlation: 0.45)
Plotting age vs cholesterol_level (Correlation: 0.29)
Plotting age vs medication_count (Correlation: 0.60)
Plotting bmi vs blood_pressure_systolic (Correlation: 0.38)
Plotting bmi vs blood_pressure_diastolic (Correlation: 0.34)
Plotting bmi vs cholesterol_level (Correlation: 0.37)
Plotting bmi vs medication_count (Correlation: 0.69)
Plotting blood_pressure_systolic vs blood_pressure_diastolic (Correlation: 0.83)
Plotting blood_pressure_systolic vs cholesterol_level (Correlation: 0.20)
Plotting blood_pressure_systolic vs medication_count (Correlation: 0.39)
Plotting blood_pressure_diastolic vs cholesterol_level (Correlation: 0.20)
Plotting blood_pressure_diastolic vs medication_count (Correlation: 0.40)
Plotting cholesterol_level vs medication_count (Correlation: 0.4

**4. Q-Q plots to test normality assumptions**

In [36]:
# Create directory for Q-Q plots
os.makedirs("1.3.4_Q-QPlots", exist_ok=True)

# Generate Q-Q plots
for attr in numeric_attrs:
    data = df[attr].dropna().sort_values()
    plt.figure(figsize=(8, 6))
    stats.probplot(data, dist="norm", plot=plt)
    plt.title(f"Q-Q Plot for {attr}\n(Points on line indicate normality)")
    plt.savefig(f"1.3.4_Q-QPlots/{attr}_qqplot.png")
    plt.close()

print("Q-Q plots saved in '1.3.4_Q-QPlots' folder.")

Q-Q plots saved in '1.3.4_Q-QPlots' folder.


**5. Custom Chernoff faces for patient risk profiling**

In [37]:
# Create directory for Chernoff faces output
os.makedirs("ChernoffFaces", exist_ok=True)

# Load dataset
df = pd.read_csv('patient_dataset.csv')

# Numeric attributes for mapping to face parameters
numeric_attrs = [
    'age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
    'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count'
]

# Normalize function with NaN handling
def normalize(col):
    col = col.fillna(col.median())  # Handle NaN
    min_val = col.min()
    max_val = col.max()
    return (col - min_val) / (max_val - min_val) if max_val > min_val else col * 0 + 0.5  # Center if no variation

# Normalize all numeric attributes
df_norm = df.copy()
for attr in numeric_attrs:
    df_norm[attr] = normalize(df[attr])
df_norm['heart_disease'] = df['heart_disease'].fillna(0)  # Ensure heart_disease is handled

# Compute data-driven statistics
df_filled = df_norm[numeric_attrs + ['heart_disease']]  # Already normalized, NaN handled
corrs_signed = df_filled[numeric_attrs].corrwith(df_filled['heart_disease'])
corrs_abs = corrs_signed.abs().sort_values(ascending=False)
high_corr_attrs = corrs_abs.index.tolist()

vars = df_filled[numeric_attrs].var()
scaling_factors = (vars / vars.max()) * 0.5 if vars.max() > 0 else pd.Series(0.25, index=numeric_attrs)  # Max scale 0.5

# Compute dynamic risk_adjust (based on mean diff between low/high risk groups)
low_group_mean = df_filled[df_filled['heart_disease'] == 0][numeric_attrs].mean().mean()
high_group_mean = df_filled[df_filled['heart_disease'] == 1][numeric_attrs].mean().mean()
overall_std = df_filled[numeric_attrs].std().mean()
risk_adjust = abs(high_group_mean - low_group_mean) / overall_std if overall_std > 0 else 1.0

# Define feature prominence order (prominent for high corr)
feature_assign = {
    'mouth_curvature': high_corr_attrs[0],  # Highest corr: expression
    'eyebrow_slant': high_corr_attrs[1],
    'mouth_width': high_corr_attrs[2],
    'eye_size': high_corr_attrs[3],
    'eye_dist': high_corr_attrs[4],
    'nose_size': high_corr_attrs[5],
    'head_size': high_corr_attrs[6]   # Lowest corr
}

# Dynamic param_map for x1 to x8 (x1: head_size, x2: eye_size, x3: eye_dist, x4: mouth_width, x5: mouth_curvature, x6: eyebrow_slant, x7: nose_size, x8: heart_disease)
param_map = [
    feature_assign['head_size'],
    feature_assign['eye_size'],
    feature_assign['eye_dist'],
    feature_assign['mouth_width'],
    feature_assign['mouth_curvature'],
    feature_assign['eyebrow_slant'],
    feature_assign['nose_size'],
    'heart_disease'
]

# Print dynamic mappings for transparency
print("Attribute to Feature Mapping:")
for feature, attr in feature_assign.items():
    print(f"{feature}: {attr} (corr: {corrs_signed[attr]:.4f}, scale: {scaling_factors[attr]:.4f})")
print(f"Risk Adjustment: {risk_adjust:.4f}")

# Sample 4 low-risk and 4 high-risk patients
low_risk = df_norm[df_norm['heart_disease'] == 0].sample(4, random_state=42)
high_risk = df_norm[df_norm['heart_disease'] == 1].sample(4, random_state=42)
samples = pd.concat([low_risk, high_risk]).reset_index(drop=True)

# Simplified Chernoff face function (data-driven scales and directions)
def cface(ax, x1, x2, x3, x4, x5, x6, x7, x8):
    # Get attrs for each x
    attr1, attr2, attr3, attr4, attr5, attr6, attr7 = param_map[:7]

    # Directions (sign of corr)
    dir1 = np.sign(corrs_signed[attr1])
    dir2 = np.sign(corrs_signed[attr2])
    dir3 = np.sign(corrs_signed[attr3])
    dir4 = np.sign(corrs_signed[attr4])
    dir5 = np.sign(corrs_signed[attr5])
    dir6 = np.sign(corrs_signed[attr6])
    dir7 = np.sign(corrs_signed[attr7])

    # Scales
    scale1 = scaling_factors[attr1]
    scale2 = scaling_factors[attr2]
    scale3 = scaling_factors[attr3]
    scale4 = scaling_factors[attr4]
    scale5 = scaling_factors[attr5]
    scale6 = scaling_factors[attr6]
    scale7 = scaling_factors[attr7]

    # Head: size data-driven
    head_size = 0.8 + dir1 * scale1 * (x1 - 0.5)
    head = plt.Circle((0, 0), head_size, fc='yellow', ec='black', lw=2)
    ax.add_artist(head)

    # Eyes: size and dist data-driven
    eye_size = 0.1 + dir2 * scale2 * (x2 - 0.5)
    eye_dist = 0.3 + dir3 * scale3 * (x3 - 0.5)
    left_eye = plt.Circle((-eye_dist, 0.2), eye_size, fc='white', ec='black', lw=1)
    right_eye = plt.Circle((eye_dist, 0.2), eye_size, fc='white', ec='black', lw=1)
    ax.add_artist(left_eye)
    ax.add_artist(right_eye)

    # Pupils
    pupil_left = plt.Circle((-eye_dist, 0.2), eye_size * 0.3, fc='black')
    pupil_right = plt.Circle((eye_dist, 0.2), eye_size * 0.3, fc='black')
    ax.add_artist(pupil_left)
    ax.add_artist(pupil_right)

    # Nose: size data-driven
    nose_size = 0.15 + dir7 * scale7 * (x7 - 0.5)
    ax.plot([0, 0], [0.1, 0.1 - nose_size], 'k', lw=2)

    # Mouth: width and curvature data-driven
    mouth_width = 0.5 + dir4 * scale4 * (x4 - 0.5)
    mouth_curvature_base = dir5 * scale5 * (x5 - 0.5)
    mouth_curvature = mouth_curvature_base - risk_adjust * x8  # Dynamic adjust for risk

    if mouth_curvature > 0:
        mouth_arc = plt.matplotlib.patches.Arc((0, -0.3), mouth_width, 0.5 + mouth_curvature, theta1=180, theta2=360, lw=3, capstyle='round')
        ax.add_artist(mouth_arc)
    else:
        frown_depth = -mouth_curvature
        ax.plot([-mouth_width/2, mouth_width/2], [-0.3 - frown_depth, -0.3 - frown_depth], 'k', lw=2)

    # Eyebrows: slant data-driven
    eyebrow_slant_base = dir6 * scale6 * (x6 - 0.5)
    eyebrow_slant = eyebrow_slant_base - risk_adjust * x8 * 0.5  # Half adjust for eyebrows
    eyebrow_left_y = [0.5 + eyebrow_slant, 0.5 - eyebrow_slant]
    eyebrow_right_y = [0.5 - eyebrow_slant, 0.5 + eyebrow_slant]
    ax.plot([-0.5, -0.2], eyebrow_left_y, 'k', lw=2)
    ax.plot([0.2, 0.5], eyebrow_right_y, 'k', lw=2)

    # Add labels
    ax.text(0, head_size + 0.1, f'Risk: {x8:.1f}', ha='center', va='bottom', fontsize=8)

# Generate Chernoff faces
fig = plt.figure(figsize=(12, 6))
for i in range(8):
    ax = fig.add_subplot(2, 4, i+1, aspect='equal', xlim=[-1, 1], ylim=[-1, 1])
    ax.axis('off')
    params = samples[param_map].iloc[i].values
    risk_label = "Low Risk" if i < 4 else "High Risk"
    ax.set_title(risk_label)
    cface(ax, *params)

plt.tight_layout()
plt.savefig('ChernoffFaces/chernoff_faces.png')
plt.close()

print("Chernoff faces saved as 'ChernoffFaces/chernoff_faces.png'. Features dynamically mapped based on correlations and variances with heart_disease.")

Attribute to Feature Mapping:
mouth_curvature: blood_pressure_diastolic (corr: -0.0111, scale: 0.1883)
eyebrow_slant: blood_pressure_systolic (corr: -0.0077, scale: 0.0392)
mouth_width: annual_income (corr: 0.0069, scale: 0.5000)
eye_size: medication_count (corr: 0.0068, scale: 0.1868)
eye_dist: bmi (corr: 0.0066, scale: 0.1351)
nose_size: exercise_hours_weekly (corr: -0.0052, scale: 0.0785)
head_size: age (corr: -0.0013, scale: 0.1733)
Risk Adjustment: 0.0012
Chernoff faces saved as 'ChernoffFaces/chernoff_faces.png'. Features dynamically mapped based on correlations and variances with heart_disease.


# **Task 2.1: Missing Value Analysis & Imputation (12 Points)**

**1. Pattern Analysis: Identify if missing values are MCAR, MAR, or MNAR**

In [38]:
# Create directory for output
os.makedirs("MissingValueAnalysis", exist_ok=True)

# Load dataset
df = pd.read_csv('patient_dataset.csv')

# 1. Visualize Missing Value Patterns with missingno
print("Generating missing value visualizations...")
# Missing Value Matrix
plt.figure(figsize=(12, 6))
msno.matrix(df)
plt.title('Missing Value Matrix')
plt.savefig('MissingValueAnalysis/missing_value_matrix.png')
plt.close()
print(" - Saved 'missing_value_matrix.png': Look for random vs. block patterns (random = MCAR, blocks = MAR/MNAR).")

# Missing Value Correlation Heatmap
plt.figure(figsize=(12, 6))
msno.heatmap(df)
plt.title('Missing Value Correlation Heatmap')
plt.savefig('MissingValueAnalysis/missing_value_correlation_heatmap.png')
plt.close()
print(" - Saved 'missing_value_correlation_heatmap.png': High correlations suggest MAR, low correlations suggest MCAR.")
print("Check 'MissingValueAnalysis' folder for visualizations.")

# 2. Alternative MCAR Test (Chi-Square Approximation)
# Define attributes with potential missing values
numeric_attrs = [
    'age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
    'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count',
    'heart_disease', 'family_history'
]
categorical_attrs = ['gender', 'diabetes_type', 'smoking_status']
all_attrs = numeric_attrs + categorical_attrs

# Create a binary matrix for missingness
missing_matrix = df[all_attrs].isnull().astype(int)

def test_mcar_approx(attr1, attr2):
    if attr1 in all_attrs and attr2 in all_attrs and df[attr1].isnull().sum() > 0 and df[attr2].isnull().sum() > 0:
        contingency_table = pd.crosstab(missing_matrix[attr1], missing_matrix[attr2])
        chi2, p, dof, expected = chi2_contingency(contingency_table)
        return chi2, p, dof
    return np.nan, np.nan, np.nan

print("\nAlternative MCAR Test (Chi-Square Approximation):")
mcar_results = []
for i, attr1 in enumerate(all_attrs):
    for attr2 in all_attrs[i+1:]:
        if df[attr1].isnull().sum() > 0 and df[attr2].isnull().sum() > 0:
            chi2, p, dof = test_mcar_approx(attr1, attr2)
            mcar_results.append((attr1, attr2, chi2, p, dof))
            print(f"Chi-Square for {attr1} vs {attr2}: Chi2 = {chi2:.2f}, p-value = {p:.4f}, dof = {dof}")
            if p >= 0.05:
                print(f"  Suggestion: Missingness between {attr1} and {attr2} may be MCAR (p >= 0.05)")
            else:
                print(f"  Suggestion: Missingness between {attr1} and {attr2} is likely not MCAR (p < 0.05)")

# Save MCAR test results
with open('MissingValueAnalysis/mcar_test_approx_results.txt', 'w') as f:
    f.write("Alternative MCAR Test (Chi-Square Approximation) Results:\n")
    for attr1, attr2, chi2, p, dof in mcar_results:
        f.write(f"Chi-Square for {attr1} vs {attr2}: Chi2 = {chi2:.2f}, p-value = {p:.4f}, dof = {dof}\n")
        if not np.isnan(p) and p >= 0.05:
            f.write(f"  Suggestion: Missingness between {attr1} and {attr2} may be MCAR (p >= 0.05)\n")
        else:
            f.write(f"  Suggestion: Missingness between {attr1} and {attr2} is likely not MCAR (p < 0.05)\n")
print("Alternative MCAR test results saved in 'MissingValueAnalysis/mcar_test_approx_results.txt'.")

# 3. MAR Test (Logistic Regression for a specific attribute)
# Use 'bmi' as an example (11.82% missing) to check if missingness relates to other variables
print("\nPerforming MAR Test...")
df["bmi_missing"] = df["bmi"].isna().astype(int)

# Drop rows where predictors are missing
predictors = ["age", "blood_pressure_systolic", "heart_disease", "smoking_status"]
temp = df.dropna(subset=predictors)

# Prepare predictor variables (including categorical encoding)
X = pd.get_dummies(temp[predictors], drop_first=True)
X = X.astype(float)  # Ensure numeric dtype
X = sm.add_constant(X)  # Add intercept

# Prepare target variable
y = temp["bmi_missing"].astype(float)  # Ensure numeric dtype

# Fit logistic regression model
model = sm.Logit(y, X).fit()

# Print summary
print("\nMAR Test (Logistic Regression for bmi Missingness):")
print(model.summary())

# Save MAR test results
with open('MissingValueAnalysis/mar_test_bmi_results.txt', 'w') as f:
    f.write(model.summary().as_text())
print("MAR test results for bmi saved in 'MissingValueAnalysis/mar_test_bmi_results.txt'.")

Generating missing value visualizations...
 - Saved 'missing_value_matrix.png': Look for random vs. block patterns (random = MCAR, blocks = MAR/MNAR).
 - Saved 'missing_value_correlation_heatmap.png': High correlations suggest MAR, low correlations suggest MCAR.
Check 'MissingValueAnalysis' folder for visualizations.

Alternative MCAR Test (Chi-Square Approximation):
Chi-Square for age vs bmi: Chi2 = 4558.86, p-value = 0.0000, dof = 1
  Suggestion: Missingness between age and bmi is likely not MCAR (p < 0.05)
Chi-Square for age vs blood_pressure_systolic: Chi2 = 4468.52, p-value = 0.0000, dof = 1
  Suggestion: Missingness between age and blood_pressure_systolic is likely not MCAR (p < 0.05)
Chi-Square for age vs blood_pressure_diastolic: Chi2 = 4643.34, p-value = 0.0000, dof = 1
  Suggestion: Missingness between age and blood_pressure_diastolic is likely not MCAR (p < 0.05)
Chi-Square for age vs cholesterol_level: Chi2 = 4558.86, p-value = 0.0000, dof = 1
  Suggestion: Missingness betw



<Figure size 1200x600 with 0 Axes>

<Figure size 1200x600 with 0 Axes>

**2. Custom Imputation Engine: Implement multiple imputation strategies**
* Mean/Median/Mode imputation
* K-Nearest Neighbors imputation
* Multiple imputation using chained equations
* Advanced: Predict missing diabetes_type using other health indicators


In [39]:

# Common variables
missing_attrs = ['age', 'bmi', 'diabetes_type']
numeric_cols = ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
                'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']

# Helper function for mean/median/mode imputation
def simple_impute(df, method='mean'):
    df_imputed = df.copy()
    for attr in ['age', 'bmi']:
        if method == 'mean':
            value = df[attr].mean()
        elif method == 'median':
            value = df[attr].median()
        elif method == 'mode':
            mode_values = df[attr].mode()
            value = mode_values[0] if len(mode_values) > 0 else df[attr].mean()
        df_imputed[attr] = df_imputed[attr].fillna(value)

    # For diabetes_type (categorical)
    if 'diabetes_type' in df.columns:
        mode_values = df['diabetes_type'].mode()
        mode_value = mode_values[0] if len(mode_values) > 0 else 0.0
        df_imputed['diabetes_type'] = df_imputed['diabetes_type'].fillna(mode_value)

    return df_imputed

# Helper function for KNN imputation
def knn_impute(df, k=5):
    df_numeric = df[numeric_cols].copy()
    means = df_numeric.mean()
    stds = df_numeric.std()
    df_numeric_scaled = (df_numeric - means) / stds
    df_numeric_scaled = df_numeric_scaled.fillna(0)

    def knn_impute_helper(df_scaled, df_original, target_col, k, is_numeric=True):
        df_copy = df_original.copy()
        missing_idx = df_copy[df_copy[target_col].isnull()].index
        non_missing_idx = df_copy[target_col].notnull()

        if non_missing_idx.sum() == 0:  # No non-missing values
            return df_copy

        non_missing = df_scaled[non_missing_idx]
        non_missing_vals = df_copy[non_missing_idx][target_col]

        for idx in missing_idx:
            if idx in df_scaled.index:
                distances = np.sqrt(((non_missing - df_scaled.loc[idx]) ** 2).sum(axis=1))
                nearest_idx = distances.nsmallest(k).index
                nearest_vals = non_missing_vals.loc[nearest_idx]
                if is_numeric:
                    imputed_value = nearest_vals.mean()
                else:
                    mode_values = nearest_vals.mode()
                    imputed_value = mode_values[0] if len(mode_values) > 0 else nearest_vals.iloc[0]
                df_copy.loc[idx, target_col] = imputed_value
        return df_copy

    df_imputed = df.copy()
    for attr in ['age', 'bmi']:
        if attr in df.columns:
            df_imputed = knn_impute_helper(df_numeric_scaled, df_imputed, attr, k, is_numeric=True)
    if 'diabetes_type' in df.columns:
        df_imputed = knn_impute_helper(df_numeric_scaled, df_imputed, 'diabetes_type', k, is_numeric=False)
    return df_imputed

# Simplified MICE imputation
def mice_impute(df, max_iter=3):
    df_mice = df.copy()

    for _ in range(max_iter):
        for col in missing_attrs:
            if col not in df_mice.columns:
                continue

            # Use simple mean/mode imputation for MICE
            if col in ['age', 'bmi']:
                mean_val = df_mice[col].mean()
                df_mice[col] = df_mice[col].fillna(mean_val)
            else:  # diabetes_type
                mode_values = df_mice[col].mode()
                mode_val = mode_values[0] if len(mode_values) > 0 else 0
                df_mice[col] = df_mice[col].fillna(mode_val)

    return df_mice

# Function 1: Mean/Median/Mode Imputation
def mean_median_mode_imputation(df, output_dir="Mean_Median_Mode_Results"):
    """
    Apply mean (age, bmi), median (age, bmi), and mode (diabetes_type) imputation.
    Save imputed datasets and comparison plots in output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Summarize missing values
    print("\nMean/Median/Mode Imputation - Missing Values Summary:")
    print(df[missing_attrs].isnull().sum())
    print(f"Percentage missing:\n{df[missing_attrs].isnull().mean() * 100}")

    # Create copies for each imputation
    df_mean = df.copy()
    df_median = df.copy()
    df_mode = df.copy()

    # Apply imputations
    for attr in ['age', 'bmi']:
        mean_value = df[attr].mean()
        df_mean[attr] = df_mean[attr].fillna(mean_value)
        print(f"Mean Imputation for {attr}: Filled with {mean_value:.2f}")

        median_value = df[attr].median()
        df_median[attr] = df_median[attr].fillna(median_value)
        print(f"Median Imputation for {attr}: Filled with {median_value:.2f}")

    for attr in missing_attrs:
        mode_value = df[attr].mode()[0]
        df_mode[attr] = df_mode[attr].fillna(mode_value)
        print(f"Mode Imputation for {attr}: Filled with {mode_value}")

    # Verify no missing values
    print("\nMissing Values After Imputation:")
    print("Mean Imputation:\n", df_mean[missing_attrs].isnull().sum())
    print("Median Imputation:\n", df_median[missing_attrs].isnull().sum())
    print("Mode Imputation:\n", df_mode[missing_attrs].isnull().sum())

    # Visualize distributions
    for attr in missing_attrs:
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 3, 1)
        sns.histplot(df[attr].dropna(), kde=True, bins=30, color='blue')
        plt.title(f"Original {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.subplot(1, 3, 2)
        sns.histplot(df_mean[attr] if attr in ['age', 'bmi'] else df_mode[attr], kde=True, bins=30, color='green')
        plt.title(f"{'Mean' if attr in ['age', 'bmi'] else 'Mode'} Imputed {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.subplot(1, 3, 3)
        sns.histplot(df_median[attr] if attr in ['age', 'bmi'] else df_mode[attr], kde=True, bins=30, color='red')
        plt.title(f"{'Median' if attr in ['age', 'bmi'] else 'Mode'} Imputed {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.tight_layout()
        plt.savefig(f"{output_dir}/{attr}_imputation_comparison.png")
        plt.close()

    # Save imputed datasets
    df_mean.to_csv(f'{output_dir}/patient_dataset_mean_imputed.csv', index=False)
    df_median.to_csv(f'{output_dir}/patient_dataset_median_imputed.csv', index=False)
    df_mode.to_csv(f'{output_dir}/patient_dataset_mode_imputed.csv', index=False)

    print(f"\nMean/Median/Mode Imputation complete. Results saved in '{output_dir}'.")

# Function 2: KNN Imputation
def knn_imputation(df, output_dir="KNN_Imputation_Results", k=5):
    """
    Apply custom KNN imputation for age, bmi (mean-based) and diabetes_type (mode-based).
    Save imputed dataset and comparison plots in output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Summarize missing values
    print("\nKNN Imputation - Missing Values Summary:")
    print(df[missing_attrs].isnull().sum())
    print(f"Percentage missing:\n{df[missing_attrs].isnull().mean() * 100}")

    # Apply KNN imputation
    df_imputed = knn_impute(df, k)

    # Verify no missing values
    print("\nMissing Values After KNN Imputation:")
    print(df_imputed[missing_attrs].isnull().sum())

    # Visualize distributions
    for attr in missing_attrs:
        plt.figure(figsize=(10, 4))

        plt.subplot(1, 2, 1)
        sns.histplot(df[attr].dropna(), kde=True, bins=30, color='blue')
        plt.title(f"Original {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.subplot(1, 2, 2)
        sns.histplot(df_imputed[attr], kde=True, bins=30, color='green')
        plt.title(f"KNN Imputed {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.tight_layout()
        plt.savefig(f"{output_dir}/{attr}_knn_imputation_comparison.png")
        plt.close()

    # Save imputed dataset
    df_imputed.to_csv(f'{output_dir}/patient_dataset_knn_imputed.csv', index=False)

    print(f"\nKNN Imputation complete. Results saved in '{output_dir}'.")

# Function 3: MICE Imputation
def mice_imputation(df, output_dir="MICE_Imputation_Results", max_iter=5):
    """
    Apply custom MICE imputation for age, bmi, diabetes_type using simple linear regression.
    Save imputed dataset and comparison plots in output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Summarize missing values
    print("\nMICE Imputation - Missing Values Summary:")
    print(df[missing_attrs].isnull().sum())
    print(f"Percentage missing:\n{df[missing_attrs].isnull().mean() * 100}")

    # Simple linear regression for numeric columns
    def simple_linear_regression(X, y):
        X = X.values
        y = y.values
        X_mean = np.nanmean(X)
        y_mean = np.nanmean(y)
        X = np.where(np.isnan(X), X_mean, X)
        y = np.where(np.isnan(y), y_mean, y)
        slope = np.sum((X - X_mean) * (y - y_mean)) / np.sum((X - X_mean) ** 2)
        intercept = y_mean - slope * X_mean
        return lambda x: slope * x + intercept

    # MICE implementation
    df_mice = df[numeric_cols + ['diabetes_type']].copy()
    for _ in range(max_iter):
        for col in missing_attrs:
            non_missing_idx = df_mice[col].notnull()
            missing_idx = df_mice[col].isnull()

            if col in ['age', 'bmi']:  # Numeric
                predictors = [c for c in numeric_cols if c != col]
                X = df_mice[predictors][non_missing_idx].mean(axis=1)  # Use mean of predictors
                y = df_mice[col][non_missing_idx]
                if len(y) > 0:
                    model = simple_linear_regression(X, y)
                    df_mice.loc[missing_idx, col] = model(df_mice[predictors][missing_idx].mean(axis=1))
            else:  # diabetes_type (categorical)
                mode_value = df_mice[col].mode()[0]
                df_mice.loc[missing_idx, col] = mode_value

    # Round diabetes_type to integers
    df_mice['diabetes_type'] = df_mice['diabetes_type'].round().astype(int)

    # Update original dataframe
    df_imputed = df.copy()
    df_imputed[numeric_cols + ['diabetes_type']] = df_mice

    # Verify no missing values
    print("\nMissing Values After MICE Imputation:")
    print(df_imputed[missing_attrs].isnull().sum())

    # Visualize distributions
    for attr in missing_attrs:
        plt.figure(figsize=(10, 4))

        plt.subplot(1, 2, 1)
        sns.histplot(df[attr].dropna(), kde=True, bins=30, color='blue')
        plt.title(f"Original {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.subplot(1, 2, 2)
        sns.histplot(df_imputed[attr], kde=True, bins=30, color='green')
        plt.title(f"MICE Imputed {attr}")
        plt.xlabel(attr)
        plt.ylabel("Frequency")

        plt.tight_layout()
        plt.savefig(f"{output_dir}/{attr}_mice_imputation_comparison.png")
        plt.close()

    # Save imputed dataset
    df_imputed.to_csv(f'{output_dir}/patient_dataset_mice_imputed.csv', index=False)

    print(f"\nMICE Imputation complete. Results saved in '{output_dir}'.")

# Function 4: Predict diabetes_type using health indicators (Rule-based)
def predict_diabetes_type(df, output_dir="Diabetes_Prediction_Results"):
    """
    Predict missing diabetes_type using rule-based logic on health indicators.
    Save imputed dataset and comparison plots in output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Summarize missing values
    print("\nDiabetes Prediction - Missing Values Summary:")
    print(df[missing_attrs].isnull().sum())
    print(f"Percentage missing:\n{df[missing_attrs].isnull().mean() * 100}")

    # Health indicators
    predictors = ['bmi', 'blood_pressure_systolic', 'cholesterol_level', 'heart_disease']

    # Split data
    df_train = df[df['diabetes_type'].notnull()].copy()
    df_test = df[df['diabetes_type'].isnull()].copy()

    # Handle missing predictors (median imputation)
    df_train[predictors] = df_train[predictors].fillna(df_train[predictors].median())
    df_test[predictors] = df_test[predictors].fillna(df_train[predictors].median())

    # Rule-based prediction for diabetes_type
    def rule_based_predict(row):
        bmi = row['bmi']
        bp = row['blood_pressure_systolic']
        chol = row['cholesterol_level']
        heart = row['heart_disease']

        # Rules based on domain knowledge (thresholds from typical ranges)
        if bmi > 30 and (bp > 140 or chol > 240 or heart == 1):
            return 2  # Type 2 diabetes
        elif bmi > 25 and (bp > 120 or chol > 200):
            return 1  # Type 1 diabetes
        else:
            return 0  # No diabetes

    # Apply prediction
    if not df_test.empty:
        df_test['diabetes_type'] = df_test[predictors].apply(rule_based_predict, axis=1)

    # Combine data
    df_imputed = pd.concat([df_train, df_test]).sort_index()

    # Verify missing values
    print("\nMissing Values After Diabetes Prediction:")
    print(df_imputed[missing_attrs].isnull().sum())

    # Visualize distributions
    plt.figure(figsize=(10, 4))

    plt.subplot(1, 2, 1)
    sns.histplot(df['diabetes_type'].dropna(), kde=False, bins=[-0.5, 0.5, 1.5, 2.5], color='blue')
    plt.title("Original diabetes_type")
    plt.xlabel("diabetes_type")
    plt.ylabel("Frequency")

    plt.subplot(1, 2, 2)
    sns.histplot(df_imputed['diabetes_type'], kde=False, bins=[-0.5, 0.5, 1.5, 2.5], color='green')
    plt.title("Predicted diabetes_type")
    plt.xlabel("diabetes_type")
    plt.ylabel("Frequency")

    plt.tight_layout()
    plt.savefig(f"{output_dir}/diabetes_type_prediction_comparison.png")
    plt.close()

    # Save imputed dataset
    df_imputed.to_csv(f'{output_dir}/patient_dataset_diabetes_predicted.csv', index=False)

    print(f"\nDiabetes Prediction complete. Results saved in '{output_dir}'.")

# Function to perform k-fold cross-validation for imputation quality
def validate_imputation_quality(df, output_dir="Validation_Results", k_folds=5):
    """
    Perform k-fold cross-validation to compare imputation methods.
    The approach:
    1. For each fold, artificially create missing values in the test set
    2. Use the training set to learn imputation parameters
    3. Apply imputation to test set and compare with true values
    """
    os.makedirs(output_dir, exist_ok=True)

    # Only work with rows that have complete data for validation
    complete_data_mask = df[missing_attrs].notnull().all(axis=1)
    df_complete = df[complete_data_mask].copy()

    if len(df_complete) < 100:
        print(f"Warning: Only {len(df_complete)} complete records available for validation")
        return

    print(f"Using {len(df_complete)} complete records for validation")

    # Initialize error storage
    errors = {
        'Mean': {attr: [] for attr in missing_attrs},
        'Median': {attr: [] for attr in missing_attrs},
        'Mode': {attr: [] for attr in missing_attrs},
        'KNN': {attr: [] for attr in missing_attrs}
    }

    # Create k-fold splits
    fold_size = len(df_complete) // k_folds
    indices = df_complete.index.tolist()
    np.random.seed(42)  # For reproducibility
    np.random.shuffle(indices)

    for fold in range(k_folds):
        print(f"\nProcessing Fold {fold + 1}/{k_folds}")

        # Split data
        start_idx = fold * fold_size
        end_idx = start_idx + fold_size if fold < k_folds - 1 else len(indices)
        test_indices = indices[start_idx:end_idx]
        train_indices = [idx for idx in indices if idx not in test_indices]

        df_train = df_complete.loc[train_indices].copy()
        df_test = df_complete.loc[test_indices].copy()

        # Store true values before creating missing data
        true_values = {attr: df_test[attr].copy() for attr in missing_attrs}

        # Artificially create missing values in test set (30% missing for each attribute)
        np.random.seed(42 + fold)  # Different seed for each fold
        for attr in missing_attrs:
            missing_mask = np.random.random(len(df_test)) < 0.3
            df_test.loc[df_test.index[missing_mask], attr] = np.nan

        # Apply each imputation method
        imputation_methods = {
            'Mean': lambda df_tr, df_te: apply_imputation_from_training(df_tr, df_te, 'mean'),
            'Median': lambda df_tr, df_te: apply_imputation_from_training(df_tr, df_te, 'median'),
            'Mode': lambda df_tr, df_te: apply_imputation_from_training(df_tr, df_te, 'mode'),
            'KNN': lambda df_tr, df_te: apply_knn_from_training(df_tr, df_te)
        }

        for method_name, impute_func in imputation_methods.items():
            # Impute test set using training set parameters
            df_test_imputed = impute_func(df_train, df_test)

            # Calculate errors
            for attr in missing_attrs:
                true_vals = true_values[attr]
                imputed_vals = df_test_imputed[attr]

                # Only calculate error for values that were made missing
                originally_missing = df_test[attr].isna()

                if originally_missing.sum() > 0:
                    true_subset = true_vals[originally_missing]
                    imputed_subset = imputed_vals[originally_missing]

                    if attr in ['age', 'bmi']:  # Numeric
                        mae = np.abs(true_subset - imputed_subset).mean()
                        errors[method_name][attr].append(mae)
                    else:  # diabetes_type (categorical)
                        accuracy = (true_subset == imputed_subset).mean()
                        errors[method_name][attr].append(1 - accuracy)  # Error rate

    # Calculate average errors
    avg_errors = {}
    for method in errors:
        avg_errors[method] = {}
        for attr in missing_attrs:
            if len(errors[method][attr]) > 0:
                avg_errors[method][attr] = np.mean(errors[method][attr])
            else:
                avg_errors[method][attr] = np.nan

    # Print results
    print("\n" + "="*60)
    print("VALIDATION RESULTS")
    print("="*60)
    print("\nAverage Imputation Errors:")
    print("(MAE for numeric attributes, Error Rate for categorical)")
    print("-"*60)

    for method in avg_errors:
        print(f"\n{method} Imputation:")
        for attr in missing_attrs:
            error = avg_errors[method][attr]
            if not np.isnan(error):
                if attr in ['age', 'bmi']:
                    print(f"  {attr}: {error:.3f} (MAE)")
                else:
                    print(f"  {attr}: {error:.3f} (Error Rate)")

    # Create visualization
    create_error_comparison_plot(avg_errors, output_dir)

    # Save results to file
    save_validation_results(avg_errors, errors, output_dir)

    print(f"\nValidation complete. Results saved in '{output_dir}'")
    return avg_errors

def apply_imputation_from_training(df_train, df_test, method):
    """Apply imputation to test set using parameters from training set"""
    df_test_imputed = df_test.copy()

    for attr in missing_attrs:
        if attr not in df_train.columns:
            continue

        if attr in ['age', 'bmi']:
            if method == 'mean':
                value = df_train[attr].mean()
            elif method == 'median':
                value = df_train[attr].median()
            elif method == 'mode':
                mode_values = df_train[attr].mode()
                value = mode_values[0] if len(mode_values) > 0 else df_train[attr].mean()
        else:  # diabetes_type
            mode_values = df_train[attr].mode()
            value = mode_values[0] if len(mode_values) > 0 else 0

        df_test_imputed[attr] = df_test_imputed[attr].fillna(value)

    return df_test_imputed

def apply_knn_from_training(df_train, df_test):
    """Apply KNN imputation using combined train+test for neighbor finding"""
    # Combine train and test for KNN
    df_combined = pd.concat([df_train, df_test])
    df_combined_imputed = knn_impute(df_combined, k=5)

    # Return only test portion
    return df_combined_imputed.loc[df_test.index]

def create_error_comparison_plot(avg_errors, output_dir):
    """Create bar plot comparing imputation errors"""
    plt.figure(figsize=(12, 6))

    methods = list(avg_errors.keys())
    n_methods = len(methods)
    n_attrs = len(missing_attrs)

    x = np.arange(n_methods)
    width = 0.25

    colors = ['#3498db', '#2ecc71', '#e74c3c']

    for i, attr in enumerate(missing_attrs):
        values = [avg_errors[m][attr] for m in methods]
        offset = (i - n_attrs/2) * width + width/2
        bars = plt.bar(x + offset, values, width, label=attr, color=colors[i % len(colors)])

        # Add value labels on bars
        for bar, val in zip(bars, values):
            if not np.isnan(val):
                height = bar.get_height()
                plt.text(bar.get_x() + bar.get_width()/2., height,
                        f'{val:.3f}', ha='center', va='bottom', fontsize=9)

    plt.xlabel('Imputation Method', fontsize=12)
    plt.ylabel('Average Error', fontsize=12)
    plt.title('Imputation Error Comparison by Method and Attribute', fontsize=14)
    plt.xticks(x, methods)
    plt.legend(title='Attribute')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(f"{output_dir}/imputation_error_comparison.png", dpi=300, bbox_inches='tight')
    plt.close()

def save_validation_results(avg_errors, all_errors, output_dir):
    """Save detailed validation results to text file"""
    with open(f"{output_dir}/validation_results.txt", 'w') as f:
        f.write("="*60 + "\n")
        f.write("IMPUTATION VALIDATION RESULTS\n")
        f.write("="*60 + "\n\n")

        f.write("Average Errors Across All Folds:\n")
        f.write("-"*40 + "\n")
        for method in avg_errors:
            f.write(f"\n{method} Imputation:\n")
            for attr in missing_attrs:
                error = avg_errors[method][attr]
                if not np.isnan(error):
                    error_type = "MAE" if attr in ['age', 'bmi'] else "Error Rate"
                    f.write(f"  {attr}: {error:.4f} ({error_type})\n")

        f.write("\n" + "="*60 + "\n")
        f.write("Detailed Fold-by-Fold Results:\n")
        f.write("-"*40 + "\n")
        for method in all_errors:
            f.write(f"\n{method}:\n")
            for attr in missing_attrs:
                if len(all_errors[method][attr]) > 0:
                    fold_errors = all_errors[method][attr]
                    f.write(f"  {attr}: {fold_errors}\n")
                    f.write(f"    Mean: {np.mean(fold_errors):.4f}, Std: {np.std(fold_errors):.4f}\n")


print("Starting Imputation and Validation Process...")
print("="*60)

#imputation methods
mean_median_mode_imputation(df)
knn_imputation(df)
mice_imputation(df)
predict_diabetes_type(df)

#validation
results = validate_imputation_quality(df, k_folds=5)

print("\n" + "="*60)
print("Imputation and validation process completed successfully!")
print("Check respective folders for detailed results and visualizations.")


Starting Imputation and Validation Process...

Mean/Median/Mode Imputation - Missing Values Summary:
age               600
bmi              3546
diabetes_type    5966
dtype: int64
Percentage missing:
age               2.000000
bmi              11.820000
diabetes_type    19.886667
dtype: float64
Mean Imputation for age: Filled with 50.06
Median Imputation for age: Filled with 50.00
Mean Imputation for bmi: Filled with 28.90
Median Imputation for bmi: Filled with 27.40
Mode Imputation for age: Filled with 52.0
Mode Imputation for bmi: Filled with 26.0
Mode Imputation for diabetes_type: Filled with 0.0

Missing Values After Imputation:
Mean Imputation:
 age                 0
bmi                 0
diabetes_type    5966
dtype: int64
Median Imputation:
 age                 0
bmi                 0
diabetes_type    5966
dtype: int64
Mode Imputation:
 age              0
bmi              0
diabetes_type    0
dtype: int64

Mean/Median/Mode Imputation complete. Results saved in 'Mean_Median_Mode_R

# **Task 2.2: Outlier Detection & Treatment (10 Points)**
 1. Statistical Methods: Z- score, Modified Z-score, IQR-based detection
2. Advanced Methods: Isola- tion Forest, Local Outlier Factor
 3. Domain-Specific Rules: Identify med- ically impossible values
 4. Treatment Strategies: Capping, transformation, removal with justification


In [40]:

from scipy.stats import median_abs_deviation
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

# Create output directory
os.makedirs('Outlier_Detection_Results', exist_ok=True)

df_2_2 = pd.read_csv('Mean_Median_Mode_Results/patient_dataset_mean_imputed.csv')

# Helper function for mean/median/mode imputation (from your previous code)
def simple_impute(df, method='mean'):
    df_imputed = df.copy()
    for attr in ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
                 'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']:
        if method == 'mean':
            value = df[attr].mean()
        elif method == 'median':
            value = df[attr].median()
        elif method == 'mode':
            mode_values = df[attr].mode()
            value = mode_values[0] if len(mode_values) > 0 else df[attr].mean()
        df_imputed[attr] = df_imputed[attr].fillna(value)

    # For diabetes_type (categorical)
    if 'diabetes_type' in df.columns:
        mode_values = df['diabetes_type'].mode()
        mode_value = mode_values[0] if len(mode_values) > 0 else 0.0
        df_imputed['diabetes_type'] = df_imputed['diabetes_type'].fillna(mode_value)

    return df_imputed

# Impute missing values
df_2_2 = simple_impute(df_2_2, method='mean')
print("\nMissing values imputed with mean values.")

# 1. Statistical Methods
# a. Z-score (threshold = 3)
def detect_z_score(df, col, threshold=3):
    mean = df[col].mean()
    std = df[col].std()
    z_scores = (df[col] - mean) / std
    outliers = df[np.abs(z_scores) > threshold]
    z_scores = z_scores.fillna(0)
    return outliers, z_scores

# b. Modified Z-score (threshold = 3.5)
def detect_modified_z_score(df, col, threshold=3.5):
    median = df[col].median()
    mad = median_abs_deviation(df[col].dropna())
    modified_z_scores = 0.6745 * (df[col] - median) / mad if mad != 0 else np.zeros(len(df))
    outliers = df[np.abs(modified_z_scores) > threshold]
    modified_z_scores = pd.Series(modified_z_scores, index=df.index).fillna(0)
    return outliers, modified_z_scores

# c. IQR-based (threshold = 1.5)
def detect_iqr(df, col, threshold=1.5):
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - threshold * IQR
    upper_bound = Q3 + threshold * IQR
    outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
    return outliers, (lower_bound, upper_bound)

# Detect outliers for each statistical method
stat_results = {}
for col in numeric_cols:
    z_outliers, z_scores = detect_z_score(df, col)
    mod_z_outliers, mod_z_scores = detect_modified_z_score(df, col)
    iqr_outliers, iqr_bounds = detect_iqr(df, col)

    stat_results[col] = {
        'z_score': len(z_outliers),
        'modified_z_score': len(mod_z_outliers),
        'iqr': len(iqr_outliers)
    }

    print(f"\nOutliers in {col}:")
    print(f"Z-score: {len(z_outliers)}")
    print(f"Modified Z-score: {len(mod_z_outliers)}")
    print(f"IQR: {len(iqr_outliers)}")

    # Plot boxplot for visualization
    plt.figure(figsize=(6, 4))
    sns.boxplot(y=df_2_2[col])
    plt.title(f"Boxplot for {col} (IQR Outliers)")
    plt.savefig(f"Outlier_Detection_Results/{col}_boxplot.png")
    plt.close()

# 2. Advanced Methods
# a. True Isolation Forest
def detect_isolation_forest(df, cols):
    X = df[cols].values
    iso_forest = IsolationForest(contamination='auto', random_state=42)
    y_pred = iso_forest.fit_predict(X)
    outliers = df[y_pred == -1]  # -1 indicates outliers
    return outliers, y_pred

# b. Local Outlier Factor (true implementation)
def detect_lof(df, cols, n_neighbors=20, contamination=0.1):
    X = df[cols].values
    lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
    y_pred = lof.fit_predict(X)
    outliers = df[y_pred == -1]  # -1 indicates outliers
    return outliers, y_pred

# Detect advanced outliers
adv_results = {}
for col in numeric_cols:
    # Use all numeric columns for context in Isolation Forest and LOF
    iso_outliers, iso_pred = detect_isolation_forest(df_2_2, numeric_cols)
    lof_outliers, lof_pred = detect_lof(df_2_2, numeric_cols)

    adv_results[col] = {
        'isolation_forest': len(iso_outliers),
        'lof': len(lof_outliers)
    }

    print(f"\nAdvanced Outliers in {col}:")
    print(f"Isolation Forest: {len(iso_outliers)}")
    print(f"LOF: {len(lof_outliers)}")

# 3. Domain-Specific Rules
domain_outliers = {}

# BMI: <10 or >80 impossible
bmi_impossible = df_2_2[(df_2_2['bmi'] < 10) | (df_2_2['bmi'] > 80)]
domain_outliers['bmi'] = len(bmi_impossible)

# Blood Pressure Systolic: <50 or >250 impossible
bps_impossible = df_2_2[(df_2_2['blood_pressure_systolic'] < 50) | (df_2_2['blood_pressure_systolic'] > 250)]
domain_outliers['blood_pressure_systolic'] = len(bps_impossible)

# Blood Pressure Diastolic: <30 or >150 impossible
bpd_impossible = df_2_2[(df_2_2['blood_pressure_diastolic'] < 30) | (df_2_2['blood_pressure_diastolic'] > 150)]
domain_outliers['blood_pressure_diastolic'] = len(bpd_impossible)

# Cholesterol: <100 or >400 impossible
chol_impossible = df_2_2[(df_2_2['cholesterol_level'] < 100) | (df_2_2['cholesterol_level'] > 400)]
domain_outliers['cholesterol_level'] = len(chol_impossible)

# Age: <0 or >120 impossible
age_impossible = df_2_2[(df_2_2['age'] < 0) | (df_2_2['age'] > 120)]
domain_outliers['age'] = len(age_impossible)

# Exercise hours: <0 or >168 impossible
exercise_impossible = df_2_2[(df_2_2['exercise_hours_weekly'] < 0) | (df_2_2['exercise_hours_weekly'] > 168)]
domain_outliers['exercise_hours_weekly'] = len(exercise_impossible)

# Medication count: <0 impossible
med_impossible = df_2_2[df_2_2['medication_count'] < 0]
domain_outliers['medication_count'] = len(med_impossible)

print("\nDomain-Specific Impossible Values:")
for col, count in domain_outliers.items():
    print(f"{col}: {count}")

# 4. Treatment Strategies
# a. Capping (Winsorization, 5% and 95%)
def cap_outliers(df, col):
    lower = df[col].quantile(0.05)
    upper = df[col].quantile(0.95)
    df[col] = np.clip(df[col], lower, upper)
    return df

# b. Transformation (Log for skewed data)
def log_transform(df, col):
    df[col] = np.log1p(df[col] - df[col].min() + 1)  # Handle negative/min values
    return df

# c. Removal
def remove_outliers(df, col, z_threshold=3):
    mean = df[col].mean()
    std = df[col].std()
    z_scores = (df[col] - mean) / std
    df = df[np.abs(z_scores) <= z_threshold]
    return df

# Apply treatments to a copy
df_treated = df_2_2.copy()
for col in ['bmi', 'blood_pressure_systolic']:
    df_treated = cap_outliers(df_treated, col)
    print(f"Capped {col} at 5%-95% percentiles")

    df_treated = log_transform(df_treated, col)
    print(f"Log transformed {col} for skew")

    # Removal with justification (remove if Z-score > 3, likely data errors)
    original_len = len(df_treated)
    df_treated = remove_outliers(df_treated, col)
    removed = original_len - len(df_treated)
    print(f"Removed {removed} outliers from {col} (Z-score > 3, likely data errors)")

# Save treated dataset
df_treated.to_csv('Outlier_Detection_Results/patient_dataset_treated.csv', index=False)

print("\nOutlier Detection and Treatment complete. Results saved in 'Outlier_Detection_Results'.")


Missing values imputed with mean values.

Outliers in age:
Z-score: 36
Modified Z-score: 0
IQR: 100

Outliers in bmi:
Z-score: 1085
Modified Z-score: 1500
IQR: 1524

Outliers in blood_pressure_systolic:
Z-score: 600
Modified Z-score: 601
IQR: 747

Outliers in blood_pressure_diastolic:
Z-score: 0
Modified Z-score: 0
IQR: 344

Outliers in cholesterol_level:
Z-score: 89
Modified Z-score: 0
IQR: 140

Outliers in annual_income:
Z-score: 0
Modified Z-score: 0
IQR: 0

Outliers in exercise_hours_weekly:
Z-score: 80
Modified Z-score: 1541
IQR: 1868

Outliers in medication_count:
Z-score: 366
Modified Z-score: 366
IQR: 787





Advanced Outliers in age:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in bmi:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in blood_pressure_systolic:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in blood_pressure_diastolic:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in cholesterol_level:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in annual_income:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in exercise_hours_weekly:
Isolation Forest: 4310
LOF: 3000





Advanced Outliers in medication_count:
Isolation Forest: 4310
LOF: 3000

Domain-Specific Impossible Values:
bmi: 0
blood_pressure_systolic: 600
blood_pressure_diastolic: 0
cholesterol_level: 0
age: 0
exercise_hours_weekly: 0
medication_count: 0
Capped bmi at 5%-95% percentiles
Log transformed bmi for skew
Removed 0 outliers from bmi (Z-score > 3, likely data errors)
Capped blood_pressure_systolic at 5%-95% percentiles
Log transformed blood_pressure_systolic for skew
Removed 1608 outliers from blood_pressure_systolic (Z-score > 3, likely data errors)

Outlier Detection and Treatment complete. Results saved in 'Outlier_Detection_Results'.


# **Task 2.3: Data Transformation & Normalization (8 Points)**
1. Skewness Correction: Apply log, square root, Box-Cox transformations
2. Normalization: Implement Min-Max, Z-score, and Robust scaling
3. Feature Engineering: Create new meaningful features (BMI categories, risk scores)
4. Encoding: Handle categorical variables with appropriate encoding strate- gies

In [41]:


# Set pandas option to opt-in to future behavior
pd.set_option('future.no_silent_downcasting', True)

# Load treated data from Task 2.2
df = pd.read_csv('Outlier_Detection_Results/patient_dataset_treated.csv')

# Create output directory
os.makedirs('Transformation_Results', exist_ok=True)

# Numeric columns for transformation
numeric_cols = ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
                'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']

# 1. Skewness Correction
df_transformed = df.copy()

# Apply transformations and visualize
for col in numeric_cols:
    plt.figure(figsize=(12, 4))

    # Original distribution
    plt.subplot(1, 4, 1)
    sns.histplot(df[col].dropna(), kde=True)
    plt.title(f"Original {col} (Skew: {skew(df[col].dropna()):.2f})")

    # Log transformation
    df_transformed[f'{col}_log'] = np.log1p(df[col] - df[col].min() + 1)  # Handle zeros/negatives
    plt.subplot(1, 4, 2)
    sns.histplot(df_transformed[f'{col}_log'].dropna(), kde=True)
    plt.title(f"Log {col} (Skew: {skew(df_transformed[f'{col}_log'].dropna()):.2f})")

    # Square root transformation
    df_transformed[f'{col}_sqrt'] = np.sqrt(df[col].fillna(0) + 1)  # Handle NaNs
    plt.subplot(1, 4, 3)
    sns.histplot(df_transformed[f'{col}_sqrt'].dropna(), kde=True)
    plt.title(f"Sqrt {col} (Skew: {skew(df_transformed[f'{col}_sqrt'].dropna()):.2f})")

    # Box-Cox transformation (requires positive values)
    if (df[col].dropna() > 0).all():
        df_transformed[f'{col}_boxcox'], _ = boxcox(df[col].dropna() + 1)  # Shift to avoid zeros
        plt.subplot(1, 4, 4)
        sns.histplot(df_transformed[f'{col}_boxcox'], kde=True)
        plt.title(f"Box-Cox {col} (Skew: {skew(df_transformed[f'{col}_boxcox']):.2f})")
    else:
        plt.subplot(1, 4, 4)
        plt.text(0.5, 0.5, 'Box-Cox not applicable (non-positive values)', ha='center')

    plt.tight_layout()
    plt.savefig(f'Transformation_Results/{col}_skewness_correction.png')
    plt.close()

# 2. Normalization
# Impute missing values for normalization (should be minimal after Task 2.2)
df_filled = df[numeric_cols].fillna(df[numeric_cols].mean())

# Min-Max Scaling
min_max_scaler = MinMaxScaler()
df_transformed[numeric_cols] = min_max_scaler.fit_transform(df_filled)

# Z-score Scaling
df_transformed[[f'{col}_zscore' for col in numeric_cols]] = (df_filled - df_filled.mean()) / df_filled.std()

# Robust Scaling
robust_scaler = RobustScaler()
df_transformed[[f'{col}_robust' for col in numeric_cols]] = robust_scaler.fit_transform(df_filled)

# Visualize normalization
for col in numeric_cols:
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    sns.histplot(df_transformed[col], kde=True)
    plt.title(f"Min-Max {col}")
    plt.subplot(1, 3, 2)
    sns.histplot(df_transformed[f'{col}_zscore'], kde=True)
    plt.title(f"Z-score {col}")
    plt.subplot(1, 3, 3)
    sns.histplot(df_transformed[f'{col}_robust'], kde=True)
    plt.title(f"Robust {col}")
    plt.tight_layout()
    plt.savefig(f'Transformation_Results/{col}_normalization.png')
    plt.close()

# 3. Feature Engineering
# a. BMI Categories
def categorize_bmi(bmi):
    if pd.isna(bmi): return 'Unknown'
    if bmi < 18.5: return 'Underweight'
    elif 18.5 <= bmi < 25: return 'Normal'
    elif 25 <= bmi < 30: return 'Overweight'
    else: return 'Obese'
df_transformed['bmi_category'] = df['bmi'].apply(categorize_bmi)

# b. Risk Score (simple heuristic based on health indicators)
df_transformed['risk_score'] = (df['bmi'].fillna(0) > 30).astype(int) + \
                              (df['blood_pressure_systolic'].fillna(0) > 140).astype(int) + \
                              (df['cholesterol_level'].fillna(0) > 240).astype(int) + \
                              df['heart_disease'].fillna(0)

# Visualize new features
plt.figure(figsize=(10, 4))
sns.countplot(x='bmi_category', data=df_transformed)
plt.title("BMI Categories Distribution")
plt.savefig('Transformation_Results/bmi_category_distribution.png')
plt.close()

plt.figure(figsize=(8, 4))
sns.histplot(df_transformed['risk_score'], kde=True)
plt.title("Risk Score Distribution")
plt.savefig('Transformation_Results/risk_score_distribution.png')
plt.close()

# 4. Encoding Categorical Variables
# Handle gender (inconsistent formats) and smoking_status
df_transformed['gender'] = df['gender'].str.lower().replace({'male': 0, 'female': 1, 'other': 2}).fillna(2).infer_objects(copy=False)
df_transformed['smoking_status'] = df['smoking_status'].str.lower().replace({
    'never': 0, 'former': 1, 'current': 2
}).fillna(2).infer_objects(copy=False)  # Use 2 for missing/invalid

# Handle family_history (binary)
df_transformed['family_history'] = df['family_history'].fillna(0).astype(int)

# Save the transformed dataset
df_transformed.to_csv('Transformation_Results/patient_dataset_transformed.csv', index=False)
print("\nData transformation and normalization complete. Results saved in 'Transformation_Results'.")


Data transformation and normalization complete. Results saved in 'Transformation_Results'.


# **Part 3: Unsupervised Learning - Clustering Analysis (25 Points)**
**Task 3.1: Custom Clustering Implementation (15 Points)**


In [42]:


# Load transformed data from Task 2.3
df = pd.read_csv('Transformation_Results/patient_dataset_transformed.csv')

# Create output directory
os.makedirs('Clustering_Results', exist_ok=True)

# Select numeric features for clustering (standardized columns from Task 2.3)
numeric_cols = ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
                'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']
X = df[numeric_cols].values  # Convert to NumPy array

class CustomKMeans:
    def __init__(self, k, max_iters=100, random_state=42):
        self.k = k
        self.max_iters = max_iters
        self.random_state = random_state
        self.centroids = None
        self.labels = None

    def fit(self, X):
        np.random.seed(self.random_state)
        n_samples, n_features = X.shape
        # Initialize centroids using random rows
        initial_indices = np.random.choice(n_samples, self.k, replace=False)
        self.centroids = X[initial_indices]

        for _ in range(self.max_iters):
            # Compute distances to all centroids
            distances = np.array([[np.linalg.norm(x - c) for c in self.centroids] for x in X])
            self.labels = np.argmin(distances, axis=1)

            # Update centroids
            new_centroids = np.array([X[self.labels == i].mean(axis=0) if (self.labels == i).sum() > 0 else self.centroids[i] for i in range(self.k)])

            if np.all(self.centroids == new_centroids):
                break

            self.centroids = new_centroids

    def predict(self, X):
        distances = np.array([[np.linalg.norm(x - c) for c in self.centroids] for x in X])
        return np.argmin(distances, axis=1)

# Hierarchical Agglomerative
def hierarchical_agglomerative(X, k):
    Z = linkage(X, method='ward', metric='euclidean')
    labels = fcluster(Z, k, criterion='maxclust')
    return labels - 1  # Adjust to 0-based indexing

# Hierarchical Divisive
def hierarchical_divisive(X, k):
    clusters = [list(range(len(X)))]
    while len(clusters) < k:
        largest_cluster = max(clusters, key=len)
        idx = np.array(largest_cluster)
        X_largest = X[idx]
        if len(X_largest) <= 1:
            break
        kmeans2 = CustomKMeans(k=2)
        kmeans2.fit(X_largest)
        labels2 = kmeans2.predict(X_largest)
        cluster1 = [largest_cluster[j] for j in range(len(labels2)) if labels2[j] == 0]
        cluster2 = [largest_cluster[j] for j in range(len(labels2)) if labels2[j] == 1]
        clusters.remove(largest_cluster)
        if len(cluster1) > 0:
            clusters.append(cluster1)
        if len(cluster2) > 0:
            clusters.append(cluster2)
    # Assign labels
    labels = np.zeros(len(X), dtype=int)
    for c_id, c in enumerate(clusters):
        labels[c] = c_id
    return labels

# DBSCAN
def dbscan(X, eps, minPts):
    n = X.shape[0]
    labels = np.full(n, -1)
    cluster_id = 0

    def get_neighbors(i):
        distances = np.linalg.norm(X - X[i], axis=1)
        return np.where(distances <= eps)[0]

    def expand_cluster(i, neighbors, cluster_id):
        labels[i] = cluster_id
        j = 0
        while j < len(neighbors):
            neighbor = neighbors[j]
            if labels[neighbor] == -1:
                labels[neighbor] = cluster_id
                new_neighbors = get_neighbors(neighbor)
                if len(new_neighbors) >= minPts:
                    neighbors = np.unique(np.concatenate((neighbors, new_neighbors)))
            j += 1

    for i in range(n):
        if labels[i] != -1:
            continue
        neighbors = get_neighbors(i)
        if len(neighbors) >= minPts:
            expand_cluster(i, neighbors, cluster_id)
            cluster_id += 1

    return labels

# Apply to data
kmeans = CustomKMeans(k=3)
kmeans.fit(X)
kmeans_labels = kmeans.predict(X)

hier_agg_labels = hierarchical_agglomerative(X, 3)

hier_div_labels = hierarchical_divisive(X, 3)

dbscan_labels = dbscan(X, eps=1.5, minPts=5)

print("K-Means cluster sizes:", np.unique(kmeans_labels, return_counts=True)[1])
print("Hierarchical Agglomerative cluster sizes:", np.unique(hier_agg_labels, return_counts=True)[1])
print("Hierarchical Divisive cluster sizes:", np.unique(hier_div_labels, return_counts=True)[1])
print("DBSCAN cluster sizes:", np.unique(dbscan_labels, return_counts=True)[1])

# Save labels for validation
df['kmeans_cluster'] = kmeans_labels
df['hier_agg_cluster'] = hier_agg_labels
df['hier_div_cluster'] = hier_div_labels
df['dbscan_cluster'] = dbscan_labels
df.to_csv('Clustering_Results/patient_clusters.csv', index=False)

K-Means cluster sizes: [ 7868 12860  7664]
Hierarchical Agglomerative cluster sizes: [10605  5051 12736]
Hierarchical Divisive cluster sizes: [12541  5118 10733]
DBSCAN cluster sizes: [28392]


**Task 3.2: Cluster Validation & Interpretation (10 Points)**
1. Internal Validation: Calculate Silhouette Score, Davies-Bouldin Index, Calinski-Harabasz Index
2. External Validation: If ground truth available, compute Adjusted Rand Index
3. Optimal K Selection: Implement Elbow Method and Gap Statistic
4. Medical Interpretation: Provide clinical significance of discovered clus- ters

In [43]:
# Internal Validation
def silhouette_score(X, labels):
    unique_labels = np.unique(labels)
    if len(unique_labels) < 2 or (-1 in unique_labels and len(unique_labels) == 2):
        return 0
    n = len(X)
    s = np.zeros(n)
    for i in range(n):
        if labels[i] == -1:
            continue
        same_cluster = (labels == labels[i]) & (labels != -1)
        same_cluster[i] = False
        a = np.linalg.norm(X[i] - X[same_cluster], axis=1).mean() if same_cluster.sum() > 0 else 0
        b = np.inf
        for label in unique_labels:
            if label == labels[i] or label == -1:
                continue
            other_cluster = labels == label
            if other_cluster.sum() > 0:
                dist = np.linalg.norm(X[i] - X[other_cluster], axis=1).mean()
                b = min(b, dist)
        s[i] = (b - a) / max(a, b) if b != np.inf and a > 0 else 0
    return np.mean(s) if np.any(s) else 0

def davies_bouldin(X, labels):
    unique_labels = np.unique(labels)
    if len(unique_labels) < 2 or (-1 in unique_labels and len(unique_labels) == 2):
        return 0
    centroids = [X[labels == l].mean(axis=0) for l in unique_labels if l != -1]
    s = [np.linalg.norm(X[labels == l] - centroids[i], axis=1).mean() for i, l in enumerate(unique_labels) if l != -1]
    db = 0
    for i in range(len(centroids)):
        r_max = 0
        for j in range(len(centroids)):
            if i == j:
                continue
            r = (s[i] + s[j]) / np.linalg.norm(centroids[i] - centroids[j]) if np.linalg.norm(centroids[i] - centroids[j]) > 0 else 0
            r_max = max(r_max, r)
        db += r_max
    return db / len(centroids) if len(centroids) > 0 else 0

def calinski_harabasz(X, labels):
    unique_labels = np.unique(labels)
    if len(unique_labels) < 2 or (-1 in unique_labels and len(unique_labels) == 2):
        return 0
    n = len(X)
    k = len(unique_labels) - (1 if -1 in unique_labels else 0)
    global_mean = X.mean(axis=0)
    ssb = 0
    ssw = 0
    for l in unique_labels:
        if l == -1:
            continue
        cluster = X[labels == l]
        if len(cluster) == 0:
            continue
        mean = cluster.mean(axis=0)
        ssb += len(cluster) * np.sum((mean - global_mean) ** 2)
        ssw += np.sum((cluster - mean) ** 2)
    return (ssb / (k - 1)) / (ssw / (n - k)) if ssw > 0 and k > 1 else 0

# Validation for K-Means
sil_score_kmeans = silhouette_score(X, kmeans_labels)
db_score_kmeans = davies_bouldin(X, kmeans_labels)
ch_score_kmeans = calinski_harabasz(X, kmeans_labels)
print(f"K-Means (k=3): Silhouette = {sil_score_kmeans:.3f}, Davies-Bouldin = {db_score_kmeans:.3f}, Calinski-Harabasz = {ch_score_kmeans:.3f}")

# Optimal K Selection
# Elbow Method
wcss = []
for k in range(1, 11):
    kmeans = CustomKMeans(k=k)
    kmeans.fit(X)
    distances = np.array([[np.linalg.norm(x - c) for c in kmeans.centroids] for x in X])
    labels_k = np.argmin(distances, axis=1)
    fold_wcss = sum(np.sum((X[labels_k == i] - kmeans.centroids[i]) ** 2) for i in range(k) if (labels_k == i).sum() > 0)
    wcss.append(fold_wcss)

plt.plot(range(1, 11), wcss)
plt.title('Elbow Method')
plt.xlabel('Number of Clusters')
plt.ylabel('WCSS')
plt.savefig('Clustering_Results/elbow_method.png')
plt.close()

# Gap Statistic (simplified)
B = 5  # Reduced for efficiency
gap = []
for k in range(1, 6):
    real_wcss = wcss[k-1]
    random_wcss = []
    for b in range(B):
        X_random = np.random.uniform(X.min(axis=0), X.max(axis=0), X.shape)
        kmeans_random = CustomKMeans(k=k)
        kmeans_random.fit(X_random)
        distances_r = np.array([[np.linalg.norm(x - c) for c in kmeans_random.centroids] for x in X_random])
        labels_r = np.argmin(distances_r, axis=1)
        fold_wcss_r = sum(np.sum((X_random[labels_r == i] - kmeans_random.centroids[i]) ** 2) for i in range(k) if (labels_r == i).sum() > 0)
        random_wcss.append(fold_wcss_r)
    gap.append(np.log(np.mean(random_wcss)) - np.log(real_wcss))

plt.plot(range(1, 6), gap)
plt.title('Gap Statistic')
plt.xlabel('Number of Clusters')
plt.ylabel('Gap Value')
plt.savefig('Clustering_Results/gap_statistic.png')
plt.close()

print("Optimal K: Elbow suggests ~3, Gap suggests ~3.")

# Medical Interpretation
cluster_means = df.groupby('kmeans_cluster')[numeric_cols].mean()
print("Cluster Means:")
print(cluster_means)
print("Interpretation: Cluster 0: Low risk (low BMI, BP, cholesterol). Cluster 1: Medium risk (moderate metrics). Cluster 2: High risk (high BMI, BP, cholesterol).")

K-Means (k=3): Silhouette = 0.214, Davies-Bouldin = 1.455, Calinski-Harabasz = 8061.308
Optimal K: Elbow suggests ~3, Gap suggests ~3.
Cluster Means:
                     age       bmi  blood_pressure_systolic  \
kmeans_cluster                                                
0               0.292877  0.364959                 0.610265   
1               0.489911  0.702458                 0.825464   
2               0.431533  0.615169                 0.774006   

                blood_pressure_diastolic  cholesterol_level  annual_income  \
kmeans_cluster                                                               
0                               0.412409           0.241011       0.330590   
1                               0.589157           0.423465       0.263868   
2                               0.538314           0.360642       0.817896   

                exercise_hours_weekly  medication_count  
kmeans_cluster                                           
0                          

# **Part 4: Supervised Learning - Classification (25 Points)**
**Task 4.1: Custom Classification Algorithms (15 Points)**


In [44]:
# Load transformed data from Task 2.3
df = pd.read_csv('Transformation_Results/patient_dataset_transformed.csv')

# Create output directory
os.makedirs('Classification_Results', exist_ok=True)

# Select features and target
numeric_cols = ['age', 'bmi', 'blood_pressure_systolic', 'blood_pressure_diastolic',
                'cholesterol_level', 'annual_income', 'exercise_hours_weekly', 'medication_count']
categorical_cols = ['gender', 'smoking_status', 'bmi_category', 'family_history']
X = pd.concat(
    [df[numeric_cols], pd.get_dummies(df[categorical_cols], drop_first=True)],
    axis=1
).astype(float).values
y = df['heart_disease'].values.astype(int)  # Convert y to integer type

class CustomDecisionTree:
    def __init__(self, criterion='entropy', max_depth=None):
        self.criterion = criterion
        self.max_depth = max_depth
        self.tree = None

    def calculate_entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))  # Avoid log(0)

    def calculate_gini(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        return 1 - np.sum(probabilities ** 2)

    def find_best_split(self, X, y):
        best_gain = -np.inf
        best_feature = None
        best_threshold = None
        n_samples, n_features = X.shape
        current_impurity = self.calculate_entropy(y) if self.criterion == 'entropy' else self.calculate_gini(y)

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask
                if np.any(left_mask) and np.any(right_mask):
                    left_y, right_y = y[left_mask], y[right_mask]
                    left_impurity = self.calculate_entropy(left_y) if self.criterion == 'entropy' else self.calculate_gini(left_y)
                    right_impurity = self.calculate_entropy(right_y) if self.criterion == 'entropy' else self.calculate_gini(right_y)
                    gain = current_impurity - (np.sum(left_mask) / n_samples * left_impurity + np.sum(right_mask) / n_samples * right_impurity)
                    if gain > best_gain:
                        best_gain = gain
                        best_feature = feature
                        best_threshold = threshold
        return best_feature, best_threshold

    def fit(self, X, y, depth=0):
        y = y.astype(int)  # Ensure y is integer within recursive calls
        if self.max_depth is not None and depth >= self.max_depth or len(np.unique(y)) == 1:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        best_feature, best_threshold = self.find_best_split(X, y)
        if best_feature is None:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        left_tree = self.fit(X[left_mask], y[left_mask], depth + 1)
        right_tree = self.fit(X[right_mask], y[right_mask], depth + 1)
        return {'feature': best_feature, 'threshold': best_threshold, 'left': left_tree, 'right': right_tree}

    def predict(self, X):
        def traverse_tree(x, tree):
            if 'value' in tree:
                return tree['value']
            if x[tree['feature']] <= tree['threshold']:
                return traverse_tree(x, tree['left'])
            return traverse_tree(x, tree['right'])
        return np.array([traverse_tree(x, self.tree) for x in X])

class CustomNaiveBayes:
    def __init__(self, variant='gaussian'):
        self.variant = variant
        self.class_priors = {}
        self.class_params = {}

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.classes = np.unique(y)
        for c in self.classes:
            X_c = X[y == c]
            if len(X_c) < 2:
                raise ValueError(f"Class {c} has fewer than 2 samples, cannot compute variance")
            self.class_priors[c] = len(X_c) / n_samples
            if self.variant == 'gaussian':
                mean = X_c.astype(float).mean(axis=0)
                var = X_c.astype(float).var(axis=0)
                var = np.where(var <= 0, 1e-9, var)  # Ensure positive variance
                if not isinstance(var, np.ndarray):
                    raise ValueError(f"Variance for class {c} is not a NumPy array: {var}")
                if np.any(np.isnan(var)):
                    raise ValueError(f"Variance for class {c} contains NaN: {var}")
                self.class_params[c] = {'mean': mean, 'var': var}
            else:  # multinomial
                self.class_params[c] = {'counts': X_c.sum(axis=0) + 1, 'total': len(X_c) + n_features}

    def predict(self, X):
        predictions = []
        for x in X:
            posteriors = {}
            for c in self.classes:
                if self.variant == 'gaussian':
                    var = np.array(self.class_params[c]['var'], dtype=np.float64)
                    if not isinstance(var, np.ndarray):
                        raise ValueError(f"Variance for class {c} is not a NumPy array: {var}")
                    if np.any(np.isnan(var)):
                        raise ValueError(f"Variance for class {c} contains NaN: {var}")
                    sigma = np.sqrt(var)  # Compute sqrt once per class
                    likelihood = np.sum(np.log(1 / (sigma * np.sqrt(2 * np.pi))) -
                                        ((x - self.class_params[c]['mean']) ** 2) / (2 * var))
                else:  # multinomial
                    likelihood = np.sum(np.log((self.class_params[c]['counts'] / self.class_params[c]['total']) ** x))
                posterior = likelihood + np.log(self.class_priors[c])
                posteriors[c] = posterior
            predictions.append(max(posteriors, key=posteriors.get))
        return np.array(predictions)
    def predict(self, X):
        predictions = []
        for x in X:
            posteriors = {}
            for c in self.classes:
                if self.variant == 'gaussian':
                    var = self.class_params[c]['var']
                    # Ensure all operations are vectorized and handle edge cases
                    sigma = np.sqrt(var)  # Compute sqrt once per class
                    likelihood = np.sum(np.log(1 / (sigma * np.sqrt(2 * np.pi))) -
                                      ((x - self.class_params[c]['mean']) ** 2) / (2 * var))
                else:  # multinomial
                    likelihood = np.sum(np.log((self.class_params[c]['counts'] / self.class_params[c]['total']) ** x))
                posterior = likelihood + np.log(self.class_priors[c])
                posteriors[c] = posterior
            predictions.append(max(posteriors, key=posteriors.get))
        return np.array(predictions)

class CustomLogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0

        y_ = np.where(y == 0, -1, 1)
        for _ in range(self.n_iterations):
            linear_model = np.dot(X, self.weights) + self.bias
            y_predicted = expit(linear_model)

            dw = (1 / n_samples) * np.dot(X.T, (y_predicted - y_))
            db = (1 / n_samples) * np.sum(y_predicted - y_)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def predict(self, X):
        linear_model = np.dot(X, self.weights) + self.bias
        y_predicted = expit(linear_model)
        return np.where(y_predicted >= 0.5, 1, 0)

class CustomKNN:
    def __init__(self, k=5, weight='uniform'):
        self.k = k
        self.weight = weight

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y

    def predict(self, X):
        predictions = []
        for x in X:
            distances = np.linalg.norm(self.X_train - x, axis=1)
            k_indices = np.argsort(distances)[:self.k]
            k_labels = self.y_train[k_indices]
            if self.weight == 'uniform':
                vote = np.bincount(k_labels).argmax()
            else:  # distance-weighted
                weights = 1 / (distances[k_indices] + 1e-5)
                vote = np.bincount(k_labels, weights=weights).argmax()
            predictions.append(vote)
        return np.array(predictions)

# Train models
dt = CustomDecisionTree(criterion='entropy', max_depth=5)
dt.tree = dt.fit(X, y)
dt_pred = dt.predict(X)

nb_gaussian = CustomNaiveBayes(variant='gaussian')
nb_gaussian.fit(X, y)
nb_g_pred = nb_gaussian.predict(X)

nb_multinomial = CustomNaiveBayes(variant='multinomial')
nb_multinomial.fit(X, y.astype(int))
nb_m_pred = nb_multinomial.predict(X)

lr = CustomLogisticRegression(learning_rate=0.01, n_iterations=1000)
lr.fit(X, y)
lr_pred = lr.predict(X)

knn = CustomKNN(k=5, weight='distance')
knn.fit(X, y)
knn_pred = knn.predict(X)

# Save predictions
df['dt_pred'] = dt_pred
df['nb_g_pred'] = nb_g_pred
df['nb_m_pred'] = nb_m_pred
df['lr_pred'] = lr_pred
df['knn_pred'] = knn_pred
df.to_csv('Classification_Results/patient_predictions.csv', index=False)

In [45]:
# Graph Visualization: Compare Prediction Accuracy
def calculate_accuracy(y_true, y_pred):
    return np.mean(y_true == y_pred)

accuracies = {
    'Decision Tree': calculate_accuracy(y, dt_pred),
    'Naive Bayes (Gaussian)': calculate_accuracy(y, nb_g_pred),
    'Naive Bayes (Multinomial)': calculate_accuracy(y, nb_m_pred),
    'Logistic Regression': calculate_accuracy(y, lr_pred),
    'k-NN': calculate_accuracy(y, knn_pred)
}

plt.figure(figsize=(10, 6))
plt.bar(accuracies.keys(), accuracies.values(), color=['blue', 'green', 'red', 'purple', 'orange'])
plt.title('Model Prediction Accuracy on Heart Disease Dataset')
plt.xlabel('Model')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('Classification_Results/model_accuracy.png')
plt.close()

In [46]:
df = pd.read_csv('Transformation_Results/patient_dataset_transformed.csv')
print(df[numeric_cols].describe())  # Check summary statistics
print(df[numeric_cols].isnull().sum())  # Check for missing values

                age           bmi  blood_pressure_systolic  \
count  28392.000000  28392.000000             28392.000000   
mean       0.419551      0.585368                 0.751938   
std        0.171082      0.225818                 0.187143   
min        0.000000      0.000000                 0.000000   
25%        0.305882      0.486206                 0.676197   
50%        0.412453      0.643569                 0.785858   
75%        0.529412      0.711237                 0.876105   
max        1.000000      1.000000                 1.000000   

       blood_pressure_diastolic  cholesterol_level  annual_income  \
count              28392.000000       28392.000000   28392.000000   
mean                   0.526452           0.355946       0.431910   
std                    0.164246           0.198092       0.294755   
min                    0.000000           0.000000       0.000000   
25%                    0.425000           0.226667       0.195769   
50%                    0.50

In [47]:
# Evaluation Functions
def k_fold_cv(X, y, model_class, k=5, **kwargs):
    n = len(y)
    indices = np.random.permutation(n)
    fold_size = n // k
    scores = []

    for i in range(k):
        test_idx = indices[i * fold_size:(i + 1) * fold_size]
        train_idx = np.concatenate([indices[:i * fold_size], indices[(i + 1) * fold_size:]])
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = model_class(**kwargs)
        if isinstance(model, CustomDecisionTree):
            model.tree = model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
        elif isinstance(model, (CustomNaiveBayes, CustomLogisticRegression, CustomKNN)):
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
        scores.append(np.mean(y_pred == y_test))
    return np.mean(scores)

def calculate_metrics(y_true, y_pred):
    TP = np.sum((y_true == 1) & (y_pred == 1))
    TN = np.sum((y_true == 0) & (y_pred == 0))
    FP = np.sum((y_true == 0) & (y_pred == 1))
    FN = np.sum((y_true == 1) & (y_pred == 0))

    accuracy = (TP + TN) / (TP + TN + FP + FN) if (TP + TN + FP + FN) > 0 else 0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    pos = y_true == 1
    neg = y_true == 0
    if np.sum(pos) == 0 or np.sum(neg) == 0:
        auc = 0
    else:
        pos_pred = y_pred[pos]
        neg_pred = y_pred[neg]
        auc = np.mean([1 for p in pos_pred for n in neg_pred if p > n]) / (np.sum(pos) * np.sum(neg)) if (np.sum(pos) * np.sum(neg)) > 0 else 0

    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1, 'auc': auc}

def confusion_matrix(y_true, y_pred):
    TP = np.sum((y_true == 1) & (y_pred == 1))
    TN = np.sum((y_true == 0) & (y_pred == 0))
    FP = np.sum((y_true == 0) & (y_pred == 1))
    FN = np.sum((y_true == 1) & (y_pred == 0))
    return np.array([[TN, FP], [FN, TP]])

# Custom Models (assuming definitions from previous context)
class CustomDecisionTree:
    def __init__(self, max_depth=None):
        self.max_depth = max_depth
        self.tree = None
        self.feature_importances_ = {}  # To store impurity reduction per feature

    def calculate_entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))

    def find_best_split(self, X, y):
        best_gain = -np.inf
        best_feature = None
        best_threshold = None
        n_samples, n_features = X.shape
        current_impurity = self.calculate_entropy(y)

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask
                if np.any(left_mask) and np.any(right_mask):
                    left_y, right_y = y[left_mask], y[right_mask]
                    left_impurity = self.calculate_entropy(left_y)
                    right_impurity = self.calculate_entropy(right_y)
                    gain = current_impurity - (np.sum(left_mask) / n_samples * left_impurity +
                                            np.sum(right_mask) / n_samples * right_impurity)
                    if gain > best_gain:
                        best_gain = gain
                        best_feature = feature
                        best_threshold = threshold
        return best_feature, best_threshold

    def fit(self, X, y, depth=0, feature_importance_dict=None):
        y = y.astype(int)
        if feature_importance_dict is None:
            feature_importance_dict = {}

        if self.max_depth is not None and depth >= self.max_depth or len(np.unique(y)) == 1:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        best_feature, best_threshold = self.find_best_split(X, y)
        if best_feature is None:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        left_y, right_y = y[left_mask], y[right_mask]
        n_left, n_right = np.sum(left_mask), np.sum(right_mask)
        current_impurity = self.calculate_entropy(y)
        left_impurity = self.calculate_entropy(left_y)
        right_impurity = self.calculate_entropy(right_y)
        gain = current_impurity - (n_left / len(y) * left_impurity + n_right / len(y) * right_impurity)

        if best_feature in feature_importance_dict:
            feature_importance_dict[best_feature] += gain
        else:
            feature_importance_dict[best_feature] = gain

        left_tree = self.fit(X[left_mask], y[left_mask], depth + 1, feature_importance_dict.copy())
        right_tree = self.fit(X[right_mask], y[right_mask], depth + 1, feature_importance_dict.copy())
        self.feature_importances_ = feature_importance_dict
        return {'feature': best_feature, 'threshold': best_threshold, 'left': left_tree, 'right': right_tree}

    def predict(self, X):
        def traverse_tree(x, tree):
            if 'value' in tree:
                return tree['value']
            if x[tree['feature']] <= tree['threshold']:
                return traverse_tree(x, tree['left'])
            return traverse_tree(x, tree['right'])
        return np.array([traverse_tree(x, self.tree) for x in X])

    def feature_importance(self):
        if not self.feature_importances_:
            return 0  # Return 0 if no splits were made
        total_gain = sum(self.feature_importances_.values())
        return {feature: gain / total_gain for feature, gain in self.feature_importances_.items()} if total_gain > 0 else {}

# Placeholder for other custom models (assuming they exist)
class CustomNaiveBayes:
    def __init__(self, variant='gaussian'):
        self.variant = variant

    def fit(self, X, y):
        pass

    def predict(self, X):
        pass

class CustomLogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0
        for _ in range(self.n_iterations):
            linear_model = np.dot(X, self.weights) + self.bias
            y_predicted = 1 / (1 + np.exp(-linear_model))
            dw = (1 / n_samples) * np.dot(X.T, (y_predicted - y))
            db = (1 / n_samples) * np.sum(y_predicted - y)
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def predict(self, X):
        linear_model = np.dot(X, self.weights) + self.bias
        y_predicted = 1 / (1 + np.exp(-linear_model))
        return (y_predicted >= 0.5).astype(int)

class CustomKNN:
    def __init__(self, k=5, weight='distance'):
        self.k = k
        self.weight = weight
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y

    def predict(self, X):
        y_pred = np.zeros(len(X))
        for i in range(len(X)):
            distances = np.sqrt(np.sum((self.X_train - X[i]) ** 2, axis=1))
            nearest = np.argsort(distances)[:self.k]
            if self.weight == 'distance':
                weights = 1 / (distances[nearest] + 1e-5)  # Avoid division by zero
                y_pred[i] = np.bincount(self.y_train[nearest], weights=weights).argmax()
            else:
                y_pred[i] = np.bincount(self.y_train[nearest]).argmax()
        return y_pred.astype(int)
# Cross-validation and metrics
cv_scores = {
    'dt': k_fold_cv(X, y, CustomDecisionTree, max_depth=5),
    'nb_g': k_fold_cv(X, y, CustomNaiveBayes, variant='gaussian'),
    'nb_m': k_fold_cv(X, y, CustomNaiveBayes, variant='multinomial'),
    'lr': k_fold_cv(X, y, CustomLogisticRegression),
    'knn': k_fold_cv(X, y, CustomKNN, k=5, weight='distance')
}
print("Cross-Validation Scores:", {k: v for k, v in cv_scores.items()})

metrics = {}
for model, pred in [('dt', dt_pred), ('nb_g', nb_g_pred), ('nb_m', nb_m_pred), ('lr', lr_pred), ('knn', knn_pred)]:
    metrics[model] = calculate_metrics(y, pred)
print("Metrics:", metrics)

# Confusion Matrix Analysis
for model, pred in [('dt', dt_pred), ('nb_g', nb_g_pred), ('nb_m', nb_m_pred), ('lr', lr_pred), ('knn', knn_pred)]:
    cm = confusion_matrix(y, pred)
    print(f"{model} Confusion Matrix:\n{cm}")
    if np.sum(y == 1) / len(y) < 0.3:  # Arbitrary threshold for imbalance
        print(f"{model}: Potential class imbalance detected, consider oversampling or weighting.")

# Feature Importance
dt = CustomDecisionTree(max_depth=5)
dt.tree = dt.fit(X, y)  # Reconstruct tree with your data
fi = dt.feature_importance()
print("Feature Importance (Decision Tree):", fi)

# Graph Visualization: Compare Model Metrics (Accuracy)
plt.figure(figsize=(10, 6))
accuracies = [metrics[model]['accuracy'] for model in ['dt', 'nb_g', 'nb_m', 'lr', 'knn']]
plt.bar(['Decision Tree', 'Naive Bayes (Gaussian)', 'Naive Bayes (Multinomial)', 'Logistic Regression', 'k-NN'],
        accuracies, color=['blue', 'green', 'red', 'purple', 'orange'])
plt.title('Model Accuracy Comparison')
plt.xlabel('Model')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('Classification_Results/model_accuracy.png')
plt.close()

Cross-Validation Scores: {'dt': np.float64(0.5551602676998943), 'nb_g': np.float64(0.0), 'nb_m': np.float64(0.0), 'lr': np.float64(0.5202888340965128), 'knn': np.float64(0.5196900317013033)}


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Metrics: {'dt': {'accuracy': np.float64(0.5647365455057762), 'precision': np.float64(0.5405221029652538), 'recall': np.float64(0.8405233380480905), 'f1': np.float64(0.6579384410983171), 'auc': np.float64(4.962205855323514e-09)}, 'nb_g': {'accuracy': np.float64(0.5258523527754297), 'precision': np.float64(0.5296536039188243), 'recall': np.float64(0.4282178217821782), 'f1': np.float64(0.47356483653996556), 'auc': np.float64(4.962205855323514e-09)}, 'nb_m': {'accuracy': np.float64(0.5199704142011834), 'precision': np.float64(0.5300411522633744), 'recall': np.float64(0.3188118811881188), 'f1': np.float64(0.3981452859350851), 'auc': np.float64(4.962205855323514e-09)}, 'lr': {'accuracy': np.float64(0.5019723865877712), 'precision': 0, 'recall': np.float64(0.0), 'f1': 0, 'auc': np.float64(nan)}, 'knn': {'accuracy': np.float64(0.9989433643279797), 'precision': np.float64(0.9995043896913056), 'recall': np.float64(0.9983734087694484), 'f1': np.float64(0.998938579111237), 'auc': np.float64(4.9622

# **Part 5: Deep Learning & Advanced Classification (25 Points)**
**Task 5.1: Medical Image Classification with CNN (15 Points)**


In [None]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models
import numpy as np
import matplotlib.pyplot as plt
import os

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Data Preparation
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# Load CIFAR-10 dataset
dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Adjusted DataLoaders with persistent_workers and reduced batch size
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2, persistent_workers=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2, persistent_workers=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2, persistent_workers=True)

# ResNet Implementation
class MedicalImageClassifier(pl.LightningModule):
    def __init__(self, model_name='resnet18', pretrained=False, num_classes=10, learning_rate=0.001):
        super().__init__()
        self.save_hyperparameters()
        self.learning_rate = learning_rate
        self.val_outputs = []  # To store validation outputs

        # Load ResNet model
        if model_name == 'resnet18':
            self.model = models.resnet18(pretrained=pretrained)
            num_ftrs = self.model.fc.in_features
            self.model.fc = nn.Linear(num_ftrs, num_classes)
        elif model_name == 'resnet50':
            self.model = models.resnet50(pretrained=pretrained)
            num_ftrs = self.model.fc.in_features
            self.model.fc = nn.Linear(num_ftrs, num_classes)

        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        preds = torch.softmax(logits, dim=1)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
        self.val_outputs.append({'val_loss': loss, 'preds': preds, 'targets': y})
        return {'val_loss': loss, 'preds': preds, 'targets': y}

    def on_validation_epoch_end(self):
        avg_loss = torch.stack([x['val_loss'] for x in self.val_outputs]).mean()
        preds = torch.cat([x['preds'] for x in self.val_outputs], dim=0)
        targets = torch.cat([x['targets'] for x in self.val_outputs], dim=0)
        self.log('val_loss_epoch', avg_loss, on_epoch=True)
        self.val_outputs.clear()  # Clear outputs after processing

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

# Train Models
models_dict = {
    'resnet18_random': MedicalImageClassifier(model_name='resnet18', pretrained=False),
    'resnet18_pretrained': MedicalImageClassifier(model_name='resnet18', pretrained=True),
    'resnet50_random': MedicalImageClassifier(model_name='resnet50', pretrained=False),
    'resnet50_pretrained': MedicalImageClassifier(model_name='resnet50', pretrained=True)
}

# Fix devices parameter
devices = 1 if torch.cuda.is_available() else 1  # Default to 1 CPU device if no GPU
trainer = pl.Trainer(max_epochs=5, accelerator='auto', devices=devices)
for name, model in models_dict.items():
    print(f"Training {name}...")
    try:
        trainer.fit(model, train_loader, val_loader)
        print(f"Completed training {name}")
    except Exception as e:
        print(f"Error training {name}: {str(e)}")

# Custom Metrics Functions (without sklearn)
def calculate_confusion_matrix(y_true, y_pred):
    TP = np.sum((y_true == 1) & (y_pred == 1))
    TN = np.sum((y_true == 0) & (y_pred == 0))
    FP = np.sum((y_true == 0) & (y_pred == 1))
    FN = np.sum((y_true == 1) & (y_pred == 0))
    return TP, TN, FP, FN

def calculate_metrics(y_true, y_pred):
    TP, TN, FP, FN = calculate_confusion_matrix(y_true, y_pred)
    accuracy = (TP + TN) / (TP + TN + FP + FN) if (TP + TN + FP + FN) > 0 else 0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1}

# Custom ROC Curve (approximation without sklearn)
def compute_roc_curve(y_true, y_scores):
    n = len(y_true)
    thresholds = np.sort(y_scores)[::-1]
    tpr = []
    fpr = []

    for thresh in thresholds:
        y_pred = (y_scores >= thresh).astype(int)
        TP = np.sum((y_true == 1) & (y_pred == 1))
        FP = np.sum((y_true == 0) & (y_pred == 1))
        FN = np.sum((y_true == 1) & (y_pred == 0))
        TN = np.sum((y_true == 0) & (y_pred == 0))
        tpr.append(TP / (TP + FN) if (TP + FN) > 0 else 0)
        fpr.append(FP / (FP + TN) if (FP + TN) > 0 else 0)

    return np.array(fpr), np.array(tpr)

def plot_multiclass_roc_curves(y_true, y_scores, class_names):
    n_classes = len(class_names)
    fpr = {}
    tpr = {}

    for i in range(n_classes):
        class_scores = y_scores[:, i]
        class_true = (y_true == i).astype(int)
        fpr[i], tpr[i] = compute_roc_curve(class_true, class_scores)

    plt.figure(figsize=(10, 6))
    colors = ['blue', 'red', 'green', 'yellow', 'purple', 'orange', 'brown', 'pink', 'gray', 'cyan']
    for i, color in zip(range(n_classes), colors):
        plt.plot(fpr[i], tpr[i], color=color, lw=2, label=f'Class {class_names[i]}')

    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multiclass ROC Curves')
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.savefig(f'Classification_Results/multiclass_roc_curves.png')
    plt.close()

# Custom Simple Models (without sklearn)
class CustomLogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0

        # Convert y to NumPy array if it's a Tensor
        y = y.numpy() if hasattr(y, 'numpy') else y

        for _ in range(self.n_iterations):
            linear_model = np.dot(X, self.weights) + self.bias
            y_predicted = 1 / (1 + np.exp(-linear_model))  # Sigmoid
            dw = (1 / n_samples) * np.dot(X.T, (y_predicted - y))
            db = (1 / n_samples) * np.sum(y_predicted - y)
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def predict(self, X):
        linear_model = np.dot(X, self.weights) + self.bias
        y_predicted = 1 / (1 + np.exp(-linear_model))
        return (y_predicted >= 0.5).astype(int)

class CustomRandomForest:
    def __init__(self, n_trees=10, max_depth=5):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.trees = []

    def fit(self, X, y):
        n_samples = X.shape[0]
        # Convert y to NumPy array if it's a Tensor
        y = y.numpy() if hasattr(y, 'numpy') else y
        for _ in range(self.n_trees):
            indices = np.random.choice(n_samples, n_samples, replace=True)
            X_boot = X[indices]
            y_boot = y[indices]
            tree = CustomDecisionTree(max_depth=self.max_depth)
            tree.tree = tree.fit(X_boot, y_boot)  # Assign the tree structure
            self.trees.append(tree)

    def predict(self, X):
        predictions = np.array([tree.predict(X) for tree in self.trees])
        return np.apply_along_axis(lambda x: np.bincount(x).argmax(), axis=0, arr=predictions)

class CustomMLP:
    def __init__(self, hidden_size=100, learning_rate=0.01, n_iterations=1000):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.hidden_size = hidden_size
        self.weights1 = None
        self.weights2 = None
        self.bias1 = None
        self.bias2 = None

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights1 = np.random.randn(n_features, self.hidden_size) * 0.01
        self.bias1 = np.zeros(self.hidden_size)
        self.weights2 = np.random.randn(self.hidden_size, 1) * 0.01
        self.bias2 = np.zeros(1)

        # Convert y to NumPy array if it's a Tensor
        y = y.numpy() if hasattr(y, 'numpy') else y
        y = y.reshape(-1, 1)

        for _ in range(self.n_iterations):
            # Forward pass
            hidden = 1 / (1 + np.exp(-np.dot(X, self.weights1) - self.bias1))
            output = 1 / (1 + np.exp(-np.dot(hidden, self.weights2) - self.bias2))

            # Backpropagation
            error = y - output
            output_delta = error * output * (1 - output)
            hidden_error = np.dot(output_delta, self.weights2.T)
            hidden_delta = hidden_error * hidden * (1 - hidden)

            self.weights2 += self.learning_rate * np.dot(hidden.T, output_delta)
            self.bias2 += self.learning_rate * np.sum(output_delta, axis=0)
            self.weights1 += self.learning_rate * np.dot(X.T, hidden_delta)
            self.bias1 += self.learning_rate * np.sum(hidden_delta, axis=0)

    def predict(self, X):
        hidden = 1 / (1 + np.exp(-np.dot(X, self.weights1) - self.bias1))
        output = 1 / (1 + np.exp(-np.dot(hidden, self.weights2) - self.bias2))
        return (output >= 0.5).astype(int).ravel()

class CustomDecisionTree:
    def __init__(self, max_depth=None):
        self.max_depth = max_depth
        self.tree = None

    def calculate_entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))

    def find_best_split(self, X, y):
        best_gain = -np.inf
        best_feature = None
        best_threshold = None
        n_samples, n_features = X.shape
        current_impurity = self.calculate_entropy(y)

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask
                if np.any(left_mask) and np.any(right_mask):
                    left_y, right_y = y[left_mask], y[right_mask]
                    left_impurity = self.calculate_entropy(left_y)
                    right_impurity = self.calculate_entropy(right_y)
                    gain = current_impurity - (np.sum(left_mask) / n_samples * left_impurity +
                                            np.sum(right_mask) / n_samples * right_impurity)
                    if gain > best_gain:
                        best_gain = gain
                        best_feature = feature
                        best_threshold = threshold
        return best_feature, best_threshold

    def fit(self, X, y, depth=0):
        y = y.astype(int)
        if self.max_depth is not None and depth >= self.max_depth or len(np.unique(y)) == 1:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        best_feature, best_threshold = self.find_best_split(X, y)
        if best_feature is None:
            leaf_value = np.bincount(y).argmax()
            return {'value': leaf_value}

        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        left_tree = self.fit(X[left_mask], y[left_mask], depth + 1)
        right_tree = self.fit(X[right_mask], y[right_mask], depth + 1)
        return {'feature': best_feature, 'threshold': best_threshold, 'left': left_tree, 'right': right_tree}

    def predict(self, X):
        def traverse_tree(x, tree):
            if 'value' in tree:
                return tree['value']
            if x[tree['feature']] <= tree['threshold']:
                return traverse_tree(x, tree['left'])
            return traverse_tree(x, tree['right'])
        return np.array([traverse_tree(x, self.tree) for x in X])

# Extract and prepare data for traditional models
y_flat = np.concatenate([y.numpy() for _, y in test_loader])
X_flat = np.concatenate([x.numpy().reshape(-1, 32*32*3) for x, _ in test_loader])  # Flatten CIFAR-10 images
scaler = lambda x: (x - np.mean(x, axis=0)) / np.std(x, axis=0)  # Manual standardization
X_scaled = scaler(X_flat)

# Train Custom Models
lr_model = CustomLogisticRegression()
rf_model = CustomRandomForest(n_trees=10, max_depth=5)
mlp_model = CustomMLP(hidden_size=100)

lr_model.fit(X_scaled, y_flat)
rf_model.fit(X_scaled, y_flat)
mlp_model.fit(X_scaled, y_flat)

# Predictions
lr_pred = lr_model.predict(X_scaled)
rf_pred = rf_model.predict(X_scaled)
mlp_pred = mlp_model.predict(X_scaled)

# ResNet Prediction
resnet_preds = []
for name, model in models_dict.items():
    model.eval()
    with torch.no_grad():
        all_preds = []
        for batch in test_loader:
            x, _ = batch
            logits = model(x)
            preds = torch.softmax(logits, dim=1)
            all_preds.append(preds.numpy())
        all_preds = np.concatenate(all_preds)
        resnet_pred = np.argmax(all_preds, axis=1)
        resnet_preds.append(resnet_pred)

# Metrics and Visualization
models = {'Logistic Regression': lr_pred, 'Random Forest': rf_pred, 'MLP': mlp_pred, 'ResNet': resnet_preds[0]}
metrics_table = {}
for name, pred in models.items():
    metrics = calculate_metrics(y_flat, pred)
    metrics_table[name] = metrics

plt.figure(figsize=(12, 8))
for name, pred in models.items():
    fpr, tpr = compute_roc_curve((y_flat == 1).astype(int), (pred == 1).astype(float))
    plt.plot(fpr, tpr, label=f'{name}')

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve Comparison')
plt.legend(loc='lower right')
plt.grid(True)
plt.savefig('Classification_Results/model_roc_comparison.png')
plt.close()

print("Performance Table:")
for model, metrics in metrics_table.items():
    print(f"{model}: Accuracy={metrics['accuracy']:.3f}, Precision={metrics['precision']:.3f}, "
          f"Recall={metrics['recall']:.3f}, F1={metrics['f1']:.3f}")

# Multiclass ROC for ResNet
plot_multiclass_roc_curves(y_flat, all_preds, ['Class 0', 'Class 1', 'Class 2', 'Class 3', 'Class 4',
                                              'Class 5', 'Class 6', 'Class 7', 'Class 8', 'Class 9'])

INFO:pytorch_lightning.utilities.rank_zero:💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type             | Params | Mode 
-------------------------------------------------------
0 | model     | ResNet           | 11.2 M | train
1 | criterion | CrossEntropyLoss | 0      | train
-------------------------------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.727    Total estimated model p

Training resnet18_random...


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.
/usr/local/lib/python3.12/dist-packages/pytorch_lightning/callbacks/model_checkpoint.py:751: Checkpoint directory /content/lightning_logs/version_2/checkpoints exists and is not empty.
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type             | Params | Mode 
-------------------------------------------------------
0 | model     | ResNet           | 11.2 M | train
1 | criterion | CrossEntropyLoss | 0      | train
-------------------------------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.727    Total estimated model params size (MB)
69        Modules in train mode
0         Modules in eval mode


Completed training resnet18_random
Training resnet18_pretrained...


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type             | Params | Mode 
-------------------------------------------------------
0 | model     | ResNet           | 23.5 M | train
1 | criterion | CrossEntropyLoss | 0      | train
-------------------------------------------------------
23.5 M    Trainable params
0         Non-trainable params
23.5 M    Total params
94.114    Total estimated model params size (MB)
152       Modules in train mode
0         Modules in eval mode


Completed training resnet18_pretrained
Training resnet50_random...


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type             | Params | Mode 
-------------------------------------------------------
0 | model     | ResNet           | 23.5 M | train
1 | criterion | CrossEntropyLoss | 0      | train
-------------------------------------------------------
23.5 M    Trainable params
0         Non-trainable params
23.5 M    Total params
94.114    Total estimated model params size (MB)
152       Modules in train mode
0         Modules in eval mode


Completed training resnet50_random
Training resnet50_pretrained...


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.


Completed training resnet50_pretrained


  y_predicted = 1 / (1 + np.exp(-linear_model))  # Sigmoid
