# Initialize

Install required packages.

In [None]:
!pip install scipy scikit-image torch tqdm transformers mediapipe opencv-python torchvision numpy pandas timm evaluate facenet-pytorch

Mount google drive if working on colab.

In [1]:
from google.colab import drive
from pathlib import Path

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Move to data dir.

In [2]:
%cd drive/MyDrive/NLP/MultiModalEmotionRecognition/data

/content/drive/MyDrive/NLP/MultiModalEmotionRecognition/data


In [3]:
!ls

backup	 images  README.md	 test.zip  train_ende.zip
dev.zip  labels  saved_features  texts


Unzip data archives, that are downloaded from specified links in the data README file.

In [None]:
!unzip dev.zip
!unzip test.zip
!unzip train_ende.zip

Move extracted data to it's folder.

In [None]:
%mv dev/ images/val
%mv test/ images/test
%mv train_ende/ images/train

In [None]:
%ls images

README.md  [0m[01;34mtest[0m/  [01;34mtrain[0m/  [01;34mval[0m/


Ensure every thing is fine and data is extracted in it's folder.

In [None]:
%ls images/train -1 | wc -l
%ls images/test -1 | wc -l
%ls images/val -1 | wc -l

20240
5067
5063


In [4]:
%cd ..

/content/drive/MyDrive/NLP/MultiModalEmotionRecognition


Import required packages.

In [5]:
import torch
import torchvision
import math
import mediapipe
import cv2
import os
import urllib
import torch
import ast
import pickle
import evaluate
import re
import numpy as np
import pandas as pd
import torch.optim as optim
import matplotlib.pyplot as plt

from torch import nn
from torch.utils.data import Dataset, DataLoader
from typing import List
from tqdm import trange, tqdm
from pathlib import Path
from PIL import Image
from scipy.spatial.distance import euclidean
from skimage.transform import rotate
from facenet_pytorch import MTCNN as MTCNN
from torchvision import transforms
from torchvision.transforms import transforms as transforms
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models.detection import KeypointRCNN_ResNet50_FPN_Weights
from torchvision.io import read_image
from transformers import (
    PreTrainedTokenizerFast,
    pipeline,
    RobertaForSequenceClassification,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)


Initialize directory variables. Change the Path object input to your project base directory.

In [6]:
BASE_DIR = Path("/content/drive/MyDrive/NLP/MultiModalEmotionRecognition")
DATA_DIR = BASE_DIR / "data"
SAVE_DIR = DATA_DIR / "saved_features"
TEXTS_DIR = DATA_DIR / "texts"
IMAGES_DIR = DATA_DIR / "images"
LABELS_DIR = DATA_DIR / "labels"

In [7]:
if not os.path.exists(SAVE_DIR):
    os.makedirs(SAVE_DIR)

Initialize cuda device.

In [8]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
DEVICE

device(type='cuda', index=0)

Initialize feature vectors size.

In [9]:
FACE_EMBEDDING_SIZE = 1280
ENG_TEXT_EMBEDDING_SIZE = 768
GER_TEXT_EMBEDDING_SIZE = 768
POSE_EMBEDDING_SIZE = 34
SCENE_EMBEDDING_SIZE = 2048

# Extractors

### Scene Embedding

In [10]:
class SceneEmbeddingExtractor:
    """
    Extracts embedding based on scene recognition task
    """

    def __init__(self):
        self.weights = ResNet50_Weights.DEFAULT
        self.pretrained_model = resnet50(weights=self.weights)
        self.pretrained_model.eval().to(DEVICE)
        self.preprocess = self.weights.transforms()
        self.feature_extractor = self.remove_last_layer()

    def remove_last_layer(self):
        modules = list(self.pretrained_model.children())[:-1]
        model = nn.Sequential(*modules)
        model.eval()
        return model

    def extract_embedding(self, image):
        transformed_image = self.preprocess(image).unsqueeze(0)
        return self.feature_extractor(transformed_image).ravel()

### Pose Embedding

In [11]:
class PoseEmbeddingExtractor:
    """
    Extracts embedding based on pose of the persons in the image. Each person is consisted of 17 keypoints
    and they are used as a feature.
    """

    def __init__(self):
        self.model = torchvision.models.detection.keypointrcnn_resnet50_fpn(
            weights=KeypointRCNN_ResNet50_FPN_Weights.DEFAULT, num_keypoints=17
        ).to(DEVICE)
        self.model.eval()
        self.transform = transforms.Compose([transforms.ToTensor()])

    def extract_embedding(self, image):
        image = self.transform(image)
        image = image.unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            outputs = self.model(image)

        keypoints_scores = outputs[0]["keypoints_scores"]
        best_score = torch.mean(keypoints_scores, axis=1).argmax().item()
        keypoints = outputs[0]["keypoints"][best_score, :, :2]
        return keypoints.ravel()


# p = PoseEmbeddingExtractor(device=DEVICE)
# path = 'data/images/val/4965.jpg'
# img = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)
# p.extract_embedding(img).shape

### Face Embedding

In [12]:
def get_model_path(model_name):
    model_file = model_name + ".pt"
    cache_dir = os.path.join(os.path.expanduser("~"), ".hsemotions")
    os.makedirs(cache_dir, exist_ok=True)
    fpath = os.path.join(cache_dir, model_file)
    if not os.path.isfile(fpath):
        print(f"{model_file} not exists")
        url = (
            "https://github.com/HSE-asavchenko/face-emotion-recognition/blob/main/models/affectnet_emotions/"
            + model_file
            + "?raw=true"
        )
        print("Downloading", model_name, "from", url)
        urllib.request.urlretrieve(url, fpath)

    return fpath


class FaceAlignment:
    """
    Aligns the face such that faces are simillar before passing to next steps
    """

    def __init__(
        self,
    ):
        pass

    @staticmethod
    def apply_rotation_on_images(input_images, angles):
        rotated_images = [
            rotate(image, angle) for image, angle in zip(input_images, angles)
        ]
        return rotated_images

    @staticmethod
    def compute_alignment_rotation_(eyes_coordinates):
        angles = []
        directions = []
        for left_eye_coordinate, right_eye_coordinate in eyes_coordinates:

            left_eye_x, left_eye_y = left_eye_coordinate
            right_eye_x, right_eye_y = right_eye_coordinate

            triangle_vertex = (
                (right_eye_x, left_eye_y)
                if left_eye_y > right_eye_y
                else (left_eye_x, right_eye_y)
            )
            direction = (
                -1 if left_eye_y > right_eye_y else 1
            )  # rotate clockwise else counter-clockwise

            # compute length of triangle edges
            a = euclidean(left_eye_coordinate, triangle_vertex)
            b = euclidean(right_eye_coordinate, triangle_vertex)
            c = euclidean(right_eye_coordinate, left_eye_coordinate)

            # cosine rule
            if (
                b != 0 and c != 0
            ):  # this multiplication causes division by zero in cos_a calculation
                cos_a = (b**2 + c**2 - a**2) / (2 * b * c)
                angle = np.arccos(cos_a)  # angle in radian
                angle = (angle * 180) / math.pi  # radian to degree
            else:
                angle = 0

            angle = angle - 90 if direction == -1 else angle

            angles.append(angle)
            directions.append(direction)

        return angles, directions


class FaceDetection:
    """
    Detects faces in the image using MTCNN netork.
    """

    def __init__(self, model_name, minimum_confidence):

        self.detected_faces_information = None
        self.model_name = model_name
        self.minimum_confidence = minimum_confidence
        if model_name == "MTCNN":
            detector_model = MTCNN(device=DEVICE)
            self.detect_faces_function = lambda input_image: detector_model.detect(
                input_image, landmarks=True
            )

    def extract_faces(self, input_image, return_detections_information=True):
        self.detect_faces__(input_image)
        faces = self.get_faces__(
            input_image,
        )
        if return_detections_information:
            return faces, self.detected_faces_information

        else:
            return faces

    def detect_faces__(self, input_image):
        detections = self.detect_faces_function(input_image)
        detections = [
            {
                "box": detections[0][i],
                "confidence": detections[1][i],
                "keypoints": {
                    "left_eye": detections[2][i][0],
                    "right_eye": detections[2][i][1],
                    "nose": detections[2][i][2],
                    "mouth_left": detections[2][i][3],
                    "mouth_right": detections[2][i][4],
                },
            }
            for i in range(detections[0].shape[0])
        ]
        self.detected_faces_information = list(
            filter(
                lambda element: element["confidence"] > self.minimum_confidence,
                detections,
            )
        )

    def get_detected_faces_information(self):
        return self.detected_faces_information

    def get_keypoints(
        self,
    ):
        return list(
            map(lambda element: element["keypoints"], self.detected_faces_information)
        )

    def get_faces__(
        self,
        input_image,
    ):
        boxes = [
            detection_information["box"]
            for detection_information in self.detected_faces_information
        ]
        y1y2x1x2 = [(int(y), int(y2), int(x), int(x2)) for x, y, x2, y2 in boxes]
        faces = [input_image[y1:y2, x1:x2] for y1, y2, x1, x2 in y1y2x1x2]
        return faces

    def get_eyes_coordinates(
        self,
    ):
        eyes_coordinates = [
            (info["keypoints"]["left_eye"], info["keypoints"]["right_eye"])
            for info in self.detected_faces_information
        ]
        return eyes_coordinates


class FaceEmotionRecognizer:
    """
    Finds emotion of a face image input.
    """

    def __init__(self, model_name="enet_b0_8_best_vgaf"):
        self.is_mtl = "_mtl" in model_name
        if "_7" in model_name:
            self.idx_to_class = {
                0: "Anger",
                1: "Disgust",
                2: "Fear",
                3: "Happiness",
                4: "Neutral",
                5: "Sadness",
                6: "Surprise",
            }
        else:
            self.idx_to_class = {
                0: "Anger",
                1: "Contempt",
                2: "Disgust",
                3: "Fear",
                4: "Happiness",
                5: "Neutral",
                6: "Sadness",
                7: "Surprise",
            }

        self.img_size = 224 if "_b0_" in model_name else 260
        self.test_transforms = transforms.Compose(
            [
                transforms.Resize((self.img_size, self.img_size)),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

        path = get_model_path(model_name)

        model = torch.load(path)
        model = model.to(DEVICE)

        if isinstance(model.classifier, torch.nn.Sequential):
            self.classifier_weights = model.classifier[0].weight.data
            self.classifier_bias = model.classifier[0].bias.data
        else:
            self.classifier_weights = model.classifier.weight.data
            self.classifier_bias = model.classifier.bias.data

        model.classifier = torch.nn.Identity()
        self.model = model.eval()
        # print(path, self.test_transforms)

    def compute_probability(self, features):
        return torch.matmul(features, self.classifier_weights.T) + self.classifier_bias

    def extract_representations_from_faces(self, input_faces):
        faces = [self.test_transforms(Image.fromarray(face)) for face in input_faces]
        features = self.model(torch.stack(faces, dim=0).to(DEVICE))
        return features

    def predict_emotions_from_representations(
        self, representations, logits=True, return_features=True
    ):
        scores = self.compute_probability(representations)
        if self.is_mtl:
            predictions_indices = torch.argmax(scores[:, :-2], dim=1)

        else:
            predictions_indices = torch.argmax(scores, dim=1)

        if self.is_mtl:
            x = scores[:, :-2]

        else:
            x = scores
        pred = torch.argmax(x[0])

        if not logits:
            e_x = torch.exp(x - torch.max(x, dim=1)[:, None])
            e_x = e_x / e_x.sum(dim=1)[:, None]
            if self.is_mtl:
                scores[:, :-2] = e_x
            else:
                scores = e_x

        return [
            self.idx_to_class[pred.item()] for pred in (predictions_indices)
        ], scores


class FaceNormalizer:
    """
    Normalizes images for network consistency.
    """

    def __init__(self):
        self.mp_face_mesh = mediapipe.solutions.face_mesh
        face_mesh = self.mp_face_mesh.FaceMesh(static_image_mode=True)

        mp_face_mesh = mediapipe.solutions.face_mesh
        self.face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True)
        self.routes_idx = self.initialize__()

    def initialize__(self):
        df = pd.DataFrame(
            list(self.mp_face_mesh.FACEMESH_FACE_OVAL), columns=["p1", "p2"]
        )
        routes_idx = []

        p1 = df.iloc[0]["p1"]
        p2 = df.iloc[0]["p2"]

        for i in range(0, df.shape[0]):
            obj = df[df["p1"] == p2]
            p1 = obj["p1"].values[0]
            p2 = obj["p2"].values[0]

            route_idx = []
            route_idx.append(p1)
            route_idx.append(p2)
            routes_idx.append(route_idx)

        return routes_idx

    def get_landmarks__(self, input_image: np.ndarray):
        if input_image.dtype == np.float:
            input_image = (input_image * 255).astype(np.uint8)

        results = self.face_mesh.process(input_image)
        landmarks = results.multi_face_landmarks[0]

        routes = []
        # for source_idx, target_idx in mp_face_mesh.FACEMESH_FACE_OVAL:
        for source_idx, target_idx in self.routes_idx:
            source = landmarks.landmark[source_idx]
            target = landmarks.landmark[target_idx]

            relative_source = (
                int(input_image.shape[1] * source.x),
                int(input_image.shape[0] * source.y),
            )
            relative_target = (
                int(input_image.shape[1] * target.x),
                int(input_image.shape[0] * target.y),
            )

            # cv2.line(img, relative_source, relative_target, (255, 255, 255), thickness = 2)

            routes.append(relative_source)
            routes.append(relative_target)

        return routes

    @staticmethod
    def normalize_with_landmark_points__(input_image, landmarks):
        mask = np.zeros((input_image.shape[0], input_image.shape[1]))
        mask = cv2.fillConvexPoly(mask, np.array(landmarks), 1)
        mask = mask.astype(bool)

        out = np.zeros_like(input_image)
        out[mask] = input_image[mask]
        return out

    def normalize_faces_image(self, input_images):
        normalized_faces_images = [
            self.normalize_with_landmark_points__(
                input_image, self.get_landmarks__(input_image)
            )
            for input_image in input_images
        ]
        return normalized_faces_images


class FaceEmbeddingExtractor:
    """
    Extracts embedding of an image, based on detected faces and their emotions. It consists of all necessary steps for
    extracting embedding from an image.
    """

    def __init__(self):
        self.faces = None
        self.normalized_rotated_faces = None
        self.rotated_faces = None
        self.rotation_angles = None
        self.rotation_directions = None

        fd = FaceDetection("MTCNN", minimum_confidence=0.95)
        self.face_detection_model: FaceDetection = fd
        fa = FaceAlignment()
        self.face_alignment_model: FaceAlignment = fa
        fn = FaceNormalizer()
        self.face_normalizer_model: FaceNormalizer = fn
        model_name = "enet_b0_8_best_afew"
        fer = FaceEmotionRecognizer(model_name)
        self.face_emotion_recognition_model: FaceEmotionRecognizer = fer

    def extract_embedding(self, input_image):
        faces, detected_faces_information = self.face_detection_model.extract_faces(
            input_image, return_detections_information=True
        )

        (
            rotation_angles,
            rotation_directions,
        ) = self.face_alignment_model.compute_alignment_rotation_(
            self.face_detection_model.get_eyes_coordinates()
        )
        rotated_faces = self.face_alignment_model.apply_rotation_on_images(
            faces, rotation_angles
        )
        normalized_rotated_faces = self.face_normalizer_model.normalize_faces_image(
            rotated_faces
        )

        normalized_rotated_faces_255 = [
            (image * 255).astype(np.uint8) for image in normalized_rotated_faces
        ]

        representations = (
            self.face_emotion_recognition_model.extract_representations_from_faces(
                normalized_rotated_faces_255
            )
        )[
            0
        ]  # WARNING: 0 was not here
        del normalized_rotated_faces_255
        del normalized_rotated_faces
        del rotated_faces
        del rotation_angles
        del rotation_directions
        del faces
        del detected_faces_information
        # (
        #     predictions,
        #     scores,
        # ) = self.face_emotion_recognition_model.predict_emotions_from_representations(
        #     representations
        # )

        # self.faces = faces
        # self.rotation_angles, self.rotation_directions = (
        #     rotation_angles,
        #     rotation_directions,
        # )
        # self.rotated_faces = rotated_faces
        # self.normalized_rotated_faces = normalized_rotated_faces_255

        return None, None, representations
        # return preictions, scores, representations

    def get_rotations_information(self):
        return self.rotation_angles, self.rotation_directions

    def get_faces(self):
        return self.faces

    def get_rotated_faces(self):
        return self.rotated_faces

    def get_normalized_rotated_faces(self):
        return self.normalized_rotated_faces

    def clear(self):
        self.faces = None
        self.normalized_rotated_faces = None
        self.rotated_faces = None
        self.rotation_angles = None
        self.rotation_directions = None

    def store_embeddings(self, file, embeddings):
        with open(file, "wb") as file_out:
            pickle.dump(
                {"embeddings": embeddings}, file_out, protocol=pickle.HIGHEST_PROTOCOL
            )

    def load_embeddings(self, file):
        with open(file, "rb") as file_in:
            stored_data = pickle.load(file_in)
            stored_embeddings = stored_data["embeddings"]

        return stored_embeddings

### Text Embedding


In [13]:
class EnglishTextEmbeddingExtractor:
    """
    Extracts embedding of the text using [CLS] token of a Roberta based model.
    """

    def __init__(
        self,
        model_name="pysentimiento/robertuito-sentiment-analysis",
        show_progress_bar=True,
        to_tensor=True,
        max_length=128,
    ):
        self.model_name = model_name
        self.max_length = max_length
        self.tokenizer = PreTrainedTokenizerFast.from_pretrained(self.model_name)
        self.model = RobertaForSequenceClassification.from_pretrained(
            self.model_name, num_labels=3, output_hidden_states=True
        ).to(DEVICE)

        self.generator = pipeline(
            task="sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer,
        )

    def extract_embedding(
        self,
        input_batch_sentences,
    ):
        encoded_input = self.tokenizer(
            input_batch_sentences,
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        ).to(DEVICE)

        with torch.no_grad():
            model_output = self.model(**encoded_input)
            hidden_states = model_output["hidden_states"]
            last_layer_hidden_states = hidden_states[
                12
            ]  # 12 = len(hidden_states) , dim = (batch_size, seq_len, 768)
            cls_hidden_state = last_layer_hidden_states[:, 0, :]

        return cls_hidden_state

    def get_labels(self, input_batch_sentences):
        return self.generator(input_batch_sentences)


class GermanTextEmbeddingExtractor:
    """
    Extracts embedding of the text using [CLS] token of a Bert based model.
    """

    def __init__(
        self,
        model_name="oliverguhr/german-sentiment-bert",
        max_length=128,
    ):
        self.model_name = model_name
        self.max_length = max_length
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name, output_hidden_states=True
        ).to(DEVICE)

        self.generator = pipeline(
            task="sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer,
            device=0,
        )
        self.clean_chars = re.compile(r"[^A-Za-züöäÖÜÄß ]", re.MULTILINE)
        self.clean_http_urls = re.compile(r"https*\S+", re.MULTILINE)
        self.clean_at_mentions = re.compile(r"@\S+", re.MULTILINE)

    def predict_sentiment(self, texts: List[str]) -> List[str]:
        texts = [self.clean_text(text) for text in texts]
        # Add special tokens takes care of adding [CLS], [SEP], <s>... tokens in the right way for each model.
        # truncation=True limits number of tokens to model's limitations (512)
        encoded = self.tokenizer.batch_encode_plus(
            texts,
            padding=True,
            add_special_tokens=True,
            truncation=True,
            return_tensors="pt",
        )
        encoded = encoded.to(self.device)
        with torch.no_grad():
            logits = self.model(**encoded)

        label_ids = torch.argmax(logits[0], axis=1)
        return [self.model.config.id2label[label_id.item()] for label_id in label_ids]

    def replace_numbers(self, text: str) -> str:
        return (
            text.replace("0", " null")
            .replace("1", " eins")
            .replace("2", " zwei")
            .replace("3", " drei")
            .replace("4", " vier")
            .replace("5", " fünf")
            .replace("6", " sechs")
            .replace("7", " sieben")
            .replace("8", " acht")
            .replace("9", " neun")
        )

    def clean_text(self, text: str) -> str:
        text = text.replace("\n", " ")
        text = self.clean_http_urls.sub("", text)
        text = self.clean_at_mentions.sub("", text)
        text = self.replace_numbers(text)
        text = self.clean_chars.sub("", text)  # use only text chars
        text = " ".join(
            text.split()
        )  # substitute multiple whitespace with single whitespace
        text = text.strip().lower()
        return text

    def extract_embedding(
        self,
        input_batch_sentences,
    ):
        encoded_input = self.tokenizer(
            input_batch_sentences,
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        ).to(DEVICE)

        with torch.no_grad():
            model_output = self.model(**encoded_input)
            hidden_states = model_output["hidden_states"]
            last_layer_hidden_states = hidden_states[
                12
            ]  # 12 = len(hidden_states) , dim = (batch_size, seq_len, 768)
            cls_hidden_state = last_layer_hidden_states[:, 0, :]

        return cls_hidden_state

    def get_labels(self, input_batch_sentences):
        return self.predict_sentiment(input_batch_sentences)

# Dataset

Our dataset class definition.

In [14]:
class MSCTDDataSet(Dataset):
    """
    MSCTD dataset.

    It can be used with raw data, to extract embeddings or with saved features.
    """

    def __init__(
        self,
        split="train",
        data_size=None,
        load=False,
    ):
        """
        Args:
            split (str): val, train, test.
            data_size (int): None for full dataset. If provided dataset size will be reduced to data_size.
            load (bool): If false, all embeddings will be extracted and dataset works with bare text and image. If true, it loads all pre extracted embeddings.
                         Warning: don't use load=False for training. Always try using with load=True for training, to speedup process.
        """

        self.split = split
        self.eng_text_file_path = TEXTS_DIR / "english" / f"{split}.txt"
        self.ger_text_file_path = TEXTS_DIR / "german" / f"{split}.txt"
        self.sentiment_file_path = LABELS_DIR / f"sentiment_{split}.txt"
        self.image_dir = IMAGES_DIR / split

        self.data_size = data_size
        self.load = load

        self.eng_texts = None
        self.sentiments = None
        self.indexes = None
        self.face_embeddings = None
        self.pose_embeddings = None
        self.eng_text_embeddings = None
        self.load_data()
        self.face_embedding_extractor = FaceEmbeddingExtractor()
        self.eng_text_embedding_extractor = EnglishTextEmbeddingExtractor()
        self.ger_text_embedding_extractor = GermanTextEmbeddingExtractor()
        self.pose_embedding_extractor = PoseEmbeddingExtractor()
        self.scene_embedding_extractor = SceneEmbeddingExtractor()

    # text is not valid
    def load_data(self):
        if self.load:
            eng_texts = None
            ger_texts = None
            indexes = torch.load(SAVE_DIR / f"indexes_{self.split}.pt").to(DEVICE)
            sentiments = torch.load(SAVE_DIR / f"sentiments_{self.split}.pt").to(DEVICE)
            face_embeddings = torch.load(
                SAVE_DIR / f"face_embeddings_{self.split}.pt"
            ).to(DEVICE)
            pose_embeddings = torch.load(
                SAVE_DIR / f"pose_embeddings_{self.split}.pt"
            ).to(DEVICE)
            eng_text_embeddings = torch.load(
                SAVE_DIR / f"eng_text_embeddings_{self.split}.pt"
            ).to(DEVICE)
            ger_text_embeddings = torch.load(
                SAVE_DIR / f"ger_text_embeddings_{self.split}.pt"
            ).to(DEVICE)
            scene_embeddings = torch.load(
                SAVE_DIR / f"scene_embeddings_{self.split}.pt"
            ).to(DEVICE)

            assert (
                face_embeddings.shape[0] == pose_embeddings.shape[0]
            ), "ERROR:  face and pose list are not the same size in loading"
            assert (
                pose_embeddings.shape[0] == eng_text_embeddings.shape[0]
            ), "ERROR: text and pose list are not the same size in loading"
            assert (
                eng_text_embeddings.shape[0] == indexes.shape[0]
            ), "ERROR: text and index list are not the same size in loading"
            assert (
                indexes.shape[0] == sentiments.shape[0]
            ), "ERROR: index and sentiment list are not the same size in loading"

            print(face_embeddings.shape)
            print(pose_embeddings.shape)
            print(eng_text_embeddings.shape)
            print(ger_text_embeddings.shape)
            print(indexes.shape)
            print(sentiments.shape)

        else:
            with open(self.eng_text_file_path) as eng_text_file, open(
                self.ger_text_file_path
            ) as ger_text_file, open(self.sentiment_file_path) as sentiment_file:
                sentiments = [int(t.strip()) for t in sentiment_file.readlines()]
                eng_texts = [t.strip() for t in eng_text_file.readlines()]
                ger_texts = [t.strip() for t in ger_text_file.readlines()]
                indexes = range(len(sentiments))
                # indexes = torch.load(SAVE_DIR / f"indexes_{self.split}.pt").to(DEVICE) #WARNING: REMOVE

                face_embeddings = None
                pose_embeddings = None
                eng_text_embeddings = None
                ger_text_embeddings = None
                scene_embeddings = None

        if self.data_size:
            indexes = indexes[: self.data_size]
            sentiments = sentiments[: self.data_size]
            if not eng_texts is None:
                eng_texts = eng_texts[: self.data_size]
            if not ger_texts is None:
                ger_texts = ger_texts[: self.data_size]
            if not face_embeddings is None:
                face_embeddings = face_embeddings[: self.data_size, :]
            if not pose_embeddings is None:
                pose_embeddings = pose_embeddings[: self.data_size, :]
            if not scene_embeddings is None:
                scene_embeddings = scene_embeddings[: self.data_size, :]
            if not eng_text_embeddings is None:
                eng_text_embeddings = eng_text_embeddings[: self.data_size, :]

        self.eng_texts = eng_texts
        self.ger_texts = ger_texts
        self.sentiments = sentiments
        self.indexes = indexes
        self.face_embeddings = face_embeddings
        self.pose_embeddings = pose_embeddings
        self.eng_text_embeddings = eng_text_embeddings
        self.ger_text_embeddings = ger_text_embeddings
        self.scene_embeddings = scene_embeddings

    def __len__(self):
        return len(self.indexes)

    def get_face_embedding(self, image):
        (
            predictions,
            scores,
            representations,
        ) = self.face_embedding_extractor.extract_embedding(image)
        return representations

    def get_pose_embedding(self, image):
        return self.pose_embedding_extractor.extract_embedding(image)

    def get_image_embeddings(self, index):
        if self.load:
            return self.face_embeddings[index], self.pose_embeddings[index]

        image_name = self.image_dir / f"{index}.jpg"
        image = cv2.cvtColor(cv2.imread(str(image_name)), cv2.COLOR_BGR2RGB)
        face_embedding = self.get_face_embedding(image)
        pose_embedding = self.get_pose_embedding(image)
        return face_embedding, pose_embedding

    def get_scene_embedding(self, index):
        if self.load:
            return self.scene_embeddings[index]
        real_index = self.indexes[index]
        image_name = str(self.image_dir) + f"/{real_index}.jpg"
        image = read_image(image_name).to(DEVICE)
        return self.scene_embedding_extractor.extract_embedding(image)

    def get_sentiment(self, index):
        return self.sentiments[index]

    def get_eng_text(self, index):
        if self.load:
            return self.eng_text_embeddings[index]
        text = self.eng_texts[index]
        text = self.eng_text_embedding_extractor.extract_embedding([text])[0]
        return text

    def get_ger_text(self, index):
        if self.load:
            return self.ger_text_embeddings[index]
        text = self.ger_texts[index]
        text = self.ger_text_embedding_extractor.extract_embedding([text])[0]
        return text

    def __getitem__(self, index):
        if torch.is_tensor(index):
            index = index.tolist()
        try:
            face_embedding, pose_embedding = self.get_image_embeddings(index)
            scene_embedding = self.get_scene_embedding(index)
        except Exception as e:
            print(f"error for split:{self.split} index: {index}")
            print(e)
            face_embedding = torch.ones(FACE_EMBEDDING_SIZE).to(DEVICE) * -123
            pose_embedding = torch.ones(POSE_EMBEDDING_SIZE).to(DEVICE) * -123

        sentiment = self.get_sentiment(index)
        eng_text_embedding = self.get_eng_text(index)
        ger_text_embedding = self.get_ger_text(index)

        sample = {
            "index": self.indexes[index],
            "pose_embedding": pose_embedding,
            "face_embedding": face_embedding,
            "scene_embedding": scene_embedding,
            "eng_text_embedding": eng_text_embedding,
            "ger_text_embedding": ger_text_embedding,
            "sentiment": sentiment,
        }
        return sample

# Save features

This step is crucial. First, you save all embeddings in this step. Dataset is initialized with load=False, and 
extracts all embeddings with their network. This step might take a while to complete. You should run this step for 
all data splits (train, test and val).

In [15]:
SAVE = False
if SAVE:
    SAVE_SPLIT = "test"
    SAVE_BATCH = 8
    dataset = MSCTDDataSet(split=SAVE_SPLIT, load=True)
    print(len(dataset))
    dataloader = DataLoader(dataset, batch_size=SAVE_BATCH)

In [16]:
def save_features(dataloader, split):
    """
    Save features using a dataset with load = False. Later on you can use dataset with load=True for a
    fast dataset for later trainings.
    """
    stop_batch = None

    for batch_index, batch in enumerate(tqdm(dataloader)):
        errors = (batch["pose_embedding"] == -123).all(dim=1)

        torch.save(
            batch["face_embedding"][~errors],
            SAVE_DIR / f"face_embeddings_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["pose_embedding"][~errors],
            SAVE_DIR / f"pose_embeddings_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["eng_text_embedding"][~errors],
            SAVE_DIR / f"eng_text_embeddings_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["ger_text_embedding"][~errors],
            SAVE_DIR / f"ger_text_embeddings_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["index"][~errors],
            SAVE_DIR / f"indexes_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["sentiment"][~errors],
            SAVE_DIR / f"sentiments_{split}_{batch_index}.pt",
        )
        torch.save(
            batch["scene_embedding"][~errors],
            SAVE_DIR / f"scene_embeddings_{split}_{batch_index}.pt",
        )

        assert (
            batch["pose_embedding"].shape[0] == batch["text_embedding"].shape[0]
        ), "text and pose list are not the same size in saving"
        assert (
            batch["face_embedding"].shape[0] == batch["pose_embedding"].shape[0]
        ), "face and pose list are not the same size in saving"
        assert (
            batch["eng_text_embedding"].shape[0] == batch["index"].shape[0]
        ), "text and index list are not the same size in saving"
        assert (
            batch["index"].shape[0] == batch["sentiment"].shape[0]
        ), "index and sentiment list are not the same size in saving"
        assert (
            batch["ger_text_embedding"].shape[0] == batch["index"].shape[0]
        ), "index and sentiment list are not the same size in saving"
        if stop_batch and batch_index == stop_batch:
            break

    print("----------------------")
    len_batch = len(dataloader)
    if stop_batch:
        len_batch = stop_batch
    print(len(dataloader))

    def concat_batches(name="face_embedding"):
        batches = []
        for i in range(len_batch):
            batches.append(torch.load(SAVE_DIR / f"{name}_{split}_{i}.pt"))
        batches = torch.cat(batches, dim=0)
        print(batches.shape)
        torch.save(batches, SAVE_DIR / f"{name}_{split}.pt")
        del batches

    for name in [
        "face_embeddings",
        "pose_embeddings",
        "eng_text_embeddings",
        "ger_text_embeddings",
        "text_embeddings",
        "scene_embeddings",
        "sentiments",
        "indexes",
    ]:
        concat_batches(name)

In [17]:
if SAVE:
    save_features(dataloader, SAVE_SPLIT)

In [18]:
if SAVE:
    del dataset
    del dataloader

Ensure embeddings are created.

In [None]:
!ls -sh backup
# %ls -sh data/saved_features | grep face_embeddings_test.pt
%ls -sh data/saved_features/*_val.pt

total 462K
281K face_embeddings_val.pt  1.5K sentiments_val.pt
8.5K pose_embeddings_val.pt  169K text_embeddings_val.pt
1.5K real_indexes_val.pt


Delete extra files if your storage is full.

In [48]:
# %rm data/saved_features/ger_text_embeddings_val_*.pt

Check size of created embedding files.

In [None]:
!du data/saved_features/text_embeddings_val.pt -h

169K	data/saved_features/text_embeddings_val.pt


Backup embeddings if needed.

In [None]:
# CHANGE VAL TO SPLIT
%mkdir backup
!cp data/saved_features/face_embeddings_val.pt backup
!cp data/saved_features/pose_embeddings_val.pt backup
!cp data/saved_features/real_indexes_val.pt backup
!cp data/saved_features/eng_text_embeddings_val.pt backup
!cp data/saved_features/ger_text_embeddings_val.pt backup
!cp data/saved_features/sentiments_val.pt backup

# Model

# Evaluating

In [None]:
accuracy = evaluate.load("accuracy")
precision = evaluate.load("precision")
precision_macro = evaluate.load("precision")
precision_micro = evaluate.load("precision")
precision_weighted = evaluate.load("precision")
f1 = evaluate.load("f1")
f1_macro = evaluate.load("f1")
f1_micro = evaluate.load("f1")
f1_weighted = evaluate.load("f1")
recall = evaluate.load("recall")
recall_macro = evaluate.load("recall")
recall_micro = evaluate.load("recall")
recall_weighted = evaluate.load("recall")


def validate(model, dataloader, loss_fn, language="eng"):
    running_loss = 0.0

    for data_pair_index, batch in enumerate(dataloader):
        # print("--------------", data_pair_index, "-------------")
        errors = (batch["pose_embedding"] == -123).all(dim=1)
        assert torch.all(~errors).item()

        text_embedding = batch[f"{language}_text_embedding"]
        face_embedding = batch["face_embedding"]
        pose_embedding = batch["pose_embedding"]
        scene_embedding = batch["scene_embedding"]
        labels = batch["sentiment"]
        inputs = torch.cat(
            (face_embedding, text_embedding, pose_embedding, scene_embedding), 1
        )
        logits = model(inputs)
        probs = logits.argmax(dim=1)

        accuracy.add_batch(predictions=probs, references=labels)
        precision.add_batch(predictions=probs, references=labels)
        precision_macro.add_batch(predictions=probs, references=labels)
        precision_micro.add_batch(predictions=probs, references=labels)
        precision_weighted.add_batch(predictions=probs, references=labels)
        f1.add_batch(predictions=probs, references=labels)
        f1_macro.add_batch(predictions=probs, references=labels)
        f1_micro.add_batch(predictions=probs, references=labels)
        f1_weighted.add_batch(predictions=probs, references=labels)
        recall.add_batch(predictions=probs, references=labels)
        recall_macro.add_batch(predictions=probs, references=labels)
        recall_micro.add_batch(predictions=probs, references=labels)
        recall_weighted.add_batch(predictions=probs, references=labels)

        loss = loss_fn(logits, labels)
        running_loss += loss.item()

    print(accuracy.compute())
    print(precision.compute(average=None))
    print(precision_macro.compute(average="macro"))
    print(precision_micro.compute(average="micro"))
    print(precision_weighted.compute(average="weighted"))
    print(f1.compute(average=None))
    print(f1_macro.compute(average="macro"))
    print(f1_micro.compute(average="micro"))
    print(f1_weighted.compute(average="weighted"))
    print(recall.compute(average=None))
    print(recall_macro.compute(average="macro"))
    print(recall_micro.compute(average="micro"))
    print(recall_weighted.compute(average="weighted"))

In [None]:
class SimpleDenseNetwork(nn.Module):
    """
    A simple linear network as the classification head of our network.
    """

    def __init__(self, n_classes, embedding_dimension):
        super(SimpleDenseNetwork, self).__init__()

        self.n_classes = n_classes
        self.embedding_dimension = embedding_dimension

        self.classifier = nn.Sequential(
            nn.Linear(
                in_features=self.embedding_dimension,
                out_features=512,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=512, out_features=128),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=128, out_features=3),
        )

    def forward(self, x):
        x = self.classifier(x)
        return x

# Train

In [22]:
BATCH_SIZE = 32
num_workers = 1
EPOCHS = 30
embedding_dimension = (
    FACE_EMBEDDING_SIZE
    + ENG_TEXT_EMBEDDING_SIZE
    + POSE_EMBEDDING_SIZE
    + SCENE_EMBEDDING_SIZE
)

learning_rate = 0.0001
momentum = 0.001
data_size = None

In [21]:
def get_dataset_and_dataloder(split, batch_size, data_size=None):
    dataset = MSCTDDataSet(split, data_size=data_size, load=True)
    dataloader = DataLoader(dataset, batch_size=batch_size)
    return dataset, dataloader


train_dataset, train_dataloader = get_dataset_and_dataloder("train", BATCH_SIZE)
val_dataset, val_dataloader = get_dataset_and_dataloder("val", BATCH_SIZE)
test_dataset, test_dataloader = get_dataset_and_dataloder("test", BATCH_SIZE)

torch.Size([12816, 1280])
torch.Size([12816, 34])
torch.Size([12816, 768])
torch.Size([12816, 768])
torch.Size([12816])
torch.Size([12816])
torch.Size([3334, 1280])
torch.Size([3334, 34])
torch.Size([3334, 768])
torch.Size([3334, 768])
torch.Size([3334])
torch.Size([3334])
torch.Size([3331, 1280])
torch.Size([3331, 34])
torch.Size([3331, 768])
torch.Size([3331, 768])
torch.Size([3331])
torch.Size([3331])


In [35]:
def train_epoch(epoch_index, model, dataloader, loss_fn, optimizer, language="eng"):
    running_loss = 0.0

    for batch_index, batch in enumerate(tqdm(dataloader)):
        errors = (batch["pose_embedding"] == -123).all(dim=1)
        assert torch.all(~errors).item()
        text_embedding = batch[f"{language}_text_embedding"]
        face_embedding = batch["face_embedding"]
        pose_embedding = batch["pose_embedding"]
        scene_embedding = batch["scene_embedding"]
        labels = batch["sentiment"]
        optimizer.zero_grad()

        inputs = torch.cat(
            (face_embedding, text_embedding, pose_embedding, scene_embedding), 1
        )
        outputs = model(inputs)

        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print("Epoch loss: ", running_loss)


def train_model(model, epochs, train_dataloader, val_dataloader, language="eng"):
    loss_fn = nn.CrossEntropyLoss()
    # optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        print("--------------epoch: ", epoch, "-------------")
        model.train()
        train_epoch(epoch, model, train_dataloader, loss_fn, optimizer, language)
        model.eval()
        validate(model, val_dataloader, loss_fn)

In [28]:
model = SimpleDenseNetwork(n_classes=3, embedding_dimension=embedding_dimension).to(
    device=DEVICE
)

In [None]:
train_model(model, EPOCHS, train_dataloader, val_dataloader)

In [30]:
validate(model, test_dataloader, nn.CrossEntropyLoss())

{'accuracy': 0.5813261968480523}
{'precision': array([0.41584158, 0.6522285 , 0.62485482])}
{'precision': 0.5643083012737713}
{'precision': 0.5813261968480523}
{'precision': 0.5828709957741022}
{'f1': array([0.42495784, 0.67511371, 0.57570894])}
{'f1': 0.5585934956483021}
{'f1': 0.5813261968480523}
{'f1': 0.5806041595247571}
{'recall': array([0.43448276, 0.6996633 , 0.53373016])}
{'recall': 0.5559020700991526}
{'recall': 0.5818072650855599}
{'recall': 0.5818072650855599}


Save trained model.

In [32]:
torch.save(model, SAVE_DIR / "eng_model.pt")

Fine tuning model on german.

In [None]:
FINE_TUNE_EPOCHS = 10
ger_model = torch.load(SAVE_DIR / "eng_model.pt")
train_model(ger_model, FINE_TUNE_EPOCHS, train_dataloader, val_dataloader, "ger")

In [37]:
validate(model, test_dataloader, nn.CrossEntropyLoss(), "ger")

{'accuracy': 0.3524467126988892}
{'precision': array([0.25990854, 0.48034516, 0.34016393])}
{'precision': 0.36013920973636754}
{'precision': 0.3524467126988892}
{'precision': 0.38192645072553466}
{'f1': array([0.31544866, 0.39730373, 0.33569262])}
{'f1': 0.3494816682189696}
{'f1': 0.3524467126988892}
{'f1': 0.357882790881482}
{'recall': array([0.40117647, 0.33874239, 0.33133733])}
{'recall': 0.35708539648222154}
{'recall': 0.3524467126988892}
{'recall': 0.3524467126988892}


In [None]:
torch.save(model, SAVE_DIR / "ger_model.pt")