# Import

In [None]:
import torch
import torch.nn as nn
import torchvision
import cv2 
import numpy as np
from pathlib import Path
import sys
from pathlib import Path
from IPython.display import clear_output
import matplotlib.pyplot as plt
import torchvision.models as models

WORK_DIR = Path(Path.cwd()).parent
sys.path.append(str(WORK_DIR))
from src import ROOT
from src.datasets.transforms import *
from src.utils import *

In [None]:
seq_id = 'Subject_1/open_juice_bottle/2/color'
fps = 12

In [None]:
split_set = Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_xyz_train.txt'
xyz = np.loadtxt(split_set)

split_set = Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_img_train.txt'
with open(split_set, 'r') as f:
    all_img_paths = f.read().splitlines()

img_paths = []
xyz_gt = []
for i in range(len(all_img_paths)):
    path = all_img_paths[i]
    if seq_id in path:
        img_paths.append(Path(ROOT)/img_root/path)
        xyz_gt.append(xyz[i])
uvd_gt = FPHA.xyz2uvd_color(np.reshape(xyz_gt, (-1, 21, 3)))
uvd_gt[..., 0] *= 416/FPHA.ORI_WIDTH
uvd_gt[..., 1] *= 416/FPHA.ORI_HEIGHT

In [None]:
from moviepy.editor import ImageSequenceClip
from tqdm import tqdm
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from IPython.display import Image as IPythonImage
# seq = [x for x in sorted(seq_path.glob('*')) if x.is_file()]

frames = []
for f, u in tqdm(zip(img_paths, uvd_gt)):
    img = get_img_dataloader(str(f), img_size)
    img = img.unsqueeze(0).cuda()
    img = ImgToNumpy()(img.cpu())[0]

    fig, ax = plt.subplots()
    ax = fig.gca()
    ax.axis('off')
    ax.imshow(img)
    draw_joints(ax, u, 'r')

    fig.canvas.draw()
    data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
    data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    frames.append(data)

    plt.close()

segment_clip = ImageSequenceClip(frames, fps=fps)
name = str(Path(ROOT)/'mlcv-exp/data/saved'/'tmp.gif')
segment_clip.write_gif(name, fps=fps)

with open(name, 'rb') as f:
    display(IPythonImage(data=f.read(), format='png'))

In [None]:
split_set = Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_xyz_train.txt'
xyz_gt = np.loadtxt(split_set)

with open(Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_ar_seq_train.txt') as f:
    img_list = f.read().splitlines()

path_length = [int(i.split(' ')[1]) for i in img_list]
action_cls  = [int(i.split(' ')[2]) for i in img_list]
action_cls = [1 if i == 0 else 0 for i in action_cls]

uvd_gt = FPHA.xyz2uvd_color(np.reshape(xyz_gt, (-1, 21, 3)))

num_segments = 5
track = 0
seq_uvd_gt = []
for num_frames in path_length:
    seq_uvd_gt.append(uvd_gt[track:track+num_frames])
    track += num_frames

print(np.stack([seq_uvd_gt[0][1], seq_uvd_gt[0][100]]).shape)

# Dataloader

In [None]:
import time

class Train_Dataset(torch.utils.data.Dataset):
    def __init__(self):
        super().__init__()
        split_set = Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_xyz_train.txt'
        xyz_gt = np.loadtxt(split_set)

        with open(Path(ROOT)/'mlcv-exp'/'data'/'labels'/'fpha_ar_seq_train.txt') as f:
            img_list = f.read().splitlines()

        path_length = [int(i.split(' ')[1]) for i in img_list]
        action_cls  = [int(i.split(' ')[2]) for i in img_list]
        self.action_cls = [1 if i == 0 else 0 for i in action_cls]

        uvd_gt = FPHA.xyz2uvd_color(np.reshape(xyz_gt, (-1, 21, 3)))
        
        self.num_segments = 5
        track = 0
        self.seq_uvd_gt = []
        for num_frames in path_length:
            self.seq_uvd_gt.append(uvd_gt[track:track+num_frames])
            track += num_frames
        
        tfrm = []
        tfrm.append(ImgToTorch())
        self.transform = torchvision.transforms.Compose(tfrm)
        
    def __getitem__(self, index):
        uvd = self.seq_uvd_gt[index]
        action_id = self.action_cls[index]
        
        num_frames = len(uvd)
        avg_duration = num_frames//self.num_segments
        frames = list(np.multiply(list(range(num_segments)), avg_duration) + offset)
        uvd_first = uvd[0].reshape(-1)
        uvd_out = np.stack([uvd[i] for i in frames])
        
        sample      = {'img': uvd_out}
        sample      = self.transform(sample)
        crop        = sample['img']
        return uvd_first, uvd_out, action_id
    
    def __len__(self):
            return len(self.action_cls)

In [None]:
kwargs = {
    'batch_size'    : 32,
    'shuffle'       : True,
    'num_workers'   : 8,
    'sampler'       : None,
    'pin_memory'    : True
}

train_dataloader = torch.utils.data.DataLoader(Train_Dataset(), **kwargs)

# Model

In [None]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()

        def block(in_feat, out_feat, normalize=True):
            layers = [nn.Linear(in_feat, out_feat)]
            if normalize:
                layers.append(nn.BatchNorm1d(out_feat, 0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            *block(63, 128, normalize=False),
            *block(128, 256),
            *block(256, 512),
            *block(512, 1024),
            nn.Linear(1024, int(np.prod(img_shape))),
            nn.Tanh()
        )

    def forward(self, z):
        img = self.model(z)
        img = img.view(img.size(0), *img_shape)
        return img
    
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        def discriminator_block(in_filters, out_filters, bn=True):
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1), nn.LeakyReLU(0.2, inplace=True), nn.Dropout2d(0.25)]
            if bn:
                block.append(nn.BatchNorm2d(out_filters, 0.8))
            return block

        self.model = nn.Sequential(
            *discriminator_block(opt.channels, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )

        # The height and width of downsampled image
        ds_size = opt.img_size // 2 ** 4
        self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img):
        out = self.model(img)
        out = out.view(out.shape[0], -1)
        validity = self.adv_layer(out)

        return validity