# Computer Lab 1: k-NN classifier

## Exercise 2 – Phoneme recognition

In this exercise, a real dataset will be employed. The dataset (file `speech_dataset.mat` in `/data/` folder) contains 5 features for each window of speech signal, with the aim to distinguish between nasal (class `1`) and oral sounds (class `2`). The five features are the normalized amplitudes of the five first harmonics of the speech signal. More information can be found here: https://www.openml.org/d/1489.
 
The data set contains features for 5404 speech samples (the 6-th column is the class label). The dataset has to be divided into training and test set. The activity to be done is the same as for the previous exercise. Make sure you do not use too many values of K, as computations might take a lot of time.

**Task**: your task is to implement a k-NN classifier in Matlab, which calculates the probability that a given test example belongs to each class, and outputs a class label as the class with the highest  probability. You will evaluate the classifier performance computing the average classification accuracy (i.e. the fraction of test examples that have been classified correctly).
 
In particular, you should perform the following:
* Split dataset into training and test set.
* Train a k-NN classifier for different values of k.
* Compare accuracy on the training set and the test set. Calculating accuracy of the training  set means that you will have to classify each sample in the training set as if it were a test sample; one expects that classification of training samples will perform well, and this may also be used to validate your implementation.
    * Accuracy is defined as the ratio between the number of test samples that are correctly classified, and the total number of test samples.
* Identifying overfitting and underfitting in the obtained results.

## Cross-validation
1. Holdout validation: it is a simple cross-validation approach to assess perfomance of the **kNN classification** model. It involves dividing the available dataset into two distinct subsets: a training set and a holdout set. The training set is used to train the machine learning model, while the holdout set is kept separate and only used after training to evaluate the model's performance on unseen data. Since the dataset is split into only two sets, the model is built just one time on the training set and executed faster.
    * **Drawback**
        * Single random split: different random splits may lead to different results, making it difficult to assess the true generalization ability of the model.
        * Underestimation of the bias of the model: holdout validation may underestimate the tendency to consistently favor one class over another. This is because the holdout set is not used during model training, and the model's bias may be inadvertently reflected in the training data.
        * Sensitivity to data distribution: this approach assumes that the data distribution is both constant and known. However, in real-world applications, data distribution may change over time or across different environments. Holdout validation may not be effective in these cases.
        * Unbalanced dataset: in the case of unbalanced datasets, one class has significantly fewer examples than the other classes. Then, holdout validation can be particularly problematic because the model may be overfitted to the majority class, leading to poor performance on the minority class when evaluated on the holdout set.
            * Dividing the dataset into a 70-30 training-testing split while preserving the original distribution of the two classes is a valid approach for holdout validation with imbalanced datasets. This ensures that the training set contains a representative sample of both classes, allowing the model to learn effectively from both majority and minority class data. The testing set also maintains the original class distribution, providing a realistic assessment of the model's generalization performance on unseen data.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import scipy.io
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from tqdm import tqdm

# Plots
sns.set_context(
    'talk', rc = {
        'font.size': 12.0,
        'axes.labelsize': 10.0,
        'axes.titlesize': 10.0,
        'xtick.labelsize': 10.0,
        'ytick.labelsize': 10.0,
        'legend.fontsize': 10.0,
        'legend.title_fontsize': 12.0,
        'patch.linewidth': 2.0
        }
    ) # 'paper'

data_sets = ['Train', 'Test']

In [None]:
# Check current folder
os.getcwd()

In [None]:
data_path = "/Users/ernestocolacrai/Documents/GitHub/StatisticalLearning/data/"

try:
    data = scipy.io.loadmat(data_path + f"speech_dataset.mat")
    print(f"Data ✓")
    print(f"Data Keys: {data.keys()}")
except:
    print(f"Not found data! ({data_path})")

In [None]:
data['dataset']

In [None]:
data_cols = {
    0: 'f1',
    1: 'f2',
    2: 'f3',
    3: 'f4',
    4: 'f5',
    5: 'label'
}

data_df = pd.DataFrame({data_cols[c]: data['dataset'][:,c] for c in np.arange(0,6,1)})

data_df.head()

In [None]:
data_df.dtypes

In [None]:
data_df.label = data_df.label.astype(int)

In [None]:
data_df.dtypes

In [None]:
data_df.isna().sum(axis=0)

In [None]:
# np.sqrt(np.sum((data_df.loc[0][:-1] - data_df.loc[1][:-1]) ** 2))
len(data_df)

In [None]:
data_df[data_df.duplicated(keep='first')]

data_df = data_df.drop_duplicates()
len(data_df)

In [None]:
print(
    f"Distribution for labels (0, 1):\n",
    f"\tdataset: {(data_df.label.value_counts() / len(data_df)).values}"
    )

In [None]:
def data_holdout_split(df:pd.DataFrame, bin_feature:str, frac=0.67, s=1, verbose=False) -> tuple:

    """
    This function splits a pandas DataFrame into a training and testing set while preserving the original distribution of a binary class represented by the `bin_feature` column.

    Args:
        df (pd.DataFrame): A pandas DataFrame containing the data.
        bin_feature (str): The name of the column representing the binary class.
        frac (float, optional): The fraction of rows to be sampled for the training set. The default value is 0.67, which means that 67% of the rows will be used for training and the remaining 33% for testing.
        s (int, optional): The random seed for reproducibility. The default value is 1.
        verbose (bool, optional): A boolean flag indicating whether to print additional information about the splitting process. The default value is False.

    Returns:
        pd.DataFrame: The training set as a pandas DataFrame.
        pd.DataFrame: The testing set as a pandas DataFrame.
    """
    
    # Ensure that the 'bin_feature' column exists in the DataFrame.
    assert bin_feature in df.columns, f"Binary feature should be a data frame column."

    # Check that 'frac' is less than 1 to prevent oversampling the data.
    assert frac < 1, f"Fraction of sampled rows should be smaller than 1 (100%)."
    
    # Set the random seed for consistent splitting.
    np.random.seed(s)

    # Calculate the class distribution for the 'bin_feature' column.
    classes = data_df[bin_feature].value_counts() / len(data_df)

    train_set = []  # List to store the sampled data for the training set
    test_set = []  # List to store the remaining data for the testing set

    for class_label in tqdm(classes.index, colour='green'):
        class_data = df[df[bin_feature] == class_label]

        # Sample 67% of the data for the training set.
        sampled_data = class_data.sample(frac=frac, random_state=s)
        train_set.extend(sampled_data.values) # Add the sampled data to the training set.

        # Remaining data goes to testing set.
        remaining_data = class_data.drop(sampled_data.index)
        test_set.extend(remaining_data.values) # Add the remaining data to the testing set.

    # Rename data frames columns and set class type to int
    data_cols = {}

    for i in np.arange(0, len(data_df.columns), 1):
        data_cols[i] = data_df.columns[i]

    train_df = pd.DataFrame(train_set).rename(columns=data_cols)
    train_df[train_df.columns[-1]] = train_df[train_df.columns[-1]].astype(int)

    test_df = pd.DataFrame(test_set).rename(columns=data_cols)
    test_df[test_df.columns[-1]] = test_df[test_df.columns[-1]].astype(int)

    # Print information about the split data if 'verbose' is True.
    if verbose:
        print(
            f"Datasets sizes:\n",
            f"\tdataset: {len(df)}\n",
            f"\ttrain set: {len(train_df)}\n",
            f"\ttest set: {len(test_df)}\n\n",
            f"\t{'Correct sizes ✓' if len(train_df) + len(test_df) == len(df) else 'Wrong sizes ✕'}\n",
            f"Distribution for labels (0, 1):\n",
            f"\tdataset: \t{(df[df.columns[-1]].value_counts() / len(df)).values}\n",
            f"\ttrain set: \t{(train_df[train_df.columns[-1]].value_counts() / len(train_df)).values}\n",
            f"\ttest set: \t{(test_df[test_df.columns[-1]].value_counts() / len(test_df)).values}"
        )
        
    return train_df, test_df

In [None]:
train_df, test_df = data_holdout_split(data_df, 'label', verbose=True)

In [None]:
def euclidean(point1: np.ndarray, point2: np.ndarray) -> float:
    """
    This function calculates the Euclidean distance between two points represented by NumPy arrays.

    Parameters:
        point1 (np.ndarray): The first point represented by a NumPy array.
        point2 (np.ndarray): The second point represented by a NumPy array.

    Returns:
        float: The Euclidean distance between the two points.
    """

    # Return the square root of the sum of squared distances.
    return np.sqrt(np.sum((point1 - point2) ** 2))

In [None]:
k=3

M = len(test_df)
N = len(train_df)

pred = np.zeros(M, dtype=int)

x_train = train_df.iloc[:,:-1].to_numpy()
y_train = train_df.iloc[:,-1].to_numpy()

x_test = test_df.iloc[:,:-2].to_numpy()

D = np.zeros([M, N], dtype=float)  # Distance matrix
E = np.zeros([M, k], dtype=int)  # Array of nearest neighbors

for i in tqdm(np.arange(0, M, 1), colour='green'):  # For each test point
    for j in np.arange(0, N, 1):  # For each training point
        D[i][j] = np.sqrt(np.sum((x_test[i] - x_train[j]) ** 2)) # euclidean(x_test[i], x_train[j])
    
    E[i] = np.argsort(D[i])[:k]

    l1 = np.sum(y_train[E[i]] == 1) >= (k + 1) / 2
    l2 = np.sum(y_train[E[i]] == 2) >= (k + 1) / 2

    if l1:
        np.append(pred, 1)
    elif l2:
        np.append(pred, 2)

In [None]:
def knn_from_scratch(x_train: np.ndarray, y_train: np.ndarray, x_test: np.ndarray, k: int = 3, bar: bool = False) -> np.ndarray:

    M = len(x_test)
    N = len(x_train)

    pred = np.zeros(M, dtype=int)

    # x_train = train_df.iloc[:,:-1].to_numpy()
    # y_train = train_df.iloc[:,-1].to_numpy()

    # x_test = test_df.iloc[:,:-2].to_numpy()

    D = np.zeros([M, N], dtype=float)  # Distance matrix
    E = np.zeros([M, k], dtype=int)  # Array of nearest neighbors

    for i in tqdm(np.arange(0, M, 1), colour='green', disable=bar):  # For each test point
        for j in np.arange(0, N, 1):  # For each training point
            D[i][j] = np.sqrt(np.sum((x_test[i] - x_train[j]) ** 2)) # euclidean(x_test[i], x_train[j])
        
        E[i] = np.argsort(D[i])[:k]

        l1 = np.sum(y_train[E[i]] == 1) >= (k + 1) / 2
        #l2 = np.sum(y_train[E[i]] == 2) >= (k + 1) / 2

        prediction = 1 if (np.sum(y_train[E[i]] == 1) >= (k + 1) / 2) else 2
        pred[i] = prediction  # Update prediction array with the calculated value
    
    return pred

In [None]:
def knn_accuracy(test_df: pd.DataFrame) -> float:
    
    return 1 - len(test_set[test_set.label != test_set.prediction]) / len(test_set)

In [None]:
M = len(x_test)
N = len(x_train)

pred = np.zeros(M, dtype=int)

x_train = train_df.iloc[:,:-1].to_numpy()
y_train = train_df.iloc[:,-1].to_numpy()

x_test = test_df.iloc[:,:-2].to_numpy()

D = np.zeros([M, N], dtype=float)  # Distance matrix
E = np.zeros([M, k], dtype=int)  # Array of nearest neighbors

for i in tqdm(np.arange(0, M, 1), colour='green'):  # For each test point
    for j in np.arange(0, N, 1):  # For each training point
        D[i][j] = np.sqrt(np.sum((x_test[i] - x_train[j]) ** 2)) # euclidean(x_test[i], x_train[j])
        
    E[i] = np.argsort(D[i])[:k]

    l1 = np.sum(y_train[E[i]] == 1) >= (k + 1) / 2
    l2 = np.sum(y_train[E[i]] == 2) >= (k + 1) / 2

    if l1:
        pred[i] = 1
    elif l2:
        pred[i] = 2

In [None]:
pred

In [None]:
test_df['prediction'] = knn_from_scratch(
    x_train=train_df.iloc[:,:-1].to_numpy(),
    y_train=train_df.iloc[:,-1].to_numpy(),
    x_test=test_df.iloc[:,:-2].to_numpy(),
    k=3)

In [None]:
k_vals = np.arange(1, 50, 2)
misclass_errs = {}
for d_type in data_sets:
    misclass_errs[d_type] = []
    print(f"{d_type} set misclassification error:")
    for k in tqdm(k_vals, colour="green"):
    #for k in k_vals:
        if d_type == 'Train':
            pred = knn_from_scratch(
                x_train=train_df.iloc[:,:-1].to_numpy(),
                y_train=train_df.iloc[:,-1].to_numpy(),
                x_test=train_df.iloc[:,:-1].to_numpy(),
                k=k)
            misclass_errs[d_type].append(np.sum(train_df.label.values != pred) / len(train_df))
        else:
            pred = knn_from_scratch(
                x_train=train_df.iloc[:,:-1].to_numpy(),
                y_train=train_df.iloc[:,-1].to_numpy(),
                x_test=test_df.iloc[:,:-2].to_numpy(),
                k=k)
            misclass_errs[d_type].append(np.sum(train_df.label.values != pred) / len(test_df))

In [None]:
fig, ax = plt.subplots()

for d_type in data_sets:
    sns.lineplot(
        x=k_vals, y=misclass_errs[d_type],
        ax=ax,
        label=f"{d_type} set"
    )
    
ax.legend(framealpha=0)
ax.set_ylabel("misclassification rate")
ax.set_xlabel("k parameter")
ax.set_title("with methods from scratch (not using sklearn)")

sns.despine(right=True)

---

In [None]:
def knn(train_df: pd.DataFrame, test_df: pd.DataFrame, k: int = 3):
    # Initialize data structures
    M = len(test_set)  # Number of data points in the training set
    N = len(train_set)  # Number of data points in the test set

    # Validate k parameter
    assert (type(k) != float) and (k % 2 == 1), "k parameter should be an odd integer number."
    assert k < N, "k parameter should be smaller than the train set size."

    D = np.zeros([M, N], dtype=float)  # Distance matrix
    E = np.zeros([M, k], dtype=int)  # Array of nearest neighbors

    # Calculate distances between test set points and training set points
    for i in tqdm(np.arange(0, M, 1), colour='green'):  # For each test point
        for j in np.arange(0, N, 1):  # For each training point
            D[i][j] = euclidean(  # Calculate distance between the points
                test_set.loc[i][:-2].values,  # Current test point coordinates
                train_set.loc[j][:-1].values  # Corresponding training point coordinates
            )

        # Find the k nearest neighbors for each test point
        E[i] = np.argsort(D[i])[:k]  # Obtain the indices of the k nearest neighbors
    
        # Assign predictions based on majority of labels among the k nearest neighbors
        l1 = np.sum([int(train_set.loc[E[i][idx]][-1] == 1) for idx in np.arange(0, k)]) >= (k + 1) / 2
        l2 = np.sum([int(train_set.loc[E[i][idx]][-1] == 2) for idx in np.arange(0, k)]) >= (k + 1) / 2

        if l1:
            test_set.loc[i, 'prediction'] = 1
        elif l2:
            test_set.loc[i, 'prediction'] = 2

    return test_set  # Return the modified test set with predictions

In [None]:
def knn_accuracy(test_set: pd.DataFrame) -> float:
    return 1 - len(test_set[test_set.label != test_set.prediction]) / len(test_set)

In [None]:
# Implementation by using sklearn
k_vals = np.arange(1, 1000, 2)
x = k_vals
y = {}

for d_type in data_sets:
    print(f"{d_type} set:")
    
    y[d_type] = []

    for k in tqdm(k_vals, colour='green'):
        # Instantiate learning model
        neig = KNeighborsClassifier(n_neighbors=k)

        # Fitting the model
        neig.fit(train_df[['f1', 'f2', 'f3', 'f4', 'f5']].values, train_df['label'].values)
        
        # Predict the response
        if d_type == 'Train':
            pred = neig.predict(train_df[['f1', 'f2', 'f3', 'f4', 'f5']].values)

            # Evaluate misclassification error
            y[d_type].append(1 - accuracy_score(train_df.label.values, pred))
        else:
            pred = neig.predict(test_df[['f1', 'f2', 'f3', 'f4', 'f5']].values)

            # Evaluate misclassification error
            y[d_type].append(1 - accuracy_score(test_df.label.values, pred))

In [None]:
fig2, ax2 = plt.subplots()

for d_type in data_sets:
    sns.lineplot(
        x=k_vals, y=y[d_type],
        ax=ax2,
        label=f"{d_type} set"
    )
    
ax2.legend(framealpha=0)
ax2.set_ylabel("misclassification rate")
ax2.set_xlabel("k parameter")
ax2.set_title("with sklearn methods")

sns.despine(right=True)