See original notebook : https://colab.research.google.com/drive/1vm6n4ZvOf7RpZhGvA6p5JE0QcRYYkJXx

# Dataset

Download the dataset and required files from https://drive.google.com/drive/folders/1AXV-CXs4D4fn8ub8oBV4G6mB6viqBJbm?usp=sharing before running. Put the `RT-multiperson-pose-pytorch` folder in the same folder as this notebook.

# Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields

This notebook explores OpenPose, an algorithm for human pose estimation. The original repository is at https://github.com/CMU-Perceptual-Computing-Lab/openpose and the PyTorch implementation used here comes from https://github.com/tensorboy/pytorch_Realtime_Multi-Person_Pose_Estimation.

This notebook is based on the original notebook by Sao Mai Nguyen.

Please make sure that the mounted Google Drive contains the resources and data needed for the project. All the cells of the "Settings" part must be executed before running the rest of the project code.

In [6]:
# Libraries
# Colab libraries
from google.colab import drive
from google.colab import output
drive.mount('/content/gdrive')
colab_path = "/content/gdrive/My Drive/Colab Notebooks/Projects_for_Github" # CHANGE THIS PATH to the folder that contains the project

# Basis libraries
import os, re, sys, math, time, scipy, argparse
import cv2, matplotlib
import matplotlib.gridspec as gridspec
import numpy as np
import pylab as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from collections import OrderedDict
# Import from scipy.ndimage directly: the .morphology and .filters submodules are deprecated
from scipy.ndimage import generate_binary_structure, gaussian_filter, maximum_filter

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


## Settings

First, we install the necessary packages.

In [None]:
# Dependency install
!sudo apt-get install swig


# Repository compile
%cd $colab_path"/RT-multiperson-pose-pytorch"
%cd lib/pafprocess 
!sh make.sh

# Libraries install
%cd $colab_path"/RT-multiperson-pose-pytorch"
!python -m pip install -r ./requirements.txt
!pip3 install numpngw

print("[INFO]: Project requirements installed successfully")

Now we can import the other libraries used in the project, which are contained in the repository.

In [8]:
# Framework libraries
%cd $colab_path"/RT-multiperson-pose-pytorch"
sys.path.append('.'); sys.argv=['']
from lib.network.rtpose_vgg import get_model 
from lib.network import im_transform
from evaluate.coco_eval import get_outputs, handle_paf_and_heat, run_eval
from lib.utils.common import Human, BodyPart, CocoPart, CocoColors, CocoPairsRender, draw_humans
from lib.utils.paf_to_pose import paf_to_pose_cpp
from lib.config import cfg, update_config
from torchsummary import summary

/content/gdrive/My Drive/Colab Notebooks/Projects_for_Github/RT-multiperson-pose-pytorch


Then we update the configuration variables.

In [9]:
###########NETWORK CONFIG ############
class Namespace:
  def __init__(self, **kwargs):
    self.__dict__.update(kwargs)

# update config file
args = Namespace(cfg = './experiments/vgg19_368x368_sgd.yaml', weight = 'pose_model.pth', opts = [])
update_config(cfg, args)

In [10]:
# Other import
from google.colab.patches import cv2_imshow
from IPython.display import Image
from numpngw import write_apng

### ***Model extraction***

Initially, the input image is fed into the first 10 layers of the VGG-19 CNN, a model normally used for image classification, which produces a set of feature maps F that is the input to the first stage of each branch.

The following code fragment shows the details of the network.

In [None]:
model = get_model('vgg19')   
model.load_state_dict(torch.load(args.weight))
model = torch.nn.DataParallel(model).cuda()
model.float()
model.eval()
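
As a rough sanity check on the shape of the feature maps F, the sketch below computes their spatial size. It assumes that the 10 VGG-19 layers used here include three 2x2 max-pooling steps (an overall stride of 8), as in the standard OpenPose backbone, and that the input is 368x368, as the name of the configuration file suggests. The helper `feature_map_size` is hypothetical and is not part of the repository.

```python
def feature_map_size(height, width, num_poolings=3):
    """Spatial size after `num_poolings` 2x2 max-pooling layers with stride 2.

    Assumes the convolutions are padded so that they preserve the spatial
    size, which leaves pooling as the only source of downsampling.
    """
    for _ in range(num_poolings):
        height, width = height // 2, width // 2
    return height, width


print(feature_map_size(368, 368))  # (46, 46)
```

Under these assumptions, each location of F summarizes an 8x8 pixel block of the input image.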

# Part 1 : Blur Face


### Detect-and-blur function

In the following cells, the function `blur_faces` detects and blurs the faces in a single frame. By default, it uses the model and configuration loaded above.

To blur the faces, we look, for each detected human, for the position and radius of a circle that contains his or her head.
- The position of the center is given by the trained model, with keypoint number 0, which directly corresponds to the estimated position of the head / the nose.
- The radius of the circle is based on distances between different keypoints. We take the maximum of:
  - the length of the neck (between keypoints 0 and 1)
  - the distance between the ears (keypoints 16 and 17)
  - 75% of the distance between both shoulders (keypoints 2 and 5)

This seemingly complicated formula simply ensures that the radius is large enough to anonymize people *no matter the point of view*. For example, the neck of a person seen from above may have an apparent length close to zero. Conversely, a side view may align the ears and the shoulders (the keypoints are very close to each other) while leaving the neck visible.
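
To make the formula concrete, here is a minimal standalone sketch of the radius computation. It does not use the repository's `Human` class: keypoints are assumed to be stored in a plain dictionary mapping the keypoint index to normalized `(x, y)` coordinates, and the names `pixel_distance` and `head_radius` as well as the sample coordinates are hypothetical.

```python
import math


def pixel_distance(a, b, frame_width, frame_height):
    """Euclidean distance in pixels between two normalized (x, y) keypoints."""
    return math.hypot((a[0] - b[0]) * frame_width, (a[1] - b[1]) * frame_height)


def head_radius(keypoints, frame_width, frame_height):
    """Radius of the blurring circle; a missing keypoint makes its distance 0."""
    def dist(i, j):
        if i not in keypoints or j not in keypoints:
            return 0.0
        return pixel_distance(keypoints[i], keypoints[j], frame_width, frame_height)

    neck = dist(0, 1)              # nose to neck
    ears = dist(16, 17)            # right ear to left ear
    shoulders = 0.75 * dist(2, 5)  # 75% of the shoulder width
    return int(max(neck, ears, shoulders))


# Hypothetical front view in a 640x480 frame
front_view = {
    0: (0.5, 0.25), 1: (0.5, 0.375),         # nose, neck
    2: (0.375, 0.375), 5: (0.625, 0.375),    # right and left shoulders
    16: (0.4375, 0.25), 17: (0.5625, 0.25),  # right and left ears
}
print(head_radius(front_view, 640, 480))  # 120
```

In this example the neck measures 60 pixels and the ears are 80 pixels apart, so the shoulder term (0.75 x 160 = 120 pixels) determines the radius.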

In [12]:
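# Distance functions between different
# parts of the body
# NOTE: these helpers read the global variables frame_width and
# frame_height, which are set in the main cell below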

def dist_neck(human):
  """Return the length of the neck"""
  if 0 not in human.body_parts.keys():
    return 0
  if 1 not in human.body_parts.keys():
    return 0
  head = human.body_parts[0] # 0 for the head/nose
  m_shoulder = human.body_parts[1] # 1 for the neck
  length = int(np.sqrt(
    ((m_shoulder.x-head.x)*frame_width)**2
    + ((m_shoulder.y-head.y)*frame_height)**2
  ))
  return length

def dist_ears(human):
  """Return the distance between the two ears"""
  if 16 not in human.body_parts.keys():
    return 0
  if 17 not in human.body_parts.keys():
    return 0
  r_ear = human.body_parts[16] # 16 for the right ear
  l_ear = human.body_parts[17] # 17 for the left ear
  length = int(np.sqrt(
    ((l_ear.x-r_ear.x)*frame_width)**2
    + ((l_ear.y-r_ear.y)*frame_height)**2
  ))
  return length

def dist_shoulders(human):
  """Return the distance between the shoulders"""
  if 2 not in human.body_parts.keys():
    return 0
  if 5 not in human.body_parts.keys():
    return 0
  r_shoulder = human.body_parts[2] # 2 for the right shoulder
  l_shoulder = human.body_parts[5] # 5 for the left shoulder
  length = np.sqrt(
    ((l_shoulder.x-r_shoulder.x)*frame_width)**2
    + ((l_shoulder.y-r_shoulder.y)*frame_height)**2
  )
  return length

In [13]:
# Face-blurring function for a single frame

def blur_faces(oriImg,
      frame_width, frame_height,
      model=model, cfg=cfg):
    # Detect points of interest
    with torch.no_grad():
      paf, heatmap, imscale = get_outputs(
        oriImg, model, 'rtpose'
      )
    humans = paf_to_pose_cpp(heatmap, paf, cfg)

    # Iterate on each detected human to build the mask
    mask = np.zeros((frame_height, frame_width), dtype=np.uint8)
    for human in humans:
      # Center : Find the head
      if 0 not in human.body_parts.keys():
        continue
      head = human.body_parts[0] # 0 is the index for the head/nose
      center = (
        int(head.x * frame_width + 0.5),
        int(head.y * frame_height + 0.5)
      )
      # Radius : maximum of different lengths of the body
      ears_radius = dist_ears(human)
      neck_radius = dist_neck(human)
      shoulder_radius = 0.75 * dist_shoulders(human)
      radius = int(max(ears_radius, neck_radius, shoulder_radius))

      # Add the blurred area to the mask
      cv2.circle(mask, center, radius, (255, 255, 255), -1)

    # Compute a blurred version of the frame
    blurredArray = cv2.GaussianBlur(oriImg, (15, 15), 11)
    # Build the blurred-faces frame
    mask = mask / 255.0
    mask = np.expand_dims(mask, axis=-1)
    image = (1.0 - mask) * oriImg + mask * blurredArray
    image = image.astype(np.uint8)

    return image

### Run the experiments

First of all, select a video to be blurred. If you activate the test mode, only the first ten frames of the video will be processed.

Then, you can run the next cell to launch the operation.

In [14]:
#@markdown Parameters :
filename = "./dataTpPoseKeraal/ctk/data1/Vid003.mp4" #@param ["./dataTpPoseKeraal/ctk/data1/Vid003.mp4", "./dataTpPoseKeraal/ctk/data2/Vid0015.mp4", "./dataTpPoseKeraal/ctk/data3/VideoColorCorrect0.mp4", "./dataTpPoseKeraal/rtk/data4/Vid008.mp4", "./dataTpPoseKeraal/rtk/data5/Vid021.mp4", "./dataTpPoseKeraal/rtk/data6/VideoColorCorrect0.mp4"]
TEST_MODE = False #@param {type:"boolean"}


In [15]:
# Main cell
## Initialization

images =[]
video_capture = cv2.VideoCapture(filename)

# Set resolutions for the output file
frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (frame_width, frame_height)

# Prepare output
result = cv2.VideoWriter(
  './filename.avi',  
  cv2.VideoWriter_fourcc(*'MJPG'), 
  10,
  size
)

## Main loop : loop over the frames

i = 1 # frame index
while video_capture.isOpened():
  # Capture frame-by-frame
  is_success, oriImg = video_capture.read()
  if not is_success:
    break

  # Generate final frame, with blurred faces
  image = blur_faces(oriImg, frame_width, frame_height)

  # Save the frame for output
  output.clear()
  images.append(image)
  result.write(image)

  # Early stopping if in test mode :
  if TEST_MODE and i >= 10:
    break
  i += 1

# Save output: release the reader and the writer so the output file is finalized
video_capture.release()
result.release()
cv2.destroyAllWindows()

In [16]:
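# Display the result
# OpenCV frames are in BGR order, while write_apng expects RGB
rgb_images = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in images]
write_apng('outvideo.png', rgb_images, delay=20)
Image(filename='outvideo.png')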

Overall, the algorithm gives satisfactory results. We are no longer able to identify the faces of the people in the videos.

Some issues may still occur, for example in the first video. When the third person hides his face, the skeleton detection is disturbed and it takes some time before the face is blurred again, at the end. One possible trick would be to compare each head found with the heads of the previous frame. After careful matching, we can identify the missing ones and blur the same areas as in the previous frame, under the hypothesis that the heads do not move much between frames.
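
The matching idea could be sketched as follows. This is only an illustration under simplifying assumptions, not code from the notebook: heads are represented as `(x, y, radius)` tuples in pixels, matching is a simple distance threshold rather than a one-to-one assignment, and the function name `carry_over_missing_heads` and the `max_distance` parameter are hypothetical.

```python
import math


def carry_over_missing_heads(previous, current, max_distance):
    """Return `current` plus every previous circle with no nearby match.

    Circles are (x, y, radius) tuples in pixels. A previous head counts as
    still detected when some current head lies within `max_distance` pixels.
    """
    merged = list(current)
    for px, py, pr in previous:
        matched = any(
            math.hypot(px - cx, py - cy) <= max_distance
            for cx, cy, _ in current
        )
        if not matched:
            merged.append((px, py, pr))  # reuse the last known position
    return merged


previous = [(100, 80, 30), (300, 90, 28), (500, 85, 32)]
current = [(104, 82, 31), (498, 88, 30)]  # the middle head was lost
print(carry_over_missing_heads(previous, current, max_distance=40))
# [(104, 82, 31), (498, 88, 30), (300, 90, 28)]
```

A real implementation would probably also limit the number of consecutive frames for which a lost head is carried over, so that people who leave the scene do not stay blurred forever.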

Furthermore, a better rendering could be obtained by smoothing the position and radius of the blurring circles between successive frames. Nevertheless, the current method for computing the radius is based on the skeleton size: it adapts rather well to sudden changes and is not too noisy as long as people move smoothly enough.
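
One simple way to smooth the circles is an exponential moving average, sketched below. This is an assumption about how the smoothing could be done, not something implemented in the notebook; the function `smooth_circle` and the weight `alpha` are hypothetical, and the sketch tracks a single head.

```python
def smooth_circle(previous, detected, alpha=0.5):
    """Blend the previous smoothed circle with the newly detected one.

    Circles are (x, y, radius) tuples. `alpha` in (0, 1] is the weight of
    the new detection: 1.0 disables smoothing, small values react slowly.
    """
    if previous is None:  # first frame: nothing to smooth against
        return detected
    return tuple(alpha * d + (1.0 - alpha) * p for p, d in zip(previous, detected))


detections = [(100, 80, 30), (108, 80, 34), (104, 84, 30)]
smoothed = None
for circle in detections:
    smoothed = smooth_circle(smoothed, circle, alpha=0.5)
    print(smoothed)
```

With several people in the frame, each head would need its own smoothed state, which requires matching heads across frames first. The smoothed values are floats and must be rounded to integers before being passed to `cv2.circle`.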

# Part 2 : Classify the movements

For each exercise (ctk and rtk), there are 3 videos. We use 2 of them to train a model, and the 3rd video for testing.

We refer to the PyTorch tutorial on sequence models at https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html and the documentation on torch.nn at https://pytorch.org/docs/stable/nn.html

In [None]:
#Project2 : Use one of the representations of movements to model the two exercises and classify them.

# Importing modules
from tensorflow.keras import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import AveragePooling2D, Dropout, Flatten, Dense, LSTM, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD, Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import cv2

if colab_path[-1] != "/" :
  colab_path = colab_path + "/"

# Importing videos
videos_link = [
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/ctk/data1/Vid003.mp4',
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/ctk/data2/Vid015.mp4', 
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/ctk/data3/VideoColor_Correct0.avi', 
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/rtk/data4/Vid008.mp4', 
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/rtk/data5/Vid021.mp4', 
  'RT-multiperson-pose-pytorch/dataTpPoseKeraal/rtk/data6/VideoColor_Correct0.avi'
  ]
ctk_videos = []
rtk_videos = []

for video_link in videos_link :
  if "ctk" in video_link :
    ctk_videos.append(colab_path + video_link)
  elif "rtk" in video_link :
    rtk_videos.append(colab_path + video_link)
  else :
    print("Error in the video links")
    break

# Building data sets and labels sets
data = []
labels = []
test_ctk = []
test_rtk = []

# CTK for training
for link in ctk_videos[0:2] :
  video = cv2.VideoCapture(link)
  frame_width = int(video.get(3)) 
  frame_height = int(video.get(4)) 
  size = (frame_width, frame_height)
  while video.isOpened():
    label = 'ctk'
    success, image_raw = video.read()
    if not success :
      break
    image = cv2.resize(cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB), size)
    data.append(image)
    labels.append(label)

# CTK for testing
link = ctk_videos[2]
video = cv2.VideoCapture(link)
while video.isOpened():
  success, image_raw = video.read()
  if not success :
    break
  image = cv2.resize(cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB), size)
  test_ctk.append(image)
test_ctk = np.array(test_ctk)

# RTK for training
for link in rtk_videos[0:2] :
  video = cv2.VideoCapture(link)
  frame_width = int(video.get(3)) 
  frame_height = int(video.get(4)) 
  size = (frame_width, frame_height)
  while video.isOpened():
    label = 'rtk'
    success, image_raw = video.read()
    if not success :
      break      
    image = cv2.resize(cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB), size)
    data.append(image)
    labels.append(label)

# RTK for testing
link = rtk_videos[2]
video = cv2.VideoCapture(link)
while video.isOpened():
  success, image_raw = video.read()
  if not success :
    break      
  image = cv2.resize(cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB), size)
  test_rtk.append(image)
test_rtk = np.array(test_rtk)

# Building training and validation set for training
data = np.array(data)
# One-Hot encoding of labels
OneHotLabels = np.array([
     [1,0] if label == 'ctk' else [0,1] for label in np.array(labels)
     ])
# Splits of training and testing sets
(trainX, testX, trainY, testY) = train_test_split(
                                  data, 
                                  OneHotLabels, 
                                  test_size=0.2, 
                                  stratify=OneHotLabels, 
                                  random_state=42
                                 )

We build the dataset so that each sample corresponds to a frame.
The data is split 80%/20% between the training and validation sets.
The _stratify_ option is activated so that the split preserves the proportion of examples of each class: we pass the one-hot encoding of the labels as the parameter, so each movement is represented in the same proportion in the training and validation sets as in the full dataset.
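
To illustrate what stratification does, here is a small pure-Python sketch that splits each class separately. It is not scikit-learn's implementation, only a simplified illustration; the function `stratified_split` and the deliberately imbalanced label list are hypothetical.

```python
import random
from collections import Counter, defaultdict


def stratified_split(labels, test_size=0.2, seed=42):
    """Return (train_indices, validation_indices), splitting each class separately."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    train, validation = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_validation = round(len(indices) * test_size)
        validation.extend(indices[:n_validation])
        train.extend(indices[n_validation:])
    return train, validation


labels = ["ctk"] * 300 + ["rtk"] * 100  # hypothetical, deliberately imbalanced
train, validation = stratified_split(labels)
print(Counter(labels[i] for i in train))       # 240 ctk and 80 rtk frames
print(Counter(labels[i] for i in validation))  # 60 ctk and 20 rtk frames
```

Both subsets keep the 3:1 class ratio of the full list, which is the property that the _stratify_ option provides.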

The model is implemented using Keras and is inspired by the neural network described in:

_Patrice Ferlet_ "Training a neural network with an image sequence — example with a video as input" _Smile Innovation, Nov 2019_ [link](https://medium.com/smileinnovation/training-neural-network-with-image-sequence-an-example-with-video-as-input-c3407f7a0b0f)

In [None]:
# Model
InputModel = ResNet50(weights="imagenet", 
                      include_top=False, 
                      input_tensor=Input(shape=(frame_height, frame_width, 3))
             )

input_layer = InputModel.output
averagepool_layer = AveragePooling2D(pool_size=(7, 7))(input_layer)
flatten_layer = Flatten(name="flatten")(averagepool_layer)
dense_layer = Dense(512, activation="relu")(flatten_layer)
dropout_layer = Dropout(0.5)(dense_layer)
output_layer = Dense(2, activation="softmax")(dropout_layer)

model = Model(inputs=InputModel.input, outputs=output_layer)

for layer in InputModel.layers :
  layer.trainable = False

In [None]:
# Parameters 
nb_epochs = 3
learning_rate = 1e-5
momentum = 0.9
weight_decay = 1e-5 / nb_epochs

In [23]:
# Defining optimizer and compiling model
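# `lr` is a deprecated alias of `learning_rate`.
# Despite its name, `weight_decay` is used here as a learning-rate decay.
optim = SGD(learning_rate=learning_rate, momentum=momentum, decay=weight_decay)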
model.compile(loss="binary_crossentropy", optimizer=optim, metrics=["accuracy"])



In [None]:
# Fitting model on data and plotting training metrics
fitted_model = model.fit(x=trainX,
                         y=trainY,
                         steps_per_epoch=len(trainX) // 64, 
                         validation_data=(testX, testY), 
                         epochs=nb_epochs
               )
plt.style.use("ggplot")
plt.figure()
plt.plot(np.arange(0, nb_epochs),
         fitted_model.history["loss"], 
         label="train_loss")
plt.plot(np.arange(0, nb_epochs), 
         fitted_model.history["val_loss"], 
         label="validation_loss")
plt.plot(np.arange(0, nb_epochs), 
         fitted_model.history["accuracy"], 
         label="train_accuracy")
plt.title("Loss and Accuracy")
plt.xlabel("Nb_epochs")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")

We use the model trained on the first two videos of each class to classify the third video.

In [None]:
# Testing
predictions_ctk = model.predict(x=test_ctk.astype("float32"), batch_size=32)
predictions_rtk = model.predict(x=test_rtk.astype("float32"), batch_size=32)

# Class indices follow the one-hot encoding used for training:
# index 0 <-> [1,0] = ctk, index 1 <-> [0,1] = rtk
class_names = ["ctk", "rtk"]

mean_ctk = predictions_ctk.mean(axis=0)
result_ctk = class_names[np.argmax(mean_ctk)]
print("CTK video will be labelled", result_ctk)
print("Mean probability of the true class :", mean_ctk[0] * 100, "%")

print('\n')

mean_rtk = predictions_rtk.mean(axis=0)
result_rtk = class_names[np.argmax(mean_rtk)]
print("RTK video will be labelled", result_rtk)
print("Mean probability of the true class :", mean_rtk[1] * 100, "%")
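
The video-level decision relies on averaging per-frame probabilities, which is not the same as a per-frame accuracy. The standalone sketch below, on made-up numbers, shows both quantities side by side. It assumes that index 0 stands for ctk and index 1 for rtk, following the one-hot encoding built in the data preparation cell; the function `summarize_video` and the sample probabilities are hypothetical.

```python
CLASS_NAMES = ["ctk", "rtk"]  # index 0 <-> [1, 0], index 1 <-> [0, 1]


def summarize_video(frame_probabilities, true_class):
    """Aggregate per-frame [p_ctk, p_rtk] pairs into video-level statistics."""
    n_frames = len(frame_probabilities)
    true_index = CLASS_NAMES.index(true_class)
    # Mean probability of each class over all frames
    mean_probs = [sum(p[k] for p in frame_probabilities) / n_frames for k in range(2)]
    # Video label: class with the highest mean probability
    predicted = CLASS_NAMES[mean_probs.index(max(mean_probs))]
    # Per-frame accuracy: share of frames whose most likely class is the true one
    correct = sum(1 for p in frame_probabilities if p.index(max(p)) == true_index)
    return predicted, mean_probs[true_index], correct / n_frames


# Hypothetical softmax outputs for four frames of a ctk video
frames = [[0.75, 0.25], [0.625, 0.375], [0.25, 0.75], [1.0, 0.0]]
print(summarize_video(frames, "ctk"))  # ('ctk', 0.65625, 0.75)
```

Here the video is labelled ctk with a mean probability of about 66% for the true class, while 75% of the individual frames are classified correctly.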

The model should not be trained for too many epochs, as the network tends to overfit quickly because the dataset is small. The downside is that the average predictions for each video are weak: the accuracies never exceed 60%.

Training is unstable and its stochastic nature induces errors: it does not always produce a model that gives the expected predictions. After several runs, the model seems to overfit even if we build a new model each time. We observe that the accuracy at the end of the first epoch is higher in these later runs, and it is then necessary to restart the kernel to train from scratch.