In [None]:
import re
import warnings
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import torch
import yaml
from deepface import DeepFace
from facenet_pytorch import InceptionResnetV1, MTCNN
from tqdm import tqdm

## Carregamento e Transformação

In [None]:
def to_seconds(t):
    """Convert a ``"H:M:S,ms"`` timestamp string into seconds as a float.

    The string must contain exactly two ``:`` separators and one ``,``
    between the seconds and the milliseconds; anything else raises
    ``ValueError`` from the unpacking or the ``int`` conversions.
    """
    hours, minutes, remainder = t.split(":")
    seconds, millis = remainder.split(",")
    whole_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    return whole_seconds + int(millis) / 1000

def to_snake_case(name: str) -> str:
    """Normalise a label (e.g. a column name) to lower-case with underscores.

    Steps, in order: trim surrounding whitespace, delete every character that
    is neither a word character nor whitespace, collapse each whitespace run
    into a single underscore, and lower-case the result.
    """
    without_punctuation = re.sub(r"[^\w\s]", "", name.strip())
    underscored = re.sub(r"\s+", "_", without_punctuation)
    return underscored.lower()

def prepare_df(df: pd.DataFrame):
    """Return a cleaned copy of the raw annotation table.

    The result has three extra columns - ``start_s``, ``end_s`` and
    ``duration_s`` - holding the ``StartTime`` / ``EndTime`` values converted
    to seconds with ``to_seconds`` and their difference. The columns
    ``Sr No.``, ``StartTime``, ``EndTime``, ``Season``, ``Episode`` and
    ``Sentiment`` are dropped, and every remaining column name is passed
    through ``to_snake_case``.

    The input frame is not modified. Rows with a non-positive duration are
    kept as-is; no filtering is done here.

    Args:
        df: Raw annotation table containing at least the columns named above.

    Returns:
        A new ``pd.DataFrame`` with the derived columns and snake_case names.

    Raises:
        KeyError: If ``StartTime``/``EndTime`` or any column to drop is missing.
    """
    start_s = df["StartTime"].apply(to_seconds)
    end_s = df["EndTime"].apply(to_seconds)

    # assign() builds a new frame, so the caller's DataFrame is left untouched
    # and re-running the cell on the same raw frame gives the same result.
    prepared = df.assign(start_s=start_s, end_s=end_s, duration_s=end_s - start_s)

    prepared = prepared.drop(
        columns=["Sr No.", "StartTime", "EndTime", "Season", "Episode", "Sentiment"]
    )
    prepared.columns = [to_snake_case(c) for c in prepared.columns]

    return prepared

In [None]:
# Read the training annotations CSV and run it through prepare_df; the result
# is the notebook-level frame that the extraction cells below iterate over.
train_df = prepare_df(pd.read_csv("./data/train_sent_emo.csv"))

## Funções base

In [None]:
def load_config(path):
    """Parse the YAML file at ``path`` and return its content.

    ``yaml.safe_load`` is used, so only plain YAML types are constructed.
    The file is decoded as UTF-8 explicitly rather than with the platform's
    default encoding, so the same config parses identically on every OS.

    Args:
        path: Location of the YAML file (anything ``open`` accepts).

    Returns:
        Whatever ``yaml.safe_load`` produces for the document.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

In [None]:
class BaseExtractor:
    """Common base for feature extractors that persist vectors as ``.npy`` files."""

    def __init__(self, save_dir):
        """Remember ``save_dir`` as a ``Path`` and create it (with parents) if absent."""
        output_root = Path(save_dir)
        self.save_dir = output_root
        output_root.mkdir(parents=True, exist_ok=True)

    def save(self, dialogue_id, utterance_id, vector):
        """Write ``vector`` to ``<save_dir>/<dialogue_id>_<utterance_id>.npy``.

        Returns the destination path as a string.
        """
        destination = self.save_dir / f"{dialogue_id}_{utterance_id}.npy"
        np.save(destination, vector)
        return str(destination)

## Features Faciais

In [None]:
class VideoExtractor(BaseExtractor):
    """Compute one averaged face embedding per utterance video clip.

    The backend is chosen by substring match on the lower-cased ``model_name``:

    * ``"facenet"`` - MTCNN detection followed by
      ``InceptionResnetV1(pretrained="vggface2")`` on the first detected box.
    * ``"deepface"`` / ``"vgg-face"`` - ``DeepFace.represent`` on the frame.

    Optional ``cfg`` keys:
        frame_step: Process every n-th frame (default 5, clamped to >= 1).
        resize: Side length of the square face crop fed to FaceNet (default 160).
        detector_backend: DeepFace detector backend (default ``"opencv"``).
        deepface_model: Exact DeepFace model name; overrides the name derived
            from ``model_name``.
        embedding_dim: Length of the zero vector returned when a clip yields
            no embedding (default 512 for FaceNet, 128 otherwise).
    """

    # DeepFace looks model names up case-sensitively, so the lower-cased
    # model_name has to be mapped back to the library's spelling. "vgg-face"
    # comes first so it wins when both substrings are present.
    _DEEPFACE_MODEL_NAMES = (("vgg-face", "VGG-Face"), ("deepface", "DeepFace"))
    # InceptionResnetV1 produces 512-dimensional embeddings.
    _FACENET_EMBEDDING_DIM = 512
    # Size of the zero vector used historically; kept as the DeepFace default.
    _LEGACY_EMBEDDING_DIM = 128

    def __init__(self, model_name, save_dir, cfg):
        """Create the output directory and load the requested backend.

        Raises:
            ValueError: If ``model_name`` matches none of the supported backends.
        """
        super().__init__(save_dir)
        self.model_name = model_name.lower()
        self.cfg = cfg
        # A step of 0 would make the modulo in extract() divide by zero.
        self.frame_step = max(1, int(cfg.get("frame_step", 5)))
        self.resize = cfg.get("resize", 160)

        if "facenet" in self.model_name:
            self.detector = MTCNN(keep_all=False, device="cpu")
            self.model = InceptionResnetV1(pretrained="vggface2").eval()
            default_dim = self._FACENET_EMBEDDING_DIM
        elif "deepface" in self.model_name or "vgg-face" in self.model_name:
            self.detector_backend = cfg.get("detector_backend", "opencv")
            self.deepface = DeepFace
            # The elif condition guarantees that one of the keys matches.
            self.deepface_model = cfg.get("deepface_model") or next(
                canonical
                for key, canonical in self._DEEPFACE_MODEL_NAMES
                if key in self.model_name
            )
            default_dim = self._LEGACY_EMBEDDING_DIM
        else:
            raise ValueError(f"Unsupported video model: {self.model_name}")

        # Length of the all-zero fallback vector; extract() replaces it with
        # the real embedding size as soon as one has been observed.
        self.embedding_dim = int(cfg.get("embedding_dim", default_dim))

    def extract(self, video_path):
        """Extract averaged face embedding for one utterance clip (.mp4).

        Every ``frame_step``-th frame is embedded and the embeddings are
        averaged. DeepFace failures on individual frames are skipped and
        summarised in a single ``RuntimeWarning``; FaceNet errors propagate.

        Args:
            video_path: Path of the clip (``str`` or path-like).

        Returns:
            The mean embedding, or ``np.zeros(self.embedding_dim)`` when no
            frame produced an embedding.

        Raises:
            RuntimeError: If the video cannot be opened.
        """
        use_facenet = "facenet" in self.model_name
        embeddings = []
        failed_frames = 0
        last_error = None

        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Could not open {video_path}")

            frame_id = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_id % self.frame_step == 0:
                    if use_facenet:
                        emb = self._embed_facenet(frame)
                    else:
                        try:
                            emb = self._embed_deepface(frame)
                        except Exception as exc:
                            # Best effort per frame, but keep track of the
                            # failure instead of discarding it silently.
                            emb = None
                            failed_frames += 1
                            last_error = exc
                    if emb is not None:
                        embeddings.append(emb)
                frame_id += 1
        finally:
            # Release the capture handle even when a frame raised.
            cap.release()

        if failed_frames:
            warnings.warn(
                f"{failed_frames} frame(s) of {video_path} could not be embedded; "
                f"last error: {last_error!r}",
                RuntimeWarning,
                stacklevel=2,
            )

        if not embeddings:
            return np.zeros(self.embedding_dim)

        mean_embedding = np.mean(np.stack(embeddings), axis=0)
        # Remember the real size so later zero-vector fallbacks match it.
        self.embedding_dim = mean_embedding.shape[-1]
        return mean_embedding

    def _embed_facenet(self, frame_bgr):
        """Embed the first MTCNN-detected face of a BGR frame; None if there is none."""
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        boxes, _ = self.detector.detect(frame_rgb)
        if boxes is None or len(boxes) == 0:
            return None

        # Detected boxes can extend past the frame. Negative indices would
        # slice from the wrong end and an empty crop makes cv2.resize raise,
        # so clamp to the frame and skip degenerate boxes.
        height, width = frame_rgb.shape[:2]
        x1, y1, x2, y2 = boxes[0].astype(int)
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, width), min(y2, height)
        if x2 <= x1 or y2 <= y1:
            return None

        face = cv2.resize(frame_rgb[y1:y2, x1:x2], (self.resize, self.resize))
        # NOTE(review): facenet-pytorch's own preprocessing standardises pixels
        # as (x - 127.5) / 128; this scales to [0, 1] instead. Left unchanged so
        # already-saved embeddings stay comparable - confirm it is intended.
        face = np.transpose(face, (2, 0, 1)) / 255.0
        batch = torch.tensor(face).unsqueeze(0).float()
        with torch.no_grad():
            return self.model(batch).squeeze().numpy()

    def _embed_deepface(self, frame_bgr):
        """Embed a BGR frame with DeepFace; None if no representation is returned."""
        # DeepFace documents numpy input as BGR, i.e. the frame exactly as
        # OpenCV decoded it, so no colour conversion is applied here.
        rep = self.deepface.represent(
            img_path=frame_bgr,
            model_name=self.deepface_model,
            detector_backend=self.detector_backend,
            enforce_detection=False,
        )
        if len(rep) == 0:
            return None
        return np.array(rep[0]["embedding"])

In [None]:
def extract_video_with_progress(df: pd.DataFrame, extractor: VideoExtractor, save_dir: str, video_dir: str):
    """Extract and save one embedding per row of ``df``, with a progress bar.

    For each ``(dialogue_id, utterance_id)`` pair the clip
    ``<video_dir>/dia<dialogue_id>_utt<utterance_id>.mp4`` is passed to
    ``extractor.extract`` and the result is stored with ``extractor.save``.
    Pairs whose ``<save_dir>/<dialogue_id>_<utterance_id>.npy`` already exists
    are skipped, so an interrupted run can be resumed. A failure on one clip
    is reported and does not stop the loop.

    Args:
        df: Table with ``dialogue_id`` and ``utterance_id`` columns.
        extractor: Extractor used to compute and save the embeddings.
        save_dir: Directory checked for existing outputs (created if absent).
            The skip check only works when this is the directory the
            extractor saves into.
        video_dir: Directory containing the utterance clips.
    """
    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)

    # Iterate over the two id columns directly: iterrows() builds a Series per
    # row and can upcast integer ids to floats, which would change file names.
    id_pairs = zip(df["dialogue_id"], df["utterance_id"])

    for dialogue_id, utterance_id in tqdm(id_pairs, total=len(df), desc="Video extracting", ncols=80):
        out_path = save_dir / f"{dialogue_id}_{utterance_id}.npy"
        if out_path.exists():
            continue

        video_path = f"{video_dir}/dia{dialogue_id}_utt{utterance_id}.mp4"
        try:
            vec = extractor.extract(video_path)
            extractor.save(dialogue_id, utterance_id, vec)
        except Exception as e:
            # Name the clip that failed, not just the directory it lives in.
            print(f"⚠️ Error on {video_path}: {e}")

    print(f"✅ Saved embeddings in {save_dir}")

### Deepface

In [None]:
# Build a VideoExtractor from configs/faces/deepface.yaml.
# `cfg` and `extractor` are notebook-level names that the Facenet cell reassigns.
cfg = load_config("configs/faces/deepface.yaml")
extractor = VideoExtractor(cfg["model_name"], cfg["save_dir"], cfg)

# cfg["save_dir"] is passed both to the extractor (where files are written) and
# to the driver (where existing files are looked up), so finished clips are skipped.
extract_video_with_progress(train_df, extractor, cfg["save_dir"], "./data/train/train_splits")

### Facenet

In [None]:
# Build a VideoExtractor from configs/faces/facenet.yaml.
# This overwrites the notebook-level `cfg` and `extractor` names.
cfg = load_config("configs/faces/facenet.yaml")
extractor = VideoExtractor(cfg["model_name"], cfg["save_dir"], cfg)

# Same training frame and clip directory as the other run; outputs go to this
# config's save_dir, which is also where existing files are looked up.
extract_video_with_progress(train_df, extractor, cfg["save_dir"], "./data/train/train_splits")