In [1]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv1D, Bidirectional, LSTM, Dropout,
                                     BatchNormalization, MultiHeadAttention,
                                     GlobalAveragePooling1D, Dense, Add)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import tensorflow as tf
from sklearn.decomposition import PCA

In [2]:

# This code for the BiLSTM model with additional outcome metrics

from datetime import datetime 
import os
import numpy as np
from sklearn.preprocessing import OneHotEncoder

# Define the path to the dataset
dataset_path = 'D:\Feature Engineering\Room2_npy'

# Define the target length for data trimming/padding
target_length = 750

# Define parameters for model
Model_type = "Bidirectional Long Short-Term Memory"
batchsize = 32
monitor_choice = 'val_loss'
dropout = 0.3
learningrate = 0.001

#Initialise lists
data_list = []
data_list1 = []
labels_list = []

classes_to_double = ["kneel", "liedown", "pickup"]

# Helper function to split sequences

def split_sequence(sequence, step):
    return [sequence[i:i + step] for i in range(0, len(sequence), step) if len(sequence[i:i + step]) == step]

# Traverse through each activity directory
for activity_folder in os.listdir(dataset_path):
    activity_path = os.path.join(dataset_path, activity_folder)
##    print(f"Processing activity: {activity_folder}")

    if os.path.isdir(activity_path):
        for participant_file in os.listdir(activity_path):
            participant_path = os.path.join(activity_path, participant_file)
            if os.path.isfile(participant_path) and participant_file.endswith('.npy'):
##                print(f"Loading data from file: {participant_file}")
                data = np.load(participant_path)
                data = np.real(data)

                # Split data into smaller sequences
                data_splits = split_sequence(data, target_length)

                for split_data in data_splits:
                    # Normalize data
#                     min_val = np.min(split_data)
#                     max_val = np.max(split_data)
                   ##  Z-Score normalization
                    mean = np.mean(split_data, axis=0)
                    std = np.std(split_data, axis=0)
                    data_normalized = (split_data - mean) / (std + 1e-8) 

                    label = activity_folder
#                    data_list1.append(data_normalized[:, 0:30])          
#                    labels_list.append(label)
#                    data_list1.append(data_normalized[:, 30:60])          
#                    labels_list.append(label)
                    data_list.append(data_normalized)          
                    labels_list.append(label)
                    if label in classes_to_double:
                        data_list.append(data_normalized.copy())  # Double the data
                        labels_list.append(label)  # Double the label

# Convert lists to arrays for machine learning processing
data = np.array(data_list)
labels = np.array(labels_list).reshape(-1, 1)

print("Final data shape:", data.shape)
print("Final labels shape:", labels.shape)

Final data shape: (1814, 750, 90)
Final labels shape: (1814, 1)


In [3]:
## Reshaping the data and applying PCA
original_shape = data.shape 
data_reshaped = data.reshape(-1, original_shape[2])
pca = PCA(n_components=50)
data_pca_flat = pca.fit_transform(data_reshaped) 
data_pca = data_pca_flat.reshape(original_shape[0], original_shape[1], -1)
print("New shape after PCA:", data_pca.shape)

New shape after PCA: (1814, 750, 50)


In [6]:
# Set your learning rate and batch size
learningrate = 0.001
batchsize = 32
epochsvalue = 100
patience_number = 20
# One-hot encode the labels
encoder = OneHotEncoder(sparse=False)
labels_encoded = encoder.fit_transform(labels.reshape(-1, 1))
# Train-test split
X_train, X_test, y_train_encoded, y_test_encoded = train_test_split(data_pca, labels_encoded, test_size=0.2, random_state=42)
# Input layer
inputs = Input(shape=(X_train.shape[1], X_train.shape[2]))
# BiLSTM layer
bilstm_out = Bidirectional(LSTM(128, return_sequences=True), merge_mode='sum')(inputs)
drop1 = Dropout(0.3)(bilstm_out)
norm1 = BatchNormalization()(drop1)
# Attention layer
attn_out = MultiHeadAttention(num_heads=4, key_dim=64)(query=norm1, value=norm1, key=norm1)
attn_pool = GlobalAveragePooling1D()(attn_out)
# DenseNet-style dense stack
dense1 = Dense(256, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001))(attn_pool)
drop2 = Dropout(0.3)(dense1)
bn2 = BatchNormalization()(drop2)
dense2 = Dense(128, activation='relu')(bn2)
drop3 = Dropout(0.3)(dense2)
dense3 = Dense(64, activation='relu')(drop3)
drop4 = Dropout(0.3)(dense3)
# Output layer
outputs = Dense(y_train_encoded.shape[1], activation='softmax')(drop4)
# Compile model
model = Model(inputs=inputs, outputs=outputs)
optimizer = Adam(learning_rate=learningrate)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

# Summary
model.summary()


In [7]:
# Implement early stopping
monitor_choice = 'val_loss'
patience_number = 20
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)

epochsvalue = 100

# start the timer for model training
start_time = datetime.now() 

# Train the model
history = model.fit(X_train, y_train_encoded, epochs=epochsvalue, batch_size=32, validation_data=(X_test, y_test_encoded), callbacks=[early_stopping])

# Get the final training accuracy and loss
train_loss = history.history['loss'][-1]
train_accuracy = history.history['accuracy'][-1]
print(f"Final training loss: {train_loss:.4f}")
print(f"Final training accuracy: {train_accuracy:.4f}")

# Evaluate the model on test data
test_loss, test_accuracy = model.evaluate(X_test, y_test_encoded)
print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")

#End timing and calculate total time for training
end_time = datetime.now() 
time_difference = (end_time - start_time).total_seconds()/60


Epoch 1/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 689ms/step - accuracy: 0.3542 - loss: 1.9892 - val_accuracy: 0.5372 - val_loss: 2.1329
Epoch 2/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 704ms/step - accuracy: 0.6817 - loss: 1.0829 - val_accuracy: 0.6860 - val_loss: 1.9076
Epoch 3/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 762ms/step - accuracy: 0.7821 - loss: 0.7803 - val_accuracy: 0.8457 - val_loss: 1.5305
Epoch 4/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 748ms/step - accuracy: 0.8500 - loss: 0.5816 - val_accuracy: 0.9174 - val_loss: 0.9696
Epoch 5/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 744ms/step - accuracy: 0.8717 - loss: 0.5390 - val_accuracy: 0.8843 - val_loss: 0.5983
Epoch 6/100
[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 733ms/step - accuracy: 0.8938 - loss: 0.4675 - val_accuracy: 0.9008 - val_loss: 0.4749
Epoch 7/100
[1m