### Task statement

There is a video file `dance.mp4` in which three girls perform a shuffle dance.   
The task is to estimate the pose (skeleton keypoints) of each girl in every frame and save the annotated result to the video file `dance_out.mp4`.

In [1]:
import torch, cv2
import numpy as np
import torchvision
from tqdm import tqdm
import matplotlib.pyplot as plt
from torchvision import transforms
from torchvision.models.detection import keypointrcnn_resnet50_fpn

Helper functions: pose alignment, similarity metrics and skeleton drawing.

In [2]:
def center(Xar, F=True):
    """Subtract the per-column mean from ``Xar``.

    Parameters
    ----------
    Xar : array-like
        Data to centre; the mean is taken along axis 0.
    F : bool, default True
        When true, the centred array is flattened into a 1-D vector;
        otherwise it is returned with its original shape.

    Returns
    -------
    numpy.ndarray
        The mean-centred data.
    """
    centred = Xar - np.mean(Xar, axis=0)
    if F:
        return centred.flatten()
    return centred

def affine_transform(ref_keys, tst_keys, ref_confs, tst_confs):
    """Align ``tst_keys`` to ``ref_keys`` with a least-squares affine fit.

    Both point sets are extended with a trailing column of ones, the matrix
    ``A`` minimising ``||pad(tst_keys) @ A - pad(ref_keys)||`` is solved for,
    and ``tst_keys`` mapped through ``A`` is returned.

    Parameters
    ----------
    ref_keys : numpy.ndarray
        Reference points, one point per row.
    tst_keys : numpy.ndarray
        Points to be aligned, one point per row.
    ref_confs, tst_confs
        Accepted for interface compatibility; not used by the fit.

    Returns
    -------
    numpy.ndarray
        The transformed ``tst_keys`` with the padding column removed.
    """
    def _with_ones(points):
        # Append a column of ones so the translation becomes part of the matrix.
        return np.hstack([points, np.ones((points.shape[0], 1))])

    source = _with_ones(tst_keys)
    target = _with_ones(ref_keys)
    matrix = np.linalg.lstsq(source, target, rcond=None)[0]

    # Snap numerically negligible coefficients to exactly zero.
    matrix[np.abs(matrix) < 1e-10] = 0

    # Apply the fitted matrix, then drop the padding column again.
    return np.dot(source, matrix)[:, :-1]

def cosine_similarity(pose1, pose2):
    """Cosine similarity of two poses after mean-centring.

    Each pose is centred and flattened with ``center``; the result is the dot
    product of the two vectors divided by the product of their norms.
    """
    vec_a = center(pose1)
    vec_b = center(pose2)
    norm_product = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return np.dot(vec_a, vec_b.T) / norm_product

def weighted_distance(pose1, pose2, confs):
    """Confidence-weighted mean distance between two poses.

    Despite the name of the neighbouring helper, this is a distance, not a
    cosine similarity: 0 means the normalised poses coincide.

    Both poses are mean-centred with ``center`` and divided by their overall
    norm, which removes translation and scale. The Euclidean distance between
    matching keypoints is then averaged using ``confs`` as weights.

    Parameters
    ----------
    pose1, pose2 : numpy.ndarray
        Keypoint coordinates, one keypoint per row, in matching order.
    confs : array-like
        One non-negative weight per keypoint.

    Returns
    -------
    float
        ``sum(confs[k] * ||pose1[k] - pose2[k]||) / sum(confs)``. The result
        is NaN when the weights sum to zero or a pose has zero norm.
    """
    # Centering.
    pose1, pose2 = [center(p, False) for p in (pose1, pose2)]

    # Normalization.
    pose1, pose2 = [p / np.linalg.norm(p) for p in (pose1, pose2)]

    # Per-keypoint distances in one vectorised call; avoids a local named
    # ``sum`` that would shadow the builtin.
    confs = np.asarray(confs)
    distances = np.linalg.norm(pose1 - pose2, axis=-1)
    return np.sum(confs * distances) / confs.sum()

def draw_skeleton_per_person(
            img, all_keypoints, all_scores, confs,
            all_boxes,
            keypoint_threshold=2, conf_threshold=0.9,
            thickness=2, rd=5, th=2):
    """Draw a labelled, colour-coded skeleton for every detected person.

    People are processed in ascending order of their mean keypoint x
    coordinate and receive successive colours from the 'rainbow' colormap.
    The detection at index 0 is labelled "ETALON", every other one
    "DANC-<index>". Skeleton edges come from the module-level ``limbs`` list.

    Parameters
    ----------
    img : numpy.ndarray
        Image to annotate. It is copied, never modified in place.
    all_keypoints : numpy.ndarray
        Indexed as [person, keypoint, coordinate]; coordinate 0 is x, 1 is y.
    all_scores : numpy.ndarray
        Per-keypoint scores, indexed as [person, keypoint].
    confs : sequence
        Per-person values forwarded to ``affine_transform``.
    all_boxes : numpy.ndarray
        Per-person boxes; elements 0, 1 and 2 are used as left x, top y and
        right x.
    keypoint_threshold : float
        A keypoint (or a limb, via the lower of its two endpoint scores) is
        drawn only when its score exceeds this value.
    conf_threshold : float
        Not used; kept for interface compatibility.
    thickness : int
        Not used; kept for interface compatibility. The label stroke width is
        derived from the image size instead.
    rd : int
        Radius of the keypoint circles.
    th : int
        Line thickness of the limbs.

    Returns
    -------
    numpy.ndarray
        The annotated copy with its channel axis reversed (a view), so an RGB
        input yields a BGR result.
    """
    cmap = plt.get_cmap('rainbow')
    img_copy = img.copy()

    # Evenly spaced colormap indices, skipping both ends of the range.
    color_id = np.linspace(0, 255, all_keypoints.shape[0] + 2).astype(int).tolist()[1:-1]

    # Loop-invariant: label stroke width scales with the smaller image side.
    text_thickness = int(min(img.shape[:2]) * 5e-3)

    left_to_right = all_keypoints[:, :, 0].mean(axis=1).argsort()
    for color_order, person_id in enumerate(left_to_right):
        keypoints = all_keypoints[person_id, ...]
        scores = all_scores[person_id, ...]

        # Similarity of this person to the reference (detection 0).
        # NOTE(review): these two values are computed but never rendered.
        keypoints_transformed = affine_transform(
            all_keypoints[0], keypoints, confs[0], confs[person_id])
        cos_sim = round(cosine_similarity(keypoints_transformed, all_keypoints[0]), 4)
        W = all_scores[0] * scores
        W[W < 0] = 0
        W = np.sqrt(W)
        wght = round(1 - weighted_distance(keypoints_transformed, all_keypoints[0], W), 4)

        color = tuple(np.asarray(cmap(color_id[color_order])[:-1]) * 255)

        # Use this person's own box. Indexing a separately sorted copy of the
        # boxes with ``person_id`` would pick another person's box whenever
        # detection order differs from left-to-right order.
        box = all_boxes[person_id]
        x_pos = int(box[0])
        y_pos = int(box[1])
        # Shift the label one third of the box width to the right.
        xp = int(int(box[2] - x_pos) / 3)

        if person_id == 0:
            text = f"ETALON"
        else:
            text = f"DANC-{person_id}"
        cv2.putText(
            img_copy, text, (x_pos + xp, y_pos - 20),
            fontFace=cv2.FONT_ITALIC,
            fontScale=1.15,
            color=(255, 255, 0), thickness=text_thickness)

        # Keypoints that pass the score threshold.
        for kp_score, kp_xy in zip(scores, keypoints):
            if kp_score > keypoint_threshold:
                cv2.circle(img_copy, tuple(map(int, kp_xy[:2])), rd, color, -1)

        # A limb is drawn only when both of its endpoints pass the threshold.
        for start, end in limbs:
            if min(scores[start], scores[end]) > keypoint_threshold:
                limb_loc1 = tuple(map(int, keypoints[start, :2]))
                limb_loc2 = tuple(map(int, keypoints[end, :2]))
                cv2.line(img_copy, limb_loc1, limb_loc2, color, thickness=th)

    return img_copy[:, :, ::-1]

In [3]:
video_file = "../video/3-danc.mp4"
cap = cv2.VideoCapture(video_file)

# Fail fast: an unopened capture would otherwise leave the frame list empty
# and the following cells would run on no data without any error.
if not cap.isOpened():
    raise IOError(f"Cannot open video file: {video_file}")

try:
    # Data about the video file being processed.
    video_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    video_fn = cap.get(cv2.CAP_PROP_FRAME_COUNT)

    # Decode every frame into memory; read() reports failure at end of stream.
    girl = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        girl.append(frame)
finally:
    # Release the capture even if decoding raises.
    cap.release()

print(
    f" file: {video_file}\n width: {video_w}\n height: {video_h}\n "
    f"fps: {video_fps}\n frame count: {video_fn}"
)

 file: ../video/3-danc.mp4
 width: 1920
 height: 1080
 fps: 29.97002997002997
 frame count: 90.0


In [4]:
# Keypoint R-CNN (ResNet-50 FPN) created with its default weights.
model = keypointrcnn_resnet50_fpn(weights='DEFAULT')

# Keypoint names. A name's position in this list is the index used to look
# up that keypoint when the skeleton edges are built.
# NOTE(review): presumably this matches the order in which the model emits
# keypoints - confirm against the torchvision documentation.
keypoints = [
    'nose',
    'left_eye', 'right_eye',
    'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder',
    'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_ankle', 'right_ankle',
]

def get_limbs_from_keypoints(keypoints):
    """Translate the skeleton's named joint pairs into index pairs.

    Parameters
    ----------
    keypoints : list of str
        Keypoint names; the position of a name is its index.

    Returns
    -------
    list of [int, int]
        One ``[start_index, end_index]`` pair per skeleton edge, in a fixed
        order (face, arms, legs, then torso).

    Raises
    ------
    ValueError
        If a required name is missing from ``keypoints``.
    """
    joint_pairs = (
        # Face
        ("right_eye", "nose"),
        ("right_eye", "right_ear"),
        ("left_eye", "nose"),
        ("left_eye", "left_ear"),
        # Arms
        ("right_shoulder", "right_elbow"),
        ("right_elbow", "right_wrist"),
        ("left_shoulder", "left_elbow"),
        ("left_elbow", "left_wrist"),
        # Legs
        ("right_hip", "right_knee"),
        ("right_knee", "right_ankle"),
        ("left_hip", "left_knee"),
        ("left_knee", "left_ankle"),
        # Torso
        ("right_shoulder", "left_shoulder"),
        ("right_hip", "left_hip"),
        ("right_shoulder", "right_hip"),
        ("left_shoulder", "left_hip"),
    )
    return [
        [keypoints.index(start), keypoints.index(end)]
        for start, end in joint_pairs
    ]

# Skeleton edges as keypoint-index pairs; read as a global by
# draw_skeleton_per_person when it draws the limbs.
limbs = get_limbs_from_keypoints(keypoints)

In [5]:
# Inference setup: use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# Switch to evaluation mode, then move the model to the chosen device.
model.eval()
model = model.to(device)

cuda


Building a skeleton and creating a list of frames.

In [6]:
grl_frame = []

# Built once: the conversion does not depend on the frame.
transform = transforms.Compose([transforms.ToTensor()])

for img in tqdm(girl):
    # OpenCV decodes frames in BGR channel order, while the torchvision
    # detection model expects RGB input, so convert before inference.
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Transformation - obtaining an image tensor.
    img_tensor = transform(img_rgb).to(device)
    with torch.no_grad():
        out_img = model([img_tensor])[0]

    # Mask of the detections whose score passes the threshold.
    mask_obj = out_img['scores'] > 0.9

    # Only objects that have passed through the threshold.
    # Keypoints keep just their first two components (x, y).
    frame_keys = out_img['keypoints'][mask_obj][:, :, :2].cpu().numpy()
    frame_scores = out_img['keypoints_scores'][mask_obj].cpu().numpy()
    frame_confs = out_img['scores'][mask_obj].cpu().numpy()
    frame_boxes = out_img['boxes'][mask_obj].cpu().numpy()

    # The drawing helper reverses the channel axis on return, so passing the
    # RGB image yields a BGR frame for the video writer.
    skeletal_mdl = draw_skeleton_per_person(
        img_rgb,
        frame_keys,
        frame_scores,
        frame_confs, frame_boxes,
        rd=6, th=3)

    grl_frame.append(skeletal_mdl)

100%|██████████| 90/90 [00:32<00:00,  2.80it/s]


Converting a list of frames to a video file and saving it.

In [7]:
# Reuse the frame rate and frame size read from the source video, so the
# writer settings always match the frames being written.
video_out = cv2.VideoWriter(
    "out_video.mp4", cv2.VideoWriter_fourcc(*'mp4v'), video_fps, (video_w, video_h))
if not video_out.isOpened():
    raise IOError("Cannot open out_video.mp4 for writing")

try:
    for frame in grl_frame:
        # The frames are channel-reversed views; pass a contiguous buffer.
        video_out.write(np.ascontiguousarray(frame))
finally:
    # Finalise the file even if writing raises.
    video_out.release()