In [5]:
import numpy as np
import pandas as pd

In [6]:
# Define the KNN class
class KNN:
    def __init__(self, k=3, distance_metric='euclidean'):
        self.k = k
        self.distance_metric = distance_metric

    def fit(self, X, y):
        self.x_train = X
        self.y_train = y

    def predict(self, X):
        dist = self.compute_distance(X, self.x_train)
        neighbors = np.argsort(dist)[:, :self.k]
        neighbor_distances = np.take_along_axis(dist, neighbors, axis=1)
        knn_labels = self.y_train[neighbors]

        
#         return np.array([self.classify(labels) for labels in knn])
        return np.array([self.classify(labels, distances) for labels, distances in zip(knn_labels, neighbor_distances)])

    def compute_distance(self, X1, X2):
        # distance computation based on self.distance_metric
        # Hint: Use numpy operations for efficient computation
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((X1[:, np.newaxis, :] - X2[np.newaxis, :, :]) ** 2, axis=2))
    
    def classify(self, labels, distances):
        # Returns the most common element in a list

        # Calculate weights based on distance (w = 1 / d^2)
        weights = 1 / (np.array(distances) ** 2)

        # Accumulate the total weights and the weights for label 1
        total_weight = np.sum(weights)
        weight_for_1 = np.sum([weight for label, weight in zip(labels, weights) if label == 1])

        # Return the probability of label 1
        probability_of_1 = weight_for_1 / total_weight

        return probability_of_1

In [7]:
# Define data preprocessing function
def preprocess_data(train_path, test_path):
    train_data = pd.read_csv(train_path)
    test_data = pd.read_csv(test_path)

    # data preprocessing
    # Handle categorical variables, scale features, etc.
    train_data = train_data.drop(columns=['id', 'CustomerId', 'Surname'])
    X = train_data.drop('Exited', axis=1)
    y = train_data["Exited"]
    X_test = test_data.drop(columns=['id', 'CustomerId', 'Surname'])

    categorical_cols = X.select_dtypes(include=['object']).columns
    numerical_cols = X.select_dtypes(include=['float64', 'int64']).columns

    for col in numerical_cols:
        X.fillna({col: X[col].mean()}, inplace=True)
        X_test.fillna({col: X_test[col].mean()}, inplace=True)

    for col in categorical_cols:
        X.fillna({col: X[col].mode()[0]}, inplace=True)
        X_test.fillna({col: X_test[col].mode()[0]}, inplace=True)

        X = pd.get_dummies(X, columns=[col], drop_first=True, dtype=float)
        X_test = pd.get_dummies(X_test, columns=[col], drop_first=True, dtype=float)

    X[numerical_cols] = (X[numerical_cols] - X[numerical_cols].mean()) / X[numerical_cols].std()
    X_test[numerical_cols] = (X_test[numerical_cols] - X_test[numerical_cols].mean()) / X_test[numerical_cols].std()

    X = X.to_numpy()
    y = y.to_numpy()
    X_test = X_test.to_numpy()

    return X, y, X_test

In [8]:
# Define cross-validation function
def cross_validate(X, y, knn, n_splits=5):
    # Compute ROC AUC scores
    indices = np.arange(len(X))
    np.random.shuffle(indices)

    fold_size = len(X) // n_splits
    folds = [indices[i * fold_size:(i + 1) * fold_size] for i in range(n_splits)]

    auc_scores = []

    for i in range(n_splits):
        val_indices = folds[i]
        train_indices = np.concatenate([folds[j] for j in range(n_splits) if j != i])

        X_train, X_val = X[train_indices], X[val_indices]
        y_train, y_val = y[train_indices], y[val_indices]

        knn.fit(X_train, y_train)

        distances = knn.compute_distance(X_val, X_train)

        knn_indices = np.argsort(distances, axis=1)[:, :knn.k]

        k_nearest_labels = y_train[knn_indices]
        k_nearest_distances = np.take_along_axis(distances, knn_indices, axis=1)
        predictions = np.array([knn.classify(labels, dists) for labels, dists in zip(k_nearest_labels, k_nearest_distances)])

        min_distances = np.min(distances[np.arange(distances.shape[0])[:, np.newaxis], knn_indices], axis=1)

        scores = 1 / (min_distances + 1e-8)

        auc = calculate_roc_auc(y_val, scores)
        auc_scores.append(auc)

    return np.mean(auc_scores)

def calculate_roc_auc(y_true, y_scores):

    sorted_indices = np.argsort(y_scores)
    y_true_sorted = y_true[sorted_indices]

    tps = np.cumsum(y_true_sorted)
    fps = np.arange(1, len(y_true_sorted) + 1) - tps

    tpr = tps / tps[-1]
    fpr = fps / fps[-1]

    return np.trapz(tpr, fpr)

In [9]:
# Load and preprocess data
X, y, X_test = preprocess_data('./train.csv', './test.csv')

# Create and evaluate model
knn = KNN(k=5, distance_metric='euclidean')

# Perform cross-validation
cv_scores = cross_validate(X, y, knn)

print("Cross-validation scores:", cv_scores)

# TODO: hyperparamters tuning
best_k = None
best_metric = 'euclidean'
best_score = -1

for k in range(3, 20, 2):
    knn = KNN(k=k, distance_metric='euclidean')
    score = cross_validate(X, y, knn)
    print(f"k={k}, metric='euclidean', score={score}")
    if score > best_score:
        best_score = score
        best_k = k

print(f"Best score: k={best_k}, metric={best_metric}, score={best_score}")

Cross-validation scores: 0.6757753464366948
k=3, metric='euclidean', score=0.6761039816229604
k=5, metric='euclidean', score=0.6771442544090777
k=7, metric='euclidean', score=0.6778701893147059
k=9, metric='euclidean', score=0.6801464126204845
k=11, metric='euclidean', score=0.6750567580640362
k=13, metric='euclidean', score=0.679589192317484
k=15, metric='euclidean', score=0.6761398311874314
k=17, metric='euclidean', score=0.676211431335607
k=19, metric='euclidean', score=0.6756079160404544
Best score: k=9, metric=euclidean, score=0.6801464126204845


In [10]:
# Train on full dataset with optimal hyperparameters and make predictions on test set
knn = KNN(k=9, distance_metric='euclidean')
knn.fit(X, y)
test_predictions = knn.predict(X_test)

# Save test predictions
pd.DataFrame({'id': pd.read_csv('./test.csv')['id'], 'Exited': test_predictions}).to_csv('submissions.csv', index=False)