## 딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회

**※주의** : 반드시 본 파일을 이용하여 제출을 수행해야 하며, 파일의 이름은 `task.ipynb`로 유지되어야 합니다.

* #### 추론 실행 환경
    * `python 3.9` 환경
    * `CUDA 10.2`, `CUDA 11.8`, `CUDA 12.6`를 지원합니다.
    * 각 CUDA 환경에 미리 설치돼있는 torch 버전은 다음 표를 참고하세요.

<table>
  <thead>
    <tr>
      <th align="center">Python</th>
      <th align="center">CUDA</th>
      <th align="center">torch</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="center" style="vertical-align: middle;">3.8</td>
      <td align="center">10.2</td>
      <td align="center">1.6.0</td>
    </tr>
    <tr>
      <td align="center" style="vertical-align: middle;">3.9</td>
      <td align="center">11.8</td>
      <td align="center">1.8.0</td>
    </tr>
    <tr>
      <td align="center">3.10</td>
      <td align="center">12.6</td>
      <td align="center">2.7.1</td>
    </tr>
  </tbody>
</table>

* #### CUDA 버전 관련 안내사항  
  - 이번 경진대회는 3개의 CUDA 버전을 지원합니다.  
  - 참가자는 자신의 모델의 라이브러리 의존성에 맞는 CUDA 환경을 선택하여 모델을 제출하면 됩니다.   
  - 각 CUDA 환경에는 기본적으로 torch가 설치되어 있으나, 참가자는 제출하는 CUDA 버전과 호환되는 torch, 필요한 버전의 라이브러리를 `!pip install` 하여 사용하여도 무관합니다.

* #### `task.ipynb` 작성 규칙
코드는 크게 3가지 파트로 구성되며, 해당 파트의 특성을 지켜서 내용을 편집하세요.   
1. **제출용 aifactory 라이브러리 및 추가 필요 라이브러리 설치**
    - 채점 및 제출을 위한 aifactory 라이브러리를 설치하는 셀입니다. 이 부분은 수정하지 않고 그대로 실행합니다.
    - 그 외로, 모델 추론에 필요한 라이브러리를 직접 설치합니다.
2. **추론용 코드 작성**
    - 모델 로드, 데이터 전처리, 예측 등 실제 추론을 수행하는 모든 코드를 이 영역에 작성합니다.
3. **aif.submit() 함수를 호출하여 최종 결과를 제출**
    - **마이 페이지-활동히스토리**에서 발급받은 key 값을 함수의 인자로 정확히 입력해야 합니다.
    - **※주의** : 제출하고자 하는 CUDA 환경에 맞는 key를 입력하여야 합니다.

<table>
  <thead>
    <tr>
      <th align="left">Competition 이름</th>
      <th align="center">CUDA</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회</td>
      <td align="center">11.8</td>
    </tr>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회 CUDA 12.6</td>
      <td align="center">12.6</td>
    </tr>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회 CUDA 10.2</td>
      <td align="center">10.2</td>
    </tr>
  </tbody>
</table>

------

#### 1. 제출용 aifactory 라이브러리 설치
※ 결과 전송에 필요하므로 아래와 같이 aifactory 라이브러리가 반드시 최신버전으로 설치될 수 있게끔 합니다

In [1]:
!pip install -U aifactory



* 자신의 모델 추론 실행에 필요한 추가 라이브러리 설치

In [None]:
!pip install -U ultralytics
!pip install -U transformers
!pip install -U torch==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl/cu118
!pip install -U opencv-python-headless pillow numpy pandas scikit-learn tqdm

Looking in indexes: https://download.pytorch.org/whl/cu118
Collecting numpy
  Using cached numpy-2.3.5-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (62 kB)


-----

#### 2. 추론용 코드 작성

##### 추론 환경의 기본 경로 구조

- 평가 데이터셋 경로: `./data/`
   - 채점에 사용될 테스트 데이터셋은 `./data/` 디렉토리 안에 포함되어 있습니다.
   - 해당 디렉토리에는 이미지(JPG, PNG)와 동영상(MP4) 파일이 별도의 하위 폴더 없이 혼합되어 있습니다.
```bash
/aif/
└── data/
    ├── {이미지 데이터1}.jpg
    ├── {이미지 데이터2}.png
    ├── {동영상 데이터1}.mp4
    ├── {이미지 데이터3}.png
    ├── {동영상 데이터2}.mp4
    ...
```

- 모델 및 자원 경로: 예시 : `./model/`
   - 추론 스크립트가 실행되는 위치를 기준으로, 제출된 모델 관련 파일들이 위치해야하 하는 상대 경로입니다.
   - 학습된 모델 가중치(.pt, .ckpt, .pth 등)

* 제출 파일은 `submission.csv`로 저장돼야 합니다.
  * submission.csv는 *filename*과 *label* 컬럼으로 구성돼야 합니다.
  * filename은 추론한 파일의 이름(확장자 포함), label은 추론 결과입니다. (real:0, fake:1)
  * filename은 *string*, label은 *int* 자료형이어야 합니다.
  * 추론하는 데이터의 순서는 무작위로 섞여도 상관 없습니다.

<table>
  <thead>
    <tr>
      <th align="center">filename</th>
      <th align="center">label</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="center">{이미지 데이터1}.jpg</td>
      <td align="center">0</td>
    </tr>
    <tr>
      <td align="center">{동영상 데이터1}.mp4</td>
      <td align="center">1</td>
    </tr>
    <tr>
      <td colspan="2" align="center">...</td>
    </tr>
  </tbody>
</table>

**※ 주의 사항**

* argparse 사용시 `args, _ = parser.parse_known_args()`로 인자를 지정하세요.   
   - `args = parser.parse_args()`는 jupyter에서 오류가 발생합니다.
* return 할 결과물과 양식에 유의하세요.

In [None]:
import os
import time
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn.functional as F
from torchvision import transforms
from transformers import Dinov2ForImageClassification, AutoImageProcessor
from ultralytics import YOLO
import warnings
warnings.filterwarnings('ignore')

model_path = "./models_trained/dino_v2_4channel"
yolo_weights = "yolov8n-face-lindevs.pt" 

test_data_path = "./data/"
submission_file = "submission.csv"

IMAGE_EXTS = {".jpg", ".jpeg", ".png"}
VIDEO_EXTS = {".avi", ".mp4"}
TARGET_SIZE = (224, 224)
NUM_FRAMES_TO_EXTRACT = 30


def get_defocus_map(img_pil):
    """Compute defocus map using Laplacian operator"""
    img_gray_cv = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2GRAY)
    laplacian = cv2.Laplacian(img_gray_cv, cv2.CV_32F, ksize=3)
    defocus_map = cv2.normalize(np.abs(laplacian), None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    return Image.fromarray(defocus_map)


def get_boundingbox_yolo(box, width, height):
    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
    size_bb = int(max(x2 - x1, y2 - y1) * 1.3)
    center_x, center_y = (x1 + x2) // 2, (y1 + y2) // 2
    x1 = max(int(center_x - size_bb // 2), 0)
    y1 = max(int(center_y - size_bb // 2), 0)
    size_bb = min(width - x1, size_bb)
    size_bb = min(height - y1, size_bb)
    return x1, y1, size_bb


def detect_and_crop_face_optimized(image: Image.Image, face_detector, processor):
    """
    Detect face and return 4-channel tensor (RGB + defocus map)
    """
    if image.mode != 'RGB': 
        image = image.convert('RGB')
    
    results = face_detector(image, device="cuda", verbose=False)

    if not results or len(results[0].boxes) == 0:
        return None  

    original_np = np.array(image)
    original_h, original_w, _ = original_np.shape
    best_box = results[0].boxes.xyxy[0].cpu().numpy() 
    
    x, y, size = get_boundingbox_yolo(best_box, original_w, original_h)
    cropped_np = original_np[y:y + size, x:x + size]
    
    if cropped_np.size == 0:
        return None
    
    face_rgb = Image.fromarray(cropped_np)
    
    defocus_map_full = get_defocus_map(image)
    defocus_map_np = np.array(defocus_map_full)
    defocus_cropped = defocus_map_np[y:y + size, x:x + size]
    face_defocus = Image.fromarray(defocus_cropped)
    
    resize_crop = transforms.Compose([
        transforms.Resize(processor.size["shortest_edge"]),
        transforms.CenterCrop(processor.size["shortest_edge"]),
    ])
    
    face_rgb_resized = resize_crop(face_rgb)
    face_defocus_resized = resize_crop(face_defocus)
    
    to_tensor = transforms.ToTensor()
    normalize_rgb = transforms.Normalize(mean=processor.image_mean, std=processor.image_std)
    normalize_defocus = transforms.Normalize(mean=[0.5], std=[0.5])
    
    rgb_tensor = normalize_rgb(to_tensor(face_rgb_resized))
    defocus_tensor = normalize_defocus(to_tensor(face_defocus_resized))
    
    combined_tensor = torch.cat((rgb_tensor, defocus_tensor), dim=0)
    
    return combined_tensor


def process_single_file(file_path, face_detector, processor):
    face_tensors = []
    ext = file_path.suffix.lower()

    try:
        if ext in IMAGE_EXTS:
            image = Image.open(file_path)
            face_tensor = detect_and_crop_face_optimized(image, face_detector, processor)
            if face_tensor is not None:
                face_tensors.append(face_tensor)
                
        elif ext in VIDEO_EXTS:
            cap = cv2.VideoCapture(str(file_path))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            
            if total_frames > 0:
                frame_indices = np.linspace(0, total_frames - 1, NUM_FRAMES_TO_EXTRACT, dtype=int)
                for idx in frame_indices:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                    ret, frame = cap.read()
                    if not ret: 
                        continue
                    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    face_tensor = detect_and_crop_face_optimized(image, face_detector, processor)
                    if face_tensor is not None:
                        face_tensors.append(face_tensor)
            cap.release()
            
    except Exception as e:
        print(f"Error processing {file_path.name}: {e}")
        return [] 

    return face_tensors


if __name__ == "__main__":
    start_time = time.time()
    
    try:
        face_detector = YOLO(yolo_weights)
        face_detector.to("cuda")
        print(f"YOLOv8-Face detector loaded successfully from {yolo_weights}.")

        model = Dinov2ForImageClassification.from_pretrained(model_path).to("cuda")
        processor = AutoImageProcessor.from_pretrained(model_path)
        model.eval()
        print(f"DINOv2 4-channel model loaded successfully from {model_path}.")
    
    except Exception as e:
        print(f"Fatal error during model loading: {e}")
        raise e
    

    data_dir = Path(test_data_path)
    files = [f for f in data_dir.iterdir() if f.is_file() and f.suffix.lower() in (IMAGE_EXTS | VIDEO_EXTS)]
    print(f"Found {len(files)} files in {test_data_path}")
    results_to_write = {}

    print("Starting inference with 4-channel DINOv2 model...")
    
    with tqdm(total=len(files), desc="Processing files") as pbar:
        for f in files:
            filename = f.name
            
            face_tensors = process_single_file(f, face_detector, processor) 
            
            if not face_tensors:
                results_to_write[filename] = 0 
            else:
                try:
                    batch_tensor = torch.stack(face_tensors).to("cuda")
                    
                    with torch.no_grad():
                        outputs = model(batch_tensor)
                        logits = outputs.logits
                        probs = F.softmax(logits, dim=1)
                    
                    avg_probs = probs.mean(dim=0) 
                    predicted_class = torch.argmax(avg_probs).item()
                    results_to_write[filename] = predicted_class
                    
                except Exception as e:
                    print(f"Error during inference for {filename}: {e}")
                    results_to_write[filename] = 0 

            pbar.update(1)

    print(f"Inference completed. Saving results to {submission_file}...")
    submission_df = pd.DataFrame(results_to_write.items(), columns=["filename", "label"])
    submission_df.to_csv(submission_file, index=False)

    end_time = time.time()
    print(f"Total processing time: {end_time - start_time:.2f} seconds")
    print("Inference completed.")

----

#### 3. `aif.submit()` 함수를 호출하여 최종 결과를 제출

**※주의** : task별, 참가자별로 key가 다릅니다. 잘못 입력하지 않도록 유의바랍니다.
- key는 대회 페이지 [베이스라인 코드](https://aifactory.space/task/9197/baseline) 탭에 기재된 가이드라인을 따라 task 별로 확인하실 수 있습니다.
- key가 틀리면 제출이 진행되지 않거나 잘못 제출되므로 task에 맞는 자신의 key를 사용해야 합니다.
-  **NOTE** : 이번 경진대회에서는 3개의 CUDA 버전을 지원하며, 각 CUDA 버전에 따라 task key가 상이합니다. 함수를 실행하기 전에 현재 key가 제출하고자 하는 CUDA 환경에 대한 key인지 반드시 확인하세요.

In [None]:
import aifactory.score as aif
import time
t = time.time()

#-----------------------------------------------------#
aif.submit(model_name="T1_3Peats",
    key="01171c4a-df40-4271-baf0-3a240cbf06a4"
)
#-----------------------------------------------------#
print(time.time() - t)

file : task
jupyter notebook
제출 완료
39.13414764404297
