# Data Preprocessing:
- Handle missing values and ensure that all relevant features are properly scaled.
- Analyze categorical and numerical variables to determine which are most important for churn prediction.
- Perform feature selection or engineering to improve model performance.

# Model Building:
- Implement K-Nearest Neighbors (KNN) from scratch. Pre-built libraries (e.g., scikit-learn, TensorFlow) for KNN are not allowed.
- Split the dataset and train the model to classify customers into churn and non-churn categories.
- Explore different values for K and choose the optimal one based on performance metrics.
- Tune the model by adjusting hyperparameters such as the distance metric (Euclidean, Manhattan, etc.).

# Model Evaluation:
- Evaluate the model using metrics such as accuracy, precision, recall, F1-score, area under the ROC curve.
- Use cross-validation to ensure the model is not overfitting to the training data.

In [145]:
import numpy as np
import pandas as pd

In [146]:
# Define the KNN class
class KNN:
    def __init__(self, k=3, distance_metric='euclidean'):
        self.k = k
        self.distance_metric = distance_metric
    
    def euclidean_distance(self, point1, point2):
        euclidean_distance = np.linalg.norm(point1 - point2)
        return euclidean_distance
    
    def manhattan_distance(self, point1, point2):
        manhattan_distance = np.sum(np.abs(point1 - point2))
        return manhattan_distance

    def cosine_distance(self, point1, point2):
        cosine_distance = 1 - np.dot(point1, point2) / (np.linalg.norm(point1) * np.linalg.norm(point2))
        return cosine_distance

    def chebyshev_distance(self, point1, point2):
        chebyshev_distance = np.max(np.abs(point1 - point2))
        return chebyshev_distance

    # Use Training data to store the data in KNN Model
    def fit(self, X, y):
        # TODO: Implement the fit method
        self.train_x = X
        self.train_y = y
        if (len(self.train_x) != len(self.train_y)):
            raise ValueError(f"Train X and Train Y size differ, Train size: {len(self.train_x)} Test size: {len(self.train_y)}")
        self.train_size = len(self.train_x)
    
    # Depends on the input distance Metric, calculate the distance based on the input
    def compute_distance(self, X1, X2):
        # TODO: Implement distance computation based on self.distance_metric
        # Hint: Use numpy operations for efficient computation
        if (self.distance_metric == "euclidean"):
            return self.euclidean_distance(X1, X2)
        elif self.distance_metric == 'manhattan':
            return self.manhattan_distance(X1, X2)
        elif self.distance_metric == 'cosine':
            return self.cosine_distance(X1, X2)
        elif self.distance_metric == 'chebyshev':
            return self.chebyshev_distance(X1, X2)
        else:
            raise ValueError(f"Unknown distance metric: {self.distance_metric}")
        
    def n_closest_to(self, X):
        # Array to save k closest distances
        distances = []
        
        # iterate the number of train values
        for i in range(self.train_size):
            distance = self.compute_distance(self.train_x[i], X)
            distances.append(distance)
        
        closest_points = np.argsort(distances)[:self.k]
        
        # [distances[i] for i in closest_points]
        return closest_points 
    
    def majority(self, closest_points):
        class_counts = {}
        
        for i in closest_points:
            label = self.train_y[i]
            if label in class_counts:
                class_counts[label]+= 1
            else:
                class_counts[label] = 1
        
        majority_class = max(class_counts, key=class_counts.get)
            
        return majority_class

    # Predict the Class of Test_X data 
    def predict(self, X_test):
        # TODO: Implement the predict method
        # X_test = X_test.to_numpy()
        predictions = []
        
        for index in  range(len(X_test)):
            closest_points = self.n_closest_to(X_test[index])
            predicted_class = self.majority(closest_points)
            predictions.append(predicted_class)
        
        return np.array(predictions)
    
    def predict_proba(self, X_test):
        # Simplified probability prediction method (for ROC AUC)
        # X_test = X_test.to_numpy()
        probabilities = []
        
        for index in range(len(X_test)): 
            closest_points = self.n_closest_to(X_test[index])
            closest_labels = self.train_y[closest_points]
            pos_prob = np.sum(closest_labels) / self.k  # Probability of positive class
            probabilities.append(pos_prob)
        return np.array(probabilities)

In [159]:
# Manually implementing one-hot encoding
def manual_one_hot_encoding(df, column):
    unique_values = df[column].unique()  # Find unique values in the column
    
    for unique_value in unique_values:
        new_column_name = f"{column}_{unique_value}"
        df[new_column_name] = df[column].apply(lambda x: 1 if x == unique_value else 0)
    
    df = df.drop(column, axis=1)  # Drop the original column
    return df

# Function to apply manual feature scaling
def manual_standard_scaler(df, column):
    mean = np.mean(df[column])
    std = np.std(df[column])
    
    # Apply the standardization formula
    df[column] = (df[column] - mean) / std
    return df

# Function to perform SVD and reduce dimensionality
def svd_reduction(X_train, X_test, n_components):
    # Step 1: Center the data
    X_train_centered = X_train - np.mean(X_train, axis=0)

    # Step 2: Compute SVD on the training set
    U, S, Vt = np.linalg.svd(X_train_centered)

    # Step 3: Select the top n_components
    U_reduced = U[:, :n_components]
    S_reduced = np.diag(S[:n_components])

    # Step 4: Reconstruct the training data using the reduced components
    X_train_reduced = np.dot(U_reduced, S_reduced)

    # Step 5: Center the test data using the training mean
    X_test_centered = X_test - np.mean(X_train, axis=0)
    # Step 6: Project the test data onto the same components
    X_test_reduced = np.dot(X_test_centered, Vt[:n_components, :].T)

    return X_train_reduced, X_test_reduced

# Define data preprocessing function
def preprocess_data_svd(train_path, test_path, n_components=2):
    train_data = pd.read_csv(train_path)
    X_test = pd.read_csv(test_path)
    
    y = train_data['Exited']
    X = train_data.drop(['Exited'], axis=1)
    
    X = X.drop(['id', 'CustomerId', 'Surname'], axis=1)
    X_test = X_test.drop(['id', 'CustomerId', 'Surname'], axis=1)
    
    X = manual_one_hot_encoding(X, 'Geography')
    X = manual_one_hot_encoding(X, 'Gender')
    
    X_test = manual_one_hot_encoding(X_test, 'Geography')
    X_test = manual_one_hot_encoding(X_test, 'Gender')
    
    # Apply standardization to specific columns
    for column in ['CreditScore', 'Age', 'Balance']:
        X = manual_standard_scaler(X, column)
        X_test = manual_standard_scaler(X_test, column)
    
    # Convert to NumPy arrays
    X = X.to_numpy()
    X_test = X_test.to_numpy()
    
    # Apply SVD for dimensionality reduction
    X_reduced, X_test_reduced = svd_reduction(X, X_test, n_components=n_components)

    # Handle categorical variables, scale features, etc.
    return X_reduced, y.to_numpy(), X_test_reduced
# Define data preprocessing function

def preprocess_data(train_path, test_path):
    train_data = pd.read_csv(train_path)
    X_test = pd.read_csv(test_path)
    
    y = train_data['Exited']
    X = train_data.drop(['Exited'], axis=1)
    
    X = X.drop(['id', 'CustomerId', 'Surname'], axis=1)
    X_test = X_test.drop(['id', 'CustomerId', 'Surname'], axis=1)
    
    X = manual_one_hot_encoding(X, 'Geography')
    X = manual_one_hot_encoding(X, 'Gender')
    
    X_test = manual_one_hot_encoding(X_test, 'Geography')
    X_test = manual_one_hot_encoding(X_test, 'Gender')
    
    # Apply standardization to specific columns
    for column in ['CreditScore', 'Age', 'Balance']:
        X = manual_standard_scaler(X, column)
        X_test = manual_standard_scaler(X_test, column)
    
    # Convert to NumPy arrays
    X = X.to_numpy()
    X_test = X_test.to_numpy()
    
    # Apply SVD for dimensionality reduction

    # Handle categorical variables, scale features, etc.
    return X, y.to_numpy(), X_test

In [148]:
def compute_metrics(y_true, y_pred):
    true_positive = np.sum((y_true == 1) & (y_pred == 1))
    true_negative = np.sum((y_true == 0) & (y_pred == 0))
    false_positive = np.sum((y_true == 0) & (y_pred == 1))
    false_negative = np.sum((y_true == 1) & (y_pred == 0))

    accuracy = (true_positive + true_negative) / len(y_true)
    precision = true_positive / (true_positive + false_positive) if (true_positive + false_positive) > 0 else 0
    recall = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return accuracy, precision, recall, f1

def compute_roc_auc(y_true, y_scores):
    thresholds = np.linspace(0, 1, 100)
    tpr = []
    fpr = []
    
    # TP, FP, TN, FN 계산
    for threshold in thresholds:
        y_pred =  (y_scores >= threshold).astype(int)
        true_positive = np.sum((y_true == 1) & (y_pred == 1))
        true_negative = np.sum((y_true == 0) & (y_pred == 0))
        false_positive = np.sum((y_true == 0) & (y_pred == 1))
        false_negative = np.sum((y_true == 1) & (y_pred == 0))

        # TPR (True Positive Rate)
        tpr_value = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0  # TPR 정의
        tpr.append(tpr_value)

        # FPR (False Positive Rate)
        fpr_value = false_positive / (false_positive + true_negative) if (false_positive + true_negative) > 0 else 0  # FPR 정의
        fpr.append(fpr_value)


    # ROC 곡선 아래 면적 계산 (AUC)
    tpr = np.array(tpr)
    fpr = np.array(fpr)
    
    # Make sure we have at least two points for AUC calculation
    if len(tpr) < 2 or len(fpr) < 2:
        raise ValueError("Not enough points to compute AUC.")
    
    roc_auc = np.trapz(tpr, fpr)  # Trapezoidal rule for AUC

    return roc_auc

# Define cross-validation function
def cross_validate(X, y, knn, n_splits=5):
    # TODO: Implement cross-validation
    # Compute ROC AUC scores
    fold_size = len(X) // n_splits
    accuracy_list = []
    precision_list = []
    recall_list = []
    f1_list = []
    roc_auc_list = []

    for i in range(n_splits):
        # Split the data
        start = i * fold_size
        end = start + fold_size if i < n_splits - 1 else len(X)
        
        X_train = np.concatenate([X[:start], X[end:]])
        y_train = np.concatenate([y[:start], y[end:]])
        X_test = X[start:end]
        y_test = y[start:end]
        
        # Fit the KNN model
        knn.fit(X_train, y_train)
        
        # Make predictions
        y_pred = knn.predict(X_test)
        y_scores = knn.predict_proba(X_test)

        # Calculate metrics
        accuracy, precision, recall, f1 = compute_metrics(y_test, y_pred)
        roc_auc = compute_roc_auc(y_test, y_scores)

        # Append the metrics to the lists
        accuracy_list.append(accuracy)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)
        roc_auc_list.append(roc_auc)

    # Calculate the mean of each metric
    mean_accuracy = np.mean(accuracy_list)
    mean_precision = np.mean(precision_list)
    mean_recall = np.mean(recall_list)
    mean_f1 = np.mean(f1_list)
    mean_roc_auc = np.mean(roc_auc_list)

    # Return the average metrics
    return {
        'accuracy': mean_accuracy,
        'precision': mean_precision,
        'recall': mean_recall,
        'f1_score': mean_f1,
        'roc_auc': mean_roc_auc
    }

In [144]:

X_train = np.array([[0.1, 0.1],
                    [0.4, 0.4],
                    [0.5, 0.5],
                    [0.9, 0.8],
                    [0.8, 0.6]])



X_test = np.array([[0.1, 0.1],  # Close to first training point
                   [0.4, 0.4],  # Close to second training point
                   [0.5, 0.5],  # Close to third training point
                   [0.9, 0.9],  # Close to fourth training point
                   [0.8, 0.7]]) # Close to fifth training point

y_true = np.array([0, 1, 1, 0, 1])  # 실제 레이블

knn = KNN(k=1, distance_metric='euclidean')

knn.fit(X_train, y_true)
prediction = knn.predict(X_test)
probabilities = knn.predict_proba(X_test)
print("prediction : ", prediction, "\n")
print("probabilities :", probabilities)
y_scores = [0.1, 0.4, 0.35, 0.8, 0.7]  # 예측 확률
y_true = np.array(y_true)
y_scores = np.array(y_scores)
# AUC 계산
auc_score = compute_roc_auc(y_true, probabilities)
# auc_score = roc_auc_score(y_true, y_scores, 0.5)

print(f"AUC: {auc_score}")

prediction :  [0 1 1 0 1] 

probabilities : [0. 1. 1. 0. 1.]
AUC: -1.0


In [154]:
# Load and preprocess data
X, y, X_test = preprocess_data('cs-506-predicting-customer-churn-using-knn/train.csv', 'cs-506-predicting-customer-churn-using-knn/test.csv', 2)

# Create and evaluate model
knn = KNN(k=5, distance_metric='manhattan')

# Perform cross-validation
cv_scores = cross_validate(X, y, knn)

print("Cross-validation scores:", cv_scores)


Cross-validation scores: {'accuracy': 0.7664, 'precision': 0.2385216849400313, 'recall': 0.06956743267647551, 'f1_score': 0.10735214393219628, 'roc_auc': -0.5096462263694129}


In [149]:
# Load and preprocess data
X, y, X_test = preprocess_data('cs-506-predicting-customer-churn-using-knn/train.csv', 'cs-506-predicting-customer-churn-using-knn/test.csv')

# Create and evaluate model
knn = KNN(k=5, distance_metric='euclidean')

# Perform cross-validation
cv_scores = cross_validate(X, y, knn)

print("Cross-validation scores:", cv_scores)

def compute_model_score(X, y, k, distance_metric):
    # KNN 모델 생성 및 평가
    knn = KNN(k=k, distance_metric=distance_metric)
    scores = cross_validate(X, y, knn)  # 사용자 정의 cross-validation 함수
    return scores # 평균 점수 반환

# TODO: hyperparamters tuning
k_values = [3, 5, 10, 15]
distance_metrics = ['euclidean', 'manhattan']

best_score = 0
best_params = {}

for k in k_values:
    for distance_metric in distance_metrics:
        score = compute_model_score(X, y, k, distance_metric)
        print(f"k: {k}, distance_metric: {distance_metric}, Score: {score}\n")



# TODO: Train on full dataset with optimal hyperparameters and make predictions on test set
# knn = ...
# knn.fit(X, y)
# test_predictions = knn.predict(X_test)

# # Save test predictions
# pd.DataFrame({'id': pd.read_csv('/path/of/test.csv')['id'], 'Exited': test_predictions}).to_csv('submissions.csv', index=False)

Cross-validation scores: {'accuracy': 0.7716000000000001, 'precision': 0.2668292377308332, 'recall': 0.0725209283940105, 'f1_score': 0.11375525816834915, 'roc_auc': -0.5335686112592383}
k: 3, distance_metric: euclidean, Score: {'accuracy': 0.7558666666666667, 'precision': 0.28402735013687613, 'recall': 0.13439466270438308, 'f1_score': 0.18205683315625296, 'roc_auc': -0.5442510683978025}

k: 3, distance_metric: manhattan, Score: {'accuracy': 0.7641333333333333, 'precision': 0.3159710056110793, 'recall': 0.14201601155555837, 'f1_score': 0.19571620765256964, 'roc_auc': -0.5759630274095184}

k: 3, distance_metric: chebyshev, Score: {'accuracy': 0.7461333333333333, 'precision': 0.24686502150063233, 'recall': 0.1231219750833908, 'f1_score': 0.1637603948192148, 'roc_auc': -0.5218395882741795}

k: 5, distance_metric: euclidean, Score: {'accuracy': 0.7716000000000001, 'precision': 0.2668292377308332, 'recall': 0.0725209283940105, 'f1_score': 0.11375525816834915, 'roc_auc': -0.5335686112592383}


In [156]:
# Load and preprocess data
def compute_model_score(X, y, k, n_component):
    X, y, X_test = preprocess_data('cs-506-predicting-customer-churn-using-knn/train.csv', 'cs-506-predicting-customer-churn-using-knn/test.csv', n_component)
    # KNN 모델 생성 및 평가
    knn = KNN(k=k, distance_metric='manhattan')
    scores = cross_validate(X, y, knn)  # 사용자 정의 cross-validation 함수
    return scores # 평균 점수 반환

# hyperparamters tuning
k_values = [15, 20, 25]
n_values = [2, 5, 8]


for k in k_values:
    for n in n_values:
        score = compute_model_score(X, y, k, n)
        print(f"k: {k}, N_components: {n}, Score: {score}\n")

k: 15, N_components: 2, Score: {'accuracy': 0.7943333333333334, 'precision': 0.18596638655462186, 'recall': 0.004621540761361795, 'f1_score': 0.008979599302460535, 'roc_auc': -0.5045293925101082}

k: 15, N_components: 5, Score: {'accuracy': 0.7952, 'precision': 0.26412698412698415, 'recall': 0.0062774633503845404, 'f1_score': 0.0122109750615764, 'roc_auc': -0.5265599206468567}

k: 15, N_components: 8, Score: {'accuracy': 0.7952666666666667, 'precision': 0.253015873015873, 'recall': 0.0059527880257092165, 'f1_score': 0.011577089711257926, 'roc_auc': -0.5306467000315396}

k: 20, N_components: 2, Score: {'accuracy': 0.7968666666666666, 'precision': 0.35189393939393937, 'recall': 0.004331691849373528, 'f1_score': 0.008503208085010436, 'roc_auc': -0.5077867063411559}

k: 20, N_components: 5, Score: {'accuracy': 0.7970666666666666, 'precision': 0.5241666666666667, 'recall': 0.003634613781057331, 'f1_score': 0.007166918929606902, 'roc_auc': -0.5261602213103341}

k: 20, N_components: 8, Score:

In [160]:
# TODO: Train on full dataset with optimal hyperparameters and make predictions on test set
# Load and preprocess data
X, y, X_test = preprocess_data('cs-506-predicting-customer-churn-using-knn/train.csv', 'cs-506-predicting-customer-churn-using-knn/test.csv')

knn = KNN(k=20, distance_metric='manhattan')
knn.fit(X, y)
test_predictions = knn.predict(X_test)

# # Save test predictions
pd.DataFrame({'id': pd.read_csv('cs-506-predicting-customer-churn-using-knn/test.csv')['id'], 'Exited': test_predictions}).to_csv('submissions.csv', index=False)