In [None]:
"""Module for data-related stuff."""



class DrivingDataset:

    CLASSES = {
        "speedbumppassing": "speedbumppassing",
        "zigzag": "zigzag",
    }

    FEATURES = [
        "ax",
        "ay",
        "az",
        "wx",
        "wy",
        "wz"
    ]

    def __init__(self, data_root, unwrapped_attitude=False,
                 metadata_file=None):
        self.data_root = pathlib.Path(data_root)
        self.files = []
        self.unwrapped_attitude = unwrapped_attitude
        self.metadata = {}  # Dictionary with participant codes as keys.

        # Save each CSV file and infer class from filename.
        for csv_ in self.data_root.glob("**/*.csv"):
            class_ = str(csv_.parent.stem)[:3]
            if class_ in self.CLASSES.keys():
                self.files.append([csv_, class_])

        # Read metadata form given file.
        if metadata_file:
            with open(metadata_file, newline="") as metadata:
                csv_reader = csv.reader(metadata)
                next(csv_reader, None)  # skip the headers
                for row in csv_reader:
                    self.metadata[row[0]] = list(map(int, row[1:]))

    def __getitem__(self, item):
        file_, class_ = self.files[item]
        signals = csv2numpy(file_)

        if self.unwrapped_attitude:
            # Unwrap attitude signals.
            for i in range(3):
                signals[:, i] = np.unwrap(signals[:, i])

        if self.metadata:
            # Read metadata and return as extra element.
            metadata = self.metadata[file_.stem.split("_")[1]]
            return signals, class_, metadata
        return signals, class_

    def __len__(self):
        return len(self.files)


class HARDatasetCrops(HARDataset):
    """Dataset with fixed-length crops.

    Args:
        data_root -- string. Path to data directory.
        length -- int. Crops length.
        discard_start -- int. Number of samples to discard from start.
        discard_end -- int. Number of samples to discard from end.
        unwrapped_attitude -- bool. Whether to unwrap attitude signals.
        padding_mode -- None or string. If None, the samples not fitting in
                integer number of windows will be discarded. If string,
                the value will be passed to numpy's pad function.
    """

    def __init__(self, data_root, length, discard_start, discard_end,
                 unwrapped_attitude=True, padding_mode=None,
                 metadata_file=None):
        super().__init__(data_root, unwrapped_attitude=unwrapped_attitude,
                         metadata_file=metadata_file)
        self.length = length
        self.discard_start = discard_start
        self.discard_end = discard_end
        self.padding_mode = padding_mode

        self.crops = self.get_crops()

    def get_crops(self):
        """Return list with crops from files."""
        crops = []
        # Iterate over data files.
        for file, class_ in self.files:
            # Read from file.
            signal = csv2numpy(file)
            # Crop start and end.
            signal = signal[self.discard_start:(signal.shape[0] - self.discard_end)]
            windows, remainder = divmod(signal.shape[0], self.length)
            if self.padding_mode and remainder != 0:
                # Apply padding with given padding mode.
                padding = self.length * (windows + 1) - signal.shape[0]
                signal = np.pad(signal, ((0, padding), (0, 0)), self.padding_mode)
            elif self.padding_mode is None:
                # Crop the end.
                signal = signal[:(self.length * windows)]
            # Obtain crops from <discard_start> to <discard-end>.
            for i in range(0, signal.shape[0], self.length):
                crop = signal[i:(i + self.length)]
                if self.unwrapped_attitude:
                    # Unwrap phase of first 3 features (attitude signals).
                    for s in range(3):
                        crop[:, s] = np.unwrap(crop[:, s])
                if self.metadata:
                    # Read metadata and return as extra element.
                    metadata = self.metadata[file.stem.split("_")[1]]
                    crops.append([crop, class_, metadata])
                else:
                    crops.append([crop, class_])

        return crops

    def __getitem__(self, item):
        return self.crops[item]

    def __len__(self):
        return len(self.crops)


if __name__ == '__main__':
    dataset = HARDatasetCrops('motionsense-dataset', 256, 10, 10, True)
    for item in iter(dataset):
        assert item[0].shape == (256, 12)

In [28]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import sklearn.preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.decomposition import PCA
import keras
from keras.utils import to_categorical
from keras.models import Sequential, Model
from keras.layers import Dense, Conv1D, Flatten, concatenate, Input

In [5]:
import os
files_train = []
for i in os.listdir("zigzag/train"):
    files_train.append(["zigzag/train/"+i, "zigzag"])
    
    
for i in os.listdir("speedbumppassing/train"):
    files_train.append(["speedbumppassing/train/"+i, "speedbump"])

In [6]:
import os
files_test = []
for i in os.listdir("zigzag/test"):
    files_test.append(["zigzag/test/"+i, "zigzag"])
    
    
for i in os.listdir("speedbumppassing/test"):
    files_test.append(["speedbumppassing/test/"+i, "speedbump"])

In [22]:
import pathlib
import csv

import numpy as np


def csv2numpy(file_name):
    """Read multidimensional signal from file"""
    # Read data from file.
    data = np.genfromtxt(file_name, delimiter=",", skip_header=1)
    # Return all columns but the first one (as it is the index).
    return data[:, 1:-1]



def get_crops(files, length, discard_start, discard_end, padding_mode=None):
        """Return list with crops from files."""
        crops = []
        # Iterate over data files.
        for file, class_ in files:
            # Read from file.
            signal = csv2numpy(file)
            # Crop start and end.
            signal = signal[discard_start:(signal.shape[0] - discard_end)]
            windows, remainder = divmod(signal.shape[0], length)
            if padding_mode and remainder != 0:
                # Apply padding with given padding mode.
                padding = length * (windows + 1) - signal.shape[0]
                signal = np.pad(signal, ((0, padding), (0, 0)), padding_mode)
            elif padding_mode is None:
                # Crop the end.
                signal = signal[:(length * windows)]
            # Obtain crops from <discard_start> to <discard-end>.
            for i in range(0, signal.shape[0], length):
                crop = signal[i:(i + length)]
                crops.append([crop, class_])

        return crops

In [23]:
crops_train = get_crops(files_train, 245, 50, 50)
crops_test = get_crops(files_test, 245, 50, 50)

In [29]:
label_encoder = sklearn.preprocessing.LabelEncoder()
label_encoder.fit(["zigzag", "speedbump"]);

In [31]:
X_train = []
y_train = []
for i in crops_train:
    X_train.append(i[0])
    y_train.append(i[1])

In [34]:
X_test = []
y_test = []
for i in crops_test:
    X_test.append(i[0])
    y_test.append(i[1])

In [35]:
X_train = np.array(X_train)
X_test = np.array(X_test)

In [36]:
y_train = to_categorical(label_encoder.transform(y_train))
y_test = to_categorical(label_encoder.transform(y_test))


In [41]:
X_train.shape

(15938, 245, 6)

# Simple CNN

In [82]:
clf = Sequential()
#add model layers
clf.add(Conv1D(16, kernel_size=5, activation="relu", input_shape=(245, 6)))
clf.add(Conv1D(32, kernel_size=5, activation="relu"))
clf.add(Flatten())
clf.add(Dense(2, activation="sigmoid", kernel_regularizer=keras.regularizers.l2(0.01)))

In [83]:
clf.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

clf.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=30)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x228c8c39890>

In [81]:
clf.evaluate(X_test, y_test)



[1.2629058361053467, 0.8159420490264893]

# Transformer

In [74]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

#Imports
import pandas as pd
import numpy as np
from keras import layers


#Splitting data
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split


#Building Transformer
import torch
import torch.nn as nn 
import torch.optim as optim
import math


In [75]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Attention and Normalization
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)

    return x + res

In [76]:
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(2, activation="sigmoid")(x)
    return keras.Model(inputs, outputs)

In [77]:
input_shape = X_train.shape[1:]

model = build_model(
    input_shape,
    head_size=2,
    num_heads=4,
    ff_dim=3,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.3,
    dropout=0.25,
)

model.compile(
    loss="categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    metrics=["categorical_accuracy"],
)
model.summary()

# callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]

model.fit(
    X_train,
    y_train,
    validation_data = (X_test, y_test),
    epochs=50,
    batch_size=64,
#     callbacks=callbacks,
)



Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 245, 6)]             0         []                            
                                                                                                  
 multi_head_attention_8 (Mu  (None, 245, 6)               222       ['input_5[0][0]',             
 ltiHeadAttention)                                                   'input_5[0][0]']             
                                                                                                  
 dropout_26 (Dropout)        (None, 245, 6)               0         ['multi_head_attention_8[0][0]
                                                                    ']                            
                                                                                            

 dropout_31 (Dropout)        (None, 245, 3)               0         ['conv1d_32[0][0]']           
                                                                                                  
 conv1d_33 (Conv1D)          (None, 245, 6)               24        ['dropout_31[0][0]']          
                                                                                                  
 layer_normalization_21 (La  (None, 245, 6)               12        ['conv1d_33[0][0]']           
 yerNormalization)                                                                                
                                                                                                  
 tf.__operators__.add_21 (T  (None, 245, 6)               0         ['layer_normalization_21[0][0]
 FOpLambda)                                                         ',                            
                                                                     'tf.__operators__.add_20[0][0
          

Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x228cb924d10>

In [78]:
model.evaluate(X_test, y_test)



[1.6725695133209229, 0.8159420490264893]

# CNN-LSTM

In [60]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv1D, MaxPooling1D, Flatten, LSTM, Dense
from tensorflow.keras.regularizers import l2

# Define the model with L2 regularization
model = Sequential()

# TimeDistributed(Conv1D()) layer with parameters and L2 regularization
model.add(TimeDistributed(Conv1D(filters=16, kernel_size=3, activation='relu', kernel_regularizer=l2(0.01)), input_shape=(245, 6, 1)))

# TimeDistributed(MaxPooling1D()) layer with parameters and padding
model.add(TimeDistributed(MaxPooling1D(pool_size=2, padding='same')))

# TimeDistributed(Conv1D()) layer with parameters and L2 regularization
model.add(TimeDistributed(Conv1D(filters=8, kernel_size=3, activation='relu', padding='same', kernel_regularizer=l2(0.01))))

# TimeDistributed(MaxPooling1D()) layer with parameters and padding
model.add(TimeDistributed(MaxPooling1D(pool_size=2, padding='same')))

# TimeDistributed(Conv1D()) layer with parameters and L2 regularization
model.add(TimeDistributed(Conv1D(filters=64, kernel_size=3, activation='relu', padding='same', kernel_regularizer=l2(0.01))))

# TimeDistributed(MaxPooling1D()) layer with parameters and padding
model.add(TimeDistributed(MaxPooling1D(pool_size=2, padding='same')))

# TimeDistributed(Conv1D()) layer with parameters and L2 regularization
model.add(TimeDistributed(Conv1D(filters=8, kernel_size=3, activation='relu', padding='same', kernel_regularizer=l2(0.01))))

# TimeDistributed(MaxPooling1D()) layer with parameters and padding
model.add(TimeDistributed(MaxPooling1D(pool_size=2, padding='same')))

# TimeDistributed(Flatten()) layer
model.add(TimeDistributed(Flatten()))

# Define LSTM model with parameters and L2 regularization
model.add(LSTM(units=60, return_sequences=True, kernel_regularizer=l2(0.01)))

# Additional LSTM layer with L2 regularization
model.add(LSTM(units=70, return_sequences=True, kernel_regularizer=l2(0.01)))

# Flatten layer outside TimeDistributed
model.add(Flatten())

# Dense layer with parameters and L2 regularization
model.add(Dense(units=2, activation='sigmoid', kernel_regularizer=l2(0.01)))  # Adjust units to match the number of output classes

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test))

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100

KeyboardInterrupt: 

In [61]:
model.evaluate(X_test, y_test)



[1.067910075187683, 0.8173912763595581]

# TCN

In [65]:
!pip install keras-tcn

Collecting keras-tcn
  Downloading keras_tcn-3.5.0-py3-none-any.whl (13 kB)
Collecting tensorflow-addons (from keras-tcn)
  Obtaining dependency information for tensorflow-addons from https://files.pythonhosted.org/packages/ec/52/047d768c4669db0c059109a88c21a3c71bcda957c46f13967e44b8c7fa4c/tensorflow_addons-0.22.0-cp311-cp311-win_amd64.whl.metadata
  Downloading tensorflow_addons-0.22.0-cp311-cp311-win_amd64.whl.metadata (1.8 kB)
Collecting typeguard<3.0.0,>=2.7 (from tensorflow-addons->keras-tcn)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Downloading tensorflow_addons-0.22.0-cp311-cp311-win_amd64.whl (719 kB)
   ---------------------------------------- 0.0/719.8 kB ? eta -:--:--
   - ------------------------------------- 30.7/719.8 kB 660.6 kB/s eta 0:00:02
   ----- ---------------------------------- 92.2/719.8 kB 1.1 MB/s eta 0:00:01
   ----------- ---------------------------- 204.8/719.8 kB 1.6 MB/s eta 0:00:01
   ----------------- ---------------------- 307.2/719.8 kB 

DEPRECATION: neuralplot 0.0.8 has a non-standard dependency specifier matplotlib>=3.1numpy>=1.16. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of neuralplot or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063


In [66]:
# Build TCN Model
from tcn import TCN
def build_tcn_model(input_shape, num_classes):
    model = tf.keras.Sequential()

    # Add TCN layer
    model.add(TCN(input_shape=input_shape, nb_filters=64, kernel_size=3, dilations=[1, 2, 4, 8], return_sequences=False))

    # Fully Connected Layer
    model.add(tf.keras.layers.Dense(num_classes, activation='sigmoid'))

    # Compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Build the model
input_shape = X_train.shape[1:]
model = build_tcn_model(input_shape, 2)

# Display the model summary
model.summary()

# Train the Model
# early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
#     callbacks=[early_stopping]
)




Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 tcn (TCN)                   (None, 64)                88128     
                                                                 
 dense_15 (Dense)            (None, 2)                 130       
                                                                 
Total params: 88258 (344.76 KB)
Trainable params: 88258 (344.76 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
 93/499 [====>.........................] - ETA: 35s - loss: 2.0752e-06 - accuracy: 1.0000

KeyboardInterrupt: 

In [67]:
model.evaluate(X_test, y_test)



[13.103269577026367, 0.8246376514434814]