<div align="right">
    <a href="https://colab.research.google.com/github/Its-Shivanshu-Sharma/HumanPoseDetection/blob/main/HumanPoseDetection.ipynb">
        <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab"/>
    </a>
    <br/>
    <a href="https://console.paperspace.com/github/Its-Shivanshu-Sharma/HumanPoseDetection/blob/main/HumanPoseDetection.ipynb">
        <img src="https://assets.paperspace.io/img/gradient-badge.svg" alt="Run on Gradient"/>
    </a>
</div>

## Installing & Importing libraries, packages, etc.

In [None]:
%%bash
# Install the pinned `+cu113` builds of torch & torchvision; `-f` points pip at
# the page that lists those wheels.
pip3 install -qq torch==1.10.2+cu113 torchvision==0.11.3+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html

In [None]:
import copy
import os
import pathlib
import time

import cv2
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torchvision
import torchvision.models as models
import torchvision.transforms as T
from google.colab.patches import cv2_imshow
from scipy.io import loadmat
from torch.utils.data import DataLoader, Dataset

# Turn on the cuDNN `benchmark` flag for the whole session
# (see the `torch.backends.cudnn` docs for what the flag tunes)
cudnn.benchmark = True

## Fetching & Preparing the Dataset

In [None]:
# Set the seed for the RNG (Random Number Generator) manually
# (for reproducibility)
torch.manual_seed(10)

### NOTE:
The dataset used in this project is the [`Leeds Sports Pose Dataset`](http://sam.johnson.io/research/lsp.html).

In [None]:
%%bash
# Fetch the Leeds Sports Pose dataset into a directory called `pose_dataset`.
# The cell is safe to re-run: the directory is only created when missing and
# the archive is only downloaded when the extracted data is not already there.
set -e
mkdir -p pose_dataset
cd pose_dataset
if [ ! -f joints.mat ] || [ ! -d images ]; then
    wget -q http://sam.johnson.io/research/lsp_dataset.zip
    # `-o` overwrites without prompting (a prompt would hang a notebook cell)
    unzip -qq -o lsp_dataset.zip
    rm lsp_dataset.zip
fi

In [None]:
# Directory into which the download cell above extracts the dataset
root = pathlib.Path("./pose_dataset")

#### The joint annotations for each image are stored in a `joints.mat` file, so we use the `scipy.io.loadmat` function to read them.

In [None]:
# Read the MATLAB file; the result is indexed by its "joints" key below
raw_annotations = loadmat(root / "joints.mat")

In [None]:
# Take entry 0 (x) and entry 1 (y) of the "joints" array, transpose each and
# flatten it into a single vector.
# NOTE(review): assumes "joints" is laid out as (coordinate, joint, image), so
# the flattened order is image-major / joint-minor — TODO confirm.
raw_annotations_x = raw_annotations["joints"][0].T.reshape(-1, 1).squeeze()
raw_annotations_y = raw_annotations["joints"][1].T.reshape(-1, 1).squeeze()

In [None]:
# Sanity check: size of the flattened x-coordinate vector
raw_annotations_x.shape

In [None]:
# No. of joints annotated for each image
n_joints = 14

In [None]:
# Row index: one (image_num, joint) pair per annotated point. The number of
# images is derived from the flattened coordinate vector; `raw_annotations` is
# the whole object returned by `loadmat`, so its length is not the point count.
index = pd.MultiIndex.from_product(
    (range(len(raw_annotations_x) // n_joints), range(n_joints)),
    names=["image_num", "joint"],
)
# Column index: the two coordinates stored for every point
cols = pd.Index(["x", "y"], name="coordinates")

In [None]:
# Long-format table: one row per (image_num, joint) pair, with "x" & "y" columns
full_annotations = pd.DataFrame(
    zip(raw_annotations_x, raw_annotations_y), index=index, columns=cols
)
full_annotations

In [None]:
# Pivot to wide format: one row per image and one (joint, coordinate) column
# pair per joint, with the columns ordered by joint number.
# NOTE: this cell rebinds `full_annotations`; re-run the previous cell before
# executing this one a second time.
full_annotations = (
    full_annotations.unstack("joint").swaplevel(i=0, j=1, axis=1).sort_index(axis=1)
)
full_annotations

In [None]:
def train_val_split(annotations, train_size=None, val_size=None):
    """Randomly split the annotations into training & validation sets.

    The rows are shuffled with `torch.randperm` (so the split is reproducible
    after `torch.manual_seed`) and divided according to `train_size` or
    `val_size`. The indices of both sets are reset to start from 0; the old
    index (which identifies the image) is kept as a regular column, named after
    the index (`image_num` for the annotations built in this notebook).

    Parameters:
    -----------
    - annotations (pandas.DataFrame): Dataframe containing the annotations.
    - train_size (float): Fraction of the data to be allocated to training set.
    - val_size (float): Fraction of the data to be allocated to validation set.
                        This parameter is ignored if `train_size` is not None.

    Returns:
    --------
    Tuple containing the training & validation sets,
    i.e. `(train_annotations, val_annotations)`.

    Raises:
    -------
    ValueError: If neither `train_size` nor `val_size` is given, or if the
                fraction used lies outside the range [0, 1].
    """
    if train_size is None and val_size is None:
        raise ValueError("Either `train_size` or `val_size` must be specified.")

    fraction = train_size if train_size is not None else val_size
    if not 0 <= fraction <= 1:
        raise ValueError(f"Split fraction must lie in [0, 1], got {fraction}.")

    # Calculate length of training set
    annotation_length = len(annotations)
    if train_size is not None:
        train_len = int(annotation_length * train_size)
    else:
        train_len = annotation_length - int(annotation_length * val_size)

    # Generate random *positions* and select rows positionally, so the split
    # does not depend on the labels the index happens to carry
    rand_idxs = torch.randperm(annotation_length).tolist()
    train_annotations = annotations.iloc[rand_idxs[:train_len]]
    val_annotations = annotations.iloc[rand_idxs[train_len:]]

    # Reset indices of `train_annotations` & `val_annotations` to start from 0
    # (the old index becomes a column)
    train_annotations = train_annotations.reset_index().sort_index(axis=1)
    val_annotations = val_annotations.reset_index().sort_index(axis=1)

    return train_annotations, val_annotations

In [None]:
# Split the rows 80/20 (`train_size=0.8`) and key the two sets by phase name
annotations = {}
annotations["train"], annotations["val"] = train_val_split(
    full_annotations, train_size=0.8
)

In [None]:
# Report the number of rows in each split
for phase in ["train", "val"]:
    print(f"Size of {phase} set: {len(annotations[phase])}")

### Creating `Dataset` & `DataLoader` objects

In [None]:
class PoseDataset(Dataset):
    """Dataset pairing each image with the coordinates of its annotated joints."""

    def __init__(
        self,
        annotations,
        img_dir,
        img_format="jpg",
        transform=None,
        target_transform=None,
    ):
        """
        Parameters:
        -----------
        - annotations (pandas.DataFrame): Dataframe containing the annotations
                                          for the joints. It is read with the
                                          column labels 0 to 13 (the joints) &
                                          `image_num` (the image identifier).
        - img_dir (pathlib.Path object or str): Path object or str specifying
                                                the directory containing the
                                                images.
        - img_format (str: default="jpg"): String specifying the format of the
                                           images.
        - transform (callable: default=None): Function or callable which returns
                                              a transformed version of the image
                                              passed as input.
        - target_transform (callable: default=None): Function or callable which
                                                     is called as
                                                     `f(pts, org_size, new_size)`
                                                     & returns a transformed
                                                     version of the target label.
        """
        self.annotations = annotations
        self.img_dir = img_dir
        self.img_format = img_format
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of annotated samples (rows of `annotations`)."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load the sample at row label `index`.

        Returns:
        --------
        Tuple `(img, pts)` of the (optionally transformed) image & the
        (optionally transformed) joint coordinates.
        """
        pts = self.annotations.loc[index, slice(0, 13)]
        img_num = self.annotations.loc[index, "image_num"]

        # Append image extension and pad zeros in `img_num` to get the file name
        # (the stored number is incremented by 1 to form the file name)
        file_name = f"im{int(img_num)+1:04d}.{self.img_format}"
        img_file = os.path.join(self.img_dir, file_name)
        img = torchvision.io.read_image(img_file)

        # Store the original height & width of the images before transformation
        # This will be used for scaling the coordinates of the target points
        org_size = tuple(img.shape[-2:])
        if self.transform:
            img = self.transform(img)
        # Read the final size from the image itself rather than from a `.size`
        # attribute of the transform, which composed transforms do not provide.
        # Without an image transform this equals `org_size`.
        new_size = tuple(img.shape[-2:])

        if self.target_transform:
            # `org_size` & `new_size` are used to transform the coordinates
            # of the joints
            pts = self.target_transform(pts, org_size, new_size)

        return img, pts

In [None]:
def transform_coords(pts, org_size, new_size):
    """Rescale joint coordinates so that they match a resized image.

    Parameters:
    -----------
    - pts (pandas object): Coordinates to be scaled. Must support selection of
                           the "x" & "y" values via `.loc[:, "x"]` /
                           `.loc[:, "y"]`.
    - org_size (tuple or array-like): Original size of the image; entry 0
                                      scales "y" & entry 1 scales "x".
    - new_size (tuple or array-like): Final size of the image after resizing,
                                      indexed the same way as `org_size`.

    Returns:
    --------
    `torch.Tensor` (float32) holding the scaled coordinates as a single column,
    i.e. reshaped to (-1, 1), with the x & y value of each point adjacent.
    """
    org_height, org_width = org_size[0], org_size[1]
    new_height, new_width = new_size[0], new_size[1]

    # Scale each axis by the ratio of new to original extent, then put the
    # x & y values side by side again
    scaled = pd.concat(
        [
            pts.loc[:, "x"] * new_width / org_width,
            pts.loc[:, "y"] * new_height / org_height,
        ],
        axis=1,
    )
    column = scaled.to_numpy().reshape(-1, 1)

    return torch.tensor(column, dtype=torch.float32)

In [None]:
# NOTE(review): presumably (channels, height, width) — `img_size[1:]` is what is
# passed to `T.Resize`; confirm the (height, width) order against its docs
img_size = (3, 150, 125)
# Define the mean & standard deviation to use for Normalizing the images
# (one value per channel)
img_mean = [0.485, 0.456, 0.406]
img_std = [0.229, 0.224, 0.225]

In [None]:
# Image pipeline applied in both phases: resize, convert to float, normalize.
# `torchvision.io.read_image` yields uint8 tensors whereas `T.Normalize` only
# accepts floating-point input, so the dtype is converted before normalizing.
transforms = {
    phase: nn.Sequential(
        T.Resize(img_size[1:]),
        T.ConvertImageDtype(torch.float),
        T.Normalize(img_mean, img_std),
    )
    for phase in ["train", "val"]
}

In [None]:
# One `PoseDataset` per phase; both read from the same image directory and use
# `transform_coords` to rescale the joint coordinates
datasets = {
    phase: PoseDataset(
        annotations=annotations[phase],
        img_dir=root / "images",
        transform=transforms[phase],
        target_transform=transform_coords,
    )
    for phase in ["train", "val"]
}

In [None]:
# No. of samples in the training & validation datasets
len(datasets["train"]), len(datasets["val"])

In [None]:
# Define the batch size
bs = 32
# Value passed as `num_workers` to each `DataLoader`
n_workers = 4

In [None]:
# One `DataLoader` per phase. The explicit `for` clause matters: without it the
# expression would pick up whatever value `phase` was last bound to by an
# earlier cell and build a single loader only.
dataloaders = {
    phase: DataLoader(
        datasets[phase], batch_size=bs, shuffle=True, num_workers=n_workers
    )
    for phase in ["train", "val"]
}

## Visualizing the Data

In [None]:
# Joint names, listed in the order of their joint numbers (position == number)
joints = [
    "right ankle",
    "right knee",
    "right hip",
    "left hip",
    "left knee",
    "left ankle",
    "right wrist",
    "right elbow",
    "right shoulder",
    "left shoulder",
    "left elbow",
    "left wrist",
    "neck",
    "head top",
]
# Reverse lookup: joint name -> joint number
joints_map = dict(zip(joints, range(len(joints))))

In [None]:
class COLOR:
    """Namespace of named color constants.

    Each constant is a 3-tuple in (B, G, R) channel order.
    """

    # The constants are used as line colors in `joints_relations`
    RED = (0, 0, 255)
    GREEN = (0, 255, 0)
    BLUE = (255, 0, 0)
    CYAN = (255, 255, 0)
    YELLOW = (0, 255, 255)
    MAGENTA = (255, 0, 255)
    LAVENDER = (250, 230, 230)
    ORANGE = (0, 175, 255)

In [None]:
# List of joint-connections & colors being used for connecting the joints in
# image annotations. Each entry is `(joint_a, joint_b, color)`.
# "hip line" is not one of the names in `joints`; `annotate_image` substitutes
# a point computed from the two hips for it.
joints_relations = [
    ("right ankle", "right knee", COLOR.CYAN),
    ("left ankle", "left knee", COLOR.CYAN),
    ("right knee", "right hip", COLOR.RED),
    ("left knee", "left hip", COLOR.RED),
    ("right wrist", "right elbow", COLOR.GREEN),
    ("left wrist", "left elbow", COLOR.GREEN),
    ("right elbow", "right shoulder", COLOR.BLUE),
    ("left elbow", "left shoulder", COLOR.BLUE),
    ("neck", "head top", COLOR.MAGENTA),
    ("neck", "hip line", COLOR.YELLOW),  # hip line = mean of left & right hip
    ("right hip", "left hip", COLOR.LAVENDER),
    ("right shoulder", "left shoulder", COLOR.ORANGE),
]

In [None]:
def annotate_image(img, pts, mapping, relations):
    """Function to annotate an image with lines drawn between specified points.

    The annotated copy is displayed with `cv2_imshow`; the input tensor is left
    untouched and nothing is returned.

    Parameters:
    -----------
    - img (torch.Tensor) - Image in (C,H,W) format. Color-scheme = (R,G,B).
    - pts (array-like) - Points to use for annotating lines; `pts[k]` must yield
                         the (x, y) pair of joint number `k`.
    - mapping (dict-like) - Mapping from joint name to joint number. Must
                            contain "right hip" & "left hip".
    - relations (list) - List containing tuples defining which points to join &
                         with what color. A joint name missing from `mapping`
                         (e.g. "hip line") is drawn at the hip-line point.
    """
    # Change image format from (C, H, W) to (H, W, C); move it to the CPU first
    # so that it can be converted to a numpy array
    img = img.detach().cpu().permute((1, 2, 0))

    # Change image from RGB to BGR (& change to np ndarray to show img).
    # `torch.stack` builds a new tensor, so the drawing below never writes into
    # the caller's image.
    img = torch.stack((img[:, :, 2], img[:, :, 1], img[:, :, 0]), dim=2).numpy()

    # Calculate the coordinates for the `hip line` from coordinates of the
    # `right hip` & the `left hip`
    rhip = mapping.get("right hip")
    lhip = mapping.get("left hip")
    # Use floor division to get integer coordinates (for annotating images) and
    # convert to a tuple of plain ints, like every other point handed to cv2
    hipline_coords = tuple(
        int((r + l) // 2) for r, l in zip(pts[rhip], pts[lhip])
    )

    for j1, j2, color in relations:
        pt1 = mapping.get(j1, None)
        pt2 = mapping.get(j2, None)
        # Convert coordinates to `int` for annotating the image
        # For "hip line" `pt1` or `pt2` will be equal `None` so we set it
        # equal to the above calculated `hipline_coords`
        pt1 = tuple(map(int, pts[pt1])) if pt1 is not None else hipline_coords
        pt2 = tuple(map(int, pts[pt2])) if pt2 is not None else hipline_coords
        # Annotate the image
        cv2.line(img, pt1, pt2, color=color)
    cv2_imshow(img)

## Creating & Training a Model on the Data.

In [None]:
class Model(nn.Module):
    """Class implementing the model to be trained on the data.

    A stack of convolution & max-pooling layers followed by fully connected
    layers that map an image to `output_shape` values.

    Parameters:
    -----------
    - img_shape (tuple or array-like): Shape of one input image, given as
                                       (channels, height, width).
    - output_shape (int): No. of values produced for each image.
    """

    def __init__(self, img_shape, output_shape):
        super().__init__()

        self.img_shape = img_shape
        self.output_shape = output_shape

        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=img_shape[0], out_channels=8, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # 1x1 convolution to cut the no. of channels before flattening
            nn.Conv2d(in_channels=64, out_channels=16, kernel_size=1),
        )

        # Flatten the last 3 dims (C, H, W), leaving any leading batch dim as is
        self.flatten = nn.Flatten(start_dim=-3)

        # Calculate the shape of the output produced by the convolution &
        # pooling layers
        out_shape = self._calc_output_shape()
        self.fc = nn.Sequential(
            nn.Linear(out_shape[0] * out_shape[1] * out_shape[2], 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, output_shape),
            # The final ReLU clamps every output to be non-negative
            nn.ReLU(),
        )

    def forward(self, x):
        """Run the convolutional stack, flatten & apply the dense layers."""
        y = self.conv(x)
        y = self.flatten(y)
        y = self.fc(y)
        return y

    @staticmethod
    def _pair(value):
        """Return `value` as a 2-tuple (ints are repeated for both dims)."""
        return value if isinstance(value, tuple) else (value, value)

    def _calc_output_shape(self):
        """Compute the [channels, height, width] produced by `self.conv`.

        Walks the convolution & pooling layers of `self.conv`, applying the
        standard output-size formula to `self.img_shape`.
        """
        out_img_shape = list(self.img_shape)
        for module in self.conv.modules():
            if isinstance(module, (nn.Conv2d, nn.MaxPool2d, nn.AvgPool2d)):
                k = self._pair(module.kernel_size)
                s = self._pair(module.stride)
                p = self._pair(module.padding)
                # `nn.AvgPool2d` has no `dilation` attribute; treat it as 1
                d = self._pair(getattr(module, "dilation", 1))
                # Update no. of channels (for Conv2d layers)
                if hasattr(module, "out_channels"):
                    out_img_shape[0] = module.out_channels
                # Update image height
                out_img_shape[1] = (
                    out_img_shape[1] + 2 * p[0] - d[0] * (k[0] - 1) - 1
                ) // s[0] + 1
                # Update image width
                out_img_shape[2] = (
                    out_img_shape[2] + 2 * p[1] - d[1] * (k[1] - 1) - 1
                ) // s[1] + 1

        return out_img_shape

In [None]:
def train_val_model(
    model, dataloaders, loss_fn, optimizer, num_epochs, lr_scheduler=None, device="cpu"
):
    """Train `model` and validate it after every epoch.

    Parameters:
    -----------
    - model (torch.nn.Module): Model to be trained (moved to `device`).
    - dataloaders (dict): Mapping with the keys "train" & "val", each holding an
                          iterable of `(imgs, targets)` batches.
    - loss_fn (callable): Called as `loss_fn(predictions, targets)`; must
                          return a scalar tensor.
    - optimizer (torch.optim.Optimizer): Optimizer updating the parameters of
                                         `model`.
    - num_epochs (int): No. of passes over the training & validation data.
    - lr_scheduler (default=None): Optional scheduler; its `step()` is called
                                   once per epoch, after the training phase.
    - device (default="cpu"): Device on which the model & batches are placed.

    Returns:
    --------
    Deep copy of the model's `state_dict` from the epoch with the lowest
    average validation loss (the initial weights if none improved on infinity).
    """
    start_time = time.time()

    # Initialize the best model weights as the initial weights of the model
    best_model_parameters = copy.deepcopy(model.state_dict())
    min_val_loss = float("inf")

    # Move the `model` to the specified `device`
    model = model.to(device)

    # Initially set the gradients to zero
    optimizer.zero_grad()

    for i in range(num_epochs):
        print(f"Epoch {i}/{num_epochs-1}")
        print("-" * 15)

        # Both the training & validation datasets are passed to the model in
        # each epoch
        for phase in ["train", "val"]:
            # Set mode depending upon the phase
            if phase == "train":
                model.train()
            else:
                model.eval()

            total_loss = 0.0
            n_samples = 0

            # Gradients are only tracked during the training phase
            with torch.set_grad_enabled(phase == "train"):
                for imgs, targets in dataloaders[phase]:
                    imgs = imgs.to(device)
                    targets = targets.to(device)
                    predictions = model(imgs)
                    loss = loss_fn(predictions, targets)
                    # Weight the batch loss by the batch size so that the
                    # epoch average is a per-sample average
                    batch_size = imgs.size(0)
                    total_loss += loss.item() * batch_size
                    n_samples += batch_size

                    # Update parameter values during the training phase
                    if phase == "train":
                        loss.backward()
                        optimizer.step()
                        # Set the gradients to zero
                        optimizer.zero_grad()

            # Update the learning rate using the `lr_scheduler`
            # Only count training loops for updating parameters
            if lr_scheduler is not None and phase == "train":
                lr_scheduler.step()
            # Average over the samples actually seen in this phase (no reliance
            # on a global dataset object); NaN when the loader was empty
            avg_loss = total_loss / n_samples if n_samples else float("nan")
            print(f"{phase.capitalize()} Average Loss: {avg_loss: .4f}")

            if phase == "val" and avg_loss < min_val_loss:
                best_model_parameters = copy.deepcopy(model.state_dict())
                min_val_loss = avg_loss
        # Print a new line after each epoch
        print()

    # Summarize & return only after *all* epochs have run
    total_time = time.time() - start_time
    print(f"Model training completed in {total_time//60}m {total_time % 60}s")
    print(f"Minimum average validation loss: {min_val_loss}")

    return best_model_parameters

##### Use the `GPU` for training if one is available; otherwise fall back to the `CPU`

In [None]:
# Select "cuda" when `torch.cuda.is_available()` returns True, else "cpu"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device being used: {device}")

In [None]:
from torch.optim import Adam