In [1]:
# !pip install torchvision

In [1]:
import os
import random

import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
# from torchinfo import summary

In [2]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

set_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [3]:
class GetDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.file_list = self._get_file_list()

        self.label_mapping = {}
        label_names = os.listdir(root_dir)
        for idx, label in enumerate(label_names):
            self.label_mapping[label] = idx

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        file_path = self.file_list[index]
        data = pd.read_csv(file_path).drop(['elapsed_time','frame_number','nPoint'], axis=1)
        label = os.path.basename(os.path.dirname(file_path))
        label_id = self.label_mapping[label]
        # print(label)

        desired_length = 100000
        if len(data) < desired_length:
            data = self._pad_sequence(data, desired_length)
        else:
            data = data[:desired_length]

        return torch.tensor(data.values, dtype=torch.float32), torch.tensor(label_id, dtype=torch.long)

    def _pad_sequence(self, data, desired_length):
        data_length = len(data)
        pad_length = desired_length - data_length
        pad_data = pd.DataFrame([[0, 0, 0, 0, 0, 0, 0]] * pad_length, columns=data.columns)

        data = pd.concat([data, pad_data], axis=0)
        return data

    def _get_file_list(self):
        file_list = []
        for root, dirs, files in os.walk(self.root_dir):
            for file in files:
                if file.endswith(".csv"):
                    file_path = os.path.join(root, file)
                    file_list.append(file_path)
        return file_list

In [4]:
train_root = "/content/Train"
test_root = "/content/Test"

train_dataset = GetDataset(train_root)
test_dataset = GetDataset(test_root)

In [5]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)

        #self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(hidden_size,hidden_size)
        #dropout
        self.dropout = nn.Dropout(p=0.5)
        self.fc = nn.Linear(hidden_size, num_classes)


    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        h_n = h_n[-1]
        out = self.dropout(h_n)
        out = self.fc2(out)
        out = self.dropout(out)
        out = self.fc(out)
        return out

In [6]:
input_size = 7
hidden_size = 128
num_classes = 1
batch_size = 64
num_epochs = 50
learning_rate = 0.005

In [7]:
model = LSTMClassifier(input_size, hidden_size, num_classes)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss().to(device)
cr2 = nn.BCEWithLogitsLoss()

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [8]:
def compute_loss(y_hat, y):
    return nn.BCELoss()(y_hat, y)
def main():
    # summary(model, (batch_size, 1, input_size))
    for epoch in range(num_epochs):
        for batch_data, batch_labels in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_data)
            print(outputs.size(), batch_labels.size())
            outputs = outputs.view(-1) #reshape output

            loss = criterion(outputs, batch_labels.float())
            loss.backward()
            optimizer.step()

            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}")

def dothis():
    model.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for batch_data, batch_labels in test_loader:
            outputs = model(batch_data)
            outputs = outputs.view(-1)
            _, predicted = torch.max(outputs.data, 1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

    accuracy = correct / total
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

In [None]:
main()

torch.Size([16, 1]) torch.Size([16])


In [None]:
dothis()

In [None]:
import glob
import os
import numpy as np
# random seed.
rand_seed = 1
from numpy.random import seed
seed(rand_seed)
import tensorflow
tensorflow.random.set_seed(rand_seed)

import keras
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Dropout, Flatten, Activation
from keras.layers import Permute, Reshape
from keras import backend as K

from keras import optimizers
from keras.optimizers import SGD
from keras.optimizers import Adam
from keras.metrics import categorical_crossentropy
from keras.layers import BatchNormalization
from keras.layers import *
from keras.callbacks import Callback
from keras.callbacks import ModelCheckpoint
from keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Dropout, Flatten, Bidirectional,TimeDistributed
from sklearn.model_selection import train_test_split
from keras.models import load_model

In [None]:
sub_dirs=['Walk']

def one_hot_encoding(y_data, sub_dirs, categories=5):
    Mapping=dict()

    count=0
    for i in sub_dirs:
        Mapping[i]=count
        count=count+1
    print(Mapping)
    y_features2=[]
    print(y_data)
    for i in y_data:
        print(i)
        lab=Mapping[i]
        y_features2.append(lab)

    y_features=np.array(y_features2)
    y_features=y_features.reshape(y_features.shape[0],1)
    from keras.utils import to_categorical
    y_features = to_categorical(y_features)

    return y_features

In [None]:
def full_3D_model(summary=False):
    print('building the model ... ')
    model = Sequential()

    model.add(Bidirectional(LSTM(64, return_sequences=False, stateful=False,input_shape=(60, 10*1024) )))
    model.add(Dropout(.5,name='dropout_1'))
    model.add(Dense(128, activation='relu', name='DENSE_1'))
    model.add(Dropout(.5,name='dropout_2'))
    model.add(Dense(5, activation='softmax', name = 'output'))

    return model


In [None]:
import pandas as pd
train_root = "/content/Train/Walk/w_1.csv"
data = pd.read_csv(train_root).drop(['elapsed_time','frame_number','nPoint'], axis=1)
train_data = np.array(data,dtype=np.dtype(np.float64))

train_root = "/content/Train/Walk/w_2.csv"
data = pd.read_csv(train_root).drop(['elapsed_time','frame_number','nPoint'], axis=1)
_tr_data = np.array(data,dtype=np.dtype(np.float64))
train_data = np.concatenate((train_data, _tr_data), axis=0)

train_root = "/content/Train/Walk/w_3.csv"
data = pd.read_csv(train_root).drop(['elapsed_time','frame_number','nPoint'], axis=1)
_tr_data = np.array(data,dtype=np.dtype(np.float64))
train_data = np.concatenate((train_data, _tr_data), axis=0)
# train_data = np.array(train_data,dtype=np.dtype(np.int32))
train_label = ['Walk']


In [None]:
# train_data = np.array(train_data,dtype=np.dtype(np.int32))
train_label = ['Walk']

In [None]:
train_label = one_hot_encoding(train_label, sub_dirs, categories=1)
print(train_data.shape)
# train_data = train_data.reshape(train_data.shape[0],train_data.shape[1], train_data.shape[2]*train_data.shape[3]*train_data.shape[4])


In [None]:
print('Training Data Shape is:')
print(train_data.shape,train_label.shape)



X_train, X_val, y_train, y_val  = train_test_split(train_data, train_label, test_size=0.20, random_state=1)

model = full_3D_model()


print("Model building is completed")


adam = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None,
                       decay=0.0, amsgrad=False)

model.compile(loss=keras.losses.categorical_crossentropy,
                   optimizer=adam,
                  metrics=['accuracy'])

checkpoint = ModelCheckpoint('/content/', monitor='val_loss', verbose=1, save_best_only=True, mode='min')

callbacks_list = [checkpoint]


# Training the model
learning_hist = model.fit(X_train, y_train,
                             batch_size=20,
                             epochs=30,
                             verbose=1,
                             shuffle=True,
                           validation_data=(X_val,y_val),
                           callbacks=callbacks_list
                          )