# 딥페이크 탐지 모델 추론 - SRNet 제출용

이 노트북은 SRNet 모델을 사용한 대회 제출용 추론 스크립트입니다.

## 모델 정보
- **아키텍처**: SRNet (Steganalysis-based ResNet)
- **입력 크기**: 128x128
- **색상 공간**: YCbCr
- **파라미터 수**: ~4.8M

## 평가 데이터 경로
- 입력: `./data/` (이미지 및 비디오 혼합)
- 출력: `./submission.csv`

## 출력 형식
- filename: 파일명 (확장자 포함)
- label: 0(Real) 또는 1(Fake)

## 1. 라이브러리 설치

In [1]:
# 필요한 라이브러리 설치
!pip install -q torch==2.7.1
!pip install -q torchvision==0.22.1
!pip install -q numpy==1.26.4
!pip install -q opencv-python-headless==4.10.0.82
!pip install -q pandas
!pip install -q Pillow
!pip install -q tqdm
!pip install -q aifactory

print("✓ 라이브러리 설치 완료")

✓ 라이브러리 설치 완료


  DEPRECATION: Building 'docopt' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'docopt'. Discussion can be found at https://github.com/pypa/pip/issues/6334


## 2. 라이브러리 Import

In [2]:
import warnings
warnings.filterwarnings("ignore")

import os
import glob
import cv2
import numpy as np
import pandas as pd
import pickle
from tqdm.auto import tqdm
from PIL import Image as PILImage

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.transforms import functional as TF

print("✓ 라이브러리 임포트 완료")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"Device: {torch.cuda.get_device_name(0)}")

✓ 라이브러리 임포트 완료
PyTorch version: 2.7.1+cpu
CUDA available: False


## 3. SRNet 모델 정의

In [3]:
# Squeeze-and-Excitation Block
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)
    
    def forward(self, x):
        batch, channels, _, _ = x.size()
        y = F.adaptive_avg_pool2d(x, 1).view(batch, channels)
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(batch, channels, 1, 1)
        return x * y.expand_as(x)

# SRNet Block
class SRNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, use_se=True):
        super(SRNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.use_se = use_se
        if use_se:
            self.se = SEBlock(out_channels)
        
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        
        if self.use_se:
            out = self.se(out)
        
        out += self.shortcut(x)
        out = F.relu(out)
        return out

# SRNet 모델
class SRNet(nn.Module):
    def __init__(self, num_classes=2, input_channels=3):
        super(SRNet, self).__init__()
        
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        
        self.layer1 = self._make_layer(64, 64, num_blocks=2, stride=1, use_se=True)
        self.layer2 = self._make_layer(64, 128, num_blocks=2, stride=2, use_se=True)
        self.layer3 = self._make_layer(128, 256, num_blocks=2, stride=2, use_se=True)
        self.layer4 = self._make_layer(256, 512, num_blocks=2, stride=2, use_se=True)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
    
    def _make_layer(self, in_channels, out_channels, num_blocks, stride, use_se):
        layers = []
        layers.append(SRNetBlock(in_channels, out_channels, stride, use_se))
        for _ in range(1, num_blocks):
            layers.append(SRNetBlock(out_channels, out_channels, stride=1, use_se=use_se))
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

print("✓ SRNet 모델 정의 완료")

✓ SRNet 모델 정의 완료


## 4. YCbCr 변환 및 전처리 함수

In [4]:
def rgb_to_ycbcr_tensor(img_tensor):
    """
    RGB 텐서를 YCbCr 텐서로 변환
    img_tensor: (C, H, W) 형태의 RGB 텐서 (0~1 범위)
    """
    transform_matrix = torch.tensor([
        [ 0.299,  0.587,  0.114],
        [-0.169, -0.331,  0.500],
        [ 0.500, -0.419, -0.081]
    ], dtype=img_tensor.dtype, device=img_tensor.device)
    
    img_flat = img_tensor.permute(1, 2, 0).reshape(-1, 3)
    ycbcr_flat = torch.matmul(img_flat, transform_matrix.T)
    ycbcr_flat[:, 1:] += 0.5
    
    h, w = img_tensor.shape[1], img_tensor.shape[2]
    ycbcr_tensor = ycbcr_flat.reshape(h, w, 3).permute(2, 0, 1)
    
    return ycbcr_tensor

class YCbCrNormalize:
    def __init__(self, mean, std):
        self.mean = torch.tensor(mean).view(3, 1, 1)
        self.std = torch.tensor(std).view(3, 1, 1)
    
    def __call__(self, ycbcr_tensor):
        if ycbcr_tensor.is_cuda:
            self.mean = self.mean.cuda()
            self.std = self.std.cuda()
        return (ycbcr_tensor - self.mean) / self.std

print("✓ YCbCr 변환 함수 정의 완료")

✓ YCbCr 변환 함수 정의 완료


## 5. 모델 및 설정 로드

In [5]:
# 모델 경로 설정 (자동 감지)
if os.path.exists("./model/srnet-steganalysis-model"):
    MODEL_DIR = "./model/srnet-steganalysis-model"  # 제출 환경
    print("✓ 제출 환경 감지: ./model/srnet-steganalysis-model/ 사용")
elif os.path.exists("./srnet_submission"):
    MODEL_DIR = "./srnet_submission"  # 개발 환경
    print("✓ 개발 환경 감지: ./srnet_submission/ 사용")
else:
    raise FileNotFoundError(
        "모델 디렉토리를 찾을 수 없습니다.\n"
        "먼저 Steganalysis.ipynb를 실행하여 모델을 학습하고 저장해주세요.\n"
        "모델은 ./srnet_submission/ 에 저장됩니다."
    )

# 디바이스 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

print(f"\n=== 모델 로딩 중 ===")
print(f"모델 경로: {MODEL_DIR}\n")

# 1. 모델 설정 로드
with open(os.path.join(MODEL_DIR, 'model_config.pkl'), 'rb') as f:
    model_config = pickle.load(f)

print(f"✓ 모델 설정 로드:")
print(f"  아키텍처: {model_config['architecture']}")
print(f"  입력 크기: {model_config['input_size']}")
print(f"  색상 공간: {model_config['color_space']}")
print(f"  라벨: {model_config['labels']}")

# 2. YCbCr 통계 로드
with open(os.path.join(MODEL_DIR, 'ycbcr_stats.pkl'), 'rb') as f:
    ycbcr_stats = pickle.load(f)

ycbcr_normalizer = YCbCrNormalize(
    mean=ycbcr_stats['mean'],
    std=ycbcr_stats['std']
)
print(f"\n✓ YCbCr 통계 로드:")
print(f"  Mean: {ycbcr_stats['mean']}")
print(f"  Std: {ycbcr_stats['std']}")

# 3. 모델 생성 및 가중치 로드
model = SRNet(
    num_classes=model_config['num_classes'],
    input_channels=model_config['input_channels']
)

# 경량화된 모델 파일 사용 (state_dict만 포함)
model_file = os.path.join(MODEL_DIR, 'srnet_model_light.pth')
if not os.path.exists(model_file):
    model_file = os.path.join(MODEL_DIR, 'srnet_model.pth')  # fallback
    
state_dict = torch.load(model_file, map_location=device, weights_only=False)

# state_dict가 직접 state_dict인 경우와 체크포인트인 경우 모두 처리
if isinstance(state_dict, dict) and 'model_state_dict' in state_dict:
    model.load_state_dict(state_dict['model_state_dict'])
    print(f"\n✓ 모델 가중치 로드 (전체 체크포인트):")
    print(f"  Epoch: {state_dict.get('epoch', 'N/A')}")
    print(f"  Val Accuracy: {state_dict.get('val_acc', 0)*100:.2f}%")
    print(f"  Val Macro F1: {state_dict.get('val_macro_f1', 0):.4f}")
else:
    model.load_state_dict(state_dict)
    print(f"\n✓ 모델 가중치 로드 (경량화 버전)")

model = model.to(device)
model.eval()

# 4. 전처리 설정
INPUT_SIZE = model_config['input_size']
transform = transforms.Compose([
    transforms.Resize(INPUT_SIZE),
    transforms.ToTensor(),
])

print(f"\n=== 모델 로드 완료 ===")

✓ 제출 환경 감지: ./model/srnet-steganalysis-model/ 사용
Using device: cpu

=== 모델 로딩 중 ===
모델 경로: ./model/srnet-steganalysis-model

✓ 모델 설정 로드:
  아키텍처: SRNet
  입력 크기: (128, 128)
  색상 공간: YCbCr
  라벨: ['Real', 'Fake']

✓ YCbCr 통계 로드:
  Mean: [0.25792643427848816, 0.4994657635688782, 0.5009396076202393]
  Std: [0.27681994438171387, 0.0426921509206295, 0.03518436849117279]

✓ 모델 가중치 로드 (경량화 버전)

=== 모델 로드 완료 ===


## 6. 추론 함수 정의

In [6]:
def predict_image(image_path):
    """
    이미지 파일에 대한 예측 수행
    
    Args:
        image_path: 이미지 파일 경로
    
    Returns:
        int: 예측 라벨 (0: Real, 1: Fake)
    """
    try:
        # 이미지 로드
        image = PILImage.open(image_path).convert('RGB')
        
        # 전처리: Resize + ToTensor
        img_tensor = transform(image)  # (C, H, W), 0~1
        
        # YCbCr 변환
        ycbcr_tensor = rgb_to_ycbcr_tensor(img_tensor)
        
        # YCbCr 정규화
        ycbcr_tensor = ycbcr_normalizer(ycbcr_tensor)
        
        # 배치 차원 추가 및 디바이스 이동
        ycbcr_tensor = ycbcr_tensor.unsqueeze(0).to(device)
        
        # 추론
        with torch.no_grad():
            outputs = model(ycbcr_tensor)
            predicted_label = outputs.argmax(-1).item()
        
        return predicted_label
    
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return 0  # 오류 시 Real 반환


def extract_video_frames(video_path, num_frames=5):
    """
    비디오에서 여러 프레임 추출
    
    Args:
        video_path: 비디오 파일 경로
        num_frames: 추출할 프레임 수
    
    Returns:
        list: PIL Image 리스트
    """
    frames = []
    
    try:
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            return frames
        
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            cap.release()
            return frames
        
        # 균등하게 프레임 인덱스 선택
        if total_frames < num_frames:
            indices = list(range(total_frames))
        else:
            indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
        
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            
            if ret and frame is not None:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = PILImage.fromarray(frame_rgb)
                frames.append(pil_image)
        
        cap.release()
    
    except Exception as e:
        print(f"Error extracting frames from {video_path}: {e}")
    
    return frames


def predict_video(video_path, num_frames=5):
    """
    비디오 파일에 대한 예측 수행 (여러 프레임 평균)
    
    Args:
        video_path: 비디오 파일 경로
        num_frames: 추출할 프레임 수
    
    Returns:
        int: 예측 라벨 (0: Real, 1: Fake)
    """
    try:
        # 프레임 추출
        frames = extract_video_frames(video_path, num_frames)
        
        if len(frames) == 0:
            return 0  # 프레임 추출 실패 시 Real 반환
        
        # 각 프레임에 대해 예측
        predictions = []
        
        for frame in frames:
            # 전처리
            img_tensor = transform(frame)
            
            # YCbCr 변환
            ycbcr_tensor = rgb_to_ycbcr_tensor(img_tensor)
            
            # YCbCr 정규화
            ycbcr_tensor = ycbcr_normalizer(ycbcr_tensor)
            
            # 배치 차원 추가
            ycbcr_tensor = ycbcr_tensor.unsqueeze(0).to(device)
            
            # 추론
            with torch.no_grad():
                outputs = model(ycbcr_tensor)
                probs = torch.softmax(outputs, dim=-1)
                fake_prob = probs[0, 1].item()  # Fake 확률
                predictions.append(fake_prob)
        
        # 평균 확률로 최종 예측
        avg_fake_prob = np.mean(predictions)
        final_label = 1 if avg_fake_prob > 0.5 else 0
        
        return final_label
    
    except Exception as e:
        print(f"Error processing {video_path}: {e}")
        return 0  # 오류 시 Real 반환


print("✓ 추론 함수 정의 완료")

✓ 추론 함수 정의 완료


## 7. 데이터 로드

In [7]:
# 데이터 경로 설정
DATA_PATH = './data/'

# 개발 환경: data 디렉토리가 없으면 생성
if not os.path.exists(DATA_PATH):
    print(f"⚠️  '{DATA_PATH}' 디렉토리가 없습니다.")
    os.makedirs(DATA_PATH, exist_ok=True)
    print(f"✓ '{DATA_PATH}' 디렉토리 생성")
    print("⚠️  테스트용 데이터를 ./data/ 에 넣어주세요.")
else:
    print(f"✓ 데이터 경로 확인: {DATA_PATH}")

# 지원 파일 확장자
IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.bmp', '.webp')
VIDEO_EXTS = ('.mp4', '.avi', '.mov', '.mkv', '.webm')

# 파일 목록 수집
print("\n=== 파일 스캔 시작 ===")
all_files = []

if os.path.exists(DATA_PATH):
    for file in os.listdir(DATA_PATH):
        file_path = os.path.join(DATA_PATH, file)
        if os.path.isfile(file_path):
            all_files.append(file)

print(f"총 파일 수: {len(all_files)}")

# 파일 타입별 분류
image_files = [f for f in all_files if f.lower().endswith(IMAGE_EXTS)]
video_files = [f for f in all_files if f.lower().endswith(VIDEO_EXTS)]

print(f"  이미지: {len(image_files)}개")
print(f"  비디오: {len(video_files)}개")

✓ 데이터 경로 확인: ./data/

=== 파일 스캔 시작 ===
총 파일 수: 0
  이미지: 0개
  비디오: 0개


## 8. 추론 수행

In [8]:
# 추론 수행
print("\n=== 추론 시작 ===")
results = []

# 이미지 추론
if len(image_files) > 0:
    print("\n이미지 추론 중...")
    for filename in tqdm(image_files, desc="Images"):
        file_path = os.path.join(DATA_PATH, filename)
        label = predict_image(file_path)
        results.append({
            'filename': filename,
            'label': label
        })

# 비디오 추론
if len(video_files) > 0:
    print("\n비디오 추론 중...")
    for filename in tqdm(video_files, desc="Videos"):
        file_path = os.path.join(DATA_PATH, filename)
        label = predict_video(file_path, num_frames=5)
        results.append({
            'filename': filename,
            'label': label
        })

print(f"\n✓ 추론 완료: {len(results)}개 파일")


=== 추론 시작 ===

✓ 추론 완료: 0개 파일


## 9. 결과 저장

In [9]:
# 결과 확인
print(f"\n=== 추론 결과 확인 ===")
print(f"results 리스트 길이: {len(results)}")

if len(results) == 0:
    print("⚠️  추론 결과가 없습니다!")
    print("이전 셀을 먼저 실행해주세요.")
    submission_df = pd.DataFrame(columns=['filename', 'label'])
else:
    # DataFrame 생성
    submission_df = pd.DataFrame(results)
    
    # 결과 확인
    print("\n=== 예측 결과 ===")
    print(submission_df.head(10))
    print(f"\n라벨 분포:")
    print(submission_df['label'].value_counts().sort_index())
    print(f"  Real (0): {(submission_df['label']==0).sum()}개")
    print(f"  Fake (1): {(submission_df['label']==1).sum()}개")
    
    # CSV 저장
    OUTPUT_PATH = "./submission.csv"
    submission_df.to_csv(OUTPUT_PATH, index=False)
    
    print(f"\n✓ 결과 저장 완료: {OUTPUT_PATH}")
    print(f"  총 {len(submission_df)}개 예측")


=== 추론 결과 확인 ===
results 리스트 길이: 0
⚠️  추론 결과가 없습니다!
이전 셀을 먼저 실행해주세요.


## 10. 모델 제출

In [11]:
# AIFactory 제출 (원래 방식)
import aifactory.score as aif

COMPETITION_KEY = "7ad25b19-4651-4fc9-b7b3-126ca1f23876"
            
aif.submit(
    model_name="deepfake_srn_model",  # ← 대회 측이 요구한 정확한 model_name 사용
    key=COMPETITION_KEY
)

print("\n✓ 제출 완료!")

file : task
jupyter notebook
중계 서버 오류 : uploadCompleted

✓ 제출 완료!
