## 1. Load pre-trained model

In [3]:
import torch
import torchvision
import torch.nn as nn
import numpy as np
import cv2
from PIL import Image

from torchvision import transforms
from torch.utils.data import DataLoader

import pytorch_lightning
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import torch.nn.functional as F
import torchmetrics

layers=[64,64,'M',128,128,'M',256,256,256,'M',512,512,512,'M',512,512,512,'M']
device="mps"

torch.manual_seed(1442)
#torch.cuda.manual_seed(1442)


layers=[64,64,'M',128,128,'M',256,256,256,'M',512,512,512,'M',512,512,512,'M']


class vgg16(pytorch_lightning.LightningModule):
    def __init__(self,in_channel=1, num_classes=7,dropout=0.5,lr=3e-4):
        super().__init__()

        self.in_channel=in_channel
        self.num_classes=num_classes
        self.lr=lr

        self.dropout=nn.Dropout(dropout)

        # VGG16 backbone
        self.backbone=[]
        for layer in layers:
            if layer=='M':
                self.backbone+=[nn.MaxPool2d(kernel_size=2,stride=2)]
            else:
                self.backbone+=[nn.Conv2d(in_channel,layer,kernel_size=3,padding=1),
                                nn.BatchNorm2d(layer),
                                nn.ReLU(inplace=True)]
                in_channel=layer

        self.backbone+=[nn.AdaptiveAvgPool2d((1,1))]
        self.backbone=nn.Sequential(*self.backbone)

        # classifier
        self.classifier=nn.Linear(512,num_classes)

        # acc metrics
        self.acc=torchmetrics.classification.Accuracy(task="multiclass",num_classes=num_classes)

        # initialize parameters
        self.apply(self._init_weights)

    def _init_weights(self,module):
        if isinstance(module,nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.2, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module,nn.Embedding):
            torch.nn.init.normal_(module.weight,mean=0.0,std=0.02)

    def training_step(self,batch,batch_idx):
        # return loss
        x,y=batch
        preds=self.forward(x) # preds = logits
        loss=F.cross_entropy(preds,y)
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc",self.acc(preds,y))
        return loss

    def validation_step(self,batch,batch_idx):
        x,y=batch
        # x=[128,10,1,44,44] -> take average prediction over 10 subimages
        x=x.transpose(0,1) # [10,128,1,44,44]
        loss=0.0
        preds=torch.zeros(x.size(1),self.num_classes,device=device)  # [128,7]
        for i in range(x.size(0)):
            # prediction and loss for subimage i
            preds+=self.forward(x[i])
            loss+=F.cross_entropy(preds,y)
        loss/=x.size(0)
        preds/=x.size(0)
        self.log("val_loss",loss,prog_bar=True)
        self.log("val_acc",self.acc(preds,y),prog_bar=True)

    def test_step(self,batch,batch_idx):
        x,y=batch
        # x=[128,10,1,44,44] -> take average prediction over 10 subimages
        x=x.transpose(0,1) # [10,128,1,44,44]
        loss=0.0
        preds=torch.zeros(x.size(1),self.num_classes,device=device)  # [128,7]
        for i in range(x.size(0)):
            # prediction and loss for subimage i
            preds+=self.forward(x[i])
            loss+=F.cross_entropy(preds,y)
        loss/=x.size(0)
        preds/=x.size(0)
        self.log("test_loss",loss)
        self.log("test_acc",self.acc(preds,y))


    def forward(self,x):
        # x= a batch of data = [128,1,44,44]

        # backbone
        x=self.backbone(x)           # [128,512,1,1]
        x=x.squeeze(-1).squeeze(-1)  # [128,512]

        # dropout
        x=self.dropout(x)            # [128,512]
        # classifier
        x=self.classifier(x)         # [128,7]

        return x


    def configure_optimizers(self):
        
        optimizer=torch.optim.AdamW(self.parameters(),lr=self.lr)
        scheduler=torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,mode="min",cooldown=5,
                                                             patience=10,min_lr=1e-6,factor=0.1)
        return {"optimizer": optimizer,
                "lr_scheduler": {"scheduler": scheduler,
                                "monitor": "train_loss",
                                "interval": "epoch", # default
                                "frequency": 1       # default
                                }
                }


In [4]:
from data import test_loader

# load model from checkpoint path
ckpt_path="epoch=87-step=19800.ckpt"
model=vgg16.load_from_checkpoint(ckpt_path)
print(f"{sum(p.numel() for p in model.parameters())/1e6} million parameters")

# check performance
trainer=pytorch_lightning.Trainer()
trainer.test(model,test_loader)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


14.725575 million parameters


/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:424: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=9` in the `DataLoader` to improve performance.


Testing: |                                                | 0/? [00:00<?, ?it/s]

  tp = tp.sum(dim=0 if multidim_average == "global" else 1)


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc            0.7249930500984192
        test_loss            9.008393287658691
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 9.008393287658691, 'test_acc': 0.7249930500984192}]

## 2. Video 

In [5]:
# for running inference
transform_test=transforms.Compose([
    transforms.TenCrop(44),
    transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
])

# list of emotions
emotions = {
    0: ['Angry', (0,0,255), (255,255,255)],
    1: ['Disgust', (0,102,0), (255,255,255)],
    2: ['Fear', (255,255,153), (0,51,51)],
    3: ['Happy', (153,0,153), (255,255,255)],
    4: ['Sad', (255,0,0), (255,255,255)],
    5: ['Surprise', (0,255,0), (255,255,255)],
    6: ['Neutral', (160,160,160), (255,255,255)]
}

In [None]:
import cv2
import face_recognition

from time import time
from sort import Sort

from tqdm.auto import tqdm

# draw boxes and put text 
def draw_boxes(frame,bboxes,preds):
    # bbox = list of facial-region coordinates in format (left,top,right,bottom)
    # preds= list of emotions (predictions by the model)
    
    # draw boxes around facial regions
    for bbox,pred in zip(bboxes,preds):

        left,top,right,bottom=int(bbox[0]),int(bbox[1]),int(bbox[2]),int(bbox[3])
        cv2.rectangle(frame,(left,top), (right,bottom), 
                      emotions[pred][1],2)
        cv2.rectangle(frame,(left,top-30), (right,top), 
                      emotions[pred][1],-1)
        cv2.putText(frame, f'{emotions[pred][0]}', 
                    (left,top-5), 
                    0, 0.7, emotions[pred][2],2)

    return frame

In [None]:
def infer_video(path):
    
    cap=cv2.VideoCapture(path)
    
    width,height=int(cap.get(3)), int(cap.get(4))
    fps=cap.get(cv2.CAP_PROP_FPS)
    
    num_frames=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Number of frames: {num_frames} | fps: {fps}" )
    
    # create codec to write video
    out=cv2.VideoWriter(path[:-4]+'_out.mp4', cv2.VideoWriter_fourcc(*"mp4v"),
                        fps, (width, height))
    
    # initiate tracker sort: if we loose an object for 30 frames, discard the track
    #                           start a tracker if the object has appeared in 3 frames
    sort=Sort(max_age=30,min_hits=3,iou_threshold=0.3)
    
    # progress bar
    pbar=tqdm(total=num_frames)
    i=0
    
    # read frame, make inference and write it in a video
    while True:
        
        i+=1
        pbar.update(1)
        
        success,frame=cap.read()
        if not success:
            break
        
        # get prediction
        detections=[]  # list [bbox,score]
        predictions=[] # list of prediction for frame
        
        locations=face_recognition.face_locations(frame)       # all face locations
        
        # get predictions for all faces
        with torch.no_grad():
            for (top,right,bottom,left) in locations:
                face=frame[top:bottom,left:right,:]
                
                # convert to gray scale, resize and do transform_test
                face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)           # [W,H]
                face_gray = cv2.resize(face_gray, (48, 48)).astype(np.uint8)                  # [48,48]
                face_gray = transform_test(Image.fromarray(face_gray))     
                # [10,1,44,44]
                
                # make prediction
                model.eval()
                pred=model(face_gray)                                  # [10,7]
                pred=pred.mean(axis=0)                                 # [7,]
                scores=torch.nn.functional.softmax(pred,dim=-1)
                
                pred=scores.argmax().item()
                score=scores[pred].item()
                
                # append result to detection list
                merged_detection=[left,top,right,bottom,score]
                
                detections.append(merged_detection)
                predictions.append(pred)
    
        detections=np.asarray(detections)
        predictions=np.asarray(predictions)
        
        # sort tracking
        
        # handle no detection in frame 
        if len(detections)==0:        
            detections=np.empty((0,5))

        results=sort.update(detections) # new positions + scores 
        bboxes=results[:,:-1]           # extract new positions
        
        # sort tracking
#         results=sort.update(detections)
#         bboxes=results[:,:-1]
        
        # draw bounding box and write frame into video
        p_frame=draw_boxes(frame,bboxes,predictions)
        out.write(p_frame)
        

        
    cap.release()
    out.release()
    cv2.destroyAllWindows()

In [None]:
# def infer_video(path):
    
#     cap=cv2.VideoCapture(path)
    
#     width,height=int(cap.get(3)), int(cap.get(4))
#     fps=cap.get(cv2.CAP_PROP_FPS)
    
#     num_frames=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
#     print("Number of frames:", num_frames)
    
#     # create codec to write video
#     out=cv2.VideoWriter(path[:-4]+'_out.mp4', cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    
#     # initiate tracker sort
#     sort=Sort(max_age=10,min_hits=3,iou_threshold=0.3)
    
#     # progress bar
#     pbar=tqdm(total=num_frames)
#     i=0
    
#     # read frame, make inference and write it in a video
#     while True:
        
#         i+=1
#         pbar.update(1)
        
#         success,frame=cap.read()
#         if not success:
#             break
        
#         # get prediction
#         detections=[]  # list [bbox,score]
#         predictions=[] # list of prediction for frame
        
#         locations=face_recognition.face_locations(frame)       # all face locations
        
#         # get predictions for all faces
#         with torch.no_grad():
#             for (top,right,bottom,left) in locations:
#                 face=frame[top:bottom,left:right,:]
                
#                 # convert to gray scale, resize and do transform_test
#                 face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)           # [W,H]
#                 face_gray = cv2.resize(face_gray, (48, 48)).astype(np.uint8)                  # [48,48]
#                 face_gray = transform_test(Image.fromarray(face_gray))       # [10,1,44,44]
                
#                 # make prediction
#                 model.eval()
#                 pred=model(face_gray)                                  # [10,7]
#                 pred=pred.mean(axis=0)                                 # [7,]
#                 scores=torch.nn.functional.softmax(pred,dim=-1)
                
#                 pred=scores.argmax().item()
#                 score=scores[pred].item()
                
#                 # append result to detection list
#                 merged_detection=[left,top,right,bottom,score]
                
#                 detections.append(merged_detection)
#                 predictions.append(pred)
    
#         detections=np.asarray(detections)
#         predictions=np.asarray(predictions)
        
#         # new detections by sort
#         results=sort.update(detections)
#         bboxes=results[:,:-1]
        
#         # draw bounding box and write frame into video
#         p_frame=draw_boxes(frame,bboxes,predictions)
#         out.write(p_frame)
        

        
#     cap.release()
#     out.release()
#     cv2.destroyAllWindows()

In [None]:
path="inference/baby.mp4"

start_time=time()
infer_video(path)
end_time=time()

print(f"Conversion done: {(end_time-start_time)/60:.2f} minutes")

In [None]:
import moviepy
from moviepy.editor import *

# combine video and original audio (optional)
def combine_video_audio(video): # video = path of original clip
    # read the video
    processed_path=video[:-4]+"_out.mp4"
    clip=VideoFileClip(processed_path)
    # extract audio file from the original video
    audioclip=moviepy.editor.VideoFileClip(video).audio
    # set audio for clip
    videoclip=clip.set_audio(audioclip)
    # new file 
    new_name=video[:-4]+"_with_audio_out.mp4"
    
    videoclip.write_videofile(new_name, codec='libx264', audio_codec='aac',remove_temp=True)
    
combine_video_audio(path)

## 3. Live webcam

In [6]:
import cv2
import face_recognition

from time import time
from sort import Sort


import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

class EmotionDetection:
    def __init__(self,capture_index):
        
        self.capture_index=capture_index
        self.model=model
        self.emotions=emotions
    
    
    @torch.no_grad()
    def predict(self,frame):
        # return a list [bbox,pred] with bbox=[left,top,width,height]
        
        detections=[]  # list [bbox,pred] with bbox=top,left,right,bottom
        predictions=[] # list of emotions
        
        self.model.eval()
        face_locations=face_recognition.face_locations(frame)

        # give predictions to facial regions 
        for (top,right,bottom,left) in face_locations:

            # extract facial region
            face=frame[top:bottom,left:right,:]

            # convert to gray scale and resize
            gray=cv2.cvtColor(face,cv2.COLOR_BGR2GRAY)
            gray=cv2.resize(gray,(48,48)).astype(np.uint8)

            # randomly crop the face image into 10 subimages
            inputs=transform_test(Image.fromarray(gray))           #[10,1,44,44]
            
            # make prediction
            pred=model(inputs)                                     # [10,7]
            pred=pred.mean(axis=0)                                 # [7,]

            scores=torch.nn.functional.softmax(pred,dim=-1)
            pred=scores.argmax().item()

            score=scores[pred].item()

            # append result to detection list
            merged_detection=np.array([left,top,right,bottom,score])

            detections.append(merged_detection)
            predictions.append(pred)
            
            
        return np.array(detections), np.array(predictions)


    def draw_boxes(self,frame,bboxes,preds):
        # draw boxes around facial regions
        for bbox,pred in zip(bboxes,preds):
            
            left,top,right,bottom=int(bbox[0]),int(bbox[1]),int(bbox[2]),int(bbox[3])
            
            # draw rectangle and put texts around the faces
            cv2.rectangle(frame,(left,top), (right,bottom), self.emotions[pred][1],2)
            cv2.rectangle(frame,(left,top-50), (right,top), self.emotions[pred][1],-1)
            cv2.putText(frame, f'{self.emotions[pred][0]}', (left,top-5), 0, 1.5, self.emotions[pred][2],2)
            
        return frame
    
    def __call__(self):
        cap=cv2.VideoCapture(self.capture_index)
        assert cap.isOpened()
        
        # initiate tracker sort
        sort=Sort(max_age=30,min_hits=5,iou_threshold=0.3)
        # if we loose an object for 30 frames, discard the track
        # start a tracker if the object has appeared in 5 frames
        
        while True:
            start_time=time()
            
            ret,frame=cap.read()
            assert ret
            
            # resize 1/4 frame for faster processing
            small_frame=cv2.resize(frame,(0,0),fx=0.25,fy=0.25)
            detections, predictions=self.predict(small_frame)            

            # times 4 to get back original positions
            for i in range(len(detections)):
                detections[i][:4]*=4 
            
            
            # sort tracking
            if len(detections)==0:        # no detection in frame
                detections=np.empty((0,5))
                
            results=sort.update(detections) # new positions for bounding boxes
            
            bboxes=results[:,:-1]
            
            frame=self.draw_boxes(frame,bboxes,predictions)
            
            end_time=time()
            fps = 1/np.round(end_time - start_time, 2)
             
            cv2.putText(frame, f'FPS: {int(fps)}', (20,70), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0,255,0), 2)
            
            cv2.imshow('Emotion Detection - Tracked', frame)
 
            if cv2.waitKey(1) & 0xFF == 27:
                break
        
        cap.release()
        cv2.destroyAllWindows()
        



In [None]:
detector=EmotionDetection(capture_index=0)
detector()

