In [None]:
!pip install ffmpeg-python
!pip install av

Collecting ffmpeg-python
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl (25 kB)
Installing collected packages: ffmpeg-python
Successfully installed ffmpeg-python-0.2.0
Collecting av
  Downloading av-12.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.8/33.8 MB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: av
Successfully installed av-12.0.0


# Librerias

In [None]:
import torch
import torchaudio
import torchvision

import os
import cv2
import numpy as np
import pandas as pd
from skimage import transform as tf
from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")

# MediaPipe

In [None]:
!pip install mediapipe

In [None]:
import mediapipe as mp

In [None]:
class LandmarksDetector:
    def __init__(self):
        self.mp_face_detection = mp.solutions.face_detection
        self.short_range_detector = self.mp_face_detection.FaceDetection(min_detection_confidence=0.5, model_selection=0)
        self.full_range_detector = self.mp_face_detection.FaceDetection(min_detection_confidence=0.5, model_selection=1)

    def __call__(self, video_frames):
        landmarks = self.detect(video_frames, self.full_range_detector)
        if all(element is None for element in landmarks):
            landmarks = self.detect(video_frames, self.short_range_detector)
            assert any(l is not None for l in landmarks), "Cannot detect any frames in the video"
        return landmarks

    def detect(self, video_frames, detector):
        landmarks = []
        for frame in video_frames:
            results = detector.process(frame)
            if not results.detections:
                landmarks.append(None)
                continue
            face_points = []
            for idx, detected_faces in enumerate(results.detections):
                max_id, max_size = 0, 0
                bboxC = detected_faces.location_data.relative_bounding_box
                ih, iw, ic = frame.shape
                bbox = int(bboxC.xmin * iw), int(bboxC.ymin * ih), int(bboxC.width * iw), int(bboxC.height * ih)
                bbox_size = (bbox[2] - bbox[0]) + (bbox[3] - bbox[1])
                if bbox_size > max_size:
                    max_id, max_size = idx, bbox_size
                lmx = [
                    [int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(0).value].x * iw),
                     int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(0).value].y * ih)],
                    [int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(1).value].x * iw),
                     int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(1).value].y * ih)],
                    [int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(2).value].x * iw),
                     int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(2).value].y * ih)],
                    [int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(3).value].x * iw),
                     int(detected_faces.location_data.relative_keypoints[self.mp_face_detection.FaceKeyPoint(3).value].y * ih)],
                    ]
                face_points.append(lmx)
            landmarks.append(np.array(face_points[max_id]))
        return landmarks

In [None]:
reference = np.array([[ 70.92383848,  97.13757949],[ 72.62515288, 114.90361188],
  [ 75.98941827, 131.07402473],[ 79.26959318, 146.2116596 ],[ 83.61348894, 163.26020701],
  [ 91.33134186, 177.44535288],[100.27169245, 187.0885567 ],[112.12435016, 196.00353535],
  [130.74175801, 200.52998862],[149.8553027 , 195.31198065],[163.101687  , 186.44060881],
  [173.65334172, 176.57250158],[182.43614459, 162.27992572],[187.16738556, 145.09391978],
  [190.22905333, 129.72418731],[193.02118502, 113.45923358],[194.43863372,  95.5795392 ],
  [ 81.33095967,  80.79544511],[ 87.75906556,  75.27980275],[ 96.22692544,  73.83857497],
  [104.55524335,  74.74029382],[112.23186144,  76.97670954],[144.49576387,  76.42387471],
  [152.34799901,  73.83329748],[161.13054079,  72.63570385],[170.58715674,  73.84785054],
  [178.21409885,  79.43802857],[128.7337425 ,  95.35962566],[128.48854473, 106.92459506],
  [128.24475936, 118.27285086],[128.26596547, 127.69870727],[118.76000113, 135.19357677],
  [122.96307973, 136.14619774],[128.87017961, 137.30253356],[134.9428314 , 135.99720543],
  [139.48259748, 134.87763793],[ 92.52245553,  94.36876014],[ 97.58518219,  90.95977781],
  [105.41368273,  90.91345887],[112.77241724,  94.9436087 ],[106.103635  ,  97.08485693],
  [ 98.04628565,  97.36335869],[145.52511509,  94.53499862],[152.58953438,  90.21485666],
  [160.61170666,  90.19938514],[166.67710071,  93.56562296],[160.55971572,  96.48125958],
  [152.20465993,  96.47281336],[107.16760614, 157.19606764],[114.47611216, 152.12006957],
  [123.84852759, 148.51863199],[128.97628288, 149.41552527],[134.14360703, 148.42628211],
  [144.17717842, 151.79343262],[152.19284005, 156.98711116],[143.85966895, 164.00347101],
  [136.7441507 , 167.9430006 ],[129.15278278, 168.81853366],[121.79511074, 168.02271929],
  [115.27508573, 164.15159355],[109.23088653, 157.00172103],[122.50270762, 154.40733649],
  [129.02862236, 154.12104227],[135.83648069, 154.31214998],[150.75782809, 156.79506004],
  [135.66204627, 160.62976732],[128.95218547, 161.28762709],[122.48775432, 160.50878431]])

def linear_interpolate(landmarks, start_idx, stop_idx):
    start_landmarks = landmarks[start_idx]
    stop_landmarks = landmarks[stop_idx]
    delta = stop_landmarks - start_landmarks
    for idx in range(1, stop_idx - start_idx):
        landmarks[start_idx + idx] = (
            start_landmarks + idx / float(stop_idx - start_idx) * delta
        )
    return landmarks


def warp_img(src, dst, img, std_size):
    tform = tf.estimate_transform("similarity", src, dst)
    warped = tf.warp(img, inverse_map=tform.inverse, output_shape=std_size)
    warped = (warped * 255).astype("uint8")
    return warped, tform


def apply_transform(transform, img, std_size):
    warped = tf.warp(img, inverse_map=transform.inverse, output_shape=std_size)
    warped = (warped * 255).astype("uint8")
    return warped


def cut_patch(img, landmarks, height, width, threshold=5):
    center_x, center_y = np.mean(landmarks, axis=0)
    # Check for too much bias in height and width
    if abs(center_y - img.shape[0] / 2) > height + threshold:
        raise OverflowError("too much bias in height")
    if abs(center_x - img.shape[1] / 2) > width + threshold:
        raise OverflowError("too much bias in width")
    # Calculate bounding box coordinates
    y_min = int(round(np.clip(center_y - height, 0, img.shape[0])))
    y_max = int(round(np.clip(center_y + height, 0, img.shape[0])))
    x_min = int(round(np.clip(center_x - width, 0, img.shape[1])))
    x_max = int(round(np.clip(center_x + width, 0, img.shape[1])))
    # Cut the image
    cutted_img = np.copy(img[y_min:y_max, x_min:x_max])
    return cutted_img


class VideoProcess:
    def __init__(
        self,
        mean_face_path="20words_mean_face.npy",
        crop_width=96,
        crop_height=96,
        start_idx=3,
        stop_idx=4,
        window_margin=12,
        convert_gray=True,
    ):
        # self.reference = np.load(
        #     os.path.join(os.path.dirname(__file__), mean_face_path)
        # )
        self.reference = reference
        self.crop_width = crop_width
        self.crop_height = crop_height
        self.start_idx = start_idx
        self.stop_idx = stop_idx
        self.window_margin = window_margin
        self.convert_gray = convert_gray

    def __call__(self, video, landmarks):
        # Pre-process landmarks: interpolate frames that are not detected
        preprocessed_landmarks = self.interpolate_landmarks(landmarks)
        # Exclude corner cases: no landmark in all frames
        if not preprocessed_landmarks:
            return
        # Affine transformation and crop patch
        sequence = self.crop_patch(video, preprocessed_landmarks)
        assert sequence is not None, "crop an empty patch."
        return sequence

    def crop_patch(self, video, landmarks):
        sequence = []
        for frame_idx, frame in enumerate(video):
            window_margin = min(
                self.window_margin // 2, frame_idx, len(landmarks) - 1 - frame_idx
            )
            smoothed_landmarks = np.mean(
                [
                    landmarks[x]
                    for x in range(
                        frame_idx - window_margin, frame_idx + window_margin + 1
                    )
                ],
                axis=0,
            )
            smoothed_landmarks += landmarks[frame_idx].mean(
                axis=0
            ) - smoothed_landmarks.mean(axis=0)
            transformed_frame, transformed_landmarks = self.affine_transform(
                frame, smoothed_landmarks, self.reference, grayscale=self.convert_gray
            )
            patch = cut_patch(
                transformed_frame,
                transformed_landmarks[self.start_idx : self.stop_idx],
                self.crop_height // 2,
                self.crop_width // 2,
            )
            sequence.append(patch)
        return np.array(sequence)

    def interpolate_landmarks(self, landmarks):
        valid_frames_idx = [idx for idx, lm in enumerate(landmarks) if lm is not None]

        if not valid_frames_idx:
            return None

        for idx in range(1, len(valid_frames_idx)):
            if valid_frames_idx[idx] - valid_frames_idx[idx - 1] > 1:
                landmarks = linear_interpolate(
                    landmarks, valid_frames_idx[idx - 1], valid_frames_idx[idx]
                )

        valid_frames_idx = [idx for idx, lm in enumerate(landmarks) if lm is not None]

        # Handle corner case: keep frames at the beginning or at the end that failed to be detected
        if valid_frames_idx:
            landmarks[: valid_frames_idx[0]] = [
                landmarks[valid_frames_idx[0]]
            ] * valid_frames_idx[0]
            landmarks[valid_frames_idx[-1] :] = [landmarks[valid_frames_idx[-1]]] * (
                len(landmarks) - valid_frames_idx[-1]
            )

        assert all(lm is not None for lm in landmarks), "not every frame has landmark"

        return landmarks

    def affine_transform(
        self,
        frame,
        landmarks,
        reference,
        grayscale=False,
        target_size=(256, 256),
        reference_size=(256, 256),
        stable_points=(0, 1, 2, 3),
        interpolation=cv2.INTER_LINEAR,
        border_mode=cv2.BORDER_CONSTANT,
        border_value=0,
    ):
        if grayscale:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        stable_reference = self.get_stable_reference(
            reference, reference_size, target_size
        )
        transform = self.estimate_affine_transform(
            landmarks, stable_points, stable_reference
        )
        transformed_frame, transformed_landmarks = self.apply_affine_transform(
            frame,
            landmarks,
            transform,
            target_size,
            interpolation,
            border_mode,
            border_value,
        )

        return transformed_frame, transformed_landmarks

    def get_stable_reference(self, reference, reference_size, target_size):
        # -- right eye, left eye, nose tip, mouth center
        stable_reference = np.vstack(
            [
                np.mean(reference[36:42], axis=0),
                np.mean(reference[42:48], axis=0),
                np.mean(reference[31:36], axis=0),
                np.mean(reference[48:68], axis=0),
            ]
        )
        stable_reference[:, 0] -= (reference_size[0] - target_size[0]) / 2.0
        stable_reference[:, 1] -= (reference_size[1] - target_size[1]) / 2.0
        return stable_reference

    def estimate_affine_transform(self, landmarks, stable_points, stable_reference):
        return cv2.estimateAffinePartial2D(
            np.vstack([landmarks[x] for x in stable_points]),
            stable_reference,
            method=cv2.LMEDS,
        )[0]

    def apply_affine_transform(
        self,
        frame,
        landmarks,
        transform,
        target_size,
        interpolation,
        border_mode,
        border_value,
    ):
        transformed_frame = cv2.warpAffine(
            frame,
            transform,
            dsize=(target_size[0], target_size[1]),
            flags=interpolation,
            borderMode=border_mode,
            borderValue=border_value,
        )
        transformed_landmarks = (
            np.matmul(landmarks, transform[:, :2].transpose())
            + transform[:, 2].transpose()
        )
        return transformed_frame, transformed_landmarks

# RetinaFace and FAN

In [None]:
!git clone https://github.com/hhj1897/face_alignment.git
!git clone https://github.com/hhj1897/face_detection.git

Cloning into 'face_alignment'...
remote: Enumerating objects: 190, done.[K
remote: Counting objects: 100% (32/32), done.[K
remote: Compressing objects: 100% (6/6), done.[K
remote: Total 190 (delta 27), reused 27 (delta 26), pack-reused 158[K
Receiving objects: 100% (190/190), 213.82 MiB | 37.31 MiB/s, done.
Resolving deltas: 100% (84/84), done.
Updating files: 100% (14/14), done.
Cloning into 'face_detection'...
remote: Enumerating objects: 300, done.[K
remote: Counting objects: 100% (50/50), done.[K
remote: Compressing objects: 100% (11/11), done.[K
remote: Total 300 (delta 41), reused 39 (delta 39), pack-reused 250[K
Receiving objects: 100% (300/300), 81.19 MiB | 31.14 MiB/s, done.
Resolving deltas: 100% (141/141), done.


In [None]:
from face_detection.ibug.face_detection import RetinaFacePredictor
from face_alignment.ibug.face_alignment import FANPredictor

In [None]:
class LandmarksDetector:
    def __init__(self, device="cuda:0", model_name="resnet50"):
        self.face_detector = RetinaFacePredictor(
            device=device,
            threshold=0.8,
            model=RetinaFacePredictor.get_model(model_name),
        )
        self.landmark_detector = FANPredictor(device=device, model=None)

    def __call__(self, video_frames):
        landmarks = []
        for frame in video_frames:
            detected_faces = self.face_detector(frame, rgb=False)
            face_points, _ = self.landmark_detector(frame, detected_faces, rgb=True)
            if len(detected_faces) == 0:
                landmarks.append(None)
            else:
                max_id, max_size = 0, 0
                for idx, bbox in enumerate(detected_faces):
                    bbox_size = (bbox[2] - bbox[0]) + (bbox[3] - bbox[1])
                    if bbox_size > max_size:
                        max_id, max_size = idx, bbox_size
                landmarks.append(face_points[max_id])
        return landmarks

In [None]:
reference = np.array([[ 70.92383848,  97.13757949],[ 72.62515288, 114.90361188],
  [ 75.98941827, 131.07402473],[ 79.26959318, 146.2116596 ],[ 83.61348894, 163.26020701],
  [ 91.33134186, 177.44535288],[100.27169245, 187.0885567 ],[112.12435016, 196.00353535],
  [130.74175801, 200.52998862],[149.8553027 , 195.31198065],[163.101687  , 186.44060881],
  [173.65334172, 176.57250158],[182.43614459, 162.27992572],[187.16738556, 145.09391978],
  [190.22905333, 129.72418731],[193.02118502, 113.45923358],[194.43863372,  95.5795392 ],
  [ 81.33095967,  80.79544511],[ 87.75906556,  75.27980275],[ 96.22692544,  73.83857497],
  [104.55524335,  74.74029382],[112.23186144,  76.97670954],[144.49576387,  76.42387471],
  [152.34799901,  73.83329748],[161.13054079,  72.63570385],[170.58715674,  73.84785054],
  [178.21409885,  79.43802857],[128.7337425 ,  95.35962566],[128.48854473, 106.92459506],
  [128.24475936, 118.27285086],[128.26596547, 127.69870727],[118.76000113, 135.19357677],
  [122.96307973, 136.14619774],[128.87017961, 137.30253356],[134.9428314 , 135.99720543],
  [139.48259748, 134.87763793],[ 92.52245553,  94.36876014],[ 97.58518219,  90.95977781],
  [105.41368273,  90.91345887],[112.77241724,  94.9436087 ],[106.103635  ,  97.08485693],
  [ 98.04628565,  97.36335869],[145.52511509,  94.53499862],[152.58953438,  90.21485666],
  [160.61170666,  90.19938514],[166.67710071,  93.56562296],[160.55971572,  96.48125958],
  [152.20465993,  96.47281336],[107.16760614, 157.19606764],[114.47611216, 152.12006957],
  [123.84852759, 148.51863199],[128.97628288, 149.41552527],[134.14360703, 148.42628211],
  [144.17717842, 151.79343262],[152.19284005, 156.98711116],[143.85966895, 164.00347101],
  [136.7441507 , 167.9430006 ],[129.15278278, 168.81853366],[121.79511074, 168.02271929],
  [115.27508573, 164.15159355],[109.23088653, 157.00172103],[122.50270762, 154.40733649],
  [129.02862236, 154.12104227],[135.83648069, 154.31214998],[150.75782809, 156.79506004],
  [135.66204627, 160.62976732],[128.95218547, 161.28762709],[122.48775432, 160.50878431]])

def linear_interpolate(landmarks, start_idx, stop_idx):
    start_landmarks = landmarks[start_idx]
    stop_landmarks = landmarks[stop_idx]
    delta = stop_landmarks - start_landmarks
    for idx in range(1, stop_idx - start_idx):
        landmarks[start_idx + idx] = (
            start_landmarks + idx / float(stop_idx - start_idx) * delta
        )
    return landmarks


def warp_img(src, dst, img, std_size):
    tform = tf.estimate_transform("similarity", src, dst)
    warped = tf.warp(img, inverse_map=tform.inverse, output_shape=std_size)
    warped = (warped * 255).astype("uint8")
    return warped, tform


def apply_transform(transform, img, std_size):
    warped = tf.warp(img, inverse_map=transform.inverse, output_shape=std_size)
    warped = (warped * 255).astype("uint8")
    return warped


def cut_patch(img, landmarks, height, width, threshold=5):
    center_x, center_y = np.mean(landmarks, axis=0)
    # Check for too much bias in height and width
    if abs(center_y - img.shape[0] / 2) > height + threshold:
        raise OverflowError("too much bias in height")
    if abs(center_x - img.shape[1] / 2) > width + threshold:
        raise OverflowError("too much bias in width")
    # Calculate bounding box coordinates
    y_min = int(round(np.clip(center_y - height, 0, img.shape[0])))
    y_max = int(round(np.clip(center_y + height, 0, img.shape[0])))
    x_min = int(round(np.clip(center_x - width, 0, img.shape[1])))
    x_max = int(round(np.clip(center_x + width, 0, img.shape[1])))
    # Cut the image
    cutted_img = np.copy(img[y_min:y_max, x_min:x_max])
    return cutted_img


class VideoProcess:
    def __init__(
        self,
        mean_face_path="20words_mean_face.npy",
        crop_width=96,
        crop_height=96,
        start_idx=48,
        stop_idx=68,
        window_margin=12,
        convert_gray=True,
    ):
        # self.reference = np.load(
        #     os.path.join(os.path.dirname(__file__), mean_face_path)
        # )
        self.reference = reference
        self.crop_width = crop_width
        self.crop_height = crop_height
        self.start_idx = start_idx
        self.stop_idx = stop_idx
        self.window_margin = window_margin
        self.convert_gray = convert_gray

    def __call__(self, video, landmarks):
        # Pre-process landmarks: interpolate frames that are not detected
        preprocessed_landmarks = self.interpolate_landmarks(landmarks)
        # Exclude corner cases: no landmark in all frames or number of frames is less than window length
        if (
            not preprocessed_landmarks
            or len(preprocessed_landmarks) < self.window_margin
        ):
            return
        # Affine transformation and crop patch
        sequence = self.crop_patch(video, preprocessed_landmarks)
        assert sequence is not None, "crop an empty patch."
        return sequence

    def crop_patch(self, video, landmarks):
        sequence = []
        for frame_idx, frame in enumerate(video):
            window_margin = min(
                self.window_margin // 2, frame_idx, len(landmarks) - 1 - frame_idx
            )
            smoothed_landmarks = np.mean(
                [
                    landmarks[x]
                    for x in range(
                        frame_idx - window_margin, frame_idx + window_margin + 1
                    )
                ],
                axis=0,
            )
            smoothed_landmarks += landmarks[frame_idx].mean(
                axis=0
            ) - smoothed_landmarks.mean(axis=0)
            transformed_frame, transformed_landmarks = self.affine_transform(
                frame, smoothed_landmarks, self.reference, grayscale=self.convert_gray
            )
            patch = cut_patch(
                transformed_frame,
                transformed_landmarks[self.start_idx : self.stop_idx],
                self.crop_height // 2,
                self.crop_width // 2,
            )
            sequence.append(patch)
        return np.array(sequence)

    def interpolate_landmarks(self, landmarks):
        valid_frames_idx = [idx for idx, lm in enumerate(landmarks) if lm is not None]

        if not valid_frames_idx:
            return None

        for idx in range(1, len(valid_frames_idx)):
            if valid_frames_idx[idx] - valid_frames_idx[idx - 1] > 1:
                landmarks = linear_interpolate(
                    landmarks, valid_frames_idx[idx - 1], valid_frames_idx[idx]
                )

        valid_frames_idx = [idx for idx, lm in enumerate(landmarks) if lm is not None]

        # Handle corner case: keep frames at the beginning or at the end that failed to be detected
        if valid_frames_idx:
            landmarks[: valid_frames_idx[0]] = [
                landmarks[valid_frames_idx[0]]
            ] * valid_frames_idx[0]
            landmarks[valid_frames_idx[-1] :] = [landmarks[valid_frames_idx[-1]]] * (
                len(landmarks) - valid_frames_idx[-1]
            )

        assert all(lm is not None for lm in landmarks), "not every frame has landmark"

        return landmarks

    def affine_transform(
        self,
        frame,
        landmarks,
        reference,
        grayscale=True,
        target_size=(256, 256),
        reference_size=(256, 256),
        stable_points=(28, 33, 36, 39, 42, 45, 48, 54),
        interpolation=cv2.INTER_LINEAR,
        border_mode=cv2.BORDER_CONSTANT,
        border_value=0,
    ):
        if grayscale:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        stable_reference = self.get_stable_reference(
            reference, stable_points, reference_size, target_size
        )
        transform = self.estimate_affine_transform(
            landmarks, stable_points, stable_reference
        )
        transformed_frame, transformed_landmarks = self.apply_affine_transform(
            frame,
            landmarks,
            transform,
            target_size,
            interpolation,
            border_mode,
            border_value,
        )

        return transformed_frame, transformed_landmarks

    def get_stable_reference(
        self, reference, stable_points, reference_size, target_size
    ):
        stable_reference = np.vstack([reference[x] for x in stable_points])
        stable_reference[:, 0] -= (reference_size[0] - target_size[0]) / 2.0
        stable_reference[:, 1] -= (reference_size[1] - target_size[1]) / 2.0
        return stable_reference

    def estimate_affine_transform(self, landmarks, stable_points, stable_reference):
        return cv2.estimateAffinePartial2D(
            np.vstack([landmarks[x] for x in stable_points]),
            stable_reference,
            method=cv2.LMEDS,
        )[0]

    def apply_affine_transform(
        self,
        frame,
        landmarks,
        transform,
        target_size,
        interpolation,
        border_mode,
        border_value,
    ):
        transformed_frame = cv2.warpAffine(
            frame,
            transform,
            dsize=(target_size[0], target_size[1]),
            flags=interpolation,
            borderMode=border_mode,
            borderValue=border_value,
        )
        transformed_landmarks = (
            np.matmul(landmarks, transform[:, :2].transpose())
            + transform[:, 2].transpose()
        )
        return transformed_frame, transformed_landmarks

# AVSRDataLoader

In [None]:
class AVSRDataLoader:
    def __init__(self, modality, detector="retinaface", convert_gray=True):
        self.modality = modality
        if modality == "video":
            # if detector == "retinaface":
            #     from detectors.retinaface.detector import LandmarksDetector
            #     from detectors.retinaface.video_process import VideoProcess
            #     self.landmarks_detector = LandmarksDetector(device="cuda:0")
            #     self.video_process = VideoProcess(convert_gray=convert_gray)

            # if detector == "mediapipe":
            #     from detectors.mediapipe.detector import LandmarksDetector
            #     from detectors.mediapipe.video_process import VideoProcess
            #     self.landmarks_detector = LandmarksDetector()
            #     self.video_process = VideoProcess(convert_gray=convert_gray)
            self.landmarks_detector = LandmarksDetector(device="cuda:0")
            self.video_process = VideoProcess(convert_gray=convert_gray)

    def load_data(self, data_filename, landmarks=None, transform=True):
        if self.modality == "audio":
            audio, sample_rate = self.load_audio(data_filename)
            audio = self.audio_process(audio, sample_rate)
            return audio
        if self.modality == "video":
            video = self.load_video(data_filename)
            if not landmarks:
                landmarks = self.landmarks_detector(video)
            video = self.video_process(video, landmarks)
            if video is None:
                raise TypeError("video cannot be None")
            video = torch.tensor(video)
            return video

    def load_audio(self, data_filename):
        waveform, sample_rate = torchaudio.load(data_filename, normalize=True)
        return waveform, sample_rate

    def load_video(self, data_filename):
        return torchvision.io.read_video(data_filename, pts_unit="sec")[0].numpy()

    def audio_process(self, waveform, sample_rate, target_sample_rate=16000):
        if sample_rate != target_sample_rate:
            waveform = torchaudio.functional.resample(
                waveform, sample_rate, target_sample_rate
            )
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform

# Util

In [None]:
def split_file(filename, max_frames=600, fps=25.0):

    lines = open(filename).read().splitlines()

    flag = 0
    stack = []
    res = []

    tmp = 0
    start_timestamp = 0.0

    threshold = max_frames / fps

    for line in lines:
        if "WORD START END ASDSCORE" in line:
            flag = 1
            continue
        if flag:
            word, start, end, score = line.split(" ")
            start, end, score = float(start), float(end), float(score)
            if end < tmp + threshold:
                stack.append(word)
                last_timestamp = end
            else:
                res.append(
                    [
                        " ".join(stack),
                        start_timestamp,
                        last_timestamp,
                        last_timestamp - start_timestamp,
                    ]
                )
                tmp = start
                start_timestamp = start
                stack = [word]
    if stack:
        res.append([" ".join(stack), start_timestamp, end, end - start_timestamp])
    return res


def save_vid_txt(
    dst_vid_filename, dst_txt_filename, trim_video_data, content, video_fps=25
):
    # -- save video
    save2vid(dst_vid_filename, trim_video_data, video_fps)
    # -- save text
    os.makedirs(os.path.dirname(dst_txt_filename), exist_ok=True)
    f = open(dst_txt_filename, "w", encoding="utf-8")
    f.write(f"{content}")
    f.close()


def save_vid_aud(
    dst_vid_filename,
    dst_aud_filename,
    trim_vid_data,
    trim_aud_data,
    video_fps=25,
    audio_sample_rate=16000,
):
    # -- save video
    save2vid(dst_vid_filename, trim_vid_data, video_fps)
    # -- save audio
    save2aud(dst_aud_filename, trim_aud_data, audio_sample_rate)


def save_vid_aud_txt(
    dst_vid_filename,
    dst_aud_filename,
    dst_txt_filename,
    trim_vid_data,
    trim_aud_data,
    content,
    video_fps=25,
    audio_sample_rate=16000,
):
    # -- save video
    if dst_vid_filename is not None:
        save2vid(dst_vid_filename, trim_vid_data, video_fps)
    # -- save audio
    if dst_aud_filename is not None:
        save2aud(dst_aud_filename, trim_aud_data, audio_sample_rate)
    # -- save text
    os.makedirs(os.path.dirname(dst_txt_filename), exist_ok=True)
    f = open(dst_txt_filename, "w")
    f.write(f"{content}")
    f.close()


def save2vid(filename, vid, frames_per_second):
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    torchvision.io.write_video(filename, vid, frames_per_second)


def save2aud(filename, aud, sample_rate):
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    torchaudio.save(filename, aud, sample_rate)

# Whisper

In [None]:
! pip install git+https://github.com/openai/whisper.git

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-8eet18pj
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-8eet18pj
  Resolved https://github.com/openai/whisper.git to commit ba3f3cd54b0e5b8ce1ab3de13e32122d0d5f98ab
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper==20231117)
  Downloading tiktoken-0.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->openai-whisper==20231117)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import whisper
from moviepy.editor import VideoFileClip

In [None]:
model_whisper = whisper.load_model("medium")

100%|██████████████████████████████████████| 1.42G/1.42G [00:12<00:00, 123MiB/s]


In [None]:
def obtener_texto(ruta, name):
    video = VideoFileClip(ruta)
    video.audio.write_audiofile(f'{name}.mp3')
    while True:
        result = model_whisper.transcribe(f'{name}.mp3')
        temperature = result['segments'][0]['temperature']
        compression_ratio = result['segments'][0]['compression_ratio']
        if temperature == 0.0 and compression_ratio > 1.2:
            text = result['text']
            break
        elif temperature != 1.0 and temperature > 0.2 and compression_ratio > 1.2:
            text = result['text']
            break
    text = text[1:]
    print(text)
    print(f'temperature={temperature}')
    print(f'compression ratio={compression_ratio}')
    text = text.replace(',','')
    text = text.replace('.','')
    text = text.replace('¿','')
    text = text.replace('?','')
    text = text.lower()
    return text

# def obtener_texto(ruta, audio):
#     video = VideoFileClip(ruta)
#     video.audio.write_audiofile(f'{audio}.mp3')
#     # load audio and pad/trim it to fit 30 seconds
#     audio = whisper.load_audio(f'{audio}.mp3')
#     audio = whisper.pad_or_trim(audio)
#     # make log-Mel spectrogram and move to the same device as the model
#     mel = whisper.log_mel_spectrogram(audio).to(model_whisper.device)
#     # detect the spoken language
#     # _, probs = model.detect_language(mel)
#     # print(f"Detected language: {max(probs, key=probs.get)}")
#     # decode the audio
#     options = whisper.DecodingOptions()
#     result = whisper.decode(model_whisper, mel, options)
#     # recognized text
#     return result.text

# Generacion de video procesado, y obtencion de texto

In [None]:
ruta_videos = '/content/drive/MyDrive/vsr/VPHB_USFX/videos'
ruta_dataset = '/content/drive/MyDrive/vsr/VPHB_USFX_preprocessing'
list_videos_fa = os.listdir(ruta_videos)
list_videos_pre = os.listdir(f'{ruta_dataset}/data/video')
list_videos = [v for v in list_videos_fa if v not in list_videos_pre]

video_dataloader = AVSRDataLoader(modality="video", detector="retinaface", convert_gray=False)

for data_filename in tqdm(list_videos[0:2]):
    print(data_filename)
    name = data_filename.split('.')[0]
    ruta = f'{ruta_videos}/{data_filename}'
    # Obtener video y texto
    text_data = obtener_texto(ruta, name)
    print(text_data)
    video_data = video_dataloader.load_data(ruta)
    # Guardar texto y video
    output_video_path = f'{ruta_dataset}/data/video/{name}.mp4'
    output_text_path = f'{ruta_dataset}/data/text/{name}.txt'
    save_vid_txt(output_video_path, output_text_path,
                 video_data, text_data)
    print('--------------------------------------------------------------------')

  0%|          | 0/2 [00:00<?, ?it/s]

IvveiO9WMp4_V1-0003.mp4





NameError: name 'obtener_texto' is not defined

# TextTransform

In [None]:
import sentencepiece

In [None]:
SP_MODEL_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(''))),
    "content",
    "unigram",
    "unigram40.model",
)

DICT_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(''))),
    "content",
    "unigram",
    "unigram40_units.txt",
)


class TextTransform:
    """Mapping Dictionary Class for SentencePiece tokenization."""

    def __init__(
        self,
        sp_model_path=SP_MODEL_PATH,
        dict_path=DICT_PATH,
    ):

        # Load SentencePiece model
        self.spm = sentencepiece.SentencePieceProcessor(model_file=sp_model_path)

        # Load units and create dictionary
        units = open(dict_path, encoding='utf8').read().splitlines()
        self.hashmap = {unit.split()[0]: unit.split()[-1] for unit in units}
        # 0 will be used for "blank" in CTC
        self.token_list = ["<blank>"] + list(self.hashmap.keys()) + ["<eos>"]
        self.ignore_id = -1

    def tokenize(self, text):
        tokens = self.spm.EncodeAsPieces(text)
        token_ids = [self.hashmap.get(token, self.hashmap["<unk>"]) for token in tokens]
        return torch.tensor(list(map(int, token_ids)))

    def post_process(self, token_ids):
        token_ids = token_ids[token_ids != -1]
        text = self._ids_to_str(token_ids, self.token_list)
        text = text.replace("\u2581", " ").strip()
        return text

    def _ids_to_str(self, token_ids, char_list):
        token_as_list = [char_list[idx] for idx in token_ids]
        return "".join(token_as_list).replace("<space>", " ")

# Tokenizer y generacion de csv

In [None]:
text_transform = TextTransform()

In [None]:
ruta = "/content/drive/MyDrive/vsr/VPHB_USFX_preprocessing"

In [None]:
list_video_text = [[f'{ruta}/data/video/{name}.mp4',f'{ruta}/data/text/{name}.txt'] for name in [n.split('.')[0] for n in os.listdir(f'{ruta}/data/video')]]
list_csv = []
for r_vid, r_text in tqdm(list_video_text):
    video = torchvision.io.read_video(r_vid, pts_unit="sec")[0].numpy()
    text = open(r_text, 'r').read()
    token_id_str = " ".join(
            map(str, [_.item() for _ in text_transform.tokenize(text)])
    )
    name_video = r_vid.split('/')[-1]
    list_csv.append(['VPHBUSFX', f'video/{name_video}', video.shape[0], token_id_str])
df_csv = pd.DataFrame(list_csv, columns=['Dataset','ruta','input_length','token_id'])
df_csv.to_csv(f'{ruta}/labels/VPHBUSFX.csv')
df_csv

100%|██████████| 42/42 [00:13<00:00,  3.10it/s]


Unnamed: 0,Dataset,ruta,input_length,token_id
0,VPHBUSFX,video/4yquiL-r1J0_V1-0001.mp4,298,2 18 27 23 9 2 30 35 15 2 34 9 23 2 11 19 14 2...
1,VPHBUSFX,video/4yquiL-r1J0_V1-0002.mp4,280,2 11 27 23 19 37 19 9 2 14 23 2 14 38 2 24 9 2...
2,VPHBUSFX,video/4yquiL-r1J0_V1-0003.mp4,360,2 34 27 13 27 33 2 9 12 10 2 14 33 34 10 25 2 ...
3,VPHBUSFX,video/4yquiL-r1J0_V1-0004.mp4,241,2 27 2 14 33 14 2 24 9 23 14 33 34 9 32 2 14 3...
4,VPHBUSFX,video/4yquiL-r1J0_V1-0005.mp4,419,2 25 27 33 27 34 32 27 33 2 9 12 10 2 25 27 33...
5,VPHBUSFX,video/4yquiL-r1J0_V1-0006.mp4,419,2 33 27 12 19 9 23 2 12 35 10 25 34 27 33 2 37...
6,VPHBUSFX,video/4yquiL-r1J0_V1-0007.mp4,405,2 39 9 2 25 27 2 18 9 39 2 17 32 9 12 19 9 33 ...
7,VPHBUSFX,video/4yquiL-r1J0_V1-0008.mp4,333,2 32 14 29 23 9 25 34 14 9 32 2 35 25 9 2 25 3...
8,VPHBUSFX,video/4yquiL-r1J0_V1-0009.mp4,346,2 23 9 2 9 12 34 19 34 35 13 31 2 14 33 34 10 ...
9,VPHBUSFX,video/4yquiL-r1J0_V1-0010.mp4,220,2 33 35 2 17 14 33 34 19 28 25 2 13 14 33 13 1...


# Obtener train, val

In [None]:
df_csv.head(20).to_csv(f'{ruta}/labels/VPHBUSFX_train.csv')
df_csv.tail(20).to_csv(f'{ruta}/labels/VPHBUSFX_val.csv')