In [2]:
from typing import Dict

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import os

from torch.utils.data import Dataset

In [None]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layer, num_classes):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layer = num_layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layer, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layer, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layer, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [None]:
class CustomDataset(Dataset):
    def __init__(self, directory: os.path, label_map: Dict[str, int]):
        self.directory = directory
        self.label_map = label_map

    def __categorize(self, value) -> torch.Tensor:
        categorized_label = [0] * len(self.label_map)
        categorized_label[self.label_map[value]] = 1
        return torch.Tensor(categorized_label)

    def __len__(self):
        return len(next(os.walk(self.directory))[1])

    def __getitem__(self, idx) -> (torch.Tensor, torch.Tensor):
        keypoints_path = os.path.join(self.directory, str(idx))
        label = os.listdir(keypoints_path)[0]
        frames = os.listdir(os.path.join(keypoints_path, label))
        frames.sort()
        window = np.empty((len(frames), 21 * 3 * 2))
        for i in range(len(frames)):
            window[i] = np.load(os.path.join(keypoints_path, label, frames[i]))
        return dict(
            label=self.__categorize(label),
            sequence=torch.from_numpy(window).type(torch.FloatTensor)
        )

In [None]:
model = LSTMClassifier(input_size=126,
                       hidden_size=128,
                       num_layer=1,
                       num_classes=3)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [None]:
train_loader = torch.utils.data.DataLoader()
device = torch.device("cuda:0")
train_loss = 0
train_total = 0
train_correct = 0

In [None]:
for epoch in range(100):
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        train_total += labels.size(0)
        train_correct += (predicted==labels).sum().item()

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)

            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted==labels).sum().item()

    print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.4f}, Val Loss: {:.4f}, Val Acc: {:.4f}'
          .format(epoch+1, num_epochs, train_loss/len(train_loader), train_correct/train_total,
                  val_loss/len(val_loader), val_correct/val_total))

In [None]:
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)

        _, predicted = torch.max(outputs.data, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()