# Part 3: Practical Data Preparation

**Objective:** Handle categorical features using One-Hot Encoding and address class imbalance using SMOTE.

## 1. Setup

Import necessary libraries.

In [33]:
import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE

## 2. Data Loading

Load the dataset.

In [34]:
def load_data(file_path = './data/synthetic_health_data.csv') -> pd.DataFrame:
    """
    Load the synthetic health data from a CSV file.
    
    Args:
        file_path: Path to the CSV file
        
    Returns:
        DataFrame containing the data
    """
    # YOUR CODE HERE
    # Load the CSV file using pandas
    try:
        data = pd.read_csv(file_path)
        print(f"Data loaded successfully from {file_path}")
        data['timestamp'] = pd.to_datetime(data['timestamp'])
        assert data['timestamp'].dtype == 'datetime64[ns]', "Timestamp column is not in datetime format"
        return data
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return None

load_data()['smoker_status'].value_counts()

Data loaded successfully from ./data/synthetic_health_data.csv


smoker_status
no        4084
yes       1770
former    1472
Name: count, dtype: int64

## 3. Categorical Feature Encoding

Implement `encode_categorical_features` using `OneHotEncoder`.

In [35]:
def encode_categorical_features(df: pd.DataFrame, column_to_encode : str ='smoker_status'):
    """
    Encode a categorical column using OneHotEncoder.
    
    Args:
        df: Input DataFrame
        column_to_encode: Name of the categorical column to encode
        
    Returns:
        DataFrame with the categorical column replaced by one-hot encoded columns
    """
    # YOUR CODE HERE
    # 1. Extract the categorical column
    df = df.copy()
    
    # 2. Apply OneHotEncoder
    encoder = OneHotEncoder(sparse_output=False)
    encoded_columns = encoder.fit_transform(df[[column_to_encode]])
    # 3. Create new column names
    # encoded_column_name = column_to_encode + '_encoded'
    # 4. Replace the original categorical column with the encoded columns
    encoded_column_names = encoder.get_feature_names_out([column_to_encode])
    encoded_df = pd.DataFrame(encoded_columns, columns=encoded_column_names, index=df.index)
    # df = df.drop(columns=[column_to_encode])
    df = pd.concat([df, encoded_df], axis=1)
    # Placeholder return - replace with your implementation
    return df
# test it
df = load_data()
df = encode_categorical_features(df)
df.columns
df['smoker_status_former'].value_counts()
df['smoker_status_no'].value_counts()
df['smoker_status_yes'].value_counts()

Data loaded successfully from ./data/synthetic_health_data.csv


smoker_status_yes
0.0    5556
1.0    1770
Name: count, dtype: int64

## 4. Data Preparation

Implement `prepare_data_part3` to handle the train/test split correctly.

In [36]:
def prepare_data_part3(df, test_size=0.2, random_state=42):
    """
    Prepare data with categorical encoding.
    
    Args:
        df: Input DataFrame
        test_size: Proportion of data for testing
        random_state: Random seed for reproducibility
        
    Returns:
        X_train, X_test, y_train, y_test
    """
    # YOUR CODE HERE
    # 1. Encode categorical features using the encode_categorical_features function
    df_encoded = encode_categorical_features(df).ffill().bfill()
    assert df_encoded.isnull().sum().sum() == 0, "There are still missing values in the DataFrame"  
    # 2. Select relevant features (including the one-hot encoded ones) and the target
    feature_columns_names = ['smoker_status_former' ,'smoker_status_no', 'smoker_status_yes', 'age', 'bmi', 'systolic_bp', 'diastolic_bp', 'glucose_level']
    X = df_encoded[feature_columns_names]
    Y = df_encoded['disease_outcome']
    # 3. Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=random_state)
    # 4. Handle missing values
    
    # Placeholder return - replace with your implementation
    return X_train, X_test, y_train, y_test
# test it
df = load_data()
X_train, X_test, y_train, y_test = prepare_data_part3(df)

Data loaded successfully from ./data/synthetic_health_data.csv


## 5. Handling Imbalanced Data

Implement `apply_smote` to oversample the minority class.

In [37]:
def apply_smote(X_train, y_train, random_state=42):
    """
    Apply SMOTE to oversample the minority class.
    
    Args:
        X_train: Training features
        y_train: Training target
        random_state: Random seed for reproducibility
        
    Returns:
        Resampled X_train and y_train with balanced classes
    """
    # YOUR CODE HERE
    # Apply SMOTE to balance the classes
    smote = SMOTE(random_state=random_state)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)
    # Placeholder return - replace with your implementation
    return X_train_resampled, y_train_resampled

## 6. Model Training and Evaluation

Train a model on the SMOTE-resampled data and evaluate it.

In [38]:
def train_logistic_regression(X_train, y_train) -> LogisticRegression:
    """
    Train a logistic regression model.
    
    Args:
        X_train: Training features
        y_train: Training target
        
    Returns:
        Trained logistic regression model
    """
    # YOUR CODE HERE
    # Initialize and train a LogisticRegression model
    model = LogisticRegression()
    model.fit(X_train, y_train)
    
    return model  # Replace with actual implementation

def calculate_evaluation_metrics(model: LogisticRegression, X_test, y_test) -> dict:
    """
    Calculate classification evaluation metrics.
    
    Args:
        model: Trained model
        X_test: Test features
        y_test: Test target
        
    Returns:
        Dictionary containing accuracy, precision, recall, f1, auc, and confusion_matrix
    """
    # YOUR CODE HERE
    # 1. Generate predictions
    y_pred = model.predict(X_test)
    # 2. Calculate metrics: accuracy, precision, recall, f1, auc
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
    # 3. Create confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    # 4. Return metrics in a dictionary
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': auc,
        'confusion_matrix': cm
    }
    # Placeholder return - replace with your implementation
    return metrics

## 7. Save Results

Save the evaluation metrics to a text file.

In [39]:
# YOUR CODE HERE
# 1. Create 'results' directory if it doesn't exist
# 2. Format metrics as strings
# 3. Write metrics to 'results/results_part3.txt'

## 8. Main Execution

Run the complete workflow.

In [44]:
# Main execution
if __name__ == "__main__":
    # 1. Load data
    data_file = 'data/synthetic_health_data.csv'
    df = load_data(data_file)
    
    # 2. Prepare data with categorical encoding
    X_train, X_test, y_train, y_test = prepare_data_part3(df)
    
    # 3. Apply SMOTE to balance the training data
    X_train_resampled, y_train_resampled = apply_smote(X_train, y_train)
    
    # 4. Train model on resampled data
    model = train_logistic_regression(X_train_resampled, y_train_resampled)
    
    # 5. Evaluate on original test set
    metrics = calculate_evaluation_metrics(model, X_test, y_test)
    
    # 6. Print metrics
    for metric, value in metrics.items():
        if metric != 'confusion_matrix':
            print(f"{metric}: {value:.4f}")
    
    # 7. Save results
    # (Your code for saving results)
    import json
    if not os.path.exists('result'):
        os.makedirs('result')
    results_file = 'result/result_part3.txt'
    with open(results_file, 'w') as f:
        # write as json
        metrics_to_save = metrics.copy()
        metrics_to_save['confusion_matrix'] = metrics_to_save['confusion_matrix'].tolist()
        json.dump(metrics_to_save, f, indent=4)
    
    print(f"Result saved to {results_file}")
    
    # 8. Load Part 1 results for comparison

    try:
        with open('result/result_part1.json', 'r') as f:
            part1_metrics = json.load(f)
        
        # 9. Compare models
        comparison = compare_models(part1_metrics, metrics)
        print("\nModel Comparison (improvement percentages):")
        for metric, improvement in comparison.items():
            print(f"{metric}: {improvement:.2f}%")
    except FileNotFoundError:
        print("Part 1 results not found. Run part1_introduction.ipynb first.")

Data loaded successfully from data/synthetic_health_data.csv
accuracy: 0.8390
precision: 0.3604
recall: 0.8392
f1: 0.5042
auc: 0.9137
Result saved to result/result_part3.txt
Metric f1_score not found in Part 3 metrics.

Model Comparison (improvement percentages):
accuracy: -7.93%
precision: -51.35%
recall: 500.00%
auc: 1.26%


STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


## 9. Compare Results

Implement a function to compare model performance between balanced and imbalanced data.

In [31]:
def compare_models(part1_metrics: dict, part3_metrics: dict):
    """
    Calculate percentage improvement between models trained on imbalanced vs. balanced data.
    
    Args:
        part1_metrics: Dictionary containing evaluation metrics from Part 1 (imbalanced)
        part3_metrics: Dictionary containing evaluation metrics from Part 3 (balanced)
        
    Returns:
        Dictionary with metric names as keys and improvement percentages as values
    """
    # YOUR CODE HERE
    part1_metrics.pop('confusion_matrix')
    part3_metrics.pop('confusion_matrix')
    improve_dict = {}
    # 1. Calculate percentage improvement for each metric
    for metric , value in part1_metrics.items():
        if metric in part3_metrics:
            if part1_metrics[metric] == 0:
                part1_metrics[metric] = 0.0001
            improvement_value = (part3_metrics[metric] - part1_metrics[metric]) / abs(part1_metrics[metric]) * 100
            improve_dict[metric] = improvement_value
        else:
            print(f"Metric {metric} not found in Part 3 metrics.")
    # 2. Handle metrics where higher is better (most metrics) and where lower is better
    # 3. Return a dictionary with metric names and improvement percentages
    
    # Placeholder return - replace with your implementation
    return improve_dict