# Dataset

In [1]:
# sysimport os
import sys
import numpy as np
import random
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from itertools import permutations
from tqdm import tqdm
import time

In [2]:
class DataSet(torch.utils.data.Dataset):

    """ Dataset for skeleton-based action recognition
    Arguments:
        data_path: the path to '.npy' data. The array must be 3-D,
            (samples, frames, features), except when the path contains
            "PKUMMD_1": then a 5-D (N, C, T, V, M) array is expected and is
            flattened to (N, T, C * V * M).
        label_path: the path to a pickle holding a (sample_name, label) pair
        frame_path: the path to a '.npy' array indexed by sample (presumably
            the number of valid frames of each sample). If empty or None,
            every sample reports the full sequence length instead.
        debug: If true, only use the first 100 samples
    """

    # Path substrings of the datasets whose labels are returned unchanged.
    # Labels loaded from any other path are shifted down by one in
    # __getitem__ (presumably because those label files are 1-based).
    _UNSHIFTED_LABEL_TAGS = ("NTU", "PKUMMD_1", "PKUMMD2")

    def __init__(self,
                 data_path: str,
                 label_path: str,
                 frame_path: str,
                 debug: bool = False):
        self.debug = debug
        self.data_path = data_path
        self.label_path = label_path
        self.frame_path = frame_path
        self.load_data()

    def load_data(self):
        """Read frame counts, labels and data from disk and record the shape."""
        # An explicit None (rather than a missing attribute) marks "no
        # per-sample frame counts available".
        self.frame = np.load(self.frame_path) if self.frame_path else None

        # load label
        # NOTE: pickle.load can execute arbitrary code; only point label_path
        # at files from a trusted source.
        with open(self.label_path, 'rb') as f:
            self.sample_name, self.label = pickle.load(f)

        # load data
        self.data = np.load(self.data_path)

        # PKUMMD1 is stored as N C T V M: move time next to the sample axis
        # and flatten the remaining axes into a single feature axis. The
        # feature width is derived from the array instead of assuming 150.
        if "PKUMMD_1" in self.data_path:
            self.N, self.C, self.T, self.V, self.M = self.data.shape
            self.data = np.transpose(self.data, (0, 2, 1, 3, 4))
            self.data = self.data.reshape(
                self.N, self.T, self.C * self.V * self.M)

        if self.debug:
            self.label = self.label[0:100]
            self.data = self.data[0:100]
            self.sample_name = self.sample_name[0:100]
            # Keep the frame counts aligned with the truncated data.
            if self.frame is not None:
                self.frame = self.frame[0:100]

        self.size, self.max_frame, self.feature_dim = self.data.shape

    def __len__(self) -> int:
        return self.size

    def __getitem__(self, index: int) -> tuple:
        """Return (data, label, frame) for the sample at ``index``."""
        data = np.array(self.data[index])
        label = self.label[index]
        # Without a frame file every sample counts as full length.
        frame = self.max_frame if self.frame is None else self.frame[index]
        if not any(tag in self.data_path for tag in self._UNSHIFTED_LABEL_TAGS):
            label = label - 1
        return data, label, frame

# Visualization Dataset

In [13]:
# Debug-mode view (first 100 samples only) of the UCLA training split.
dataset = DataSet(
    "/mnt/netdisk/linlilang/UCLA/train_data.npy",
    "/mnt/netdisk/linlilang/UCLA/train_label.pkl",
    "/mnt/netdisk/linlilang/UCLA/train_num_frame.npy",
    True,
)

In [3]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
%matplotlib inline
from IPython import display

In [4]:
class Visualization:
    """Plotting helpers for skeleton sequences and learned features."""

    # Joint pairs (0-based indices) that drawFrame joins with a line segment.
    LINKS = [(3, 2), (2, 1), (1, 0), (0, 12), (0, 16), (12, 13), (13, 14),
             (14, 15), (16, 17), (17, 18), (18, 19), (2, 4), (4, 5), (5, 6),
             (6, 7), (2, 8), (8, 9), (9, 10), (10, 11)]

    def drawFrame(self, datum):
        """Draw one skeleton frame as a labelled 3-D scatter plot with bones.

        Arguments:
            datum: 2-D array with one row per joint and the x, y, z
                coordinates in columns 0, 1, 2. It needs at least 20 rows
                because LINKS references joint index 19.
        """
        x = datum[:, 0]
        y = datum[:, 1]
        z = datum[:, 2]

        fig = plt.figure()
        # Axes3D(fig) stopped attaching itself to the figure in recent
        # matplotlib releases, which leaves the plot empty; add_subplot with
        # a 3-D projection works on old and new versions alike.
        ax = fig.add_subplot(111, projection='3d')

        ax.scatter(x, y, z)
        # Annotate every joint with its 1-based number.
        for joint, (xi, yi, zi) in enumerate(zip(x, y, z), start=1):
            ax.text(xi, yi, zi, str(joint))

        for i, j in self.LINKS:
            ax.plot([x[i], x[j]], [y[i], y[j]], [z[i], z[j]], c='r')

        ax.set_axis_off()
        ax.view_init(elev=-90, azim=90)

        # Replace the previously displayed frame instead of stacking outputs.
        display.clear_output(wait=True)
        plt.show()
        # Release the figure so a long sequence does not accumulate figures.
        plt.close(fig)

    def drawVideo(self, data, frame_num):
        """Draw the first ``frame_num`` frames of ``data`` one after another."""
        for i in range(frame_num):
            self.drawFrame(data[i])

    def drawFeature(self, feature, label, save_path="test"):
        """Scatter-plot the first two feature dimensions coloured by label.

        Arguments:
            feature: torch tensor with at least two columns; NaNs are
                replaced via np.nan_to_num before plotting.
            label: torch tensor with one label per row of ``feature``.
            save_path: file name the figure is saved to (default "test").

        Relies on ``pd`` (pandas) and ``sns`` (seaborn) being imported at
        module level before this method is called.
        """
        feature = feature.cpu().detach().numpy()
        feature = np.nan_to_num(feature)
        label = label.cpu().detach().numpy()

        # The two leading dimensions are plotted directly (no projection).
        data_pd = pd.DataFrame(
            {'x': feature[:, 0], 'y': feature[:, 1], 'label': label})
        axes = sns.scatterplot(x="x", y="y", hue="label", palette="Paired",
                               data=data_pd)
        plt.legend(loc='best')
        feature_fig = axes.get_figure()
        feature_fig.savefig(save_path, dpi=400)
        plt.close()
    

In [None]:
# Play back one training sample: every frame's flat feature vector is viewed
# as 20 joints x 3 coordinates before being handed to the drawer.
index = 0
frame_num = dataset[index][2]
data = dataset[index][0].reshape(-1, 20, 3)
vis = Visualization()
vis.drawVideo(data, frame_num)

# Model

In [5]:
import torch.nn as nn
import torch
from torchvision import models
from sys import path
import torch.nn.functional as F
import numpy as np

In [6]:
def mask_empty_frame(X, frame_num):
    """Zero out the time steps at or beyond each sequence's valid length.

    Arguments:
        X: tensor of shape (batch, time_step, channels).
        frame_num: tensor with ``batch`` elements; entry ``b`` is the number
            of leading time steps of ``X[b]`` to keep.

    Returns:
        A tensor shaped like ``X`` in which ``X[b, t, :]`` is kept when
        ``t < frame_num[b]`` and set to zero otherwise.
    """
    batch = X.size(0)
    time_step = X.size(1)

    # Build the time index on X's own device so the function also works for
    # CPU tensors and non-default GPUs instead of assuming the current CUDA
    # device.
    idx = torch.arange(time_step, device=X.device)
    lengths = frame_num.to(X.device).view(batch, 1)
    # (batch, time_step, 1): broadcasting over the channel axis replaces the
    # explicit repeat of the mask.
    mask = (idx < lengths).float().unsqueeze(-1)
    return X * mask

def mask_mean(X, frame_num):
    """Average X over time, counting only each sequence's valid frames.

    Frames at or beyond ``frame_num`` are zeroed by mask_empty_frame, the
    time axis (dim 1) is summed, and the sum is divided by the frame count
    plus a small constant.
    """
    masked = mask_empty_frame(X, frame_num)
    summed = masked.sum(dim=1)
    # The 0.01 offset keeps the division finite when frame_num is 0.
    denominator = frame_num.view(-1, 1).float() + 0.01
    return summed / denominator

# Feature width shared by ENC (GRU output and mean/log-variance heads) and
# CLS (classifier input). Presumably kept at 2 so the features can be
# scatter-plotted directly by Visualization.drawFeature -- TODO confirm.
hidden_size = 2

class ENC(nn.Module):
    """Bidirectional GRU encoder with mean / log-variance heads.

    Arguments:
        input_size: feature width of every time step of the input sequence.

    The latent width is taken from the module-level ``hidden_size``.
    """

    def __init__(self, input_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Each direction gets half the width so the concatenated
        # bidirectional output is hidden_size wide (needs an even
        # hidden_size to match the linear heads below).
        self.encoder = nn.GRU(
            input_size=self.input_size,
            hidden_size=self.hidden_size // 2,
            num_layers=2,
            batch_first=True,
            bidirectional=True
        )
        self.fc_mean = nn.Linear(self.hidden_size, self.hidden_size)
        self.fc_lvar = nn.Linear(self.hidden_size, self.hidden_size)

    def forward(self, input, frame_num):
        """Encode a batch of sequences into a sampled latent vector.

        Arguments:
            input: batch-first sequence tensor (batch, time_step, input_size).
            frame_num: tensor with the number of valid frames per sequence.

        Returns:
            (Z, mean, lvar): the sampled latent, and the mean and
            log-variance it was sampled from, each averaged over the valid
            frames of every sequence.
        """
        self.encoder.flatten_parameters()
        X, _ = self.encoder(input)

        mean = mask_mean(self.fc_mean(X), frame_num)
        lvar = mask_mean(self.fc_lvar(X), frame_num)

        # Sample the noise directly on mean's device and dtype rather than
        # on the CPU followed by a hard-coded .cuda() transfer.
        eps = torch.randn_like(mean)
        # Reparameterisation: exp(lvar / 2) is the standard deviation.
        Z = mean + eps * torch.exp(lvar / 2)

        return Z, mean, lvar

    
class CLS(nn.Module):
    """Linear classifier mapping a hidden_size-wide feature to class scores.

    Arguments:
        num_label: number of output scores produced per sample.
    """

    def __init__(self, num_label):
        super().__init__()
        self.classifier = nn.Linear(in_features=hidden_size,
                                    out_features=num_label)

    def forward(self, X):
        """Return the linear layer's scores for X."""
        return self.classifier(X)


# Optimization

In [7]:
import sys
import os
from parser import parser
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.manifold import TSNE
from scipy.stats import ttest_ind

import json

# torch
import torch
import torch.nn as nn
import torch.optim as optim

from tqdm import tqdm
import time

In [8]:
# Model sizes: per-frame feature width given to ENC and the number of
# classes given to CLS.
input_size, label_num = 60, 12
# Training schedule.
batch_size, epoch_num = 32, 300
# Value written to CUDA_VISIBLE_DEVICES by Processor.initialize.
gpus = "0"

# UCLA training split: data, labels and per-sample frame counts.
train_list = '/mnt/netdisk/linlilang/UCLA/train_data.npy'
train_label = '/mnt/netdisk/linlilang/UCLA/train_label.pkl'
train_frame = '/mnt/netdisk/linlilang/UCLA/train_num_frame.npy'

# UCLA test split: data, labels and per-sample frame counts.
test_list = '/mnt/netdisk/linlilang/UCLA/test_data.npy'
test_label = '/mnt/netdisk/linlilang/UCLA/test_label.pkl'
test_frame = '/mnt/netdisk/linlilang/UCLA/test_num_frame.npy'




class Processor:
    """Builds the data loaders and models, then trains, evaluates and plots.

    Calling an instance runs the whole pipeline: initialize(), optimize()
    and drawFeature(). Configuration is read from the module-level
    constants (input_size, label_num, batch_size, epoch_num, gpus and the
    train/test file paths).
    """

    def load_model(self):
        """Create the encoder and classifier, wrapped for multi-GPU use."""
        self.enc = ENC(input_size)
        self.enc = torch.nn.DataParallel(self.enc).cuda()
        self.cls = CLS(label_num)
        self.cls = torch.nn.DataParallel(self.cls).cuda()

    def load_weights(self, model=None, weight_path=None):
        """Load the entries of a checkpoint whose keys exist in ``model``.

        Does nothing when ``weight_path`` is empty or None. Keys that the
        model does not have are dropped, and keys missing from the
        checkpoint are tolerated (strict=False).
        """
        if not weight_path:
            return
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        pretrained_dict = torch.load(weight_path)
        model_keys = model.state_dict().keys()
        pretrained_dict = {k: v for k, v in pretrained_dict.items()
                           if k in model_keys}
        model.load_state_dict(pretrained_dict, strict=False)

    def init_weights(self, m):
        """Xavier-initialise layers whose class name contains Conv2d or Linear.

        Intended for ``module.apply``; every other module is left untouched.
        """
        classname = m.__class__.__name__
        if classname.find('Conv2d') == -1 and classname.find('Linear') == -1:
            return
        nn.init.xavier_normal_(m.weight.data)
        # Layers built with bias=False have no bias tensor to reset.
        if getattr(m, 'bias', None) is not None:
            nn.init.constant_(m.bias.data, 0.0)

    def load_data(self):
        """Create the 'train' and 'test' data loaders."""
        self.dataset = dict()
        self.data_loader = dict()

        # NOTE(review): the training loader does not shuffle; confirm that
        # a fixed sample order is intended.
        self.data_loader['train'] = torch.utils.data.DataLoader(
            dataset=DataSet(train_list, train_label, train_frame),
            batch_size=batch_size,
            shuffle=False)

        self.data_loader['test'] = torch.utils.data.DataLoader(
            dataset=DataSet(test_list, test_label, test_frame),
            batch_size=batch_size,
            shuffle=False)

    def initialize(self):
        """Prepare data, models, optimizer, LR scheduler and loss."""
        # NOTE(review): presumably this must run before the first CUDA call
        # in the process to have any effect -- confirm.
        os.environ['CUDA_VISIBLE_DEVICES'] = gpus
        self.load_data()

        self.load_model()
        self.enc.apply(self.init_weights)
        self.cls.apply(self.init_weights)

        self.optimizer = torch.optim.Adam([
            {'params': self.enc.parameters()},
            {'params': self.cls.parameters(), 'lr': 1e-3}], lr=1e-3)

        # Multiplies the learning rate by 0.1 every 100 scheduler steps;
        # train_epoch performs one step per epoch.
        self.scheduler = torch.optim.lr_scheduler.StepLR(
            self.optimizer, step_size=100, gamma=0.1)

        self.CrossEntropyLoss = torch.nn.CrossEntropyLoss().cuda()

    def optimize(self):
        """Run epoch_num rounds of training followed by evaluation."""
        progress = tqdm(range(epoch_num))
        for _ in progress:
            self.train_epoch()
            accuracy = self.test_epoch()
            # Surface the metric instead of silently discarding it.
            progress.set_postfix(test_acc='{:.4f}'.format(accuracy))

    def __call__(self):
        self.initialize()
        self.optimize()
        self.drawFeature()

    @staticmethod
    def _to_cuda(data, label, frame):
        """Cast one loader batch to float/long/long and move it to the GPU."""
        return data.float().cuda(), label.long().cuda(), frame.long().cuda()

    def drawFeature(self):
        """Encode the test set and plot the sampled features by label."""
        self.enc.eval()
        features = []
        labels = []
        # Inference only: skip building autograd graphs, and gather the
        # batches in lists so concatenation happens once.
        with torch.no_grad():
            for data, label, frame in self.data_loader['test']:
                data, label, frame = self._to_cuda(data, label, frame)
                Z, mean, lvar = self.enc(data, frame)
                features.append(Z)
                labels.append(label)

        if not features:
            # Empty test loader: nothing to plot.
            return

        vis = Visualization()
        vis.drawFeature(torch.cat(features, 0), torch.cat(labels, 0))

    def train_epoch(self):
        """Run one pass over the training loader and advance the LR schedule."""
        self.enc.train()
        self.cls.train()
        for data, label, frame in self.data_loader['train']:
            data, label, frame = self._to_cuda(data, label, frame)
            loss = self.train_batch(data, label, frame)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        # Without this call the StepLR schedule never advances and the
        # learning rate stays at its initial value for the whole run.
        self.scheduler.step()

    def train_batch(self, data, label, frame):
        """Return the loss (classification + KL term) for one batch."""
        Z, mean, lvar = self.enc(data, frame)
        predict = self.cls(Z)

        cls_loss = self.CrossEntropyLoss(predict, label)
        # Closed-form KL divergence between N(mean, exp(lvar)) and N(0, I).
        # NOTE(review): this term is summed over the batch while the
        # cross-entropy uses its default mean reduction -- confirm the
        # relative weighting is intended.
        kld_loss = -0.5 * torch.sum(1 + lvar - mean.pow(2) - lvar.exp())

        loss = cls_loss + kld_loss

        return loss

    def test_epoch(self):
        """Evaluate on the test loader and return the top-1 accuracy.

        The accuracy (a float in [0, 1], 0.0 for an empty loader) is also
        stored in ``self.test_accuracy``.
        """
        self.enc.eval()
        self.cls.eval()

        correct = 0
        total = 0
        for data, label, frame in self.data_loader['test']:
            data, label, frame = self._to_cuda(data, label, frame)
            # inference
            with torch.no_grad():
                Z, mean, lvar = self.enc(data, frame)
                predict = self.cls(Z)
            _, pred = torch.max(predict, 1)
            correct += (pred == label).sum().item()
            total += label.size(0)

        self.test_accuracy = correct / total if total else 0.0
        return self.test_accuracy

In [None]:
# Run the whole pipeline: calling the Processor initializes it, trains and
# evaluates for epoch_num epochs, then plots the test-set features.
p = Processor()
p()

 52%|█████▏    | 155/300 [04:03<03:51,  1.60s/it]