### Imports

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier

from scipy.spatial.distance import euclidean, hamming

import gurobipy as gp
from gurobipy import GRB

from sklearn.preprocessing import LabelEncoder

import warnings
warnings.simplefilter(action='ignore')

# Methods

### Proposed Model

In [None]:
def gurobi_NN(X, y, k):
    np.random.seed(96)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)
    print(X_train.shape)
    N, I = X_train.shape
    M, I = X_test.shape
    p = gp.Model("NN")

    # Add variables
    alpha = p.addVars(I, lb=0, name="alpha", vtype=GRB.CONTINUOUS)
    e = p.addVars(M, name="e", vtype=GRB.CONTINUOUS)
    y_pred = p.addVars(M, name="y_pred", vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY)
    dist = p.addVars(N, M, name="dist", vtype=GRB.CONTINUOUS)
    sqdist = p.addVars(N, M, name="sqdist", vtype=GRB.CONTINUOUS)

    # Add constraints
    for m in range(M):
        p.addConstr(y_pred[m] == 1/N * gp.quicksum(float(y_train[n]) * 
                                                   (1 + 
                                                    1/N * 1/k * gp.quicksum(dist[n1,m] for n1 in range(N))) -
                                                    1/k * dist[n,m]                                   
                                                    for n in range(N)), name="y_pred"+str(m))
        p.addConstr(e[m] >= float(y_test[m]) - y_pred[m], name="e+"+str(m))
        p.addConstr(e[m] >= y_pred[m] - float(y_test[m]), name="e-"+str(m))
        for n in range(N):
            p.addConstr(dist[n,m] == gp.quicksum(alpha[i] * (X_train[n][i]-X_test[m][i])**2  for i in range(I)))
            p.addConstr(sqdist[n,m]*sqdist[n,m] == dist[n,m])
    
    for i in range(I):
        p.addConstr(alpha[i] >= 1e-5, name="alpha+"+str(i))
        p.addConstr(alpha[i] <= 1/2, name="alpha-"+str(i))

    p.setObjective(1/M * gp.quicksum(e[m] for m in range(M)), GRB.MINIMIZE)

    # Don't print output
    p.setParam('OutputFlag', 0)
    p.optimize()

    if p.status == GRB.Status.OPTIMAL:
        return [np.abs(alpha[i].x) for i in range(I)], X_train, y_train
    else:
        p.computeIIS()
        p.write("model1.ilp")
        return [1/I for i in range(I)], X_train, y_train

In [None]:
# predict with gurobi
def proposed_regression(X_train, y_train, X_test, k):
    alpha = gurobi_NN(X_train, y_train, k)[0]
    # Predict using alphas as weights for the distance + consider weighted (with distance) average of the k nearest neighbors
    y_pred = []
    for m in range(X_test.shape[0]):
        dist = []
        for n in range(X_train.shape[0]):
            dist.append((np.sum(np.abs(alpha * (X_test[m] - X_train[n])**(2)))))
        idx = np.argsort(dist)[:k]
        y_pred.append(np.sum([y_train[i]/(1+dist[i]) for i in idx])/np.sum([1/(1+dist[i]) for i in idx]))
        if np.isnan(y_pred[-1]):
            y_pred[-1] = np.mean(y_train)
            print('nan')
    return y_pred

def proposed_classification(X_train, y_train, X_test, k):
    alpha = gurobi_NN(X_train, y_train, k)[0]
    # Predict using alphas as weights for the distance + consider weighted (with distance) average of the k nearest neighbors
    y_pred = []
    for m in range(X_test.shape[0]):
        dist = []
        for n in range(X_train.shape[0]):
            dist.append(np.sum(np.abs(alpha * (X_test[m] - X_train[n])**2)))
        idx = np.argsort(dist)[:k]
        weights = [1 / (1 + dist[i]) for i in idx]
        y_pred.append(max(set(y_train[idx]), key=lambda x: sum(w for i, w in zip(idx, weights) if y_train[i] == x)))
    return y_pred

### Traditional KNN

In [None]:
def traditional_regression(X_train, y_train, X_test, k):
    knn = KNeighborsRegressor(n_neighbors=k)
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)
    return y_pred

def traditional_classification(X_train, y_train, X_test, k):
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)
    return y_pred

### Overlap

In [None]:
def overlap_regression(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    weights = [1/I for _ in range(I)]
    y_predict = []

    for x in X_test:
        similarities = []
        for i, x_train in enumerate(X_train):
            sim = 0
            for j in range(I):
                if x[j] == x_train[j]:
                    sim += weights[j]
            similarities.append((sim, y_train[i]))

        # Sort by similarity in descending order and select top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        similarities = similarities[:k]

        # Calculate the predicted value as the average of the top k neighbors' values
        y_pred = sum(y for _, y in similarities) / k
        y_predict.append(y_pred)

    return y_predict

def overlap_classification(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    N = X_test.shape[0]
    weights = [1/I for i in range(I)]
    y_predict = []
    for x in X_test:
        similitudes = []
        for x_train in X_train:
            sim = 0
            for i in range(I):
                if x[i] != x_train[i]:
                    s = 0
                else:
                    s = 1
                sim += s * weights[i]
            similitudes.append([sim, y_train[i]])
        similitudes.sort(key=lambda x: x[0])
        similitudes = similitudes[:k]
        labels = [x[1] for x in similitudes]
        y_predict.append(max(set(labels), key=labels.count))
    return y_predict

### Eskin

In [None]:
def Eskin_classification(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    weights = [1/I for _ in range(I)]
    cardinality_features = [len(set(X_train[:, i])) for i in range(I)]
    y_predict = []

    for x in X_test:
        similarities = []
        for i, x_train in enumerate(X_train):
            sim = 0
            for j in range(I):
                if x[j] == x_train[j]:
                    sim += weights[j]
                else:
                    n_k = cardinality_features[j]
                    sim += n_k**2 / (n_k**2 + 2) * weights[j]
            similarities.append((sim, y_train[i]))

        # Sort by similarity in descending order and select top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        similarities = similarities[:k]

        # Predict the most common class among the top k neighbors
        y_pred = max(set(y for _, y in similarities), key=[y for _, y in similarities].count)
        y_predict.append(y_pred)

    return y_predict

def Eskin_regression(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    weights = [1/I for _ in range(I)]
    cardinality_features = [len(set(X_train[:, i])) for i in range(I)]
    y_predict = []

    for x in X_test:
        similarities = []
        for i, x_train in enumerate(X_train):
            sim = 0
            for j in range(I):
                if x[j] == x_train[j]:
                    sim += weights[j]
                else:
                    n_k = cardinality_features[j]
                    sim += n_k**2 / (n_k**2 + 2) * weights[j]
            similarities.append((sim, y_train[i]))

        # Sort by similarity in descending order and select top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        similarities = similarities[:k]

        # Calculate the predicted value as the average of the top k neighbors' values
        y_pred = sum(y for _, y in similarities) / k
        y_predict.append(y_pred)
    return y_predict

### OF

In [None]:
def of_classification(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    weights = [1/I for _ in range(I)]
    frequency_features = [{} for _ in range(I)]

    # Calculate the frequency of each feature value
    for i in range(I):
        for x in X_train[:, i]:
            if x in frequency_features[i]:
                frequency_features[i][x] += 1
            else:
                frequency_features[i][x] = 1

    y_predict = []

    for x in X_test:
        similarities = []
        for i, x_train in enumerate(X_train):
            sim = 0
            for j in range(I):
                if x[j] == x_train[j]:
                    sim += weights[j]
                else:
                    freq_x = frequency_features[j].get(x[j], 1)
                    freq_x_train = frequency_features[j].get(x_train[j], 1)
                    sim += (1 / (1 + np.log(freq_x) * np.log(freq_x_train))) * weights[j]
            similarities.append((sim, y_train[i]))

        # Sort by similarity in descending order and select top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        similarities = similarities[:k]

        # Predict the most common class among the top k neighbors
        y_pred = max(set(y for _, y in similarities), key=[y for _, y in similarities].count)
        y_predict.append(y_pred)

    return y_predict

def of_regression(X_train, y_train, X_test, k):
    I = X_train.shape[1]
    weights = [1/I for _ in range(I)]
    frequency_features = [{} for _ in range(I)]

    # Calculate the frequency of each feature value
    for i in range(I):
        for x in X_train[:, i]:
            if x in frequency_features[i]:
                frequency_features[i][x] += 1
            else:
                frequency_features[i][x] = 1

    y_predict = []

    for x in X_test:
        similarities = []
        for i, x_train in enumerate(X_train):
            sim = 0
            for j in range(I):
                if x[j] == x_train[j]:
                    sim += weights[j]
                else:
                    freq_x = frequency_features[j].get(x[j], 1)
                    freq_x_train = frequency_features[j].get(x_train[j], 1)
                    sim += (1 / (1 + np.log(freq_x) * np.log(freq_x_train))) * weights[j]
            similarities.append((sim, y_train[i]))

        # Sort by similarity in descending order and select top k
        similarities.sort(key=lambda x: x[0], reverse=True)
        similarities = similarities[:k]

        # Calculate the predicted value as the average of the top k neighbors' values
        y_pred = sum(y for _, y in similarities) / k
        y_predict.append(y_pred)

    return y_predict


### Gwknn

In [None]:
class GeneralizedWeightedKNN:
    def __init__(self, k=3, beta=8, learning_rate=0.1, max_iter=100):
        self.k = k
        self.beta = beta
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.weights = None
        self.X_train = None
        self.y_train = None

    def fit(self, X_train, y_train):
        n_samples, n_features = X_train.shape
        self.weights = np.ones((n_samples,))
        self.X_train = X_train
        self.y_train = y_train

        for iteration in range(self.max_iter):
            for i in range(n_samples):
                self._update_weights(X_train, y_train, i)

    def _update_weights(self, X, y, idx):
        distances = self._compute_distances(X, idx)
        nearest_indices = np.argsort(distances)[:self.k]

        enemy_loss = 0
        friend_loss = 0

        for neighbor_idx in nearest_indices:
            if y[neighbor_idx] != y[idx]:
                enemy_loss += 1
            else:
                friend_loss += 1

        total_loss = (enemy_loss + friend_loss) / 2
        gradient = self._compute_gradient(X, y, idx, nearest_indices, total_loss)
        self.weights[idx] -= self.learning_rate * gradient

    def _compute_distances(self, X, idx):
        distances = np.linalg.norm(X - X[idx], axis=1) / self.weights
        return distances

    def _compute_gradient(self, X, y, idx, nearest_indices, total_loss):
        gradient = 0
        for neighbor_idx in nearest_indices:
            if y[neighbor_idx] != y[idx]:
                gradient += self._sigmoid_derivative(total_loss) * (1 / self.weights[idx])
            else:
                gradient -= self._sigmoid_derivative(total_loss) * (1 / self.weights[idx])
        return gradient

    def _sigmoid_derivative(self, z):
        sigmoid = 1 / (1 + np.exp(-self.beta * (1 - z)))
        return self.beta * sigmoid * (1 - sigmoid)

    def predict(self, X_test):
        predictions = []
        for x in X_test:
            distances = np.linalg.norm(self.X_train - x, axis=1) / self.weights
            nearest_indices = np.argsort(distances)[:self.k]
            nearest_labels = self.y_train[nearest_indices]
            prediction = np.round(np.mean(nearest_labels)).astype(int)
            predictions.append(prediction)
        return np.array(predictions)
    
    def predict_regression(self, X_test):
        predictions = []
        for x in X_test:
            distances = np.linalg.norm(self.X_train - x, axis=1) / self.weights
            nearest_indices = np.argsort(distances)[:self.k]
            nearest_values = self.y_train[nearest_indices]
            prediction = np.average(nearest_values, weights=1/distances[nearest_indices])
            predictions.append(prediction)
        return np.array(predictions)

## Comparing all datasets

In [None]:
def all_models_classification(X_train, y_train, X_test, y_test, k):
    models = {
        "Traditional KNN": traditional_classification,
        "Overlap KNN": overlap_classification,
        "Eskin KNN": Eskin_classification,
        "OF KNN": of_classification,
        "Proposed Model": proposed_classification,
        "Generalized Weighted KNN": GeneralizedWeightedKNN(k=k)
    }

    results = {}
    for name, model in models.items():
        if name == "Generalized Weighted KNN":
            model_instance = model
            model_instance.fit(X_train, y_train)
            y_pred = model_instance.predict(X_test)
        else:
            y_pred = model(X_train, y_train, X_test, k)
        results[name] = y_pred

    # Calculate accuracy for each model
    accuracies = {}
    for name, y_pred in results.items():
        accuracy = np.mean(y_pred == y_test)
        accuracies[name] = accuracy
        print(f"{name} Accuracy: {accuracy:.4f}")
    return accuracies

def all_models_regression(X_train, y_train, X_test, y_test, k):
    models = {
        "Traditional KNN": traditional_regression,
        "Overlap KNN": overlap_regression,
        "Eskin KNN": Eskin_regression,
        "OF KNN": of_regression,
        "Proposed Model": proposed_regression,
        "Generalized Weighted KNN": GeneralizedWeightedKNN(k=k)
    }

    results = {}
    for name, model in models.items():
        if name == "Generalized Weighted KNN":
            model_instance = model
            model_instance.fit(X_train, y_train)
            y_pred = model_instance.predict_regression(X_test)
        else:
            y_pred = model(X_train, y_train, X_test, k)
        results[name] = y_pred

    # Calculate RMSE for each model and R^2
    rmse_scores = {}
    r2_scores = {}
    for name, y_pred in results.items():
        # if y_pred contains NaN, replace with mean of y_test
        if np.isnan(y_pred).any():
            y_pred = np.nan_to_num(y_pred, nan=np.mean(y_test))
        r2 = 1 - (np.sum((y_test - y_pred) ** 2) / np.sum((y_test - np.mean(y_test)) ** 2))
        print(f"{name} R^2: {r2:.4f}")
        r2_scores[name] = r2
    return r2_scores

In [None]:
def multiple_k(X_train, y_train, X_test, y_test, class_regr=1):
    ks = [k for k in range(2, 13)]
    if class_regr == 1:
        accuracies = {}
        for k in ks:
            accuracies[k] = all_models_classification(X_train, y_train, X_test, y_test, k)
            print('accuracies for k =', k)
            print(accuracies[k])
        return accuracies
    else:
        r2_scores = {}
        for k in ks:
            r2_scores[k] = all_models_regression(X_train, y_train, X_test, y_test, k)
        return r2_scores

In [None]:
def multiple_splits(X, y, class_regr=1):
    accuracies = {}
    r2_scores = {}
    for i in range(10):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=i*100+(10-i))
        # Then use multiple_k function, so that we have all the k values tested
        if class_regr == 1:
            accuracy = multiple_k(X_train, y_train, X_test, y_test, class_regr)
            print('accuracies for split', i)
            print(accuracy)
            accuracies[i] = accuracy
        else:
            r2 = multiple_k(X_train, y_train, X_test, y_test, class_regr)
            r2_scores[i] = r2
    if class_regr == 1:
        return accuracies
    else:
        return r2_scores
    return accuracies

In [None]:
# Store the results in a csv file with the name of the dataset
def save_results_to_csv(results, dataset_name, class_regr=1):
    if class_regr == 1:
        df = pd.DataFrame(results)
        df.to_csv(f"{dataset_name}_classification_results.csv", index=False)
    else:
        df = pd.DataFrame(results)
        df.to_csv(f"{dataset_name}_regression_results.csv", index=False)
    print(f"Results saved to {dataset_name}_results.csv")

## Testing

### Houses votes

In [None]:
# Import the data from house-votes-84.csv
data_hv84 = pd.read_csv('house-votes-84.csv')
data_hv84 = data_hv84.replace('y', 1)
data_hv84 = data_hv84.replace('n', 0)
data_hv84 = data_hv84.replace('?', 0.5)
data_hv84 = data_hv84.replace('democrat', 1)
data_hv84 = data_hv84.replace('republican', 0)

# Split the data into features and labels
X_hv84 = data_hv84.drop('party', axis=1).to_numpy()
y_hv84 = data_hv84['party'].to_numpy()

In [None]:
accuracies = multiple_splits(X_hv84, y_hv84, class_regr=1)
save_results_to_csv(accuracies, 'house-votes-84', class_regr=1)

### Pima indians

In [None]:
# Import the data from pima-indians-diabetes.csv
data_pima = pd.read_csv('pima-indians-diabetes.csv')

# Split the data into features and labels
X_pima = data_pima.drop('Outcome', axis=1).to_numpy()
y_pima = data_pima['Outcome'].to_numpy()

In [None]:
accuracies = multiple_splits(X_pima, y_pima, class_regr=1)
save_results_to_csv(accuracies, 'pima_indians', class_regr=1)

### Ionosphere

In [None]:
# Import the data from ionosphere/ionosphere.data

data_ionosphere = pd.read_csv('ionosphere/ionosphere.data', header=None)

data_ionosphere = data_ionosphere.replace('g', 1)
data_ionosphere = data_ionosphere.replace('b', 0)

# Split the data into features and labels
X_ionosphere = data_ionosphere.drop(34, axis=1).to_numpy()
y_ionosphere = data_ionosphere[34].to_numpy()

In [None]:
accuracies = multiple_splits(X_ionosphere, y_ionosphere, class_regr=1)
save_results_to_csv(accuracies, 'ionosphere', class_regr=1)

### Insurance

In [None]:
# Import the data from insurance.csv
data_insurance = pd.read_csv('insurance.csv')
data_insurance = data_insurance.replace('female', 1)
data_insurance = data_insurance.replace('male', 0)
data_insurance = data_insurance.replace('yes', 1)
data_insurance = data_insurance.replace('no', 0)
data_insurance = data_insurance.replace('southwest', 0)
data_insurance = data_insurance.replace('southeast', 1)
data_insurance = data_insurance.replace('northwest', 2)
data_insurance = data_insurance.replace('northeast', 3)

# Split the data into features and labels
X_insurance = data_insurance.drop('charges', axis=1).to_numpy()
y_insurance = data_insurance['charges'].to_numpy()

In [None]:
r2 = multiple_splits(X_insurance, y_insurance, class_regr=0)
save_results_to_csv(r2, 'insurance', class_regr=0)

### Steel Industry

In [None]:
data = pd.read_csv('Steel_industry_data.csv')
# Transform date column - example : 01/01/2018 00:15
data['date'] = pd.to_datetime(data['date'], format='%d/%m/%Y %H:%M')

# Select only when hour is 00, 08, 16
data = data[data['date'].dt.hour.isin([0, 8, 16])]
data = data[data['date'].dt.minute == 0]

# Encode date column - drop year, month. Day, hour and minute are 3 separate columns
data['day'] = data['date'].dt.day
data['hour'] = data['date'].dt.hour
data['minute'] = data['date'].dt.minute
data = data.drop(columns=['date'])

# Encode WeekStatus
data['WeekStatus'] = data['WeekStatus'].replace({'Weekday': 0, 'Weekend': 1})

# Encode Day_of_week
data['Day_of_week'] = data['Day_of_week'].replace({'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3,
                                                    'Friday': 4, 'Saturday': 5, 'Sunday': 6})

# Encode Load_Type using LabelEncoder
label_encoder = LabelEncoder()
data['Load_Type'] = label_encoder.fit_transform(data['Load_Type'])

# X, y, y is Load_Type
X_steel = data.drop(columns=['Load_Type']).to_numpy()
y_steel = data['Load_Type'].to_numpy()

In [None]:
accuracies = multiple_splits(X_steel, y_steel, class_regr=1)
save_results_to_csv(accuracies, 'steel', class_regr=1)

### Servo

In [None]:
data = pd.read_csv('servo.data')
 
# Encode columns
label_encoder = LabelEncoder()
data['motor'] = label_encoder.fit_transform(data['motor'])
data['screw'] = label_encoder.fit_transform(data['screw'])

# Split X and y - y is class
X_servo = data.drop(columns=['class'])
y_servo = data['class']
X_servo = X_servo.to_numpy()
y_servo = y_servo.to_numpy()

In [None]:
accuracies = multiple_splits(X_servo, y_servo, class_regr=0)
save_results_to_csv(accuracies, 'servo', class_regr=0)


### Liver Disorder

In [None]:
# Import the data from Liver_disorder.csv
data_liver = pd.read_csv('Liver_disorder.csv')

data_liver = data_liver.replace('Female', 1)
data_liver = data_liver.replace('Male', 0)
data_liver = data_liver.fillna(data_liver.mean())

# Split the data into features and labels
X_liver = data_liver.drop('Selector', axis=1).to_numpy()
y_liver = data_liver['Selector'].to_numpy()

In [None]:
accuracies = multiple_splits(X_liver, y_liver, class_regr=1)
save_results_to_csv(accuracies, 'liver', class_regr=1)