In [None]:
import os
import sys

from google.colab import drive

# Mount Google Drive inside the Colab VM (remounting if it is already mounted).
drive.mount('/content/gdrive', force_remount=True)
root_dir = "/content/gdrive/My Drive/"
base_dir = '{}PredictionCode/'.format(root_dir)

# Make the project's own packages importable from the Drive folder.
sys.path.append(base_dir)

# Print the working directory and its contents, first as-is and then again
# after switching into the project folder.
for _target in (None, base_dir):
    if _target is not None:
        os.chdir(_target)
    print(os.getcwd())
    print(os.listdir())

In [None]:
from torch import nn
import torch
import copy
from torch.utils import data
import numpy as np
import random
from torchvision import models
from util.dataset import TrainDatasetMask, TestDatasetMask, ValDatasetMask


class Network(nn.Module):
    """AlexNet frame encoder followed by a bidirectional LSTM classifier.

    ``forward`` reshapes its input to ``N_FRAMES`` frames of
    ``FRAME_CHANNELS`` x ``FRAME_SIZE`` x ``FRAME_SIZE``, runs every frame
    through AlexNet, concatenates the per-frame outputs into one long vector
    and feeds that vector to the LSTM as a sequence of length one. Two fully
    connected layers and a softmax turn the LSTM output into class scores.

    Args:
        hidden_dim: Hidden size of the LSTM (per direction).
        output_size: Number of output classes.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Dropout probability between LSTM layers.
        batch_size: Stored on the instance; ``forward`` itself always
            processes a single clip.
    """

    # Fixed geometry of one input clip.
    N_FRAMES = 70
    FRAME_CHANNELS = 3
    FRAME_SIZE = 256
    # Size of the vector AlexNet emits per frame; N_FRAMES * CNN_FEATURES is
    # the 70000-wide LSTM input.
    CNN_FEATURES = 1000

    def __init__(self, hidden_dim, output_size, n_layers, drop_prob, batch_size=1):
        super(Network, self).__init__()
        self.cnn_layers = models.alexnet(pretrained=True)
        self.batch_size = batch_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.rnn = nn.LSTM(self.N_FRAMES * self.CNN_FEATURES, hidden_dim, n_layers,
                           dropout=drop_prob, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(hidden_dim * 2, 500)  # * 2: forward + backward direction
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(500, output_size)
        # dim=1 is what the implicit-dim form resolved to for the 2-D output;
        # spelling it out avoids the deprecation warning.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, X):
        """Classify one clip.

        Args:
            X: Tensor holding exactly one clip, i.e. with
                ``N_FRAMES * FRAME_CHANNELS * FRAME_SIZE * FRAME_SIZE``
                elements.

        Returns:
            Tensor of shape ``(1, output_size)`` with softmax probabilities.
        """
        frames = X.reshape(self.N_FRAMES, self.FRAME_CHANNELS,
                           self.FRAME_SIZE, self.FRAME_SIZE)
        # All frames go through the CNN in one batched call; this produces
        # the same per-frame features as encoding them one at a time.
        features = self.cnn_layers(frames)
        features = features.reshape(features.size(0), -1)
        # Concatenate the frame features into one vector: (batch=1, seq=1, N).
        conv_output = features.reshape(1, 1, -1)
        # Use this instance's own initial state on the same device as the
        # data, rather than relying on module-level globals.
        h = self.hidden(1, device=conv_output.device)
        rnn_out, hidden = self.rnn(conv_output, h)
        out = self.fc1(rnn_out[:, -1, :])
        out = self.relu(out)
        out = self.fc2(out)
        # NOTE(review): the training code in this file passes this output to
        # nn.CrossEntropyLoss, which applies log-softmax itself. The softmax
        # is kept so the return value stays a probability vector; confirm
        # whether logits were intended.
        out = self.softmax(out)
        return out

    def hidden(self, batch_size, device=None):
        """Return a zero ``(h0, c0)`` initial state for the bidirectional LSTM.

        Args:
            batch_size: Batch dimension of the state tensors.
            device: Device to allocate on. Defaults to the device of the
                module's parameters, or the CPU if it has none.

        Returns:
            Tuple ``(h0, c0)``, each of shape
            ``(n_layers * 2, batch_size, hidden_dim)``.
        """
        if device is None:
            first_param = next(self.parameters(), None)
            device = first_param.device if first_param is not None else torch.device('cpu')
        shape = (self.n_layers * 2, batch_size, self.hidden_dim)  # 2 for bidirection
        h0 = torch.zeros(shape, device=device)
        c0 = torch.zeros(shape, device=device)
        return (h0, c0)

# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('\ndevice = {0}\n'.format(device))


# Network.forward handles exactly one clip per call, so both loaders use a
# batch size of 1.
batch_size = 1
kwargs = {'data_path': './PredData/',
          'labels': ['handstand', 'stand', 'smoke', 'somersault',
                     'clap', 'smile', 'sit', 'wave', 'laugh', 'cartwheel']
          }

train_dataset = TrainDatasetMask(**kwargs)

#Train Dataloader
train_dataloader_pars = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 0}
train_dataloader = torch.utils.data.DataLoader(train_dataset, **train_dataloader_pars)


val_dataset = ValDatasetMask(**kwargs)

#Validation Dataloader
val_dataloader_pars = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 0}
val_dataloader = torch.utils.data.DataLoader(val_dataset, **val_dataloader_pars)

#Instantiate Network
model = Network(256, len(train_dataset.labels), 2, 0)
# Resume from the saved checkpoint. map_location='cpu' lets a checkpoint that
# was written on a GPU machine load on a CPU-only runtime; the model is moved
# to the selected device below.
model.load_state_dict(torch.load('./MaskLSTM.pt', map_location='cpu'))
model.train()

model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)

loss_function = nn.CrossEntropyLoss()
total_epochs = 30


# Per-epoch metric histories; each is re-saved to disk after every epoch.
list_val_loss = []
list_train_loss = []
list_val_acc = []
list_train_acc = []

#Training Loop
# Each epoch: one pass over the training set with parameter updates, one pass
# over the validation set without gradients, then metrics and the model
# checkpoint are written to disk.
for e in range(total_epochs):

    #to calculate loss and accuracy for the epoch
    total_loss_epoch = 0
    total_val_loss_epoch = 0
    train_correct = 0
    val_correct = 0

    #train
    # The batch is unpacked directly in the loop header so the imported
    # torch.utils `data` module is not overwritten by a batch.
    for X, y, _ in train_dataloader:
        #put data to device
        X_temp = X.float().to(device)
        # Flatten the single label to shape (1,) so it can be used directly
        # as the CrossEntropyLoss target for the (1, n_classes) output.
        y_temp = y.long().to(device).reshape(-1)
        optimizer.zero_grad()

        #forward prop
        out = model(X_temp)
        _, pred = torch.max(out, 1)
        pred = int(pred)

        #calculate train accuracy
        if pred == y_temp.item():
            train_correct += 1

        #back prop
        loss = loss_function(out, y_temp)
        loss.backward()
        optimizer.step()
        total_loss_epoch += loss.item()

    #validate: no gradients are tracked and the model is in eval mode
    with torch.no_grad():
        model.eval()
        for X, y, _ in val_dataloader:
            #put data to device
            X_temp = X.float().to(device)
            y_temp = y.long().to(device).reshape(-1)

            #forward prop
            out = model(X_temp)
            _, pred = torch.max(out, 1)
            pred = int(pred)

            #calculate validation accuracy
            if pred == y_temp.item():
                val_correct += 1
            loss = loss_function(out, y_temp)
            total_val_loss_epoch += loss.item()

    # Back to training mode for the next epoch.
    model.train()


    #store and save metrics
    # With a batch size of 1, len(dataloader) is the number of samples, so
    # these are per-sample averages.
    list_val_loss.append(total_val_loss_epoch/len(val_dataloader))
    list_train_loss.append(total_loss_epoch/len(train_dataloader))
    list_val_acc.append(val_correct/len(val_dataloader))
    list_train_acc.append(train_correct/len(train_dataloader))
    np.save('./MaskLSTMval_loss', list_val_loss)
    np.save('./MaskLSTMval_acc', list_val_acc)
    np.save('./MaskLSTMtrain_loss', list_train_loss)
    np.save('./MaskLSTMtrain_acc', list_train_acc)

    # The checkpoint is overwritten after every epoch.
    torch.save(model.state_dict(), './MaskLSTM.pt')
    print('Epoch = %g/%g, Loss = %s, Acc = %s' % (e + 1, total_epochs, str(total_loss_epoch/len(train_dataloader)), str(train_correct/len(train_dataloader))))
    print('Epoch = %g/%g, Val Loss = %s, Val Acc = %s' % (e + 1, total_epochs, str(total_val_loss_epoch/len(val_dataloader)), str(val_correct/len(val_dataloader))))