In [1]:
%%capture
!pip install pytorch_lightning mediapipe hsemotion cvzone whisper_timestamped aniemore pystoi pydub noisereduce

In [8]:
import torch
import torch.nn as nn
from torchvision.models import resnet34
import pytorch_lightning as pl
from sklearn.metrics import fbeta_score
import torch.nn.functional as F
from fastai.vision.all import *
import os
import cv2
import mediapipe as mp
import torchvision.transforms as transforms
from PIL import Image
from tqdm import tqdm
import math
from hsemotion.facial_emotions import HSEmotionRecognizer
import numpy as np
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from cvzone.PoseModule import PoseDetector
import joblib
from joblib import load
import string
import moviepy.editor as mp_editor
import whisper_timestamped
from aniemore.models import HuggingFaceModel
from aniemore.recognizers.voice import VoiceRecognizer
from moviepy.audio.io.AudioFileClip import AudioFileClip
from pydub import AudioSegment
import wave
import nltk
from nltk import word_tokenize, FreqDist
import noisereduce as nr
import librosa
import scipy.io.wavfile as wavf
from moviepy.audio.io.AudioFileClip import AudioFileClip
from pystoi import stoi
from google.colab import drive
from moviepy.editor import VideoFileClip
from IPython.display import display
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [3]:
drive.mount('/content/drive')

Mounted at /content/drive


## Computer Vision

In [None]:
class LabelSmoothingBCEWithLogitsLossFlat(BCEWithLogitsLossFlat):
    """
    Modified loss function.
    """
    def init(self, eps:float=0.1, **kwargs):
        self.eps = eps
        super().init(thresh=0.2, **kwargs)

    def call(self, inp, targ, **kwargs):
        # https://www.kaggle.com/c/siim-isic-melanoma-classification/discussion/166833#929222
        targ_smooth = targ.float() * (1. - self.eps) + 0.5 * self.eps
        return super().call(inp, targ_smooth, **kwargs)

class CustomResNet(nn.Module):
    """
    Tuned resnet 34 model.
    """
    def __init__(self, num_classes=19):
        """
        Initialize resnet34 model and change last layer.
        :param num_classes: int number of outputs.
        """
        super(CustomResNet, self).__init__()
        resnet = resnet34(weights=ResNet34_Weights.IMAGENET1K_V1)
        for param in resnet.parameters():
            param.requires_grad = False
        in_features = resnet.fc.in_features
        resnet.fc = nn.Linear(in_features, num_classes)
        self.resnet = resnet

    def forward(self, x):
        return self.resnet(x)


class CustomModel(pl.LightningModule):
    def __init__(self, model, threshold=0.7, k=4):
        super(CustomModel, self).__init__()
        self.model = model
        self.train_loss_mean = []
        self.train_acc_mean = []
        self.train_k_acc = []
        self.val_loss_mean = []
        self.val_acc_mean = []
        self.val_k_acc = []
        self.k = k
        self.threshold = threshold

    def adversarial_loss(self, y_hat, y):
        """
        Initialize loss function.
        :param y_hat: prediction.
        :param y: real values.
        :return: loss function.
        """
        loss_fn = LabelSmoothingBCEWithLogitsLossFlat()
        return loss_fn(y_hat, y)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        """
        Step of the training loop.
        :param batch: batch for training.
        :param batch_idx: index of trained batch.
        :return: loss calculated on this step.
        """
        images, attributes = batch
        outputs = self(images)
        loss = self.adversarial_loss(outputs, attributes)
        self.train_loss_mean.append(loss)
        accuracy = self.calculate_accuracy(outputs, attributes)
        k_acc = self.top_k_accuracy(outputs, attributes)
        self.train_acc_mean.append(accuracy)
        self.train_k_acc.append(k_acc)
        return loss

    def validation_step(self, batch, batch_idx):
        """
        Step of the validation loop.
        :param batch: batch for validation.
        :param batch_idx: index of validation batch.
        :return: dictionary with validation loss and accuracy.
        """
        images, attributes = batch
        outputs = self(images)
        loss = self.adversarial_loss(outputs, attributes)
        self.val_loss_mean.append(loss)
        accuracy = self.calculate_accuracy(outputs, attributes)
        k_acc = self.top_k_accuracy(outputs, attributes)
        self.val_acc_mean.append(accuracy)
        self.val_k_acc.append(k_acc)
        return {"val_loss": loss, "val_accuracy": accuracy, "val_k_acc": k_acc}

    def configure_optimizers(self):
        """
        Initialize optimizer.
        :return: optimizer.
        """
        return torch.optim.Adam(self.model.parameters(), lr=0.001)

    def on_validation_epoch_end(self):
        """
        Calculate mean values when validation epoch ends.
        """
        loss = sum(self.val_loss_mean) / len(self.val_loss_mean)
        self.val_loss_mean = []
        acc = sum(self.val_acc_mean) / len(self.val_acc_mean)
        self.val_acc_mean = []
        k_acc = sum(self.val_k_acc) / len(self.val_k_acc)
        self.val_k_acc = []
        self.log("val epoch end loss", loss, prog_bar=True)
        self.log("val epoch end acc", acc, prog_bar=True)
        self.log("val epoch end k acc", k_acc, prog_bar=True)

    def calculate_accuracy(self, outputs, targets):
        """
        Calculate the quality of the model.
        :param outputs: model outputs.
        :param targets: targets: real values.
        :return: float value - accuracy.
        """
        probs = F.softmax(outputs, dim=1)
        binary_mask = (probs >= self.threshold).float()
        accuracy = fbeta_score(binary_mask, targets, beta=2, average='samples')
        return accuracy

    def on_train_epoch_end(self):
        """
        Calculate mean values when trining epoch ends.
        """
        loss = sum(self.train_loss_mean) / len(self.train_loss_mean)
        self.train_loss_mean = []
        acc = sum(self.train_acc_mean) / len(self.train_acc_mean)
        self.train_acc_mean = []
        k_acc = sum(self.train_k_acc) / len(self.train_k_acc)
        self.train_k_acc = []
        self.log("train epoch end loss", loss, prog_bar=True)
        self.log("train epoch end acc", acc, prog_bar=True)
        self.log("train epoch end k acc", k_acc, prog_bar=True)

    def top_k_accuracy(self, outputs, targets):
        """
        Calculate accuracy among k most probable classes.
        :param outputs: model outputs.
        :param targets: real values.
        :return: float value - accuracy.
        """
        topk_values, topk_indices = torch.topk(outputs, self.k, dim=1)
        correct_count = 0
        for i in range(topk_indices.size(0)):
            for j in range(topk_indices.size(1)):
                if targets[i, topk_indices[i, j]] == 1:
                    correct_count += 1
        accuracy = correct_count / (outputs.size(0) * self.k)
        return accuracy

In [None]:
class Clothes:
    attributes = ['floral', 'graphic', 'striped', 'embroidered', 'solid', 'lattice',
                  'long_sleeve', 'short_sleeve', 'sleeveless', 'maxi_length',
                  'mini_length', 'crew_neckline', 'v_neckline', 'square_neckline',
                  'no_neckline', 'denim', 'tight', 'loose', 'conventional']
    not_acceptable_attributes = ['sleeveless', 'mini_length', 'denim', 'tight', 'loose']

    def __init__(self):
        """
        Initialize transforms.
        """
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def compute_image_sharpness(self, image):
        """
        Calculate sharpness of one image.
        :param image: image to process.
        :return: float value - sharpness of image.
        """
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return cv2.Laplacian(gray_image, cv2.CV_64F).var()

    def choose_sharpest_image(self, images):
        """
        Choose sharpest image for future assessing.
        :param images: frames for choosing.
        :return: sharpest image.
        """
        sharpest_image = None
        max_sharpness = 0

        for image in images:
            sharpness = self.compute_image_sharpness(image)
            if sharpness > max_sharpness:
                max_sharpness = sharpness
                sharpest_image = image

        return sharpest_image

    def transform_image(self, image):
        """
        Transform image into model input.
        :param image: image for processing.
        :return: tensor - transformed image.
        """
        mp_pose = mp.solutions.pose
        pose = mp_pose.Pose()

        image_h, image_w, _ = image.shape
        results = pose.process(image)

        if results.pose_landmarks:
            # Identify bound box.
            x_min, y_min, x_max, y_max = image_w, image_h, 0, 0
            for landmark in results.pose_landmarks.landmark:
                x, y = int(landmark.x * image_w), int(landmark.y * image_h)
                x_min = max(0, min(x_min, x))
                y_min = max(0, min(y_min, y))
                x_max = min(image_w - 1, max(x_max, x))
                y_max = min(image_h - 1, max(y_max, y))
            image = image[y_min:y_max, x_min:x_max]

        pose.close()
        pil_image = Image.fromarray(image)
        image = self.transform(pil_image)
        return image

    def check_arrays(self, arr1, arr2):
        """
        Check presence of first array elements in second array.
        :param arr1: array for checking elements.
        :param arr2: second array for processing.
        :return: bool value if none of elements in first array is in the second.
        """
        for elem in arr1:
            if elem in arr2:
                return False
        return True

    def assess_appearance(self, frames):
        """
        Assess clothes attributes.
        :param frames: frames for choosing best frame for processing.
        :return: bool value if clothes is acceptable.
        """
        model = CustomResNet()
        custom_model = CustomModel(model)

        path = "/content/drive/MyDrive/saved_model_modified.pth"

        custom_model.model.load_state_dict(torch.load(path))
        image = self.choose_sharpest_image(frames)
        image = self.transform_image(image)
        image = image.unsqueeze(0)
        custom_model.eval()
        output = custom_model(image)
        pred = F.softmax(output, dim=1)
        topk_values, topk_indices = torch.topk(pred, 3, dim=1)
        captions = []
        for i in range(topk_indices.size(0)):
            for j in range(topk_indices.size(1)):
                captions.append(Clothes.attributes[topk_indices[i, j]])
        return self.check_arrays(captions, Clothes.not_acceptable_attributes)



In [None]:
class DrawResults:
    def __init__(self, path, dist=10, good_color=(0,128,0), bad_color=(60,20,220)):
        self.video_path = path
        self.dist = dist
        self.right_color = good_color
        self.not_right_color = bad_color

    def draw_frames(self, frame, text, color_flag):
        """
        Draw results on video frames.
        :param frames: frames for processing.
        :return: processed frames.
        """
        font = cv2.FONT_HERSHEY_COMPLEX
        x = 20
        y = 30
        font_scale = 0.5
        thickness = 1
        count = 1
        for i in range(len(text)):
            if color_flag[i]:
                color = self.right_color
            else:
                color = self.not_right_color
            if text[i] is not None:
                frame = cv2.putText(frame, text[i], (x, y * count), font, font_scale, color, thickness, cv2.LINE_AA)
                count += 1
        return frame

    def draw_angle(self, frame, length, color):
        """
        Draw lines for correct angle.
        :param frame: image for drawing.
        :param length: length of speaker's bound box.
        :param color: red if angle is incorrect, green otherwise.
        :return: new frame with angle lines.
        """
        image_orig = frame.copy()
        height, width = frame.shape[:2]
        center_x = width // 2
        line_length = length // 2
        line_thickness = 5
        line_offset_top = height // 3 + int(0.15 * height)
        line_offset_bottom = height // 3 - int(0.15 * height)
        font_color = self.right_color
        if not color:
            font_color = self.not_right_color
        cv2.line(frame, (center_x - line_length, height), (center_x - line_length, 0), font_color, line_thickness)
        cv2.line(frame, (center_x + line_length, height), (center_x + line_length, 0), font_color, line_thickness)
        cv2.line(frame, (center_x - line_length, line_offset_top), (center_x + line_length, line_offset_top),
                 font_color, line_thickness)
        cv2.line(frame, (center_x - line_length, line_offset_bottom), (center_x + line_length, line_offset_bottom),
                 font_color, line_thickness)
        font = cv2.FONT_HERSHEY_COMPLEX
        bottom_left_corner_text = (center_x - line_length, line_offset_top - 20)
        font_scale = 0.5
        line_type = 1
        cv2.putText(frame, 'Рекомендуемый уровень глаз', bottom_left_corner_text, font, font_scale, font_color,
                    line_type)
        image_out = cv2.addWeighted(frame, 0.3, image_orig, 0.7, 0.0)
        return image_out

    def draw(self, output_path, text, colors, angle, angle_color):
        cap = cv2.VideoCapture(self.video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        try:
            segment_duration = self.dist
            segment_frame_count = math.ceil(fps * segment_duration)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            i = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                ind = i // segment_frame_count
                text_elements = [row[ind] for row in text]
                colors_elements = [row[ind] for row in colors]
                frame = self.draw_frames(frame, text_elements, colors_elements)
                if len(angle) > 0 and angle[ind] is not None:
                    color = True
                    if i % segment_frame_count in angle_color[ind]:
                        color = False
                    frame = self.draw_angle(frame, angle[ind], color)
                out.write(frame)
                i += 1
        except Exception as e:
            print(e.args)
        finally:
            cap.release()
            out.release()
            cv2.destroyAllWindows()


In [None]:
class VideoEmotions:
    face_detection = mp.solutions.face_detection.FaceDetection(min_detection_confidence=0.3)
    model_name = 'enet_b0_8_best_afew'
    face_mesh = mp.solutions.face_mesh.FaceMesh()

    def __init__(self, device='cpu', model='HSEmotion'):
        """
        Initialize model and device.
        :param device: cpu or gpu.
        :param model: HSEmotion or deepFace.
        """
        self.device = device
        self.model = model
        if model == 'HSEmotion':
            self.predictor = HSEmotionRecognizer(model_name=VideoEmotions.model_name, device=device)

    @staticmethod
    def get_main_face(frame):
        """
        Crop image to get main face on the frame.
        @return: cropped image.
        """
        results = VideoEmotions.face_detection.process(frame)
        main_face = None
        max_score = 0
        if results.detections is not None:
            for detection in results.detections:
                if detection.score[0] > max_score:
                    main_face = detection
                    max_score = detection.score[0]
            if main_face is not None:
                bbox = main_face.location_data.relative_bounding_box
                image_height, image_width, _ = frame.shape
                x, y, w, h = int(bbox.xmin * image_width), int(bbox.ymin * image_height), \
                    int(bbox.width * image_width), int(bbox.height * image_height)
                main_face = frame[y:y + h, x:x + w]
        return main_face

    @staticmethod
    def calculate_emotion_percentage(emotion_list):
        """
        Calculate percentage of each element in the list.
        :param emotion_list: list for calculation.
        :return: dictionary with percentages of each element.
        """
        total_frames = len(emotion_list)
        emotion_percentage = {}
        for emotion in emotion_list:
            if emotion in emotion_percentage.keys():
                emotion_percentage[emotion] += 1
            else:
                emotion_percentage[emotion] = 1
        for emotion in emotion_percentage.keys():
            emotion_percentage[emotion] = (emotion_percentage[emotion] / total_frames) * 100
        return emotion_percentage

    @staticmethod
    def calculate_emotion_change_frequency(emotion_list):
        """
        calculate the percentage of changing emotions between two seconds.
        :param emotion_list: list to calculate changes in it.
        :return: frequency of changing emotions.
        """
        total_frames = len(emotion_list)
        emotion_changes = 0
        for i in range(1, total_frames):
            if emotion_list[i] != emotion_list[i - 1]:
                emotion_changes += 1
        emotion_change_frequency = emotion_changes / total_frames
        return emotion_change_frequency

    def process_frames(self, frames):
        """
        Predict emotions on each frame.
        :param frames: frames for processing.
        :return: main emotions and probabilities for each frame.
        """
        imgs = frames
        faces = list(map(VideoEmotions.get_main_face, imgs))
        emotions, scores = [], []
        for face in faces:
            if face is not None:
                try:
                    emotion, score = self.predictor.predict_emotions(face, logits=False)
                    emotions.append(emotion)
                    scores.append(score)
                except Exception:
                    continue
        return emotions, scores


In [None]:
class GazeDirection:
    LEFT_EYE = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398, 286, 258, 257, 259, 260]
    RIGHT_IRIS = [468, 470, 469, 472, 471]
    RIGHT_EYE = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246, 30, 29, 28, 27, 56]
    LEFT_IRIS = [473, 475, 474, 477, 476]

    def __init__(self, threshold=0.1):
        """
        Initialize prediction model and threshold.
        :param threshold: float value - acceptable displacement of iris.
        """

        path = "/content/drive/MyDrive/face_landmarker_v2_with_blendshapes.task"

        model_file = open(path, "rb")
        model_data = model_file.read()
        model_file.close()
        base_options = python.BaseOptions(model_asset_buffer=model_data)
        options = vision.FaceLandmarkerOptions(base_options=base_options,
                                               output_face_blendshapes=True,
                                               output_facial_transformation_matrixes=True,
                                               num_faces=1)
        self.detector = vision.FaceLandmarker.create_from_options(options)
        self.threshold = threshold

    def count_displacement(self, eye_coords, iris_coords):
        """
        Calculate the position of iris in percent relatively center.
        :param eye_coords: all coordinates of eye.
        :param iris_coords: all coordinates of iris.
        :return: percent of x and y axis - position of an iris.
        """
        max_x = (max(eye_coords, key=lambda item: item[0]))[0]
        min_x = (min(eye_coords, key=lambda item: item[0]))[0]
        max_y = (max(eye_coords, key=lambda item: item[1]))[1]
        min_y = (min(eye_coords, key=lambda item: item[1]))[1]
        width = max_x - min_x
        height = max_y - min_y
        iris_x = iris_coords[0][0]
        iris_y = iris_coords[0][1]
        percent_x = (2 * iris_x - width - 2 * min_x) / width
        percent_y = (2 * iris_y - height - 2 * min_y) / height
        return percent_x, percent_y

    def process_gaze(self, right_x, right_y, left_x, left_y):
        """
        Asses gaze.
        :param right_x: x position of right iris.
        :param right_y: y position of right iris.
        :param left_x: x position of left iris.
        :param left_y: y position of left iris.
        :return: string value - gaze direction.
        """
        x = (right_x + left_x) / 2
        y = (right_y + left_y) / 2
        if y > 0.45:
            result = "down "
        elif y < 0.2:
            result = "up "
        else:
            result = ""

        if abs(x) > self.threshold and x > 0:
            result += "right"
        elif abs(x) > self.threshold and x < 0:
            result += "left"
        else:
            result += "center"
        return result

    def landmarks_detection(self, img_width, img_height, face_landmarks, ind):
        """
        Transform coordinates into pixels of image.
        :param img_width: width of an image.
        :param img_height: height of an image.
        :param face_landmarks: not transformed landmarks.
        :param ind: indexes of required points.
        :return: transformed coordinates.
        """
        mesh_coord = [(int(face_landmarks[i].x * img_width), int(face_landmarks[i].y * img_height)) for i in ind]
        return mesh_coord

    def gaze_detection(self, frames):
        """
        Calculate direction of eyes on each frame.
        :param frames: frames for processing.
        :return: list with string results for all frames.
        """
        result_list = []
        for frame in frames:
            image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
            results = self.detector.detect(image)
            frame_width = frame.shape[0]
            frame_height = frame.shape[1]
            try:
                face_landmarks = results.face_landmarks[0]
                left_iris_coords = self.landmarks_detection(frame_width, frame_height, face_landmarks, GazeDirection.LEFT_IRIS)
                right_iris_coords = self.landmarks_detection(frame_width, frame_height, face_landmarks, GazeDirection.RIGHT_IRIS)
                left_eye_coords = self.landmarks_detection(frame_width, frame_height, face_landmarks, GazeDirection.LEFT_EYE)
                right_eye_coords = self.landmarks_detection(frame_width, frame_height, face_landmarks, GazeDirection.RIGHT_EYE)
                right_x, right_y = self.count_displacement(right_eye_coords, right_iris_coords)
                left_x, left_y = self.count_displacement(left_eye_coords, left_iris_coords)
                res = self.process_gaze(right_x, right_y, left_x, left_y)
                result_list.append(res)
            except Exception as ex:
                continue
        return result_list


In [None]:
class Gestures:
    body_angles = [[16, 14, 12], [14, 12, 11], [15, 13, 11], [12, 11, 13],
                   [21, 15, 19], [19, 15, 17], [22, 16, 20], [20, 16, 18],
                   [18, 20, 16, 14], [17, 19, 15, 13], [11, 0, 12]]

    def __init__(self):
        self.body_res = {16: {'name': 'right elbow', 'res': []}, 14: {'name': 'right shoulder', 'res': []},
                         15: {'name': 'left elbow', 'res': []},
                         12: {'name': 'left shoulder', 'res': []},
                         21: {'name': 'left thumb', 'res': []},
                         19: {'name': 'left pinky', 'res': []}, 22: {'name': 'right thumb', 'res': []},
                         20: {'name': 'right pinky', 'res': []},
                         18: {'name': 'right wrist', 'res': []}, 17: {'name': 'left wrist', 'res': []},
                         11: {'name': 'head', 'res': []}}

    def get_vector_between_points(self, first_point, second_point):
        """
        Calculate vector between two points in 2d.
        :param first_point: list or array with 2 elements (x and y) - first point to calculate vector.
        :param second_point: list or array with 2 elements (x and y) - second point to calculate vector.
        :return: list wit x and y of calculated vector.
        """
        x1, y1 = first_point[0], first_point[1]
        x2, y2 = second_point[0], second_point[1]
        vector = np.array([x2, y2]) - np.array([x1, y1])
        return vector

    def angle_between_vectors(self, v1, v2):
        """
        Calculate angle in degrees between given vectors.
        :param v1: list or array with 2 elements (x and y) - first vector.
        :param v2: list or array with 2 elements (x and y) - second vector.
        :return: float value [0:360] - angle between v1 and v2.
        """
        dot_product = np.dot(v1, v2)
        norm_v1 = np.linalg.norm(v1)
        norm_v2 = np.linalg.norm(v2)
        cos_theta = dot_product / (norm_v1 * norm_v2)
        angle_rad = np.arccos(np.clip(cos_theta, -1.0, 1.0))
        angle_deg = np.degrees(angle_rad)

        # Check the angle of the sign and adjust it in the range from 0 to 360 degrees.
        if np.cross(v1, v2) < 0:
            angle_deg = 360 - angle_deg

        return angle_deg

    def min_angle_difference(self, angle1, angle2):
        """
        Get min differance between two angles.
        :param angle1: float value [0:360] - first value in degrees.
        :param angle2: float value [0:360] - second value in degrees.
        :return: float value [0:360] - min angle between two angles in closed circle.
        """
        diff1 = abs(angle1 - angle2)
        diff2 = 360 - diff1
        return min(diff1, diff2)

    def point_between(self, point1, point2):
        """
        Calculate point between 2 points in 2d.
        :param point1: landmark with x and y attributes - first point.
        :param point2: landmark with x and y attributes - second point.
        :return: list with x and y of point between 2 given points.
        """
        return [(point1.x + point2.x) / 2, (point1.y + point2.y) / 2]

    def calculate_angles(self, landmarks, mean_angle):
        """
        calculate the displacement of the joints between frames.
        :param landmarks: coordinates of the main joints.
        :param mean_angle: dictionary for calculation results.
        :return: dictionary with results.
        """
        for angles in Gestures.body_angles:
            if all(landmarks[angle].visibility >= 0.5 for angle in angles):
                point_second = [landmarks[angles[-1]].x, landmarks[angles[-1]].y]
                point_mid = [landmarks[angles[-2]].x, landmarks[angles[-2]].y]
                if len(angles) > 3:
                    point_first = self.point_between(landmarks[angles[0]], landmarks[angles[1]])
                else:
                    point_first = [landmarks[angles[0]].x, landmarks[angles[0]].y]
                v1 = self.get_vector_between_points(point_first, point_mid)
                v2 = self.get_vector_between_points(point_mid, point_second)
                angle = self.angle_between_vectors(v1, v2)
                if mean_angle[angles[0]]['prev'] is not None:
                    mean_angle[angles[0]]['res'] += self.min_angle_difference(angle, mean_angle[angles[0]]['prev'])
                    mean_angle[angles[0]]['count'] += 1
                else:
                    mean_angle[angles[0]]['prev'] = angle
            else:
                mean_angle[angles[0]]['prev'] = None
        return mean_angle

    def process_velocity(self, frames):
        """
        Count angle displacement for all frames.
        :param frames: frames to process.
        :return: dictionary with results for each joint.
        """
        mp_pose = mp.solutions.pose
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            mean_angle = {}
            for angle in Gestures.body_angles:
                mean_angle[angle[0]] = {}
                mean_angle[angle[0]]['prev'] = None
                mean_angle[angle[0]]['count'] = 0
                mean_angle[angle[0]]['res'] = 0
            for image in frames:
                results = pose.process(image)
                try:
                    landmarks = results.pose_landmarks.landmark
                    mean_angle = self.calculate_angles(landmarks, mean_angle)
                except Exception as ex:
                    continue
            for angle in Gestures.body_angles:
                if mean_angle[angle[0]]['count'] > 0:
                    result = mean_angle[angle[0]]['res'] / mean_angle[angle[0]]['count']
                    self.body_res[angle[0]]['res'].append(round(result, 2))
                else:
                    self.body_res[angle[0]]['res'].append(0)

    def get_result(self):
        """
        Get result angles for body parts.
        :return: dictionary with body parts as keys and angles as values.
        """
        return {value['name']: value['res'] for key, value in self.body_res.items()}


In [None]:
class Perspective:
    def __init__(self):
        """
        Initialize model for detection.
        """
        self.detector = PoseDetector(staticMode=False,
                                modelComplexity=1,
                                smoothLandmarks=True,
                                enableSegmentation=False,
                                smoothSegmentation=True,
                                detectionCon=0.5,
                                trackCon=0.5)

    def point_between(self, point1, point2):
        """
        Calculate point between 2 points in 2d.
        :param point1: list with x and y of first point.
        :param point2: list with x and y of second point.
        :return: list with x and y of mid point.
        """
        return [(point1[0] + point2[0]) / 2, (point1[1] + point2[1]) / 2]

    def count_brightness(self, frames):
        """
        Asses lightning on frames.
        :param frames: list with frames to process.
        :return: string value - lightning.
        """
        dark = 0
        optimal = 0
        bright = 0
        for frame in frames:
            gray_image = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            mean_brightness = np.mean(gray_image)
            if mean_brightness < 100:
                dark += 1
            elif mean_brightness > 200:
                bright += 1
            else:
                optimal += 1
        dark /= len(frames)
        optimal /= len(frames)
        bright /= len(frames)
        if dark >= optimal and dark >= bright:
            return 0
        elif bright >= dark and bright >= optimal:
            return 2
        else:
            return 1

    def check_correct_pose(self, bounding_box, eye_coords, image_width, image_height):
        """
        Check if speaker in a right position.
        :param bounding_box: coordinates of the speakers bound box.
        :param eye_coords: eyes coordinates.
        :param image_width: width of an image.
        :param image_height: height of an image.
        :return: 1 or 0 - if position is correct.
        """
        x_center = image_width // 2
        y_third_line = image_height // 3
        x, y, x_len, y_len = bounding_box["bbox"]
        if abs(x + x_len / 2 - x_center) > 0.2 * x_center:
            return False

        # Check eye position according rule of the third.
        eye_x, eye_y = eye_coords
        if eye_y < y_third_line - 0.15 * image_height or eye_y > y_third_line + 0.15 * image_height:
            return False
        return True

    def count_angle(self, frames):
        """
        Count percent of incorrect frames.
        :param frames: frames to process.
        :return: percent of incorrect frames.
        """
        incorrect_pose = 0
        bbox_length = 0
        inc_index = []
        ind = 0
        for frame in frames:
            img = self.detector.findPose(frame, draw=False)
            lm_list, bbox_info = self.detector.findPosition(img, draw=False, bboxWithHands=False)
            right_coords = [lm_list[5][0], lm_list[5][1]]
            left_coords = [lm_list[2][0], lm_list[2][1]]
            height, width = frame.shape[:2]
            if not self.check_correct_pose(bbox_info, self.point_between(right_coords, left_coords), width, height):
                inc_index.append(ind)
                incorrect_pose += 1
            ind += 1
            length = bbox_info['bbox'][2] - bbox_info['bbox'][0]
            if length > bbox_length:
                bbox_length = length
        return incorrect_pose / len(frames), bbox_length, inc_index


In [None]:
class VideoSubsystem:
    acceptable_velocity = {'right elbow': [5, 50], 'left elbow': [5, 50], 'left shoulder': [2, 25],
                           'right shoulder': [2, 25], 'left thumb': [0, 30], 'left pinky': [3, 40],
                           'right thumb': [0, 30], 'right pinky': [3, 40], 'right wrist': [3, 40],
                           'left wrist': [3, 40], 'head': [0, 12]}

    def __init__(self, path, inappropriate_emotions, emotions=True, gesticulation=True, angle=True, gaze=True, clothes=True,
                 device='cpu', dist=5, acceptable_angle=0.6):
        self.fps = None
        self.inappropriate_emotions = inappropriate_emotions
        self.device = device
        self.video_path = path
        self.emotions = emotions
        self.gesticulation = gesticulation
        self.angle = angle
        self.gaze = gaze
        self.clothes = clothes
        self.dist = dist
        self.acceptable_angle = acceptable_angle

        path = "/content/drive/MyDrive/model_first.joblib"
        self.emotion_model = load(path)
        self.emotion_list = []
        self.emotion_inappropriate_percentage = []
        self.gesture_list = []
        self.angle_list = []
        self.gaze_list = []
        self.lightning = []
        self.angle_len = []
        self.inc_ind = []
        self.clothes_estimation = None

    def get_emotions(self):
        return self.emotion_list

    def get_gestures(self):
        return self.gesture_list

    def get_angle(self):
        return self.angle_list

    def get_gaze(self):
        return self.gaze_list

    def get_lightning(self):
        return self.lightning

    def get_angle_len(self):
        return self.angle_len

    def get_clothes_estimation(self):
        return self.clothes_estimation

    def get_incorrect_angle_ind(self):
        return self.inc_ind

    def get_inappropriate_emotion_percentage(self):
        return self.emotion_inappropriate_percentage
    @staticmethod
    def get_subarray(array, subset, ind):
        """
        Get subarray.
        :param array: array to get subarray from it.
        :param subset: number of elements in subarray.
        :param ind: index of array from which subarray starts.
        :return: subarray.
        """
        last_ind = min(ind + subset, len(array))
        return array[ind:last_ind]

    @staticmethod
    def calculate_percentage(percent_list):
        """
        Calculate percent of each element in the list.
        :param percent_list: list for calculating percents.
        :return: dictionary with elements of list as keys and percents as values.
        """
        total_frames = len(percent_list)
        percentage = {}
        for element in percent_list:
            if element in percentage.keys():
                percentage[element] += 1
            else:
                percentage[element] = 1
        for element in percentage.keys():
            percentage[element] = (percentage[element] / total_frames) * 100

        return percentage

    def process_emotions(self, frames):
        """
        Evaluate emotionality of video fragment.
        :param frames: list of frames for evaluation.
        :return: string value - emotionality.
        """
        total_frames = len(frames)
        emotion_class = VideoEmotions()
        emotion_results = []
        fps = int(self.fps)
        for i in range(0, total_frames, int(fps)):
            sec_frames = self.get_subarray(frames, fps, i)[::self.dist]
            emotions, scores = emotion_class.process_frames(sec_frames)
            percentages = VideoSubsystem.calculate_percentage(emotions)
            try:
                max_emotion = max(percentages, key=percentages.get)
                emotion_results.append(max_emotion)
            except Exception as ex:
                print(ex.args[0])
                emotion_results.append('emotion not determined')
        frequency = emotion_class.calculate_emotion_change_frequency(emotion_results)
        percentages = VideoSubsystem.calculate_percentage(emotion_results)
        features = [frequency]
        for element in ['Sadness', 'Disgust', 'Fear', 'Neutral', 'Happiness', 'Anger',
        'Contempt']:
            if element in percentages.keys():
                features.append(percentages[element])
            else:
                features.append(0.0)
        res = self.emotion_model.predict([features])[0]
        percent_res = 0.0
        for element in self.inappropriate_emotions:
            if element in percentages.keys():
                percent_res += percentages[element]
        return res, percent_res * 0.01

    def replace_values_with_condition(self, dictionary):
        """
        Change values for values in rating scale.
        :param dictionary: dictionary with unprocessed values.
        :return: dictionary with processed values.
        """
        for key, value in dictionary.items():
            min_val = VideoSubsystem.acceptable_velocity[key][0]
            max_val = VideoSubsystem.acceptable_velocity[key][1]
            for i in range(len(value)):
                if value[i] < min_val:
                    value[i] = '0'
                elif value[i] > max_val:
                    value[i] = '2'
                else:
                    value[i] = '1'
            dictionary[key] = value
        return dictionary

    def process_gesticulation(self, frames, duration=10):
        """
        Estimate velocity of the speaker.
        :param frames: list of frames for estimation.
        :param duration: number of seconds for estimation.
        :return: estimated velocity.
        """
        gesture = Gestures()
        total_frames = len(frames)
        fps = int(self.fps)
        for i in range(0, total_frames, fps):
            sec_frames = self.get_subarray(frames, fps, i)[::self.dist]
            gesture.process_velocity(sec_frames)
        res = gesture.get_result()
        res = self.replace_values_with_condition(res)
        result = []
        key = list(res.keys())[0]
        cycle = len(res[key])
        for ind in range(cycle):
            percent = []
            for key in res.keys():
                percent.append(res[key][ind])
            percentage = VideoSubsystem.calculate_percentage(percent)
            if '2' in percentage.keys():
                result.append(2)
            elif '0' in percentage.keys() and percentage['0'] > 70:
                result.append(0)
            else:
                result.append(1)
        all_percent = VideoSubsystem.calculate_percentage(result)
        if 2 in all_percent.keys():
            return 2
        elif 0 in all_percent.keys() and all_percent[0] > 70:
            return 0
        else:
            return 1

    def process_gaze(self, frames):
        """
        Calculate percent of incorrect gaze.
        :param frames: list of frames for processing.
        :return: float value - percent of incorrect frames.
        """
        model = GazeDirection()
        percent = model.gaze_detection(frames)
        percentages = VideoSubsystem.calculate_percentage(percent)
        # max_key = max(percentages, key=percentages.get)
        return (100 - percentages['center']) * 0.01

    def process_angle(self, frames):
        """
        Calculate incorrect angles.
        :param frames: list of frames for processing.
        :return: float value - percent of incorrect frames.
        """
        perspective = Perspective()
        brightness = perspective.count_brightness(frames[::self.dist])
        percent, length, inc_ind = perspective.count_angle(frames)
        return percent, length, brightness, inc_ind

    def process_clothes(self, frames):
        """
        Defines if clothes is appropriate.
        :param frames: list of frames for processing.
        :return: bool value if clothes is appropriate.
        """
        clothes = Clothes()
        return clothes.assess_appearance(frames)


    def process_video(self, duration=10):
        """
        Read for duration seconds and process frames.
        :param output_path: new path of processed video.
        :param duration: number of seconds to process in one cycle.
        :return: dictionary with results.
        """
        cap = cv2.VideoCapture(self.video_path)
        try:
            self.fps = math.ceil(cap.get(cv2.CAP_PROP_FPS))
            segment_duration = duration
            segment_frame_count = math.ceil(cap.get(cv2.CAP_PROP_FPS) * segment_duration)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            for i in tqdm(range(0, frame_count, segment_frame_count)):
                frames = []
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                for j in range(segment_frame_count):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frames.append(frame)
                if self.emotions:
                    res, percent_res = self.process_emotions(frames)
                    self.emotion_list.append(res)
                    self.emotion_inappropriate_percentage.append(percent_res)
                if self.gesticulation:
                    res = self.process_gesticulation(frames)
                    self.gesture_list.append(res)
                if self.angle:
                    res, length, brightness, inc_ind = self.process_angle(frames)
                    self.angle_list.append(res)
                    self.angle_len.append(length)
                    self.lightning.append(brightness)
                    self.inc_ind.append(inc_ind)
                if self.gaze:
                    res = self.process_gaze(frames)
                    self.gaze_list.append(res)
                if self.clothes and self.clothes_estimation is None:
                    self.clothes_estimation = self.process_clothes(frames)
        finally:
            cap.release()
            cv2.destroyAllWindows()


## Speech Processing

In [None]:
class AutomaticSpeechRecognition:
    """
    Class for transcribing audio into text.
    Creates speech text, words time intervals, unidentified noise time intervals.
    """
    def __init__(self, path):
        """
        Initialization of speech processing class
        @param path: path to audio file
        """
        clip = mp_editor.AudioFileClip(path)
        self.path = path
        self.duration = clip.duration
        self.transcription = None

    def get_speech_recognition(self):
        """
        Translates audio to text, creates words lists with timestamps (with and without background noise)
        """
        model = whisper_timestamped.load_model("large")
        # whisper timestamped allows to receive timestamps for each word and sentence, as well as
        # noise timestamps
        audio = whisper_timestamped.load_audio(self.path)
        self.transcription = whisper_timestamped.transcribe(
            model,
            audio,
            language="ru",
            detect_disfluencies=True,
            remove_punctuation_from_words=False)
        correct_transcription = self.check_transcription()
        # creation of transcription without punctuation marks
        transcription = self.transcription["text"].lower()
        transcription = transcription.translate(str.maketrans('', '', string.punctuation))
        transcription = "".join([ch for ch in transcription if ch not in string.digits])
        cleaned_transcription = " ".join(transcription.split())
        word_arrays = self.get_words()
        return cleaned_transcription, word_arrays, correct_transcription

    def check_transcription(self):
        """
        Checks if transcription is correct (if there are word doubles at the end of transcription)
        @return: True if transcription is correct, False otherwise
        """
        words = self.transcription["text"].split()
        segments = self.transcription["segments"]
        end_idx = len(segments)
        # find first segment out of time range
        for i in range(len(segments)):
            if segments[i]["end"] > self.duration:
                end_idx = i
                break
        # checks if there is no segments out of time range
        if end_idx == len(segments):
            return True
        else:
            # count words out of time range
            extra_words = 0
            for i in range(end_idx, len(segments)):
                extra_words += len(segments[i]["text"].split())
            # transcription correction
            self.transcription["text"] = " ".join((self.transcription["text"].split())[:len(words) - extra_words])
            self.transcription["segments"] = self.transcription["segments"][:end_idx]
            return False

    def get_words(self):
        """
        Creates lists with all words (with background noise), words without noise and only noise
        @return: three lists with dicts of words and their timestamps
        """
        all_words, all_words_without_noise, noise = [], [], []
        for sentence in self.transcription["segments"]:
            for word in sentence["words"]:
                all_words.append(word)
                if word["text"] != "[*]":
                    all_words_without_noise.append(word)
                else:
                    noise.append((word["start"], word["end"]))
        return all_words_without_noise, noise


In [None]:
class BackgroundNoise:
    """
    Class for background noise detecting.
    """
    # boundary values for size of time window and maximal acceptable noise percentage
    params = {
        "noise_time_window": 30,  # size of time window to view noise percentage
        "noise_percentage": 0.45,  # maximal noise percentage
    }

    def __init__(self, noise):
        """
        Initialization of background noise analysis class
        @param noise: timestamps with background noise, list of two-element lists
        """
        self.noise = noise

    def get_high_noise_timestamps(self):
        """
        Searches most noisy periods with the help of floating window
        @return: Most noisy periods, list of two-element lists
        """
        high_noise_timestamps = []
        if len(self.noise) == 0:
            return high_noise_timestamps
        start_idx, end_idx = 0, 1
        noise_sum = self.noise[0][1] - self.noise[0][0]
        while end_idx < len(self.noise):
            # searches for minimal time window larger than boundary value
            if self.noise[end_idx][1] - self.noise[start_idx][0] < \
                    self.params["noise_time_window"]:
                noise_sum += self.noise[end_idx][1] - self.noise[end_idx][0]
                end_idx += 1
                continue
            # check if the percentage of noise is larger than parameter
            if noise_sum / (self.noise[end_idx][1] - self.noise[start_idx][0]) > \
                    self.params["noise_percentage"]:
                # if period intersects with previous one - they are united
                if len(high_noise_timestamps) > 0 and high_noise_timestamps[-1][1] > \
                        self.noise[start_idx][0]:
                    high_noise_timestamps[-1][1] = self.noise[end_idx][1]
                # otherwise, new time period is appended
                else:
                    high_noise_timestamps.append(
                        [self.noise[start_idx][0], self.noise[end_idx][1]])
            noise_sum -= (self.noise[start_idx][1] - self.noise[start_idx][0])
            start_idx += 1
        return high_noise_timestamps


In [None]:
class AudioEmotions:
    """
    Class for emotions detecting.
    Counts percentage of preferred emotions and percentage of neutral emotion.
    """
    def __init__(self, path, analyzed_segment_len, negative_emotions):
        """
        Initialization of emotion classification class
        @param path: path to audio file
        @param analyzed_segment_len: length of file segment to analyze separately
        @param negative_emotions: list of preferred emotions
        """
        self.path = path
        self.analyzed_segment_len = analyzed_segment_len
        self.negative_emotions = negative_emotions
        # paths for N-second sub clips
        path = os.path.abspath(os.path.dirname(__file__))
        self.subclip_path = os.path.abspath(os.path.join(path, "file_processing/processing.wav"))
        self.subclip_modified_path = os.path.abspath(os.path.join(path, "file_processing/processing2.wav"))
        # order of emotions in model
        self.order = ["happiness", "anger", "disgust", "neutral", "sadness", "enthusiasm"]

    def emotions_analysis(self):
        """
        Analyzes speech per N seconds (see init params) and provides emotions probabilities
        @return: lists with emotions probabilities
        """
        model = VoiceRecognizer(model=HuggingFaceModel.Voice.Wav2Vec2)
        clip = AudioFileClip(self.path)
        duration = clip.duration
        # number of file fragments to analyze
        number_of_segments = math.ceil(duration / self.analyzed_segment_len)
        negative_emotions_percentage = np.zeros(number_of_segments)
        neutral_emotion_percentage = np.zeros(number_of_segments)
        time = self.analyzed_segment_len
        for i in tqdm(range(number_of_segments)):
            # path to analyzed file fragment
            subclip = clip.subclip(i * time, min(i * time + time, duration))
            subclip.write_audiofile(self.subclip_path, logger=None)

            # sub clip preprocessing to convert stereo to mono
            self.audio_channels_processing()
            emotions_percentages = model.recognize(self.subclip_modified_path, return_single_label=False)
            # counting of preferred emotions percentage
            for idx, emotion in enumerate(self.order):
                if self.negative_emotions[idx]:
                    negative_emotions_percentage[i] += emotions_percentages[emotion]
                neutral_emotion_percentage[i] = emotions_percentages["neutral"]

        # deleting of intermediate files
        file_paths = [self.subclip_path, self.subclip_modified_path]
        for file_path in file_paths:
            if os.path.isfile(file_path):
                os.remove(file_path)
        return negative_emotions_percentage, neutral_emotion_percentage

    def audio_channels_processing(self):
        """
        Rewriting file to one channel if necessary
        """
        audio_file = wave.open(self.subclip_path)
        channels = audio_file.getnchannels()
        sound = AudioSegment.from_wav(self.subclip_path)
        if channels > 1:
            sound = sound.set_channels(1)
        # rewriting one channel file
        sound.export(self.subclip_modified_path, format="wav")


In [None]:
class FillerWordsAndPhrases:
    """
    Class for filler words and phrases detecting.
    Detects words and phrases from lists and as most common in speech.
    """
    # maximal acceptable percentages and lists of filler words
    params = {
        # multiplier for most common word or phrase occurrence to be compared with others
        "word_count_multiplier": 0.1,
        # minimal percentage for word or phrase to be considered common
        "occurrence_percentage": 0.0001,
        "parasites": ["просто", "вот", "ну", "короче", "типа", "пожалуй", "кстати", "вообще", "буквально", "скажем",
                      "блин", "допустим", "черт", "вроде", "круто", "прикинь", "прикиньте", "реально", "отпад",
                      "отпадно", "клево", "капец", "норм", "слушай", "конечно", "наверное", "вероятно", "кажется"],
        "parasite_phrases": ["так сказать", "как бы", "в натуре", "в общем", "в общемто", "в целом", "в принципе",
                             "как говорится", "как сказать", "на фиг", "то есть", "это самое", "как его", "типа того"]
    }

    def __init__(self, cleaned_transcription):
        """
        Initialization of filler words detection class
        @param cleaned_transcription: text transcription without punctuation marks
        """
        self.cleaned_transcription = cleaned_transcription

    def count_occurrences(self, min_len=5):
        """
        Counts two-words phrases occurrences
        @param min_len: minimal length in letters for phrase to be considered
        @return: list of two-element lists, each with phrase and its occurrence
        """
        pairs = dict()
        words = self.cleaned_transcription.split()
        for i in range(len(words) - 1):
            # create two-word phrases
            phrase = words[i] + ' ' + words[i + 1]
            if len(phrase) > min_len:
                # save phrases with acceptable length
                if phrase not in pairs:
                    pairs[phrase] = 0
                pairs[phrase] += 1
        phrases_from_list = {}

        # rewrite phrases from list into separate dictionary
        for phrase in self.params["parasite_phrases"]:
            if phrase in pairs:
                phrases_from_list[phrase] = pairs[phrase]
        phrase_dic = list(pairs.items())
        phrases = sorted(phrase_dic, key=lambda x: -x[1])
        return phrases, phrases_from_list

    def find_worst_phrases(self, phrases):
        """
        Takes most common phrases from all
        @param phrases: all two-word phrases
        @return: dictionary with key - phrases and value - their occurrences
        """
        num_words = len(self.cleaned_transcription)
        max_repeats = phrases[0][1]
        # if all collocations appear one time - there are no most common phrases
        if max_repeats == 1 or max_repeats / num_words < self.params["occurrence_percentage"]:
            return dict()
        # maximal deviation from most common word or phrase occurrence
        diff = round(max_repeats * self.params["word_count_multiplier"])
        worst_word_pairs = dict()
        # find phrases with small deviation from most common one
        for word_pair, cnt in phrases:
            if cnt >= max_repeats - diff and cnt / num_words >= self.params["occurrence_percentage"]:
                worst_word_pairs[word_pair] = cnt
        return worst_word_pairs

    def get_one_words(self):
        """
        Counts all filler words from params parasites
        @return: frequency dictionary with key - words and value - their occurrences
        """
        text_tokens = word_tokenize(self.cleaned_transcription)
        text_tokens = [token.strip() for token in text_tokens if token in set(self.params["parasites"])]
        text = nltk.Text(text_tokens)
        fdist = FreqDist(text)
        return fdist

    def find_worst_words(self, fdist):
        """
        Takes most common filler words from all
        @param fdist: frequency dictionary with key - words and value - their occurrences
        @return: dictionary with key - words and value - their occurrences
        """
        num_words = len(self.cleaned_transcription)
        if len(fdist) == 0:
            return dict()
        # most common word appearance
        max_repeats = fdist.most_common(1)[0][1]
        if max_repeats == 1 or max_repeats / num_words < self.params["occurrence_percentage"]:
            return dict()
        # maximal deviation from most common word or phrase occurrence
        diff = round(max_repeats * self.params["word_count_multiplier"])
        idx = 1
        # add words with high occurrence percentage
        while idx <= len(fdist) and fdist.most_common(idx)[-1][1] >= max_repeats - diff and \
                fdist.most_common(idx)[-1][1] / num_words >= self.params["occurrence_percentage"]:
            idx += 1
        worst_words = dict(fdist.most_common(idx - 1))
        return worst_words

    def get_filler_words_final(self):
        """
        Concatenates all words and phrases into two dictionaries - all and most common filler words
        @return: two dictionaries with words / phrases and their occurrences
        """
        # find all and most common / listed phrases
        phrases, phrases_from_list = self.count_occurrences()
        worst_phrases = self.find_worst_phrases(phrases)

        # find all and most common / listed words
        fdist = self.get_one_words()
        worst_words = self.find_worst_words(fdist)

        # dicts with all and most common / list words and phrases
        total_dict = dict(worst_phrases) | dict(fdist) | phrases_from_list
        worst_dict = dict(worst_phrases) | dict(worst_words)
        return total_dict, worst_dict


In [None]:
class Intelligibility:
    """
    Class for intelligibility detecting.
    Uses info from background noise analysis and high speech rate timestamps.
    """
    def __init__(self, path, all_words_without_noise, noise, analyzed_segment_len):
        """
        Initialization of background noise analysis class
        @param path: path to audio file
        @param all_words_without_noise: list of all words and their timestamps
        @param noise: timestamps with background noise, list of two-element lists
        @param analyzed_segment_len: length of file segment to analyze separately
        """
        self.noise = noise
        self.path = path
        self.all_words_without_noise = all_words_without_noise
        self.analyzed_segment_len = analyzed_segment_len

    def stoi_index(self):
        """
        Counting short time objective intelligibility index per file fragment
        @return: list with STOI indexes for each fragment
        """
        # paths for file fragments
        subclip_path = "processing.wav"
        subclip_modified_path = "processing2.wav"

        clip = AudioFileClip(self.path)
        duration = clip.duration
        # number of file segments to analyze
        number_of_segments = math.ceil(duration / self.analyzed_segment_len)
        indexes = np.zeros(number_of_segments)
        for i in range(number_of_segments):
            # file fragment (checks for len not out of file length)
            subclip = clip.subclip(i * self.analyzed_segment_len,
                                   min((i + 1) * self.analyzed_segment_len, clip.duration))
            # it is ineffective to analyze too short fragments
            if subclip.duration < 3:
                indexes[i] = 0.5
                continue
            subclip.write_audiofile(subclip_path, logger=None)
            data, rate = librosa.load(subclip_path)
            # cleaning degraded speech signal
            reduced_noise = nr.reduce_noise(y=data, sr=rate, thresh_n_mult_nonstationary=2, stationary=False)
            wavf.write(subclip_modified_path, rate, reduced_noise)
            # loading signal info
            clean, fs = librosa.load(subclip_modified_path)
            base, fs = librosa.load(subclip_path)
            # counting and saving STOI indexes
            index = stoi(clean, base, fs, extended=False)
            indexes[i] = round(index, 3)

        # deleting intermediate files
        file_paths = [subclip_path, subclip_modified_path]
        for file_path in file_paths:
            if os.path.isfile(file_path):
                os.remove(file_path)
        return indexes

    def indirect_features(self):
        """
        Analyses intelligibility of speech
        @return: intervals with high speech and high levels of background noise
        """
        # timestamps with fast speech rate
        speech_rate = SpeechRate(self.all_words_without_noise)
        _, fast_intervals = speech_rate.find_incorrect_speech_rate_intervals()
        # timestapms with high background noise
        noisy_intervals = BackgroundNoise(self.noise).get_high_noise_timestamps()

        return fast_intervals, noisy_intervals

    def get_intelligibility_features(self):
        """
        Final method for aggregating file info
        @return: lists with STOI indexes, intervals with high speech and high levels of background noise
        """
        indexes = self.stoi_index()
        fast_intervals, noisy_intervals = self.indirect_features()
        return indexes, fast_intervals, noisy_intervals


In [None]:
class SpeechRate:
    """
    Analyses speech rate, searches for fast and slow speech rate intervals.
    """
    # border parameters for analysis
    params = {
        # size of time window to view pauses
        "pause_time_window": 30,
        # minimal noise percentage
        "pause_percentage": 0.35,
        # size of time window to speech rate
        "speech_rate_time_window": 60,
        # minimal number of words in time window for normal speech rate
        "speech_rate_min_word_count": 60,
        # maximal number of words in time window for normal speech rate
        "speech_rate_max_word_count": 140,
        # allowed pauses between words
        "rules": {
            "word": 0.5,
            "punct_mark": 0.75,
            ".": 1,
            "?": 5,
            "!": 3
        },
    }

    def __init__(self, all_words_without_noise):
        """
        Initialization of speech rate analysis class
        @param all_words_without_noise: list of dicts with words and their start and end timestamps
        """
        self.all_words_without_noise = all_words_without_noise

    def find_pauses(self):
        """
        Finds all pauses longer than allowed
        @return: list of two-element lists with pauses timestamps
        """
        rules = self.params["rules"]
        pauses = []
        start_idx = 0
        end_idx = 1
        while end_idx < len(self.all_words_without_noise) - 1:
            silence_start = self.all_words_without_noise[start_idx]["end"]
            silence_end = self.all_words_without_noise[end_idx]["start"]
            # detecting pause type
            if self.all_words_without_noise[start_idx]["text"][-1].isalpha():
                pause_type = rules["word"]
            elif self.all_words_without_noise[start_idx]["text"][-1] in rules:
                pause_type = rules[self.all_words_without_noise[start_idx]["text"][-1]]
            else:
                pause_type = rules["punct_mark"]
            # checking with border value (depends on pause type)
            if silence_end - silence_start > pause_type:
                pauses.append([silence_start, silence_end])
            start_idx = end_idx
            end_idx += 1
        return pauses

    def find_pause_intervals(self, pauses):
        """
        Searches periods with high pauses percentage with the help of floating window
        @param pauses: pauses intervals, list of two-element lists
        @return: list of two-element lists with pause intervals timestamps
        """
        intervals = []
        if len(pauses) == 0:
            return intervals
        start_idx, end_idx = 0, 0
        # current pause length
        summary = pauses[0][1] - pauses[0][0]
        while end_idx < len(pauses):
            while end_idx < len(pauses) - 1 and pauses[end_idx][1] - pauses[start_idx][0] < \
                    self.params["pause_time_window"]:
                end_idx += 1
                summary += pauses[end_idx][1] - pauses[end_idx][0]
            # break if file end is reached
            if pauses[end_idx][1] - pauses[start_idx][0] < self.params["pause_time_window"]:
                break
            # check if the percentage of pauses is larger than parameter
            if summary / (pauses[end_idx][1] - pauses[start_idx][0]) > self.params["pause_percentage"]:
                # if period intersects with previous one - they are united
                if len(intervals) > 0 and intervals[-1][-1] > pauses[start_idx][0]:
                    intervals[-1][-1] = pauses[end_idx][1]
                else:
                    intervals.append([pauses[start_idx][0], pauses[end_idx][1]])
            # delete first pause, move interval start to next word
            summary -= pauses[start_idx][1] - pauses[start_idx][0]
            start_idx += 1
        return intervals

    def find_incorrect_speech_rate_intervals(self):
        """
        Searches intervals with too fast or slow speech rate
        @return: two lists with two-element list each - periods with too fast or slow speech rate
        """
        fast_intervals = []
        slow_intervals = []
        word_count = 1
        start = self.all_words_without_noise[0]["start"]
        end = self.all_words_without_noise[0]["end"]
        start_idx = 0
        end_idx = 1
        while end_idx < len(self.all_words_without_noise):
            # add word if time window is smaller than border value
            if end - start < self.params["speech_rate_time_window"]:
                end = self.all_words_without_noise[end_idx]["end"]
                end_idx += 1
                word_count += 1
            else:
                # if word count is too small or too large - append time interval to corresponding list
                if word_count < self.params["speech_rate_min_word_count"]:
                    # unite intervals if necessary
                    if len(slow_intervals) > 0 and slow_intervals[-1][1] >= start:
                        slow_intervals[-1][1] = end
                    else:
                        slow_intervals.append([start, end])
                elif word_count > self.params["speech_rate_max_word_count"]:
                    # unite intervals if necessary
                    if len(fast_intervals) > 0 and fast_intervals[-1][1] >= start:
                        fast_intervals[-1][1] = end
                    else:
                        fast_intervals.append([start, end])
                # remove first word from interval
                start_idx += 1
                start = self.all_words_without_noise[start_idx]["start"]
                word_count -= 1
        return slow_intervals, fast_intervals

    def get_intervals(self):
        """
        get slow intervals in two formats - high pauses percentage and low speech rate
        @return:
        """
        speech_rate_results, _ = self.find_incorrect_speech_rate_intervals()
        pauses = self.find_pauses()
        pause_intervals = self.find_pause_intervals(pauses)
        return speech_rate_results, pause_intervals

    def unite_slow_speech_rate_intervals(self):
        """
        Unites two lists of intervals: with pauses and with slow speech rate
        @return: list of two-element lists with slow speech rate intervals timestamps
        """
        speech_rate_results, _ = self.find_incorrect_speech_rate_intervals()
        pauses = self.find_pauses()
        pause_intervals = self.find_pause_intervals(pauses)
        final_intervals = []
        speech_rate_idx, pause_idx = 0, 0
        while speech_rate_idx < len(speech_rate_results) and pause_idx < len(pause_intervals):
            sr_start, sr_end = speech_rate_results[speech_rate_idx]
            pause_start, pause_end = pause_intervals[pause_idx][0], pause_intervals[pause_idx][1]
            if sr_start <= pause_start:
                if sr_end <= pause_start:
                    speech_rate_idx += 1
                elif pause_start < sr_end <= pause_end:
                    final_intervals.append([pause_start, sr_end])
                    speech_rate_idx += 1
                else:
                    final_intervals.append([pause_start, pause_end])
                    pause_idx += 1
            elif pause_start <= sr_start <= pause_end:
                if sr_end <= pause_end:
                    final_intervals.append([sr_start, sr_end])
                    speech_rate_idx += 1
                else:
                    final_intervals.append([sr_start, pause_end])
                    pause_idx += 1
            else:
                pause_idx += 1
        return final_intervals


In [None]:
class SpeechProcessingSubsystem:
    def __init__(self, path, negative_emotions_bool, analyzed_segment_len):
        """
        Initialization of speech processing class
        @param path: path to video file
        @param negative_emotions_bool: list of unwanted emotions (set by user)
        @param analyzed_segment_len: length of file segment to analyze separately
        """
        # rewrite video to audio file
        clip = mp_editor.VideoFileClip(path)
        audio_path = path[:path.rfind('.')] + '.wav'
        clip.audio.write_audiofile(audio_path, logger=None)
        self.path = audio_path
        # fields for words and noise timestamps
        self.cleaned_transcription = None
        self.all_words_without_noise = None
        self.noise = None
        self.duration = clip.duration
        self.analyzed_segment_len = analyzed_segment_len
        self.negative_emotions_bool = negative_emotions_bool

    def speech_recognition(self):
        """
        Performs ASR process
        """
        speech_recogniser = AutomaticSpeechRecognition(self.path)
        cleaned_transcription, word_arrays, correct_transcription = \
            speech_recogniser.get_speech_recognition()
        # fill class params with ASR results
        self.cleaned_transcription = cleaned_transcription
        self.all_words_without_noise = word_arrays[0]
        self.noise = word_arrays[1]

    @staticmethod
    def unite_intervals(intervals_1, intervals_2):
        """
        Unite two time frames intervals
        @param intervals_1: first list of intervals
        @param intervals_2: second list of intervals
        @return: united list of intervals
        """
        final_intervals = []
        # indexes to indexing through lists
        first_idx, second_idx = 0, 0
        while first_idx < len(intervals_1) and second_idx < len(intervals_2):
            interval_1_start, interval_1_end = intervals_1[first_idx]
            interval_2_start, interval_2_end = intervals_2[second_idx][0], intervals_2[second_idx][1]
            # if first interval's time period is earlier
            if interval_1_start <= interval_2_start:
                # first interval's time period is inside second's
                if interval_1_end <= interval_2_start:
                    first_idx += 1
                elif interval_2_start < interval_1_end <= interval_2_end:
                    final_intervals.append([interval_2_start, interval_1_end])
                    first_idx += 1
                else:
                    final_intervals.append([interval_2_start, interval_2_end])
                    second_idx += 1
            # if second interval's time period is earlier
            elif interval_2_start <= interval_1_start <= interval_2_end:
                if interval_1_end <= interval_2_end:
                    final_intervals.append([interval_1_start, interval_1_end])
                    first_idx += 1
                else:
                    final_intervals.append([interval_1_start, interval_2_end])
                    second_idx += 1
            else:
                second_idx += 1
        return final_intervals

    def periods_to_fractions(self, intervals, length):
        """
        Saves percentages of intervals per analyzed file fragment length
        @param intervals: time intervals of any kind
        @param length: result's list length
        @return: list with fractions (percentages) of occurrence
        """
        res = np.zeros(length)
        for i in intervals:
            fraction = (i[1] - i[0]) / self.analyzed_segment_len
            idx = int(i[0] // self.analyzed_segment_len)
            res[idx] = round(res[idx] + fraction, 3)
        return res

    def get_fraction(self, timestamps):
        """
        Counts timestamps proportion of some event
        @param timestamps: time periods of some event
        @return: timestamps proportion of some event
        """
        duration = 0
        for time_period in timestamps:
            duration += time_period[1] - time_period[0]
        return duration / self.duration

    def get_fractions_from_intervals(self, intervals):
        """
        Transform random length intervals to N-second fractions
        @param intervals: intervals of some event
        @return: list of fraction per file fragment
        """
        length = math.ceil(self.duration / self.analyzed_segment_len)
        fixed_intervals = [[i * self.analyzed_segment_len, (i + 1) * self.analyzed_segment_len] for i in range(length)]
        united_intervals = self.unite_intervals(intervals, fixed_intervals)
        fractions = self.periods_to_fractions(united_intervals, len(fixed_intervals))
        return fractions

    def get_emotionality(self):
        """
        Analyses emotionality of file
        @return: list of lists of emotions probabilities and time period per which emotions are defined
        """
        audio_emotions = AudioEmotions(self.path, self.analyzed_segment_len, self.negative_emotions_bool)
        negative_emotions_fractions, neutral_emotion_fractions = audio_emotions.emotions_analysis()
        return negative_emotions_fractions, neutral_emotion_fractions

    def get_filler_words(self):
        """
        Analyses presence of filler words
        @return: dicts with all filler words and phrases and with most common ones
        """
        filler_words = FillerWordsAndPhrases(self.cleaned_transcription)
        all_filler_words_dict, worst_words = filler_words.get_filler_words_final()
        return all_filler_words_dict, worst_words

    def get_speech_rate(self):
        """
        Analyses speech rate of speech
        @return: intervals with slow speech rate and their percentage of file duration
        """
        speech_rate = SpeechRate(self.all_words_without_noise)
        speech_rate_results, pause_intervals = speech_rate.get_intervals()
        intervals = self.unite_intervals(speech_rate_results, pause_intervals)
        fractions = self.get_fractions_from_intervals(intervals)
        return intervals, fractions, self.get_fraction(intervals)

    def get_background_noise(self):
        """
        Analyses background noise presence
        @return: intervals with high background noise and their percentage of file duration
        """
        # collect high background noise intervals
        background_noise = BackgroundNoise(self.noise)
        high_noise_intervals = background_noise.get_high_noise_timestamps()
        # transform to fractions for each file fragment
        high_noise_fractions = self.get_fractions_from_intervals(high_noise_intervals)
        high_noise_fractions = np.array(high_noise_fractions)

        # collect STOI indexes
        intelligibility = Intelligibility(self.path, self.all_words_without_noise, self.noise,
                                          self.analyzed_segment_len)
        indexes = intelligibility.stoi_index()
        # transform to fractions for each file fragment
        fractions = (high_noise_fractions + 1 - indexes) / 2
        return high_noise_intervals, fractions, self.get_fraction(high_noise_intervals)

    def get_intelligibility(self):
        """
        Analyses intelligibility of speech
        @return: approximate intelligibility per file fragment and summary intelligibility
        """
        # collect basic intelligibility measures
        intelligibility = Intelligibility(self.path, self.all_words_without_noise, self.noise,
                                          self.analyzed_segment_len)
        indexes, fast_intervals, noisy_intervals = intelligibility.get_intelligibility_features()
        # transform to fractions on whole file
        fast_fraction = self.get_fraction(fast_intervals)
        noisy_fraction = self.get_fraction(noisy_intervals)
        index_fraction = np.average(indexes)
        # transform to fractions per file fragment
        noisy_fractions = np.array(self.get_fractions_from_intervals(noisy_intervals))
        fast_fractions = np.array(self.get_fractions_from_intervals(fast_intervals))

        # count average
        negative_fractions = (2 * noisy_fractions + fast_fractions + 2 * (1 - indexes)) / 5
        negative_fraction = (fast_fraction + 2 * noisy_fraction + 2 * (1 - index_fraction)) / 5
        return negative_fractions, negative_fraction


## Final

In [None]:
CONSTANTS = {
    "clean_speech": (0.3, 0.8),
    "speech_rate": (0.3, 0.6),
    "background_noise": (0.3, 0.6),
    "intelligibility": (0.3, 0.6),
    "clothes": (0.5, 1),
    "gestures": (0, 1),
    "angle": (0.3, 0.6),
    "glances": (0.6, 1),
    "emotionality": (0.3, 0.6),
    "neutral_emotionality_official": (0.2, 0.6),
    "neutral_emotionality_nonofficial": (0.2, 0.6),
}

ORDER = [
    "background_noise",
    "speech_rate",
    "emotionality",
    "intelligibility",
    "gestures",
    "glances"
]

DRAW_VALUES = {
    "speech_rate": {
        0: "Оптимальный темп речи",
        1: "Немного медленный темп речи",
        2: "Слишком медленный темп речи"
    },
    "background_noise": {
        0: "Нет фонового шума",
        1: "Небольшой фоновый шум",
        2: "Сильный фоновый шум"
    },
    "intelligibility": {
        0: "Речь совсем неразборчива",
        1: "Речь немного неразборчива",
        2: "Речь полностью разборчива",
    },
    "gestures": {
        0: 'Неактивная жестикуляция',
        1: 'Оптимальная жестикуляция',
        2: 'Активная жестикуляция',
    },
    "glances": {
        0: None,
        1: 'Вы часто отводите взгляд'
    },
    "emotionality": {
        0: "Преимущественно желаемые эмоции",
        1: "Не полностью желаемые эмоции",
        2: "Не желаемые эмоции",
    },
    "lightning": {
        0: 'Слишком темное освещение',
        1: 'Оптимальное освещение',
        2: 'Слишком яркое освещение',
    }
}

In [None]:
class FileProcessingSystem:
    """
    Class for file analysis
    """
    def __init__(self, file, flags, negative_emotions_bool, preferred_gestures_bool, analyzed_segment_len):
        """
        Initializing of file and its params
        @param file: FileInfo instance to analyze
        @param analyzed_segment_len: length of one file fragment to analyze
        @param language_flag: text language flag (for recommendations and statistics)
        """
        self.file_path = file
        self.analyzed_segment_len = analyzed_segment_len
        self.flags = flags
        emotions = ["Happiness", "Anger", "Disgust", "Neutral", "Sadness", "Surprise"]
        negative_emotions = []
        for i, flag in enumerate(negative_emotions_bool):
            if flag:
                negative_emotions.append(emotions[i])
        self.negative_emotions_bool = negative_emotions_bool
        self.preferred_gestures_bool = preferred_gestures_bool
        self.timestamps = {}
        self.computer_vision = VideoSubsystem(self.file_path, negative_emotions, emotions=flags["emotionality"],
                                              gesticulation=flags["gestures"], angle=flags["angle"],
                                              gaze=flags["glances"], clothes=flags["clothes"])
        self.computer_vision.process_video(duration=analyzed_segment_len)
        self.speech_processing = SpeechProcessingSubsystem(self.file_path, negative_emotions_bool,
                                                           analyzed_segment_len=analyzed_segment_len)


    def save_timestamps_to_db(self, timestamps, type_choice):
        """
        Saves timestamps of low speech rate or high background noise to database
        @param timestamps: periods to be saved
        @param type_choice: 0 for background noise, 1 for speech rate
        """
        periods = []
        for time_period in timestamps:
            start_seconds, end_seconds = round(time_period[0]), round(time_period[1])
            # transform seconds to time type
            start = time(hour=start_seconds // 3600, minute=start_seconds // 60, second=start_seconds % 60)
            end = time(hour=end_seconds // 3600, minute=end_seconds // 60, second=end_seconds % 60)
            periods.append((start, end))
        return periods

    def get_transcription(self):
        """
        Translates and saves file transcription
        """
        self.speech_processing.speech_recognition()
        return self.speech_processing.cleaned_transcription

    def get_emotionality(self):
        """
        Gets emotionality from audio and video subsystems, unites them and saves neutral emotion fraction
        """
        video_emotions = self.computer_vision.get_inappropriate_emotion_percentage()
        video_neutral_emotions = self.computer_vision.get_emotions()

        try:
            audio_emotions, audio_neutral_emotions = self.speech_processing.get_emotionality()
        except Exception as e:
            audio_emotions = self.computer_vision.get_inappropriate_emotion_percentage()
            audio_neutral_emotions = self.computer_vision.get_emotions()
        audio_emotions = np.array(audio_emotions)
        video_emotions = np.array(video_emotions)
        incorrect_emotions_percentage = (2 * video_emotions + audio_emotions) / 3
        incorrect_emotions_percentage = np.round(np.array(incorrect_emotions_percentage), 3)
        emotions_fraction = round(np.sum(incorrect_emotions_percentage) / len(incorrect_emotions_percentage), 3)

        neutral_emotions = (np.array(video_neutral_emotions) + np.array(audio_neutral_emotions)) / 2
        neutral_emotions_fraction = round(np.sum(neutral_emotions) / len(neutral_emotions), 3)
        self.timestamps["emotionality"] = incorrect_emotions_percentage
        # доля нейтральных эмоций во всем видео, доля нежелательных эмоций во всем видео, доли нежелательных эмоций в каждом отрезке
        return neutral_emotions_fraction, emotions_fraction, incorrect_emotions_percentage

    def get_filler_words(self):
        """
        Gets filler words and phrases, saves them and their count per minute
        """
        all_filler_words, worst_filler_words = self.speech_processing.get_filler_words()

        overall_count = sum(list(all_filler_words.values()))
        words_per_minute_percentage = round((overall_count / (self.speech_processing.duration / 60)) / 10, 5)
        # доля слов-паразитов во всем видео, все слова-паразиты и их встречаемость, самые частые слова-паразиты и их встречаемость
        return words_per_minute_percentage, all_filler_words, worst_filler_words

    def get_speech_rate(self):
        """
        Gets and saves intervals with slow speech rate and their percentage
        """
        intervals, fractions, final_fraction = self.speech_processing.get_speech_rate()
        self.timestamps["speech_rate"] = fractions
        # доля с низким темпом речи во всем видео, доли с низким темпом речи в каждом отрезке, интервалы с низким темпом речи
        return final_fraction, fractions, intervals

    def get_background_noise(self):
        """
        Gets and saves intervals with high background noise and their percentage
        """
        intervals, fractions, final_fraction = self.speech_processing.get_background_noise()
        self.timestamps["background_noise"] = fractions
        # доля с высоким фоновым шумом во всем видео, доли с высоким фоновым шумом в каждом отрезке, интервалы с высоким фоновым шумом
        return final_fraction, fractions, intervals

    def get_intelligibility(self):
        """
        Gets and saves intelligibility estimation
        """
        negative_fractions, negative_index = self.speech_processing.get_intelligibility()
        fractions = np.round(1 - negative_fractions, 3)
        self.timestamps["intelligibility"] = fractions
        # разборчивость речи во всем видео, разборчивость речи в каждом отрезке
        return 1 - negative_index, fractions

    def get_incorrect_angle(self):
        """
        Gets and saves incorrect angle percentage
        """
        incorrect_angle_fractions = self.computer_vision.get_angle()
        incorrect_angle_fractions = np.round(np.array(incorrect_angle_fractions), 3)
        incorrect_angle = round(np.sum(incorrect_angle_fractions) / len(incorrect_angle_fractions), 3)
        # доля некорректного ракурса во всем видео, доля некорректного ракурса в каждом отрезке
        return incorrect_angle, incorrect_angle_fractions

    def get_incorrect_glances(self):
        """
        Gets and saves incorrect glances percentage
        """
        incorrect_glance_fractions = self.computer_vision.get_gaze()
        incorrect_glance_fractions = np.round(np.array(incorrect_glance_fractions), 3)
        incorrect_glance = round(np.sum(incorrect_glance_fractions) / len(incorrect_glance_fractions), 3)
        self.timestamps["glances"] = incorrect_glance_fractions
        # доля некорректного направления взгляда во всем видео, доля некорректного направления взгляда в каждом отрезке
        return incorrect_glance, incorrect_glance_fractions

    def get_gestures(self):
        """
        Gets and saves gesticulation level
        """
        gestures = self.computer_vision.get_gestures()
        gestures = np.round(np.array(gestures), 3)
        final_gesture_fraction = round(np.sum(gestures) / len(gestures), 3)
        self.timestamps["gestures"] = gestures
        # активность жестикуляции во всем видео, активность жестикуляции в каждом отрезке
        return final_gesture_fraction, gestures

    def get_clothes(self):
        """
        Gets and saves clothes suitability
        """
        clothes = self.computer_vision.get_clothes_estimation()
        # корректность одежды (True - одежда подходит)
        return clothes

    def draw(self):
        """
        Draw analysis result signatures on video file
        """
        print(self.timestamps)
        # indexes of best values for each parameter
        optimal_indexes = {
            "background_noise": 0,
            "speech_rate": 0,
            "emotionality": 0,
            "intelligibility": 2,
            "gestures": self.preferred_gestures_bool,
            "glances": 0,
            "lightning": 0,
        }
        lst = ORDER
        # texts to put into file
        text_values = []
        # boolean values (True - optimal) for text color
        boolean_flags = []
        for period_index, name in enumerate(lst):
            if name in self.timestamps:
                text_values.append([])
                boolean_flags.append([])
                # get grades for parameter for each file fragment
                res = self.timestamps[name]
                print(name, res)
                # transform grade into text
                for value in res:
                    text_idx = 0
                    if value > CONSTANTS[name][1]:
                        text_idx = 2
                    elif value > CONSTANTS[name][0]:
                        text_idx = 1
                    text = DRAW_VALUES[name][text_idx]
                    text_values[-1].append(text)
                    # append text color
                    if name == "gestures":
                        boolean_flags[-1].append(text_idx in optimal_indexes[name])
                    else:
                        boolean_flags[-1].append(text_idx == optimal_indexes[name])

        # append values on lightning if possible
        lightning_numbers = self.computer_vision.get_lightning()
        if len(lightning_numbers) > 0:
            text_values.append([])
            boolean_flags.append([])
            for val in lightning_numbers:
                text_values[-1].append(DRAW_VALUES["lightning"][val])
                boolean_flags[-1].append(val == 1)

        draw_res = DrawResults(self.file_path, dist=self.analyzed_segment_len)
        # path for temporary file
        temp_path = self.file_path[:self.file_path.rfind('.')] + '_temp.' + \
                       self.file_path[self.file_path.rfind('.')+1:]
        # file is saved without noise
        print(text_values)
        print(boolean_flags)
        draw_res.draw(temp_path, text_values, boolean_flags,
                      self.computer_vision.get_angle_len(), self.computer_vision.get_incorrect_angle_ind())

        # unite video and audio
        output = mp_editor.VideoFileClip(temp_path)
        painted_path = self.file_path[:self.file_path.rfind('.')] + '_painted.' + self.file_path[self.file_path.rfind('.')+1:]
        final_duration = output.duration
        output_audio = mp_editor.VideoFileClip(self.file_path).audio.subclip(0, final_duration)
        output.audio = output_audio
        output.write_videofile(painted_path)
        return painted_path


In [None]:
flags = {"emotionality": True, "gestures": True, "angle": True, "glances": True, "clothes": True}
negative_emotions = [False, True, True, False, False, False] # Happiness, Anger, Disgust, Neutral, Sadness, Surprise
preferred_gestures = [True, True, False] # inactive, medium, active

In [None]:
processing = FileProcessingSystem("chess_short.mp4", flags, negative_emotions, preferred_gestures, 10)

  0%|          | 0/12 [00:00<?, ?it/s]

Downloading enet_b0_8_best_afew from https://github.com/HSE-asavchenko/face-emotion-recognition/blob/main/models/affectnet_emotions/enet_b0_8_best_afew.pt?raw=true





/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)
max() arg is an empty sequence




Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth

  0%|          | 0.00/83.3M [00:00<?, ?B/s][A
 10%|▉         | 8.12M/83.3M [00:00<00:00, 85.0MB/s][A
 28%|██▊       | 23.2M/83.3M [00:00<00:00, 128MB/s] [A
 47%|████▋     | 39.4M/83.3M [00:00<00:00, 146MB/s][A
 64%|██████▍   | 53.4M/83.3M [00:00<00:00, 137MB/s][A
100%|██████████| 83.3M/83.3M [00:00<00:00, 150MB/s]
  8%|▊         | 1/12 [00:31<05:48, 31.70s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 17%|█▋        | 2/12 [01:00<04:57, 29.79s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 25%|██▌       | 3/12 [01:29<04:25, 29.48s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 33%|███▎      | 4/12 [01:58<03:55, 29.41s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 42%|████▏     | 5/12 [02:27<03:24, 29.24s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 50%|█████     | 6/12 [02:57<02:56, 29.42s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 58%|█████▊    | 7/12 [03:27<02:28, 29.63s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 67%|██████▋   | 8/12 [03:57<01:58, 29.65s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 75%|███████▌  | 9/12 [04:25<01:27, 29.16s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)
max() arg is an empty sequence
max() arg is an empty sequence




 83%|████████▎ | 10/12 [04:53<00:57, 28.81s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




 92%|█████████▏| 11/12 [05:20<00:28, 28.43s/it]

/root/.hsemotion/enet_b0_8_best_afew.pt Compose(
    Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
    ToTensor()
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)




100%|██████████| 12/12 [05:35<00:00, 27.97s/it]


In [None]:
processing.get_transcription()

100%|█████████████████████████████████████| 2.87G/2.87G [00:37<00:00, 81.7MiB/s]
100%|██████████| 11504/11504 [00:47<00:00, 241.64frames/s]


'друзья сейчас я покажу вам самую странную и самую смешную партию в истории шахмат в этой партии человек смог обыграть сверхсильный искусственный интеллект итак мы переносимся в год уже несколько лет компьютеры бьют нещадно сильнейших гроссмейстеров мира каспаров в свое время проиграл уже прошло много лет с тех пор в году майкл адамс в свое время тоже топ топ шахматист мира сыграл матч с шахматным движком hydra и проиграл ему на полочка то есть всего одну ничью в партиях сделал все остальные проиграл и теперь уже год движки стали еще сильнее и самый сильный движок называется рыбка собственно в честь рыбы у него рейтинг около самый сильный шахматист на этот момент имеет рейтинг около то есть на пунктов меньше абсолютно без шансов проигрывает все партии максимум когданибудь может сделать ничью и тут на арену выходит хикару накамура ныне легендарный стример топ шахматист мира тогда он был еще помоложе но уже был легендой интернетблицы он играл очень много в блиц на платформе icc internet 

In [None]:
processing.get_emotionality()

(0.289,
 0.075,
 array([0.1, 0.2, 0.1, 0. , 0. , 0.1, 0. , 0.1, 0.2, 0.1, 0. , 0. ]))

In [None]:
processing.get_filler_words()

(0.26078, {'то есть': 3, 'вот': 1, 'просто': 1}, {'то есть': 3})

In [None]:
processing.get_speech_rate()

(0.0, array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]), [])

In [None]:
processing.get_background_noise()

(0.0,
 array([0.0305, 0.034 , 0.0315, 0.031 , 0.0295, 0.0305, 0.037 , 0.0245,
        0.031 , 0.035 , 0.0355, 0.0265]),
 [])

In [None]:
processing.get_intelligibility()

(0.7754563282336578,
 array([0.776, 0.773, 0.775, 0.775, 0.776, 0.776, 0.77 , 0.78 , 0.775,
        0.772, 0.772, 0.884]))

In [None]:
processing.get_incorrect_angle()

(0.0, array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]))

In [None]:
processing.get_incorrect_glances()

(0.414,
 array([0.382, 0.44 , 0.513, 0.43 , 0.288, 0.278, 0.478, 0.467, 0.324,
        0.466, 0.385, 0.522]))

In [None]:
processing.get_gestures()

(2.0, array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]))

In [None]:
processing.get_clothes()

False

In [None]:
painted_path = processing.draw()

{'emotionality': array([0.1, 0.2, 0.1, 0. , 0. , 0.1, 0. , 0.1, 0.2, 0.1, 0. , 0. ]), 'speech_rate': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]), 'background_noise': array([0.0305, 0.034 , 0.0315, 0.031 , 0.0295, 0.0305, 0.037 , 0.0245,
       0.031 , 0.035 , 0.0355, 0.0265]), 'intelligibility': array([0.776, 0.773, 0.775, 0.775, 0.776, 0.776, 0.77 , 0.78 , 0.775,
       0.772, 0.772, 0.884]), 'glances': array([0.382, 0.44 , 0.513, 0.43 , 0.288, 0.278, 0.478, 0.467, 0.324,
       0.466, 0.385, 0.522]), 'gestures': array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])}
background_noise [0.0305 0.034  0.0315 0.031  0.0295 0.0305 0.037  0.0245 0.031  0.035
 0.0355 0.0265]
speech_rate [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
emotionality [0.1 0.2 0.1 0.  0.  0.1 0.  0.1 0.2 0.1 0.  0. ]
intelligibility [0.776 0.773 0.775 0.775 0.776 0.776 0.77  0.78  0.775 0.772 0.772 0.884]
gestures [2 2 2 2 2 2 2 2 2 2 2 2]
glances [0.382 0.44  0.513 0.43  0.288 0.278 0.478 0.467 0.324 0.466 0.385 0.522]




MoviePy - Done.
Moviepy - Writing video chess_short_painted.mp4





Moviepy - Done !
Moviepy - video ready chess_short_painted.mp4


In [10]:
video_path = painted_path
video_clip = VideoFileClip(video_path)
video_clip.ipython_display(width=640)

Moviepy - Building video __temp__.mp4.
MoviePy - Writing audio in __temp__TEMP_MPY_wvf_snd.mp3




MoviePy - Done.
Moviepy - Writing video __temp__.mp4





Moviepy - Done !
Moviepy - video ready __temp__.mp4
