In [126]:
import h5py
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

# Locally get vars to accommodate different environments
try:
    from env_vars import *
except:
    print(
        'No env_vars file found. Using default values. Make a "env_vars.py" file to change them.'
    )
    intra_train_path = "./data/Cross/train/"
    intra_test_path = "./data/Cross/test1/"

No env_vars file found. Using default values. Make a "env_vars.py" file to change them.


In [127]:
# from google.colab import drive

# drive.mount("/content/gdrive")

In [128]:
# Used to get the name of the dataset, and by extension, the label of the action being performed
def getdatasetname(file_name_with_dir):
    filename_without_dir = file_name_with_dir.split("/")[-1]

    temp = filename_without_dir.split("_")[:-1]

    datasetname = "_".join(temp)

    return datasetname

In [129]:
# Used to get the labels of data using filenames
def get_label(filename):
    if "rest" in filename:
        label = 0

    elif "math" in filename:
        label = 1

    elif "memory" in filename:
        label = 2
    elif "motor" in filename:
        label = 3

    return label

In [130]:
def get_all_matrices(dir_path):
    dataset = []
    labels = []
    for filename in os.listdir(dir_path):
        if filename.endswith(".h5"):
            filename_path = dir_path + filename
            with h5py.File(filename_path, "r") as f:
                dataset_name = getdatasetname(filename_path)
                label = get_label(filename)
                matrix = f.get(dataset_name)[()]
                dataset.append(matrix)
                labels.append(label)

    return dataset, labels

MinMax Scaling for sensor data


In [131]:
def scale(matrix):
    scaler = MinMaxScaler(feature_range=(0, 1))

    scaler.fit(matrix)

    scaled_data = scaler.transform(matrix)

    return scaled_data

Downsampling of data, refer to file called "Notebook.ipynb" to see how it works


In [132]:
def downsample(dataset, frequency):
    downsampled_dataset = []

    for i in range(0, dataset.shape[1], 2034):
        second = dataset[:, i : i + 2034]
        subsample = []

        for j in range(0, 2034, int(2034 / frequency)):
            if j < second.shape[1]:
                measurement = second[:, j]
                subsample.append(measurement)

        downsampled_dataset.extend(subsample)

    return np.array(downsampled_dataset).T

Model setup


In [133]:
# TensorFlow and tf.keras
import tensorflow as tf

print(tf.__version__)

2.15.0


This is where the archtiecture is specified. For now it is just a Recurrent Neural Network (RNN) with two layers with 128 parameters each i.e., not very sophisticated.


In [134]:
model = tf.keras.Sequential(
    [
        tf.keras.layers.Dense(
            128, activation="relu"
        ),  # The layer has 128 dense neurons using relu for activation
        tf.keras.layers.Dense(
            128, activation="relu"
        ),  # The layer has 128 dense neurons using relu for activation
        tf.keras.layers.Dense(
            4
        ),  # The final layer has 4 neurons, each representing a class for the action being done
    ]
)

In [135]:
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)

In [136]:
def load_data(dir_path):
    data = []
    labels = []

    for filename in os.listdir(dir_path):
        if filename.endswith(".h5"):
            filename_path = os.path.join(dir_path, filename)
            with h5py.File(filename_path, "r") as f:
                dataset_name = getdatasetname(filename_path)
                label = get_label(filename)
                matrix = f.get(dataset_name)[()]

                train_meg = downsample(matrix, 113)
                train_meg = scale(train_meg)
                # flattened_meg = np.array(
                #     train_meg.flatten()
                # )  # The data is flattened to change the meg data from shape 248 x frequency to a 1D array (result: a lot of inupt params)

                data.append(train_meg)
                labels.append(label)

    return np.array(data), np.array(labels)

All training data is loaded into memory and fitted to the model along with the training lables


In [137]:
X_train, y_train = load_data(intra_train_path)

In [138]:
import numpy as np


def select_every_eighth(arr1_3d_list, arr2_1d):
    # Convert arr1_3d_list to a list of NumPy arrays
    arr1_3d_np_list = [np.array(arr) for arr in arr1_3d_list]

    # Select every 8th value from the first 3D array in the list
    selected_values_arr1_list = [arr[:, 7::8, :] for arr in arr1_3d_np_list]

    # Remove every 8th value from the first 3D array in the list
    modified_arr1_3d_list = [
        np.delete(arr, range(7, arr.shape[1], 8), axis=1) for arr in arr1_3d_np_list
    ]

    # Convert arr2_1d to a NumPy array
    arr2_1d_np = np.array(arr2_1d)

    # Select every 8th value from the second 1D array
    selected_values_arr2 = arr2_1d_np[7::8]

    # Remove every 8th value from the second 1D array
    modified_arr2_1d = np.delete(arr2_1d_np, range(7, len(arr2_1d_np), 8))

    return (
        modified_arr1_3d_list,
        modified_arr2_1d,
        selected_values_arr1_list,
        selected_values_arr2,
    )

In [139]:
X_train, y_train, X_val, y_val = select_every_eighth(X_train, y_train)

print(X_train.shape)



print(y_train.shape)

IndexError: too many indices for array: array is 2-dimensional, but 3 were indexed

In [None]:
model = tf.keras.Sequential(
    [
        tf.keras.layers.Conv1D(
            32,
            kernel_size=3,
            activation="relu",
            input_shape=(X_train.shape[1], X_train.shape[2]),
        ),
        tf.keras.layers.MaxPooling1D(pool_size=2),
        # tf.keras.layers.BatchNormalization(),
        # tf.keras.layers.DepthwiseConv2D(kernel_size=(1, 1), depth_multiplier=3),
        # tf.keras.layers.SeparableConv2D(3, (1, 16), use_bias=False, padding="same"),
        # tf.keras.layers.Dense(128),
        tf.keras.layers.LSTM(
            64, return_sequences=True
        ),  # Adjust the number of LSTM units
        tf.keras.layers.Flatten(),  # Flatten the output of LSTM to connect with Dense layer
        tf.keras.layers.Dense(4),
        tf.keras.layers.Activation("softmax"),
    ]
)

In [None]:
model.compile(
    optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

In [None]:
model.fit(X_train, y_train, epochs=10, batch_size=32)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x2478f577890>

In [None]:
X_test, y_test = load_data(intra_test_path)

In [None]:
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)

print("\nTest accuracy:", test_acc)

1/1 - 1s - loss: 1.1299 - accuracy: 0.5000 - 740ms/epoch - 740ms/step

Test accuracy: 0.5


In [None]:
from kerastuner.tuners import RandomSearch
from kerastuner.engine.hyperparameters import HyperParameters

# Define the hyperparameter search space
hyperparameters = HyperParameters()
hyperparameters.Int("units", min_value=32, max_value=256, step=32)
hyperparameters.Float("dropout", min_value=0.2, max_value=0.5, step=0.1)


# Define the model-building function
def build_model(hp):
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Conv1D(
                32,
                kernel_size=3,
                activation="relu",
                input_shape=(X_train.shape[1], X_train.shape[2]),
            ),
            tf.keras.layers.MaxPooling1D(pool_size=2),
            tf.keras.layers.LSTM(
                hp.Int("units", min_value=16, max_value=256, step=8),
                return_sequences=True,
            ),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(
                hp.Float("dropout", min_value=0.2, max_value=0.5, step=0.1)
            ),
            tf.keras.layers.Dense(4, activation="softmax"),
        ]
    )
    model.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    return model


# Instantiate the tuner
tuner = RandomSearch(
    build_model,
    objective="val_accuracy",
    max_trials=10,
    executions_per_trial=2,
    directory="my_tuner_directory",
    project_name="my_tuning_project",
)

# Perform the hyperparameter search
tuner.search(X_train, y_train, epochs=10, validation_data=(X_val, y_val))

# Get the best hyperparameters
best_hps = tuner.oracle.get_best_trials(1)[0].hyperparameters
print(f"Best Hyperparameters: {best_hps}")

# Build the model with the best hyperparameters and train on the full dataset
best_model = tuner.hypermodel.build(best_hps)
best_model.fit(X_train, y_train, epochs=100, validation_data=(X_val, y_val))

  from kerastuner.tuners import RandomSearch


NameError: name 'X_val' is not defined

In [None]:
def load_data_LTSM(dir_path):
    data = []
    labels = []

    for filename in os.listdir(dir_path):
        if filename.endswith(".h5"):
            filename_path = os.path.join(dir_path, filename)
            with h5py.File(filename_path, "r") as f:
                dataset_name = getdatasetname(filename_path)
                label = get_label(filename)
                matrix = f.get(dataset_name)[()]

                train_meg = downsample(matrix, 113)
                train_meg = scale(train_meg)
                data.append(train_meg)
                labels.append(label)

    return np.array(data), np.array(labels)

In [None]:
trainx, trainY = load_data_LTSM(intra_train_path)

trainy = tf.keras.utils.to_categorical(trainY)

testx, testY = load_data_LTSM(intra_test_path)

testy = tf.keras.utils.to_categorical(testY)

print(testx.shape)
print(testy)

(16, 248, 1980)
[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]]


In [None]:
from sklearn.metrics import confusion_matrix

model = tf.keras.models.Sequential()
##### Add convolutional layers
# Convolutional layers
model.add(
    tf.keras.layers.Conv2D(
        113, (3, 3), activation="relu", padding="same", input_shape=(3959, 20, 21)
    )
)
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.DepthwiseConv2D(kernel_size=(1, 1), depth_multiplier=3))
model.add(tf.keras.layers.BatchNormalization())

model.add(tf.keras.layers.Activation("relu"))
model.add(tf.keras.layers.MaxPooling2D((2, 2)))

# model.add(tf.keras.layers.Conv2D(64, (3, 3), activation="relu", padding="same"))
# model.add(tf.keras.layers.MaxPooling2D((2, 2)))

model.add(tf.keras.layers.SeparableConv2D(3, (1, 16), use_bias=False, padding="same"))
model.add(tf.keras.layers.BatchNormalization())

# Reshape data to be compatible with LSTM layer
model.add(
    tf.keras.layers.Reshape(
        (model.output_shape[1], model.output_shape[2] * model.output_shape[3])
    )
)
#####

model.add(tf.keras.layers.LSTM(248, input_shape=(trainx.shape[1], trainx.shape[2])))

model.add(tf.keras.layers.Dense(248, activation="relu"))
model.add(tf.keras.layers.Activation("softmax"))
model.compile(
    loss="categorical_crossentropy",
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.0001),
    metrics=["accuracy"],
)
verbose, epochs, batch_size = 0, 30, 64

model.fit(trainx, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)

predictions = model.predict(testx)
predicted_labels = np.argmax(predictions, axis=1)
true_labels = np.argmax(testy, axis=1)

cm = confusion_matrix(true_labels, predicted_labels)
print(cm)
_, accuracy = model.evaluate(testx, testy, batch_size=batch_size, verbose=0)
print(accuracy)

ValueError: in user code:

    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\engine\training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\engine\training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\engine\training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\engine\training.py", line 1150, in train_step
        y_pred = self(x, training=True)
    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\Janus\anaconda3\envs\tf\Lib\site-packages\keras\src\engine\input_spec.py", line 298, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "sequential_7" is incompatible with the layer: expected shape=(None, 3959, 20, 21), found shape=(64, 248, 1980)
