In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import scipy.stats as sp
import os

In [4]:
from functions.test_validation_split import *

In [None]:
# The data file is looked up relative to the kernel's current working
# directory. `__name__` is a module name ("__main__" in a notebook), not a
# file path, so the working directory is asked for directly.
p = os.path.realpath(os.getcwd())
path = os.path.join(p, "data", "data.csv")
df = pd.read_csv(path)

In [None]:
# Experiment layout: every (subject, condition, replication) triple is one trial.
N_SUBJECTS, N_CONDITIONS, N_REPS = 10, 3, 10
N_SENSORS = 6  # two legs x three joints
N_TRIALS = N_REPS * N_CONDITIONS * N_SUBJECTS

# Preprocessing

In [None]:
# Reshape the recording into a wide table with one angle column per leg/joint
# pair, indexed by (trial, time), and build the per-trial condition labels.
trials = ["subject", "condition", "replication"]
index_columns = trials + ["time"]

# Only set the index when it is not already in place, so re-running this cell
# does not fail on the already-indexed frame.
if list(df.index.names) != index_columns:
    df = df.set_index(index_columns)

matrix = pd.DataFrame()
joint_map = {1: "ankle", 2: "knee", 3: "hip"}
leg_map = {1: "left", 2: "right"}

# One column per (leg, joint). The first assignment fixes the row index; later
# columns are aligned on the same (subject, condition, replication, time) labels.
for leg_key, leg_val in leg_map.items():
    for joint_key, joint_val in joint_map.items():
        matrix[f"{leg_val}_{joint_val}"] = df[(df.leg == leg_key) & (df.joint == joint_key)].angle

# Flatten (subject, condition, replication) into a single trial id.
# Assumes all three codes are 1-based, which makes the trial id 1-based too.
rs = matrix.reset_index()
series = (rs.subject - 1) * N_REPS * N_CONDITIONS + (rs.condition - 1) * N_REPS + rs.replication
# `rs` has the same row order as `matrix`, so assign positionally.
matrix["trial"] = series.to_numpy()

# Condition label for every trial id, derived from the same layout as the
# trial id above: blocks of N_REPS trials cycle through the N_CONDITIONS conditions.
target = pd.Series(range(N_TRIALS), index=range(1, N_TRIALS + 1))
target = 1 + ((target // N_REPS) % N_CONDITIONS)
target.name = "condition"

matrix = matrix.reset_index().drop(trials, axis=1).set_index(["trial", "time"])

In [None]:
# Preview the first two rows of the wide table indexed by (trial, time).
matrix.head(2)

In [None]:
# Per-trial mean and standard deviation of every column of `matrix`.
per_trial = matrix.groupby("trial")

means = per_trial.mean()
means.columns = [str(name) + "_mean" for name in matrix.columns]

stdevs = per_trial.std()
stdevs.columns = [str(name) + "_stdev" for name in matrix.columns]

In [None]:
# Left/right coupling features per trial, computed on the second half of each
# time series: Pearson correlation and covariance between the left and right
# angle of the same joint.
corrs = pd.DataFrame()
covs = pd.DataFrame()

# The time split does not depend on the joint, so it is computed once.
# Samples whose time equals HALF_TIME fall in neither half.
HALF_TIME = 50
time_values = matrix.index.get_level_values(1)
first_half = matrix[time_values < HALF_TIME]  # not used below; kept for later experiments
second_half = matrix[time_values > HALF_TIME]

for body_part in ["ankle", "knee"]:
    left_col, right_col = f"left_{body_part}", f"right_{body_part}"
    body_parts = second_half[[left_col, right_col]]
    by_trial = body_parts.groupby(["trial"])

    # Each trial yields a 2x2 matrix whose first row belongs to the left joint;
    # the right-joint entry of that row is the left/right statistic.
    # NOTE(review): GroupBy.first() skips NaN by default, so a trial whose
    # left/right entry is NaN would pick up the second row instead — confirm
    # this cannot happen in the data.
    corrs[body_part] = by_trial.corr(method="pearson").groupby("trial")[right_col].first()
    covs[body_part] = by_trial.cov().groupby("trial")[right_col].first()


In [None]:
# Assemble the feature table (currently just the covariance features).
features = pd.concat([covs], axis=1)

In [None]:
# Plot every angle column over time for trial 41.
matrix.loc[41, :].plot()

In [None]:
# two sets of features:
# base: mean of some of the sensors
# correlation model
# fft model

# extension: only look at the second half

In [None]:
from scipy.fft import fft, ifft, fftfreq

# Cross-validation functions

In [None]:
def cross_validation_splits(train_validation_data, targets, number_of_folds,
                            trials_per_fold=30, verbose=True):
    """Build grouped train/validation splits from integer index labels.

    A row with (1-based) index label ``k`` belongs to fold
    ``(k - 1) // trials_per_fold``. With consecutive trial ids and
    ``trials_per_fold`` trials per subject, each fold therefore holds out one
    subject. The assignment is fully deterministic; no random numbers are
    drawn.

    Parameters
    ----------
    train_validation_data : pandas.DataFrame
        Feature table with an integer index; must be two-dimensional.
    targets : array-like
        One label per row of ``train_validation_data``.
    number_of_folds : int
        Number of folds to build (fold ids ``0 .. number_of_folds - 1``).
    trials_per_fold : int, default 30
        Number of consecutive index labels grouped into one fold.
    verbose : bool, default True
        Print the fold assignment and the shape of every split.

    Returns
    -------
    list of dict
        One dict per fold with keys ``"train_inputs"``, ``"train_targets"``,
        ``"valid_inputs"`` and ``"valid_targets"`` (numpy arrays).

    Raises
    ------
    ValueError
        If the data and the targets do not have the same number of rows.
    """
    # Keep the index before the data is converted to a plain array.
    train_validation_index = train_validation_data.index
    train_validation_data = np.array(train_validation_data)
    targets = np.array(targets)

    # Fail early with a clear message instead of an indexing error further down.
    if train_validation_data.shape[0] != targets.shape[0]:
        raise ValueError(
            "Input Data and Targets do not have the same number of entries. "
            f"input_data.shape = {train_validation_data.shape}, "
            f"targets.shape = {targets.shape}"
        )

    # Group consecutive index labels into folds.
    fold_assignments = (train_validation_index.values - 1) // trials_per_fold
    if verbose:
        print(fold_assignments)

    # Saving the different splits in a list
    folds = []

    for f in range(number_of_folds):
        valid_filter = (fold_assignments == f)
        train_filter = ~valid_filter

        fold = {
            "train_inputs": train_validation_data[train_filter, :],
            "train_targets": targets[train_filter],
            "valid_inputs": train_validation_data[valid_filter, :],
            "valid_targets": targets[valid_filter],
        }
        folds.append(fold)

        if verbose:
            print(f"For fold {f}")
            print(f"\ttrain_inputs.shape = {fold['train_inputs'].shape}")
            print(f"\ttrain_targets.shape = {fold['train_targets'].shape}")
            print(f"\tvalid_inputs.shape = {fold['valid_inputs'].shape}")
            print(f"\tvalid_targets.shape = {fold['valid_targets'].shape}")

    return folds

In [None]:
def cross_validate(X_train, y_train, folds, model, metric):
    """Score ``model`` with contiguous k-fold cross-validation.

    The rows are cut, in their existing order, into ``folds`` consecutive
    blocks of ``len(X_train) // folds`` rows. Each block is used once for
    validation while the model is fitted on all remaining rows. Blocks are
    chosen by row *position*, so the result does not depend on what the index
    labels are (for example 1-based trial ids).

    If the number of rows is not divisible by ``folds``, the trailing
    ``len(X_train) % folds`` rows are never validated and always stay in the
    training part.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Feature table.
    y_train : pandas.Series
        Labels, one per row of ``X_train`` and in the same order.
    folds : int
        Number of folds.
    model : object
        Estimator exposing ``fit(X, y)`` and ``predict(X)``; it is re-fitted
        for every fold.
    metric : callable
        Called as ``metric(y_true, y_pred)``; must return a number.

    Returns
    -------
    numpy.ndarray
        The metric value of every fold, in fold order.

    Raises
    ------
    ValueError
        If ``X_train`` and ``y_train`` differ in length.
    """
    n = len(X_train)
    if len(y_train) != n:
        raise ValueError(
            f"X_train has {n} rows but y_train has {len(y_train)} entries."
        )

    n_per_fold = n // folds
    metric_values = np.zeros(folds)
    positions = np.arange(n)

    for i in range(folds):
        # Positional mask of the rows held out in this fold.
        in_validation = (positions >= i * n_per_fold) & (positions < (i + 1) * n_per_fold)

        X_train_fold = X_train.iloc[~in_validation]
        y_train_fold = y_train.iloc[~in_validation]
        X_test_fold = X_train.iloc[in_validation]
        y_test_fold = y_train.iloc[in_validation]

        model.fit(X_train_fold, y_train_fold)

        y_pred_fold = model.predict(X_test_fold)
        metric_values[i] = metric(y_test_fold, y_pred_fold)

    return metric_values

# Testing Model

In [None]:
from sklearn import linear_model, ensemble, discriminant_analysis
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, log_loss, auc, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# from functions.cross_validation import *

In [None]:
# Hold out whole subjects: the first TRAIN_SUBJECTS subjects are used for
# training/validation, the remaining ones for the final test.
TRAIN_SUBJECTS = 7
# Integer division keeps the split point an integer trial id; true division
# would produce a float that is then used as an index label.
TRAIN_TRIALS = N_TRIALS * TRAIN_SUBJECTS // N_SUBJECTS

features_used = covs
target_used = target

X = features_used.copy()
y = target_used.copy()

X_split = TRAIN_TRIALS
y_split = X_split

# .loc slices by trial id and includes the end label, so the training part
# ends at trial X_split and the test part starts at the next trial.
X_train = X.loc[:X_split]
y_train = y.loc[:y_split]
X_test = X.loc[X_split + 1:]
y_test = y.loc[y_split + 1:]

In [None]:
# write a fn that takes features and targets, then spits out the results
# within this

In [None]:
# One fold per training subject (the splitter groups trial ids in blocks of 30).
a = cross_validation_splits(X_train, y_train, TRAIN_SUBJECTS)

In [None]:
# here we will use cross validation on X_train, y_train (leaving out 1 subject each time!)

In [None]:
# Shapes of the held-out features and labels.
for held_out in (X_test, y_test):
    print(held_out.shape)

In [None]:
# we can change this, need to have reasoning for the model
# Alternatives (not active):
# model = linear_model.LogisticRegression(max_iter=1000, multi_class="multinomial")
# model = discriminant_analysis.LinearDiscriminantAnalysis(solver="svd")

# Fixed seed so the fitted forest, and every score derived from it, is
# reproducible across runs.
model = ensemble.RandomForestClassifier(random_state=0)

model.fit(X_train, y_train)

In [None]:
# Heatmap of the pairwise correlations between feature columns; the colour scale is centred on zero.
sns.heatmap(features.corr(), center=0)

In [None]:
# Evaluate the fitted model on the held-out test trials.
y_pred = model.predict(X_test)
y_probs = model.predict_proba(X_test)

for metric_fn in [confusion_matrix, accuracy_score]:
    # Use the function's own name instead of parsing its repr.
    print(metric_fn.__name__)
    print(metric_fn(y_test, y_pred))

# wants clear divisions between classes - would work
# The columns of y_probs follow model.classes_; passing that order explicitly
# keeps labels and probabilities aligned even if a class is absent from y_test.
print(log_loss(y_test, y_probs, labels=model.classes_))

# good scores - so look at the errors!

In [None]:
# Plot each column of predicted probabilities across the test samples.
pd.DataFrame(y_probs).plot()

In [None]:
# Side-by-side table of true and predicted conditions, ordered by the true condition.
true_values = pd.Series(y_test).reset_index(drop=True)
predicted_values = pd.Series(y_pred)
results = pd.concat(
    [true_values, predicted_values],
    axis=1,
    ignore_index=False,
)
results = results.sort_values(by="condition")
results.columns = ["true", "pred"]

In [None]:
# ten subjects
# ten replications
# three conditions

# 6 measurements for each one (two legs, three joints)
# time series are 101 points long
# so the data is actually (300 x 101 x 6), so we should represent it as such.

# train-test split: 
# 7 subjects for train: should do leave-one-out validation (as in 1 subject each time)
# 3 subjects for test: leave three subjects for test