# Inference on WSCNet for Facial Expressions

## Preparing the model

In [1]:
# Mount Google Drive at /content/drive; later cells read the Haar cascade and
# the trained model from paths below /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the packages this notebook imports. torch, torchvision and
# opencv-python are pinned to exact versions; youtube-dl is unpinned and pafy
# is installed straight from a GitHub repository.
# NOTE(review): consider pinning youtube-dl and the pafy commit as well so the
# notebook stays reproducible — the versions that worked are not recorded here.
!pip install torch==1.0.0
!pip install torchvision==0.2.2
!pip install opencv-python==4.1.2.30
!pip install youtube-dl
!pip install git+https://github.com/Cupcakus/pafy

In [3]:


from __future__ import print_function 
from __future__ import division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler

import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from torch.autograd import Function, Variable
import matplotlib.pyplot as plt
import time
import os
import copy

from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True


class ResNetWSL(nn.Module):
    """Two-branch classification network on top of a backbone with an ``fc`` head.

    The backbone's children except the last two are used as the feature
    extractor. A 1x1 convolution turns the feature maps into
    ``num_classes * num_maps`` response maps that feed

    * a *detect* branch: max-pool + ``pooling`` -> one score per class, and
    * a *cls* branch: the response maps are reduced by ``pooling``, weighted
      by the detect scores, reduced again by ``pooling2`` and used as a
      spatial weighting of the backbone features; original and weighted
      features are concatenated, average-pooled and classified.

    Args:
        model: backbone module; must expose ``fc.in_features``. The feature
            extractor is assumed to output ``fc.in_features`` channels
            (true for torchvision ResNets — verify for other backbones).
        num_classes: number of output classes.
        num_maps: number of response maps per class produced by ``downconv``.
        pooling: module applied to the response maps (and to their max-pooled
            version) in both branches.
        pooling2: module that reduces the weighted response maps to the
            spatial weighting map; its output must broadcast against the
            backbone features (i.e. have a single channel).

    Note:
        ``GAP``/``GMP`` use a fixed 14x14 kernel, so the backbone feature map
        is expected to be 14x14 for the flattening in ``forward`` to match the
        classifier's input size.
    """

    def __init__(self, model, num_classes, num_maps, pooling, pooling2):
        super(ResNetWSL, self).__init__()
        self.features = nn.Sequential(*list(model.children())[:-2])
        self.num_ftrs = model.fc.in_features

        # Channel counts are derived from the backbone instead of being
        # hard-coded to 2048 / 4096; for ResNet-50/101 the values are the same.
        self.downconv = nn.Sequential(
            nn.Conv2d(self.num_ftrs, num_classes * num_maps, kernel_size=1, stride=1, padding=0, bias=True))

        self.GAP = nn.AvgPool2d(14)
        self.GMP = nn.MaxPool2d(14)
        self.spatial_pooling = pooling
        self.spatial_pooling2 = pooling2
        # The classifier sees the original features concatenated with the
        # spatially weighted features, hence twice the backbone width.
        self.classifier = nn.Sequential(
            nn.Linear(2 * self.num_ftrs, num_classes)
            )

    def forward(self, x):
        """Run both branches.

        Args:
            x: input batch for the backbone.

        Returns:
            Tuple ``(detect_scores, class_logits)``. ``detect_scores`` is the
            flattened detect-branch output, ``class_logits`` the raw output of
            the classifier (no softmax is applied).
        """
        x = self.features(x)
        x_ori = x
        # detect branch
        x = self.downconv(x)
        x_conv = x
        x = self.GMP(x)
        x = self.spatial_pooling(x)
        x = x.view(x.size(0), -1)
        # cls branch
        x_conv = self.spatial_pooling(x_conv)
        x_conv = x_conv * x.view(x.size(0), x.size(1), 1, 1)
        x_conv = self.spatial_pooling2(x_conv)
        # Broadcasting the single-channel weighting map over all feature
        # channels gives the same values as concatenating the map once per
        # channel, without building thousands of growing intermediate tensors.
        x_weighted = torch.mul(x_conv, x_ori)
        x_fused = torch.cat((x_ori, x_weighted), 1)
        x_fused = self.GAP(x_fused)
        x_fused = x_fused.view(x_fused.size(0), -1)
        x_fused = self.classifier(x_fused)  # raw logits; softmax is left to the caller
        return x, x_fused



class ClassWisePoolFunction(Function):
    """Autograd function that averages groups of ``num_maps`` consecutive channels.

    Written in the instance-based (non-static ``forward``/``backward``)
    ``Function`` style that the pinned torch 1.0.0 accepts.
    NOTE(review): newer PyTorch releases reject calling instance-style
    functions — confirm before upgrading torch.

    Args:
        num_maps: number of consecutive channels merged into one output channel.
    """

    def __init__(self, num_maps):
        super(ClassWisePoolFunction, self).__init__()
        self.num_maps = num_maps

    def forward(self, input):
        """Average every group of ``num_maps`` channels.

        Args:
            input: 4-D tensor ``(batch, channels, h, w)``.

        Returns:
            Tensor ``(batch, channels // num_maps, h, w)``.

        Raises:
            ValueError: if the channel count is not a multiple of ``num_maps``.
        """
        # batch dimension
        batch_size, num_channels, h, w = input.size()

        if num_channels % self.num_maps != 0:
            # Raise instead of print + sys.exit: `sys` is not imported in this
            # notebook, and a layer should not terminate the kernel.
            raise ValueError(
                'ClassWisePoolFunction: the number of channels ({}) has to be a multiple '
                'of the number of maps per class ({})'.format(num_channels, self.num_maps))

        num_outputs = num_channels // self.num_maps
        x = input.view(batch_size, num_outputs, self.num_maps, h, w)
        output = torch.sum(x, 2)
        self.save_for_backward(input)
        return output.view(batch_size, num_outputs, h, w) / self.num_maps

    def backward(self, grad_output):
        """Copy each output gradient to all ``num_maps`` channels of its group.

        NOTE(review): the forward pass divides by ``num_maps`` but the gradient
        is not scaled here — left unchanged; confirm whether that is intended.
        """
        input, = self.saved_tensors
        # batch dimension
        batch_size, num_channels, h, w = input.size()
        num_outputs = grad_output.size(1)

        grad_input = grad_output.view(batch_size, num_outputs, 1, h, w).expand(batch_size, num_outputs, self.num_maps,
                                                                               h, w).contiguous()
        return grad_input.view(batch_size, num_channels, h, w)




class ClassWisePool(nn.Module):
    """Module wrapper that applies ``ClassWisePoolFunction`` to its input.

    Args:
        num_maps: forwarded to ``ClassWisePoolFunction`` on every call.
    """

    def __init__(self, num_maps):
        super().__init__()
        self.num_maps = num_maps

    def forward(self, input):
        """Build a fresh pooling function for this call and apply it to ``input``."""
        pool_fn = ClassWisePoolFunction(self.num_maps)
        return pool_fn(input)

In [6]:
import os
import cv2
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as tt
import warnings
warnings.filterwarnings("ignore")

print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

# Haar cascade used to locate faces before they are classified.
face_classifier = cv2.CascadeClassifier("/content/drive/MyDrive/Seminar_CV/models/haarcascade_frontalface_default.xml")
if face_classifier.empty():
    # A wrong path does not raise on construction; fail here with a clear message
    # instead of a cryptic OpenCV error at the first detectMultiScale call.
    raise IOError("Could not load the Haar cascade - is Google Drive mounted and the path correct?")
class_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# Use the GPU when the runtime has one, otherwise fall back to the CPU so that
# loading the checkpoint does not fail on a CPU-only runtime.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE: torch.load unpickles the whole model object (the ResNetWSL / ClassWisePool
# classes defined above must be in scope). Unpickling can execute arbitrary code,
# so only load checkpoints from a trusted source.
model = torch.load(r'/content/drive/MyDrive/Seminar_CV/self_trained_models/wscnet_full_50runs_120622.pt', map_location=DEVICE)
model = model.to(DEVICE)
model.eval()

# Preprocessing applied to every face crop (H x W x 3 uint8 array) before inference.
data_transforms = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(448),
        transforms.CenterCrop(448),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

PyTorch Version:  1.0.0
Torchvision Version:  0.2.2


### Make inference on pictures taken with your webcam

In [7]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode
from PIL import Image
import io

def take_photo(filename='photo.jpg', quality=1):
  """Capture a single webcam frame through the Colab front end.

  Shows a video preview with a "Capture" button in the cell output, waits for
  the click, stores the JPEG under ``filename`` and returns it as a PIL image.

  Args:
    filename: path the JPEG bytes are written to.
    quality: value passed to ``canvas.toDataURL('image/jpeg', quality)``.

  Returns:
    ``PIL.Image`` opened from the captured JPEG bytes.
  """
  capture_script = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(capture_script)

  # The browser answers with a data URL; the base64 payload follows the comma.
  data_url = eval_js('takePhoto({})'.format(quality))
  encoded_jpeg = data_url.split(',')[1]
  jpeg_bytes = b64decode(encoded_jpeg)

  with open(filename, 'wb') as photo_file:
    photo_file.write(jpeg_bytes)

  return Image.open(io.BytesIO(jpeg_bytes))


from google.colab.patches import cv2_imshow

In [8]:
def main(max_photos=None):
    """Repeatedly take a webcam photo, classify every detected face and show the result.

    Args:
        max_photos: stop after this many photos; ``None`` (default) keeps
            asking for photos until the cell is interrupted.
    """
    # Run on whatever device the model lives on instead of hard-coding CUDA.
    device = next(model.parameters()).device
    photos_taken = 0

    with torch.no_grad():
        while max_photos is None or photos_taken < max_photos:
            # take_photo() returns a PIL image, so the array is RGB, not OpenCV's BGR.
            frame = np.array(take_photo())
            photos_taken += 1
            frame = cv2.flip(frame, 1)
            # cv2.IMREAD_GRAYSCALE is an imread flag, not a colour-conversion code;
            # the cascade needs a real grayscale image.
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            faces = face_classifier.detectMultiScale(frame_gray, 1.3, 5)  # detect faces with cv2

            if len(faces) == 0:
                cv2.putText(frame, "No Face Found", (20, 60), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2)

            # Crop every face before anything is drawn so that boxes and labels
            # never end up inside the images given to the model.
            crops = [frame[y : y + h, x : x + w].copy() for (x, y, w, h) in faces]

            for (x, y, w, h), crop in zip(faces, crops):
                cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 255, 255), 2)  # box around the face
                face_tensor = data_transforms(crop).to(device)  # preprocess the face crop
                _, pred = model(face_tensor.unsqueeze(0))  # WSCNet prediction for this face
                pred = torch.max(pred, dim=1)[1].tolist()
                label = class_labels[pred[0]]
                cv2.putText(frame, label, (x, y), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2)

            # cv2_imshow expects BGR channel order.
            cv2_imshow(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

    # No cv2.VideoCapture is opened in this function, so there is nothing to release.
    cv2.destroyAllWindows()

In [9]:
main()

AttributeError: ignored

### Make inference on a YouTube Video and watch it with augmented inference

#### Paste your YouTube video link below:
Note that your video's duration shouldn't be longer than 1 min.

In [10]:
# YouTube link of the video to annotate; it is passed to pafy.new() in the next code cell.
url = 'https://www.youtube.com/watch?v=4H_CNtd4W7I'

#### Run all cells below, prediction may take a while

In [11]:
# Objekt fuer Videostream erzeugen
import pafy
import cv2
import time
from google.colab.patches import cv2_imshow
from IPython.display import clear_output

# Resolve the YouTube link to a direct stream URL and open it with OpenCV.
video = pafy.new(url)
best = video.getbest(preftype="mp4")
streams = video.allstreams
cap = cv2.VideoCapture(best.url)
if not cap.isOpened():
    raise RuntimeError("Could not open the video stream for {}".format(url))

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
res = (width, height)

# Write with the source frame rate so playback speed matches the original;
# fall back to 20 fps when the stream does not report one.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    fps = 20.0

fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('/content/output.avi', fourcc, fps, res)
frame = None

# Run on whatever device the model lives on instead of hard-coding CUDA.
device = next(model.parameters()).device
MAX_CONSECUTIVE_READ_ERRORS = 10  # stop instead of spinning forever on a broken stream
read_errors = 0

try:
    with torch.no_grad():
        while True:
            try:
                is_success, extracted_frame = cap.read()
            except cv2.error:
                read_errors += 1
                if read_errors >= MAX_CONSECUTIVE_READ_ERRORS:
                    break
                continue
            read_errors = 0

            if not is_success:
                break

            # cv2.IMREAD_GRAYSCALE is an imread flag, not a colour-conversion code;
            # frames read by OpenCV are BGR.
            extracted_frame_gray = cv2.cvtColor(extracted_frame, cv2.COLOR_BGR2GRAY)
            faces = face_classifier.detectMultiScale(extracted_frame_gray, 1.3, 5)  # detect faces with cv2

            if len(faces) == 0:
                cv2.putText(extracted_frame, "No Face Found", (20, 60), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2)

            # Crop every face before anything is drawn, and convert BGR -> RGB because
            # the preprocessing (ToPILImage + RGB-ordered normalisation) works on RGB.
            # NOTE(review): assumes the model was trained on RGB images, as fed by the
            # webcam cell — confirm against the training pipeline.
            crops = [cv2.cvtColor(extracted_frame[y : y + h, x : x + w], cv2.COLOR_BGR2RGB)
                     for (x, y, w, h) in faces]

            for (x, y, w, h), crop in zip(faces, crops):
                cv2.rectangle(extracted_frame, (x, y), (x + w, y + h), (255, 255, 255), 2)  # box around the face
                ttens = data_transforms(crop).to(device)  # preprocess the face crop
                # The model returns (detect-branch scores, classifier logits); use the
                # classifier logits, consistent with the webcam cell.
                _, class_logits = model(ttens.unsqueeze(0))
                pred = torch.max(class_logits, dim=1)[1].tolist()
                label = class_labels[pred[0]]
                cv2.putText(extracted_frame, label, (x, y), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2)

            out.write(extracted_frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
finally:
    # Always release the stream and finalise the output file, even after an error.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

In [12]:
!ffmpeg -i output.avi output.mp4

ffmpeg version 3.4.11-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 7 (Ubuntu 7.5.0-3ubuntu1~18.04)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --enable-gpl --disable-stripping --enable-avresample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librubberband --enable-librsvg --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvorbis --enable-libvpx --enable-libwavpack --enable-libwebp --enable-libx265 --enable-libxml2 --enable-libxvid --enable-li

#### Play your video

In [13]:
from IPython.display import HTML
from base64 import b64encode
import os

import subprocess

# The avi -> mp4 conversion is done in the cell above; here the mp4 is re-encoded
# with libx264 before it is embedded in the notebook.
path1 = "/content/output.mp4"
path2 = "/content/result_compressed.mp4"

# -y overwrites a result left over from a previous run; check=True raises if ffmpeg
# fails instead of silently embedding a stale or missing file.
subprocess.run(
    ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", path1, "-vcodec", "libx264", path2],
    check=True,
)

# Show video
with open(path2, 'rb') as video_file:
    mp4 = video_file.read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=720 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)