In [2]:
import os
import torch
import math
import torch.nn as nn
from torch.utils.data import Dataset
import numpy as np
import json
import cv2
import re
import random
import torchvision.transforms as transforms
from torchvision.io import read_image

import matplotlib.pyplot as plt
import torchvision.transforms as T

# import nbimporter
# from siamese import get_siam_net
# from simclr import get_simclr_net


class PanopticDataset(Dataset):
    """Frames of the processed Panoptic dataset paired with their 2D poses.

    The dataset root is scanned for sequence directories; every ``*.json`` file
    inside a sequence's ``hdJoints`` directory becomes one sample.  The matching
    image is expected at ``<sequence>/hdImages/<frame>.jpg``.

    Args:
        transform: Callable applied to the RGB image array (the OpenCV image
            after BGR->RGB conversion); its result is stored as ``sample['image']``.
        data_path: Root directory of the processed dataset.  When omitted,
            ``DEFAULT_DATA_PATH`` is used.

    Attributes:
        data: ``{'paths': [...]}`` where each entry is the path of a joints file
            without its ``.json`` extension.
    """

    # Dataset root used when the caller does not pass ``data_path``.
    DEFAULT_DATA_PATH = "/Users/olha/unitn/computer_vision/dataset/Panoptic/ProcessedPanopticDataset/"

    # Entries of the dataset root that are never treated as motion sequences.
    NON_SEQUENCE_ENTRIES = ('scripts', 'python', 'matlab', '.git', 'glViewer.py',
                            'README.md', 'README_kinoptic.md')

    # Sequences that are kept regardless of the 'ian' name filter below.
    ALWAYS_KEPT_SEQUENCES = ('171204_pose2', '171204_pose5', '171026_cello3')

    # Divisor applied to the stored 2D poses.
    # NOTE(review): presumably the side length of the resized images, so that
    # coordinates land in [0, 1] — confirm against how the JSON files were produced.
    POSE_SCALE = 224

    def __init__(self, transform, data_path=None):
        self.data_path = self.DEFAULT_DATA_PATH if data_path is None else data_path
        self.training_dir = []
        self.transform = transform

        paths = []
        # Sorted traversal keeps sample indices stable across machines/runs
        # (os.listdir returns entries in arbitrary order).
        for sequence in sorted(os.listdir(self.data_path)):
            if not self._keep_sequence(sequence):
                continue
            joints_dir = os.path.join(self.data_path, sequence, 'hdJoints')
            if not os.path.isdir(joints_dir):
                continue
            for file_name in sorted(os.listdir(joints_dir)):
                stem, extension = os.path.splitext(file_name)
                # Skip stray files (e.g. .DS_Store) that are not annotations.
                if extension == '.json':
                    paths.append(os.path.join(joints_dir, stem))

        self.data = {'paths': paths}

    @classmethod
    def _keep_sequence(cls, name):
        """Return True when the root entry ``name`` should be scanned for samples."""
        if name in cls.NON_SEQUENCE_ENTRIES or 'haggling' in name:
            return False
        return name in cls.ALWAYS_KEPT_SEQUENCES or 'ian' not in name

    def __len__(self):
        """Number of (image, poses) samples found under the dataset root."""
        return len(self.data['paths'])

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict with ``'image'`` (the transformed RGB image) and ``'poses_2d'``
            (float tensor of the JSON ``poses_2d`` entry divided by ``POSE_SCALE``).

        Raises:
            FileNotFoundError: if the image is missing or cannot be decoded.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        sample_path = self.data['paths'][idx]

        # <sequence>/hdJoints/<frame>  ->  <sequence>/hdImages/<frame>.jpg
        joints_dir, frame_name = os.path.split(sample_path)
        sequence_dir = os.path.dirname(joints_dir)
        image_path = os.path.join(sequence_dir, 'hdImages', frame_name + '.jpg')

        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(
                'Could not read image for sample {}: {}'.format(idx, image_path)
            )
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        with open(sample_path + '.json') as joints_file:
            annotation = json.load(joints_file)

        poses_2d = torch.tensor(np.array(annotation['poses_2d']), dtype=torch.float)

        return {
            'image': image,
            'poses_2d': torch.div(poses_2d, self.POSE_SCALE),
        }

In [3]:
# uncomment this if you get an error
# !pip3 install pytorch-lightning==1.2.2

In [4]:
class Linear(nn.Module):
    """Regression head: one fully-connected layer followed by a sigmoid.

    Maps 2048 input features to 19 * 2 = 38 outputs, each squashed into (0, 1).
    (Presumably 19 joints with an (x, y) pair each — TODO confirm.)
    """

    def __init__(self):
        super().__init__()
        projection = nn.Linear(2048, 19 * 2)
        # Kept inside a Sequential named `layers` so parameter names stay
        # `layers.0.weight` / `layers.0.bias`.
        self.layers = nn.Sequential(projection)

    def forward(self, x):
        """Apply the linear layer and return its element-wise sigmoid."""
        logits = self.layers(x)
        return torch.sigmoid(logits)

In [13]:
def get_linear_evaluation_model(path, base, map_location='cpu'):
    """Load pretrained weights into ``base`` and swap its ``fc`` for a fresh head.

    Args:
        path: Checkpoint file containing a state dict compatible with ``base``
            as passed in (i.e. before its ``fc`` attribute is replaced).
        base: Network to load the weights into.  It is modified in place.
        map_location: Forwarded to ``torch.load``.  Defaults to ``'cpu'`` so a
            checkpoint saved from a CUDA device can also be loaded on a machine
            without CUDA; the caller moves the model to its target device.

    Returns:
        ``base`` itself, with the loaded weights and ``base.fc`` set to a new
        ``Linear()`` head.
    """
    state_dict = torch.load(path, map_location=map_location)
    base.load_state_dict(state_dict)

    # The head is replaced only after loading, so the checkpoint keys are
    # matched against the original architecture.
    base.fc = Linear()

    return base

In [6]:
from torch.optim import SGD


def get_optimizer(net, learning_rate, weight_decay, momentum):
    """Build an SGD optimizer that trains only the ``fc`` head of ``net``.

    Every parameter whose name does not start with ``'fc'`` gets
    ``requires_grad = False`` (the network is modified in place); the remaining
    parameters form the single parameter group of the optimizer.

    Args:
        net: Module whose head parameters are named ``fc...``.
        learning_rate: Learning rate of the head parameter group.
        weight_decay: Weight decay passed to SGD.
        momentum: Momentum passed to SGD.

    Returns:
        The configured ``torch.optim.SGD`` instance.

    Raises:
        ValueError: if ``net`` has no parameter whose name starts with ``'fc'``.
            The network is left untouched in that case.
    """
    named_parameters = list(net.named_parameters())
    head_parameters = [
        param for name, param in named_parameters if name.startswith('fc')
    ]

    # Validate before freezing anything: an optimizer over an empty group would
    # otherwise be created silently and training would update nothing.
    if not head_parameters:
        raise ValueError("net has no parameters whose name starts with 'fc'")

    for name, param in named_parameters:
        if not name.startswith('fc'):
            param.requires_grad = False

    return SGD(
        [{'params': head_parameters, 'lr': learning_rate}],
        weight_decay=weight_decay,
        momentum=momentum,
    )

In [7]:
def get_data(batch_size, seed=None):
    """Create train/validation/test loaders over ``PanopticDataset``.

    Images are converted to tensors and resized to 224x224.  The dataset is
    split randomly into ``int(0.6 * n + 1)`` training samples,
    ``int(0.2 * n + 1)`` validation samples and the remainder for testing;
    for very small datasets the sizes are clamped so they never exceed ``n``.

    Args:
        batch_size: Batch size used by all three loaders.
        seed: Optional integer seed for the random split.  With the default
            ``None`` the global torch RNG is used, so the split differs between
            runs unless the caller seeds torch beforehand.

    Returns:
        ``(train_loader, val_loader, test_loader)``; only the training loader
        shuffles.

    Raises:
        ValueError: if the dataset contains no samples.
    """
    preprocessing = T.Compose(
        [
            T.ToTensor(),
            T.Resize(size=(224, 224)),
        ]
    )

    dataset = PanopticDataset(preprocessing)

    num_samples = len(dataset)
    if num_samples == 0:
        raise ValueError('PanopticDataset is empty; check the dataset path.')

    # Clamp so the "+ 1" rounding cannot request more samples than exist
    # (which would make the test split negative for n < 3).
    training_samples = min(int(num_samples * 0.6 + 1), num_samples)
    val_samples = min(int(num_samples * 0.2 + 1), num_samples - training_samples)
    test_samples = num_samples - training_samples - val_samples

    split_options = {}
    if seed is not None:
        split_options['generator'] = torch.Generator().manual_seed(seed)

    training_data, val_data, test_data = torch.utils.data.random_split(
        dataset, [training_samples, val_samples, test_samples], **split_options
    )

    train_loader = torch.utils.data.DataLoader(training_data, batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_data, batch_size, shuffle=False)
    test_loader = torch.utils.data.DataLoader(test_data, batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

In [8]:
from tqdm import tqdm


def training_step(net, data_loader, optimizer, cost_function, device='cuda'):
    """Run one training epoch and report per-sample loss and pose error.

    Args:
        net: Model to train; put into training mode.
        data_loader: Iterable of batches, each a dict with ``'image'`` and
            ``'poses_2d'`` tensors.
        optimizer: Optimizer stepped once per batch.
        cost_function: Callable ``(output, poses) -> scalar loss``.  The
            returned loss is the sum of batch losses divided by the number of
            samples, which is a per-sample value for a sum-reduced loss.
        device: Device the batches are moved to.

    Returns:
        ``(loss, error)`` as Python floats: the accumulated loss divided by the
        number of samples, and the mean Euclidean distance between each
        prediction and its own target.  Both are ``nan`` if the loader yields
        no samples.
    """
    samples = 0
    cumulative_loss = 0.0
    cumulative_error = 0.0

    # NOTE(review): train() also switches any BatchNorm/Dropout layers of a
    # frozen backbone to training behaviour — confirm that is intended for
    # linear evaluation.
    net.train()

    for batch in tqdm(data_loader):
        images = batch['image'].to(device)
        poses = batch['poses_2d'].to(device)

        # Clear gradients first so nothing left over from earlier calls leaks in.
        optimizer.zero_grad()

        output = net(images)
        loss = cost_function(output, poses)
        loss.backward()
        optimizer.step()

        batch_size = images.shape[0]
        samples += batch_size
        cumulative_loss += loss.item()

        # Distance between each prediction and its own target.  (torch.cdist
        # would compare every prediction with every target in the batch.)
        # Detached and converted to float so no autograd graph is retained.
        difference = (output.detach() - poses).reshape(batch_size, -1)
        cumulative_error += difference.norm(p=2, dim=1).sum().item()

    if samples == 0:
        return float('nan'), float('nan')

    return cumulative_loss / samples, cumulative_error / samples


def test_step(net, data_loader, cost_function, device='cuda'):
    samples = 0.
    cumulative_loss = 0.
    cumulative_accuracy = 0.

    net.eval() 

    with torch.no_grad():

        for batch_idx, batch in enumerate(tqdm(data_loader)):
            images = batch['image']
            poses = batch['poses_2d']
        
            images = images.to(device)
            poses = poses.to(device)
            
            output = net(images)

            loss = cost_function(output, poses)
            cumulative_loss += loss.item()
            
            samples += images.shape[0]
            cumulative_accuracy += torch.cdist(output, poses, 2).mean()
  
    return loss / samples, cumulative_accuracy / samples

In [9]:
def main(path, base, batch_size=128, device='cuda', learning_rate=0.01, weight_decay=0.000001, momentum=0.9, epochs=20,
         checkpoint_path='sim_linear.pt'):
    """Train and evaluate a linear head on top of a pretrained backbone.

    Loads the checkpoint at ``path`` into ``base``, replaces its head, trains
    only that head with SGD and a sum-reduced L1 loss, prints per-epoch
    training/validation figures, and finally prints the figures for the
    training, validation and test splits.

    Args:
        path: Checkpoint with the pretrained weights for ``base``.
        base: Backbone network; modified in place.
        batch_size: Batch size of all data loaders.
        device: Target device.  A CUDA device is replaced by ``'cpu'`` (with a
            warning) when CUDA is not available.
        learning_rate: SGD learning rate of the head.
        weight_decay: SGD weight decay.
        momentum: SGD momentum.
        epochs: Number of training epochs.
        checkpoint_path: File the model state dict is written to after every
            epoch (overwritten each time).
    """
    import warnings

    if str(device).startswith('cuda') and not torch.cuda.is_available():
        warnings.warn("CUDA is not available; running on 'cpu' instead.")
        device = 'cpu'

    train_loader, val_loader, test_loader = get_data(batch_size)

    net = get_linear_evaluation_model(path, base).to(device)

    optimizer = get_optimizer(net, learning_rate, weight_decay, momentum)

    cost_function = nn.L1Loss(reduction='sum')

    for e in range(epochs):

        train_loss, train_accuracy = training_step(net, train_loader, optimizer, cost_function, device)
        val_loss, val_accuracy = test_step(net, val_loader, cost_function, device)

        print('Epoch: {:d}'.format(e+1))
        print('\tTraining loss {:.5f}, Training Acc {:.4f}'.format(train_loss, train_accuracy))
        print('\tValidation loss {:.5f}, Validation Acc {:.2f}'.format(val_loss, val_accuracy))
        print('-----------------------------------------------------')

        # Checkpoint after every epoch so an interrupted run keeps its progress.
        torch.save(net.state_dict(), checkpoint_path)

    print('After training:')
    train_loss, train_accuracy = test_step(net, train_loader, cost_function, device)
    val_loss, val_accuracy = test_step(net, val_loader, cost_function, device)
    test_loss, test_accuracy = test_step(net, test_loader, cost_function, device)

    print('\tTraining loss {:.5f}, Training MAP {:.4f}'.format(train_loss, train_accuracy))
    print('\tValidation loss {:.5f}, Validation MAP {:.4f}'.format(val_loss, val_accuracy))
    print('\tTest loss {:.5f}, Test MAP {:.4f}'.format(test_loss, test_accuracy))
    print('-----------------------------------------------------')


In [14]:
# Driver cell: run the linear evaluation on the SimCLR backbone.
# The checkpoint path is relative, so 'ver0.pt' is resolved against the
# notebook's current working directory.
# simclr_path = '/Users/olha/unitn/computer_vision/lectures/ver0.pt'
# siam_path = '/Users/olha/unitn/computer_vision/lectures/siam.pt'
simclr_path = 'ver0.pt'

# NOTE(review): get_simclr_net is neither defined nor imported in the visible
# cells (the `from simclr import get_simclr_net` line in the first cell is
# commented out), so this call presumably depends on leftover kernel state —
# confirm the notebook survives Restart Kernel -> Run All.
# siam = get_siam_net()
simclr = get_simclr_net()

# Loads the checkpoint into the backbone, then trains and evaluates the linear
# head with main()'s default hyper-parameters.
main(simclr_path, simclr)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 