# **Pose Classification MLP - Implement, Train, and Test**

In [None]:
# Colab only: attach Google Drive so the dataset and checkpoints that live
# under /content/gdrive are reachable. Skip this cell outside Colab.
from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

In [None]:
from copy import deepcopy
import os
import sys
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.transforms import ToTensor
from tqdm.auto import tqdm
import torchsummary

In [None]:
class pose_Dataset(Dataset):
  """Keypoint dataset loaded from one sub-directory per pose class.

  The root directory is walked with ``os.walk``. The k-th directory visited
  after the root fills ``data_lists[k-1]``, so the class index of a sample is
  the visiting order of its directory (which is whatever order the file
  system reports; it is deliberately not sorted here so that label indices of
  already-trained checkpoints stay valid).

  Args:
    file_dir: root directory that contains the per-class sub-directories.
    num_cls: number of class buckets to allocate.

  Attributes:
    data_lists: ``num_cls`` lists of 1-D float tensors, one list per class.
    cls_list: integer prefix of every directory name met during the walk
      (e.g. ``068.left`` -> ``68``).
    cat_pos_list: cumulative sample count per class, used to map a flat
      index back to (class, position).
    total_len: total number of samples.
  """

  # Value that the original per-class lookup chain paired with each class
  # index (it was stored in a local and never returned).
  # NOTE(review): presumably keyboard key codes for a downstream consumer -
  # confirm before relying on them.
  key_codes = (0, 32, 68, 101, 67, 100, 66, 65)

  def __init__(self, file_dir='/content/gdrive/MyDrive/pose_class', num_cls=8):
    super().__init__()
    self.num_cls = num_cls
    self.file_dir = file_dir
    self.data_lists = [[] for _ in range(self.num_cls)]
    self.cls_list = []

    for walk_idx, (path, dirs, files) in enumerate(tqdm(os.walk(self.file_dir))):
      # get GT(=class) of a directory  ex) 068.left -> 68
      for dir_name in dirs:
        self.cls_list.append(int(dir_name.split('.')[0]))

      # Files sitting directly in the root belong to no class directory.
      # They used to be filed under the LAST class through the negative
      # index data_lists[-1]; skip them instead of mislabelling them.
      if walk_idx == 0:
        continue

      for file_name in files:
        data_ts = self._load_keypoints(os.path.join(path, file_name))
        if data_ts is None:
          # Not a csv/txt sample (e.g. a stray metadata file): ignore it
          # rather than re-appending the previous sample.
          continue
        self.data_lists[walk_idx - 1].append(data_ts)

    print(f"{self.cls_list}")

    # for indexing, store the running (cumulative) number of samples
    self.total_len = 0
    self.cat_pos_list = []
    for data_files in self.data_lists:
      self.total_len += len(data_files)
      self.cat_pos_list.append(self.total_len)

  @staticmethod
  def _load_keypoints(file_path):
    """Read one sample file; return a float32 tensor or None if unsupported."""
    extension = os.path.basename(file_path).split('.')[-1]

    if extension == 'csv':
      # Every cell is a string whose last space-separated token is the value.
      cells = pd.read_csv(file_path).to_numpy().squeeze()
      values = [float(cell.split(" ")[-1]) for cell in cells]
      # drop the first 7 values, keep the rest as the sample
      return torch.tensor(values[7:], dtype=torch.float32)

    if extension == 'txt':
      with open(file_path, 'r') as f:
        data_str = f.read()
      # "(x1, y1, v1), (x2, ...)" -> flat list of number tokens
      tokens = data_str.replace('(', '').replace(')', '').split(',')
      data = [float(token) for token in tokens if token.strip()]
      # The list is flat, so it must be sliced directly; indexing [0] first
      # produced a 0-dim tensor that cannot be sliced and always raised.
      # NOTE(review): if a file holds more than one object, the tail also
      # contains the later objects' values - confirm the txt format.
      return torch.tensor(data, dtype=torch.float32)[7:]

    return None

  def __len__(self):
    return self.total_len

  def __getitem__(self, idx):
    """Return ``(x, y)``: the sample tensor and its class index.

    Negative indices count from the end; an index outside the dataset
    raises ``IndexError``.
    """
    idx = int(idx)
    if idx < 0:
      idx += self.total_len
    if not 0 <= idx < self.total_len:
      raise IndexError(f"index {idx} is out of range for {self.total_len} samples")

    # Walk the cumulative boundaries; the first one above idx is the class.
    class_start = 0
    for y, class_end in enumerate(self.cat_pos_list):
      if idx < class_end:
        return self.data_lists[y][idx - class_start], y
      class_start = class_end

    # Only reachable if cat_pos_list and total_len disagree.
    raise IndexError(f"index {idx} is out of range for {self.total_len} samples")

In [None]:
# Pick the GPU when one is visible, otherwise run on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
pose_ds = pose_Dataset()
print(len(pose_ds))

# Hold out a fixed number of samples for testing and train on the rest.
# Deriving the train size from the dataset keeps the split valid when files
# are added or removed (the former hard-coded 8259 + 2000 required exactly
# 10259 samples).
n_test = 2000
n_train = len(pose_ds) - n_test
if n_train <= 0:
  raise ValueError(f"dataset has {len(pose_ds)} samples; more than {n_test} are needed to split")
trainset, testset = random_split(pose_ds, [n_train, n_test], generator=torch.Generator().manual_seed(42))

# batch_size stays 1 because the model flattens its whole input with view(-1).
# Reshuffle the training samples every epoch; evaluation order does not
# affect accuracy, so the test loader is left unshuffled.
train_loader = DataLoader(trainset, batch_size=1, shuffle=True, num_workers=1)
test_loader = DataLoader(testset, batch_size=1, shuffle=False, num_workers=1)

In [None]:
class linear(nn.Module):
  """Fully connected layer followed by a ReLU activation.

  Args:
    in_features: size of each input sample.
    out_features: size of each output sample.
  """

  def __init__(self, in_features, out_features):
    super().__init__()
    # Attribute names are part of the state_dict keys; keep them stable.
    self.linear = nn.Linear(in_features=in_features, out_features=out_features)
    self.relu = nn.ReLU()

  def forward(self, x):
    """Apply the affine map, then clamp negatives to zero."""
    return self.relu(self.linear(x))

In [None]:
class pose_class_MLP(nn.Module):
  """Three-layer MLP: 51 inputs -> 1000 -> 256 -> 8 output scores.

  The two hidden stages are `linear` blocks (Linear + ReLU); the output
  stage is a plain ``nn.Linear`` with no activation.
  """

  def __init__(self):
    super().__init__()
    self.net1 = linear(in_features=51, out_features=1000)
    self.net2 = linear(in_features=1000, out_features=256)
    self.net3 = nn.Linear(in_features=256, out_features=8)

  def forward(self, x):
    """Flatten the whole input to 1-D and run it through the three stages.

    Because every dimension is flattened, the input must hold exactly 51
    values in total (i.e. a single sample).
    """
    x = x.view(-1)
    for stage in (self.net1, self.net2, self.net3):
      x = stage(x)
    return x

In [None]:
# Build the classifier and place it on the selected device
# (Module.to moves the parameters in place and returns the same module).
p_c_MLP = pose_class_MLP()
p_c_MLP.to(device)

# Report the number of scalar parameters in the network.
pytorch_total_params = sum(map(torch.numel, p_c_MLP.parameters()))
print(f"total params: {pytorch_total_params}")

# Mean-squared error against one-hot targets, optimised with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(params=p_c_MLP.parameters(), weight_decay=1e-3, lr=1e-4)

total params: 310312


In [None]:
# load the pretrained model
# NOTE: PATH is assigned in the save cell further down this file, so that
# cell (or an equivalent assignment) must have run before this one.
model = pose_class_MLP()
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written from a CUDA runtime still loads on a CPU-only one.
model.load_state_dict(torch.load(PATH, map_location=device))
model.to(device)
# The checkpoint is saved in half precision; run inference in float32.
model.float().eval()

pose_class_MLP(
  (net1): linear(
    (linear): Linear(in_features=51, out_features=1000, bias=True)
    (relu): ReLU()
  )
  (net2): linear(
    (linear): Linear(in_features=1000, out_features=256, bias=True)
    (relu): ReLU()
  )
  (net3): Linear(in_features=256, out_features=8, bias=True)
)

In [None]:
def _min_max_normalize(x):
  """Rescale x to [0, 1] using its own min and max.

  A constant input has zero range; it is returned as all zeros instead of
  being divided by zero (which would produce NaNs).
  """
  x = x - x.min()
  peak = x.max()
  return x / peak if peak > 0 else x


acc_tot = []
for epoch in range(1, 100+1):
  p_c_MLP.train()
  loss_sum = 0.0
  total_batch = 0
  for x, y in tqdm(train_loader):
    # initialize the gradients of the optimizer
    optimizer.zero_grad()

    # normalize the input and one-hot encode the label, then move both to
    # the GPU device (else CPU)
    x = _min_max_normalize(x).to(device)
    y = nn.functional.one_hot(y, num_classes=8).squeeze().to(device).float()

    # forward pass, loss, backward pass, parameter update
    pred = p_c_MLP(x)
    loss = criterion(pred, y)
    loss.backward()
    optimizer.step()

    # Accumulate a plain float: summing the loss tensors themselves keeps
    # every step's autograd history referenced until the epoch ends.
    total_batch += 1
    loss_sum += loss.item()
  loss_avg = loss_sum / max(total_batch, 1)
  print(f"{epoch}: {loss_avg}")

  # evaluate on the held-out set every 10 epochs
  if epoch % 10 == 0:
    p_c_MLP.eval()
    total = 0
    correct = 0
    # no gradients are needed for evaluation
    with torch.no_grad():
      for x, y in tqdm(test_loader):
        x = _min_max_normalize(x).to(device)

        # forward pass the input to the MLP network
        pred = p_c_MLP(x)
        total += 1

        # the predicted class is the index of the largest output score;
        # y holds the single class index of this batch-of-one
        correct += int(torch.argmax(pred).item() == y.item())
    acc = correct / total * 100 if total else 0.0
    print(f"accuracy: {acc}%")
    acc_tot.append(acc)

In [None]:
# Draw the accuracy recorded at every 10th epoch on the current axes,
# then display the figure.
plt.gca().plot(acc_tot)
plt.show()

In [None]:
def is_parallel(model):
  """Return True if `model` is wrapped in DataParallel or DistributedDataParallel.

  Uses isinstance so that subclasses of the two wrappers are recognised too;
  an exact type comparison would report them as not parallel.
  """
  return isinstance(model, (nn.parallel.DataParallel, nn.parallel.DistributedDataParallel))

In [None]:
# save the MLP after training
save_path = '/content/gdrive/MyDrive/pretrained_MLP/'  # enter the save path
file_name = 'pose_class_MLP.pt'
PATH = os.path.join(save_path, file_name)
# torch.save does not create missing directories; make sure the target exists.
os.makedirs(save_path, exist_ok=True)
# Save a half-precision copy of the bare module (unwrapped from any
# DataParallel wrapper); the live training model is left untouched.
ckpt = deepcopy(p_c_MLP.module if is_parallel(p_c_MLP) else p_c_MLP).half()
torch.save(ckpt.state_dict(), PATH)