In [1]:
pip install opencv-python numpy pandas scipy


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
# >>> ajuste exato do seu caminho <<<
ROOTS = [
    Path("/Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube")
]
# Se o seu mac realmente usa /Usuários/, troque o prefixo por: "/Usuários/victorolegario/..."
COMPRESSIONS = {"raw", "c23"}  # pega as duas qualidades


In [5]:
# -*- coding: utf-8 -*-                 # Define a codificação do arquivo (útil para acentos).

from pathlib import Path               # Path: manipulação de caminhos de arquivos/pastas (mais robusto que strings).
import cv2 as cv                       # OpenCV para leitura de vídeo e detecção de face.
import numpy as np                     # Numpy para operações numéricas.
import pandas as pd                    # Pandas para montar e salvar DataFrames/CSVs.
from scipy.signal import butter, filtfilt, welch  # Filtros e PSD para estimar SNR.

# ======== CONFIG ========
ROOTS = [                               # Raízes onde estão os "reais" do FF/FF++.
    Path("/Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube")
]                                       # Dica: se o prefixo no seu Mac for /Usuários/, troque aqui.

VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv"}  # Extensões de vídeo aceitas.

COMPRESSIONS = {"raw", "c23"}           # Vamos considerar ambas as qualidades (raw e c23).

OUT_DIR = Path("./rgb_signals")         # Pasta-base da saída.
(OUT_DIR/"per_video").mkdir(parents=True, exist_ok=True)  # Garante a criação de ./rgb_signals/per_video.

MASTER_CSV  = OUT_DIR/"master_rgb.csv"  # Saída do master CSV (todos os frames de todos os vídeos).
QUALITY_CSV = OUT_DIR/"quality_summary.csv"  # Saída de métricas por vídeo.
LABEL = "Real"                          # O dataset é de vídeos reais (campo fixo 'label').

HAAR_PATH = cv.data.haarcascades + "haarcascade_frontalface_default.xml"  # Caminho do Haar frontal.
FACE_CASCADE = cv.CascadeClassifier(HAAR_PATH)  # Carrega o classificador de rosto (Haar).

EMA_ALPHA = 0.25                        # Alpha do EMA para suavizar jitter da caixa de face (0=sem suavização).

# ROI: TESTA (apenas testa, centralizada)
FOREHEAD_Y_TOP = 0.15                   # Começa 15% abaixo do topo da face.
FOREHEAD_H     = 0.18                   # Altura da faixa da testa = 18% da altura da face.
FOREHEAD_W     = 0.50                   # Largura da testa = 50% da largura da face (central).
FOREHEAD_X_PAD = 0.25                   # Margem lateral para centralizar = (1-0.50)/2.

HEART_BAND = (0.7, 4.0)                 # Banda cardíaca (Hz) usada para SNR do canal G.

# ======== FUNÇÕES ========
def safe_rect(x, y, w, h, H, W):
    """Clampa o retângulo (x,y,w,h) para ficar dentro da imagem HxW."""
    x = int(max(0, min(x, W-1)))        # Garante x dentro [0, W-1].
    y = int(max(0, min(y, H-1)))        # Garante y dentro [0, H-1].
    w = int(max(1, min(w, W-x)))        # Ajusta w para não ultrapassar borda direita.
    h = int(max(1, min(h, H-y)))        # Ajusta h para não ultrapassar borda inferior.
    return x, y, w, h                   # Retorna retângulo seguro.

def smooth_box(prev, curr, alpha=EMA_ALPHA):
    """Suaviza a caixa de face corrente usando a anterior (EMA)."""
    if prev is None:                    # No primeiro frame (sem 'prev'), só retorna a caixa atual.
        return curr
    # Combina ponto a ponto: caixa_suave = alpha*prev + (1-alpha)*curr.
    return tuple(int(alpha*p + (1-alpha)*c) for p,c in zip(prev,curr))

def forehead_roi(face_rect, H, W):
    """Calcula a ROI da testa dentro da bounding box da face."""
    x, y, w, h = face_rect              # Desempacota a caixa da face.
    fx = x + int(FOREHEAD_X_PAD * w)    # Posição x inicial da testa (centralizada).
    fy = y + int(FOREHEAD_Y_TOP * h)    # Posição y inicial (15% abaixo do topo da face).
    fw = int(FOREHEAD_W * w)            # Largura da ROI (50% da face).
    fh = int(FOREHEAD_H * h)            # Altura da ROI (18% da face).
    return safe_rect(fx, fy, fw, fh, H, W)  # Garante que a ROI fique dentro da imagem.

def frame_rgb_means(frame, roi):
    """Extrai R_mean, G_mean, B_mean e luminância (Y_mean, Y_std) dentro da ROI."""
    x, y, w, h = roi                    # ROI da testa.
    patch = frame[y:y+h, x:x+w]         # Recorte do frame (BGR) correspondente à ROI.
    if patch.size == 0:                 # Se não há pixels (ROI inválida), retorna NaNs.
        return (np.nan,)*5
    # OpenCV é BGR; convertemos para floats para média com precisão.
    B = patch[:,:,0].astype(np.float32)
    G = patch[:,:,1].astype(np.float32)
    R = patch[:,:,2].astype(np.float32)
    # Médias por canal.
    Rm, Gm, Bm = float(R.mean()), float(G.mean()), float(B.mean())
    # Luminância (aprox. sRGB) e variação espacial.
    Y = 0.2126*R + 0.7152*G + 0.0722*B
    return Rm, Gm, Bm, float(Y.mean()), float(Y.std(ddof=0))

def bandpass(sig, fs, f_lo=0.7, f_hi=4.0, order=3):
    """Filtro passa-faixa (ou detrend robusto se fs for baixo)."""
    if fs <= 2*f_hi:                    # Nyquist insuficiente? Faz detrend por mediana móvel ~1s.
        s = pd.Series(sig)
        return (s - s.rolling(max(3,int(fs)), min_periods=1, center=True)
                  .median().bfill().ffill()).values
    # Butterworth passa-faixa normalizado pela Nyquist.
    b,a = butter(order, [f_lo/(fs/2.0), f_hi/(fs/2.0)], btype="band")
    return filtfilt(b,a,sig)            # Zero-phase para não atrasar o sinal.

def snr_green_db(g_trace, fs, band=(0.7,4.0)):
    """SNR do traço verde = pico(PSD banda) / mediana(PSD fora do pico), em dB."""
    g = np.asarray(g_trace, dtype=np.float64)   # Array float64.
    if len(g) < max(64, int(5*fs)):             # Exige ao menos ~5s de sinal para PSD estável.
        return np.nan
    s = pd.Series(g)                             # Série para mediana móvel (detrend robusto).
    g = (g - s.rolling(int(min(max(3,fs), len(g)//5)), min_periods=1, center=True)
                .median().bfill().ffill()).values
    g_bp = bandpass(g, fs, band[0], band[1])     # Passa-faixa cardíaca.
    if np.allclose(g_bp.std(),0):                # Sinal degenerado?
        return np.nan
    f,Pxx = welch(g_bp, fs=fs, nperseg=min(256, len(g_bp)))  # PSD de Welch.
    sel = (f>=band[0]) & (f<=band[1])            # Seleciona apenas a banda cardíaca.
    if sel.sum()<3:                              # Segurança: precisa de pontos suficientes.
        return np.nan
    fb,Pb = f[sel], Pxx[sel]                     # Frequências e potências na banda.
    i = int(np.argmax(Pb))                       # Índice do pico na banda.
    peak=float(Pb[i]); fpk=float(fb[i])         # Valor do pico e frequência do pico.
    noise_vals = Pb[(fb<fpk-0.1)|(fb>fpk+0.1)]  # “Ruído” = banda sem a vizinhança do pico.
    if noise_vals.size<1 or peak<=0:            # Checagens de segurança.
        return np.nan
    noise = float(np.median(noise_vals))        # Mediana é robusta contra outliers.
    return np.nan if noise<=0 else 10*np.log10(peak/noise)  # SNR em dB.

def enumerate_real_videos(roots):
    """
    Lista vídeos reais no layout FF/FF++:
      .../original_sequences/youtube/<compression>/videos/*.mp4
    Respeita o conjunto COMPRESSIONS.
    """
    vids = []
    for root in roots:                          # Ex.: ROOTS = [ .../original_sequences/youtube ]
        if not root.exists():                   # Se a raiz não existe, pula.
            continue
        for comp in (COMPRESSIONS or {"raw","c23","c40"}):  # Para cada compressão desejada.
            comp_dir = (root/comp/"videos")     # Monta .../<comp>/videos
            if comp_dir.exists():               # Se a pasta existe…
                for p in comp_dir.rglob("*"):   # Varre recursivamente dentro de 'videos'.
                    if p.is_file() and p.suffix.lower() in VIDEO_EXTS:  # Arquivo de vídeo?
                        vids.append(p)          # Adiciona à lista de vídeos a processar.
    return sorted(vids)                         # Ordena p/ reprodutibilidade.

def video_id_from_path(path: Path) -> str:
    """Gera um id estável baseado no caminho relativo à raiz."""
    rel = None
    for r in ROOTS:
        try:
            rel = path.relative_to(r)           # Tenta tornar o caminho relativo à raiz.
            break
        except Exception:
            pass
    rel = rel if rel is not None else path     # Se não conseguir, usa o caminho absoluto mesmo.
    return rel.as_posix().replace("/", "__").rsplit(".",1)[0]  # Troca '/' por '__' e remove a extensão.

def compression_from_path(path: Path) -> str:
    """Extrai 'raw', 'c23' ou 'c40' do caminho do vídeo (para a coluna 'compression')."""
    p = path.as_posix().lower()
    if "/c23/" in p: return "c23"
    if "/raw/" in p: return "raw"
    if "/c40/" in p: return "c40"
    return "unknown"

# ======== RUN ========
master_rows, quality_rows = [], []      # Buffers para montar master e o resumo de qualidade.

videos = enumerate_real_videos(ROOTS)   # Busca todos os vídeos reais conforme ROOTS/COMPRESSIONS.
print(f"[INFO] Vídeos reais encontrados: {len(videos)}")  # Log informativo.

for vpath in videos:                    # Loop principal de processamento por vídeo.
    vid  = video_id_from_path(vpath)    # Id legível p/ o vídeo (derivado do caminho).
    comp = compression_from_path(vpath) # raw/c23/c40 detectado do caminho.
    out_csv = OUT_DIR/"per_video"/f"{vid}.csv"  # Caminho do CSV por vídeo.
    print(f"[PROC] {vpath} -> {out_csv.name}")  # Log de progresso.

    cap = cv.VideoCapture(str(vpath))   # Abre o vídeo no OpenCV.
    if not cap.isOpened():              # Se não abriu, avisa e passa.
        print(f"[WARN] Não abriu: {vpath}"); 
        continue

    fps = cap.get(cv.CAP_PROP_FPS) or 25.0  # Lê FPS; se falhar, assume 25.0.
    H = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))  # Altura do frame.
    W = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))   # Largura do frame.
    prev_box, frame_idx = None, 0        # Caixa prévia (EMA) e índice de frame.
    rows, g_trace = [], []               # Linhas do CSV deste vídeo; traço G para SNR.

    while True:                          # Loop por frames.
        ok, frame = cap.read()           # Lê um frame.
        if not ok or frame is None:      # Fim do vídeo?
            break

        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)  # Converte p/ cinza (detecção é mais rápida).

        faces = FACE_CASCADE.detectMultiScale(  # Detecta faces no frame.
            gray, scaleFactor=1.1, minNeighbors=5,
            flags=cv.CASCADE_SCALE_IMAGE,
            minSize=(int(0.15*W), int(0.15*H))   # Descarta detecções muito pequenas (ruído).
        )

        if len(faces)>0:                 # Alguma face detectada?
            i = int(np.argmax([w*h for (x,y,w,h) in faces]))  # Pega a maior (rosto principal).
            box = tuple(int(v) for v in faces[i])             # (x,y,w,h) como ints.
        else:
            box = prev_box if prev_box is not None else None  # Usa a última caixa válida (tracking simples).

        if box is None:                  # Sem ROI neste frame?
            Rm=Gm=Bm=Ymean=Ystd=np.nan   # Preenche com NaNs.
        else:
            box = smooth_box(prev_box, box, EMA_ALPHA)  # Suaviza jitter da caixa.
            prev_box = box                               # Atualiza "anterior".
            roi = forehead_roi(box, H, W)               # Calcula ROI da testa.
            Rm, Gm, Bm, Ymean, Ystd = frame_rgb_means(frame, roi)  # Métricas na ROI.

        t_sec = frame_idx/float(fps)     # Timestamp do frame em segundos.
        rows.append((vid, comp, frame_idx, t_sec, Rm, Gm, Bm, Ymean, Ystd, fps, LABEL))  # Linha do CSV.
        if not np.isnan(Gm):             # Alimenta traço G para SNR se valor válido.
            g_trace.append(Gm)
        frame_idx += 1                   # Próximo frame.

    cap.release()                        # Fecha o arquivo de vídeo.

    if not rows:                         # Se não coletou nada, avisa e continua.
        print(f"[WARN] Sem frames válidos: {vpath}"); 
        continue

    cols = ["video_id","compression","frame_idx","t_sec","R_mean","G_mean","B_mean","Y_mean","Y_std","fps","label"]  # Cabeçalho.
    df = pd.DataFrame(rows, columns=cols) # DataFrame por vídeo.
    out_csv.parent.mkdir(parents=True, exist_ok=True)  # Garante pasta.
    df.to_csv(out_csv, index=False)     # Salva CSV por vídeo.
    master_rows.extend(rows)            # Acumula no buffer do master.

    duration_s = df["t_sec"].max()      # Duração do vídeo (s).
    quality_rows.append({               # Métricas agregadas por vídeo.
        "video_id": vid,
        "compression": comp,
        "n_frames": int(len(df)),
        "fps": float(fps),
        "duration_s": float(duration_s),
        "Y_mean_avg": float(df["Y_mean"].mean()),
        "Y_mean_std_over_time": float(df["Y_mean"].std(ddof=0)),
        "snr_green_db": float(snr_green_db(g_trace, fps, HEART_BAND))
    })

if master_rows:                          # Se há dados totais…
    master_df = pd.DataFrame(            # Monta DataFrame master (todos os frames).
        master_rows,
        columns=["video_id","compression","frame_idx","t_sec","R_mean","G_mean","B_mean","Y_mean","Y_std","fps","label"]
    )
    master_df.to_csv(MASTER_CSV, index=False)  # Salva master CSV.
    print(f"[OK] Master CSV salvo: {MASTER_CSV.resolve()}  (linhas: {len(master_df)})")

if quality_rows:                         # Se há métricas por vídeo…
    qdf = pd.DataFrame(quality_rows)     # DataFrame do resumo de qualidade.
    qdf.to_csv(QUALITY_CSV, index=False) # Salva CSV de qualidade.
    print(f"[OK] Quality CSV salvo: {QUALITY_CSV.resolve()}  (vídeos: {len(qdf)})")

print("[DONE]")                          # Fim do processamento.


[INFO] Vídeos reais encontrados: 110
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/033.mp4 -> c23__videos__033.csv
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/035.mp4 -> c23__videos__035.csv
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/036.mp4 -> c23__videos__036.csv
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/044.mp4 -> c23__videos__044.csv
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/046.mp4 -> c23__videos__046.csv
[PROC] /Users/victorolegario/workspace/tese_codigo_deep_fake/Datasets/FaceForensics/original_sequences/youtube/c23/videos/055.mp4 -> c23__videos__055.csv
[PROC] /Users/victorolegario/workspace/