In [1]:
%matplotlib inline
import scipy.io
import numpy as np
import pandas as pd
from pathlib import Path
import re
from collections import Counter
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import TensorDataset
import torch.utils.data as data
from torchvision import datasets
from sklearn import metrics
import seaborn as sns
from sklearn import metrics
import seaborn as sns

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
# NOTE(review): every `.to(device)` call in the visible cells is commented out,
# so models and batches currently stay on the CPU regardless of this value.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
def import_depth_data(action, subject, trial):
    """Load the depth recording for one (action, subject, trial) triple.

    Returns the array stored under the 'd_depth' key of the matching .mat
    file, or None when that file does not exist.
    """
    mat_path = Path(f'../Data/UTDMHAD_data/Depth/a{action}_s{subject}_t{trial}_depth.mat')
    # Guard clause: a missing recording is reported as None, not as an error.
    if not mat_path.is_file():
        return None
    return scipy.io.loadmat(str(mat_path))['d_depth']

def transform_depth_data(action, subject, trial):
    """Flatten every depth frame of one recording into a labelled row.

    Returns None when the recording is missing. Otherwise returns a 2-D array
    with one row per frame, laid out as
    [action, subject, trial, frame_index, pixel_0, pixel_1, ...], where the
    pixels are the row-major flattening of that frame.
    """
    data = import_depth_data(action, subject, trial)
    if data is None:
        return None

    n_frames = data.shape[2]
    # Collapse the two image axes and put frames first, so that row f is
    # exactly data[:, :, f].flatten().
    pixels = data.reshape(data.shape[0] * data.shape[1], n_frames).T

    # Each row carries its own frame index. The previous implementation reused
    # the loop variable after the loop, stamping every row with the last index
    # (and failing outright on a recording with zero frames).
    ids = np.column_stack([
        np.full(n_frames, action),
        np.full(n_frames, subject),
        np.full(n_frames, trial),
        np.arange(n_frames),
    ])
    # Cast the identifiers to the pixel dtype so the result keeps one dtype,
    # as np.insert did before.
    return np.hstack([ids.astype(pixels.dtype), pixels])

def transform_depth_data_to_df(action, subject, trial):
    """Return one depth recording as a DataFrame, or None if it is missing.

    The first four columns are 'action', 'subject', 'trial' and 'frame'; the
    remaining ones are named 'depth_0', 'depth_1', ... in pixel order.
    """
    data = transform_depth_data(action, subject, trial)
    if data is None:
        return None
    df = pd.DataFrame(data)
    id_columns = ['action', 'subject', 'trial', 'frame']
    # Derive the pixel count from the array itself instead of assuming
    # 240 * 320 frames, so other frame sizes do not fail on a length mismatch.
    n_pixels = data.shape[1] - len(id_columns)
    df.columns = id_columns + [f'depth_{n}' for n in range(n_pixels)]
    return df

def export_depth_data_to_csv(action, subject, trial):
    """Write one depth recording to a CSV file in the working directory.

    Does nothing when the recording is missing. Always returns None.
    """
    frame_table = transform_depth_data_to_df(action, subject, trial)
    if frame_table is not None:
        # index=False keeps the positional row index out of the file.
        frame_table.to_csv(f'a{action}_s{subject}_t{trial}_depth.csv', index=False)

def show_depth_image(action, subject, trial, frame):
    """Display a single depth frame of one recording as a grayscale image.

    Returns None without plotting when the recording is missing.
    """
    volume = import_depth_data(action, subject, trial)
    if volume is None:
        return None
    # The third axis indexes frames; pick the requested one.
    image = volume[:, :, frame]
    plt.imshow(image, cmap='gray')
    plt.axis('off')
    plt.show()

In [3]:
def import_inertial_data(action, subject, trial):
    """Load the inertial recording for one (action, subject, trial) triple.

    Returns the array stored under the 'd_iner' key of the matching .mat
    file, or None when that file does not exist.
    """
    mat_path = Path(f'../Data/UTDMHAD_data/Inertial/a{action}_s{subject}_t{trial}_inertial.mat')
    # Guard clause: a missing recording is reported as None, not as an error.
    if not mat_path.is_file():
        return None
    return scipy.io.loadmat(str(mat_path))['d_iner']
    
def transform_inertial_data(action, subject, trial):
    """Prefix every sample of one inertial recording with its identifiers.

    Returns None when the recording is missing; otherwise a new array whose
    first three columns are action, subject and trial, followed by the
    original sensor columns.
    """
    samples = import_inertial_data(action, subject, trial)
    if samples is None:
        return None
    # Three one-element rows are inserted at column 0, giving every sample the
    # same three leading identifier values.
    id_columns = [[action], [subject], [trial]]
    labelled = np.insert(samples, 0, id_columns, axis=1)
    return np.array(labelled)

def transform_inertial_data_to_df(action, subject, trial):
    """Return one inertial recording as a nine-column DataFrame.

    Returns None when the recording is missing. The columns are the three
    identifiers followed by the six sensor channels.
    """
    labelled = transform_inertial_data(action, subject, trial)
    if labelled is None:
        return None
    column_names = [
        'action', 'subject', 'trial',
        'x-accel', 'y-accel', 'z-accel',
        'x-ang-accel', 'y-ang-accel', 'z-ang-accel',
    ]
    table = pd.DataFrame(labelled)
    table.columns = column_names
    return table

In [None]:
# Load every available inertial recording (actions 1-27, subjects 1-8,
# trials 1-4) and stack them into one frame.
activities = list(range(1, 28))
recordings = []
for action in activities:
    for subject in range(1, 9):
        for trial in range(1, 5):
            # `recording` rather than `data`, so the `torch.utils.data as data`
            # import alias is not overwritten at notebook scope.
            recording = transform_inertial_data_to_df(action, subject, trial)
            if recording is None:
                continue  # this combination has no file on disk
            recordings.append(recording)

if not recordings:
    raise FileNotFoundError('No inertial recordings were found under ../Data/UTDMHAD_data/Inertial/')

# Concatenate once (growing a frame inside the loop is quadratic) and rebuild
# the index so row labels are unique across recordings.
raw_dataframe = pd.concat(recordings, ignore_index=True)
raw_dataframe

In [None]:
def time_windows(window_size,overlapping,data):
    """Cut the six sensor channels of `data` into fixed-length windows.

    Parameters
    ----------
    window_size : number of consecutive rows in each window.
    overlapping : step between the starts of consecutive windows (the stride);
        windows overlap only when this is smaller than `window_size`.
    data : DataFrame with an 'action' column and the six channel columns
        'x-accel', 'y-accel', 'z-accel', 'x-ang-accel', 'y-ang-accel',
        'z-ang-accel'.

    Returns
    -------
    (windows, labels) : `windows` has shape (n_windows, 1, 6, window_size) and
        `labels` has shape (n_windows,). Both are empty arrays when no full
        window fits.

    Windows are cut by row position only, so a window may contain rows with
    different actions; it is labelled with the most frequent action in it.
    A trailing window shorter than `window_size` is dropped.
    """
    channel_names = ['x-accel', 'y-accel', 'z-accel',
                     'x-ang-accel', 'y-ang-accel', 'z-ang-accel']
    # Extract the columns once instead of on every iteration: (6, n_rows).
    channels = np.asarray([data[name].values for name in channel_names])
    actions = data['action'].values
    n_rows = len(data)

    windows = []
    labels = []
    for start in range(0, n_rows, overlapping):
        stop = start + window_size
        if stop > n_rows:
            break  # this and every later window would be incomplete
        # Keep the leading singleton axis that callers already rely on. The
        # old code also called reshape(window_size, 6) here but discarded the
        # result, so it never changed the layout.
        windows.append(channels[:, start:stop][np.newaxis])
        labels.append(Counter(actions[start:stop]).most_common()[0][0])

    return np.asarray(windows), np.asarray(labels)

In [None]:
# Windows of 128 rows taken every 128 rows, so consecutive windows do not overlap.
utd_readings,utd_labels = time_windows(128,128,raw_dataframe)

In [None]:
# Report the shapes of the windowed readings and of their labels.
print('Shape of data:')
print(utd_readings.shape)
print('Shape of label')
utd_labels.shape

In [None]:
# Display the per-window action labels.
utd_labels

In [None]:
# NOTE(review): this cell repeats the label display of the cell before it.
utd_labels

In [None]:
# Randomly split the windows into a training and a test subset.
N_TRAIN = 2000   # number of windows used for training; the rest are for testing
SPLIT_SEED = 0   # fixed seed so the split is the same on every run

# Size the permutation from the data instead of assuming exactly 2430 windows.
n_windows = len(utd_readings)
if n_windows <= N_TRAIN:
    raise ValueError(f'Need more than {N_TRAIN} windows to leave a test set, got {n_windows}')

idx_list = np.random.default_rng(SPLIT_SEED).permutation(n_windows)
train_idxes = idx_list[:N_TRAIN]
test_idxes = idx_list[N_TRAIN:]
train_features = utd_readings[train_idxes]
train_labels = utd_labels[train_idxes]
test_features = utd_readings[test_idxes]
test_labels = utd_labels[test_idxes]

In [None]:
# Display the labels selected for the training subset.
np.array(train_labels)

In [None]:
# Wrap the numpy arrays as tensor datasets. Features become float32 model
# inputs; labels are class indices, so they are stored as int64, which is what
# nn.CrossEntropyLoss expects for index targets.
train_dataset = TensorDataset(
    torch.from_numpy(train_features).to(torch.float32),
    torch.from_numpy(train_labels).to(torch.int64),
)
test_dataset = TensorDataset(
    torch.from_numpy(test_features).to(torch.float32),
    torch.from_numpy(test_labels).to(torch.int64),
)

In [None]:
# Sanity check: show the type of the dataset object built above.
type(train_dataset)

In [None]:
# Batch the datasets; only the training loader reshuffles between epochs.
# NOTE(review): the batch size is hard-coded to 64 here. The BATCH_SIZE = 32
# constant defined in a later cell is not used by these loaders.
train_loader = torch.utils.data.DataLoader(dataset = train_dataset,batch_size = 64,shuffle = True)
test_loader = torch.utils.data.DataLoader(dataset = test_dataset,batch_size = 64,shuffle = False)

In [None]:
class LSTM_model(nn.Module):
    """LSTM followed by a linear layer applied to the last time step.

    Parameters
    ----------
    input_dim : number of features per time step.
    hidden_dim : size of the LSTM hidden state.
    layer_dim : number of stacked LSTM layers.
    output_dim : number of output scores produced per sequence.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTM_model, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        # batch_first=True: inputs are (batch, sequence, feature).
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first = True)
        # Fully connected layer mapping the final hidden output to the scores.
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return scores of shape (batch, output_dim) for x of shape (batch, sequence, input_dim)."""
        batch_size = x.size(0)
        # Fresh zero initial states for every batch, created on the same device
        # and dtype as the input so the model also runs on a GPU or in double
        # precision. New tensors carry no gradient history, so no
        # requires_grad_()/detach() round trip is needed.
        h0 = torch.zeros(self.layer_dim, batch_size, self.hidden_dim,
                         device=x.device, dtype=x.dtype)
        c0 = torch.zeros_like(h0)

        out, _ = self.lstm(x, (h0, c0))
        # out[:, -1, :] is the top layer's output at the last time step.
        return self.fc(out[:, -1, :])

In [None]:
# Hyper-parameters for the LSTM experiment
BATCH_SIZE = 32 # NOTE(review): not referenced by the DataLoaders above, which hard-code 64
EPOCHES = 10 # number of passes over the training loader
input_dim = 128 # features per time step fed to the LSTM
hidden_dim = 1000 # size of the LSTM hidden state
layer_dim = 1 # number of stacked LSTM layers
output_dim = 27 + 1 # labels are the action ids 1..27; one extra output keeps index 27 valid (index 0 is unused)

# initialize the model
model_lstm = LSTM_model(input_dim, hidden_dim, layer_dim, output_dim)
# device = torch.device('cuda:0' if torch.cuda.is_available() else  'cpu')

#model = model.to(device)

# Cross-entropy loss optimised with plain SGD.
criterion = nn.CrossEntropyLoss()
learning_rate = 0.1
optimizer = torch.optim.SGD(model_lstm.parameters(), lr = learning_rate)

In [None]:
# Train the LSTM, recording the loss and training-set accuracy every 10 steps.
sequence_dim = 6  # each window is fed as 6 steps (one per channel) of input_dim values
lost_list_LSTM = []
accuracy_list = []
iteration_list = []
iteration = 0  # global step counter (named so the builtin `iter` is not shadowed)

for epoch in range(EPOCHES):
    for batch_inputs, batch_labels in train_loader:
        model_lstm.train()
        # (batch, 1, 6, 128) -> (batch, sequence_dim, input_dim)
        batch_inputs = batch_inputs.view(-1, sequence_dim, input_dim)
        # CrossEntropyLoss needs integer class indices.
        batch_labels = batch_labels.to(torch.int64)

        optimizer.zero_grad()
        outputs = model_lstm(batch_inputs)
        loss_lstm = criterion(outputs, batch_labels)
        loss_lstm.backward()
        optimizer.step()

        iteration += 1
        if iteration % 10 == 0:
            # Accuracy over the whole training loader. Separate variable names
            # keep the evaluation batches from overwriting the training batch,
            # and no_grad avoids building autograd graphs during evaluation.
            model_lstm.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for eval_inputs, eval_labels in train_loader:
                    eval_inputs = eval_inputs.view(-1, sequence_dim, input_dim)
                    predict = torch.max(model_lstm(eval_inputs), 1)[1]
                    total += eval_labels.size(0)
                    correct += (predict == eval_labels).sum().item()

            accuracy = correct / total * 100
            # Store plain Python numbers so the histories are easy to plot.
            lost_list_LSTM.append(loss_lstm.item())
            accuracy_list.append(accuracy)
            iteration_list.append(iteration)
            print("Iter:{},loss:{},Accuracy:{}".format(iteration, loss_lstm.item(), accuracy))
        

In [None]:
# Evaluate the trained LSTM on the test loader and plot its confusion matrix.
labels_list = []
predictions = []
classes = [
    '1. right arm swipe to the left',
    '2. right arm swipe to the right',
    '3. right hand wave',
    '4. two hand front clap',
    '5. right arm throw',
    '6. cross arms in the chest',
    '7. basketball shoot',
    '8. right hand draw x',
    '9. right hand draw circle (clockwise)',
    '10. right hand draw circle (counter clockwise)',
    '11. draw triangle',
    '12. bowling (right hand)',
    '13. front boxing',
    '14. baseball swing from right',
    '15. tennis right hand forehand swing',
    '16. arm curl (two arms)',
    '17. tennis serve',
    '18. two hand push',
    '19. right hand knock on door',
    '20. right hand catch an object',
    '21. right hand pick up and throw',
    '22. jogging in place',
    '23. walking in place',
    '24. sit to stand',
    '25. stand to sit',
    '26. forward lunge (left foot forward',
    '27. squat'
]

model_lstm.eval()  # the training loop may have left the model in train mode
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.view(-1, sequence_dim, input_dim)
        labels = labels.to(torch.int64)
        outputs = model_lstm(images)

        predict = torch.max(outputs, 1)[1]
        total += labels.size(0)
        correct += (predict == labels).sum().item()
        predictions.append(predict)
        labels_list.append(labels)

    print('Test Accuracy of the basic LSTM model on the UTD test features: {} %'.format((correct / total) * 100))

# confusion_matrix takes (y_true, y_pred). Passing them in that order makes
# mat.T put true labels on the x axis and predictions on the y axis, matching
# the axis titles below. Pinning `labels` to the 27 action ids keeps the matrix
# 27x27 even when a class is absent from the test set or the model predicts
# the unused index 0.
class_ids = list(range(1, len(classes) + 1))
mat = metrics.confusion_matrix(
    torch.cat(labels_list).numpy(),
    torch.cat(predictions).numpy(),
    labels=class_ids,
)

plt.figure(figsize=(27, 27))
sns.heatmap(mat.T, square=True, annot=True, fmt='d', cbar=False,
            xticklabels=classes, yticklabels=classes)
plt.xlabel('true label')
plt.ylabel('predicted label')

In [None]:
# rnn
# num_feature = 6
# feature_width = 128
class RNN_model(nn.Module):
    """ReLU RNN followed by a linear layer applied to the last time step.

    Parameters
    ----------
    input_dim : number of features per time step.
    hidden_dim : size of the RNN hidden state.
    layer_dim : number of stacked RNN layers.
    output_dim : number of output scores produced per sequence.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(RNN_model, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        # Recurrent layer; batch_first=True means inputs are (batch, sequence, feature).
        self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first = True, nonlinearity = "relu")

        # Fully connected layer mapping the final hidden output to the scores.
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return scores of shape (batch, output_dim) for x of shape (batch, sequence, input_dim)."""
        # Fresh zero initial state for every batch, created on the same device
        # and dtype as the input so the model also runs on a GPU or in double
        # precision. A new tensor carries no gradient history, so no
        # requires_grad_()/detach() round trip is needed.
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim,
                         device=x.device, dtype=x.dtype)

        out, _ = self.rnn(x, h0)
        # out[:, -1, :] is the top layer's output at the last time step.
        return self.fc(out[:, -1, :])
        

In [None]:
input_dim = 128 # features per time step fed to the RNN
hidden_dim = 100 # size of the RNN hidden state
layer_dim = 2 # number of stacked RNN layers
output_dim = 27 + 1 # labels are the action ids 1..27; one extra output keeps index 27 valid (index 0 is unused)

# initialize the model
model_rnn = RNN_model(input_dim, hidden_dim, layer_dim, output_dim)
# device = torch.device('cuda:0' if torch.cuda.is_available() else  'cpu')

#model = model.to(device)

In [None]:
# Loss and optimizer for the RNN. This rebinds `criterion`, `learning_rate`
# and `optimizer`, which the LSTM cells also assign.
criterion = nn.CrossEntropyLoss()
learning_rate = 0.01
optimizer = torch.optim.SGD(model_rnn.parameters(), lr = learning_rate)

In [None]:
# Number of parameter tensors registered on the RNN model.
# NOTE(review): `length` is not read by any other visible cell.
length = (len(list(model_rnn.parameters())))

In [None]:
# Train the RNN, recording the loss and training-set accuracy every 5 steps.
sequence_dim = 6  # each window is fed as 6 steps (one per channel) of input_dim values
loss_list = []
accuracy_list = []
iteration_list = []
iteration = 0  # global step counter (named so the builtin `iter` is not shadowed)

for epoch in range(EPOCHES):
    for batch_inputs, batch_labels in train_loader:
        model_rnn.train()
        # (batch, 1, 6, 128) -> (batch, sequence_dim, input_dim)
        batch_inputs = batch_inputs.view(-1, sequence_dim, input_dim)
        # CrossEntropyLoss needs integer class indices.
        batch_labels = batch_labels.to(torch.int64)

        optimizer.zero_grad()
        output = model_rnn(batch_inputs)
        loss_rnn = criterion(output, batch_labels)
        loss_rnn.backward()
        optimizer.step()

        iteration += 1
        if iteration % 5 == 0:
            # Accuracy over the whole training loader. Separate variable names
            # keep the evaluation batches from overwriting the training batch,
            # and no_grad avoids building autograd graphs during evaluation.
            model_rnn.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for eval_inputs, eval_labels in train_loader:
                    eval_inputs = eval_inputs.view(-1, sequence_dim, input_dim)
                    predict = torch.max(model_rnn(eval_inputs), 1)[1]
                    total += eval_labels.size(0)
                    correct += (predict == eval_labels).sum().item()

            accuracy = correct / total * 100
            # Store plain Python numbers so the histories are easy to plot.
            loss_list.append(loss_rnn.item())
            accuracy_list.append(accuracy)
            iteration_list.append(iteration)
            print("Iter:{},loss:{},Accuracy:{}".format(iteration, loss_rnn.item(), accuracy))
            

In [None]:
# Evaluate the trained RNN on the test loader and plot its confusion matrix.
labels_list = []
predictions = []
classes = [
    '1. right arm swipe to the left',
    '2. right arm swipe to the right',
    '3. right hand wave',
    '4. two hand front clap',
    '5. right arm throw',
    '6. cross arms in the chest',
    '7. basketball shoot',
    '8. right hand draw x',
    '9. right hand draw circle (clockwise)',
    '10. right hand draw circle (counter clockwise)',
    '11. draw triangle',
    '12. bowling (right hand)',
    '13. front boxing',
    '14. baseball swing from right',
    '15. tennis right hand forehand swing',
    '16. arm curl (two arms)',
    '17. tennis serve',
    '18. two hand push',
    '19. right hand knock on door',
    '20. right hand catch an object',
    '21. right hand pick up and throw',
    '22. jogging in place',
    '23. walking in place',
    '24. sit to stand',
    '25. stand to sit',
    '26. forward lunge (left foot forward',
    '27. squat'
]

model_rnn.eval()  # the training loop may have left the model in train mode
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.view(-1, sequence_dim, input_dim)
        labels = labels.to(torch.int64)
        outputs = model_rnn(images)

        predict = torch.max(outputs, 1)[1]
        total += labels.size(0)
        correct += (predict == labels).sum().item()
        predictions.append(predict)
        labels_list.append(labels)

    print('Test Accuracy of the basic RNN model on the UTD test features: {} %'.format((correct / total) * 100))

# confusion_matrix takes (y_true, y_pred). Passing them in that order makes
# mat.T put true labels on the x axis and predictions on the y axis, matching
# the axis titles below. Pinning `labels` to the 27 action ids keeps the matrix
# 27x27 even when a class is absent from the test set or the model predicts
# the unused index 0.
class_ids = list(range(1, len(classes) + 1))
mat = metrics.confusion_matrix(
    torch.cat(labels_list).numpy(),
    torch.cat(predictions).numpy(),
    labels=class_ids,
)

plt.figure(figsize=(27, 27))
sns.heatmap(mat.T, square=True, annot=True, fmt='d', cbar=False,
            xticklabels=classes, yticklabels=classes)
plt.xlabel('true label')
plt.ylabel('predicted label')