In [1]:
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from wfdb import rdsamp
from rich.progress import track
from sklearn.utils import shuffle

from typing import Tuple, List
from numpy.typing import NDArray
from dataclasses import dataclass

from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Configuration


In [2]:
DATA_DIR = "../data/dataset/"
FS = 300
SEGMENT_LEN = 512
DISEASE = "park"


# Subject Description


In [3]:
subject_description = pd.read_csv(
    os.path.join(DATA_DIR, "subject-description.csv")
)

subject_description


Unnamed: 0,ID,GROUP,AGE(YRS),HEIGHT(meters),Weight(kg),gender,GaitSpeed(m/sec),Duration/Severity
0,control1,control,57,1.94,95.00,f,1.330,0.0
1,control2,control,22,1.94,70.00,m,1.470,0.0
2,control3,control,23,1.83,66.00,f,1.440,0.0
3,control4,control,52,1.78,73.00,f,1.540,0.0
4,control5,control,47,1.94,82.00,f,1.540,0.0
...,...,...,...,...,...,...,...,...
59,als9,subjects,50,1.58,61.24,m,0.899,54.0
60,als10,subjects,40,1.70,61.24,f,1.219,14.5
61,als11,subjects,39,1.88,83.92,m,1.283,7.0
62,als12,subjects,62,1.78,117.50,m,0.831,12.0


# Utils

In [4]:
@dataclass
class Scaler:
    min: float
    max: float
    mean: float
    std: float


def normalize(
    x: NDArray[np.float32],
    scaler: Scaler = None
) -> Tuple[NDArray[np.float32], Scaler]:
    if scaler == None:
        scaler = Scaler(
            min=np.min(x, axis=0),
            max=np.max(x, axis=0),
            mean=np.mean(x, axis=0),
            std=np.std(x, axis=0)
        )

    z_score = (x - scaler.mean) / scaler.std
    return (z_score - scaler.min) / (scaler.max - scaler.min), scaler


def get_file_and_ids(disease: str) -> Tuple[List[str], List[str]]:
    all_files = sorted(glob.glob(os.path.join(DATA_DIR, "*hea")))
    files = []
    subject_ids = []

    for filename in all_files:
        if filename.find("control") != -1 or filename.find(disease) != -1:
            files.append(filename)
            subject_ids.append(((filename.split("/"))[-1])[:-4])

    return files, subject_ids


def get_demographic_features(subject_description: pd.Series) -> NDArray[np.float32]:
    age = subject_description["AGE(YRS)"]
    weight = subject_description["Weight(kg)"]
    height = subject_description["HEIGHT(meters)"]
    gender = 0 if subject_description["gender"] == "m" else 1
    speed = subject_description["GaitSpeed(m/sec)"]

    return np.array([age, weight, height, gender, speed])


In [5]:
def get_train_test_sets(
    test_subject: str,
    data_files: List[str],
    subject_ids: List[str]
) -> Tuple[
    NDArray[np.float32],
    NDArray[np.uint8],
    NDArray[np.float32],
    NDArray[np.uint8],
]:
    train_segments = []
    test_segments = []
    train_labels = []
    test_labels = []

    for filename, subject_id in zip(data_files, subject_ids):
        ts = np.loadtxt(filename[:-4] + ".ts", dtype=np.float32)
        demographics = subject_description[subject_description["ID"]
                                           == subject_id].iloc[0]
        ts_features = np.mean(ts[:, 1:], axis=0, dtype=np.float32)
        dg_features = get_demographic_features(demographics)
        features = np.concatenate([ts_features, dg_features], axis=0)
        label = 0 if "control" in subject_id else 1

        if test_subject == subject_id:
            test_segments.append(features)
            test_labels.append(label)
        else:
            train_segments.append(features)
            train_labels.append(label)
            
    train_x = np.array(train_segments, dtype=np.float32)
    train_y = np.array(train_labels, dtype=np.uint8)
    test_x = np.array(test_segments, dtype=np.float32)
    test_y = np.array(test_labels, dtype=np.uint8)

    train_x, scaler = normalize(train_x)
    test_x, _ = normalize(test_x, scaler)

    train_x, train_y = shuffle(train_x, train_y, random_state=42)

    return train_x, train_y, test_x, test_y


# Training

In [6]:
data_files, subject_ids = get_file_and_ids(DISEASE)

accuracies = []

for test_subject in subject_ids:
    train_x, train_y, test_x, test_y = get_train_test_sets(
        test_subject=test_subject,
        data_files=data_files,
        subject_ids=subject_ids
    )

    clf = SVC(C=10)
    clf.fit(train_x, train_y)
    pred_y = clf.predict(test_x)
    acc = accuracy_score(test_y.ravel(), pred_y.ravel())
    accuracies.append(acc)

    print("{:10s} ... {:.02f}".format(test_subject, acc))

    # break

mean_acc = np.mean(accuracies, dtype=np.float32)
print("-" * 20)
print("{:10s} ... {:.02f}".format("Accuracy", mean_acc))


control1   ... 1.00
control10  ... 1.00
control11  ... 1.00
control12  ... 1.00
control13  ... 1.00
control14  ... 1.00
control15  ... 0.00
control16  ... 1.00
control2   ... 1.00
control3   ... 0.00
control4   ... 1.00
control5   ... 1.00
control6   ... 0.00
control7   ... 1.00
control8   ... 1.00
control9   ... 1.00
park1      ... 1.00
park10     ... 1.00
park11     ... 1.00
park12     ... 0.00
park13     ... 0.00
park14     ... 1.00
park15     ... 1.00
park2      ... 0.00
park3      ... 1.00
park4      ... 1.00
park5      ... 1.00
park6      ... 0.00
park7      ... 1.00
park8      ... 1.00
park9      ... 0.00
--------------------
Accuracy   ... 0.74
