In [None]:
from torch.utils.data import Dataset
import tempfile
import subprocess
import librosa
from imagebind.model import ModalityType
from imagebind.utils import data
from loguru import logger
import cv2
import mediapipe as mp
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

In [None]:
class PersonalityDataset(Dataset):
    def __init__(self, csv_file, path_to_video, imagebind_model, device):
        self.data = pd.read_csv(csv_file)
        self.path_to_video = path_to_video
        self.device = device

        imagebind_model.eval()
        imagebind_model.to(device)
        self.imagebind_model = imagebind_model

    def __len__(self):
        return len(self.data)

    def extract_audio_embedding(self, video_path):
        """
        Извлекает аудиовектор из видео.
        
        Параметры
        ----------
        video_path : str
            Путь к видеофайлу, из которого необходимо извлечь аудиодорожку.
        
        Возвращаемое значение
        ----------------------
        tuple
            Кортеж, содержащий:
            - audio : numpy.ndarray
                Массив, представляющий аудиоданные, загруженные из временного файла.
            - sr : int
                Частота дискретизации аудиоданных.
            - audio_embeddings : torch.Tensor
                Векторное представление аудиоданных, извлеченное моделью ImageBind.
        
        Описание
        ---------
        Данная функция использует библиотеку ffmpeg для извлечения аудиодорожки из
        видеофайла и сохранения её во временном WAV-файле. Затем с помощью библиотеки
        librosa аудиоданные загружаются и преобразуются в массив. После этого аудиоданные
        обрабатываются моделью ImageBind, чтобы получить векторное представление аудио.
        Функция возвращает массив аудиоданных, частоту дискретизации и среднее значение
        векторных представлений аудио.
        
        Исключения
        ----------
        - subprocess.CalledProcessError
            Генерируется, если выполнение команды ffmpeg завершилось с ошибкой.
        - FileNotFoundError
            Генерируется, если ffmpeg или необходимые библиотеки не установлены.
        """
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=True) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
            subprocess.run(
                ["ffmpeg", "-y", "-i", video_path, temp_audio_path, "-loglevel", "error"],
                check=True
            )
            audio, sr = librosa.load(temp_audio_path, sr=None)

            inputs = {ModalityType.AUDIO: data.load_and_transform_audio_data([temp_audio_path], self.device)}
            with torch.inference_mode():
                audio_embeddings = self.imagebind_model(inputs)[ModalityType.AUDIO].mean(dim=0)

            return audio, sr, audio_embeddings

    def extract_video_embedding(self, video_path):
        """
        Извлекает эмбеддинги видео, с логированием размеров каждого клипа.
        
        Параметры
        ----------
        video_path : str
            Путь к видеофайлу, для которого необходимо извлечь эмбеддинги.
        
        Возвращает
        -------
        torch.Tensor
            Тензор эмбеддинга видео со средним значением по всем клипам.
            Если клипы не были извлечены, возвращается тензор нулей с размерностью (1024,).
        
        Примечания
        --------
        - Данная функция использует модель ImageBind для обработки клипов видео и извлечения эмбеддингов.
        - Для повышения производительности используется torch.inference_mode().
        - Если видео успешно загружается и преобразуется, каждый клип обрабатывается отдельно, после чего
          их эмбеддинги усредняются.
        - Если клипов нет (например, видеофайл пуст или не удалось загрузить), возвращается тензор нулей.
        - Очищается кэш CUDA в конце для освобождения видеопамяти.
        
        Пример
        -------
        >>> video_embedding = extractor.extract_video_embedding('/path/to/video.mp4')
        >>> print(video_embedding.shape)
        torch.Size([1024])
        """
        embeddings_list = []
        with torch.inference_mode():
            video_clips = data.load_and_transform_video_data([video_path], self.device)
            if video_clips is not None:
                # video_clips shape: (num_clips, C, T, H, W)
                for i, video_clip in enumerate(video_clips):
                    # Add batch dimension if necessary
                    if video_clip.dim() == 4:
                        video_clip = video_clip.unsqueeze(0)
                    chunk_embeddings = self.imagebind_model({ModalityType.VISION: video_clip})
                    embeddings_list.append(chunk_embeddings[ModalityType.VISION].mean(dim=0))

        video_embedding = torch.stack(embeddings_list).mean(dim=0) if embeddings_list else torch.zeros(1024, device=self.device)
        torch.cuda.empty_cache()
        return video_embedding

    def extract_text_embedding(self, text):
        """
        Извлекает эмбеддинг для текста.
        
        Параметры
        ----------
        text : str
            Входной текст для извлечения эмбеддинга. Если строка пустая или не является строкой,
            будет использовано значение по умолчанию "<UNK>".
        
        Возвращает
        -------
        torch.Tensor
            Эмбеддинг текста, полученный с помощью модели ImageBind.
        
        Примечания
        ---------
        Функция использует метод `load_and_transform_text` для предварительной обработки текста,
        приводя его в формат, подходящий для обработки моделью. Затем эмбеддинг извлекается
        в режиме inference, что позволяет выполнять вычисления без сохранения промежуточных
        данных для обучения.
        
        Исключения
        ---------
        Проверка типа входных данных выполняется с целью обеспечения безопасности и предотвращения
        ошибок при передаче некорректного формата. Если переданный текст не соответствует
        ожидаемому типу, используется placeholder "<UNK>".
        """
        if not isinstance(text, str) or not text:
            text = "<UNK>"
        inputs = {ModalityType.TEXT: data.load_and_transform_text([text], self.device)}
        with torch.inference_mode():
            text_embedding = self.imagebind_model(inputs)[ModalityType.TEXT]
        return text_embedding

    def extract_keypoints(self, video_path, visualize=False):
        """
        Извлекает ключевые точки позы, лица и рук из видео и возвращает среднее значение этих ключевых точек.
        
        Параметры
        ----------
        video_path : str
            Путь к видеофайлу, из которого нужно извлечь ключевые точки.
        visualize : bool, optional
            Флаг, указывающий, нужно ли визуализировать ключевые точки на первом кадре видео (по умолчанию False).
        
        Возвращает
        -------
        np.ndarray
            Среднее значение ключевых точек по всем кадрам видео. Массив содержит координаты x, y, z и видимость для поз,
            а также координаты x, y, z для лица и рук. Если в видео отсутствуют ключевые точки, возвращается массив нулей.
        
        Описание
        --------
        Функция использует библиотеку Mediapipe для анализа видео и извлечения ключевых точек человеческого тела,
        включая позы, лицо и руки. Видео обрабатывается покадрово, и для каждого кадра вычисляются ключевые точки.
        Если ключевые точки отсутствуют в каком-либо кадре, заполняются нулевые значения.
        
        В случае, если флаг `visualize` установлен в True, функция отобразит ключевые точки на первом кадре видео
        с использованием matplotlib и OpenCV.
        
        Примечание
        ---------
        1. Функция использует Mediapipe Holistic для извлечения ключевых точек, включая улучшенные маркеры лица.
        2. Для работы требуется установленная библиотека OpenCV (cv2), matplotlib и numpy.
        
        Пример
        -------
        >>> keypoints = self.extract_keypoints("path/to/video.mp4", visualize=True)
        >>> print(keypoints)
        
        """
        mp_holistic = mp.solutions.holistic
        mp_drawing = mp.solutions.drawing_utils

        holistic = mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=2,
            enable_segmentation=False,
            refine_face_landmarks=True
        )

        cap = cv2.VideoCapture(video_path)
        keypoints_list = []

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Преобразование изображения в формат RGB
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Получение результатов
            results = holistic.process(image)

            keypoints = []

            # Позы
            if results.pose_landmarks:
                for landmark in results.pose_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y,
                                      landmark.z, landmark.visibility])
            else:
                keypoints.extend([0] * 33 * 4)

            # Лицо
            if results.face_landmarks:
                for landmark in results.face_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * 468 * 3)

            # Левая рука
            if results.left_hand_landmarks:
                for landmark in results.left_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * 21 * 3)

            # Правая рука
            if results.right_hand_landmarks:
                for landmark in results.right_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * 21 * 3)

            keypoints_list.append(keypoints)

            if visualize and frame_count == 1:
                # Визуализация ключевых точек на первом кадре
                annotated_image = frame.copy()
                # Позы
                if results.pose_landmarks:
                    mp_drawing.draw_landmarks(
                        annotated_image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
                # Лицо
                if results.face_landmarks:
                    mp_drawing.draw_landmarks(
                        annotated_image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)
                # Левая рука
                if results.left_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        annotated_image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                # Правая рука
                if results.right_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        annotated_image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                # Преобразование изображения в формат RGB для отображения
                annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)
                # Отображение изображения в Jupyter Notebook
                plt.figure(figsize=(10, 10))
                plt.imshow(annotated_image)
                plt.axis('off')
                plt.title(f"Ключевые точки для видео: {os.path.basename(video_path)}")
                plt.show()

        cap.release()
        cv2.destroyAllWindows()
        holistic.close()

        # Усредняем ключевые точки по всем кадрам
        keypoints_array = np.array(keypoints_list)
        if keypoints_array.size == 0:
            num_keypoints = (33 * 4) + (468 * 3) + (21 * 3 * 2)
            keypoints_mean = np.zeros(num_keypoints)
        else:
            keypoints_mean = keypoints_array.mean(axis=0)

        return keypoints_mean
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]

        video_file = row['video_file']
        video_path = os.path.join(self.path_to_video, video_file)
        if not os.path.isfile(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        logger.debug(f"Processing video file: {video_path}")

        # Extract embeddings
        audio, sr, audio_embeddings = self.extract_audio_embedding(video_path)
        # video_embedding = self.extract_video_embedding(video_path)
        transcript_embeddings = self.extract_text_embedding(row['transcript'])
        
        key_points = self.extract_keypoints(video_path)
        
        # Prepare labels
        labels = {
            'extraversion': row['extraversion'],
            'neuroticism': row['neuroticism'],
            'agreeableness': row['agreeableness'],
            'conscientiousness': row['conscientiousness'],
            'openness': row['openness'],
            'interview': row['interview']
        }

        # Assemble all data into a dictionary
        sample = {
            'video_file': video_file,
            'audio': audio,
            'audio_embeddings': audio_embeddings,
            'sampling_rate': sr,
            'transcript': row['transcript'],
            "transcript_embeddings": transcript_embeddings,
            'labels': labels,
            'key_points': key_points,
        }

        return sample
