In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
kagglehub.login()


In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

nexar_collision_prediction_path = kagglehub.competition_download('nexar-collision-prediction')

print('Data source import complete.')


## **💡Reference Notebook - Fernandosr85 - Dashcam Collision Prediction Project 🚗**
#### **https://www.kaggle.com/code/fernandosr85/dashcam-collision-prediction-project**

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

max_files = 10

count = 0

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
        count += 1
        if count >= max_files:
            break

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import gc
import time
import warnings
from multiprocessing import Pool

import cv2
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import KFold
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.models

warnings.filterwarnings("ignore")

# Check GPU availability and set device
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

In [None]:
# Suppress unnecessary formatting warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)

# Paths to the CSV files
train_csv_path = '/kaggle/input/nexar-collision-prediction/train.csv'
test_csv_path = '/kaggle/input/nexar-collision-prediction/test.csv'
submission_csv_path = '/kaggle/input/nexar-collision-prediction/sample_submission.csv'

# Load the CSV files
train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)
submission_df = pd.read_csv(submission_csv_path)

# Display the first few rows of the DataFrames
print("Train.csv:")
print(train_df.head())

print("\nTest.csv:")
print(test_df.head())

print("\nSample Submission:")
print(submission_df.head())

# Optional: handle NaN values if needed, filling with zero or another value
train_df['time_of_event'] = train_df['time_of_event'].fillna(0)
train_df['time_of_alert'] = train_df['time_of_alert'].fillna(0)

## **Data Preprocessing and Feature Extraction**

In [None]:
# 일반적으로 충돌이 발생하는 마지막 부분에 초점을 맞춰 비디오에서 주요 프레임을 추출
# 지수 분포를 사용하여 마지막에 가까운 프레임에 더 많은 가중치를 부여

def extract_keyframes(video_path, num_frames=12, target_size=(160, 160)):
    """
    Extracts key frames from the video, focusing on the final part where collisions typically occur.
    Uses exponential distribution to give more weight to frames closer to the end.
    """
    cap = cv2.VideoCapture(video_path) # 동영상을 불러오기 위해 OpenCV의 videoCapture 객체 생성

    # 파일이 제대로 열리지 않았을 경우 대비한 예외 처리
    if not cap.isOpened():
        print(f"Could not open the video: {video_path}")
        return np.zeros((num_frames, target_size[0], target_size[1], 3), dtype=np.uint8)

    # 총 프레임 수와 초당 프레임 수(FPS)를 가져오기
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    if total_frames <= 0:
        print(f"Video without frames: {video_path}")
        cap.release()
        return np.zeros((num_frames, target_size[0], target_size[1], 3), dtype=np.uint8)

    # 영상 길이(초 단위) 계산
    duration = total_frames / fps if fps > 0 else 0

    # 짧은 영상 (10초 미만): 균등한 간격으로 프레임 추출
    if duration < 10:
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)

    # 긴 영상 (10초 이상): 후반부에 더 집중해서 추출
    else:
        # 마지막 3초 동안 프레임의 80% 집중(중요 영역)
        end_frames = int(num_frames * 0.8)
        start_frames = num_frames - end_frames

        # 지난 3초 동안의 시작 인덱스를 계산
        last_seconds = 3
        last_frame_count = min(int(fps * last_seconds), total_frames - 1)
        start_idx = max(0, total_frames - last_frame_count)

        # 마지막 프레임에 더 많은 가중치를 부여하는 지수 분포
        # 이렇게 하면 마지막에 더 밀집된 인덱스가 생성된다 ("프레임을 뽑는 간격"자체를 조절 → 끝부분에 더 많이 몰리게 만드는 방식)
        end_indices = np.array([
            start_idx + int((total_frames - start_idx - 1) * (i/end_frames)**2)
            for i in range(1, end_frames + 1)
        ])

        # context에 맞게 균일하게 배포된 초기 프레임 (초반부에서 균등하게 추출한 프레임들)
        # context란? 사고 직전에 어떤 상황이 펼쳐졌는지에 대한 흐름, 배경, 맥락
        begin_indices = np.linspace(0, start_idx - 1, start_frames, dtype=int) if start_idx > 0 else np.zeros(start_frames, dtype=int)

        # 인덱스 결합
        frame_indices = np.concatenate([begin_indices, end_indices])

    # 선택한 프레임 추출
    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            # Use higher resolution and better interpolation
            frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_LANCZOS4)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        else:
            frames.append(np.zeros((target_size[0], target_size[1], 3), dtype=np.uint8))

    cap.release()
    return np.array(frames, dtype=np.uint8)

# 먼저, 전역 범위에서 변환 클래스를 정의
# 입력된 영상 프레임을 일정 확률로 좌우 반전시켜서, 데이터 다양성을 늘리는 역할
class RandomHorizontalFlip(object):
    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, frames):
        if np.random.random() < self.p:
            return frames[:, :, ::-1, :].copy()  # horizontally flip each frame
        return frames

# 영상 프레임의 밝기와 대비를 무작위로 조정해, 다양한 조명 환경을 시뮬레이션하는 증강 클래스
class ColorJitter(object):
    def __init__(self, brightness=0, contrast=0):
        self.brightness = brightness
        self.contrast = contrast

    def __call__(self, frames):
        # Apply brightness jitter
        if self.brightness > 0:
            brightness_factor = np.random.uniform(max(0, 1-self.brightness), 1+self.brightness)
            frames = frames * brightness_factor
            frames = np.clip(frames, 0, 255)

        # Apply contrast jitter
        if self.contrast > 0:
            contrast_factor = np.random.uniform(max(0, 1-self.contrast), 1+self.contrast)
            frames = (frames - 128) * contrast_factor + 128
            frames = np.clip(frames, 0, 255)

        return frames

# 프레임에 흐릿한 안개 효과를 넣어, 시야가 나쁜 날씨 상황을 시뮬레이션하는 클래스
class AddFog(object):
    def __call__(self, frames):
        fog = np.random.uniform(0.7, 0.9, frames.shape).astype(np.float32)
        return frames * 0.8 + fog * 50  # Adjusted for 0-255 scale

# 프레임에 흰색 선형 노이즈(빗방울)를 추가해 비 오는 날씨를 시뮬레이션하는 클래스
class AddRain(object):
    def __call__(self, frames):
        h, w = frames.shape[1:3]
        rain = np.random.uniform(0, 1, (len(frames), h, w, 1)).astype(np.float32)
        rain = (rain > 0.97).astype(np.float32) * 200  # White rain drops
        return np.clip(frames * 0.9 + rain, 0, 255)  # Darken a bit and add drops

# 지정된 확률에 따라 어떤 변환을 적용할지 말지를 무작위로 결정하는 컨트롤러 클래스(랜덤성 부여)
class RandomApply(object):
    def __init__(self, transform, p=0.5):
        self.transform = transform
        self.p = p

    def __call__(self, frames):
        if np.random.random() < self.p:
            return self.transform(frames)
        return frames

# 여러 개의 변환(Flip, Jitter, Fog 등)을 순서대로 적용하는 데이터 증강 파이프라인 클래스
class Compose(object):
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, frames):
        for t in self.transforms:
            frames = t(frames)
        return frames

# 영상 프레임 배열을 PyTorch 텐서로 바꾸고, 픽셀 값을 0~1 범위로 정규화하는 클래스
class ToTensor(object):
    def __call__(self, frames):
        # Convert from (T, H, W, C) to (T, C, H, W)
        frames = frames.transpose(0, 3, 1, 2)
        # Convert to tensor and normalize to [0, 1]
        return torch.from_numpy(frames).float() / 255.0

In [None]:
# 동영상에서 데이터 증강을 위한 변환을 반환

def get_video_transforms():
    """
    Returns transformations for data augmentation in videos.
    """
    return {
        'train': Compose([
            RandomHorizontalFlip(p=0.5),
            ColorJitter(brightness=0.3, contrast=0.3),
            RandomApply(AddFog(), p=0.15),
            RandomApply(AddRain(), p=0.15),
            RandomApply(RandomNoise(0.05), p=0.2),
            RandomApply(RandomOcclusion(), p=0.1),
            ToTensor()
        ]),
        'val': Compose([
            ToTensor()  # Only tensor conversion for validation
        ])
    }

# 비디오 프레임에서 무작위 가우시안(정규분포) 노이즈를 추가하여, 실제 촬영 환경에서
# 발생할 수 있는 잡음에 대해 모델이 더 강건해지도록 만드는 클래스
class RandomNoise(object):
    """
    Applies random Gaussian noise to video frames for data augmentation.

    This transformation helps the model become more robust to noise
    that may be present in real-world video data.

    Args:
        std (float): Standard deviation of the Gaussian noise as a fraction
                     of the pixel value range (default: 0.05)
    """
    def __init__(self, std=0.05):
        self.std = std

    def __call__(self, frames):
        """
        Apply random noise to the input frames.

        Args:
            frames (numpy.ndarray): Input video frames of shape (T, H, W, C)
                                   where T is number of frames

        Returns:
            numpy.ndarray: Noise-augmented frames, clipped to valid pixel range [0, 255]
        """
        # 지정된 표준 편차를 가진 가우시안 노이즈 생성
        noise = np.random.normal(0, self.std * 255, frames.shape).astype(np.float32)

        # 유효한 픽셀 범위에 노이즈 및 클립 추가하기
        # 영상은 정수형 데이터여야 하므로 형 변환 (astype)
        return np.clip(frames + noise, 0, 255).astype(np.uint8)

# 영상 프레임에 검은색 사각형을 무작위로 덮어 씌워, 일부 정보가 가려졌을 때도 모델이 견딜 수 있도록 훈련시키는 클래스
class RandomOcclusion(object):
    """
    Simulates occlusion in video frames by adding black rectangles.

    This transformation helps the model learn to handle partial occlusions
    that may occur in real-world scenarios when objects block the camera view.
    """
    def __call__(self, frames):
        """
        Apply random occlusion to the input frames.

        Args:
            frames (numpy.ndarray): Input video frames of shape (T, H, W, C)
                                   where T is number of frames

        Returns:
            numpy.ndarray: Frames with random occlusion applied
        """
        # 프레임 하나의 세로(h), 가로(w) 길이 가져오기
        h, w = frames.shape[1:3]

        # 전체 프레임 크기의 10%~25% 사이 크기의 가림 영역 크기 설정
        occl_h = np.random.randint(int(h * 0.1), int(h * 0.25))
        occl_w = np.random.randint(int(w * 0.1), int(w * 0.25))

        # 이 가림 영역이 들어갈 무작위 위치 좌표 설정
        occl_x = np.random.randint(0, w - occl_w)
        occl_y = np.random.randint(0, h - occl_h)

        # 원본 프레임을 수정하지 않도록 복사본 만들기
        frames_copy = frames.copy()

        # 픽셀을 0(검정색)으로 설정하여 모든 프레임에 occlusion 적용
        for i in range(len(frames)):
            frames_copy[i, occl_y:occl_y+occl_h, occl_x:occl_x+occl_w, :] = 0

        return frames_copy

In [None]:
# 비디오 프레임 간 움직임(모션)을 추적하는 'optical_flow'를 계산해, 객체나 배경의 이동 방향과 속도를 벡터 형태로 반환하는 함수
# 두 연속된 이미지(또는 프레임) 사이에서, 각 픽셀이 어떻게 이동했는지를 벡터로 표현하는 기술 -> optical_flow

def compute_optical_flow(frames, skip_frames=1):
    """Calculates optical flow skipping some frames to reduce processing."""
    if len(frames) < 2:
        return np.zeros((1, frames.shape[1], frames.shape[2], 2), dtype=np.float32)

    flows = []

    # 첫 프레임을 그레이스케일(흑백 이미지)로 변환
    prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_RGB2GRAY)

    # 연속된 프레임 쌍마다 optical_flow 계산
    for i in range(1, len(frames), skip_frames):
        curr_gray = cv2.cvtColor(frames[i], cv2.COLOR_RGB2GRAY)
        try:
            # 더 빠른 계산을 위해 매개변수 줄이기
            flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray,
                                               None, 0.5, 3, 15, 3, 5, 1.2, 0)
            flows.append(flow)
        # 예외 처리 (계산 중 오류 발생 시 안전하게 0으로 대체)
        except Exception as e:
            print(f"Error calculating optical flow: {str(e)}")
            flows.append(np.zeros((frames.shape[1], frames.shape[2], 2), dtype=np.float32))

        prev_gray = curr_gray

    if not flows:
        return np.zeros((1, frames.shape[1], frames.shape[2], 2), dtype=np.float32)

    return np.array(flows, dtype=np.float32)


In [None]:
# 하나의 동영상 파일을 받아, 주요 프레임을 추출하고 optical_flow(움직임 정보)를 계산해 구조화된 데이터로 반환하는 전처리 함수

def process_video(args):
    """
    Function to process an individual video.
    """
    video_path, video_id, num_frames = args
    try:
        # 더 높은 해상도로 프레임 추출
        frames = extract_keyframes(video_path, num_frames=num_frames, target_size=(160, 160))

        # optical flow 계산
        optical_flow = compute_optical_flow(frames, skip_frames=1)

        # 변환을 적용하는 대신 NumPy 배열을 반환
        return video_id, {
            'frames': frames,
            'optical_flow': optical_flow,
        }
    # 예외 처리 (None을 리턴)
    except Exception as e:
        print(f"Error processing video {video_id}: {str(e)}")
        return video_id, None

In [None]:
# 여러 개의 영상을 병렬로 처리해 빠르게 전처리하고, 각 영상의 프레임 및 optical flow 데이터를 하나의 딕셔너리로 정리하는 함수

def parallel_preprocess_dataset(video_dir, video_ids, num_frames=8, num_workers=4):
    """
    Pre-processes multiple videos in parallel.
    """

    # 비디오 목록 준비 (튜플 형태로 리스트에 추가)
    args_list = []
    for video_id in video_ids:
        video_path = os.path.join(video_dir, f"{video_id}.mp4")
        if os.path.exists(video_path):
            args_list.append((video_path, video_id, num_frames))

    start_time = time.time()
    print(f"Starting parallel pre-processing of {len(args_list)} videos with {num_workers} workers...")

    processed_data = {}

    # 병렬 처리 시작
    with Pool(num_workers) as p:
        results = p.map(process_video, args_list)
        for video_id, data in results:
            if data is not None:
                processed_data[video_id] = data

    print(f"Pre-processing completed in {time.time() - start_time:.2f} seconds.")
    print(f"Processed {len(processed_data)} out of {len(args_list)} videos.")

    return processed_data

In [None]:
# mp4 비디오 파일에서 직접 프레임과 optical flow를 추출하며 학습 데이터를 생성
class DashcamDataset(Dataset):
   def __init__(self, video_dir, annotations, transform=None, num_frames=8):
       self.video_dir = video_dir
       self.annotations = annotations
       self.transform = transform
       self.num_frames = num_frames
       self.video_ids = list(annotations.keys())

   def __len__(self):
       return len(self.video_ids)

   def __getitem__(self, idx):
       video_id = self.video_ids[idx]
       video_path = os.path.join(self.video_dir, f"{video_id}.mp4")

       try:
           # Check if the file exists
           if not os.path.exists(video_path):
               print(f"Video not found: {video_path}")
               raise FileNotFoundError(f"File not found: {video_path}")

           # Extract frames with reduced resolution
           frames = extract_keyframes(video_path, self.num_frames, target_size=(112, 112))

           # Calculate optical flow with skip_frames
           optical_flow = compute_optical_flow(frames, skip_frames=1)

           # Apply transformations
           if self.transform:
               frames = self.transform(frames)
           else:
               # Convert to tensor manually
               frames = torch.from_numpy(frames.transpose(0, 3, 1, 2)).float() / 255.0

           # Convert optical flow to tensor
           optical_flow = torch.from_numpy(optical_flow.transpose(0, 3, 1, 2)).float()

           # Load label and alert time
           label = self.annotations[video_id]['label']
           alert_time = self.annotations[video_id].get('alert_time', 0)

           return {
               'frames': frames,
               'optical_flow': optical_flow,
               'label': torch.tensor(label).float(),
               'alert_time': torch.tensor(alert_time).float(),
               'video_id': video_id
           }

       except Exception as e:
           print(f"Error processing video {video_id}: {str(e)}")
           # Create a placeholder for this video
           dummy_frames = torch.zeros((self.num_frames, 3, 112, 112))
           dummy_flow = torch.zeros((max(1, self.num_frames-1), 2, 112, 112))
           return {
               'frames': dummy_frames,
               'optical_flow': dummy_flow,
               'label': torch.tensor(self.annotations[video_id]['label']).float(),
               'alert_time': torch.tensor(self.annotations[video_id].get('alert_time', 0)).float(),
               'video_id': video_id
           }

# 이미 전처리된 결과(NumPy 배열)를 메모리에서 불러와서 빠르게 학습 데이터를 생성
class PreprocessedDashcamDataset(Dataset):
   def __init__(self, processed_data, annotations, transform=None):
       self.processed_data = processed_data
       self.annotations = annotations
       self.transform = transform
       self.video_ids = list(processed_data.keys())

   def __len__(self):
       return len(self.video_ids)

   def __getitem__(self, idx):
       video_id = self.video_ids[idx]
       data = self.processed_data[video_id]

       # Get frames and optical flow
       frames = data['frames']
       optical_flow = data['optical_flow']

       # Apply transformations to frames
       if self.transform:
           frames_tensor = self.transform(frames)
       else:
           # Convert to tensor manually
           frames_tensor = torch.from_numpy(frames.transpose(0, 3, 1, 2)).float() / 255.0

       # Convert optical flow to tensor
       optical_flow_tensor = torch.from_numpy(optical_flow.transpose(0, 3, 1, 2)).float()

       # Load label and alert time
       label = self.annotations[video_id]['label']
       alert_time = self.annotations[video_id].get('alert_time', 0)

       return {
           'frames': frames_tensor,
           'optical_flow': optical_flow_tensor,
           'label': torch.tensor(label).float(),
           'alert_time': torch.tensor(alert_time).float(),
           'video_id': video_id
       }

## **AAT-DA 모델 입력 생성을 위한 데이터 처리**
#### **필수 구성요소: Object Detection, VGG Feature 추출, Driver Attention 맵 계산, Attention Weight 계산**

In [None]:
# 외부에서 object_detector와 driver_attention_model을 인자로 주입받도록 설계

# 더미 객체 탐지기 클래스
class ObjectDetector:
    def detect(self, frame):
        h, w, _ = frame.shape
        return [(w//3, h//3, 2*w//3, 2*h//3)]  # 테스트용 더미 박스

# 더미 시선 예측 모델 클래스
class DummyDriverAttention:
    def predict(self, frame):
        h, w, _ = frame.shape
        attention_map = np.zeros((h, w), dtype=np.float32)
        cx, cy = w // 2, h // 2
        attention_map[cy-10:cy+10, cx-10:cx+10] = 1.0
        return cv2.GaussianBlur(attention_map, (25, 25), 0)

In [None]:
import torch
import numpy as np
import cv2
from torchvision import models, transforms

class AATDAPreprocessor:
    def __init__(self, object_detector, driver_attention_model, vgg_model=None):
        self.detector = object_detector  # Cascade R-CNN 등 외부 감지기
        self.attention_model = driver_attention_model  # Gate-DAP 등
        self.vgg = vgg_model or models.vgg16(pretrained=True).features.eval()
        self.vgg_fc7 = torch.nn.Sequential(*list(models.vgg16(pretrained=True).classifier.children())[:6]).eval()
        self.vgg_transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    # 감지된 객체 영역을 잘라서 VGG FC7 feature(4096-dim) 추출
    def extract_object_features(self, frame, boxes):
        features = []
        for (x1, y1, x2, y2) in boxes:
            crop = frame[y1:y2, x1:x2]
            if crop.shape[0] == 0 or crop.shape[1] == 0:
                features.append(torch.zeros(4096))
                continue
            input_tensor = self.vgg_transform(crop).unsqueeze(0)
            with torch.no_grad():
                feat = self.vgg(input_tensor)
                feat = feat.view(feat.size(0), -1)
                fc7 = self.vgg_fc7(feat).squeeze(0)
            features.append(fc7)
        return torch.stack(features)  # Shape: (N, 4096)

    # attention map과 객체 영역이 얼마나 겹치는지 평균값으로 가중치 계산
    def compute_attention_weights(self, attention_map, boxes):
        weights = []
        for (x1, y1, x2, y2) in boxes:
            patch = attention_map[y1:y2, x1:x2]
            weight = np.mean(patch) if patch.size > 0 else 0.0
            weights.append(weight)
        return torch.tensor(weights, dtype=torch.float32)  # Shape: (N,)

    # 한 프레임에 대해 아래와 같이 순서대로 작동
    def process_frame(self, frame):
        # 1. 객체 감지
        boxes = self.detector.detect(frame)  # returns List of [x1, y1, x2, y2]

        # 2. VGG Feature 추출
        object_feats = self.extract_object_features(frame, boxes)  # (N, 4096)

        # 3. 시선 맵 생성
        attention_map = self.attention_model.predict(frame)  # (H, W), numpy float32

        # 4. Attention Weight 계산
        weights = self.compute_attention_weights(attention_map, boxes)  # (N,)

        # 5. Feature 강화
        weighted_feats = object_feats * weights.unsqueeze(1)  # (N, 4096)

        return weighted_feats, weights, boxes  # Transformer 입력으로 사용할 수 있음


In [None]:
# 실행 예시
if __name__ == '__main__':
    detector = ObjectDetector()
    attention_model = DummyDriverAttention()
    processor = AATDAPreprocessor(detector, attention_model)

    # 테스트용 이미지 불러오기
    frame = cv2.imread("sample_frame.jpg")  # 실제 경로로 작성하기!!!
    if frame is not None:
        weighted_feats, weights, boxes = processor.process_frame(frame)
        print("감지된 객체 수:", len(boxes))
        print("Feature shape:", weighted_feats.shape)  # (N, 4096)
    else:
        print("이미지를 불러올 수 없습니다.")

In [None]:
# 비디오 하나에 포함된 여러 프레임을 process_frame()으로 반복 처리
# 각 프레임의 객체 feature를 (T, N, 4096) 구조로 패딩하여 Transformer 입력에 맞춤

class AATDADataset(Dataset):
    def __init__(self, video_data_dict, annotations, processor, num_frames=8):
        self.video_data = video_data_dict  # {video_id: [frame1, frame2, ...]}
        self.annotations = annotations  # {video_id: {label: ..., alert_time: ...}}
        self.processor = processor
        self.num_frames = num_frames
        self.video_ids = list(video_data_dict.keys())

    def __len__(self):
        return len(self.video_ids)

    def __getitem__(self, idx):
        video_id = self.video_ids[idx]
        frames = self.video_data[video_id]  # raw RGB frames

        # 각 프레임에 대해 Object Feature + Attention 처리
        sequence_features = []
        for frame in frames:
            weighted_feats, _, _ = self.processor.process_frame(frame)
            sequence_features.append(weighted_feats)

        # 시퀀스를 (T, N, 4096) 형태의 tensor로 정리 (padding 가능)
        max_objects = max(f.shape[0] for f in sequence_features)
        padded_seq = []
        for f in sequence_features:
            pad_size = max_objects - f.shape[0]
            if pad_size > 0:
                padded = torch.cat([f, torch.zeros(pad_size, 4096)], dim=0)
            else:
                padded = f
            padded_seq.append(padded)

        # 최종 시퀀스: (T, N, 4096)
        input_tensor = torch.stack(padded_seq)

        label = torch.tensor(self.annotations[video_id]['label']).float()
        alert_time = torch.tensor(self.annotations[video_id].get('alert_time', 0)).float()

        return {
            'video_id': video_id,
            'input': input_tensor,  # (T, N, 4096)
            'label': label,
            'alert_time': alert_time
        }


### **1단계: 일반 전처리**
#### **(1) 프레임 추출 - extract_keyframes()**
#### **(2) Optical Flow 계산 - compute_optical_flow()**
#### **(3) 기본 증강 - ColorJitter, AddRain, AddFog, ToTensor() 등**
#### **(4) 데이터 구성 - DashcamDataset 또는 PreprocessDashcamDataset으로 구성**

### **2단계: AAT-DA 전용 전처리 (기존 데이터에서 Transformer용 구조 변환)**
##### AAT-DA는 단순한 영상 프레임이 아니라, "객체 중심의 시공간 Attention 입력 구조"를 요구하기 때문에 기존 전처리된 데이터를 바탕으로 추가적인 전처리가 필요
#### **(1) 객체 감지 - 프레임에서 객체 감지 (Cascade R-CNN 등)**
#### **(2) 객체 특징 추출 - 감지된 박스마다 VGG16 FC7 feature 추출 (4096-dim)**
#### **(3) 시선 맵 예측 - Gate-DAP 등으로 driver attention heatmap 생성**
#### **(4) 주의 가중치 계산 - 시선 맵 + 객체 위치 → 객체별 attention weight αᵢ 계산**
#### **(5) Feature 가중 - 객체 feature αᵢ → 강조된 객체 feature**
#### **(6) 시퀀스 구성 - 모든 프레임의 결과를 (T, N, 4096) 시퀀스로 패딩 정리**

In [None]:
# AAT-DA 모델 구성
# 입력: (B, T, N, 4096)
# 구성: Spatial Transformer + Temporal Transformer + Classifier

import torch
import torch.nn as nn
import torch.nn.functional as F

# 각 프레임 내 객체들 간 상호작용 학습 (Object Self-Attention)
class SpatialTransformer(nn.Module):
    def __init__(self, d_model=4096, nhead=8, dim_feedforward=2048):
        super().__init__()
        self.encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=1)

    def forward(self, x):  # x: (B, T, N, 4096)
        B, T, N, C = x.shape
        x = x.view(B * T, N, C)  # (B*T, N, C)
        x = self.encoder(x)  # (B*T, N, C)
        x = x.mean(dim=1)  # (B*T, C)
        x = x.view(B, T, C)  # (B, T, C)
        return x

# 시간 순서에 따른 의미 흐름 모델링 (Temporal Attention)
class TemporalTransformer(nn.Module):
    def __init__(self, d_model=4096, nhead=8, dim_feedforward=2048):
        super().__init__()
        self.encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=1)

    def forward(self, x):  # x: (B, T, C)
        x = x.transpose(0, 1)  # (T, B, C)
        x = self.encoder(x)  # (T, B, C)
        x = x.transpose(0, 1)  # (B, T, C)
        return x

class AATDA(nn.Module):
    def __init__(self, d_model=4096, nhead=8, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.spatial_transformer = SpatialTransformer(d_model, nhead, dim_feedforward)
        self.temporal_transformer = TemporalTransformer(d_model, nhead, dim_feedforward)
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 1),
        )

    def forward(self, x):  # x: (B, T, N, 4096)
        x = self.spatial_transformer(x)     # (B, T, 4096)
        x = self.temporal_transformer(x)    # (B, T, 4096)
        x = x.mean(dim=1)                   # (B, 4096) - 평균 풀링
        out = self.classifier(x)            # (B, 1)
        return out.squeeze(-1)              # (B,)

In [None]:
# 데이터 로더 연결


In [None]:
# 모델 초기화 및 학습 설정

In [None]:
# 학습 함수 작성
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for batch in loader:
        inputs = batch['input'].to(device)         # (B, T, N, 4096)
        labels = batch['label'].to(device).float() # (B,)

        optimizer.zero_grad()
        outputs = model(inputs)                    # (B,)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)
