<a href="https://colab.research.google.com/github/hongjinkong/opensw/blob/main/final/deepfakedetection_autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import json
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from google.colab import drive
import random
import shutil

# 1. Google Drive 마운트 및 데이터셋 준비
def setup_dataset_from_drive(drive_path, local_extract_path):
    """Mount Google Drive and locate the dataset folder under "My Drive".

    Args:
        drive_path: Folder path relative to ``/content/drive/My Drive``.
        local_extract_path: Accepted for interface compatibility; this
            function does not read it.

    Returns:
        The full path of the dataset folder, or ``None`` (after printing an
        error) when that folder does not exist.
    """
    mount_point = '/content/drive'
    drive.mount(mount_point)

    dataset_dir = os.path.join('/content/drive/My Drive', drive_path)
    if os.path.exists(dataset_dir):
        print(f"Dataset found at {dataset_dir}")
        return dataset_dir

    print(f"Error: {dataset_dir} does not exist.")
    return None

# 2. metadata.json 기반으로 프레임 추출 및 라벨링
def extract_frames_with_metadata(video_dir, metadata_path, output_base_dir, frame_rate=5):
    """Extract labelled frames from videos into a train/val directory tree.

    Videos listed in the metadata JSON are grouped by label, shuffled, split
    80/20 per label at the *video* level (so frames of one video never end up
    in both splits), and every ``frame_rate``-th frame is written as JPEG to
    ``<output_base_dir>/<train|val>/<fake|real>/``.

    Args:
        video_dir: Directory containing the video files named in the metadata.
        metadata_path: Path to a JSON file mapping video file name to a dict
            with a ``'label'`` key; ``'FAKE'`` means fake, anything else real.
        output_base_dir: Root directory for the extracted frames. If it
            already exists and is non-empty, extraction is skipped entirely.
        frame_rate: Keep one frame out of every ``frame_rate`` decoded frames.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    # Skip the (slow) extraction when a previous run already produced output.
    if os.path.exists(output_base_dir) and os.listdir(output_base_dir):
        print(f"Frames already extracted at {output_base_dir}. Skipping extraction.")
        return

    # Fail before touching the disk: a zero step would otherwise raise
    # ZeroDivisionError mid-extraction and leave a partial output directory
    # that later runs would mistake for a finished one.
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate!r}")

    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    # Partition the videos that actually exist on disk by label.
    videos_by_label = {'fake': [], 'real': []}
    for video_file, attributes in metadata.items():
        label = 'fake' if attributes['label'] == 'FAKE' else 'real'
        video_path = os.path.join(video_dir, video_file)

        if not os.path.exists(video_path):
            print(f"Warning: {video_path} does not exist.")
            continue

        videos_by_label[label].append(video_file)

    # Split and extract each label separately so both splits keep both classes.
    for label in ('fake', 'real'):
        _split_and_extract(video_dir, videos_by_label[label], output_base_dir, label, frame_rate)

    print(f"Frames extracted and split into train/val directories at {output_base_dir}")


def _split_and_extract(video_dir, videos, output_base_dir, label, frame_rate):
    """Shuffle ``videos`` in place, split them 80/20 and extract frames per split."""
    random.shuffle(videos)
    split_index = int(0.8 * len(videos))
    splits = (('train', videos[:split_index]), ('val', videos[split_index:]))

    # Create both class directories up front so they exist even when a split
    # receives no videos.
    split_dirs = {}
    for split_name, _ in splits:
        split_dirs[split_name] = os.path.join(output_base_dir, split_name, label)
        os.makedirs(split_dirs[split_name], exist_ok=True)

    for split_name, split_videos in splits:
        for video_file in split_videos:
            _extract_video_frames(
                os.path.join(video_dir, video_file),
                split_dirs[split_name],
                video_file,
                frame_rate,
            )


def _extract_video_frames(video_path, output_dir, name_prefix, frame_rate):
    """Write every ``frame_rate``-th frame of one video to ``output_dir`` as JPEG."""
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Warning: could not open {video_path}.")
            return

        frame_id = 0  # index of the sampled frame, used in the file name
        count = 0     # index of the decoded frame
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if count % frame_rate == 0:
                frame_path = os.path.join(output_dir, f"{name_prefix}_frame_{frame_id}.jpg")
                # imwrite reports failure through its return value, not an exception.
                if not cv2.imwrite(frame_path, frame):
                    print(f"Warning: failed to write {frame_path}.")
                frame_id += 1
            count += 1
    finally:
        # Release the decoder even if reading or writing raised.
        cap.release()


# 3. 데이터 로드
def load_data_from_frames(base_dir):
    """Build Keras directory iterators for the extracted train and val frames.

    ``base_dir`` is expected to contain ``train/`` and ``val/`` folders, each
    with one sub-folder per class. Because the data is already split on disk,
    no ``validation_split``/``subset`` is applied here: doing so would use
    only 80% of ``train/`` and only 20% of ``val/``.

    Class indices are inferred by ``flow_from_directory`` from the sub-folder
    names in alphanumeric order, so with ``fake``/``real`` folders the one-hot
    labels are ``fake -> 0`` and ``real -> 1``.

    Args:
        base_dir: Root directory holding the ``train`` and ``val`` folders.

    Returns:
        A ``(train_generator, val_generator)`` tuple yielding batches of 32
        images resized to 128x128 with categorical (one-hot) labels.
    """
    train_dir = os.path.join(base_dir, 'train')
    val_dir = os.path.join(base_dir, 'val')

    print("Loading data from directories...")
    print(f"Train directory: {train_dir}")
    print(f"Validation directory: {val_dir}")

    # Augment only the training stream.
    train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True,
    )
    # Validation images are only rescaled so the metric reflects real inputs
    # rather than randomly rotated/shifted/flipped ones.
    val_datagen = ImageDataGenerator(rescale=1./255)

    flow_options = dict(
        target_size=(128, 128),
        batch_size=32,
        class_mode='categorical',
    )
    train_generator = train_datagen.flow_from_directory(train_dir, **flow_options)
    val_generator = val_datagen.flow_from_directory(val_dir, **flow_options)
    return train_generator, val_generator


# 4. 모델 정의
def build_classifier(input_shape):
    """Assemble the small CNN used to classify single frames.

    The network is three Conv2D(3x3, ReLU, same padding) + 2x2 max-pooling
    stages with 32, 64 and 128 filters, followed by Flatten, Dense(64, ReLU),
    Dropout(0.5) and a 2-unit softmax output.

    Args:
        input_shape: Shape of one input image, passed to ``tf.keras.Input``.

    Returns:
        An uncompiled ``Model`` mapping the input tensor to the softmax output.
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Convolutional feature extractor: filter count doubles at each stage.
    features = inputs
    for n_filters in (32, 64, 128):
        features = layers.Conv2D(n_filters, (3, 3), activation='relu', padding='same')(features)
        features = layers.MaxPooling2D((2, 2))(features)

    # Classification head.
    flattened = layers.Flatten()(features)
    hidden = layers.Dense(64, activation='relu')(flattened)
    hidden = layers.Dropout(0.5)(hidden)
    outputs = layers.Dense(2, activation='softmax')(hidden)

    return Model(inputs, outputs)

# 5. 새로운 영상 프레임 추출 및 판별
def extract_frames_from_video(video_path, output_dir="temp_frames", frame_rate=5):
    """Sample frames from a video as 128x128 RGB arrays for inference.

    Args:
        video_path: Path of the video to decode.
        output_dir: Directory that is created if missing. No frames are
            written to it; it is kept for interface compatibility.
        frame_rate: Keep one frame out of every ``frame_rate`` decoded frames.

    Returns:
        A ``uint8`` array of shape ``(n, 128, 128, 3)`` in RGB channel order.
        ``n`` is 0 when the video cannot be opened or has no frames.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate!r}")

    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        if not cap.isOpened():
            print(f"Warning: could not open {video_path}.")
        count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if count % frame_rate == 0:
                frame = cv2.resize(frame, (128, 128))
                # OpenCV decodes to BGR, whereas Keras' directory iterators
                # load images as RGB; convert so the channel order matches
                # what a model trained on those iterators has seen.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            count += 1
    finally:
        # Release the decoder even if decoding raised.
        cap.release()

    if not frames:
        # Keep the rank/shape contract instead of returning a (0,) array.
        return np.empty((0, 128, 128, 3), dtype=np.uint8)
    return np.array(frames)

def predict_video(model, video_path, frame_rate=5, fake_class_index=0,
                  output_path='output_video_with_border.mp4'):
    """Classify a video as deepfake/real and write an annotated copy.

    Every ``frame_rate``-th frame is scored by ``model``; the video verdict
    is based on the mean fake probability over those frames. The video is
    then re-encoded to ``output_path`` with a red border drawn on each
    sampled frame whose own fake probability exceeds 0.5.

    Args:
        model: Object with a ``predict`` method returning one row of class
            probabilities per input frame.
        video_path: Path of the video to analyse.
        frame_rate: Score one frame out of every ``frame_rate`` frames.
        fake_class_index: Column of the model output holding the fake
            probability. Defaults to 0 because Keras' ``flow_from_directory``
            assigns class indices alphanumerically by folder name, which
            puts ``fake`` before ``real``.
        output_path: Destination of the annotated video.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate!r}")

    frames = extract_frames_from_video(video_path, frame_rate=frame_rate)
    if len(frames) == 0:
        # Nothing to score: avoid calling the model on an empty batch and
        # averaging an empty array.
        print(f"Warning: no frames could be read from {video_path}. Skipping prediction.")
        return

    frames = frames / 255.0  # Normalize to [0, 1], matching the training rescale
    predictions = model.predict(frames)
    fake_probabilities = predictions[:, fake_class_index]
    avg_fake_probability = np.mean(fake_probabilities)

    result = "Deepfake" if avg_fake_probability > 0.5 else "Real"
    print(f"Video Prediction: {result} (Average Fake Probability: {avg_fake_probability:.2f})")

    # Second pass: copy the video, outlining the sampled frames flagged fake.
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not (fps > 0):
        # Some containers report 0 or NaN; fall back to the previous fixed rate.
        fps = 20.0
    frame_size = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, frame_size)

    try:
        frame_id = 0  # index into the per-sampled-frame predictions
        count = 0     # index of the decoded frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Same sampling rule as the extraction pass, so frame_id lines up
            # with the rows of `predictions`.
            if count % frame_rate == 0:
                if frame_id < len(fake_probabilities) and fake_probabilities[frame_id] > 0.5:
                    cv2.rectangle(
                        frame,
                        (10, 10),
                        (frame.shape[1] - 10, frame.shape[0] - 10),
                        (0, 0, 255),  # red in BGR
                        5,
                    )
                frame_id += 1

            count += 1
            out.write(frame)
    finally:
        # Always finalize the container and free the decoder.
        cap.release()
        out.release()

    print(f"Output video with red border saved as '{output_path}'")



# 6. 실행
if __name__ == '__main__':
    # Mount Google Drive and locate the dataset folder inside "My Drive".
    video_dir = setup_dataset_from_drive('train_sample_videos', './train_sample_videos')
    if video_dir is None:
        # setup_dataset_from_drive already printed the reason; stop here
        # instead of failing later with a TypeError on a None path.
        raise SystemExit("Dataset folder not found on Google Drive. Aborting.")

    # metadata.json lives next to the videos on Drive.
    metadata_path = os.path.join(video_dir, 'metadata.json')

    # Extract frames (skipped automatically if already done).
    output_frames_dir = './train_frames'
    extract_frames_with_metadata(video_dir, metadata_path, output_frames_dir, frame_rate=5)

    # Load the extracted frames.
    train_gen, val_gen = load_data_from_frames(output_frames_dir)

    # The trained model is cached on Drive so it survives Colab sessions.
    model_path = os.path.join(video_dir, 'model.h5')

    # Load the cached model, or train one from scratch and cache it.
    if os.path.exists(model_path):
        print(f"Loading pre-trained model from {model_path}...")
        model = tf.keras.models.load_model(model_path)
    else:
        print("Pre-trained model not found. Starting training from scratch...")
        model = build_classifier((128, 128, 3))
        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
        model.fit(train_gen, validation_data=val_gen, epochs=5)
        model.save(model_path)
        print(f"Model saved to {model_path}")

    # Run the classifier on every file in the Drive "testvideo" folder.
    testvideo_dir = '/content/drive/My Drive/testvideo'
    if not os.path.isdir(testvideo_dir):
        print(f"Error: {testvideo_dir} does not exist.")
    else:
        # Sorted for a reproducible processing order.
        for video_file in sorted(os.listdir(testvideo_dir)):
            video_path = os.path.join(testvideo_dir, video_file)
            if os.path.isfile(video_path):
                predict_video(model, video_path)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Dataset found at /content/drive/My Drive/train_sample_videos
Frames already extracted at ./train_frames. Skipping extraction.
Loading data from directories...
Train directory: ./train_frames/train
Validation directory: ./train_frames/val
Found 15744 images belonging to 2 classes.
Found 972 images belonging to 2 classes.
Pre-trained model not found. Starting training from scratch...
Epoch 1/5
[1m  3/492[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m12:46[0m 2s/step - accuracy: 0.6823 - loss: 1.4645

KeyboardInterrupt: 