In [23]:
# IMPORTING MODULES
import glob
import importlib
import matplotlib.pyplot as plt
import numpy as np
import os
module_path = os.path.abspath(os.path.join('..', 'src'))
sys.path.append(module_path)
import pandas as pd
import scipy.signal as ss
import sys

import tools.data_reader_apd as dr
import tools.display_tools as dt
import tools.preprocessing as preprocessing

from scipy.fft import fft, fftfreq, fftshift
from sklearn import svm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split, cross_val_score, RepeatedKFold


import warnings
warnings.filterwarnings(
    "ignore", 
    category=RuntimeWarning
)

In [None]:
# SVM
importlib.reload(preprocessing)
importlib.reload(dr)
importlib.reload(dt)

import csv

metrics_folder = os.path.join(dr.Paths.DATA_DIR, "metrics")
metrics = ["bpm", "rmssd", "hf_rr", "lf_rr", "mean_SCL", "SCR_rate"]
phases = [
    dr.Phases.BASE_REST, dr.Phases.BASE_SPEECH,
    dr.Phases.BUG_RELAX, dr.Phases.BUG_ANTICIPATE, dr.Phases.BUG_EXPOSURE, dr.Phases.BUG_BREAK, dr.Phases.BUG_REFLECT,
    dr.Phases.SPEECH_RELAX, dr.Phases.SPEECH_ANTICIPATE, dr.Phases.SPEECH_EXPOSURE, dr.Phases.SPEECH_BREAK, dr.Phases.SPEECH_REFLECT
]

clf = svm.SVC()

for metric in metrics:
    print(f"SVM USING {metric} -----------------------------------")
    ha = []
    la = []
    for phase in phases:
        file = os.path.join(metrics_folder, f"{metric}_{phase}_ha.csv")
        arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
        arr = arr[1:, 1:]
        # avg = np.nanmean(arr, axis=1)
        # ha.append(avg)
        col_mean = np.nanmean(arr, axis=1)
        idx = np.where(np.isnan(arr))
        arr[idx] = np.take(col_mean, idx[0])
        arr = np.nan_to_num(arr)

        ha.append(arr)

        file = os.path.join(metrics_folder, f"{metric}_{phase}_la.csv")
        arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
        arr = arr[1:, 1:]
        # avg = np.nanmean(arr, axis=1)
        # la.append(avg)
        col_mean = np.nanmean(arr, axis=1)
        idx = np.where(np.isnan(arr))
        arr[idx] = np.take(col_mean, idx[0])
        arr = np.nan_to_num(arr)

        la.append(arr)

    # ha_arr = np.asarray(ha).transpose()
    # la_arr = np.asarray(la).transpose()
    ha_arr = np.hstack(ha)
    la_arr = np.hstack(la)

    max_len = np.max([ha_arr.shape[1], la_arr.shape[1]])
    ha_arr = np.pad(ha_arr, ((0, 0), (0, max_len - ha_arr.shape[1])), "constant", constant_values=0.0)
    la_arr = np.pad(la_arr, ((0, 0), (0, max_len - la_arr.shape[1])), "constant", constant_values=0.0)

    x = np.vstack([ha_arr, la_arr])
    y = np.asarray([1 for _ in range(ha_arr.shape[0])] + [0 for _ in range(ha_arr.shape[0])])

    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=16)

    clf.fit(x_train, y_train)

    preds = clf.predict(x_test)
    actual = y_test

    # print(confusion_matrix(actual, preds))
    # print(classification_report(actual, preds))


In [27]:
# SVM ENSEMBLE
importlib.reload(preprocessing)
importlib.reload(dr)
importlib.reload(dt)

import csv
import random

from sklearn.ensemble import VotingClassifier


metrics_folder = os.path.join(dr.Paths.DATA_DIR, "metrics")
metrics = ["bpm", "rmssd", "hf_rr", "lf_rr", "mean_SCL", "SCR_rate"]
phases = [
    dr.Phases.BASE_REST, dr.Phases.BASE_SPEECH,
    # dr.Phases.BUG_RELAX, dr.Phases.BUG_ANTICIPATE, dr.Phases.BUG_EXPOSURE, dr.Phases.BUG_BREAK, dr.Phases.BUG_REFLECT,
    # dr.Phases.SPEECH_RELAX, dr.Phases.SPEECH_ANTICIPATE, dr.Phases.SPEECH_EXPOSURE, dr.Phases.SPEECH_BREAK, dr.Phases.SPEECH_REFLECT
]

clfs = [svm.SVC(), svm.SVC(), svm.SVC(), svm.SVC(), svm.SVC(), svm.SVC()]
x_y = []

acc = []
for _ in range(10):
    for metric in metrics:
        ha = []
        la = []
        for phase in phases:
            file = os.path.join(metrics_folder, f"{metric}_{phase}_ha.csv")
            arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
            arr = arr[1:, 1:]
            col_mean = np.nanmean(arr, axis=1)
            idx = np.where(np.isnan(arr))
            arr[idx] = np.take(col_mean, idx[0])
            arr = np.nan_to_num(arr)

            ha.append(arr)

            file = os.path.join(metrics_folder, f"{metric}_{phase}_la.csv")
            arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
            arr = arr[1:, 1:]
            col_mean = np.nanmean(arr, axis=1)
            idx = np.where(np.isnan(arr))
            arr[idx] = np.take(col_mean, idx[0])
            arr = np.nan_to_num(arr)

            la.append(arr)

        ha_arr = np.hstack(ha)
        la_arr = np.hstack(la)

        max_len = np.max([ha_arr.shape[1], la_arr.shape[1]])
        ha_arr = np.pad(ha_arr, ((0, 0), (0, max_len - ha_arr.shape[1])), "constant", constant_values=0.0)
        la_arr = np.pad(la_arr, ((0, 0), (0, max_len - la_arr.shape[1])), "constant", constant_values=0.0)

        x = np.vstack([ha_arr, la_arr])
        y = np.asarray([1 for _ in range(ha_arr.shape[0])] + [0 for _ in range(ha_arr.shape[0])])

        x_y.append([x, y])

        all_preds = []
        actual = []
        test_size = 0.1
        test_indices = random.sample(range(y.size), int(y.size*test_size))

        for clf, train_data in zip(clfs, x_y):
            x = train_data[0]
            y = train_data[1]
            x_train = [] 
            x_test = []
            y_train = []
            y_test = []
            for i in range(y.size):
                if i in test_indices:
                    x_test.append(x[i, :])
                    y_test.append(y[i])
                else:
                    x_train.append(x[i, :])
                    y_train.append(y[i])
            
            x_train = np.asarray(x_train)
            y_train = np.asarray(y_train)
            x_test = np.asarray(x_test)
            y_test = np.asarray(y_test)

            clf.fit(x_train, y_train)
            preds = clf.predict(x_test)

            all_preds.append(preds)
            actual.append(y_test)

        all_preds = np.vstack(all_preds).mean(axis=0)
        # Majority voting
        all_preds[all_preds < 0.5] = 0
        all_preds[all_preds > 0.5] = 1
        all_preds = all_preds.astype(int)
        actual = actual[0]

        # print(confusion_matrix(actual, preds))
        # print(classification_report(actual, preds))
        acc.append(accuracy_score(actual, preds))

print(f"Accuracy over 10 rounds: {np.mean(acc)}")


Accuracy over 10 rounds: 0.6466666666666667


In [None]:
# SVM FFT
importlib.reload(preprocessing)
importlib.reload(dr)
importlib.reload(dt)

import csv

phases = [
    dr.Phases.BASE_REST, dr.Phases.BASE_SPEECH,
    dr.Phases.BUG_RELAX, dr.Phases.BUG_ANTICIPATE, dr.Phases.BUG_EXPOSURE, dr.Phases.BUG_BREAK, dr.Phases.BUG_REFLECT,
    dr.Phases.SPEECH_RELAX, dr.Phases.SPEECH_ANTICIPATE, dr.Phases.SPEECH_EXPOSURE, dr.Phases.SPEECH_BREAK, dr.Phases.SPEECH_REFLECT
]

convert_sr = False
tasks = [dr.Tasks.BASELINE, dr.Tasks.BUGS, dr.Tasks.SPEAKING]
data_type = dr.DataTypes.ECG
fs = preprocessing.FS_DICT[data_type]
f_dim = preprocessing.DATA_TYPE_DIMENSIONS[data_type]

ha = []
la = []
for task in tasks:
    if task == dr.Tasks.BASELINE:
        phases = [dr.Phases.BASE_REST, dr.Phases.BASE_SPEECH]
    elif task == dr.Tasks.BUGS:
        phases = [dr.Phases.BUG_RELAX, dr.Phases.BUG_ANTICIPATE, dr.Phases.BUG_EXPOSURE, dr.Phases.BUG_BREAK, dr.Phases.BUG_REFLECT]
    else:
        phases = [dr.Phases.SPEECH_RELAX, dr.Phases.SPEECH_ANTICIPATE, dr.Phases.SPEECH_EXPOSURE, dr.Phases.SPEECH_BREAK, dr.Phases.SPEECH_REFLECT]

    for phase in phases:
        ha_data, la_data = preprocessing.load_data(task, data_type, phase, convert_sr)
        ha_fft = []
        la_fft = []
        for data in ha_data:
            freq, amp = preprocessing.calculate_fft_1d(data, fs)
            ha_fft.append(amp)
        for data in la_data:
            freq, amp = preprocessing.calculate_fft_1d(data, fs)
            la_fft.append(amp)


In [28]:
# KNN ENSEMBLE
importlib.reload(preprocessing)
importlib.reload(dr)
importlib.reload(dt)

import csv
import random

from sklearn.neighbors import KNeighborsClassifier


metrics_folder = os.path.join(dr.Paths.DATA_DIR, "metrics")
metrics = ["bpm", "rmssd", "hf_rr", "lf_rr", "mean_SCL", "SCR_rate"]
phases = [
    dr.Phases.BASE_REST, dr.Phases.BASE_SPEECH,
    dr.Phases.BUG_RELAX, dr.Phases.BUG_ANTICIPATE, dr.Phases.BUG_EXPOSURE, dr.Phases.BUG_BREAK, dr.Phases.BUG_REFLECT,
    dr.Phases.SPEECH_RELAX, dr.Phases.SPEECH_ANTICIPATE, dr.Phases.SPEECH_EXPOSURE, dr.Phases.SPEECH_BREAK, dr.Phases.SPEECH_REFLECT
]

clfs = [KNeighborsClassifier(), KNeighborsClassifier(), KNeighborsClassifier(), KNeighborsClassifier(), KNeighborsClassifier(), KNeighborsClassifier()]
x_y = []

acc = []
for _ in range(10):
    for metric in metrics:
        ha = []
        la = []
        for phase in phases:
            file = os.path.join(metrics_folder, f"{metric}_{phase}_ha.csv")
            arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
            arr = arr[1:, 1:]
            col_mean = np.nanmean(arr, axis=1)
            idx = np.where(np.isnan(arr))
            arr[idx] = np.take(col_mean, idx[0])
            arr = np.nan_to_num(arr)

            ha.append(arr)

            file = os.path.join(metrics_folder, f"{metric}_{phase}_la.csv")
            arr = pd.read_csv(file, header=None, index_col=[0]).to_numpy()
            arr = arr[1:, 1:]
            col_mean = np.nanmean(arr, axis=1)
            idx = np.where(np.isnan(arr))
            arr[idx] = np.take(col_mean, idx[0])
            arr = np.nan_to_num(arr)

            la.append(arr)

        ha_arr = np.hstack(ha)
        la_arr = np.hstack(la)

        max_len = np.max([ha_arr.shape[1], la_arr.shape[1]])
        ha_arr = np.pad(ha_arr, ((0, 0), (0, max_len - ha_arr.shape[1])), "constant", constant_values=0.0)
        la_arr = np.pad(la_arr, ((0, 0), (0, max_len - la_arr.shape[1])), "constant", constant_values=0.0)

        x = np.vstack([ha_arr, la_arr])
        y = np.asarray([1 for _ in range(ha_arr.shape[0])] + [0 for _ in range(ha_arr.shape[0])])

        x_y.append([x, y])

        all_preds = []
        actual = []
        test_size = 0.1
        test_indices = random.sample(range(y.size), int(y.size*test_size))

        for clf, train_data in zip(clfs, x_y):
            x = train_data[0]
            y = train_data[1]
            x_train = [] 
            x_test = []
            y_train = []
            y_test = []
            for i in range(y.size):
                if i in test_indices:
                    x_test.append(x[i, :])
                    y_test.append(y[i])
                else:
                    x_train.append(x[i, :])
                    y_train.append(y[i])
            
            x_train = np.asarray(x_train)
            y_train = np.asarray(y_train)
            x_test = np.asarray(x_test)
            y_test = np.asarray(y_test)

            clf.fit(x_train, y_train)
            preds = clf.predict(x_test)

            all_preds.append(preds)
            actual.append(y_test)

        all_preds = np.vstack(all_preds).mean(axis=0)
        # Majority voting
        all_preds[all_preds < 0.5] = 0
        all_preds[all_preds > 0.5] = 1
        all_preds = all_preds.astype(int)
        actual = actual[0]

        # print(confusion_matrix(actual, preds))
        # print(classification_report(actual, preds))
        acc.append(accuracy_score(actual, preds))

print(f"Accuracy over 10 rounds: {np.mean(acc)}")


Accuracy over 10 rounds: 0.9033333333333334
