## Preparation for the modeling

In [1]:
# Install pinned dependencies, then import the libraries used throughout the notebook.
# NOTE(review): the pip line below pins imbalanced-learn==0.11.0 while the second conda line
# requests imbalanced-learn=0.10.1 — the two pins disagree; confirm which version is intended.
!pip install scikit-learn==1.3.2 imbalanced-learn==0.11.0
!conda install -c conda-forge boruta_py -y
!conda install -c conda-forge imbalanced-learn=0.10.1 -y
import numpy as np
# Re-create the scalar aliases np.int / np.float / np.bool / np.object / np.str by pointing
# them at the Python builtins.
# NOTE(review): presumably a compatibility shim for a dependency that still references these
# aliases (possibly boruta) — confirm which package needs it.
np.int = int
np.float = float
np.bool = bool
np.object = object
np.str = str
import pandas as pd 
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from boruta import BorutaPy
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import warnings
warnings.filterwarnings('ignore')

Channels:
 - conda-forge
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.

Channels:
 - conda-forge
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.



### Build the dictionaries for modeling

In [12]:
# Classifiers under comparison, keyed by the display name written to the results table.
models_dict = dict([
    ("Logistic Regression", LogisticRegression(random_state=42, max_iter=1000)),
    ("Linear Discriminant Analysis", LinearDiscriminantAnalysis()),
    ("Gradient Boosting", GradientBoostingClassifier(random_state=42)),
    ("K Nearest Neighbour", KNeighborsClassifier()),
])

# Zero-argument factories: calling balancers[name]() yields a fresh over-sampler
# configured with sampling_strategy=1.0 and a fixed seed.
balancers = {
    sampler_cls.__name__: (
        lambda sampler_cls=sampler_cls: sampler_cls(sampling_strategy=1.0, random_state=42)
    )
    for sampler_cls in (SMOTE, ADASYN, RandomOverSampler)
}

# Scaler classes (not instances), keyed by class name; None means "leave features unscaled".
scalers = {scaler_cls.__name__: scaler_cls for scaler_cls in (StandardScaler, MinMaxScaler)}
scalers["No Scaling"] = None

### Export format: result columns, an example result, and an append function

In [3]:
import os
from datetime import datetime

# Column layout of the results CSV; also used as the header of a freshly created file.
columns = [
    'imputer', 'balancer', 'scaler', 'model', 'accuracy',
    'precision_class_-1', 'recall_class_-1', 'f1_class_-1',
    'precision_class_1', 'recall_class_1', 'f1_class_1',
    'macro_f1', 'weighted_f1',
    'confusion_tp', 'confusion_tn', 'confusion_fp', 'confusion_fn',
    'specificity_class_-1',
    'notebook_id', 'timestamp',
    'normality_test'
]

# Example result row (placeholder values; can be replaced with a loop or real data).
# It intentionally carries no confusion-matrix / specificity values.
results = [{
    'imputer': 'imputer_name',
    'balancer': 'balancer_name',
    'scaler': 'scaler_name',
    'model': 'model_name',
    'accuracy': 0.796,
    'precision_class_-1': 0.13,
    'recall_class_-1': 0.29,
    'f1_class_-1': 0.18,
    'precision_class_1': 0.13,
    'recall_class_1': 0.29,
    'f1_class_1': 0.18,
    'macro_f1': 0.53,
    'weighted_f1': 0.83,
    'notebook_id': "Yitsen",
    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'normality_test': 0
}]

# Build the example frame against the declared schema so it has exactly the
# columns (and column order) of the CSV; fields missing from the example become NaN.
results_df = pd.DataFrame(results, columns=columns)
output_file = "model_comparison_results.csv"

# Create a header-only file (no data rows) if none exists yet.
# NOTE(review): this path is relative to the working directory at the time this cell
# runs; a later cell calls os.chdir("./Code"), so the file created here and the file
# appended to afterwards may live in different folders — confirm the intended location.
if not os.path.exists(output_file):
    pd.DataFrame(columns=columns).to_csv(output_file, index=False)
    print(f"Created empty results file with correct structure: {output_file}")


# Function to append results to CSV
def append_results_to_csv(new_results, output_file="model_comparison_results.csv"):
    """Upsert experiment rows into the results CSV.

    Rows already in ``output_file`` that share the same
    (notebook_id, imputer, scaler, balancer, model) key as a row in
    ``new_results`` are replaced; all other existing rows are kept.

    Parameters
    ----------
    new_results : list of dict (or anything ``pd.DataFrame`` accepts)
        Result rows to write. An empty input leaves the file untouched.
    output_file : str
        Path of the CSV file; its parent directory is created if missing.

    Raises
    ------
    KeyError
        If the file exists and either side lacks one of the key columns.
    """
    output_dir = os.path.dirname(output_file)
    if output_dir:
        # exist_ok avoids a failure when the directory appears between check and creation.
        os.makedirs(output_dir, exist_ok=True)

    results_df = pd.DataFrame(new_results)
    if results_df.empty:
        # Nothing to merge; writing an empty frame would produce an unreadable CSV.
        print(f"No new results to write to {output_file}")
        return

    existing = None
    if os.path.exists(output_file):
        try:
            # Only the empty string is treated as missing. With pandas' default NA list
            # the literal scaler name "None" is read back as NaN, which can never equal
            # the incoming "None" key, so re-runs would accumulate duplicate rows.
            existing = pd.read_csv(output_file, keep_default_na=False, na_values=[""])
        except pd.errors.EmptyDataError:
            # Zero-byte file: treat it as having no previous rows.
            existing = None

    if existing is not None:
        id_cols = ["notebook_id", "imputer", "scaler", "balancer", "model"]
        new_keys = set(results_df[id_cols].itertuples(index=False, name=None))
        # Explicit boolean Series (not a list) so an empty mask still selects rows, not columns.
        keep_mask = pd.Series(
            [key not in new_keys
             for key in existing[id_cols].itertuples(index=False, name=None)],
            index=existing.index,
            dtype=bool,
        )
        final = pd.concat([existing[keep_mask], results_df], ignore_index=True)
    else:
        final = results_df

    final.to_csv(output_file, index=False)
    print(f"Updated {len(results_df)} rows in {output_file}")


### Set the file path

In [4]:
import os

# Move into the "Code" sub-folder (relative to the current working directory) when present;
# otherwise report that it could not be found and stay where we are.
code_path = "./Code"
if not os.path.exists(code_path):
    print(f"Path not found: {code_path}")
else:
    os.chdir(code_path)
    print("Current working directory:", os.getcwd())

📂 Current working directory: C:\Users\Yi-TsenLin\Desktop\HTW\25SOSE\Data Analytics II\00_Assignment\Code


### Imputation in KNN (avoid data leakage)

In [5]:
import pandas as pd

# KNN branch: train_KNN.csv is the training split, test_revised.csv the hold-out split.
train = pd.read_csv("train_KNN.csv")
test = pd.read_csv("test_revised.csv")

# Separate predictors from the target column.
X_train = train.drop(columns=['label'])
y_train = train['label']
X_test = test.drop(columns=['label'])
y_test = test['label']

# Discard every column that is non-numeric in the training split (e.g. timestamps),
# and drop the same columns from the test split.
non_numeric_cols = X_train.select_dtypes(exclude=['number']).columns
X_train = X_train.drop(non_numeric_cols, axis=1)
X_test = X_test.drop(non_numeric_cols, axis=1)

# Important: the imputer learns from the training split only; the test split is
# merely transformed, so no test information leaks into the fit.
imputer = KNNImputer(n_neighbors=5)
imputer.fit(X_train)
X_train_imputed = imputer.transform(X_train)
X_test_imputed = imputer.transform(X_test)

# Rebuild labelled DataFrames from the imputed arrays.
train_imputed = pd.DataFrame(X_train_imputed, columns=X_train.columns).assign(label=y_train.values)
test_imputed = pd.DataFrame(X_test_imputed, columns=X_test.columns).assign(label=y_test.values)


### Feature Selection in KNN - Boruta

In [6]:
# Remove a leftover 'timestamp' column from both imputed frames, if there is one.
for df in (train_imputed, test_imputed):
    if 'timestamp' not in df.columns:
        continue
    df.drop(columns=['timestamp'], inplace=True)

# Predictors / target of the training split.
y_train = train_imputed["label"]
X_train = train_imputed.drop(columns=["label"])

# Boruta feature selection driven by a shallow, class-weighted random forest.
rf = RandomForestClassifier(n_jobs=-1, class_weight='balanced', max_depth=5, random_state=42)
boruta = BorutaPy(estimator=rf, n_estimators='auto', random_state=42)
boruta.fit(X_train.to_numpy(), y_train.to_numpy())

# Keep only the columns Boruta confirmed, in both splits.
selected_cols = X_train.columns[boruta.support_]
X_train_selected = X_train.loc[:, selected_cols]
X_test_selected = test_imputed.loc[:, selected_cols]

# Re-attach the target to each reduced split (y_test comes from the imputation cell).
train_selected = X_train_selected.assign(label=y_train.values)
test_selected = X_test_selected.assign(label=y_test.loc[X_test_selected.index].values)

# Persist both splits for the modeling stage.
train_selected.to_csv("KNN_train_boruta.csv", index=False)
test_selected.to_csv("KNN_test_boruta.csv", index=False)

print("File saved：KNN_train_boruta.csv, KNN_test_boruta.csv")

✅ File saved：KNN_train_boruta.csv, KNN_test_boruta.csv


### Imputation in MICE (avoid data leakage)

In [7]:
# MICE branch: train_MICE.csv is the training split, test_revised.csv the hold-out split.

train = pd.read_csv("train_MICE.csv")
test = pd.read_csv("test_revised.csv")

# Separation of features and label
X_train = train.drop(columns=['label'])
y_train = train['label']
X_test = test.drop(columns=['label'])
y_test = test['label']

# Remove non-numeric fields (especially timestamps)
non_numeric_cols = X_train.select_dtypes(exclude=['number']).columns
X_train = X_train.drop(columns=non_numeric_cols)
X_test = X_test.drop(columns=non_numeric_cols)

# Fit only on train (important: avoids leaking test information into the imputer).
# This is the MICE branch, so use the iterative (chained-equations) imputer that the
# notebook imports for this purpose; the previous KNNImputer here made the "MICE"
# test split KNN-imputed, i.e. not a MICE pipeline at all.
imputer = IterativeImputer(random_state=42)
X_train_imputed = imputer.fit_transform(X_train)
X_test_imputed = imputer.transform(X_test)  # transform only

# Merge the imputed features back together with the labels
train_imputed = pd.DataFrame(X_train_imputed, columns=X_train.columns)
train_imputed['label'] = y_train.values
test_imputed = pd.DataFrame(X_test_imputed, columns=X_test.columns)
test_imputed['label'] = y_test.values

### Feature Selection in MICE - Boruta

In [8]:
# Remove a leftover 'timestamp' column from both imputed frames, if there is one.
for df in (train_imputed, test_imputed):
    has_timestamp = 'timestamp' in df.columns
    if has_timestamp:
        df.drop(columns=['timestamp'], inplace=True)

# Predictors / target of the training split.
y_train = train_imputed["label"]
X_train = train_imputed.drop(columns=["label"])

# Boruta feature selection driven by a shallow, class-weighted random forest.
rf = RandomForestClassifier(n_jobs=-1, class_weight='balanced', max_depth=5, random_state=42)
boruta = BorutaPy(estimator=rf, n_estimators='auto', random_state=42)
boruta.fit(X_train.to_numpy(), y_train.to_numpy())

# Restrict both splits to the columns Boruta confirmed.
selected_cols = X_train.columns[boruta.support_]
X_train_selected = X_train.loc[:, selected_cols]
X_test_selected = test_imputed.loc[:, selected_cols]

# Re-attach the target to each reduced split (y_test comes from the imputation cell).
train_selected = X_train_selected.assign(label=y_train.values)
test_selected = X_test_selected.assign(label=y_test.loc[X_test_selected.index].values)

# Persist both splits for the modeling stage.
train_selected.to_csv("MICE_train_boruta.csv", index=False)
test_selected.to_csv("MICE_test_boruta.csv", index=False)

print("File saved：MICE_train_boruta.csv, MICE_test_boruta.csv")

✅ File saved：MICE_train_boruta.csv, MICE_test_boruta.csv


## Modeling

### Function

In [9]:
from sklearn.preprocessing import PowerTransformer
from statsmodels.stats.diagnostic import lilliefors

def run_model_experiment(X_train, X_test, y_train, y_test, balancer_name, models_dict, scaler, notebook_id, imputer_name="KNN"):
    """Train and evaluate every model in ``models_dict`` for one scaler/balancer setting.

    Pipeline: optional scaling (fitted on the training split only), class balancing
    of the training split, a per-feature Yeo-Johnson transform for the two linear
    models on features that fail the Lilliefors normality test, then fit/predict
    and metric collection against classes -1 and 1.

    Parameters
    ----------
    X_train, X_test : feature tables of the training / test split.
    y_train, y_test : matching labels.
    balancer_name : key into the module-level ``balancers`` dict, or "Manual_80_20".
    models_dict : mapping of display name -> estimator exposing ``fit`` / ``predict``.
    scaler : scaler class (not an instance), or None for no scaling.
    notebook_id : identifier copied into every result row.
    imputer_name : name of the imputation method, copied into every result row.

    Returns
    -------
    list of dict
        One result row per model.
    """
    results = []

    # Apply scaler if needed (fit on train, transform test)
    if scaler:
        scaler_instance = scaler()
        X_train = scaler_instance.fit_transform(X_train)
        X_test = scaler_instance.transform(X_test)
        scaler_name = scaler.__name__
    else:
        scaler_name = "None"

    # Apply balancing to the training split only
    if balancer_name == "Manual_80_20":
        # NOTE(review): resample_to_ratio is not defined in the visible part of this
        # notebook — confirm it exists before using this balancer name.
        X_res, y_res = resample_to_ratio(pd.DataFrame(X_train), pd.Series(y_train), positive_ratio=0.8)
    else:
        balancer_instance = balancers[balancer_name]()
        X_res, y_res = balancer_instance.fit_resample(X_train, y_train)

    # Labels are compared as floats so that int and float encodings agree.
    labels = [-1.0, 1.0]
    y_test_str = pd.Series(y_test).astype(float)
    y_pred_str = None

    # Loop through models
    for model_name, model in models_dict.items():
        # Normality testing and transformation (only for specific models)
        if model_name in ["Logistic Regression", "Linear Discriminant Analysis"]:
            print(f"[{model_name}] Checking normality & applying Yeo-Johnson...")

            # Work on fresh float ndarrays: this keeps X_res / X_test untouched for the
            # other models and prevents the in-place assignment below from truncating
            # transformed values when the source columns have an integer dtype.
            X_res_model = np.array(X_res, dtype=float)
            X_test_model = np.array(X_test, dtype=float)

            transformer = PowerTransformer(method='yeo-johnson', standardize=False)
            # Indices of features whose Lilliefors p-value rejects normality at 5 %.
            columns_to_transform = [
                i for i in range(X_res_model.shape[1])
                if lilliefors(X_res_model[:, i])[1] < 0.05
            ]

            if columns_to_transform:
                X_res_model[:, columns_to_transform] = transformer.fit_transform(X_res_model[:, columns_to_transform])
                X_test_model[:, columns_to_transform] = transformer.transform(X_test_model[:, columns_to_transform])
                print(f"Yeo-Johnson applied to {len(columns_to_transform)} features.")
            else:
                print("All features passed normality test. No transformation needed.")
        else:
            X_res_model = X_res.copy()
            X_test_model = X_test.copy()

        # Model training and prediction
        model.fit(X_res_model, y_res)
        y_pred_str = pd.Series(model.predict(X_test_model)).astype(float)

        # Accuracy and per-class report (computed once, pinned to the two known labels)
        acc = accuracy_score(y_test_str, y_pred_str)
        report = classification_report(y_test_str, y_pred_str, labels=labels, output_dict=True)

        # Confusion matrix with label order [-1, 1]: row/column 0 is class -1.
        try:
            cm = confusion_matrix(y_test_str, y_pred_str, labels=labels)
            tn, fp, fn, tp = (int(v) for v in cm.ravel())
        except ValueError as e:
            # Raised by scikit-learn e.g. when none of the requested labels occurs in y_test.
            print("Error computing confusion matrix:", e)
            tn = fp = fn = tp = 0

        # Specificity as computed here: tn / (tn + fp), with class -1 in the "negative" slot.
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        print("Classification Report:\n", classification_report(y_test_str, y_pred_str))

        # Report keys are the stringified labels; fall back to the integer spelling.
        neg_metrics = report.get('-1.0', report.get('-1', {}))
        pos_metrics = report.get('1.0', report.get('1', {}))

        results.append({
            'imputer': imputer_name,
            'balancer': balancer_name,
            'scaler': scaler_name,
            'model': model_name,
            'accuracy': acc,
            'precision_class_-1': neg_metrics.get('precision', 0),
            'recall_class_-1': neg_metrics.get('recall', 0),
            'f1_class_-1': neg_metrics.get('f1-score', 0),
            'precision_class_1': pos_metrics.get('precision', 0),
            'recall_class_1': pos_metrics.get('recall', 0),
            'f1_class_1': pos_metrics.get('f1-score', 0),
            'macro_f1': report['macro avg']['f1-score'],
            'weighted_f1': report['weighted avg']['f1-score'],
            'confusion_tp': tp,
            'confusion_tn': tn,
            'confusion_fp': fp,
            'confusion_fn': fn,
            'specificity_class_-1': specificity,
            'notebook_id': notebook_id,
            # The results schema declares a 'timestamp' column; record when this row was produced.
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            # NOTE(review): the schema also declares 'normality_test', but its intended
            # meaning is not shown in the notebook, so it is left unpopulated here.
        })

    print("y_test_str:", y_test_str.value_counts().to_dict())
    if y_pred_str is not None:
        # Predictions of the last model only; skipped when models_dict is empty.
        print("y_pred_str:", y_pred_str.value_counts().to_dict())
    return results

### Loop for iterating each result

In [10]:
def run_full_pipeline(dataset_list, models_dict, scalers, balancers, output_file="model_comparison_results.csv"):
    """
    Train every scaler x balancer x model combination on each dataset and store the results.

    Parameters:
    - dataset_list: iterable of (train_df, test_df, imputer_name) tuples,
      e.g. [(train_df, test_df, "KNN"), (train_df, test_df, "MICE")];
      each frame must contain a 'label' column
    - models_dict: mapping of model name -> estimator
    - scalers: mapping of scaler name -> scaler class (or None for no scaling)
    - balancers: mapping whose keys are the resampling method names to try
    - output_file: path of the CSV the results are written to
    """
    for train_df, test_df, imputer_name in dataset_list:
        notebook_id = f"Yitsen_{imputer_name}_CF"
        print(f"\n📂 Running pipeline for imputer = {imputer_name}")

        # Every scaler is paired with every balancer; only the scaler class is needed here.
        for scaler_class in scalers.values():
            for balancer_name in balancers:
                experiment_kwargs = dict(
                    X_train=train_df.drop(columns=['label']),
                    X_test=test_df.drop(columns=['label']),
                    y_train=train_df['label'],
                    y_test=test_df['label'],
                    balancer_name=balancer_name,
                    models_dict=models_dict,
                    scaler=scaler_class,
                    notebook_id=notebook_id,
                    imputer_name=imputer_name,
                )
                all_results = run_model_experiment(**experiment_kwargs)
                append_results_to_csv(all_results, output_file)


### Tuples for the two imputation data sets

In [None]:
# Load the Boruta-selected splits written by the feature-selection cells.
train_knn, test_knn = pd.read_csv("KNN_train_boruta.csv"), pd.read_csv("KNN_test_boruta.csv")
train_mice, test_mice = pd.read_csv("MICE_train_boruta.csv"), pd.read_csv("MICE_test_boruta.csv")

# One (train_df, test_df, imputer_name) entry per imputation method.
file_list = []
file_list.append((train_knn, test_knn, "KNN"))
file_list.append((train_mice, test_mice, "MICE"))

# Run every scaler x balancer x model combination and write the results to the default CSV.
run_full_pipeline(
    dataset_list=file_list,
    models_dict=models_dict,
    scalers=scalers,
    balancers=balancers,
)
