In [87]:
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
from pytorch_grad_cam.utils.image import show_cam_on_image, \
    preprocess_image
import argparse
import datetime
import numpy as np
import time
import torch
import torch.backends.cudnn as cudnn
import json
import os
import warnings

from pathlib import Path

from timm.data import Mixup
from timm.models import create_model
from timm.loss import LabelSmoothingCrossEntropy, SoftTargetCrossEntropy
from timm.scheduler import create_scheduler
from timm.optim import create_optimizer
from timm.utils import NativeScaler, get_state_dict


import models
import utils
import sys
import cv2
from datasetsV2 import get_transform_to_eval, get_transform_to_eval_NO_SRM, get_transform_to_eval_Sobel
from torchvision.transforms import ToPILImage

from PIL import Image
import matplotlib.pyplot as plt
import random
from tqdm import tqdm
import pandas as pd
import json
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc,  log_loss
import math
import time
import ffmpeg


In [88]:
%pip install ffmpeg-python

Note: you may need to restart the kernel to use updated packages.


In [89]:
device = 'cuda'

In [90]:
def preprocess_image_transform(image_input, transform):
    """
    Pré-processa uma imagem a partir de um caminho ou de um ROI já carregado.

    Parâmetros:
        - image_input: str (caminho da imagem) ou np.ndarray (ROI)
        - transform: função de transformação (ex: Albumentations)

    Retorno:
        - Imagem transformada
    """

    # Se a entrada for um caminho, carregamos a imagem
    if isinstance(image_input, str):
        image = Image.open(image_input).convert('RGB').resize((224, 224))
        image = np.array(image)  # Converte para numpy
    elif isinstance(image_input, np.ndarray):
        image = cv2.resize(image_input, (224, 224))  # Redimensiona diretamente
    else:
        raise ValueError("image_input deve ser um caminho (str) ou uma imagem (np.ndarray)")

    # Aplica a transformação
    augmented = transform(image=image)
    
    return augmented['image'].squeeze(0)  # Retorna a imagem transformada

In [91]:
def reshape_transform(tensor, height=14, width=14):
    result = tensor[:, 1:, :].reshape(tensor.size(0),
                                      height, width, tensor.size(2))

    # Bring the channels to the first dimension,
    # like in CNNs.
    result = result.transpose(2, 3).transpose(1, 2)
    return result

In [92]:
def get_transform(logs_path):
    if 'baseline' in logs_path:
        print("using without srm")
        return get_transform_to_eval_NO_SRM(224)
    elif "input" in logs_path and "srm" in logs_path:
        print("using input srm")
        return get_transform_to_eval(224)
    elif "input" in logs_path and "sobel" in logs_path:
        print("using input sobel")
        return get_transform_to_eval_Sobel(224)
    elif "branch" in logs_path and "srm" in logs_path:
        print("using branch filter")
        return get_transform_to_eval_NO_SRM(224)
    elif "branch" in logs_path and "sobel" in logs_path:
        print("using branch filter")
        return get_transform_to_eval_NO_SRM(224)
    else:
        return get_transform_to_eval_NO_SRM(224)

In [93]:
result_dict = {}
prob_dict = {}

In [94]:
def single_image_inference(img, transform, model):

    input_tensor = preprocess_image_transform(img, transform)
    input_tensor = input_tensor.unsqueeze(0) 
    # input_tensor = torch.stack((input_tensor))

    img = input_tensor.to(device)
    results = model(img)
    # print(results)
    pred = torch.sigmoid(results)
    # print(pred)
    pred = pred.to('cpu')
    
    detected = "FAKE" if pred.item() > 0.5 else "REAL"
    # print(detected)

    return pred.item(), detected
    

In [95]:
def detect_deepfake(imgs_paths, video_name, count_thresh, n_images, transform, model):
    count_fake = 0
    count_real = 0
    # for img in imgs_paths:
    if n_images <= len(imgs_paths):
        imgs_paths = random.sample(imgs_paths, n_images)

    input_tensor = [preprocess_image_transform(x, transform) for x in imgs_paths]
    input_tensor = torch.stack((input_tensor))

    # print(input_tensor.shape)
    # for img in input_tensor:
    img = input_tensor.to(device)
    results = model(img)
    pred = torch.sigmoid(results)
    # print(pred)
    pred = pred.to('cpu')
    for result in pred:
        detected = "FAKE" if result.item() > 0.5 else "REAL"
        if detected == "FAKE":
            count_fake += 1
        else:
            count_real += 1


    if count_fake > count_real:
        result_dict[video_name] = 1
        # print("FAKE")
    else:
        result_dict[video_name] = 0
        # print("REAL")
    
    # Calculando a média das predições e armazenando no dicionário
    pred_mean = np.mean(pred.detach().numpy().tolist())
    prob_dict[video_name] = pred_mean


In [96]:
logs_path = "/home/eferreira/master/cross-vit/CrossViT/old_logs/24_srm_l_branch_simple"
backbone = ''
args = open(os.path.join(logs_path, 'args.txt'),'r')
for txt in args:
    if "model" in txt:
        print(txt)
        backbone = txt.split(' ')[-1][:-1]

model = create_model(
backbone,
pretrained=True,
num_classes=1
)
model_path = os.path.join(logs_path, 'model_best.pth')
checkpoint = torch.load(model_path, map_location='cpu')
utils.load_checkpoint(model, checkpoint['model'])
model.eval()

transform = get_transform(logs_path)
model.to(device)

source_dir = '/home/eferreira/master/cross-vit/CrossViT/obama/faces'


for video in tqdm(os.listdir(source_dir)):
# print(video)
    parent_name = os.path.join(source_dir, video)
    # print(parent_name)
    imgs = os.listdir(parent_name)
    imgs = [os.path.join(parent_name, x) for x in imgs]
    # print(imgs)
    detect_deepfake(imgs, video, 18, 30,transform, model)

model: crossvit_18_dagger_224_srm

ViT with 1 classes.


INFO:fvcore.common.checkpoint:[Checkpointer] Loading from /tmp/tmp8v31a9sx ...
[34msrm.kernel[0m


using branch filter


100%|██████████| 1/1 [00:00<00:00,  9.37it/s]


In [97]:
result_dict

{'obama': 1}

In [98]:
prob_dict

{'obama': 0.8166824309776227}

In [99]:
json_path = "/home/eferreira/master/cross-vit/CrossViT/obama/boxes/obama.json"
video_path = "/home/eferreira/master/cross-vit/CrossViT/obama/obama.mp4" 
output_video_path = "/home/eferreira/master/cross-vit/CrossViT/obama/output_video.mp4" 
final_video_path = "/home/eferreira/master/cross-vit/CrossViT/obama/obama_detected.mp4"  

In [100]:
original_width = 1280  # Largura da imagem na qual as bounding boxes foram geradas
original_height = 720

In [101]:
with open(json_path, "r") as f:
    bboxes = json.load(f)

cap = cv2.VideoCapture(video_path)

fps = int(cap.get(cv2.CAP_PROP_FPS))
print(fps)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")



frame_id = 0

23


In [102]:
out = cv2.VideoWriter(output_video_path, fourcc, 23.98, (width, height))


In [103]:
while True:
    init = time.time()
    ret, frame = cap.read()
    if not ret:
        break  

    frame_id += 1
    str_frame_id = str(frame_id)

    if str_frame_id in bboxes and bboxes[str_frame_id] is not None:
        for bbox in bboxes[str_frame_id]:
            xmin, ymin, xmax, ymax = [int(b * 2) for b in bbox]
            w = xmax - xmin
            h = ymax - ymin
            p_h = 0
            p_w = 0
            
            if h > w:
                p_w = int((h - w) / 2)
            elif h < w:
                p_h = int((w - h) / 2)

            x1 = max(xmin - p_w, 0)
            y1 = max(ymin - p_h, 0)
            x2 = min(xmax + p_w, frame.shape[1])
            y2 = min(ymax + p_h, frame.shape[0])
            roi = frame[y1:y2, x1:x2]

            roi = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)

            pred, detection = single_image_inference(roi, transform, model)

            text_x = x1
            text_y = y2 + 20

            text = f"Pred: {pred:.2f} - {detection}"

            color = (0, 255, 0) if detection == "REAL" else (0, 0, 255)

            end = time.time()

            cv2.rectangle(frame, 
                        (max(xmin - p_w, 0), max(ymin - p_h, 0)), 
                        (min(xmax + p_w, frame.shape[1]), min(ymax + p_h, frame.shape[0])),
                        color, 2)
            
            cv2.putText(frame, text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 
                        0.5, color, 2, cv2.LINE_AA)

            inference_time = round((end - init), 5) * 1000
            inference_time = round(inference_time, 2)
            inference_text = f"Inference time: {inference_time}ms"
            cv2.putText(frame, inference_text, (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 
                        0.5, (255, 255, 255), 2, cv2.LINE_AA)

    # cv2.imshow("Video com Bounding Boxes", frame)
    out.write(frame)
    

    # if cv2.waitKey(1) & 0xFF == ord("q"):
    #     break

cap.release()
out.release()
# cv2.destroyAllWindows()


In [105]:
out_vid = cv2.VideoCapture(output_video_path)
fps_out = out_vid.get(cv2.CAP_PROP_FPS)
print(fps_out)

23.98


In [108]:
try:
    ffmpeg.input(output_video_path).filter("fps", fps=23.98, round="up").output(
        "temp_output_video.mp4", vcodec="libx264", preset="slow", crf=18
    ).run(overwrite_output=True)

    video_fixed = ffmpeg.input("temp_output_video.mp4")
    audio_original = ffmpeg.input(video_path).audio 

    ffmpeg.output(video_fixed, audio_original, final_video_path, vcodec="copy", acodec="aac").global_args("-shortest").run(overwrite_output=True)

    print(f"✅ Vídeo final salvo em: {final_video_path}")

except ffmpeg.Error as e:
    print("⚠️ Erro ao processar com FFmpeg:")
    if e.stderr:
        print(e.stderr.decode())

ffmpeg version 5.1.2 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 11.3.0 (conda-forge gcc 11.3.0-19)
  configuration: --prefix=/home/conda/feedstock_root/build_artifacts/ffmpeg_1674566204550/_h_env_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_plac --cc=/home/conda/feedstock_root/build_artifacts/ffmpeg_1674566204550/_build_env/bin/x86_64-conda-linux-gnu-cc --cxx=/home/conda/feedstock_root/build_artifacts/ffmpeg_1674566204550/_build_env/bin/x86_64-conda-linux-gnu-c++ --nm=/home/conda/feedstock_root/build_artifacts/ffmpeg_1674566204550/_build_env/bin/x86_64-conda-linux-gnu-nm --ar=/home/conda/feedstock_root/build_artifacts/ffmpeg_1674566204550/_build_env/bin/x86_64-conda-linux-gnu-ar --disable-doc --disable-openssl --enable-demuxer=dash --enable-hardcoded-tables --enable-libfreetype --enable-libfontconfig --enable-libopenh264 --enable-gnu

✅ Vídeo final salvo em: /home/eferreira/master/cross-vit/CrossViT/obama/obama_detected.mp4


frame= 1737 fps=1162 q=-1.0 Lsize=   14936kB time=00:01:12.55 bitrate=1686.3kbits/s speed=48.5x    
video:13756kB audio:1126kB subtitle:0kB other streams:0kB global headers:0kB muxing overhead: 0.366684%
[aac @ 0x5b76839545c0] Qavg: 922.834
