In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# # Mount Google Drive to access the images (only needed when running on Google Colab)

# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# data_path = "/content/drive/My Drive/lsp_dataset/images/"
data_path = "./lsp_dataset/images/"

# Read every image once and keep the largest height and width seen so far
max_h, max_w = 0, 0
for file_name in os.listdir(data_path):
    img = plt.imread(data_path+file_name)
    max_h = max(max_h, img.shape[0])
    max_w = max(max_w, img.shape[1])

print("max h and w are:",max_h, max_w)

In [None]:
from scipy.io import loadmat

joint_data = loadmat("./lsp_dataset/joints.mat") #/content/drive/My Drive/lsp_dataset/joints.mat

# The first axis of the "joints" array is indexed below as 0 = x, 1 = y, 2 = visibility
joints = joint_data["joints"]
print("total shape:",joints.shape)
for axis_label, axis_values in zip(("x shape:", "y shape:", "visibility shape:"), joints):
    print(axis_label,axis_values.shape)

# Entry [., 13, 10]: joint index 13 of picture index 10
print("labels for head top for a random picture:",*joints[:3, 13, 10])

In [None]:
img = "im1325.jpg"
img_num = 1324  # index into the joint arrays used for this file (file number minus one)
plt.figure()
print(data_path+img)
img = plt.imread(data_path+img)
print(img.shape, type(img), img.min(), img.max())
plt.imshow(img);

# Overlay the 14 joints: blue where the third annotation row equals 0.0, red otherwise
xs, ys, flags = joint_data["joints"][:3, :, img_num]
for i in range(14):
    c = 'b' if flags[i] == 0.0 else 'r'
    plt.plot(xs[i],ys[i],'.', color=c)

In [None]:
class LSP_Dataset(Dataset):
    """In-memory dataset of zero-padded images and normalised joint labels.

    Every file found in ``path + "images/"`` is read, padded to
    ``max_h x max_w`` with ``pad_to_max`` and divided by 256.0.  The x/y joint
    coordinates are taken from the ``"joints"`` entry of ``path + "joints.mat"``,
    shifted by the same padding offset, divided by ``LABEL_SCALE`` and shifted
    by -0.5.

    The i-th file of the *sorted* directory listing is paired with annotation
    index i, so the image file names must sort in annotation order.

    Args:
        max_h: height of the padded images.
        max_w: width of the padded images.
        path: dataset root containing ``joints.mat`` and an ``images/`` folder
            (must end with a path separator because it is concatenated as-is).
    """

    # Divisor applied to the padded pixel coordinates before the -0.5 shift.
    # Other cells of this notebook undo the normalisation with the same value.
    LABEL_SCALE = 202

    def __init__(self, max_h, max_w, path="./lsp_dataset/"): #/content/drive/My Drive

        self.max_h = max_h
        self.max_w = max_w

        # Load joint data from the mat file
        self.joint_data = loadmat(path+"joints.mat")["joints"]

        # Size the storage from the actual directory listing: a fixed-size
        # np.empty buffer would leave uninitialised rows (served as samples)
        # whenever the folder holds fewer images than the hard-coded count.
        file_names = sorted(os.listdir(path+"images/"))
        num_images = len(file_names)

        # Load and store images (float) and labels into pre-allocated arrays
        self.array_of_images = np.empty([num_images,self.max_h,self.max_w,3],dtype=float)
        self.array_of_labels = np.empty([num_images,2,14],dtype=float)
        for file_idx, file_name in enumerate(file_names):
            padded_img, padded_labels      = pad_to_max(plt.imread(path+"images/"+file_name),
                                                      self.joint_data[:2,:,file_idx],
                                                      self.max_h, self.max_w)
            self.array_of_images[file_idx] = padded_img/256.0
            self.array_of_labels[file_idx] = padded_labels/self.LABEL_SCALE - 0.5

#         Normalization
#         self.mean_img = np.mean(self.array_of_images,axis=0)
#         self.std_img  = np.std(self.array_of_images,axis=0)
#         self.array_of_images = (((self.array_of_images - self.mean_img)/(self.std_img+0.0001)) + 1)/2

    def __getitem__(self,idx):
        """Return the ``(image, labels)`` pair stored at position ``idx``."""
        return self.array_of_images[idx], self.array_of_labels[idx]

    def __len__(self):
        """Return the number of stored images."""
        return self.array_of_images.shape[0]



def pad_to_max(img, labels, max_h, max_w):
    """Centre ``img`` on a zero canvas of ``max_h x max_w`` and shift ``labels`` to match.

    Args:
        img: array of shape ``(height, width, 3)``.
        labels: array whose first axis is ``(x, y)``, e.g. shape ``(2, n_joints)``.
        max_h: canvas height in pixels.
        max_w: canvas width in pixels.

    Returns:
        ``(padded_img, padded_labels)``: the ``(max_h, max_w, 3)`` canvas with the
        image pasted in the middle, and the labels with the left/top padding
        added to the x/y rows respectively.

    Raises:
        ValueError: if the image is larger than the canvas in either dimension.
    """
    img_h, img_w, _  = img.shape
    if img_h > max_h or img_w > max_w:
        # Fail with a clear message instead of a cryptic broadcast error from
        # the slice assignment below.
        raise ValueError(
            "image of size {}x{} does not fit into a {}x{} canvas".format(img_h, img_w, max_h, max_w)
        )

    # When the leftover is odd, the extra pixel goes to the bottom/right edge
    start_h, start_w = (max_h-img_h)//2, (max_w-img_w)//2

    padded_img = np.zeros([max_h,max_w,3])
    padded_img[start_h:start_h+img_h, start_w:start_w+img_w, :] = img

    # Row 0 holds x (shifted by the left padding), row 1 holds y (top padding)
    padded_labels = labels + np.array([[start_w], [start_h]])

    return padded_img, padded_labels

In [None]:
# Load the whole dataset into memory, padding every image to 202 x 202
dataset = LSP_Dataset(max_h=202, max_w=202)

In [None]:
img_num = 123
sample_img, sample_labels = dataset[img_num]

plt.figure()
print(sample_img.min(),sample_img.max())
plt.imshow(sample_img)
# Undo the label normalisation (value/202 - 0.5); row 0 holds x, row 1 holds y
plt.scatter(202*(0.5 + sample_labels[0]), 202*(0.5 + sample_labels[1]),s=8,c="r")

In [None]:
batch_size = 16
SPLIT_SEED = 0

# Derive the split sizes from the dataset itself instead of a hard-coded count
total      = len(dataset)
train_size = int(total*0.6)
val_size   = int(total*0.2)
test_size  = total - train_size - val_size

# Seed the global RNG right before splitting so that re-running this cell
# always yields the same train/val/test membership. Without it, re-running
# the cell after training silently moves training samples into the test set.
lengths    = [train_size,val_size,test_size]
torch.manual_seed(SPLIT_SEED)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(dataset,lengths)

# Only the training loader needs shuffling; evaluation order does not affect
# the summed loss and a fixed order keeps the evaluation deterministic.
train_dl = DataLoader(train_dataset,batch_size=batch_size,shuffle=True)
val_dl   = DataLoader(val_dataset,batch_size=batch_size,shuffle=False)
test_dl  = DataLoader(test_dataset,batch_size=batch_size,shuffle=False)

In [None]:
class DeepPose(nn.Module):
    """Convolutional network that regresses 28 values (2 coordinates x 14 joints).

    The network is five convolutions (the first two followed by local response
    normalisation and max-pooling, the last by max-pooling) and three fully
    connected layers.  ``fc1`` expects exactly 256 input features, i.e. the
    convolutional stack must reduce the image to 256 x 1 x 1; this is the case
    for the 202 x 202 inputs used in this notebook.
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4, padding=0)
        self.lrn1  = nn.LocalResponseNorm(size=2, alpha=2e-05, beta=0.75, k=1)
        self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)

        self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, stride=1, padding=0)
        self.lrn2  = nn.LocalResponseNorm(size=2, alpha=2e-05, beta=0.75, k=1)
        self.pool2 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)

        self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, stride=1, padding=0)

        self.conv4 = nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, stride=1, padding=0)

        self.conv5 = nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, stride=1, padding=0)
        self.pool3 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)

        self.fc1   = nn.Linear(in_features=256, out_features=4096)
        self.fc2   = nn.Linear(in_features=4096, out_features=4096)
        self.out   = nn.Linear(in_features=4096, out_features=28)

    def forward(self,input):
        """Map a channels-last image batch to joint predictions.

        Args:
            input: float tensor of shape ``(batch, height, width, 3)``.

        Returns:
            Tensor of shape ``(batch, 28)``.
        """
        # Move the channel axis in front of the spatial axes (NHWC -> NCHW).
        # This must be a permute: view() keeps the memory order and would only
        # relabel the axes, scrambling pixels and colour channels.
        x = input.permute(0, 3, 1, 2)
        x = self.pool1(self.lrn1(F.relu(self.conv1(x))))
        x = self.pool2(self.lrn2(F.relu(self.conv2(x))))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool3(F.relu(self.conv5(x)))
        x = torch.flatten(x,1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.out(x)

        return x

In [None]:
# Build the network, make sure its parameters are float32, then move it to `device`
model = DeepPose()
model = model.float().to(device)

In [None]:
# Squared error summed over the batch; the training loop divides the running
# sum by the dataset size to report a per-sample loss.
criterion = nn.MSELoss(reduction="sum")

# Adagrad over all model parameters, starting at a learning rate of 1e-3
optimizer = torch.optim.Adagrad(params=model.parameters(), lr=1e-3)

In [None]:
def train(epochs=10, model=model, train_dl=train_dl, val_dl=val_dl, optimizer=optimizer, criterion=criterion, 
          train_size=train_size, val_size=val_size):
    """Train ``model`` and evaluate it on the validation loader after every epoch.

    The default arguments are bound to the notebook globals at the time this
    cell is executed, so re-run the cell after re-creating any of them.

    Args:
        epochs: number of passes over ``train_dl``.
        model: network to optimise; called on channels-last float image batches.
        train_dl: loader yielding ``(images, labels)`` training batches.
        val_dl: loader yielding ``(images, labels)`` validation batches.
        optimizer: optimizer stepping ``model``'s parameters.
        criterion: loss returning a summed (not averaged) value per batch.
        train_size: number of training samples, used to normalise the epoch loss.
        val_size: number of validation samples, used to normalise the epoch loss.

    Returns:
        ``(train_loss_lst, val_loss_lst, batch_epoch_loss_lst)``: per-epoch
        training loss, per-epoch validation loss (both per sample) and the raw
        loss of every training batch in order.
    """
    train_loss_lst, val_loss_lst, batch_epoch_loss_lst = [], [], []

    for e in range(epochs):
        train_loss, val_loss = 0, 0

        # Training
        model.train()
        for batch_imgs, batch_labels in train_dl:
            optimizer.zero_grad()
            batch_imgs   = batch_imgs.float().to(device)
            batch_labels = batch_labels.float().to(device)

            # Reshape the outputs batch_size x 28 -> batch_size x 2 x 14
            output = model(batch_imgs).view(batch_labels.shape)

            loss = criterion(output,batch_labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            batch_epoch_loss_lst.append(batch_loss)
            train_loss += batch_loss

        train_loss_lst.append(train_loss/train_size)

        # Validation: no gradients are needed, so skip building the autograd graph
        model.eval()
        with torch.no_grad():
            for batch_imgs, batch_labels in val_dl:
                batch_imgs   = batch_imgs.float().to(device)
                batch_labels = batch_labels.float().to(device)

                # Reshape the outputs batch_size x 28 -> batch_size x 2 x 14
                output = model(batch_imgs).view(batch_labels.shape)
                val_loss += criterion(output,batch_labels).item()

        val_loss_lst.append(val_loss/val_size)

        print("[{}/{}]: Train loss={:2.4f}, Validation loss={:2.4f}".format(e+1,epochs,train_loss_lst[-1],val_loss_lst[-1]))

        # Loss-driven learning-rate schedule: 5e-4 once the per-sample training
        # loss is <= 0.25, 1e-4 once it is <= 0.15. Above 0.25 the current
        # learning rate is left untouched.
        epoch_train_loss = train_loss_lst[-1]
        if epoch_train_loss <= 0.15:
            new_lr = 1e-4
        elif epoch_train_loss <= 0.25:
            new_lr = 5e-4
        else:
            new_lr = None

        if new_lr is not None:
            for param_group in optimizer.param_groups:
                param_group["lr"] = new_lr

    return train_loss_lst, val_loss_lst, batch_epoch_loss_lst

In [None]:
# Train for 100 epochs and keep the per-epoch and per-batch loss histories
(train_loss_lst,
 val_loss_lst,
 batch_epoch_loss_lst) = train(epochs=100)

In [None]:
def test(model=model, test_dl=test_dl, test_size=test_size):
    """Evaluate ``model`` on the test loader and plot one example prediction.

    The loss is accumulated over *all* batches of ``test_dl``; the first image
    of the first batch is shown with its predicted joints overlaid.

    The default arguments are bound to the notebook globals at the time this
    cell is executed.

    Args:
        model: trained network; called on channels-last float image batches.
        test_dl: loader yielding ``(images, labels)`` test batches.
        test_size: number of test samples, used to normalise the summed loss.

    Returns:
        The summed loss over ``test_dl`` divided by ``test_size``.
    """
    test_loss = 0
    model.eval()

    # Inference only: do not build the autograd graph
    with torch.no_grad():
        for batch_idx,(batch_imgs,batch_labels) in enumerate(test_dl):
            batch_imgs   = batch_imgs.float().to(device)
            batch_labels = batch_labels.float().to(device)

            # Reshape the outputs batch_size x 28 -> batch_size x 2 x 14
            output     = model(batch_imgs).view(batch_labels.shape)
            test_loss += criterion(output,batch_labels).item()

            # Visualise a single example only, but keep iterating so that the
            # returned loss covers the whole loader.
            if batch_idx == 0:
                # Bring tensors back to the CPU before handing them to matplotlib
                predicted = output[0].detach().cpu().numpy()
                plt.figure()
                plt.imshow(batch_imgs[0].cpu().numpy())
                # Undo the label normalisation (value/202 - 0.5); row 0 = x, row 1 = y
                plt.scatter(202*(0.5 + predicted[0]), 202*(0.5 + predicted[1]),s=8,c="r")

    return test_loss/test_size

In [None]:
# Run the evaluation helper with its default arguments; its return value is
# displayed as the cell output.
test()