In [1]:
# Import libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import make_scorer
from scipy.stats import ttest_rel
import joblib
import time

In [None]:
# Load the tab-separated ETFD dataset into a DataFrame.
# low_memory=False makes pandas read the file in one pass instead of in chunks.
ETFD = pd.read_csv(
    r"../../Dataset/ETFD_Dataset.txt",
    sep="\t",
    low_memory=False,
)

In [None]:
# Separate the class label ('Fraud') from the features (independent variables)
y = ETFD['Fraud']
X = ETFD.drop(columns=['Fraud'])

In [None]:
# Define the custom scoring function
def custom_score(y_true, y_pred):
    """Penalty-based score for binary predictions; higher is better.

    The score is ``1 / (penalty + 1)`` with
    ``penalty = (1 - precision) ** 2 * FP + (1 - recall) * FN``.
    Precision and recall are macro-averaged; FP and FN are taken from the
    confusion matrix built with ``labels=[0, 1]`` (label 1 is the positive class).

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels.
    y_pred : array-like
        Predicted labels.

    Returns
    -------
    float
        ``0.0`` when macro precision and macro recall are both zero;
        otherwise a value in (0, 1], equal to 1 when the penalty is zero.
    """
    # Only the two error cells of the 2x2 confusion matrix enter the formula
    _, false_pos, false_neg, _ = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    # zero_division=0 turns undefined precision/recall into 0 instead of a warning
    macro_precision = precision_score(y_true, y_pred, average='macro', zero_division=0)
    macro_recall = recall_score(y_true, y_pred, average='macro', zero_division=0)

    # Degenerate predictions get the worst possible score
    if macro_precision == 0 and macro_recall == 0:
        return 0.0

    penalty = ((1 - macro_precision) ** 2) * false_pos + (1 - macro_recall) * false_neg
    return 1 / (penalty + 1)

In [None]:
# Empty results table: one row per random state will be stored in it.
# Columns are grouped as: split id, train metrics, test metrics, timings,
# train confusion-matrix counts, test confusion-matrix counts.
result_columns = (
    ['Random State']
    + ['Train Accuracy', 'Train Macro Precision', 'Train Macro Recall', 'Train Macro F1-Score', 'Train AUC']
    + ['Test Accuracy', 'Test Macro Precision', 'Test Macro Recall', 'Test Macro F1-Score', 'Test AUC']
    + ['Training Time', 'Validation Time', 'Tuning Time']
    + ['TP Train', 'TN Train', 'FP Train', 'FN Train']
    + ['TP Test', 'TN Test', 'FP Test', 'FN Test']
)
results = pd.DataFrame(columns=result_columns)

In [None]:
# Tune, train and evaluate a KNN classifier on ten different train/test splits
def _split_metrics(split_name, y_true, y_pred, y_score):
    """Build the results-row entries for one data split.

    Parameters
    ----------
    split_name : str
        'Train' or 'Test'; used to form the column names.
    y_true : array-like
        Ground-truth labels.
    y_pred : array-like
        Hard class predictions (used for every metric except AUC).
    y_score : array-like
        Predicted probability of the positive class (used for AUC only).

    Returns
    -------
    dict
        Column name -> value for the metrics and confusion-matrix counts.
    """
    # labels=[0, 1] (same convention as custom_score) guarantees a 2x2 matrix,
    # so the four-way unpacking cannot fail when a class is absent.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    return {
        f'{split_name} Accuracy': accuracy_score(y_true, y_pred),
        f'{split_name} Macro Precision': precision_score(y_true, y_pred, average='macro', zero_division=0),
        f'{split_name} Macro Recall': recall_score(y_true, y_pred, average='macro', zero_division=0),
        f'{split_name} Macro F1-Score': f1_score(y_true, y_pred, average='macro', zero_division=0),
        # AUC needs continuous scores; hard labels would collapse the ROC
        # curve to a single threshold.
        f'{split_name} AUC': roc_auc_score(y_true, y_score),
        f'TP {split_name}': tp,
        f'TN {split_name}': tn,
        f'FP {split_name}': fp,
        f'FN {split_name}': fn,
    }


# Hyperparameter grid and scorer are identical for every split, so build them once
param_grid = {
    'n_neighbors': [2, 3, 4, 5, 6, 7, 8, 9, 10, 20],
    'weights': ['uniform', 'distance'],
    'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute']
}
scorer = make_scorer(custom_score)

# One dict per random state; turned into a DataFrame once after the loop
rows = []

# NOTE(review): KNN is distance-based and the features are used unscaled here
# (MinMaxScaler is imported but not applied in this section) — confirm this is intended.
for random_state in range(1, 11):
    # Split data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=random_state)

    # Initialize KNN classifier and the 10-fold grid search around it
    KNN = KNeighborsClassifier()
    grid_search = GridSearchCV(estimator=KNN, param_grid=param_grid, n_jobs=-1, verbose=2, cv=10, scoring=scorer)

    # Time the hyperparameter tuning (perf_counter is monotonic, unlike time.time)
    start_tuning_time = time.perf_counter()
    grid_search.fit(X_train, y_train)
    tuning_time = time.perf_counter() - start_tuning_time

    # Save best hyperparameters to a text file
    best_params = grid_search.best_params_
    with open(f'best_hyperparameters_{random_state}.txt', 'w') as f:
        f.write(str(best_params))

    # Save the best model (already refit on the full training set by GridSearchCV)
    best_KNN = grid_search.best_estimator_
    joblib.dump(best_KNN, f'best_model_random_state_{random_state}.joblib')

    # Fit once more on the same data purely to measure the time of a single training run
    start_training_time = time.perf_counter()
    best_KNN.fit(X_train, y_train)
    training_time = time.perf_counter() - start_training_time

    # Time the hard-label predictions on both sets
    start_validation_time = time.perf_counter()
    y_train_pred = best_KNN.predict(X_train)
    y_test_pred = best_KNN.predict(X_test)
    validation_time = time.perf_counter() - start_validation_time

    # Positive-class probabilities for AUC; computed outside the timed window
    # so 'Validation Time' still measures only the two predict() calls.
    # Column 1 is the class with the greater label (classes_ is sorted).
    y_train_score = best_KNN.predict_proba(X_train)[:, 1]
    y_test_score = best_KNN.predict_proba(X_test)[:, 1]

    # Collect this split's row
    rows.append({
        'Random State': random_state,
        **_split_metrics('Train', y_train, y_train_pred, y_train_score),
        **_split_metrics('Test', y_test, y_test_pred, y_test_score),
        'Training Time': training_time,
        'Validation Time': validation_time,
        'Tuning Time': tuning_time,
    })

# Build the table in one step, keeping the column order defined for `results`.
# This avoids per-iteration pd.concat onto an empty frame and makes re-running
# the cell idempotent (no duplicated rows).
results = pd.DataFrame(rows, columns=results.columns)

# Save results to CSV file
results.to_csv('ETFD_KNN.csv', index=False)