In [None]:
connections = [(7,8),(6,7),(2,6),(2,12),(12,13),(13,14),(2,0),(0,3),(0,9),(3,4),(4,5),(9,10),(10,11),(0,16)]

In [None]:
from torchvision.models import resnet50, ResNet50_Weights
import torch.nn as nn

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, out_dim):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim),
        )

    def forward(self, x):
        return x, self.layers(x)

def get_simclr_net():
    weights = ResNet50_Weights.DEFAULT
    model = resnet50(weights=weights)
    model.fc = MLP(2048, 2048, 128)

    return model

class Projector(nn.Module):
    def __init__(self, input_dim, out_dim, hidder_proj):
        super().__init__()
        self.proj = nn.Sequential(
            nn.Linear(input_dim, hidder_proj),
            nn.BatchNorm1d(hidder_proj),
            nn.ReLU(inplace=True),

            nn.Linear(hidder_proj, hidder_proj),
            nn.BatchNorm1d(hidder_proj),
            nn.ReLU(inplace=True),

            nn.Linear(hidder_proj, out_dim),
        )

    def forward(self, x):
        return x, self.proj(x)

class SiamMLP(nn.Module):
    def __init__(self, base, input_dim, out_dim, hidder_proj, hidden_pred):
        super().__init__()

        self.base = base

        base.fc = Projector(input_dim, out_dim, hidder_proj)

        self.predictor = nn.Sequential(
            nn.Linear(out_dim, hidden_pred),
            nn.BatchNorm1d(hidden_pred),
            nn.ReLU(inplace=True),

            nn.Linear(hidden_pred, out_dim)
        )


    def forward(self, x):
        x, projections =  self.base(x)

        predictions = self.predictor(projections)

        return x, projections.detach(), predictions

def get_siam_net():
    weights = ResNet50_Weights.DEFAULT
    model = resnet50(weights=weights)
    model = SiamMLP(model, 2048, 2048, 2048, 512)

    return model

In [None]:
import os
import torch
import math
import torch.nn as nn
from torch.utils.data import Dataset
import numpy as np
import json
import cv2
import re
import random
import torchvision.transforms as transforms
from torchvision.io import read_image

import matplotlib.pyplot as plt
import torchvision.transforms as T



class PanopticDataset(Dataset):
    def __init__(self, transform):

        # change this to the path where the dataset is stored
        self.data_path = "ProcessedPanopticDataset\\"
        self.training_dir = []

        self.transform = transform

        paths = []

        motion_seq = os.listdir(self.data_path)
        no_dir = ['scripts','python','matlab','.git','glViewer.py','README.md','matlab',
                'README_kinoptic.md']

        for dir in motion_seq:
            if dir not in no_dir:
                if 'haggling' in dir:
                    continue
                elif dir == '171204_pose2' or dir =='171204_pose5' or dir =='171026_cello3':
                    if os.path.exists(os.path.join(self.data_path,dir,'hdJoints')):
                        data_path = os.path.join(self.data_path,dir,'hdJoints')
                        for lists in (os.listdir(data_path)):
                            paths.append(os.path.join(data_path,lists.split('.json')[0]))
                elif 'ian' in dir:
                    continue
                else:
                    if os.path.exists(os.path.join(self.data_path,dir,'hdJoints')):
                        data_path = os.path.join(self.data_path,dir,'hdJoints')
                        for lists in (os.listdir(data_path)):
                            
                            paths.append(os.path.join(data_path,lists.split('.json')[0]))

        self.data = {'paths': paths}

    def __len__(self):
        return len(self.data['paths'])

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        sample = dict()

        path_split = self.data['paths'][idx].split('\\hdJoints')
        image_path = path_split[0] + '\\hdImages' + path_split[-1] + '.jpg'
        sample['image_path'] = image_path

        image = cv2.imread(image_path)
        image =cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        sample['image'] = image

        joints_path = self.data['paths'][idx]+'.json'

        with open(joints_path) as dfile:
            bframe = json.load(dfile)

        poses_3d = torch.tensor(np.array(bframe['poses_3d']), dtype=torch.float32)
        #remove every 4th element of one dimensional array poses_3d
        poses_3d = poses_3d.reshape(-1, 4)[:, :3].reshape(-1)

        sample['poses_3d'] =  poses_3d

        cam = bframe['cam']
        # Replace 'array' with 'list' in the cam string
        cam = cam.replace('array', 'list')

        sample['cam'] = cam

        return sample

In [None]:
# uncomment this if you get an No module named 'pytorch_lightning.utilities.apply_func' error
# !pip3 install pytorch-lightning==1.2.2

### On top of a base encoder, a simple linear layer is attached that predicts 19 pairs of joints. The base encoder can be either SimCLR or Siamese models. I remove the projection head (or projection and prediction in case of Siamese) and use only the output of the base encoder, which is the output of AvrPool layer.

In [None]:
class Linear(nn.Module):
    def __init__(self):
        super(Linear, self).__init__()
        #initialize weights and biases to 0
        self.layers = nn.Sequential(nn.Linear(2048, 512), nn.ReLU(), nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 18*3))

    def forward(self, x):
        z = self.layers(x)
        return z

In [None]:
def get_linear_evaluation_model(path, base, siam=True):

    base.load_state_dict(torch.load(path, map_location=torch.device('cuda')))

    if siam:
        base = base.base
        base.fc = Linear()
    else:
        base.fc = Linear()

    return base

### I freeze all the layers of the base encoder and use Stochastic gradient descent to train only the linear layer.

### The images are resized into (128, 128) since the models were trained using this size. The dataset is split into 60%/20%/20% for training, validation and testing.

In [None]:
generator = torch.Generator().manual_seed(42)

def get_data(batch_size):

    transforms = T.Compose(
        [
            T.ToTensor(),
            T.Resize(size=(128, 128)),
        ]
    )

    data = PanopticDataset(transforms)

    num_samples = len(data)

    training_samples = int(num_samples * 0.6 + 1)
    val_samples = int(num_samples * 0.2 + 1)
    test_samples = num_samples - training_samples - val_samples

    training_data, val_data, test_data = torch.utils.data.random_split(
        data, [training_samples, val_samples, test_samples], generator=generator
    )

    train_loader = torch.utils.data.DataLoader(training_data, batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_data, batch_size, shuffle=False)
    test_loader = torch.utils.data.DataLoader(test_data, batch_size, shuffle=False)

    return training_data, val_data, test_data, train_loader, val_loader, test_loader

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Assuming your get_data function returns appropriate data
training_data, _, _, _, _, _ = get_data(201)

image = training_data[9]['image']
coords = training_data[9]['poses_3d'].reshape((-1, 3))
cam = training_data[9]['cam']


def project_points( points_3d, camera_parameters_str):
    # Load camera parameters
    camera_params = eval(camera_parameters_str)

    # Camera intrinsic matrix
    K = camera_params['K']

    # Rotation matrix and translation vector
    R = camera_params['R']
    t = camera_params['t']

    # Projection matrix
    P = np.dot(K, np.hstack((R, t)))

    # Homogeneous 3D points
    points_3d_homogeneous = np.hstack((points_3d, np.ones((points_3d.shape[0], 1))))

    # Project 3D points to 2D
    points_2d_homogeneous = np.dot(P, points_3d_homogeneous.T).T

    # Normalize homogeneous coordinates
    points_2d = points_2d_homogeneous[:, :2] / points_2d_homogeneous[:, 2:]

    # rseize to 128x128
    points_2d[:, 0] = points_2d[:, 0] * 128 / 1080
    points_2d[:, 1] = points_2d[:, 1] * 128 / 1080

    # center points, 2 is the center of the image
    points_2d[:, 0] = points_2d[:, 0] + 64 - points_2d[2, 0]
    points_2d[:, 1] = points_2d[:, 1] + 64 - points_2d[2, 1]

    return points_2d

points = project_points(coords, cam)


#plot image with points
plt.imshow(image.permute(1, 2, 0))
plt.scatter(points[:, 0], points[:, 1], c='r')

#show number of points
for i in range(18):
    plt.text(points[i, 0], points[i, 1], str(i), color='r')

#connect points 7-8 6-7 2-6 2-12 12-13 13-14 2-0 0-3 0-9 3-4 4-5 9-10 10-11 0-16
for connection in connections:
    plt.plot(points[connection, 0], points[connection, 1], c='r')


plt.show()


### The loss function that is used for training is L1 loss. As an accuracy metrics I report average euclidean distance between true and predicted joints. The model is also trained for 20 epochs.

In [None]:
def find_rotation_mat(points1, points2):
    """
    Function to find the rotation matrix between two sets of ordered 3D points.

    Parameters:
    - points1: input containing a set of ordered 3D points
    - points2: target containing the reference set of ordered 3D points

    Returns:
    - transformation_matrix: torch.Tensor, 3x3 transformation matrix
    """

    # Calculate the covariance matrix 
    covariance_matrix = torch.mm(points1.t(), points2)

    # Calculate the singular value decomposition
    U, S, V = torch.svd(covariance_matrix)

    # Calculate the rotation matrix
    rotation_matrix = torch.mm(V, U.t())

    # special reflection case
    if torch.det(rotation_matrix) < 0:
        U, S, V = torch.svd(covariance_matrix)
        V[2, :] *= -1
        rotation_matrix = torch.mm(V, U.t())

    return rotation_matrix


In [None]:
def find_scaling(points1, points2):
    """
    Function to find the scaling factor between two sets of ordered 3D points.

    Parameters:
    - points1: input containing a set of ordered 3D points
    - points2: target containing the reference set of ordered 3D points

    Returns:
    - scaling_factor: torch.Tensor, 1x1 scaling factor
    """
    points1 = points1.view(-1)
    points2 = points2.view(-1)

    # Calculate the scaling factor
    scaling_factor = torch.sum(points2 * points1) / torch.sum(points1 * points1)

    return scaling_factor

In [None]:
# get loss for the whole batch
def get_loss(output, pose, weights=None, norm_factor=0.2):
    batch_size = output.shape[0]

    # vectors are a column vector and should be grouped by 3 (x, y, z)
    output = output.view(batch_size, -1, 3)
    pose = pose.view(pose.shape[0], -1, 3)

    # add first point to output (0,0,0) for each batch
    output = torch.cat((torch.zeros((batch_size, 1, 3)), output), 1)

    # center pose on first point for each batch
    pose = pose - pose[:, 0].unsqueeze(1)

    #find rotation matrix for each batch
    batch_rotation_matrix = torch.zeros((batch_size, 3, 3))
    # scaling_factor = torch.zeros((batch_size, 1)).to("cuda")

    with torch.no_grad():
        #center pose on first point for each batch
        pose = pose - pose[:, 0].unsqueeze(1)

        #print ("output before\n", output)

        for i in range(batch_size):
            #print(output[i])
            rotation_matrix = find_rotation_mat(pose[i], output[i])
            
            batch_rotation_matrix[i] = rotation_matrix

    output = torch.bmm(output, batch_rotation_matrix)

    '''
    #find scaling factor for each batch

    with torch.no_grad():
        for i in range(batch_size):
            scaling_factor[i] = find_scaling(pose[i], output[i])
        
    for i in range(batch_size):
        output[i] = output[i] * scaling_factor[i].item()
    '''

    #print ("output\n", output)
    #print ("pose\n", pose)
    #mean squared error
    loss = torch.mean((output - pose)**2)
    #print(loss)

    # add L2 normalization factor for weights
    if weights is not None:
        weights = weights.view(-1)
        loss += norm_factor * torch.sum(weights**2)

    return loss

In [None]:
_, _, test_data, _, _, _ = get_data(200)

In [None]:
def print_one_example(model, ind):
    """Function to plot true and predicted joints"""
    image = test_data[ind]['image']
    true_coords = test_data[ind]['poses_3d'].reshape((-1,3))

    image_path = test_data[ind]['image_path']
    print(image_path)

    #print(true_coords)
    true_coords_center = true_coords - true_coords[0]
    #print(true_coords_center)

    image_input = image.unsqueeze(0)
    mid_outs, pred_coords = model(image_input)

    pred_coords = pred_coords[0].reshape((-1,3)).detach()

    #add first point to output (0,0,0) for each batch
    pred_coords = torch.cat((torch.zeros((1, 3)), pred_coords), 0)
    #print(pred_coords)
    #find rotation matrix
    rotation_matrix = find_rotation_mat(true_coords_center, pred_coords)
    #rotate predicted points
    pred_coords = torch.matmul(pred_coords, rotation_matrix)


    true_coords_2D = project_points(true_coords, test_data[ind]['cam'])
    pred_coords_2D = project_points(pred_coords, test_data[ind]['cam'])
    

    f, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
    f.set_figheight(10)
    f.set_figwidth(10)
    
    #plot image with points
    ax1.imshow(image.permute(1, 2, 0))
    ax1.scatter(true_coords_2D[:, 0], true_coords_2D[:, 1], c='r')

    #connect points 7-8 6-7 2-6 2-12 12-13 13-14 2-0 0-3 0-9 3-4 4-5 9-10 10-11 0-16
    for connection in connections:
        ax1.plot([true_coords_2D[connection[0], 0], true_coords_2D[connection[1], 0]], [true_coords_2D[connection[0], 1], true_coords_2D[connection[1], 1]], c='r')

    ax1.set_title("True joints")

    ax2.imshow(image.permute(1, 2, 0))

    ax2.scatter(pred_coords_2D[:, 0], pred_coords_2D[:, 1], c='r')

    #connect points 7-8 6-7 2-6 2-12 12-13 13-14 2-0 0-3 0-9 3-4 4-5 9-10 10-11 0-16
    for connection in connections:
        ax2.plot([pred_coords_2D[connection[0], 0], pred_coords_2D[connection[1], 0]], [pred_coords_2D[connection[0], 1], pred_coords_2D[connection[1], 1]], c='r')


    ax2.set_title("Predicted joints")

    plt.show()
    
    images = {}

    for key in mid_outs:
        mid_out = mid_outs[key][0].detach()
        #show mid_out that is layer1 resnet50 shape (num img,H,W)
        #print(mid_out.shape)

        num_img = mid_out.shape[0]
        sqrt_num_img = int(math.sqrt(num_img))
        
        num_rows = sqrt_num_img
        num_cols = int(math.ceil(num_img / sqrt_num_img))

        img_size = mid_out.shape[1]
        padding = 1

        #print (num_img, sqrt_num_img, img_size)

        #normalize each image
        for i in range(num_img):
            mid_out[i] = mid_out[i] - torch.min(mid_out[i])
            mid_out[i] = mid_out[i] / torch.max(mid_out[i])
            mid_out[i] = mid_out[i] * 255

        img = torch.ones((num_rows*(img_size+padding) + padding, num_cols*(img_size+padding) + padding))
        img = img * 255

        for i in range(num_img):
            row = int(i / num_cols)
            col = i % num_cols

            img[row*(img_size+padding) + padding:row*(img_size+padding) + padding + img_size, col*(img_size+padding) + padding:col*(img_size+padding) + padding + img_size] = mid_out[i]   

        img = img.numpy().astype(np.uint8)

        #save image to file
        cv2.imwrite("mid_out_"+key+".png", img)
        
        images[key] = img
    
    #show all mid_outs
    f, axarr = plt.subplots(1, len(images))
    f.set_figheight(20)
    f.set_figwidth(40)
    i = 0
    for key in images:
        axarr[i].imshow(images[key], cmap='gray')
        axarr[i].set_title(key)
        i += 1
    plt.show()
    


In [None]:
def show_3d_poses(model, ind):
    """Function to plot true and predicted joints"""
    image = test_data[ind]['image']
    true_coords = test_data[ind]['poses_3d'].reshape((-1,3))

    #print(true_coords)
    true_coords_center = true_coords - true_coords[0]
    #print(true_coords_center)

    image_input = image.unsqueeze(0)
    pred_coords = model(image_input)[0].reshape((-1,3)).detach()
    #add first point to output (0,0,0) for each batch
    pred_coords = torch.cat((torch.zeros((1, 3)), pred_coords), 0)
    #print(pred_coords)
    #find rotation matrix
    rotation_matrix = find_rotation_mat(true_coords_center, pred_coords)
    #rotate predicted points
    rot_pred_coords = torch.matmul(pred_coords, rotation_matrix)

    #show 3d scatter plot of true, predicted and rotated predicted points side by side (on different subplots)
    fig = plt.figure(figsize=(15, 5))
    ax1 = fig.add_subplot(131, projection='3d')
    ax1.scatter(true_coords_center[:, 0], true_coords_center[:, 2], true_coords_center[:, 1], c='r')
    ax1.set_title("True joints")
    ax2 = fig.add_subplot(132, projection='3d')
    ax2.scatter(pred_coords[:, 0], pred_coords[:, 2], pred_coords[:, 1], c='r')
    ax2.set_title("Predicted joints")
    ax3 = fig.add_subplot(133, projection='3d')
    ax3.scatter(rot_pred_coords[:, 0], rot_pred_coords[:, 2], rot_pred_coords[:, 1], c='r')
    ax3.set_title("Rotated predicted joints")

    #keep aspect ratio
    ax1.set_aspect('equal')
    ax2.set_aspect('equal')
    ax3.set_aspect('equal')

    #labels
    ax1.set_xlabel('X')
    ax1.set_ylabel('Y')
    ax1.set_zlabel('Z')
    ax2.set_xlabel('X')
    ax2.set_ylabel('Y')
    ax2.set_zlabel('Z')
    ax3.set_xlabel('X')
    ax3.set_ylabel('Y')
    ax3.set_zlabel('Z')

    #invert z axis
    ax1.invert_zaxis()
    ax2.invert_zaxis()
    ax3.invert_zaxis()

    #invert x axis
    ax1.invert_xaxis()
    ax2.invert_xaxis()
    ax3.invert_xaxis()


    #connect points 7-8 6-7 2-6 2-12 12-13 13-14 2-0 0-3 0-9 3-4 4-5 9-10 10-11 0-16
    for connection in connections:
        ax1.plot([true_coords_center[connection[0], 0], true_coords_center[connection[1], 0]], [true_coords_center[connection[0], 2], true_coords_center[connection[1], 2]], [true_coords_center[connection[0], 1], true_coords_center[connection[1], 1]], c='r')
        ax2.plot([pred_coords[connection[0], 0], pred_coords[connection[1], 0]], [pred_coords[connection[0], 2], pred_coords[connection[1], 2]], [pred_coords[connection[0], 1], pred_coords[connection[1], 1]], c='r')
        ax3.plot([rot_pred_coords[connection[0], 0], rot_pred_coords[connection[1], 0]], [rot_pred_coords[connection[0], 2], rot_pred_coords[connection[1], 2]], [rot_pred_coords[connection[0], 1], rot_pred_coords[connection[1], 1]], c='r')
    
    # write loss on plot
    loss = get_loss(pred_coords[1:].view(1, -1), true_coords.view(1, -1))
    ax1.text2D(0.05, 0.95, "Loss: {:.4f}".format(loss), transform=ax1.transAxes)


    plt.show()

In [None]:
#print n worst predictions
def print_worst_predictions(model, num):
    """Function to plot true and predicted joints"""
    #get all predictions
    predictions = []
    for i in range(len(test_data)):
        print("\r {}/{}".format(i, len(test_data)), end="")
        image = test_data[i]['image']
        true_coords = test_data[i]['poses_3d'].reshape((-1,3))

        #print(true_coords)
        true_coords_center = true_coords - true_coords[0]
        #print(true_coords_center)

        image_input = image.unsqueeze(0)
        pred_coords = model(image_input)[0].reshape((-1,3)).detach()

        predictions.append(pred_coords)

    #calculate loss for each prediction
    losses = []
    for i in range(len(test_data)):
        loss = get_loss(predictions[i].view(1, -1), test_data[i]['poses_3d'].view(1, -1))
        losses.append(loss.item())

    #sort losses
    losses, indices = torch.sort(torch.tensor(losses), descending=True)

    #print worst predictions
    for i in range(num):
        print_one_example(model, indices[i])

# 3D pose estimation using SimCLR model

The average euclidean distance between true and predicted joints after training is:
- Training set: 0.4431
- Validation set: 0.4442
- Test set: 0.4444

In [None]:
simclr_path = 'trained_models/ver1.pt'
simclr = get_simclr_net()

#get last model epoch in trained_models/sim/ folder

#get list of files in directory
files = os.listdir("trained_models/sim")
files = [f for f in files if re.match(r'sim_linear_epoch\d+.pt', f)]

#sort files by epoch number
files.sort(key=lambda f: int(re.sub('\D', '', f)))
#get last file
path = "trained_models/sim/" + files[-1]
print(path)

#load model

simclr_model = get_linear_evaluation_model(simclr_path, simclr, False)
simclr_model.load_state_dict(torch.load(path, map_location=torch.device('cpu')))

from torch_intermediate_layer_getter import IntermediateLayerGetter as MidGetter
#get intermediate layer of resnet of model and show as images
return_layers = {
    'conv1': 'conv1',
    'layer1': 'layer1',
    'layer2': 'layer2',
    'layer3': 'layer3',
    'layer4': 'layer4',
}

mid_getter_simclr = MidGetter(simclr_model, return_layers=return_layers, keep_output=True)


simclr_model.eval()

In [None]:
print_one_example(mid_getter_simclr, 0)
show_3d_poses(simclr_model, 0)

In [None]:
print_one_example(mid_getter_simclr, 145)
show_3d_poses(simclr_model, 145)

In [None]:
print_one_example(mid_getter_simclr, 40)
show_3d_poses(simclr_model, 40)

In [None]:
print_one_example(mid_getter_simclr, 66)
show_3d_poses(simclr_model, 66)

In [None]:
for i in range(5):
    ind = random.randint(0, len(test_data))
    print ("index", ind)
    print_one_example(mid_getter_simclr, ind)
    show_3d_poses(simclr_model, ind)

#ProcessedPanopticDataset\171204_pose2\hdImages\171204_pose2;hd_00_20 person 5,6,7 all green

In [None]:
#plot 10 worst predictions
#print_worst_predictions(simclr_model, 10)


In [None]:
#print conv1 kernels of simclr model
params = list(simclr_model.conv1.parameters())
print(params[0].size())
print(params[0][8][2])
print(sum(sum(params[0][8][2])))

# 3D pose estimation using SimSiam model

The average euclidean distance between true and predicted joints after training is:
- Training set: 0.4075
- Validation set: 0.4098
- Test set: 0.4111

In [None]:
siam_path = 'trained_models/siam2.pt'

siam = get_siam_net()

#get last model epoch in trained_models/siam/ folder
files = os.listdir("trained_models/siam")
files = [f for f in files if re.match(r'siam_linear_epoch\d+.pt', f)]

#sort files by epoch number
files.sort(key=lambda f: int(re.sub('\D', '', f)))
#get last file
path = "trained_models/siam/" + files[-1]

#load model
siam_model = get_linear_evaluation_model(siam_path, siam)
siam_model.load_state_dict(torch.load(path, map_location=torch.device('cpu')))

from torch_intermediate_layer_getter import IntermediateLayerGetter as MidGetter
#get intermediate layer of resnet of model and show as images
return_layers = {
    'conv1': 'conv1',
    'layer1': 'layer1',
    'layer2': 'layer2',
    'layer3': 'layer3',
    'layer4': 'layer4',
}

mid_getter_siam = MidGetter(siam_model, return_layers=return_layers, keep_output=True)

siam_model.eval()

In [None]:
print_one_example(mid_getter_siam, 0)
show_3d_poses(siam_model, 0)

In [None]:
print_one_example(mid_getter_siam, 145)
show_3d_poses(siam_model, 145)

In [None]:
print_one_example(mid_getter_siam, 40)
show_3d_poses(siam_model, 40)

In [None]:
print_one_example(mid_getter_siam, 66)
show_3d_poses(siam_model, 66)

In [None]:
for i in range(5):
    ind = random.randint(0, len(test_data))
    print_one_example(mid_getter_siam, ind)
    show_3d_poses(siam_model, ind)