# Fast MTCNN detector

This notebook demonstrates how to achieve 45 frames per second speeds for loading frames and detecting faces on full resolution videos.

## Algorithm

**Striding**: The algorithm used is a strided modification of MTCNN in which face detection is performed on only every _N_ frames, and applied to all frames. For example, with a batch of 9 frames, we could pass frames 0, 3, and 6 to MTCNN. Then, the bounding boxes (and potentially landmarks) returned for frame 0 would be naively applied to frames 1 and 2. Similarly, the detections for frame 3 are applied to frames 4 and 5, and the detections for frames 6 are applied to frames 7 and 8.

Although this assume that faces do not move between frames significantly, this is generally a good approximation for low stride numbers. If the stride is 3, we are assuming that the face does not significantly alter position for an additional 2 frames, or ~0.07 seconds. If faces are moving faster than this, they are likely to be extremely blurry anyway. Furthermore, ensuring that faces are cropped with a small margin mitigates the impact of face drift.

**Scale pyramid**: The algorithm uses a slightly smaller scaling factor (0.6 vs 0.709) than the original MTCNN algorithm to construct the scaling pyramid applied to input images. For details of the scaling pyramid, see the [original paper](https://arxiv.org/abs/1604.02878) for details of the scaling pyramid approach.

**Multi-threading**: A modest performance gain comes from loading video frames (with `cv2.VideoCapture`) using threading. This functionality is provided by the `FileVideoStream` class of the imutils package.

## Other resources

See the following kernel for a guide to using the MTCNN functionality of facenet-pytorch: https://www.kaggle.com/timesler/guide-to-mtcnn-in-facenet-pytorch

## Imports

In [12]:
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
import torch
from imutils.video import FileVideoStream
import cv2
import time
import glob
import numpy as np
import pandas as pd
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets,transforms
# from tqdm.notebook import tqdm

device = 'cuda' if torch.cuda.is_available() else 'cpu'
input_video_folder = './videos/*.avi'
images_folder = './frames/'
target_name = 'divyansh'
timestamped_frames = np.array([[0,tuple()]])
output_video_name = f"divyansh-lt-reborn-.85.avi"
output_fps = 23


  timestamped_frames = np.array([[0,tuple()]])


# Generate Face Embeddings

In [13]:
# # The model is running on CPU, since it is already pre-trained and doesnt require GPU
# device = 'cpu'
# print('Running on device: {}'.format(device))

#Define MTCNN module
#Since MTCNN is a collection of neural nets and other code, 
#The device must be passed in the following way to enable copying of objects when needed internally.
mtcnn = MTCNN(
    image_size=160, margin=0, min_face_size=20,
    thresholds=[0.6, 0.7, 0.7], factor=0.709,
    device="cpu"
)
#Function takes 2 vectors 'a' and 'b'
#Returns the cosine similarity according to the definition of the dot product
def cos_sim(a, b):
    dot_product = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    return dot_product / (norm_a * norm_b)

#cos_sim returns real numbers,where negative numbers have different interpretations.
#So we use this function to return only positive values.
def cos(a,b):
    minx = -1 
    maxx = 1
    return (cos_sim(a,b)- minx)/(maxx-minx)

# Define Inception Resnet V1 module (GoogLe Net)
resnet = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Define a dataset and data loader
dataset = datasets.ImageFolder(images_folder)
dataset.idx_to_class = {i:c for c, i in dataset.class_to_idx.items()}
loader = DataLoader(dataset, collate_fn=lambda x: x[0])

#Perfom MTCNN facial detection
#Detects the face present in the image and prints the probablity of face detected in the image.
aligned = []
names = []
for x, y in loader:
    x_aligned, prob = mtcnn(x, return_prob=True)
    if x_aligned is not None:
        print('Face detected with probability: {:8f}'.format(prob))
        aligned.append(x_aligned)
        names.append(dataset.idx_to_class[y])

# Calculate the 512 face embeddings
aligned = torch.stack(aligned).to(device)
embeddings = resnet(aligned).cpu()

# Print distance matrix for classes.
#The embeddings are plotted in space and cosine distace is measured.
cos_sim = nn.CosineSimilarity(dim=-1, eps=1e-6)
for i in range(0,len(names)):
    emb=embeddings[i].unsqueeze(0)
    # The cosine similarity between the embeddings is given by 'dist'.
    dist =cos(embeddings[0],emb)  
        
dists = [[cos(e1,e2).item() for e2 in embeddings] for e1 in embeddings]
# The print statement below is
#Helpful for analysing the results and for determining the value of threshold.
print(pd.DataFrame(dists, columns=names, index=names)) 

Face detected with probability: 0.999979
Face detected with probability: 0.999989
Face detected with probability: 0.999998
Face detected with probability: 0.999979
Face detected with probability: 1.000000
Face detected with probability: 0.999993
Face detected with probability: 0.999815
Face detected with probability: 0.999991
Face detected with probability: 0.999990
Face detected with probability: 0.999997
Face detected with probability: 0.999999
Face detected with probability: 0.999994
Face detected with probability: 1.000000
Face detected with probability: 0.999999
Face detected with probability: 0.999970
Face detected with probability: 0.999997
Face detected with probability: 0.999997
Face detected with probability: 0.999994
Face detected with probability: 0.999992
Face detected with probability: 0.999968
Face detected with probability: 0.999958
Face detected with probability: 0.999973
Face detected with probability: 0.999881
Face detected with probability: 0.999810
Face detected wi

## The FastMTCNN class

The class below is a thin wrapper for the MTCNN implementation in the `facenet-pytorch` package that implements the algorithm described above.

In [14]:
class FastMTCNN(object):
    """Fast MTCNN implementation."""
    
    def __init__(self, stride, resize=1, *args, **kwargs):
        """Constructor for FastMTCNN class.
        
        Arguments:
            stride (int): The detection stride. Faces will be detected every `stride` frames
                and remembered for `stride-1` frames.
        
        Keyword arguments:
            resize (float): Fractional frame scaling. [default: {1}]
            *args: Arguments to pass to the MTCNN constructor. See help(MTCNN).
            **kwargs: Keyword arguments to pass to the MTCNN constructor. See help(MTCNN).
        """
        self.stride = stride
        self.resize = resize
        self.mtcnn = MTCNN(*args, **kwargs)
        
    def __call__(self, frames, do_process):
        """Detect faces in frames using strided MTCNN."""
        if self.resize != 1:
            frames = [
                cv2.resize(f, (int(f.shape[1] * self.resize), int(f.shape[0] * self.resize)))
                    for f in frames
            ]
                      
        boxes, probs = self.mtcnn.detect(frames[::self.stride])
        
        faces = []
        for i, frame in enumerate(frames):
            # frames not processed here
            # if not do_process[i]: 
            #     continue
            box_ind = int(i / self.stride)
            if boxes[box_ind] is None:
                continue
            for box in boxes[box_ind]:
                box = [int(b) for b in box]
                faces.append(frame[box[1]:box[3], box[0]:box[2]])
        
        return faces

## Full resolution detection

In this example, we demonstrate how to detect faces using full resolution frames (i.e., `resize=1`).

In [15]:
fast_mtcnn = FastMTCNN(
    image_size=160, min_face_size=20,
    thresholds=[0.6, 0.7, 0.7],
    stride=4,
    resize=1,
    margin=14,
    factor=0.6,
    keep_all=True,
    device=device
)

# Recognize Faces   

In [16]:
def verify(faces): 
    for j,l in enumerate(faces):
        minDist = float("-inf")
        ansi, ansj = 0, 0
        for i,k in enumerate(embeddings):
            #Computing Cosine distance.
            dist =cos(k,l)
            if(dist>minDist):
                minDist = dist
                ansi = i
                ansj = j                
        # Chosen threshold is 0.85. 
        #Threshold is determined after seeing the table in the previous cell.
        #Name of the person identified is printed on the screen, as well as below the detecetd face (below the rectangular box).
        if minDist > 0.85:
            text=names[ansi]
            # cv2.putText(im, text,(boxes[ansj][0].astype(int) ,boxes[ansj][3].astype(int) + 17), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (255,255,255), 2)
            if text==target_name: 
                return True


    return False

In [17]:
# The model is running on CPU, since it is already pre-trained and doesnt require GPU
# Define Inception Resnet V1 module (GoogLe Net)
resnet = InceptionResnetV1(pretrained='vggface2').eval().to('cpu')
transform = transforms.Compose([transforms.PILToTensor()])
def recognize_faces(faces):
    # generate face embeddings
    face_embeddings = []
    for face in faces:
        face_embeddings.append(resnet(transform(face))) 
    return verify(face_embeddings)


# Background Subtraction

In [18]:
def background_subtraction(previous_frame, frame_resized_grayscale, min_area):
    """
    This function returns 1 for the frames in which the area
    after subtraction with previous frame is greater than minimum area
    defined.
    Thus expensive computation of human detection face detection
    and face recognition is not done on all the frames.
    Only the frames undergoing significant amount of change (which is controlled min_area)
    are processed for detection and recognition.
    """
    frameDelta = cv2.absdiff(previous_frame, frame_resized_grayscale)
    thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
    # cv2.imshow("Thresh",thresh)
    # cv2.waitKey(200)
    thresh = cv2.dilate(thresh, None, iterations=2)
    # cv2.imshow("Thresh dialtetd",thresh)
    # cv2.waitKey(200)
    countours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for c in countours:
        # if the contour is too small, ignore it
        if cv2.contourArea(c) > min_area:
            return True
    return False

In [19]:
def run_detection(fast_mtcnn, filenames):
    frames = []
    frames_processed = 0
    faces_detected = 0
    batch_size = 15
    start = time.time()
    do_process = []
    

    for filename in filenames:

        v_cap = FileVideoStream(filename).start()
        v_len = int(v_cap.stream.get(cv2.CAP_PROP_FRAME_COUNT))
        assert v_len>0, "Corrupt video found"
        prev_frame = v_cap.read()
        prev_frame_grey = cv2.cvtColor(prev_frame,cv2.COLOR_BGR2GRAY)
        # min area for background subtraction
        min_area = (3000 / 1280) * prev_frame.shape[1]


        for j in range(v_len):
            frame = v_cap.read()
            # background subtraction
            frame_grey = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)
            do_process.append(background_subtraction(prev_frame_grey,frame_grey,min_area))
            prev_frame_grey = frame_grey

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)

            if len(frames) >= batch_size or j == v_len - 1:

                faces = fast_mtcnn(frames,do_process)

                frames_processed += len(frames)
                faces_detected += len(faces)
                
                if recognize_faces(faces):
                        timestamped_frames = np.append(timestamped_frames,[j,tuple(frames)])


                do_process = []
                frames = []
                print(
                    f'Frames per second: {frames_processed / (time.time() - start):.3f},',
                    f'faces detected: {faces_detected}\r',
                    end=''
                )

        v_cap.stop()

filenames = glob.glob(input_video_folder)
run_detection(fast_mtcnn, filenames)

TypeError: pic should be PIL Image. Got <class 'numpy.ndarray'>

# Frame Stitching

In [None]:
# import numpy as np
# np.random.shuffle(timestamped_frames)
timestamped_frames = timestamped_frames[timestamped_frames[:,0].argsort()]
print([timestamped_frames[:,0]])
assert len(timestamped_frames)>1, "No person detected"
height, width, layers = timestamped_frames[3][-1][-1].shape
# Decaaring video writer
writer = cv2.VideoWriter(output_video_name,cv2.VideoWriter_fourcc(*'MPEG'),output_fps,(width,height))
assert writer, "Error in creating video writer"
            
for i,name,frames in timestamped_frames[1:]:
    for frame in frames:
        if frame is not None: writer.write(frame)
writer.release()

[array([0], dtype=object)]


AssertionError: No person detected

## Half resolution detection

In this example, we demonstrate how to detect faces using half resolution frames (i.e., `resize=0.5`).

In [None]:
fast_mtcnn = FastMTCNN(
    stride=4,
    resize=0.5,
    margin=14,
    factor=0.5,
    keep_all=True,
    device=device
)

In [None]:
run_detection(fast_mtcnn, filenames)

Frames per second: 42.454, faces detected: 2178