In [None]:
import numpy as np
import pandas as pd

In [None]:
# Define the KNN class
class KNN:
    """k-nearest-neighbours classifier for binary (0/1) labels.

    ``predict`` returns, for every query row, the mean label of its ``k``
    nearest training rows, i.e. the estimated probability of label 1.

    Parameters
    ----------
    k : int
        Number of neighbours to average over (must be >= 1).
    distance_metric : {'manhattan', 'euclidean', 'cosine'}
        Distance used to rank the training rows.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1 or ``distance_metric`` is not supported.
    """

    SUPPORTED_METRICS = ('euclidean', 'manhattan', 'cosine')

    def __init__(self, k=5, distance_metric='manhattan'):
        if k < 1:
            raise ValueError(f"k must be >= 1, got {k!r}")
        if distance_metric not in self.SUPPORTED_METRICS:
            raise ValueError(
                f"Unknown distance_metric {distance_metric!r}; "
                f"expected one of {self.SUPPORTED_METRICS}"
            )
        self.k = k
        self.distance_metric = distance_metric

    @staticmethod
    def _as_feature_matrix(X):
        """Coerce ``X`` (array, list or DataFrame) to a 2-D float array, one row per sample."""
        # dtype=float also converts object arrays (e.g. float + bool dummy
        # columns) so the distance maths runs on native numeric data.
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            # A 1-D input is treated as samples with a single feature each.
            X = X.reshape(-1, 1)
        if X.ndim != 2:
            raise ValueError(f"Expected 1-D or 2-D feature data, got {X.ndim} dimensions")
        return X

    def fit(self, X, y):
        """Store the training data; KNN does all its work at prediction time.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : array-like of length n_samples (pandas Series, list or ndarray)

        Raises
        ------
        ValueError
            If ``X`` is empty or ``X`` and ``y`` have different lengths.
        """
        X = self._as_feature_matrix(X)
        labels = np.asarray(y).ravel()
        if len(X) == 0:
            raise ValueError("Cannot fit on an empty training set")
        if len(labels) != len(X):
            raise ValueError(
                f"X and y have different lengths: {len(X)} vs {len(labels)}"
            )
        self.X_train = X
        # Kept as a Series with a fresh 0..n-1 index so positions and labels agree.
        self.y_train = pd.Series(labels)
        # Row norms never change between queries, so compute them once here
        # (only the cosine metric reads them).
        self._train_norms = np.linalg.norm(X, axis=1)

    def predict(self, X):
        """Return the estimated probability of label 1 for every row of ``X``.

        Parameters
        ----------
        X : array-like of shape (n_queries, n_features)

        Returns
        -------
        numpy.ndarray of shape (n_queries,)
            Mean label of the ``k`` nearest training rows. When ``k`` exceeds
            the number of training rows, all training rows are used.

        Raises
        ------
        ValueError
            If the number of features differs from the training data.
        """
        X = self._as_feature_matrix(X)
        if len(X) == 0:
            return np.array([])
        if X.shape[1] != self.X_train.shape[1]:
            raise ValueError(
                f"X has {X.shape[1]} features but the model was fitted with "
                f"{self.X_train.shape[1]}"
            )

        labels = self.y_train.to_numpy(dtype=float)
        probabilities = np.empty(len(X))
        for i, x in enumerate(X):
            # One vectorised pass over the whole training matrix per query.
            distances = self._distances_to_train(x)
            # Stable sort: ties are broken by training order, deterministically.
            k_indices = np.argsort(distances, kind='stable')[:self.k]
            probabilities[i] = labels[k_indices].mean()
        return probabilities

    def _distances_to_train(self, x):
        """Distance from one query row ``x`` to every training row, as a 1-D array."""
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((self.X_train - x) ** 2, axis=1))
        if self.distance_metric == 'manhattan':
            return np.sum(np.abs(self.X_train - x), axis=1)
        if self.distance_metric == 'cosine':
            dot_products = self.X_train @ x
            norm_products = self._train_norms * np.linalg.norm(x)
            # The 1e-10 offset avoids division by zero for all-zero vectors.
            return 1 - dot_products / (norm_products + 1e-10)
        raise ValueError(f"Unknown distance_metric {self.distance_metric!r}")

    def compute_distance(self, X1, X2):
        """Distance between two feature vectors under ``self.distance_metric``.

        Raises
        ------
        ValueError
            If ``self.distance_metric`` is not a supported metric (previously
            this case silently returned ``None``).
        """
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((X1 - X2) ** 2))
        elif self.distance_metric == 'manhattan':
            return np.sum(np.abs(X1 - X2))
        elif self.distance_metric == 'cosine':
            # The 1e-10 offset avoids division by zero for all-zero vectors.
            dot_product = np.dot(X1, X2)
            norm_X1 = np.linalg.norm(X1)
            norm_X2 = np.linalg.norm(X2)
            cosine_similarity = dot_product / (norm_X1 * norm_X2 + 1e-10)
            return 1 - cosine_similarity
        raise ValueError(f"Unknown distance_metric {self.distance_metric!r}")

In [None]:
import pandas as pd
import numpy as np


# def remove_outliers(X, y, features, threshold=2.5):
#     """根据 Z-score 去除离群点，并返回过滤后的 X 和 y。"""
#     z_scores = np.abs(zscore(X[features]))
#     mask = (z_scores < threshold).all(axis=1)  # 过滤掉有离群点的行

#     X_filtered = X[mask]
#     y_filtered = y[mask]

#     # 重置索引，确保 X 和 y 对齐
#     X_filtered = X_filtered.reset_index(drop=True)
#     y_filtered = y_filtered.reset_index(drop=True)

#     return X_filtered, y_filtered
def standard_scale(X):
    """Standardise each column of ``X`` to zero mean and unit standard deviation.

    The statistics come from ``X``'s own ``mean``/``std`` methods, so pandas
    objects use the sample standard deviation (ddof=1) and numpy arrays the
    population standard deviation (ddof=0).

    Columns whose standard deviation is zero or undefined (constant column,
    single row) are only centred instead of being divided by zero, so they
    come out as 0 rather than NaN/inf.

    Parameters
    ----------
    X : pandas.DataFrame, pandas.Series or numpy.ndarray

    Returns
    -------
    Same type as ``X``, holding the scaled values.
    """
    mean = X.mean(axis=0)
    std = X.std(axis=0)
    if isinstance(std, (pd.Series, pd.DataFrame)):
        # `std > 0` is False for both 0 and NaN, so both fall back to 1.
        std = std.where(std > 0, 1.0)
    elif np.ndim(std) == 0:
        std = std if std > 0 else 1.0
    else:
        std = np.where(std > 0, std, 1.0)
    return (X - mean) / std

def one_hot_encode(df, columns):
    """Return a copy of ``df`` in which each listed column is replaced by indicator columns.

    Thin wrapper around ``pandas.get_dummies``; columns not listed in
    ``columns`` are passed through unchanged.
    """
    encoded = pd.get_dummies(data=df, columns=columns)
    return encoded

def preprocess_data(train_path, test_path, n_rows=5000):
    """Load the churn CSV files and turn them into numeric feature matrices.

    Steps:
      1. read the first ``n_rows`` training rows and the full test file;
      2. split off the ``Exited`` label and drop the identifier columns;
      3. standardise the numerical columns using the *training* mean and
         standard deviation for both sets, so train and test share one
         feature space (scaling the test set with its own statistics would
         shift it relative to the training data);
      4. one-hot encode the categorical columns and align the test columns
         with the training columns.

    Parameters
    ----------
    train_path, test_path : str
        Paths of the training and test CSV files.
    n_rows : int or None, default 5000
        Number of leading training rows to use; ``None`` reads the whole file.

    Returns
    -------
    (X_train, y_train, X_test) : tuple of numpy.ndarray
        Float feature matrices and the training labels.
    """
    train_data = pd.read_csv(train_path, nrows=n_rows)
    test_data = pd.read_csv(test_path)

    y_train = train_data['Exited']

    # Identifier columns carry no predictive signal for a distance-based model.
    id_cols = ['id', 'CustomerId', 'Surname']
    X_train = train_data.drop(columns=['Exited'] + id_cols)
    X_test = test_data.drop(columns=id_cols)

    numerical_cols = ['CreditScore', 'Age', 'Tenure', 'Balance',
                      'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
    categorical_cols = ['Geography', 'Gender']

    # Fit the scaling statistics on the training data only, then apply them to both sets.
    train_mean = X_train[numerical_cols].mean(axis=0)
    train_std = X_train[numerical_cols].std(axis=0)
    # A constant column has std 0; divide by 1 instead so it becomes 0, not NaN.
    train_std = train_std.where(train_std > 0, 1.0)
    X_train[numerical_cols] = (X_train[numerical_cols] - train_mean) / train_std
    X_test[numerical_cols] = (X_test[numerical_cols] - train_mean) / train_std

    # One-hot encode the categorical features.
    X_train = one_hot_encode(X_train, categorical_cols)
    X_test = one_hot_encode(X_test, categorical_cols)

    # Make the test columns match the training columns (missing dummies become 0).
    X_test = X_test.reindex(columns=X_train.columns, fill_value=0)

    # Force a float matrix: recent pandas emits boolean dummy columns, and
    # `.values` on a float + bool frame would yield a slow object-dtype array.
    return (
        X_train.to_numpy(dtype=float),
        y_train.values,
        X_test.to_numpy(dtype=float),
    )



In [None]:
def cross_validate(X, y, knn, n_splits=2):
    """Estimate accuracy with contiguous (unshuffled) k-fold cross-validation.

    Every sample is used for validation exactly once: when ``len(X)`` is not
    divisible by ``n_splits`` the first folds are one sample larger, instead
    of the trailing samples never being validated.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    y : array-like of length n_samples, binary labels
    knn : object with ``fit(X, y)`` and ``predict(X)``; ``predict`` must
        return the probability of label 1 for each row.
    n_splits : int, default 2
        Number of folds, between 2 and ``n_samples``.

    Returns
    -------
    float
        Mean over folds of the accuracy obtained by thresholding the
        predicted probabilities at 0.5.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` differ in length or ``n_splits`` is out of range.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    n_samples = len(X)
    if len(y) != n_samples:
        raise ValueError(f"X and y have different lengths: {n_samples} vs {len(y)}")
    if not 2 <= n_splits <= n_samples:
        raise ValueError(
            f"n_splits must be between 2 and the number of samples ({n_samples}), got {n_splits}"
        )

    scores = []
    for val_idx in np.array_split(np.arange(n_samples), n_splits):
        # Everything outside the validation fold is used for training.
        train_mask = np.ones(n_samples, dtype=bool)
        train_mask[val_idx] = False

        knn.fit(X[train_mask], pd.Series(y[train_mask]))
        y_pred = knn.predict(X[val_idx])

        # Accuracy is the scoring metric.
        scores.append(np.mean((y_pred > 0.5) == y[val_idx]))

    return np.mean(scores)


In [None]:

def hyperparameter_tuning(X, y, param_grid=None):
    """Grid-search ``k`` and the distance metric of ``KNN`` by cross-validation.

    Parameters
    ----------
    X, y : training features and labels, passed through to ``cross_validate``.
    param_grid : dict or None
        Mapping with the keys ``'k'`` and ``'distance_metric'``, each holding
        the list of values to try. Defaults to the single combination
        ``k=15`` / ``'euclidean'``: a full search is slow, so candidate values
        were run one at a time and the scores recorded by hand.

    Returns
    -------
    (best_params, best_score)
        The best-scoring combination and its cross-validation score. An empty
        grid yields ``({}, -inf)``.
    """
    if param_grid is None:
        param_grid = {'k': [15], 'distance_metric': ['euclidean']}

    # Start below any reachable score so the first evaluated combination is
    # always recorded, even if its score is 0.
    best_score = -np.inf
    best_params = {}

    for k in param_grid['k']:
        for metric in param_grid['distance_metric']:
            knn = KNN(k=k, distance_metric=metric)
            # cross_validate already returns the mean score across folds.
            cv_score = cross_validate(X, y, knn)
            print(k, cv_score)
            if cv_score > best_score:
                best_score = cv_score
                best_params = {'k': k, 'distance_metric': metric}

    return best_params, best_score


In [None]:
# Read both CSV files and convert them into model-ready arrays:
# training features, training labels and test features.
X, y, X_test = preprocess_data(
    train_path='cs-506-predicting-customer-churn-using-knn/train.csv',
    test_path='cs-506-predicting-customer-churn-using-knn/test.csv',
)


# train_data = pd.read_csv('cs-506-predicting-customer-churn-using-knn/train.csv')
# train_data

Experiment notes:
- With outlier filtering (threshold: std = 3): score 0.78–0.79
- PCA with 10 components: lowers the score


In [None]:
# Run the cross-validated grid search, then report the winning combination.
tuning_result = hyperparameter_tuning(X, y)
best_params, best_score = tuning_result

print("Best parameters:", best_params)
print("Best cross-validation score:", best_score)

In [10]:


# Train the final model with the hyperparameters selected by the tuning cell.
# This used to be a hard-coded KNN(k=15), which silently fell back to the
# default 'manhattan' metric even though tuning evaluated 'euclidean'.
# To try other values (k = 7, 9, ...), change the grid in hyperparameter_tuning;
# the fallbacks below only apply if tuning produced no result.
knn = KNN(
    k=best_params.get('k', 15),
    distance_metric=best_params.get('distance_metric', 'manhattan'),
)
knn.fit(X, pd.Series(y))

# Predicted probability of Exited == 1 for every test row.
test_predictions = knn.predict(X_test)

# Save predictions to CSV; only the id column of the test file is needed here.
test_ids = pd.read_csv(
    'cs-506-predicting-customer-churn-using-knn/test.csv', usecols=['id']
)['id']
submission = pd.DataFrame({'id': test_ids, 'Exited': test_predictions})
submission.to_csv('submissions.csv', index=False)
