In [24]:
# --- Experiment configuration ------------------------------------------------
GYRO = False  # if False, only accelerometer data is used
SEQUENCE_LENGTH = 4  # window length handed to the sequence generator (presumably seconds -- confirm in helpers.sequence_generator)
SEQUENCE_OVERLAP = 3  # overlap between consecutive training windows (same unit as SEQUENCE_LENGTH)
BATCH_SIZE = 5  # batch size passed to model.fit
EPOCHS = 20  # number of epochs passed to model.fit
# Human-readable tag summarising the run configuration (fixed typo: "pochs" -> "epochs").
MODEL_NAME = f"epochs:{EPOCHS}_batch:{BATCH_SIZE}_gyro:{GYRO}_window:{SEQUENCE_LENGTH}_overlap:{SEQUENCE_OVERLAP}"
DEV_SIZE = 4  # number of students placed in the dev set
TEST_SIZE = 8  # number of students placed in the test set
LEAVE_ONE_OUT = False  # if True, run the per-student leave-one-out evaluation instead of a single split
NORMALISE = True  # forwarded as `normalise=` to the sequence generator
OVERLAP_ON_TEST_SET = False  # if False, dev/test windows are generated with zero overlap
TRANSFORM = True  # if True, FFT features are appended to every sequence
AUGMENT = True  # if True, noisy copies of the training sequences are added
AUGMENT_SIZE = 1 # 1 means 1 extra augmented sample per sample, 2 means 2 extra augmented samples per sample etc.

### This notebook trains a single model that classifies the combined position & activity label, e.g. "sitting_or_standing&coughing" or "lying_down_left&other" (see POSSIBLE_OUTCOMES).

In [25]:
import helpers.file_tagger as file_tagger
import helpers.sequence_generator as sequence_generator
import helpers.split_by_student as split_by_student

import tensorflow as tf
import numpy as np
from keras import layers, Sequential, models
from sklearn.model_selection import train_test_split
import time


# Raw "<position>&<activity>" keys used to look up recordings in the dictionary
# returned by file_tagger.tag_directory. The upright positions have an extra
# "eating" activity that the lying positions do not.
POSSIBLE_ACTIVITIES = {
    f"{position}&{activity}"
    for positions, activities in (
        (
            ("sitting", "standing"),
            (
                "coughing",
                "hyperventilating",
                "normal_breathing",
                "talking",
                "eating",
                "singing",
                "laughing",
            ),
        ),
        (
            (
                "lying_down_left",
                "lying_down_right",
                "lying_down_back",
                "lying_down_stomach",
            ),
            (
                "coughing",
                "hyperventilating",
                "normal_breathing",
                "talking",
                "singing",
                "laughing",
            ),
        ),
    )
    for position in positions
    for activity in activities
}

# Labels the model predicts: every position crossed with the four activity
# classes ("other" groups the remaining raw activities).
POSSIBLE_OUTCOMES = {
    f"{position}&{activity}"
    for position in (
        "sitting_or_standing",
        "lying_down_left",
        "lying_down_right",
        "lying_down_back",
        "lying_down_stomach",
    )
    for activity in ("normal_breathing", "coughing", "hyperventilating", "other")
}
    

# Directory holding the recordings that file_tagger.tag_directory groups by activity.
DATA_DIRECTORY = "./all_respeck"

# Sort before enumerating. POSSIBLE_OUTCOMES is a set of strings, and with
# Python's string-hash randomisation its iteration order can differ between
# interpreter sessions. Without sorting, the same label could get a different
# class index on every kernel restart, so a saved model's outputs could not be
# decoded reliably.
LABEL_TO_INDEX = {label: idx for idx, label in enumerate(sorted(POSSIBLE_OUTCOMES))}

# Dev/test windows only overlap when explicitly requested; otherwise they are disjoint.
TEST_SEQUENCE_OVERLAP = SEQUENCE_OVERLAP if OVERLAP_ON_TEST_SET else 0

In [26]:
def generate_training_data(directory, sequence_length, overlap, file_names, gyro = GYRO): # if gyro is false, only accelerometer data is used
    """Build the list of ``(label, sequence)`` pairs for the given files.

    Args:
        directory: folder containing the csv recordings.
        sequence_length: window length forwarded to the sequence generator.
        overlap: window overlap forwarded to the sequence generator.
        file_names: collection of csv file names to include (assumed to be
            hashable strings -- they are compared against the names returned
            by ``file_tagger.tag_directory``).
        gyro: when true the gyroscope-aware generator is used, otherwise the
            accelerometer-only one.

    Returns:
        A list of ``(label, sequence)`` tuples where ``label`` is the merged
        outcome label: "sitting"/"standing" collapse into
        "sitting_or_standing", and talking/singing/laughing/eating collapse
        into "other".
    """
    # Set membership keeps the per-file check O(1) instead of scanning a list.
    wanted_files = set(file_names)

    # group each csv file into their respective areas
    csv_dictionary = file_tagger.tag_directory(directory)

    if gyro:
        make_sequences = sequence_generator.generate_sequences_from_file_with_gyroscope
    else:
        make_sequences = sequence_generator.generate_sequences_from_file_without_gyroscope

    tagged_data = []

    # Sorted so the order of the generated samples is the same on every run
    # (iterating the set directly is not guaranteed to be stable across sessions).
    for key in sorted(POSSIBLE_ACTIVITIES):
        # The outcome label depends only on the key, so compute it once here
        # rather than once per generated sequence.
        position, activity = key.split("&")[:2]

        if activity in ("talking", "singing", "laughing", "eating"):
            activity = "other"

        if position in ("standing", "sitting"):
            position = "sitting_or_standing"

        label = position + "&" + activity

        # iterates through each csv file for the activity
        for csv_file in csv_dictionary[key]:
            if csv_file not in wanted_files:
                continue

            sequences = make_sequences(directory + "/" + csv_file, sequence_length, overlap, normalise=NORMALISE)
            tagged_data.extend((label, sequence) for sequence in sequences)

    print ("there are " + str(len(tagged_data)) + " tagged sequences in the dataset")

    return tagged_data

In [27]:
def train_model_CNN(input_data, labels_encoded, unique_labels, epochs, batch_size, validation_data):
    """Build, compile and fit the 1-D CNN classifier.

    Args:
        input_data: training sequences shaped ``(samples, timesteps, channels)``.
        labels_encoded: integer class index for every training sequence.
        unique_labels: collection of all classes; only its length is used, to
            size the softmax output layer.
        epochs: number of training epochs.
        batch_size: mini-batch size.
        validation_data: ``(data, labels)`` pair evaluated after every epoch;
            pass ``None`` or a pair with empty data to train without validation.

    Returns:
        The fitted Keras model.

    Raises:
        ValueError: if ``input_data`` is not 3-dimensional.
    """
    # Take (timesteps, channels) from the data itself instead of re-deriving it
    # from the GYRO/TRANSFORM flags. The old `width * 3` rule only matched the
    # accelerometer-only case: fft() transforms just the first three columns
    # and yields six features, so gyro + transform gives 6 + 6 = 12 channels,
    # not 18. It also broke whenever untransformed data was passed in while
    # TRANSFORM was enabled.
    input_shape = tuple(np.shape(input_data)[1:])
    if len(input_shape) != 2:
        raise ValueError(
            "input_data must be shaped (samples, timesteps, channels), got shape "
            + str(np.shape(input_data))
        )

    # Define the CNN model for your specific input shape
    model = Sequential([
        layers.Conv1D(32, 3, activation='relu', input_shape=input_shape),
        layers.MaxPooling1D(2),
        layers.Conv1D(64, 3, activation='relu'),
        layers.MaxPooling1D(2),
        layers.Conv1D(128, 3, activation='relu'),
        layers.MaxPooling1D(2),
        layers.Conv1D(128, 3, activation='relu'),
        layers.MaxPooling1D(2),
        layers.Dropout(0.5),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(len(unique_labels), activation='softmax')
    ])

    # Labels are integer indices, hence the sparse categorical loss.
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # Train the CNN model; only hand validation data to Keras when there is some.
    has_validation = validation_data is not None and len(validation_data[0]) > 0
    if has_validation:
        model.fit(input_data, labels_encoded, epochs=epochs, batch_size=batch_size, validation_data=validation_data)
    else:
        model.fit(input_data, labels_encoded, epochs=epochs, batch_size=batch_size)

    return model

In [None]:
import numpy as np

def augment_data(sequences, labels, num_augmentations, noise_std=0.1, rng=None):
    """Create noisy copies of every sequence by adding zero-mean Gaussian jitter.

    Args:
        sequences: iterable of array-like sequences.
        labels: iterable of labels, aligned with ``sequences``.
        num_augmentations: number of noisy copies to create per sequence.
        noise_std: standard deviation of the Gaussian noise (default 0.1, the
            value that used to be hard-coded).
        rng: optional ``numpy.random.Generator`` (or anything exposing a
            compatible ``normal`` method) for reproducible augmentation. When
            ``None`` the global ``np.random`` state is used, as before.

    Returns:
        ``(augmented_sequences, augmented_labels)``: two lists holding only the
        new noisy copies (the originals are not included), ordered sequence by
        sequence.
    """
    noise_source = np.random if rng is None else rng

    augmented_sequences = []
    augmented_labels = []

    for sequence, label in zip(sequences, labels):
        # Accept plain lists as well as arrays.
        sequence = np.asarray(sequence)
        for _ in range(num_augmentations):
            noise = noise_source.normal(0, noise_std, size=sequence.shape)
            augmented_sequences.append(sequence + noise)
            augmented_labels.append(label)

    return augmented_sequences, augmented_labels


In [28]:
def fft(data):
    """Compute per-axis FFT magnitude features for one sequence.

    Only the first three columns of ``data`` (x, y, z) are transformed.

    Returns:
        A list with one row per time step:
        ``[|X|, |Y|, |Z|, freq, freq, freq]``. The three frequency entries are
        the same ``np.fft.fftfreq`` bin (default sample spacing), repeated once
        per axis; they depend only on the sequence length.
    """
    n_samples = len(data[:, 0])

    # The transform is complex-valued, so keep only the magnitude of each bin.
    magnitudes = [np.abs(np.fft.fft(data[:, axis])) for axis in range(3)]

    # All three axes have the same length and therefore share the same bins.
    frequencies = np.fft.fftfreq(n_samples)

    return [
        [mag_x, mag_y, mag_z, freq, freq, freq]
        for mag_x, mag_y, mag_z, freq in zip(*magnitudes, frequencies)
    ]

def extract_features(train_data, dev_data, test_data):
    """Apply ``fft`` to every sequence of the train, dev and test splits.

    Returns:
        ``(train_features, dev_features, test_features)``: three lists holding
        the ``fft`` output of each sequence, in the original order.
    """
    splits = (train_data, dev_data, test_data)
    train_features, dev_features, test_features = (
        list(map(fft, split)) for split in splits
    )
    return train_features, dev_features, test_features


def merge_arrays(arr1, arr2):
    """Join two arrays side by side (along axis 1), e.g. raw samples and their features."""
    side_by_side = np.concatenate([arr1, arr2], axis=1)
    return side_by_side

def normalise(sequence):
    """
    Scale every row of a matrix of accelerometer values to unit Euclidean length.

    Rows whose norm is zero are returned unchanged.
    """
    row_norms = np.linalg.norm(sequence, axis=1, keepdims=True)
    # Dividing by 1 leaves all-zero rows as they are and avoids a division by zero.
    row_norms[row_norms == 0] = 1
    return sequence / row_norms

In [29]:
def create_sequence_label_lists(tagged_sequences):
    """Split ``(label, sequence)`` pairs into model-ready arrays.

    Returns:
        ``(sequences, labels)``: the sequences stacked into a float32 array and
        the labels mapped through ``LABEL_TO_INDEX`` into an integer array.
    """
    stacked_sequences = np.array(
        [sequence for _tag, sequence in tagged_sequences], dtype=np.float32
    )
    label_indices = np.array(
        [LABEL_TO_INDEX[tag] for tag, _sequence in tagged_sequences]
    )
    return stacked_sequences, label_indices
    

def create_data_sets(dev_size, test_size):
    """Split the students into train/dev/test and build the arrays for each split.

    Args:
        dev_size: number of students to put in the dev set.
        test_size: number of students to put in the test set.

    Returns:
        ``(train_data, train_labels, dev_data, dev_labels, test_data, test_labels)``.
    """
    training_files, dev_files, test_files = split_by_student.split_data(students_in_dev_set= dev_size, students_in_test_set=test_size)

    # Training windows use the training overlap; dev and test share the test overlap.
    split_plan = (
        (training_files, SEQUENCE_OVERLAP),
        (dev_files, TEST_SEQUENCE_OVERLAP),
        (test_files, TEST_SEQUENCE_OVERLAP),
    )

    # Generate all three tagged sets first, then convert them to arrays.
    tagged_splits = [
        generate_training_data(DATA_DIRECTORY, SEQUENCE_LENGTH, split_overlap, file_names=split_files)
        for split_files, split_overlap in split_plan
    ]

    (train_data, train_labels), (dev_data, dev_labels), (test_data, test_labels) = (
        create_sequence_label_lists(tagged) for tagged in tagged_splits
    )

    return train_data, train_labels, dev_data, dev_labels, test_data, test_labels

In [30]:
def _append_fft_features(data):
    """Return ``data`` with each sequence's ``fft`` features appended as extra columns."""
    return np.array([merge_arrays(sequence, fft(sequence)) for sequence in data])


def leave_one_out():
    """Leave-one-student-out evaluation.

    For every student a fresh model is trained on all other students and
    evaluated on the held-out one. The AUGMENT and TRANSFORM steps of the main
    pipeline are applied here too, so the data matches what the model is built
    for (previously, with TRANSFORM enabled, the model expected FFT columns the
    data did not have). Per-student and running-average accuracies are printed.
    """
    students = split_by_student.get_list_of_stutents()
    test_accuracies = []
    for test_student in students:
        print("testing student: " + str(test_student))
        print("training students: " + str([student for student in students if student != test_student]))

        test_files, training_files = split_by_student.get_list_of_files(test_student)

        tagged_training_sequences = generate_training_data(DATA_DIRECTORY, SEQUENCE_LENGTH, SEQUENCE_OVERLAP, file_names=training_files)
        tagged_test_sequences = generate_training_data(DATA_DIRECTORY, SEQUENCE_LENGTH, TEST_SEQUENCE_OVERLAP, file_names=test_files)

        train_data, train_labels = create_sequence_label_lists(tagged_training_sequences)
        test_data, test_labels = create_sequence_label_lists(tagged_test_sequences)

        # Only the training data is augmented; skip when no copies are requested
        # because concatenating with an empty list would fail.
        if AUGMENT and AUGMENT_SIZE > 0:
            augmented_data, augmented_labels = augment_data(train_data, train_labels, AUGMENT_SIZE)
            train_data = np.concatenate((train_data, augmented_data))
            train_labels = np.concatenate((train_labels, augmented_labels))

        # Train and test data must go through the same feature transform.
        if TRANSFORM:
            train_data = _append_fft_features(train_data)
            test_data = _append_fft_features(test_data)

        model = train_model_CNN(train_data, train_labels, POSSIBLE_OUTCOMES, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_data=([], [])) #batch_size, epochs
        test_loss, test_accuracy = model.evaluate(test_data, test_labels)
        test_accuracies.append(test_accuracy)
        print("for student " + str(test_student) + " the accuracy is " + str(test_accuracy))
        print("average accuracy so far: " + str(sum(test_accuracies)/len(test_accuracies)))
        print("number of students tested so far: " + str(len(test_accuracies)))
        time.sleep(3)

    print("Accuracy for each student:")
    print(", ".join([f"{student}: {accuracy}" for student, accuracy in zip(students, test_accuracies)]))
    # Guard against an empty student list, which would otherwise divide by zero.
    if test_accuracies:
        print("Average overall accuracy:", sum(test_accuracies)/len(test_accuracies))


In [31]:
if LEAVE_ONE_OUT:
    # Per-student evaluation replaces the single train/dev/test run below.
    # This used to be followed by exit(), which in a notebook shuts the kernel
    # down instead of just skipping the rest of the cell; an else-branch gives
    # the same control flow without killing the session.
    leave_one_out()
else:
    train_data, train_labels, dev_data, dev_labels, test_data, test_labels = create_data_sets(dev_size=DEV_SIZE, test_size=TEST_SIZE)

    # Add noisy copies of the training sequences. Skipped when no copies are
    # requested, because concatenating with an empty list would fail.
    if AUGMENT and AUGMENT_SIZE > 0:
        augmented_train_data, augmented_train_labels = augment_data(train_data, train_labels, AUGMENT_SIZE)
        train_data = np.concatenate((train_data, augmented_train_data))
        train_labels = np.concatenate((train_labels, augmented_train_labels))

    # Append the FFT features of every sequence as extra columns.
    if TRANSFORM:
        train_transform, dev_transform, test_transform = extract_features(train_data, dev_data, test_data)

        train_data = np.array([merge_arrays(sequence, features) for sequence, features in zip(train_data, train_transform)])
        dev_data = np.array([merge_arrays(sequence, features) for sequence, features in zip(dev_data, dev_transform)])
        test_data = np.array([merge_arrays(sequence, features) for sequence, features in zip(test_data, test_transform)])

    # Train the CNN, validating on the dev set after every epoch.
    model = train_model_CNN(train_data, train_labels, POSSIBLE_OUTCOMES, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_data=(dev_data, dev_labels)) #batch_size, epochs

    # Evaluate the model on the test set
    test_loss, test_accuracy = model.evaluate(test_data, test_labels)
    print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

    # Save the trained model
    #model.save(f"models/models_for_presentation/lying_down_left_model.keras")

Train Set: s67, s7, s60, s9, s55, s63, s93, s16, s74, s21, s13, s59, s40, s29, s61, s38, s70, s98, s80, s50, s92, s30, s48, s100, s27, s51, s22, s43, s45, s83, s72, s46, s32, s39, s18, s88, s102, s11, s57, s15, s3, s56, s42, s75, s65, s36, s1, s79, s54, s82, s17, s64, s44, s8, s34, s52, s97, s77, s12
Dev Set: s35, s87, s71, s95
Test Set: s5, s23, s91, s96, s86, s66, s33, s84
there are 60350 tagged sequences in the dataset
there are 1064 tagged sequences in the dataset
there are 2019 tagged sequences in the dataset
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test Loss: 2.584341526031494, Test Accuracy: 0.7132243514060974
