In [1]:
# Alternative input location (kept for reference): '/Users/jiyeong/Desktop/컴공 캡스톤/Dataset/ff++/val/*'
# Directory that holds the input videos.
input_video_path = '/Users/jiyeong/Desktop/컴공 캡스톤/Dataset/ff++/train/fake'
# Directory where the output videos are saved.
output_video_path = '/Users/jiyeong/Desktop/컴공 캡스톤/output/fake_ff'
# Directory where the per-frame JPG outputs are saved.
frame_path = '/Users/jiyeong/Desktop/컴공 캡스톤/output/fake_ff/jpg'
# Directory that contains the model checkpoint file(s).
checkpoint_path = '/Users/jiyeong/HUFS.CSE.DE-fake-it/model/checkpoints'
# Spreadsheet with the metadata written after prediction.
predictions_file_path = '/Users/jiyeong/Desktop/컴공 캡스톤/Dataset/ff+(train)_video_predictions.xlsx'


In [2]:
import cv2
import torch
import torchvision.transforms as T
import numpy as np
import torch.nn.functional as F
from torchvision import models
from torch import nn
import os
import glob
import pandas as pd

In [None]:
# ✅ Device selection: use the MPS backend when PyTorch reports it as available, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
device

In [4]:
# ✅ Model definition
class Model(nn.Module):
    """ResNeXt-50 frame encoder followed by an LSTM and a linear classifier.

    Args:
        num_classes: Number of logits produced by the classifier.
        latent_dim: Input feature size of the LSTM. It has to match the number of
            channels the backbone emits per frame (2048 for the backbone used here).
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: Hidden size of the LSTM and input size of the classifier.
        bidirectional: Kept for backward compatibility; see the note on ``self.lstm``.
    """

    def __init__(self, num_classes, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super().__init__()
        # NOTE(review): `pretrained=True` fetches ImageNet weights; the driver cell of this
        # notebook then overwrites them with a checkpoint, so the download looks redundant there.
        backbone = models.resnext50_32x4d(pretrained=True)
        # Keep everything except the last two children of the torchvision network
        # (its pooling layer and classification head) so the output is a spatial feature map.
        self.model = nn.Sequential(*list(backbone.children())[:-2])
        # NOTE(review): the original code passed `bidirectional` as the 4th positional
        # argument of nn.LSTM, which is the `bias` slot. That behaviour is preserved on
        # purpose (now spelled out with a keyword): with the default `False` the LSTM has
        # no bias tensors, and presumably the saved checkpoint was produced with this
        # layout — switching to a real `bidirectional=` would change the state-dict keys.
        # Confirm against the checkpoint before "fixing" this.
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers, bias=bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        # The classifier consumes LSTM outputs, so its input size follows `hidden_dim`
        # instead of a hard-coded 2048 (identical for the default arguments).
        self.linear1 = nn.Linear(hidden_dim, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Run the network on a clip tensor.

        Args:
            x: Tensor of shape ``(batch, seq, C, H, W)``.

        Returns:
            Tuple ``(fmap, logits)`` where ``fmap`` is the backbone feature map for all
            ``batch * seq`` frames and ``logits`` is the classifier output.
        """
        batch_size, seq_length, c, h, w = x.shape
        # Fold the sequence axis into the batch axis so the 2-D backbone sees single frames.
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        x = self.avgpool(fmap)
        # Restore (batch, seq, features); -1 picks up the backbone channel count.
        x = x.view(batch_size, seq_length, -1)
        # NOTE(review): the LSTM is built without `batch_first=True`, so per PyTorch's
        # convention it reads dim 0 as the time axis. Left unchanged so the behaviour
        # matches what the checkpoint was presumably trained with — confirm before changing.
        x_lstm, _ = self.lstm(x, None)
        return fmap, self.dp(self.linear1(x_lstm[:, -1, :]))

# ✅ Grad-CAM computation function
def compute_gradcam(model, input_tensor, target_class=None):
    """Compute a Grad-CAM heat map for a single image.

    Args:
        model: Network whose convolutional backbone is reachable as ``model.model``
            (the last element is used as the target layer) and whose ``forward``
            takes a ``(batch, seq, C, H, W)`` tensor and returns ``(feature_map, logits)``.
        input_tensor: One image as a ``(C, H, W)`` tensor.
        target_class: Index of the logit to explain. Defaults to the arg-max class.

    Returns:
        ``numpy.ndarray`` of shape ``(H, W)`` (the spatial size of ``input_tensor``)
        min-max normalised to the range ``[0, 1]``.

    Raises:
        RuntimeError: If the target layer's activations or gradients were not captured.
    """
    model.eval()
    captured = {}

    def save_gradient(grad):
        captured["grad"] = grad.detach()

    def fw_hook(module, inp, out):
        captured["fmap"] = out.detach()
        # A tensor hook on the layer output replaces the deprecated
        # Module.register_backward_hook and delivers exactly d(score)/d(output).
        out.register_hook(save_gradient)

    target_layer = model.model[-1]
    handle = target_layer.register_forward_hook(fw_hook)
    try:
        # Follow the model's own device rather than a notebook-level global so the
        # input can never end up on a different device than the weights.
        first_param = next(model.parameters(), None)
        model_device = first_param.device if first_param is not None else input_tensor.device

        # Add the batch and sequence axes the model expects: (C, H, W) -> (1, 1, C, H, W).
        batch = input_tensor.to(model_device).unsqueeze(0).unsqueeze(0).requires_grad_(True)
        _, output = model(batch)

        if target_class is None:
            target_class = output.argmax(dim=1).item()

        model.zero_grad()
        output[0, target_class].backward()
    finally:
        # Always detach the hook, even when forward/backward raises, so repeated
        # calls do not pile hooks up on the model.
        handle.remove()

    if "fmap" not in captured or "grad" not in captured:
        raise RuntimeError("Grad-CAM hooks did not capture activations/gradients for the target layer")

    # Channel weights are the spatially averaged gradients; the map is their
    # weighted sum over channels, with negative evidence clipped away.
    weights = captured["grad"].mean(dim=[2, 3], keepdim=True)
    cam = (weights * captured["fmap"]).sum(dim=1, keepdim=True)
    cam = F.relu(cam)
    cam = cam.squeeze().cpu().numpy()
    cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)  # epsilon guards a constant map
    # cv2.resize takes (width, height).
    cam = cv2.resize(cam, (batch.shape[-1], batch.shape[-2]))
    return cam

# ✅ Video processing and saving function (MJPEG-encoded output)
def _prediction_tag(predictions, video_file):
    """Return first char of 'label' + first char of 'Prediction' for ``video_file``, or None if it has no row."""
    rows = predictions[predictions['Filepath'] == video_file]
    if rows.empty:
        return None
    return str(rows['label'].iloc[0])[0] + str(rows['Prediction'].iloc[0])[0]


def _overlay_heatmap(frame, cam):
    """Colour-map ``cam`` (JET) and blend it 40 % / 60 % onto the BGR ``frame``; returns a uint8 image."""
    heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    heatmap = cv2.resize(heatmap, (frame.shape[1], frame.shape[0]))
    blended = 0.4 * heatmap + 0.6 * frame
    return np.clip(blended, 0, 255).astype(np.uint8)


def process_video_and_save_frames(input_video_path, output_video_path, model, frame_dir=frame_path):
    """Render Grad-CAM overlays for every ``*.mp4`` in a directory.

    For each input video an overlay video ``(<tag>)_<name>.mp4`` is written to
    ``output_video_path`` and every overlay frame is also saved as
    ``(<tag>)_<name>_<NNNN>.jpg`` in ``frame_dir``. ``<tag>`` is built from the
    spreadsheet at the module-level ``predictions_file_path`` (columns
    ``Filepath``, ``label`` and ``Prediction``).

    Args:
        input_video_path: Directory that contains the input ``.mp4`` files.
        output_video_path: Directory for the overlay videos (created if missing).
        model: Network passed to ``compute_gradcam``; it is moved to the selected
            device and put in eval mode.
        frame_dir: Directory for the per-frame JPG files (created if missing).

    Videos whose output already exists, that have no row in the spreadsheet, or
    that cannot be opened are skipped with a message.
    """
    os.makedirs(output_video_path, exist_ok=True)
    os.makedirs(frame_dir, exist_ok=True)
    video_files = glob.glob(f'{input_video_path}/*.mp4')

    # Use the MPS GPU on a MacBook when available.
    device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
    print(f"Using device: {device}")

    # Count the videos that were already processed and saved.
    existing_outputs = glob.glob(output_video_path + '/*.mp4')
    print("No of videos already present ", len(existing_outputs))

    # Load the prediction metadata from the Excel file.
    df = pd.read_excel(predictions_file_path)

    # Loop-invariant setup: build the preprocessing pipeline and place the model once.
    transform = T.Compose([
        T.ToTensor(),
        T.Resize((224, 224)),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    model.to(device)
    model.eval()

    for video_file in video_files:
        filename = os.path.basename(video_file)
        name, _ = os.path.splitext(filename)

        # Output stored under the bare input file name (the only name the
        # previous version of this check looked for) still counts as done.
        legacy_path = os.path.join(output_video_path, filename)
        if os.path.exists(legacy_path):
            print("File Already exists: ", legacy_path)
            continue

        result = _prediction_tag(df, video_file)
        if result is None:
            print("No prediction metadata found, skipping: ", video_file)
            continue

        # Check the name that is actually written below; the previous check never
        # matched it, so finished videos were reprocessed on every run.
        output_path = os.path.join(output_video_path, f"({result})_{name}.mp4")
        if os.path.exists(output_path):
            print("File Already exists: ", output_path)
            continue

        cap = cv2.VideoCapture(video_file)
        if not cap.isOpened():
            cap.release()
            print("Could not open video, skipping: ", video_file)
            continue

        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # MJPG fourcc written into a .mp4 container, as in the original code.
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), fps, (w, h))

        frame_count = 0
        completed = False
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR; the transform expects RGB.
                img = transform(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).to(device)
                cam = compute_gradcam(model, img)
                overlay = _overlay_heatmap(frame, cam)

                out.write(overlay)

                # Save the overlay frame as an image as well.
                frame_file = os.path.join(frame_dir, f"({result})_{name}_{frame_count:04d}.jpg")
                cv2.imwrite(frame_file, overlay)
                frame_count += 1
            completed = True
        finally:
            cap.release()
            out.release()
            # A truncated video would satisfy the skip check on the next run,
            # so remove it when processing did not finish.
            if not completed and os.path.exists(output_path):
                os.remove(output_path)

        print(f"✅ Grad-CAM 영상 저장 완료: {output_path}")
        print(f"✅ 프레임 이미지 {frame_count}개 저장됨: {frame_dir}/({result})_{name}_XXXX.jpg")


In [None]:
# Build the two-class model and restore its weights from `checkpoint.pt` in the
# checkpoint directory, mapping the tensors onto the selected device.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
model = Model(num_classes=2)
model.load_state_dict(
    torch.load(f"{checkpoint_path}/checkpoint.pt", map_location=device)
)

# Process every input video; frame images go to the function's default frame directory.
process_video_and_save_frames(input_video_path, output_video_path, model=model)
