In [None]:
# Import all the necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import cv2
import deeplake

In [None]:
# Load the LSP dataset from the Activeloop hub
# Each sample exposes three fields: 'images', 'keypoints' and 'images_visualized'
#   'images'            - the original image
#   'keypoints'         - the ground truth keypoints
#   'images_visualized' - the original image with the ground truth keypoints drawn on it
# Caveat: 'images' and 'images_visualized' differ in size, and the keypoints are aligned
# with the visualized image, not with the original one
# The train split is loaded first, then the test split
ds_train, ds_test = (
    deeplake.load(path)
    for path in ("hub://activeloop/lsp-train", "hub://activeloop/lsp-test")
)

# Wrap both splits as tensorflow datasets (train first, then test)
dataloader_train, dataloader_test = ds_train.tensorflow(), ds_test.tensorflow()

In [None]:
# Function to normalize the keypoints
# The bounding box is in the form of [x,y,w,h] where x and y are the coordinates of the center of the bounding box and w and h are the width and height of the bounding box
# This function is the same as the one in the paper
def normalize_keypoints(keypoints,bounding_box):
    """Express keypoints relative to a bounding box.

    Column 0 becomes (x - box_x) / box_w and column 1 becomes (y - box_y) / box_h.

    Args:
        keypoints: array-like indexed as [:, 0] (x) and [:, 1] (y).
        bounding_box: sequence (x, y, w, h); only indices 0..3 are read.

    Returns:
        A new floating-point array with the same shape as `keypoints`. The
        input is left untouched. Integer inputs are promoted to float so the
        division is not truncated when the result is stored.
    """
    source = np.asarray(keypoints)
    # astype always copies here, so the caller's array is never modified.
    normalized = source.astype(np.result_type(source.dtype, np.float32))
    normalized[:,0] = (normalized[:,0] - bounding_box[0])/bounding_box[2]
    normalized[:,1] = (normalized[:,1] - bounding_box[1])/bounding_box[3]
    return normalized

In [None]:
# Function to unnormalize the keypoints
# This function is the same as the one in the paper
def unnormalize_keypoints(keypoints,bounding_box):
    """Map box-relative keypoints back to pixel coordinates.

    Column 0 becomes x * box_w + box_x and column 1 becomes y * box_h + box_y.
    The results are then clipped to [0, box_w] and [0, box_h] respectively and
    truncated to int32.

    Args:
        keypoints: array-like indexed as [:, 0] (x) and [:, 1] (y).
        bounding_box: sequence (x, y, w, h); only indices 0..3 are read.

    Returns:
        A new int32 array with the same shape as `keypoints`. The input is
        left untouched.
    """
    source = np.asarray(keypoints)
    # astype always copies here, so the caller's array is never modified.
    restored = source.astype(np.result_type(source.dtype, np.float32))
    restored[:,0] = (restored[:,0]*bounding_box[2]) + bounding_box[0]
    restored[:,1] = (restored[:,1]*bounding_box[3]) + bounding_box[1]
    # NOTE(review): the clip range is [0, w] / [0, h], which matches the box
    # extent only when the box is centred at (w/2, h/2), as in every call in
    # this notebook ((110,110,220,220)). Confirm before using other boxes.
    restored[:,0] = np.clip(restored[:,0],0,bounding_box[2])
    restored[:,1] = np.clip(restored[:,1],0,bounding_box[3])
    return restored.astype(np.int32)

In [None]:
# Function to visualize the keypoints
# image_visualized is needed to get the size of the image on which the keypoints are aligned
# This draws the keypoints and the lines connecting the keypoints
def visualize_keypoints(image,keypoints,image_visualized):
    """Plot an image with numbered keypoints and connecting line segments.

    Args:
        image: image to display; either an array accepted by cv2.resize or an
            object exposing `.numpy()` that returns one.
        keypoints: array indexed as [:, 0] (x) and [:, 1] (y); rows 0..13 are
            read when drawing the line segments.
        image_visualized: only its `.shape[:2]` (height, width) is used, as the
            size `image` is resized to before plotting.
    """
    height,width = image_visualized.shape[:2]
    # Convert tensor-like inputs explicitly instead of catching every exception,
    # so a genuine cv2.resize failure is reported as such.
    if hasattr(image,"numpy"):
        image = image.numpy()
    image = cv2.resize(image,(width,height))
    fig , ax = plt.subplots(1,figsize=(10,10))
    ax.imshow(image)
    ax.scatter(keypoints[:,0],keypoints[:,1])
    for i in range(keypoints.shape[0]):
        ax.annotate(str(i), (keypoints[i,0],keypoints[i,1]),fontsize=10)
    # Consecutive keypoints are joined inside three index chains: 0..5, 6..11 and 12..13.
    # NOTE(review): presumably these chains are legs, arms and neck/head - confirm against the dataset's joint order.
    for first,last in ((0,5),(6,11),(12,13)):
        for i in range(first,last):
            ax.plot([keypoints[i,0],keypoints[i+1,0]],[keypoints[i,1],keypoints[i+1,1]],linewidth=5)
    plt.show()

In [None]:
# Visualize on a sample image
# iter() creates an iterator over dataloader_train and next() pulls one element from it
batch_iter = iter(dataloader_train)
batch = next(batch_iter)

In [None]:
# Get the image and the keypoints
# This pulls another element from batch_iter, so the element fetched in the previous cell is not the one shown,
# and every re-run of this cell advances the iterator by one more element
batch = next(batch_iter)
image = batch["images"]
keypoints = batch["keypoints"]
image_visualized = batch["images_visualized"]

# Visualize the keypoints
visualize_keypoints(image,keypoints,image_visualized)

In [None]:
# Get all the images and keypoints from the dataset and store them in a numpy array for training
# The keypoints are normalized and stored in the keypoints_array
# The images are resized to 220x220 and stored in the images array (also normalized to 0-1)
def _collect_samples(dataloader):
    """Return (images, keypoints) lists for every element of one data loader.

    For each element, the first two keypoint columns are normalized against a
    box covering the whole visualized image (centre (width/2, height/2), size
    (width, height)), and the image is resized to 220x220 and scaled to 0-1.
    """
    split_images = []
    split_keypoints = []
    for batch in dataloader:
        # The keypoints are aligned with the visualized image, so its size defines the box.
        height,width = batch["images_visualized"].numpy().shape[:2]
        keypoints = batch["keypoints"].numpy().astype(np.float32)[:,0:2]
        keypoints = normalize_keypoints(keypoints,(width/2,height/2,width,height))
        image = cv2.resize(batch["images"].numpy(),(220,220)).astype(np.float32)/255.0
        split_images.append(image)
        split_keypoints.append(keypoints)
    return split_images,split_keypoints

images = []
keypoints_array = []

# Since the dataset is small, we can also use the test dataset for training
# The training split is stored first, followed by the test split
for split_loader in (dataloader_train,dataloader_test):
    split_images,split_keypoints = _collect_samples(split_loader)
    images.extend(split_images)
    keypoints_array.extend(split_keypoints)

# Convert the images and keypoints_array to numpy arrays
images = np.array(images)
keypoints_array = np.array(keypoints_array,dtype=np.float32)

In [None]:
# Create a model
# The model is the same as the one in the paper
# This is also called the AlexNet model
# Use batch normalization for wherever LRN is used in the paper
# Default output is 14*2 (14 keypoints with x and y coordinates)
def get_model(output = 14*2):
    """Build the convolutional regressor as a Keras Sequential model.

    Args:
        output: number of units in the final linear Dense layer.

    Returns:
        An uncompiled tf.keras Sequential model taking 220x220x3 inputs.
    """
    layers = tf.keras.layers
    stack = [
        # Convolutional feature extractor; every convolution is followed by batch normalization
        layers.Conv2D(96,(11,11),strides=(4,4),activation="relu",input_shape=(220,220,3)),
        layers.BatchNormalization(),
        layers.MaxPool2D((2,2),strides=(2,2)),
        layers.Conv2D(256,(5,5),activation="relu",padding="same"),
        layers.BatchNormalization(),
        layers.MaxPool2D((2,2),strides=(2,2)),
        layers.Conv2D(384,(3,3),activation="relu"),
        layers.BatchNormalization(),
        layers.Conv2D(384,(3,3),activation="relu"),
        layers.BatchNormalization(),
        layers.Conv2D(256,(3,3),activation="relu"),
        layers.BatchNormalization(),
        layers.MaxPool2D((2,2)),
        # Fully connected head with dropout, ending in a linear regression layer
        layers.Flatten(),
        layers.Dense(4096,activation="relu"),
        layers.Dropout(0.5),
        layers.Dense(4096,activation="relu"),
        layers.Dropout(0.5),
        layers.Dense(output,activation="linear"),
    ]
    return tf.keras.models.Sequential(stack)


In [None]:
# Get the model
# Uses the default output size of get_model (14*2 values: x and y for 14 keypoints)
model = get_model()

In [None]:
# A custom loss function is used
# The loss function is the same as the one in the paper (L2 loss)
# It sums the squared differences between predicted and ground truth keypoints
# The predictions arrive as (batch_size,28) and are viewed as (batch_size,14,2) first
def loss_fn(y_true,y_pred):
    """Sum of squared differences between y_true and y_pred reshaped to (-1,14,2)."""
    joints_pred = tf.reshape(y_pred,(-1,14,2))
    residual = y_true-joints_pred
    return tf.reduce_sum(tf.square(residual))

In [None]:
# Check for a sample image
# images[0] was already resized to 220x220 and scaled to 0-1 when the images array was built,
# so it is passed to the model as it is (resizing and dividing by 255 again would shrink the values a second time)
sample = images[0]
# Add the batch dimension expected by the model
sample = np.expand_dims(sample,axis=0)
sample = tf.convert_to_tensor(sample,dtype=tf.float32)
pred = model(sample)

In [None]:
# Calculate the loss
# Compares the first entry of keypoints_array with the prediction from the previous cell;
# loss_fn reshapes the prediction to (-1,14,2) before summing the squared differences
loss_fn(keypoints_array[0],pred)

In [None]:
# Compile the model with Adam optimizer (learning rate 0.0001) and the custom loss function
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),loss=loss_fn)

In [None]:
# Train the model (left commented out; the notebook loads previously saved weights from deeppose.h5 below)
# model.fit(images,keypoints_array,epochs=10,batch_size=2,validation_split=0.1,shuffle=True,verbose=1)

In [None]:
# Save the model
# model.save("deeppose.h5")

In [None]:
# Load the model
# This replaces the `model` built above with the one stored in "deeppose.h5"
# custom_objects supplies loss_fn under the name "loss_fn" for loading
# NOTE(review): the cell that saves deeppose.h5 is commented out, so the file must already exist on disk
model = tf.keras.models.load_model("deeppose.h5",custom_objects={"loss_fn":loss_fn})

In [None]:
# Function to get the predictions from the model for a given image
def get_preds(model,image):
    """Run `model` on a single image and return its output reshaped to (14,2).

    The image is resized to 220x220 and given a leading batch dimension of 1
    before being converted to a float32 tensor. No value scaling is applied
    here; the caller passes the image in whatever range the model expects.
    """
    resized = cv2.resize(image,(220,220))
    batch_of_one = tf.convert_to_tensor(np.expand_dims(resized,axis=0),dtype=tf.float32)
    return tf.reshape(model(batch_of_one),(14,2))

In [None]:
# Make predictions on a sample image
# Create an iterator over the test loader; the next cell pulls one element from it per run
test_iter = iter(dataloader_test)

In [None]:
# Visualize the predictions
# We will see that the model performs reasonably well on our dataset
# Each run of this cell pulls the next element from test_iter
img = next(test_iter)["images"].numpy()
img = cv2.resize(img,(220,220))
# Scale a float32 copy to 0-1 for the network; img itself is kept for display
image = cv2.resize(img,(220,220)).astype(np.float32)/255.0
pred = get_preds(model,image).numpy()
# Map the predictions back to pixel coordinates using a box centred at (110,110) with size 220x220
keypoints = unnormalize_keypoints(pred,(110,110,220,220))
visualize_keypoints(img,keypoints,img)

In [None]:
# Test on a random image
# We can see that the model is not able to generalize well
img = cv2.imread("download.jfif")
# cv2.imread returns None instead of raising when the file is missing or unreadable
if img is None:
    raise FileNotFoundError("download.jfif could not be read as an image")
# OpenCV loads images as BGR; convert to RGB for the model and for display
img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
img = cv2.resize(img,(220,220))
# Scale a float32 copy to 0-1 for the network; img itself is kept for display
image = cv2.resize(img,(220,220)).astype(np.float32)/255.0
pred = get_preds(model,image).numpy()
# Map the predictions back to pixel coordinates using a box centred at (110,110) with size 220x220
keypoints = unnormalize_keypoints(pred,(110,110,220,220))
visualize_keypoints(img,keypoints,img)

In [None]:
# Now we shall move on to train the cascaded model
# The cascaded model is the same as the one in the paper
# The cascaded model is trained on the same dataset as the original model
# The cascaded model is trained on the difference between the predicted and ground truth keypoints
# Unnormalize the keypoints before training
# NOTE(review): keypoints_array is overwritten in place, so running this cell a second time
# would unnormalize values that are already in pixel coordinates
# The int32 result is stored back into keypoints_array, which was created with dtype float32
for i in range(keypoints_array.shape[0]):
    keypoints_array[i] = unnormalize_keypoints(keypoints_array[i],(110,110,220,220)).astype(np.int32)  

In [None]:
# Find out the diameter
# The diameter is the Euclidean distance between keypoints 8 and 3 of each sample
# NOTE(review): the original note describes these as the left shoulder and right hip; in the
# usual LSP joint order index 8 is the right shoulder and index 3 the left hip - confirm against the dataset
# Computed for all samples at once, which also avoids overwriting the notebook-level `image`
diams = np.sqrt(np.sum(np.square(keypoints_array[:,8]-keypoints_array[:,3]),axis=1))

In [None]:
# Multiplier to multiply the diameter to get the bounding box size
# The per-keypoint boxes below use SIGMA*diameter+0.0001 for both width and height
SIGMA = 1.25

In [None]:
# Get all the predicted keypoints from the original model
# The following cells read `preds`, so it has to be computed here for the notebook to run top to bottom
# predict() runs the images through the model in batches and returns a numpy array of shape (N,28)
preds = model.predict(images,batch_size=32,verbose=0)
# View the 28 outputs per image as 14 (x,y) pairs
preds = preds.reshape(-1,14,2)

In [None]:
# Unnormalize the predicted keypoints
# preds is expected to hold the first-stage predictions with shape (N,14,2), as computed in the cell above
# NOTE(review): preds is overwritten in place, so re-running this cell applies the mapping a second time
for i in range(preds.shape[0]):
    preds[i] = unnormalize_keypoints(preds[i],(110,110,220,220))  

In [None]:
# Find the difference between the predicted and ground truth keypoints
# Computed as ground truth minus prediction, after both were unnormalized in the cells above
keypoints_errors = keypoints_array - preds

In [None]:
# Function to create the training dataset for the cascaded model
def crop_and_resize(image,bounding_box):
    """Crop a centre-based box out of `image` and resize the crop to 220x220.

    Args:
        image: array indexed as [row, column, ...].
        bounding_box: (x, y, w, h) with (x, y) the box centre.

    Returns:
        The cropped region resized to 220x220 by cv2.resize.

    The box is clamped to the image. A box that is degenerate (half-size 0) or
    lies outside the image would otherwise produce an empty crop, which
    cv2.resize cannot handle; in that case the crop falls back to the nearest
    single pixel inside the image.
    """
    x,y,w,h = bounding_box
    img_h,img_w = image.shape[:2]
    top_left_x = int(max(0,x-(w//2)))
    top_left_y = int(max(0,y-(h//2)))
    bottom_right_x = int(min(img_w,x+(w//2)))
    bottom_right_y = int(min(img_h,y+(h//2)))
    # Keep the start inside the image and the end strictly after the start.
    # For a box that already overlaps the image these four lines change nothing.
    top_left_x = min(top_left_x,img_w-1)
    top_left_y = min(top_left_y,img_h-1)
    bottom_right_x = max(bottom_right_x,top_left_x+1)
    bottom_right_y = max(bottom_right_y,top_left_y+1)
    image = image[top_left_y:bottom_right_y,top_left_x:bottom_right_x]
    image = cv2.resize(image,(220,220))
    return image

In [None]:
# Get all the bounding boxes corresponding to each keypoint
# Every box is [x, y, side, side]: centred on the keypoint, with side = SIGMA*diameter+0.0001 of that sample
bounding_boxes = []
for i in range(keypoints_array.shape[0]):
    side = SIGMA*diams[i]+0.0001
    per_image = [[x,y,side,side] for x,y in (keypoints_array[i][k] for k in range(14))]
    bounding_boxes.append(np.array(per_image))
bounding_boxes = np.array(bounding_boxes)

In [None]:
# Normalize the keypoints_errors
# keypoints_errors has one row per keypoint along axis 1 (axis 2 holds the x and y values),
# so the inner loop runs over shape[1] to cover every keypoint, each with its own bounding box
# NOTE(review): normalize_keypoints also subtracts the box centre, so the stored value is
# (error - centre)/size rather than error/size - confirm this is the intended training target
# NOTE(review): keypoints_errors is overwritten in place, so re-running this cell normalizes twice
for i in range(keypoints_errors.shape[0]):
    for k in range(keypoints_errors.shape[1]):
        keypoints_errors[i,k,:] = normalize_keypoints(np.array([keypoints_errors[i,k,:]]),bounding_boxes[i,k])

In [None]:
# Get the cascaded model (same as the one in the paper)
# Has 2 outputs: displacement in x and y direction
# Built by the same get_model function as the first model, with a 2-unit output layer
cascade_model = get_model(2)

In [None]:
# Create the Adam optimizer (learning rate 0.0001) for the cascaded model
# No compile() call is made here; the custom train_step applies the gradients with this optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)

In [None]:
# Custom loss function
# The loss function is the same as the one in the paper (L2 loss)
# It sums the squared differences between the target and the predicted values
def loss_fn_cascade(y_true,y_pred):
    """Sum of squared element-wise differences between y_true and y_pred."""
    squared_error = tf.square(y_true-y_pred)
    return tf.reduce_sum(squared_error)

In [None]:
# Create a custom training step
@tf.function
def train_step(image,keypoints_errors):
    """Run one optimizer update of cascade_model and return the loss.

    Args:
        image: input tensor passed to cascade_model.
        keypoints_errors: target tensor compared against the model output.

    Returns:
        The loss_fn_cascade value computed before the weights were updated.
    """
    # Record the forward pass so the loss can be differentiated
    with tf.GradientTape() as recorder:
        loss_value = loss_fn_cascade(keypoints_errors,cascade_model(image))
    weights = cascade_model.trainable_weights
    gradients = recorder.gradient(loss_value,weights)
    optimizer.apply_gradients(zip(gradients,weights))
    return loss_value

# Function to train the model
def train(epochs):
    """Train cascade_model for `epochs` passes over the notebook-level arrays.

    For every image and each of its 14 keypoints, the image is cropped around
    the keypoint's bounding box and one train_step is run against the matching
    entry of keypoints_errors. The loss is accumulated per epoch and printed
    every 100 images and at the end of each epoch.
    """
    num_images = images.shape[0]
    for epoch in range(epochs):
        loss_value = 0
        for i in range(num_images):
            for k in range(14):
                # One training example: the crop around keypoint k and its target values
                crop = crop_and_resize(images[i],bounding_boxes[i,k])
                crop_batch = tf.convert_to_tensor(np.expand_dims(crop,axis=0),dtype=tf.float32)
                target_batch = tf.convert_to_tensor(np.expand_dims(keypoints_errors[i,k],axis=0),dtype=tf.float32)
                loss_value += train_step(crop_batch,target_batch)
            if i%100==0:
                print(f"{i} images done with loss {loss_value.numpy()} for epoch {epoch}")
        print(f"Epoch {epoch} done with loss {loss_value.numpy()}")

In [None]:
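# Train the cascaded model (left commented out; the notebook loads previously saved weights from deeppose_cascade_model.h5 below)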
# train(epochs=1)

In [None]:
# Save the model
# cascade_model.save("deeppose_cascade_model.h5")

In [None]:
# Load the model
# This replaces the `cascade_model` built above with the one stored in "deeppose_cascade_model.h5"
# NOTE(review): the training and saving cells are commented out, so the file must already exist on disk
cascade_model = tf.keras.models.load_model("deeppose_cascade_model.h5")

In [None]:
# Test the model on a sample image
# Create a new iterator over the test loader; the next cell pulls one element from it per run
test_iter = iter(dataloader_test)

In [None]:
# Visualize the predictions on a sample image
# We can see that the model performs reasonably well on our dataset
# We refine the predictions from the original model using the cascaded model
img = next(test_iter)["images"].numpy()
img = cv2.resize(img,(220,220))
# Scale a float32 copy to 0-1 for the network; img itself is kept for display
image = cv2.resize(img,(220,220)).astype(np.float32)/255.0
pred = get_preds(model,image).numpy()
# First-stage keypoints in pixel coordinates of the 220x220 image (int32)
keypoints = unnormalize_keypoints(pred,(110,110,220,220))
sigma = 1.25
# The diameter is measured on this image's own predicted keypoints 8 and 3
diam = np.sqrt(np.sum(np.square(keypoints[8]-keypoints[3])))
w = int(sigma*diam + 0.0001)
h = int(sigma*diam + 0.0001)
# The scaled image does not change inside the loop, so it is computed once
img_scaled = img/255.0
for i in range(14):
    # Crop around the current estimate of keypoint i and run the cascaded model on the crop
    image = crop_and_resize(img_scaled,(keypoints[i,0],keypoints[i,1],w,h))
    image = np.expand_dims(image,axis=0)
    image = tf.convert_to_tensor(image,dtype=tf.float32)
    pred = cascade_model(image)
    pred = tf.reshape(pred,(2,))
    # Apply the x output to x and the y output to y, rounded to whole pixels to match the int32 keypoints
    # NOTE(review): the network output is added without any un-normalisation; if the cascaded model was
    # trained on box-normalised errors, the offset has to be mapped back with the same box first - confirm
    keypoints[i] += np.rint(pred.numpy()).astype(keypoints.dtype)
visualize_keypoints(img,keypoints,img)

In [None]:
# Test on a random image
img = cv2.imread("download.jfif")
# cv2.imread returns None instead of raising when the file is missing or unreadable
if img is None:
    raise FileNotFoundError("download.jfif could not be read as an image")
# OpenCV loads images as BGR; convert to RGB for the model and for display
img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
img = cv2.resize(img,(220,220))
# Scale a float32 copy to 0-1 for the network; img itself is kept for display
image = cv2.resize(img,(220,220)).astype(np.float32)/255.0
pred = get_preds(model,image).numpy()
# First-stage keypoints in pixel coordinates of the 220x220 image (int32)
keypoints = unnormalize_keypoints(pred,(110,110,220,220))
sigma = 1.25
# The diameter is measured on this image's own predicted keypoints 8 and 3
diam = np.sqrt(np.sum(np.square(keypoints[8]-keypoints[3])))
w = int(sigma*diam + 0.0001)
h = int(sigma*diam + 0.0001)
# The scaled image does not change inside the loop, so it is computed once
img_scaled = img/255.0
for i in range(14):
    # Crop around the current estimate of keypoint i and run the cascaded model on the crop
    image = crop_and_resize(img_scaled,(keypoints[i,0],keypoints[i,1],w,h))
    image = np.expand_dims(image,axis=0)
    image = tf.convert_to_tensor(image,dtype=tf.float32)
    pred = cascade_model(image)
    pred = tf.reshape(pred,(2,))
    # Apply the x output to x and the y output to y, rounded to whole pixels to match the int32 keypoints
    # NOTE(review): the network output is added without any un-normalisation; if the cascaded model was
    # trained on box-normalised errors, the offset has to be mapped back with the same box first - confirm
    keypoints[i] += np.rint(pred.numpy()).astype(keypoints.dtype)
visualize_keypoints(img,keypoints,img)