## Importing necessary packages

In [18]:
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import chi2, SelectKBest
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import make_scorer
from sklearn.model_selection import cross_val_score, LeaveOneOut, StratifiedKFold
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor

import numpy as np
import pandas as pd
import umap
import warnings

warnings.filterwarnings("ignore")

In [2]:
EXPERIMENT = "OF"

DATASET_PATH = "../../data/12_classification_dataset/OF_2.csv"
LABELS_PATH  = "../../data/12_classification_dataset/video_anxiety_labels_for_ML_moreLabels_march_2022.xlsx"

LABELS = [
    "High_Anxiety",
    "Middle_Anxiety",
    "Low_Anxiety"
]

## Data Preprocessing

In [3]:
def get_week_number(video_name: str) -> int:
    result = video_name.split('_')
    
    if len(result) == 6:
        return int(result[3][1])
    else:
        return int(result[2][1])

def get_video_set(video_name: str) -> str:
    result = video_name.split('_')
    return "Videos " + result[1][1]
    
def get_trial_number(video_name: str) -> int:
    result = video_name.split('_')
    return int(result[0][5:])

def preprocess_labels(labels: pd.DataFrame) -> pd.DataFrame:
    labels = labels.dropna()
    encodings = pd.get_dummies(labels["labels"])
    labels = pd.concat([labels, encodings], axis=1)
    labels = labels.drop(columns=["test", "labels"])
    labels = labels.rename(columns={
        "high anxiety": "High_Anxiety",
        "mid anxiety": "Middle_Anxiety",
        "low anxiety": "Low_Anxiety"
    })
    
    return labels

def check_na(df: pd.DataFrame):
    na_df = df.isna().sum()
    na_df = na_df[na_df > 0]
    
    if len(na_df) > 0:
        print("Following columns have NA values:\n")
        
        for k, v in na_df.items():
            print(k, v)
    else:
        print("No NA values!")

def merge_dataset_and_labels(df: pd.DataFrame, labels: pd.DataFrame) -> pd.DataFrame:
    df["week"] = df["video_name"].map(get_week_number)
    df = df[df["week"] == 6]
    df["trial_number"] = df["video_name"].map(get_trial_number)
    df["set"] = df["video_name"].map(get_video_set)
    df = df.merge(labels, how="inner", on=["set", "trial_number"])
    df.drop(columns=["video_name", "week_x", "week_y", "trial_number", "set", "VelocityInner_Total"], inplace=True)
    
    return df

def save_new_dataset(dataset_path: str, result_path: str, experiment="EPM"):
    labels = pd.read_excel(LABELS_PATH, engine="openpyxl")
    labels = labels[labels["test"] == experiment]
    labels = preprocess_labels(labels)
    
    df = pd.read_csv(dataset_path)
    df = merge_dataset_and_labels(df, labels)
    df.to_csv(result_path)

def plot_labels_distribution(df: pd.DataFrame, experiment_name: str):
    targets = ["High_Anxiety", "Middle_Anxiety", "Low_Anxiety"]
    counts = df[targets].sum().values
    
    plt.figure(figsize=(8, 8))
    plt.bar(targets, counts, color=["red", "yellow", "green"])
    plt.title(f"Labels distribution for the {experiment_name}", fontsize=16)
    plt.ylabel("Frequency", fontsize=14)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)
    plt.grid(axis="y")
    plt.show()

In [4]:
def read_labels(labels_path):
    labels = pd.read_excel(labels_path, engine="openpyxl")
    labels = labels[labels["test"] == EXPERIMENT]
    labels = preprocess_labels(labels)
    return labels

def get_test_labels(labels, train_labels):
    test_labels = pd.merge(train_labels, labels, on=["set", "week", "trial_number"], how="right", indicator=True).loc[lambda x: x["_merge"] == "right_only"]
    test_labels.drop(columns=["High_Anxiety_x", "Middle_Anxiety_x", "Low_Anxiety_x", "_merge"], inplace=True)
    test_labels.rename(columns={
        "High_Anxiety_y": "High_Anxiety", 
        "Middle_Anxiety_y": "Middle_Anxiety", 
        "Low_Anxiety_y": "Low_Anxiety"
    }, inplace=True)
    return test_labels

def read_dataset(dataset_path, labels):
    df = pd.read_csv(dataset_path)
    df = merge_dataset_and_labels(df, labels)
    return df

def get_labels_dataset(df):
    mapping = {
        0: 1,
        1: 2,
        2: 3
    }
    
    y = df[LABELS]
    y = [mapping[2 - np.argmax(row)] for row in y.values]

    df.drop(columns=LABELS, inplace=True)
    df.head()
    return y
    
def get_dataset():
    all_labels = read_labels(LABELS_PATH)
    X = read_dataset(DATASET_PATH, all_labels)
    y = get_labels_dataset(X)
    
    return X, y

In [5]:
X, y = get_dataset()

In [11]:
check_na(X)

Following columns have NA values:

VelocityInner_1500 9
VelocityInner_3000 9
VelocityInner_4500 4
VelocityInner_6000 2
VelocityInner_7500 7
VelocityInner_9000 6
VelocityInner_10500 3
VelocityInner_12000 8
VelocityInner_13500 6
VelocityInner_15000 8
VelocityOuter_4500 1
VelocityOuter_6000 1
VelocityOuter_7500 1
VelocityOuter_9000 1
VelocityOuter_10500 1
VelocityOuter_12000 1
VelocityOuter_13500 1


In [12]:
# Temporarily remove NA columns

X.dropna(axis=1, inplace=True)

check_na(X)

No NA values!


## Regression

In [13]:
def regression_accuracy(y_true, y_pred, **kwargs):
    total = len(y_true)
    correct = 0
    
    for i, j in zip(y_true, y_pred):
        if i == round(j):
            correct += 1
            
    return correct / total

In [14]:
def calculate_metrics(model, X, y, metrics):
    skf = StratifiedKFold(n_splits=7, shuffle=True, random_state=0)
    
    scorings = {}
    
    for scoring in metrics:
        if scoring == "neg_root_mean_squared_error":
            metric = scoring
            sign = -1
        else:
            sign = 1
            metric = make_scorer(regression_accuracy, greater_is_better=True)
            
        skf_scores = cross_val_score(model, X, y, cv=skf, scoring=metric)

        scorings[scoring] = np.round(sign * np.mean(skf_scores), 3)
    
    return scorings

In [15]:
def data_preprocessing(X, y, preprocessing, n_components):
    datasets = {}
    temp_X = X.copy()
    
    if len(preprocessing) == 0:
        print("Error! No preprocessing techniques were provided. Available techniques are: selectkbset, pca, umap, total.")
    
    for technique in preprocessing:
        if technique == "selectkbest":
            selector = SelectKBest(chi2, k=n_components)
            temp_X   = selector.fit_transform(temp_X.replace(-1, 999), y)
            
        scaler = StandardScaler()
        temp_X = scaler.fit_transform(temp_X)
        
        if technique == "pca":
            pca    = PCA(n_components=n_components, random_state=0)
            temp_X = pca.fit_transform(temp_X)
        elif technique == "umap":
            UMAP   = umap.UMAP(n_components=n_components, random_state=0, n_jobs=-1)
            temp_X = UMAP.fit_transform(temp_X)
        elif technique != "selectkbest":
            continue
        
        datasets[technique] = temp_X
        
    return datasets

In [16]:
def test_model(model, X: pd.DataFrame, y, n_components=10, preprocessing=["selectkbest", "pca", "umap"], 
               metrics=["accuracy", "neg_root_mean_squared_error"]) -> pd.DataFrame:
    results = {}
    all_scores = {}
    
    results = pd.DataFrame(columns=[
        "n_components", 
        "dataset_type", 
        "stratified-kfold acc",
        "stratified-kfold rmse"
    ])
    
    for n in range(1, n_components+1):
        datasets = data_preprocessing(X, y, preprocessing, n)
        
        for name, dataset in datasets.items():
            scores = calculate_metrics(model, dataset, y, metrics)
            row_values = [n, name]
            
            for metric, score in scores.items():
                row_values.append(score)
            
            results = results.append({k: v for k, v in zip(results, row_values)}, ignore_index=True)
                
        print(f"Number of components finished: {n} out of {n_components}", end='\r')
                    
    return results

In [24]:
# best: 0.418, 74.8%, 2, umap (default)
model = DecisionTreeRegressor(max_depth=5, random_state=0)

results = test_model(model, X, y)
results.sort_values(["stratified-kfold rmse", "stratified-kfold acc", "n_components"], ascending=True)

Number of components finished: 10 out of 10

Unnamed: 0,n_components,dataset_type,stratified-kfold acc,stratified-kfold rmse
5,2,umap,0.748,0.42
13,5,pca,0.745,0.453
7,3,pca,0.748,0.461
29,10,umap,0.728,0.501
10,4,pca,0.697,0.52
9,4,selectkbest,0.718,0.524
16,6,pca,0.701,0.525
6,3,selectkbest,0.694,0.529
27,10,selectkbest,0.718,0.529
28,10,pca,0.653,0.541


In [25]:
# best: 0.432, 74.1%, 2, selectkbest or pca (default)
model = SGDRegressor(random_state=0)

results = test_model(model, X, y)
results.sort_values(["stratified-kfold rmse", "stratified-kfold acc", "n_components"], ascending=True)

Number of components finished: 10 out of 10

Unnamed: 0,n_components,dataset_type,stratified-kfold acc,stratified-kfold rmse
3,2,selectkbest,0.741,0.432
4,2,pca,0.741,0.432
6,3,selectkbest,0.741,0.438
7,3,pca,0.741,0.438
21,8,selectkbest,0.741,0.441
22,8,pca,0.741,0.441
9,4,selectkbest,0.718,0.442
10,4,pca,0.718,0.442
18,7,selectkbest,0.741,0.442
19,7,pca,0.741,0.442


In [33]:
# best: 0.424, 74.5%, 4, pca (kernel="poly", degree=4)
model = SVR(kernel="poly", degree=7)

results = test_model(model, X, y)
results.sort_values(["stratified-kfold rmse", "stratified-kfold acc", "n_components"], ascending=True)

Number of components finished: 3 out of 10

KeyboardInterrupt: 

In [41]:
# best: 0.380, 79.6%, 6, selectkbest or pca (n_neighbors=6, weights="distance")
model = KNeighborsRegressor(n_neighbors=6, weights="uniform", n_jobs=-1)

results = test_model(model, X, y)
results.sort_values(["stratified-kfold rmse", "stratified-kfold acc", "n_components"], ascending=True)

Number of components finished: 10 out of 10

Unnamed: 0,n_components,dataset_type,stratified-kfold acc,stratified-kfold rmse
15,6,selectkbest,0.772,0.419
16,6,pca,0.772,0.419
12,5,selectkbest,0.772,0.437
13,5,pca,0.772,0.437
5,2,umap,0.697,0.458
21,8,selectkbest,0.724,0.461
22,8,pca,0.724,0.461
18,7,selectkbest,0.724,0.462
19,7,pca,0.724,0.462
9,4,selectkbest,0.721,0.464


In [42]:
# best: 0.387, 76.9%, 6, pca (default)
model = RandomForestRegressor(n_jobs=-1, random_state=0)

results = test_model(model, X, y)
results.sort_values(["stratified-kfold rmse", "stratified-kfold acc", "n_components"], ascending=True)

Number of components finished: 10 out of 10

Unnamed: 0,n_components,dataset_type,stratified-kfold acc,stratified-kfold rmse
16,6,pca,0.769,0.387
7,3,pca,0.772,0.403
13,5,pca,0.721,0.428
5,2,umap,0.748,0.428
18,7,selectkbest,0.701,0.429
21,8,selectkbest,0.721,0.431
15,6,selectkbest,0.701,0.432
3,2,selectkbest,0.721,0.436
19,7,pca,0.724,0.436
11,4,umap,0.728,0.437
