# Setup

Set the notebook parameters here, at the top of the notebook, so that they are easy to find and change.

In [None]:
# Length of each window, in seconds.
window_size = 5

# Accelerometer whose data is analysed: 'acg', 'axivity' or 'sens'.
accelerometer = "sens"

# Name of the MOTUS label subset to analyse; it also appears in the filenames
# of the output data.
include = "all"

# Forwarded unchanged to utils.make_windows(drop_impure=...).
drop_impure = True

# MOTUS labels kept for each named subset. NotRecording must never be kept.
# An unrecognised `include` value falls back to the sit/stand subset.
_label_presets = {
    "sit_stand": ["Sit", "Stand"],
    "all": ["Lie", "Sit", "Stand", "Walk", "Stairs", "Run", "Other"],
}
values_to_keep = _label_presets.get(include, ["Sit", "Stand"])

# Every known label that is not kept gets dropped from the data.
all_values = ["Unknown", "Other", "Lie", "Sit", "Stand", "Walk", "Stairs", "Run"]
values_to_drop = [label for label in all_values if label not in values_to_keep]

DATA_DIR = "./data"  # folder holding the input recordings
JSON_DIR = "../machine_learning/"  # folder holding motus_class_map.json

# Input scaling applied before training: "std_dev" or "min_max".
dl_normalisation_method = "min_max"

Import all the libraries

In [None]:
import os
import pickle
from glob import glob

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from tqdm.auto import tqdm

import utils
from eval_metrics import eval_classification


%reload_ext autoreload
%autoreload 2


pd.options.display.max_rows = 999
pd.options.display.max_colwidth = None

# For reproducibility
np.random.seed(42)


In [None]:
# Validate the accelerometer choice first, then pick the participant numbers
# whose files are skipped while loading.
if accelerometer == "axivity":
    raise Exception("Axivity data not tested yet")
if accelerometer not in ("acg", "sens"):
    raise Exception("Invalid accelerometer type")

ACC_MISSING = [23, 24, 25, 34, 36] if accelerometer == "acg" else []

# File names (zero-padded participant number) compared against each data
# file's basename by load_all_and_make_windows.
acc_missing = [f"P{number:02d}.csv.gz" for number in ACC_MISSING]


def load_all_and_make_windows(datafiles, sample_rate=30):
    """Load each data file, remap its annotations and cut it into windows.

    Uses the notebook-level settings ``accelerometer``, ``JSON_DIR``,
    ``values_to_drop``, ``window_size``, ``drop_impure`` and ``acc_missing``.

    Parameters
    ----------
    datafiles : iterable of str
        Paths of the files to process. A file whose basename appears in
        ``acc_missing`` is skipped.
    sample_rate : int, optional
        Forwarded to ``utils.make_windows`` as ``sample_rate``. Defaults to
        30, the value previously hard-coded here (presumably Hz, matching the
        ``30hz`` in the data file names -- confirm).

    Returns
    -------
    X, Y, T, pid : numpy.ndarray
        ``X``, ``Y`` and ``T`` are the per-file outputs of
        ``utils.make_windows`` concatenated over all processed files. ``pid``
        holds, once per window, the part of the file's basename that precedes
        the first ``"."``.

    Raises
    ------
    ValueError
        If ``datafiles`` is empty, or if every file in it was skipped.
    """
    datafiles = list(datafiles)
    if not datafiles:
        # Typically an unmatched glob pattern; fail here with a readable
        # message rather than inside np.concatenate.
        raise ValueError(
            "No data files to load: check the data directory and file pattern."
        )

    def worker(datafile):
        print("\nProcessing", datafile)
        data = utils.load_data(datafile, acc_prefix=accelerometer)
        data = utils.map_to_new_classes(
            data, "annotation", JSON_DIR + "motus_class_map.json", verbose=True
        )
        # Discard rows whose (remapped) annotation is not part of the analysis.
        data = data[~data["annotation"].isin(values_to_drop)]
        X, Y, T = utils.make_windows(
            data,
            winsec=window_size,
            sample_rate=sample_rate,
            dropna=False,
            drop_impure=drop_impure,
            verbose=True,
            frame_info=True,
        )
        # NOTE(review): for names like cleaned_data_*_30hz_v1.1.csv this is
        # the text before the first '.', not the parent 'P*' folder name --
        # confirm it identifies the participant as intended.
        pid = os.path.basename(datafile).split(".")[0]
        pid = np.asarray([pid] * len(X))
        return X, Y, T, pid

    results = []
    for datafile in tqdm(datafiles):
        if os.path.basename(datafile) in acc_missing:
            print("\nSkipping", datafile)
            continue
        results.append(worker(datafile))

    if not results:
        raise ValueError(
            "No data files left to load: every file was skipped (see acc_missing)."
        )

    # Transpose [(X, Y, T, pid), ...] into four groups and join each group.
    X, Y, T, pid = (np.concatenate(parts) for parts in zip(*results))

    return X, Y, T, pid

In [None]:
# Activity classes; a label's position in this array is its integer code.
original_labels = np.array(["Lie", "Sit", "Stand", "Walk", "Stairs", "Run"])

# Plain-Python lookup from label to integer code.
label_mapping = dict(zip(original_labels.tolist(), range(len(original_labels))))

# classes_ is assigned directly (no fit call) so the codes follow the order
# above.
# NOTE(review): relies on LabelEncoder.transform accepting a classes_ array
# that is not sorted -- confirm for the installed scikit-learn version.
label_encoder = LabelEncoder()
label_encoder.classes_ = original_labels

# Encoding the class list itself displays the resulting codes as cell output.
label_encoder.transform(original_labels)


## Load data, map to motus, make windows and output to files

In [None]:
# Stop early, with an actionable message, when the data folder is missing.
missing_data_dir_message = (
    "Data directory does not exist. Please create it and download the data."
)
if not os.path.exists(DATA_DIR):
    raise Exception(missing_data_dir_message)

In [None]:
# Match the recordings under DATA_DIR/P*/ and window them in sorted path order.
datafiles = os.path.join(DATA_DIR, "P*/cleaned_data_*_30hz_v1.1.csv")
train_paths = sorted(glob(datafiles))
X_train, y_train, T, pid = load_all_and_make_windows(train_paths)

Let's count the number of windows (each `window_size` seconds long) for each activity class.

In [None]:
# Number of training windows per activity label, most frequent first.
print("\nLabel distribution (# windows)")
label_counts = pd.Series(y_train).value_counts()
print(label_counts)

In [None]:
# Keep only windows whose label is neither the string "nan" nor "Other".
indices_to_keep = (y_train != "nan") & (y_train != "Other")
X_train, y_train = X_train[indices_to_keep], y_train[indices_to_keep]

# Flatten each window's samples and its 3 axes into one sequence with a single
# feature per step: (windows, samples, 3) -> (windows, samples * 3, 1).
n_train_windows, n_train_samples = X_train.shape[:2]
X_train = X_train.reshape(n_train_windows, n_train_samples * 3, 1)

Load unseen data for Leave One Subject Out (LOSO) evaluation

In [None]:
# Recordings of the held-out subject(s) live in their own folder.
LOSO_DATA_DIR = "./data/LOSO"

datafiles_evl = os.path.join(LOSO_DATA_DIR, "P*/cleaned_data_*_30hz_v1.1.csv")
loso_paths = sorted(glob(datafiles_evl))
X_test, y_test, T_evl, pid_evl = load_all_and_make_windows(loso_paths)

# Keep only windows whose label is neither the string "nan" nor "Other".
indices_to_keep_evl = (y_test != "nan") & (y_test != "Other")
X_test, y_test = X_test[indices_to_keep_evl], y_test[indices_to_keep_evl]

# Same layout as the training data: (windows, samples * 3, 1).
n_test_windows, n_test_samples = X_test.shape[:2]
X_test = X_test.reshape(n_test_windows, n_test_samples * 3, 1)

# Visualisation
Let's visualise some examples for each activity label.

In [None]:
# Number of example windows drawn per activity label.
NPLOTS = 8

# X_train was flattened to (windows, samples * 3, 1) when it was loaded;
# restore the (windows, samples, 3) layout so each axis can be drawn separately.
X_windows = X_train.reshape(X_train.shape[0], -1, 3)

unqY = np.unique(y_train)
# squeeze=False keeps `axs` two-dimensional even when only one label is
# present, so iterating over it always yields one row of axes per label.
fig, axs = plt.subplots(
    len(unqY),
    NPLOTS,
    sharex=True,
    sharey=True,
    figsize=(NPLOTS * 1.5, len(unqY) + 1),
    squeeze=False,
)
for label, row in zip(unqY, axs):
    # Random windows of this label (sampled with replacement).
    idxs = np.random.choice(np.where(y_train == label)[0], size=NPLOTS)
    row[0].set_ylabel(label)
    for window, ax in zip(X_windows[idxs], row):
        ax.plot(window[:, 0], color="red")
        ax.plot(window[:, 1], color="green")
        ax.plot(window[:, 2], color="blue")
        ax.set_ylim(-5, 5)
fig.tight_layout()

# Deep Learning Model Development

In [None]:
# Convert the string activity labels into integer codes with the encoder
# built earlier (only transform is called here; nothing is fitted).
# NOTE(review): this overwrites y_train / y_test in place, so running the cell
# a second time passes integer codes back into transform -- presumably that
# raises; restart and run all if that happens.
y_train = label_encoder.transform(y_train)
y_test = label_encoder.transform(y_test)

In [None]:
# Scale the inputs with statistics computed on the training data only, apply
# the same statistics to the test data, and store them for later reuse.
if dl_normalisation_method == "std_dev":
    # One global mean / standard deviation over every training value.
    mean_value = np.mean(X_train)
    std_value = np.std(X_train)

    X_train = (X_train - mean_value) / std_value
    X_test = (X_test - mean_value) / std_value

    params_file = 'normalization_params_std_dev.pkl'
    normalisation_params = {'mean': mean_value, 'std': std_value}

elif dl_normalisation_method == "min_max":
    # Minimum / maximum over the windows axis, i.e. one value per position.
    min_vals = np.min(X_train, axis=0)
    max_vals = np.max(X_train, axis=0)
    value_range = max_vals - min_vals

    # Perform Min-Max scaling
    X_train = (X_train - min_vals) / value_range
    X_test = (X_test - min_vals) / value_range

    params_file = 'normalization_params_min_max.pkl'
    normalisation_params = {'min': min_vals, 'max': max_vals}

else:
    # Without this branch a typo in the setting silently skipped scaling.
    raise ValueError(
        f"Invalid dl_normalisation_method {dl_normalisation_method!r}; "
        "expected 'std_dev' or 'min_max'"
    )

if not os.path.exists(params_file):
    # Save normalization parameters
    joblib.dump(normalisation_params, params_file)
else:
    # An existing file is deliberately left untouched, so say so: the stored
    # values may come from an earlier run with different data or settings.
    print(
        f"Keeping existing {params_file}; delete it to store the parameters "
        "computed in this run."
    )


In [None]:
# Number of test windows per class; y_test holds integer codes at this point.
print(pd.Series(y_test).value_counts())

In [None]:
# Learning-rate settings, handed to the model builder via config["lr_params"].
lr_params = dict(learning_rate=0.001, decay_steps=1000, decay_rate=0.95)


In [None]:
# Settings consumed by CreateCNNLSTM.
config = {
    # (sequence length, features per step) of one input window.
    "input_shape": (X_train.shape[1], X_train.shape[2]),
    # One output per class known to the encoder, not per class that happens to
    # occur in y_train: the integer codes index into label_encoder.classes_,
    # so a missing class (e.g. include="sit_stand" yields codes 1 and 2 only)
    # would otherwise leave the largest code outside the output range.
    "num_classes": len(label_encoder.classes_),
    "lr_params": lr_params,
    "loss_func": "sparse_categorical_crossentropy",
}

# Layer stack handed to CreateCNNLSTM.create_model: two Conv1D layers, two
# LSTM layers and two regularised dense layers, listed in order.
model_struc = {
    "conv1d_layers": {
        "parameters": [
            {"filters": 64, "kernel_size": 7},
            {"filters": 128, "kernel_size": 7},
        ],
    },
    "lstm_layers": {
        "parameters": [
            {"units": 128},
            {"units": 64},
        ],
    },
    "dense_layers": {
        "parameters": [
            {"units": 128, "reg_rate": 0.01, "dropout_rate": 0.5},
            {"units": 64, "reg_rate": 0.01, "dropout_rate": 0.5},
        ],
    },
}

In [None]:
from cnn_lstm_model_creator import CreateCNNLSTM

# Build the network described by `model_struc` using the settings in `config`,
# compile it, and print a layer-by-layer summary.
model_creator = CreateCNNLSTM(config=config)
model = model_creator.create_model(model_struc=model_struc)
# compile_model returns the model, which replaces the uncompiled one here.
model = model_creator.compile_model(model=model)
model.summary()


In [None]:
# Calculate class weights based on the label set
class_counts = np.bincount(y_train)
total_samples = np.sum(class_counts)
class_weights = total_samples / (len(class_counts) * class_counts)

# Set the class weights for specific classes to 1
classes_to_ignore = []  # Replace with the classes you want to ignore
class_weights[classes_to_ignore] = 1

# Create a dictionary to store the class weights
class_weights_dict = dict(enumerate(class_weights))


In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Define the early stopping callback: watch the validation loss, allow 10
# epochs without improvement, and restore the weights of the best epoch.
early_stopping = EarlyStopping(
    monitor="val_loss", patience=10, restore_best_weights=True
)

In [None]:
# Train with class weighting. `epochs` is only an upper bound: the
# early-stopping callback ends training based on the validation loss.
# NOTE(review): the validation data is the test set (X_test, y_test), which is
# also what the model is evaluated on afterwards. Early stopping therefore
# selects weights using the test data, so the reported test scores are
# optimistic -- consider a separate validation split.
model.fit(
    X_train,
    y_train,
    batch_size=64,
    epochs=1000,
    validation_data=(X_test, y_test),
    class_weight=class_weights_dict,
    callbacks=[early_stopping],
)


In [None]:
# Step 6: Evaluation
# Score the trained model on the test windows.
# NOTE(review): unpacking into two names assumes the model was compiled with
# exactly one metric besides the loss -- confirm in cnn_lstm_model_creator.
loss, accuracy = model.evaluate(X_test, y_test)

In [None]:
# Make predictions on the test set
y_pred = model.predict(X_test)
# Turn per-class scores into class codes (highest score wins); output that is
# already one-dimensional is used as is.
y_pred_encoded = np.argmax(y_pred, axis=1) if y_pred.ndim > 1 else y_pred
# Map the integer codes back to their activity label strings.
y_pred_decoded = label_encoder.inverse_transform(y_pred_encoded)

# NOTE(review): this passes the raw model output `y_pred` (not the class
# codes) and three arguments, while the blind-test section calls the same
# helper with `original_labels` as a fourth argument -- check the signature of
# utils.plot_confusion_matrix and make the two calls consistent.
utils.plot_confusion_matrix(y_test, y_pred, label_encoder)

In [None]:
# y_test holds integer class codes (it was encoded before training), so it is
# scored against the integer predictions. y_pred_decoded holds label strings
# and could never match it. eval_classification is already imported in the
# imports cell at the top of the notebook.
metrics_df = eval_classification(y_test, y_pred_encoded)
metrics_df

In [None]:
from tensorflow.keras.models import load_model, save_model

# Write the trained model to disk, then read it straight back so the saved
# file can be checked.
model_path = "lstm_cnn_model.h5"
save_model(model, model_path)
loaded_model = load_model(model_path)

In [None]:
# Evaluate the loaded model on the same test windows as the in-memory model,
# to check that the save/load round trip gives matching scores.
loss, accuracy = loaded_model.evaluate(X_test, y_test)

# Blind Test on Trained Model

## Load blind test data

In [None]:
# Paths (or glob patterns) of the blind-test recordings. The two entries below
# look like placeholders -- replace them with the real files.
datafiles_blind = ["./file1.csv", "./file2.csv"]

# glob() takes one pattern string, not a list, so expand the entries one by
# one; entries that match no existing file contribute nothing.
blind_paths = sorted(
    path for pattern in datafiles_blind for path in glob(pattern)
)
if not blind_paths:
    raise FileNotFoundError(
        f"No blind-test files found for {datafiles_blind}"
    )
X_blind, y_blind, T_blind, pid_blind = load_all_and_make_windows(blind_paths)

In [None]:
# Keep only windows whose label is neither the string "nan" nor "Other".
indices_to_keep_blind = (y_blind != "nan") & (y_blind != "Other")
X_blind, y_blind = X_blind[indices_to_keep_blind], y_blind[indices_to_keep_blind]

# Same layout as the training data: (windows, samples * 3, 1).
n_blind_windows, n_blind_samples = X_blind.shape[:2]
X_blind = X_blind.reshape(n_blind_windows, n_blind_samples * 3, 1)

In [None]:
# Apply the normalisation parameters stored during training to the blind data.
# joblib.load unpickles the file, so only load parameter files you created
# yourself or otherwise trust.
if dl_normalisation_method == "std_dev":
    params = joblib.load('normalization_params_std_dev.pkl')
    mean_value = params['mean']
    std_value = params['std']

    X_blind = (X_blind - mean_value) / std_value

elif dl_normalisation_method == "min_max":
    params = joblib.load('normalization_params_min_max.pkl')
    min_vals = params['min']
    max_vals = params['max']

    X_blind = (X_blind - min_vals) / (max_vals - min_vals)

else:
    # Without this branch a typo in the setting silently left the blind data
    # unscaled, while the model expects scaled inputs.
    raise ValueError(
        f"Invalid dl_normalisation_method {dl_normalisation_method!r}; "
        "expected 'std_dev' or 'min_max'"
    )

In [None]:
# Convert the blind-test label strings to the integer codes used in training.
# NOTE(review): overwrites y_blind in place, so this cell is not safe to run
# twice without reloading the blind data.
y_blind = label_encoder.transform(y_blind)

## Load a trained model

In [None]:
# Path of the saved model to evaluate. Despite the variable name, this is the
# .h5 file itself, not a directory.
# NOTE(review): the file name suggests min-max scaling ("mm"), pure windows
# and 5 s windows -- presumably the notebook settings must match; confirm.
trained_model_dir = "./trained_models/lstm_cnn_model_3_mm_pure_5s.h5"

In [None]:
# Load the saved model; this rebinds `model`, replacing any model trained
# earlier in this notebook session.
model = tf.keras.models.load_model(trained_model_dir)

## Evaluation Performance

In [None]:
# Make predictions on the blind-test set (raw model output, one row per window)
y_pred = model.predict(X_blind)

# NOTE(review): this call passes `original_labels` as a fourth argument, while
# the call in the model-development section passes only three -- check the
# signature of utils.plot_confusion_matrix and make the two calls consistent.
utils.plot_confusion_matrix(y_blind, y_pred, label_encoder, original_labels)

In [None]:
# Pick, for each window, the class code with the highest model output.
y_pred_encoded = np.argmax(y_pred, axis=1)

In [None]:
# Classification metrics on the blind-test set; both arguments are integer
# class codes here.
metrics_df = eval_classification(y_blind, y_pred_encoded)
metrics_df