In [None]:
from data.data_segmentation import DataSegmentation
from data.data_loader import DataLoader
from models.LSTM_model import LSTM_Network,Config
from models.CNN_model import read_data,make_input_data,CNNModel
import pandas as pd
from collections import Counter
import tensorflow as tf
from utils.activity_type import ActivityType
import numpy as np
import random as rn
import tensorflow as tf
from tensorflow.keras import layers, models
import tensorflow_addons as tfa
from sklearn.metrics import confusion_matrix
tf.compat.v1.disable_eager_execution()
np.random.seed(42)
rn.seed(12345)
tf.random.set_seed(1234)
import matplotlib.pyplot as plt

In [None]:
# Human-readable names of the output classes, used for plot titles and count tables.
# NOTE(review): later cells pair LABELS positionally with the one-hot columns;
# this assumes the order matches `label_mapping` below — TODO confirm.
LABELS = [
    "Walking",
    "Descending Stairs",
    "Ascending Stairs"
]

# Fetch the raw accelerometry files and read them into memory via the project loader.
data_loader = DataLoader("dataset/labeled-raw-accelerometry-data-captured-during-walking-stair-climbing-and-driving-1.0.0/raw_accelerometry_data")
data_loader.download_data()
data_loader.read_files()

# Cut the recordings into windows. 2.56 * 50 = 128, which matches the
# (128, 12) input shape hard-coded in the CNN cells — presumably seconds and Hz; verify.
data_seg = DataSegmentation(window_duration=2.56, overlap=0.5, sampling_rate=50)
train_data_X,train_data_y = data_seg(data_loader.train_data)
test_data_X,test_data_y = data_seg(data_loader.test_data)

# One-hot encode the labels, then flatten everything after the sample axis so
# each row is a single label vector.
label_mapping = ActivityType.create_label_mapping()
print("label_mapping:",label_mapping)
one_hot_encoded_train_y = ActivityType.one_hot(train_data_y, label_mapping)
one_hot_encoded_test_y = ActivityType.one_hot(test_data_y, label_mapping)
final_train_y = one_hot_encoded_train_y.reshape(one_hot_encoded_train_y.shape[0],-1)
final_test_y = one_hot_encoded_test_y.reshape(one_hot_encoded_test_y.shape[0],-1)

# Hyper-parameter container for the LSTM cells (n_steps, n_inputs, n_classes, ... are read from it later).
config = Config(train_data_X, test_data_X)
print("Some useful info to get an insight on dataset's shape and normalisation:")
print("features shape, labels shape, each features mean, each features standard deviation")
# The mean/std below are taken over the whole test array, not per feature.
print(test_data_X.shape, final_test_y.shape,
      np.mean(test_data_X), np.std(test_data_X))
# NOTE(review): this message is printed unconditionally; nothing in this cell
# checks the mean/std values before claiming normalisation.
print("the dataset is therefore properly normalised, as expected.")
print(train_data_X.shape, final_train_y.shape)

In [None]:
# Summing the one-hot label rows column-wise yields one total per class column;
# the totals are paired positionally with LABELS.
# NOTE(review): assumes the column order equals the LABELS order — TODO confirm.
per_class_totals = np.sum(final_train_y, axis=0)
class_counts = {name: total for name, total in zip(LABELS, per_class_totals)}

# Print the counts for each class
print("Class Counts:")
for label in class_counts:
    count = class_counts[label]
    print(f"{label}: {count} samples")

In [None]:
# Visualize dataset information
print("Dataset Information:")
print("Number of training samples:", len(train_data_X))
print("Number of testing samples:", len(test_data_X))
# NOTE(review): shape[1] is the axis plotted as "Time Steps" below, while the
# channels are indexed on axis 2 — the wording of this line may be off; confirm.
print("Number of features per sample:", train_data_X.shape[1])
print("Number of classes:", len(LABELS))

# One figure with four panels; every panel shows three consecutive channels
# of the same training window.
num_subplots = 4
features_per_subplot = 3
example_index = 0

fig = plt.figure(figsize=(15, 10))

# The title depends only on the chosen window, so build it once.
panel_title = f"Example Data - {LABELS[np.argmax(final_train_y[example_index])]}"

for i in range(num_subplots):
    ax = fig.add_subplot(2, 2, i + 1)
    first_channel = i * 3

    for axis in range(features_per_subplot):
        ax.plot(train_data_X[example_index, :, first_channel + axis], label=f'Axis {axis + 1}')

    ax.set_title(panel_title)
    ax.set_xlabel("Time Steps")
    ax.set_ylabel("Acceleration")
    ax.legend()

fig.tight_layout()
plt.show()

In [None]:


# Function to find an example from each class
def find_example_from_each_class(data, labels):
    """Pick the first sample belonging to every distinct label row.

    Args:
        data: Indexable collection of samples, aligned row-for-row with ``labels``.
        labels: 2-D array-like in which each row is one sample's label vector.

    Returns:
        A list of ``(sample, label_row)`` tuples, one per distinct row of
        ``labels``, in the sorted order produced by ``np.unique(..., axis=0)``.
    """
    # return_index gives, for each unique row, the position of its first occurrence.
    distinct_rows, first_positions = np.unique(labels, axis=0, return_index=True)
    return [(data[pos], row) for row, pos in zip(distinct_rows, first_positions)]

# Find an example from each class
examples = find_example_from_each_class(train_data_X, final_train_y)

# One panel per class example (three panels on a 2x2 grid), each showing the
# first three channels of that example.
num_subplots = 3
features_per_subplot = 3

fig = plt.figure(figsize=(15, 10))

for i in range(num_subplots):
    ax = fig.add_subplot(2, 2, i + 1)
    example_data, example_label = examples[i]

    for axis in range(features_per_subplot):
        channel_trace = example_data[:, axis]
        ax.plot(channel_trace, label=f'Axis {axis + 1}')

    class_name = LABELS[np.argmax(example_label)]
    ax.set_title(f"Example Data - {class_name}")
    ax.set_xlabel("Time Steps")
    ax.set_ylabel("Acceleration")
    ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# find_example_from_each_class is already defined in the earlier per-class
# plotting cell; it is reused here rather than redefined, so the notebook
# carries a single definition of the helper.

# Find an example from each class
examples = find_example_from_each_class(train_data_X, final_train_y)

# Layout for the per-feature-block figures drawn further down:
# `num_figures` figures, each with one panel per class example, and
# `features_per_subplot` consecutive channels per figure.
num_figures = 4
num_subplots_per_figure = 3
features_per_subplot = 3

# Indices of every channel on the last axis of the training windows.
all_features = range(train_data_X.shape[2])

In [None]:
# Scratch check of the feature-index range expression, evaluated for index 3.
# With features_per_subplot == 3 the cell displays range(9, 12).
# NOTE(review): the plotting loop in the following cell plugs `figure_index`
# (not `subplot_index`) into this expression.
subplot_index = 3
range(features_per_subplot+(subplot_index-1)*3,features_per_subplot+subplot_index*3)

In [None]:


# One figure per block of `features_per_subplot` consecutive channels; inside
# each figure, one panel per class example.
for figure_index in range(num_figures):
    plt.figure(figsize=(15, 10))

    # Channels drawn in this figure. Derived from `features_per_subplot` alone,
    # so the blocks stay contiguous if that setting is changed (the previous
    # expression mixed it with a hard-coded 3 and was only right when both agreed).
    first_feature = figure_index * features_per_subplot
    feature_block = range(first_feature, first_feature + features_per_subplot)

    for subplot_index in range(num_subplots_per_figure):
        plt.subplot(2, 2, subplot_index + 1)

        # The example depends only on the panel, not on the channel: look it up once.
        example_data, example_label = examples[subplot_index]

        for feature_index in feature_block:
            plt.plot(example_data[:, feature_index], label=f'Feature {feature_index + 1}')

        plt.title(f"Class: {LABELS[np.argmax(example_label)]}")
        plt.xlabel("Time Steps")
        plt.ylabel("Acceleration")
        plt.legend()

    plt.tight_layout()

plt.show()

In [None]:
# ------------------------------------------------------
# Step 3: Let's get serious and build the neural network
# ------------------------------------------------------

# Graph inputs: the leading dimension is left as None so any batch size can be
# fed; the remaining sizes are read from `config`.
X = tf.compat.v1.placeholder(tf.float32, [None, config.n_steps, config.n_inputs])
Y = tf.compat.v1.placeholder(tf.float32, [None, config.n_classes])

# Model output; it is passed as `logits` to the softmax cross-entropy below.
pred_Y = LSTM_Network(X, config)

# Loss, optimizer, evaluation
# L2 penalty summed over every trainable variable present in the graph at this
# point — presumably the ones LSTM_Network just created, so this must stay
# after the call above.
l2 = config.lambda_loss_amount * \
    sum(tf.nn.l2_loss(tf_var) for tf_var in tf.compat.v1.trainable_variables())
# Softmax loss and L2
# stop_gradient on Y keeps gradients from flowing into the label tensor.
cost = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(labels=tf.stop_gradient(Y), logits=pred_Y)) + l2
# NOTE: `optimizer` holds the result of minimize(), which the training cell
# runs with sess.run(optimizer, ...).
optimizer = tf.compat.v1.train.AdamOptimizer(
    learning_rate=config.learning_rate).minimize(cost)

# Accuracy: fraction of rows whose arg-max prediction equals the arg-max label.
correct_pred = tf.equal(tf.argmax(pred_Y, 1), tf.argmax(Y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, dtype=tf.float32))


In [None]:
# --------------------------------------------
# Step 4: Hooray, now train the neural network
# --------------------------------------------

# Note that log_device_placement can be turned ON but will cause console spam with RNNs.
# NOTE(review): the session is never closed in this cell.
sess = tf.compat.v1.InteractiveSession(config=tf.compat.v1.ConfigProto(log_device_placement=False))
init = tf.compat.v1.global_variables_initializer()
sess.run(init)

best_accuracy = 0.0
# Start training for each batch and loop epochs
for i in range(config.training_epochs):
    # The two ranges are zipped into (start, end) bounds of full-size batches.
    # The second range stops at train_count, so trailing samples that do not
    # fill a whole batch are not fed to the optimizer.
    for start, end in zip(range(0, config.train_count, config.batch_size),
                          range(config.batch_size, config.train_count + 1, config.batch_size)):
        sess.run(optimizer, feed_dict={X: train_data_X[start:end],
                                       Y: final_train_y[start:end]})

    # Test completely at every epoch: calculate accuracy
    # The whole test set is fed in a single run.
    pred_out, accuracy_out, loss_out = sess.run(
        [pred_Y, accuracy, cost],
        feed_dict={
            X: test_data_X,
            Y: final_test_y
        }
    )
    print("traing iter: {},".format(i) +
          " test accuracy : {},".format(accuracy_out) +
          " loss : {}".format(loss_out))
    # NOTE(review): the best epoch is selected on the test set itself, so
    # `best_accuracy` is an optimistic figure.
    best_accuracy = max(best_accuracy, accuracy_out)

print("")
# `accuracy_out` is the value from the last epoch; it is only defined if the
# loop above ran at least once.
print("final test accuracy: {}".format(accuracy_out))
print("best epoch's test accuracy: {}".format(best_accuracy))
print("")


In [None]:
# Baseline 1-D CNN: three Conv1D/MaxPooling1D stages followed by a dense head.
# The layers are listed in the order they are stacked.
model = models.Sequential([
    # Convolutional feature extractor
    layers.Conv1D(32, kernel_size=3, activation='relu', input_shape=(128, 12)),
    layers.MaxPooling1D(pool_size=2),
    layers.Conv1D(64, kernel_size=3, activation='relu'),
    layers.MaxPooling1D(pool_size=2),
    layers.Conv1D(128, kernel_size=3, activation='relu'),
    layers.MaxPooling1D(pool_size=2),
    # Collapse the feature maps into one vector per sample
    layers.Flatten(),
    # Fully connected head with dropout after each hidden layer
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    # Softmax output, one unit per class (3 classes assumed)
    layers.Dense(3, activation='softmax'),
])

# Adam with default settings; categorical cross-entropy on the one-hot labels.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Display the model summary
model.summary()

# Train, holding out the last 20% of the training data for validation.
model.fit(
    train_data_X,
    final_train_y,
    epochs=10,
    batch_size=32,
    validation_split=0.2,
)

# Evaluate the model on the test set
test_loss, test_acc = model.evaluate(test_data_X, final_test_y)
print(f'Test accuracy: {test_acc}')

In [None]:
# Same baseline CNN as the previous cell, trained with a smaller batch size (16).
# tensorflow, layers/models and tensorflow_addons are already imported in the
# notebook's first cell, so they are not re-imported here.
model = models.Sequential()

# Convolutional layers
model.add(layers.Conv1D(32, kernel_size=3, activation='relu', input_shape=(128, 12)))
model.add(layers.MaxPooling1D(pool_size=2))
model.add(layers.Conv1D(64, kernel_size=3, activation='relu'))
model.add(layers.MaxPooling1D(pool_size=2))
model.add(layers.Conv1D(128, kernel_size=3, activation='relu'))
model.add(layers.MaxPooling1D(pool_size=2))

# Flatten the output for the fully connected layers
model.add(layers.Flatten())

# Fully connected layers, each followed by dropout
model.add(layers.Dense(256, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))

# Output layer
model.add(layers.Dense(3, activation='softmax'))  # Assuming 3 classes

# Compile the model: default Adam, categorical cross-entropy on one-hot labels
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Display the model summary
model.summary()

# Train the model; 20% of the training data is held out for validation
model.fit(train_data_X, final_train_y, epochs=10, batch_size=16, validation_split=0.2)

# Evaluate the model on the test set
test_loss, test_acc = model.evaluate(test_data_X, final_test_y)
print(f'Test accuracy: {test_acc}')

In [None]:
# Baseline CNN trained with an explicit Adam optimizer (learning rate 1e-4).
model = models.Sequential()

# Convolutional layers
model.add(layers.Conv1D(32, kernel_size=3, activation='relu', input_shape=(128, 12)))
model.add(layers.MaxPooling1D(pool_size=2))
model.add(layers.Conv1D(64, kernel_size=3, activation='relu'))
model.add(layers.MaxPooling1D(pool_size=2))
model.add(layers.Conv1D(128, kernel_size=3, activation='relu'))
model.add(layers.MaxPooling1D(pool_size=2))

# Flatten the output for the fully connected layers
model.add(layers.Flatten())

# Fully connected layers
model.add(layers.Dense(256, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))

# Output layer
model.add(layers.Dense(3, activation='softmax'))  # Assuming 3 classes
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)

# The targets fed to fit()/evaluate() are one-hot rows and the output layer
# already applies softmax, so the matching loss is the (non-sparse) categorical
# cross-entropy on probabilities. The sparse variant expects integer class ids,
# and from_logits=True would apply softmax a second time.
loss_funct = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

# Compile the model; with this loss the 'accuracy' metric is resolved against
# one-hot targets.
model.compile(optimizer=adam_optimizer, loss=loss_funct, metrics=["accuracy"])

# Display the model summary
model.summary()

# Train the model; 20% of the training data is held out for validation
model.fit(train_data_X, final_train_y, epochs=10, batch_size=32, validation_split=0.2)

# Evaluate the model on the test set
test_loss, test_acc = model.evaluate(test_data_X, final_test_y)
print(f'Test accuracy: {test_acc}')

In [None]:
# Assuming your model is already trained (e.g., 'model' is your trained model)

# Predict class probabilities for the test set
predicted_probs = model.predict(test_data_X)

# Extract predicted class labels
predicted_labels = np.argmax(predicted_probs, axis=1)

# Convert one-hot encoded true labels to class indices
true_labels = np.argmax(final_test_y, axis=1)

# Class names shown on the axes; their positions are the class indices.
classes = ['Walking', 'Descending Stairs', 'Ascending Stairs']
tick_marks = np.arange(len(classes))

# Compute the confusion matrix. Passing `labels` pins the matrix to
# len(classes) x len(classes) even when a class is absent from both the true
# and the predicted labels; without it the matrix would shrink and the
# annotation loop below would index out of bounds.
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=tick_marks)

# Print the confusion matrix
print("Confusion Matrix:")
print(conf_matrix)

# Create a heatmap of the confusion matrix using matplotlib
plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()

plt.xticks(tick_marks, classes)
plt.yticks(tick_marks, classes)

plt.xlabel('Predicted')
plt.ylabel('True')

# Display the values inside the heatmap (row = true class, column = predicted class)
for i in range(len(classes)):
    for j in range(len(classes)):
        plt.text(j, i, str(conf_matrix[i, j]), horizontalalignment='center', verticalalignment='center')

plt.show()

In [None]:
# Deeper CNN: five Conv1D -> MaxPooling1D -> Dropout(0.2) stages with a growing
# number of filters, then a single dense layer and a softmax output.
conv_stack = []
for n_filters in (16, 32, 64, 128, 256):
    # Only the very first layer declares the input shape.
    first_layer_kwargs = {} if conv_stack else {'input_shape': (128, 12)}
    conv_stack.append(layers.Conv1D(n_filters, kernel_size=3, activation='relu', **first_layer_kwargs))
    conv_stack.append(layers.MaxPooling1D(pool_size=2))
    conv_stack.append(layers.Dropout(0.2))

model = models.Sequential(conv_stack + [
    # Flatten the output for the fully connected layers
    layers.Flatten(),
    # Fully connected layer
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.2),
    # Output layer (3 classes assumed)
    layers.Dense(3, activation='softmax'),
])

# Compile the model
# NOTE(review): `optimizer` rebinds the notebook-level name that the LSTM cells
# use for their training op.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.00005)
# NOTE(review): `loss_funct` is created but not passed to compile(); the string
# loss 'categorical_crossentropy' is what the model actually uses.
loss_funct = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Display the model summary
model.summary()

# Train the model and keep the per-epoch metrics for plotting
history = model.fit(
    train_data_X,
    final_train_y,
    epochs=25,
    batch_size=16,
    validation_split=0.2,
)

# Plot training and validation accuracy through the epochs
for metric_key, curve_label in (
    ('accuracy', 'Training Accuracy'),
    ('val_accuracy', 'Validation Accuracy'),
):
    plt.plot(history.history[metric_key], label=curve_label)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Model Accuracy Through Epochs')
plt.show()

test_loss, test_acc = model.evaluate(test_data_X, final_test_y)
print(f'Test accuracy: {test_acc}')

# Assuming your model is already trained (e.g., 'model' is your trained model)

# Predict class probabilities for the test set
predicted_probs = model.predict(test_data_X)

# Extract predicted class labels
predicted_labels = np.argmax(predicted_probs, axis=1)

# Convert one-hot encoded true labels to class indices
true_labels = np.argmax(final_test_y, axis=1)

# Class names shown on the axes; their positions are the class indices.
classes = ['Walking', 'Descending Stairs', 'Ascending Stairs']
tick_marks = np.arange(len(classes))

# Compute the confusion matrix. Passing `labels` keeps it at
# len(classes) x len(classes) even if some class never appears in either
# vector, which the fixed-size annotation loop below relies on.
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=tick_marks)

# Print the confusion matrix
print("Confusion Matrix:")
print(conf_matrix)

# Create a heatmap of the confusion matrix using matplotlib
plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()

plt.xticks(tick_marks, classes)
plt.yticks(tick_marks, classes)

plt.xlabel('Predicted')
plt.ylabel('True')

# Display the values inside the heatmap (row = true class, column = predicted class)
for i in range(len(classes)):
    for j in range(len(classes)):
        plt.text(j, i, str(conf_matrix[i, j]), horizontalalignment='center', verticalalignment='center')

plt.show()