# Evaluate Model

In [None]:
import numpy as np
import pandas as pd
from glob import glob
import sys
import shutil
import os
from sklearn.metrics import accuracy_score, f1_score, cohen_kappa_score
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from imblearn.ensemble import BalancedRandomForestClassifier
from tqdm.auto import tqdm

sys.path.append("src")
from actinet.models import ActivityClassifier
from actinet.prepare import load_all_and_make_windows, extract_accelerometer_features, \
    prepare_accelerometer_data
from actinet.evaluate import evaluate_preprocessing, evaluate_models

WINSEC = 30 # seconds
SAMPLE_RATE = 100 # Hz
RESAMPLE_RATE = 30 # Hz
N_JOBS = 8 # Set to higher number for quicker execution, but don't exceed max.

DATAFILES = f"data/capture24/P[0-9][0-9][0-9].csv.gz"
ANNOFILE = f"data/capture24/annotation-label-dictionary.csv"
SAVEFOLDER = f"data/capture24"

## Evalate pre-processing steps

The purpose of this section is to evaluate the model performance compare to different preprocessing approaches, as well as model types.

In [None]:
# Approach 1 - Actipy downsampling, no lp filter

DATAFILES = f"data/capture24/P[0-9][0-9][0-9].csv.gz"
ANNOFILE = f"data/capture24/annotation-label-dictionary.csv"
SAVEFOLDER = f"data/capture24"

if len(glob(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_None/*.npy")) == 4:
    X_nn = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_None/X.npy")
    Y_nn = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_None/Y.npy")
    T_nn = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_None/T.npy")
    pid_nn = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_None/pid.npy")

else:
    X_nn, Y_nn, T_nn, pid_nn = load_all_and_make_windows(
        datafiles=glob(DATAFILES), 
        annofile=ANNOFILE, 
        out_dir=SAVEFOLDER, 
        anno_label="Walmsley2020",
        sample_rate=SAMPLE_RATE,
        winsec=WINSEC,
        n_jobs=N_JOBS,
        downsampling_method="nn",
        lowpass_hz=None,
        resample_rate=RESAMPLE_RATE,
    )

In [None]:
# Approach 2 - Actipy downsampling, 15Hz lp filter

DATAFILES = f"data/capture24/P[0-9][0-9][0-9].csv.gz"
ANNOFILE = f"data/capture24/annotation-label-dictionary.csv"
SAVEFOLDER = f"data/capture24"

if len(glob(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_15/*.npy")) == 4:
    X_nn15 = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_15/X.npy")
    Y_nn15 = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_15/Y.npy")
    T_nn15 = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_15/T.npy")
    pid_nn15 = np.load(f"{SAVEFOLDER}/prepared/downsampling_nn_lowpass_15/pid.npy")

else:
    X_nn15, Y_nn15, T_nn15, pid_nn15 = load_all_and_make_windows(
        datafiles=glob(DATAFILES), 
        annofile=ANNOFILE, 
        out_dir=SAVEFOLDER, 
        anno_label="Walmsley2020",
        sample_rate=SAMPLE_RATE,
        winsec=WINSEC,
        n_jobs=N_JOBS,
        downsampling_method="nn",
        lowpass_hz=15,
        resample_rate=RESAMPLE_RATE,
    )

In [None]:
# Approach 3 - Linear downsampling, no lp filter

DATAFILES = f"data/capture24/P[0-9][0-9][0-9].csv.gz"
ANNOFILE = f"data/capture24/annotation-label-dictionary.csv"
SAVEFOLDER = f"data/capture24"

if len(glob(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/*.npy")) == 4:
    X_linear = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/X.npy")
    Y_linear = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/Y.npy")
    T_linear = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/T.npy")
    pid_linear = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/pid.npy")

else:
    X_linear, Y_linear, T_linear, pid_linear = load_all_and_make_windows(
        datafiles=glob(DATAFILES), 
        annofile=ANNOFILE, 
        out_dir=SAVEFOLDER, 
        anno_label="Walmsley2020",
        sample_rate=SAMPLE_RATE,
        winsec=WINSEC,
        n_jobs=N_JOBS,
        downsampling_method="linear",
        lowpass_hz=None,
        resample_rate=RESAMPLE_RATE,
    )

In [None]:
models_path = "models/evaluation_models"

def reset_folder(path):
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

reset_folder(models_path)

In [None]:
classifier_nn = ActivityClassifier(
    labels = np.unique(Y_nn),
    batch_size=1000,
    device="cuda:0",
    verbose=True
)

Y_pred_nn = evaluate_preprocessing(classifier_nn, X_nn, Y_nn, pid_nn, T_nn, 
                     f"{models_path}/downsampling_nn_lowpass_None_{{}}.pt", True)

In [None]:
classifier_nn15 = ActivityClassifier(
    labels = np.unique(Y_nn),
    batch_size=1000,
    device="cuda:0",
    verbose=True
)

Y_pred_nn15 = evaluate_preprocessing(classifier_nn15, X_nn15, Y_nn15, pid_nn15, T_nn15, 
                       f"{models_path}/downsampling_nn_lowpass_15_{{}}.pt", True)

In [None]:
classifier_linear = ActivityClassifier(
    labels = np.unique(Y_nn),
    batch_size=1000,
    device="cuda:0",
    verbose=True
)

Y_pred_linear = evaluate_preprocessing(classifier_linear, X_linear, Y_linear, pid_linear, T_linear,
                         f"{models_path}/downsampling_linear_lowpass_None_{{}}.pt", True)

In [None]:
data = {
    'nearest neighbour no filter': {'y': Y_nn, 'y_pred': Y_pred_nn, 'pid': pid_nn},
    'nearest neighbour 15Hz lp filter': {'y': Y_nn15, 'y_pred': Y_pred_nn15, 'pid': pid_nn15},
    'linear downsampling no filter': {'y': Y_linear, 'y_pred': Y_pred_linear, 'pid': pid_linear}
}

# Define a function to calculate evaluation metrics for each participant
def calculate_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    kappa = cohen_kappa_score(y_true, y_pred)
    return accuracy, f1, kappa

# Create a DataFrame to store the results
results = []

# Calculate metrics for each model and participant
for model, model_data in data.items():
    for pid in np.unique(model_data['pid']):
        mask = model_data['pid'] == pid
        y_true = model_data['y'][mask]
        y_pred = model_data['y_pred'][mask]
        accuracy, f1, kappa = calculate_metrics(y_true, y_pred)
        results.append({'Participant': pid, 'Model': model, 
                        'Accuracy': accuracy, 'Macro F1': f1, 'Cohen Kappa': kappa})

results = pd.DataFrame(results)

# Aggregate results by participant
agg_results = results.groupby('Model').agg({'Accuracy': ['mean', 'std'],
                                                  'Macro F1': ['mean', 'std'],
                                                  'Cohen Kappa': ['mean', 'std']})

# Rename columns for clarity
agg_results.columns = ['Accuracy Mean', 'Accuracy Std', 'Macro F1 Mean', 'Macro F1 Std', 'Cohen Kappa Mean', 'Cohen Kappa Std']


def format_mean_std(mean, std):
    return f"{mean:.3f} \u00B1 {std:.3f}"

agg_results["Accuracy"] = agg_results.apply(lambda x: format_mean_std(x["Accuracy Mean"], 
                                                                      x["Accuracy Std"]), axis=1)

agg_results["Macro F1"] = agg_results.apply(lambda x: format_mean_std(x["Macro F1 Mean"], 
                                                                      x["Macro F1 Std"]), axis=1)

agg_results["Cohen Kappa"] = agg_results.apply(lambda x: format_mean_std(x["Cohen Kappa Mean"], 
                                                                      x["Cohen Kappa Std"]), axis=1)

agg_results[["Accuracy", "Macro F1", "Cohen Kappa"]]

In [None]:
# Plot confusion matrix for each model
for model, model_data in data.items():
    y_true = model_data['y']
    y_pred = model_data['y_pred']
    
    # Calculate confusion matrix
    cm = confusion_matrix(y_true, y_pred, normalize='true')  # Normalized confusion matrix
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 8))
    sns.heatmap(cm, annot=True, fmt='.3f', cbar=False)
    plt.title(f'Confusion Matrix - {model}')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    
    plt.xticks(ticks=np.arange(len(cm))+0.5, labels=np.unique(y_true))
    plt.yticks(ticks=np.arange(len(cm))+0.5, labels=np.unique(y_true))

    plt.show()

## Evaluate actinet against accelerometer

First we extract the features each of the capture 24 files using the accelerometer package

In [None]:
extract_accelerometer_features(n_jobs=N_JOBS)

Next we prepare the participant accelerometer data into the expected shape, containing the X,Y,T and P

In [None]:
# Accelerometer feature data prepared
if len(glob(f"{SAVEFOLDER}/prepared/accelerometer/*.npy")) == 4:
    X_bbaa = np.load(f"{SAVEFOLDER}/prepared/accelerometer/X.npy")
    Y_bbaa = np.load(f"{SAVEFOLDER}/prepared/accelerometer/Y.npy")
    T_bbaa = np.load(f"{SAVEFOLDER}/prepared/accelerometer/T.npy")
    P_bbaa = np.load(f"{SAVEFOLDER}/prepared/accelerometer/pid.npy")

else:
    X_bbaa, Y_bbaa, T_bbaa, P_bbaa = prepare_accelerometer_data(ANNOFILE, SAVEFOLDER, N_JOBS)

In [None]:
# Actinet data prepared
if len(glob(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/*.npy")) == 4:
    X_actinet = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/X.npy")
    Y_actinet = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/Y.npy")
    T_actinet = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/T.npy")
    P_actinet = np.load(f"{SAVEFOLDER}/prepared/downsampling_linear_lowpass_None/pid.npy")

else:
    X_actinet, Y_actinet, T_actinet, P_actinet = load_all_and_make_windows(
        datafiles=glob(DATAFILES), 
        annofile=ANNOFILE, 
        out_dir=SAVEFOLDER, 
        anno_label="Walmsley2020",
        sample_rate=SAMPLE_RATE,
        winsec=WINSEC,
        n_jobs=N_JOBS,
        downsampling_method="linear",
        lowpass_hz=None,
        resample_rate=RESAMPLE_RATE,
    )

Evaluate model using 5 fold stratified group cross validation

In [None]:
bbaa_classifier = BalancedRandomForestClassifier(
    n_estimators=1000,
    oob_score=True,
    sampling_strategy="not minority",
    replacement=True,
    n_jobs=N_JOBS,
    random_state=42,
    verbose=1
)

actinet_classifier = ActivityClassifier(
    labels = np.unique(Y_actinet),
    batch_size=1000,
    device="cuda:0",
    verbose=True
)

In [None]:
res = evaluate_models(
    actinet_classifier,
    bbaa_classifier,
    X_actinet,
    X_bbaa,
    Y_actinet,
    Y_bbaa,
    P_actinet,
    P_bbaa,
    T_actinet,
    T_bbaa,
    weights_path="models/evaluation_models/actinet_vs_bbaa_{}.pt",
    out_dir="outputs/actinet_vs_bbaa",
    verbose=True,
)

In [None]:
results_bbaa = pd.read_pickle("outputs/actinet_vs_bbaa/rf_results.pkl")
results_actinet = pd.read_pickle("outputs/actinet_vs_bbaa/actinet_results.pkl")

In [None]:
data = {
    'accelerometer': {'y': np.hstack(results_bbaa["Y_true"]), 
                      'y_pred': np.hstack(results_bbaa["Y_pred"]), 
                      'pid': np.hstack(results_bbaa["group"])
                      },
    'actinet': {'y': np.hstack(results_actinet["Y_true"]), 
                'y_pred': np.hstack(results_actinet["Y_pred"]), 
                'pid': np.hstack(results_actinet["group"])
                }
}

In [None]:
def calculate_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    kappa = cohen_kappa_score(y_true, y_pred)
    return accuracy, f1, kappa

results = []

for model, model_data in tqdm(data.items()):
    for pid in np.unique(model_data['pid']):
        mask = model_data['pid'] == pid
        y_true = model_data['y'][mask]
        y_pred = model_data['y_pred'][mask]
        accuracy, f1, kappa = calculate_metrics(y_true, y_pred)
        results.append({'Participant': pid, 'Model': model, 
                        'Accuracy': accuracy, 'Macro F1': f1, 'Cohen Kappa': kappa})

results = pd.DataFrame(results)

In [None]:
# Group by model and calculate median, Q1, and Q3
summary = results.groupby('Model')[['Accuracy', 
                                    'Macro F1', 
                                    'Cohen Kappa']].agg(lambda x: f"{np.median(x):.3f} " + 
                                                                  f"({np.quantile(x, .25):.3f}, " + 
                                                                  f"{np.quantile(x, .75):.3f})")

summary

In [None]:
def plot_confusion_matrix(y_true, y_pred, model):   
    # Calculate confusion matrix
    cm = confusion_matrix(y_true, y_pred, normalize='true')  # Normalized confusion matrix
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 8))
    sns.heatmap(cm, annot=True, fmt='.3f', cbar=False, annot_kws={"size": 16})
    plt.title(f'Confusion Matrix - {model}', fontsize=20)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    
    plt.xticks(ticks=np.arange(len(cm))+0.5, labels=np.unique(y_true), fontsize=14)
    plt.yticks(ticks=np.arange(len(cm))+0.5, labels=np.unique(y_true), fontsize=14)

    plt.show()

In [None]:
plot_confusion_matrix(data['accelerometer']['y'], data['accelerometer']['y_pred'], 'accelerometer')

In [None]:
plot_confusion_matrix(data['actinet']['y'], data['actinet']['y_pred'], 'actinet')

In [None]:
def plot_model_performance(metric='Macro F1', modulus=0):
    # Create a boxplot overlay without outliers
    sns.boxplot(x="Model", y=metric, hue="Model", data=results, width=0.3, showfliers=False)

    # Create a stripplot for the jittered points
    sns.stripplot(x="Model", y=metric, data=results, 
                  jitter=True, color='black', alpha=0.3)

    # Draw lines between points for each participant
    i = 0
    for pid in results['Participant'].unique():
        if modulus and i%modulus == 0:
            pid_df = results[results['Participant'] == pid]
            plt.plot(pid_df['Model'], pid_df[metric], marker='', 
                     linestyle='-', color='grey', alpha=0.3)
        i += 1

    plt.title(f"Comparison of {metric} between\naccelerometer and actinet models", fontsize=16)

In [None]:
plot_model_performance('Accuracy')

In [None]:
plot_model_performance('Macro F1', 6)

In [None]:
plot_model_performance('Cohen Kappa', 0)