#### **비디오 데이터 추출**

In [10]:
import os
import glob
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
from einops import rearrange
import numpy as np

##### 1. 기존 모듈 정의 (VideoPatchEmbed, VideoEmbed, VideoViTFeatureExtractor, VideoFeatureExtractor)

In [11]:
# VideoPatchEmbed: 비디오를 패치 임베딩으로 변환
class VideoPatchEmbed(nn.Module):
    """Turn a video clip into per-frame patch tokens.

    Every frame is cut into non-overlapping patches by a ``Conv2d`` whose
    kernel size and stride both equal the patch size, so each patch is
    projected to one ``embed_dim``-sized token.

    Args:
        patch_size: Patch edge length as an int (square patch) or a
            ``(height, width)`` pair.
        in_chans: Number of channels in each input frame.
        embed_dim: Size of each output token.

    Raises:
        ValueError: If ``patch_size`` is a sequence whose length is not 2.
    """

    def __init__(self, patch_size=16, in_chans=3, embed_dim=768):
        super().__init__()
        if isinstance(patch_size, int):
            patch_size = (patch_size, patch_size)
        patch_size = tuple(patch_size)
        if len(patch_size) != 2:
            raise ValueError(
                f"patch_size must be an int or a (height, width) pair, got {patch_size!r}"
            )
        self.patch_size = patch_size
        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=self.patch_size, stride=self.patch_size)

    def forward(self, x):
        """Embed every frame of ``x``.

        Args:
            x: Tensor of shape (B, C, T, H, W).

        Returns:
            A tuple ``(tokens, T, W_patch)`` where ``tokens`` has shape
            (B*T, N, embed_dim) with N = H_patch * W_patch, ``T`` is the number
            of frames and ``W_patch`` is the number of patches along the width.

        Raises:
            ValueError: If ``x`` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected a (B, C, T, H, W) tensor, got shape {tuple(x.shape)}"
            )
        B, C, T, H, W = x.shape
        # Fold time into the batch axis so the 2D convolution sees one frame
        # per sample; row b*T + t of the result is frame t of clip b.
        frames = x.permute(0, 2, 1, 3, 4).reshape(B * T, C, H, W)
        patches = self.proj(frames)  # (B*T, embed_dim, H_patch, W_patch)
        W_patch = patches.shape[-1]
        tokens = patches.flatten(2).transpose(1, 2)  # (B*T, N, embed_dim)
        return tokens, T, W_patch

In [12]:
# VideoEmbed: 패치 임베딩에 시간 임베딩을 추가하여 spatiotemporal embedding 생성
class VideoEmbed(nn.Module):
    """Add a learned per-frame (temporal) embedding to patch tokens.

    Args:
        patch_embed: Module called as ``patch_embed(x)`` that returns
            ``(tokens, T, W_patch)`` with ``tokens`` shaped (B*T, N, embed_dim).
        embed_dim: Token size.
        num_frames: Number of frames the temporal embedding table is built for.
    """

    def __init__(self, patch_embed, embed_dim=768, num_frames=18):
        super().__init__()
        self.patch_embed = patch_embed
        self.embed_dim = embed_dim
        self.num_frames = num_frames
        # One learnable vector per frame index, shared by all patches of that frame.
        self.time_embed = nn.Parameter(torch.zeros(1, num_frames, embed_dim))
        self.drop = nn.Dropout(0.1)
        nn.init.trunc_normal_(self.time_embed, std=0.02)

    def forward(self, x):
        """Return spatiotemporal tokens of shape (B, T*N, embed_dim) for ``x`` of shape (B, C, T, H, W)."""
        batch, _, _, _, _ = x.shape
        tokens, n_frames, _ = self.patch_embed(x)  # (B*T, N, embed_dim)
        n_patches = tokens.shape[1]
        tokens = tokens.view(batch, n_frames, n_patches, self.embed_dim)

        temporal = self.time_embed
        if n_frames != temporal.shape[1]:
            # Clip length differs from the table: resample the table along the
            # frame axis with linear interpolation.
            temporal = F.interpolate(
                temporal.transpose(1, 2),
                size=n_frames,
                mode='linear',
                align_corners=False,
            ).transpose(1, 2)

        # (1, T, 1, D) broadcasts over the batch and over the patches of a frame.
        tokens = self.drop(tokens + temporal.unsqueeze(2))
        return tokens.reshape(batch, n_frames * n_patches, self.embed_dim)

In [13]:
# VideoViTFeatureExtractor: Hugging Face의 ViTModel을 활용하여 피처 추출
from transformers import ViTModel, ViTConfig

class VideoViTFeatureExtractor(nn.Module):
    """Run pre-embedded video tokens through a ViT encoder and pool per frame.

    A Hugging Face ``ViTModel`` is built from a config, its patch embedding is
    replaced by ``nn.Identity`` (tokens arrive already embedded), and its
    positional embedding is replaced by a table sized for ``num_tokens + 1``
    positions (the extra one is the [CLS] token).

    Args:
        embed_dim: Token / hidden size.
        depth: Number of encoder layers.
        num_heads: Number of attention heads.
        mlp_dim: Hidden size of the feed-forward sub-layer.
        num_tokens: Maximum number of input tokens (without [CLS]).
        dropout: Dropout probability used in the encoder and on the input.
        num_frames: Number of frames T the token sequence is made of; used to
            un-flatten the sequence for frame-wise pooling.
    """

    def __init__(self, embed_dim=768, depth=12, num_heads=12, mlp_dim=3072, num_tokens=196, dropout=0.1, num_frames=18):
        super().__init__()
        config = ViTConfig(
            hidden_size=embed_dim,
            num_hidden_layers=depth,
            num_attention_heads=num_heads,
            intermediate_size=mlp_dim,
            hidden_dropout_prob=dropout,
            attention_probs_dropout_prob=dropout,
            image_size=224,   # dummy value
            patch_size=1,     # dummy value
            num_channels=3,   # dummy value
        )
        self.vit = ViTModel(config)
        # Swap in Identity so the model's own patch embedding is bypassed.
        self.vit.embeddings.patch_embeddings = nn.Identity()
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        num_total_tokens = num_tokens + 1  # includes [CLS]
        self.vit.embeddings.position_embeddings = nn.Parameter(torch.zeros(1, num_total_tokens, embed_dim))
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        nn.init.trunc_normal_(self.vit.embeddings.position_embeddings, std=0.02)
        self.dropout = nn.Dropout(dropout)
        # Previously hard-coded as 18 inside forward().
        self.num_frames = num_frames

    def forward(self, tokens):
        """Encode ``tokens`` and return one feature vector per frame.

        Args:
            tokens: Tensor of shape (B, T*N, D), frame-major.

        Returns:
            Tensor of shape (B, T, D): the mean of the encoded patch tokens of
            each frame ([CLS] is excluded).

        Raises:
            ValueError: If the token count is not a multiple of ``num_frames``
                or the sequence is longer than the positional-embedding table.
        """
        B, total_tokens, D = tokens.shape
        T = self.num_frames
        if T <= 0 or total_tokens % T != 0:
            raise ValueError(
                f"token count {total_tokens} is not divisible by num_frames={T}"
            )
        N = total_tokens // T

        cls_tokens = self.cls_token.expand(B, -1, -1)           # (B, 1, D)
        x = torch.cat((cls_tokens, tokens), dim=1)              # (B, 1 + T*N, D)
        pos_embed = self.vit.embeddings.position_embeddings
        if x.size(1) > pos_embed.size(1):
            raise ValueError(
                f"sequence of {x.size(1)} tokens exceeds the "
                f"{pos_embed.size(1)} available positional embeddings"
            )
        x = x + pos_embed[:, :x.size(1), :]
        x = self.dropout(x)

        encoder_outputs = self.vit.encoder(x, return_dict=True)
        x = self.vit.layernorm(encoder_outputs.last_hidden_state[:, 1:])  # drop [CLS] -> (B, T*N, D)

        x = x.reshape(B, T, N, D)       # (B, T, N, D)
        frame_features = x.mean(dim=2)  # (B, T, D): average over each frame's patches
        return frame_features


In [14]:
# VideoFeatureExtractor: 전체 파이프라인 구성 (RGB 또는 Optical Flow)
class VideoFeatureExtractor(nn.Module):
    """End-to-end pipeline: patch + temporal embedding followed by a ViT encoder.

    Used for both RGB clips and optical-flow clips; only ``in_chans`` differs.

    Args:
        in_chans: Channels per frame.
        patch_size: Patch edge length (int; it is used in integer division to
            size the positional-embedding table).
        embed_dim: Token size.
        num_frames: Frames per clip (T).
        vit_depth: Number of encoder layers.
        vit_heads: Number of attention heads.
        vit_mlp_dim: Feed-forward hidden size.
        img_size: Frame size the positional-embedding table is sized for, as an
            int (square) or a ``(height, width)`` pair. Defaults to 224.
    """

    def __init__(self, in_chans=3, patch_size=16, embed_dim=768, num_frames=18, vit_depth=12, vit_heads=12, vit_mlp_dim=3072, img_size=224):
        super().__init__()
        self.video_embed = VideoEmbed(
            VideoPatchEmbed(patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim),
            embed_dim=embed_dim,
            num_frames=num_frames
        )
        if isinstance(img_size, int):
            img_h = img_w = img_size
        else:
            img_h, img_w = img_size
        num_patches_per_frame = (img_h // patch_size) * (img_w // patch_size)
        total_tokens = num_frames * num_patches_per_frame
        self.vit_extractor = VideoViTFeatureExtractor(
            embed_dim=embed_dim,
            depth=vit_depth,
            num_heads=vit_heads,
            mlp_dim=vit_mlp_dim,
            num_tokens=total_tokens,
            dropout=0.1
        )
        # Publish the clip length on the ViT stage as well: its forward() needs
        # T to un-flatten the token sequence. A stage that does not read this
        # attribute is unaffected by the assignment.
        self.vit_extractor.num_frames = num_frames

    def forward(self, x):
        """Map a clip ``x`` of shape (B, C, T, H, W) to the ViT stage's per-frame features."""
        tokens = self.video_embed(x)
        features = self.vit_extractor(tokens)
        return features

##### 2. 이미지 로딩 및 전처리 함수

In [15]:
def load_video_frames(folder, modality='rgb', num_frames=18, target_size=(224,224)):
    """Load the frames of one video from a folder into a single tensor.

    Args:
        folder: Directory that holds the frame images of one video.
        modality: ``'rgb'`` reads ``*.jpg`` files, ``'flow'`` reads ``*.png`` files.
        num_frames: Maximum number of frames to load; the first ``num_frames``
            files in lexicographic filename order are used.
        target_size: ``(height, width)`` every frame is resized to.

    Returns:
        Tensor of shape (1, C, T, H, W), with C = 3 for ``'rgb'`` and C = 2 for
        ``'flow'``. T is smaller than ``num_frames`` when the folder holds
        fewer matching files.

    Raises:
        ValueError: If ``modality`` is unknown or no matching frame file exists.
    """
    if modality == 'rgb':
        ext = '*.jpg'
    elif modality == 'flow':
        ext = '*.png'
    else:
        raise ValueError("알 수 없는 modality입니다.")

    # NOTE(review): lexicographic order puts "frame_10" before "frame_2";
    # this assumes zero-padded frame names — confirm against the dataset.
    files = sorted(glob.glob(os.path.join(folder, ext)))[:num_frames]
    if not files:
        # Fail early with a readable message instead of letting torch.stack
        # choke on an empty list.
        raise ValueError(f"'{folder}'에서 {ext} 프레임 파일을 찾을 수 없습니다.")

    transform = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
    ])

    frames = []
    for path in files:
        # The context manager closes the file handle once the frame is decoded.
        with Image.open(path) as raw:
            if modality == 'rgb':
                frame = transform(raw.convert('RGB'))
            else:
                # The flow image is reduced to grayscale, expanded back to three
                # channels and the first two are kept, so both channels carry
                # the same values. Adapt this to the real optical-flow encoding.
                frame = transform(raw.convert('L').convert('RGB'))[:2, :, :]
        frames.append(frame)

    video = torch.stack(frames, dim=1)  # (C, T, H, W)
    return video.unsqueeze(0)           # (1, C, T, H, W)

##### 3. 폴더 순회 및 여러 비디오에 대해 피처 추출

In [19]:
def process_videos(modality, root_dir, extractor, device, num_frames=18, target_size=(224,224), target_classes=None, frames_subdir="18frames"):
    """Extract features for every video found under ``root_dir``.

    Expected layout: ``root_dir/<class_name>/<frames_subdir>/<video_id>/<frame files>``.

    Args:
        modality: Passed to ``load_video_frames`` (``'rgb'`` or ``'flow'``).
        root_dir: Directory containing one sub-directory per class.
        extractor: Callable mapping a (1, C, T, H, W) tensor to features. An
            ``nn.Module`` is switched to eval mode for the duration of the call.
        device: Device the loaded clip is moved to.
        num_frames: Passed to ``load_video_frames``.
        target_size: Passed to ``load_video_frames``.
        target_classes: Optional collection of class names to process; other
            classes are skipped. ``None`` processes every class.
        frames_subdir: Name of the per-class folder that holds the video folders.

    Returns:
        Dict mapping ``(class_name, video_id)`` to a NumPy array holding the
        extractor output with its leading batch axis removed.
    """
    results = {}
    wanted = None if target_classes is None else set(target_classes)

    # torch.no_grad() does not disable Dropout; without eval() the extracted
    # features would be randomly perturbed on every run.
    is_module = isinstance(extractor, torch.nn.Module)
    restore_train = is_module and extractor.training
    if is_module:
        extractor.eval()

    try:
        # Sorted listings make the processing (and dict insertion) order stable.
        for class_name in sorted(os.listdir(root_dir)):
            if wanted is not None and class_name not in wanted:
                continue

            class_dir = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            frames_folder = os.path.join(class_dir, frames_subdir)
            if not os.path.isdir(frames_folder):
                print(f"'{frames_subdir}' 폴더가 {class_dir}에 없습니다.")
                continue
            for video_name in sorted(os.listdir(frames_folder)):
                video_dir = os.path.join(frames_folder, video_name)
                if not os.path.isdir(video_dir):
                    continue
                video_id = video_name  # the folder name doubles as the video id
                video = load_video_frames(video_dir, modality=modality, num_frames=num_frames, target_size=target_size).to(device)
                with torch.no_grad():
                    features = extractor(video)
                key = (class_name, video_id)
                results[key] = features.squeeze(0).cpu().numpy()  # batch axis removed
                print(f"처리 완료: {modality}/{class_name}/{video_id}, 피처 shape: {features.shape}")
    finally:
        # Leave the caller's model in the mode it was handed over in.
        if restore_train:
            extractor.train()
    return results


##### 4. 메인 실행: 모델 생성 후 데이터 전체에 적용

In [None]:
if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # eval() turns Dropout off so repeated extraction of the same clip yields
    # the same features (torch.no_grad() alone does not do that).
    # NOTE(review): the extractors are built from a config only and no
    # checkpoint is loaded in the visible code, so the features come from
    # randomly initialised weights — confirm this is intended.
    rgb_extractor = VideoFeatureExtractor(in_chans=3).to(device).eval()
    flow_extractor = VideoFeatureExtractor(in_chans=2).to(device).eval()

    root_rgb = r"D:\RGB\training"
    root_flow = r"D:\OpticalFlow\training"

    # Only these classes are processed.
    target_classes = [
        "adult+female+singing", "adult+female+speaking", "adult+male+singing",
        "adult+male+speaking", "applauding", "ascending", "asking", "assembling",
        "autographing", "baking", "balancing", "barbecuing", "barking", "bending",
        "bicycling", "biting", "blowing", "boarding", "boating", "boiling"
    ]

    rgb_results = process_videos('rgb', root_rgb, rgb_extractor, device, target_classes=target_classes)
    flow_results = process_videos('flow', root_flow, flow_extractor, device, target_classes=target_classes)

    # When re-running, adjust the paths to wherever the external drive is mounted.
    # np.save pickles the dict, so it must be read back with allow_pickle=True.
    np.save("rgb_training.npy", rgb_results)
    np.save("flow_training.npy", flow_results)

처리 완료: rgb/adult+female+singing/2cEKxGB6-YM_35, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/90Mk6DgOIAI_35, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/BJWOChJ5EKc_1, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/getty-cheerful-young-woman-singing-in-a-karaoke-video-id627354642_15, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/getty-close-up-of-group-of-friends-having-fun-at-karaoke-video-id825600946_2, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/getty-close-up-of-happy-couples-having-fun-at-karaoke-video-id825604494_3, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/getty-couple-sailing-in-mediterranean-sea-on-wooden-yacht-video-id545014254_4, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adult+female+singing/getty-factory-workers-singing-and-dancing-during-lunch-break-united-kingdom-video-idmr_00076507_35, 피처 shape: torch.Size([1, 18, 768])
처리 완료: rgb/adul

##### 4-1. 추출 데이터 검증 테스트

In [None]:
# The file stores a pickled dict, hence allow_pickle=True and .item().
# NOTE: unpickling executes arbitrary code — only load files you produced yourself.
rgb_train = np.load("rgb_training.npy", allow_pickle=True).item()

# Print label, video id and feature shape for every stored entry.
for key, feature in rgb_train.items():
    class_name, video_id = key
    print(
        f"✅ 이 영상의 라벨은 → {class_name}",
        f"✅ 이 영상의 ID는 → {video_id}",
        f"✅ 피처 shape은 → {feature.shape}",  # (18, 768)
        sep="\n",
    )


✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → 2cEKxGB6-YM_35
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → 90Mk6DgOIAI_35
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → BJWOChJ5EKc_1
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → getty-cheerful-young-woman-singing-in-a-karaoke-video-id627354642_15
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → getty-close-up-of-group-of-friends-having-fun-at-karaoke-video-id825600946_2
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → getty-close-up-of-happy-couples-having-fun-at-karaoke-video-id825604494_3
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → getty-couple-sailing-in-mediterranean-sea-on-wooden-yacht-video-id545014254_4
✅ 피처 shape은 → (18, 768)
✅ 이 영상의 라벨은 → adult+female+singing
✅ 이 영상의 ID는 → getty-factory-workers-singing-and-dancing-during-lunch-break-united-kingdom-video-idmr_00076507_35

##### 5. audio유효 데이터 필터링

In [29]:
import numpy as np
import csv

def filter_by_valid_audio(npy_path, csv_path, save_path):
    """Keep only the feature entries whose key is listed in a CSV file.

    Args:
        npy_path: ``.npy`` file holding a pickled dict keyed by
            ``(class_name, video_id)``.
        csv_path: CSV file with ``video_id`` and ``class_name`` columns.
        save_path: Destination the filtered dict is written to with ``np.save``.

    Returns:
        None. Progress counts are printed.
    """
    # 1. Collect the (class_name, video_id) pairs listed in the CSV.
    valid_keys = set()
    with open(csv_path, newline='') as handle:
        for record in csv.DictReader(handle):
            vid = record['video_id']
            valid_keys.add((record['class_name'], vid))

    print(f"🔎 유효한 오디오 키 수: {len(valid_keys)}")

    # 2. Load the full feature dict (pickled; only load trusted files).
    full_data = np.load(npy_path, allow_pickle=True).item()
    print(f"📦 전체 데이터 수: {len(full_data)}")

    # 3. Keep the entries whose key appears in the CSV, preserving their order.
    filtered = {}
    for key, value in full_data.items():
        if key in valid_keys:
            filtered[key] = value
    print(f"✅ 필터링 후 남은 수: {len(filtered)}")

    # 4. Write the result to a new file.
    np.save(save_path, filtered)
    print(f"💾 저장 완료: {save_path}")

In [31]:
csv_path = r'D:\Audio-Feature\18-audio-train.csv'

# Filter the RGB features first, then the optical-flow features, against the same CSV.
_feature_root = r"D:\Video-Feature\training"
for _stem in ("rgb_training", "flow_training"):
    filter_by_valid_audio(
        _feature_root + "\\18frames-feature\\" + _stem + ".npy",
        csv_path,
        _feature_root + "\\18frames-audio유효-feature\\" + _stem + "_filtered.npy",
    )

🔎 유효한 오디오 키 수: 12978
📦 전체 데이터 수: 24723
✅ 필터링 후 남은 수: 12978
💾 저장 완료: D:\Video-Feature\training\18frames-audio유효-feature\rgb_training_filtered.npy
🔎 유효한 오디오 키 수: 12978
📦 전체 데이터 수: 24723
✅ 필터링 후 남은 수: 12978
💾 저장 완료: D:\Video-Feature\training\18frames-audio유효-feature\flow_training_filtered.npy


In [32]:
from collections import defaultdict

def count_by_class(data_dict, name=""):
    """Print the total number of samples and the number of samples per class.

    Args:
        data_dict: Iterable of ``(class_name, video_id)`` keys (e.g. a dict).
        name: Label shown in the summary line.
    """
    tally = {}
    for label, _ in data_dict:
        tally[label] = tally.get(label, 0) + 1

    print(f"📊 {name} 총 샘플 수: {len(data_dict)}")
    # Classes are listed in alphabetical order.
    for label in sorted(tally):
        print(f"  - {label}: {tally[label]}개")

In [33]:
# Locations of the audio-filtered feature files.
_filtered_dir = r"D:\Video-Feature\training\18frames-audio유효-feature"
rgb_filtered = _filtered_dir + "\\rgb_training_filtered.npy"
optical_filtered = _filtered_dir + "\\flow_training_filtered.npy"

# Each file stores a pickled dict, hence allow_pickle=True and .item().
rgb_data, flow_data = (
    np.load(_path, allow_pickle=True).item()
    for _path in (rgb_filtered, optical_filtered)
)

# Report per-class sample counts for both modalities.
for _label, _features in (("RGB", rgb_data), ("Optical Flow", flow_data)):
    count_by_class(_features, _label)

📊 RGB 총 샘플 수: 12978
  - adult+female+singing: 1138개
  - adult+female+speaking: 1142개
  - adult+male+singing: 863개
  - adult+male+speaking: 1490개
  - applauding: 727개
  - ascending: 269개
  - asking: 618개
  - assembling: 864개
  - autographing: 965개
  - baking: 317개
  - balancing: 401개
  - barbecuing: 118개
  - barking: 753개
  - bending: 419개
  - bicycling: 587개
  - biting: 407개
  - blowing: 646개
  - boarding: 547개
  - boating: 592개
  - boiling: 115개
📊 Optical Flow 총 샘플 수: 12978
  - adult+female+singing: 1138개
  - adult+female+speaking: 1142개
  - adult+male+singing: 863개
  - adult+male+speaking: 1490개
  - applauding: 727개
  - ascending: 269개
  - asking: 618개
  - assembling: 864개
  - autographing: 965개
  - baking: 317개
  - balancing: 401개
  - barbecuing: 118개
  - barking: 753개
  - bending: 419개
  - bicycling: 587개
  - biting: 407개
  - blowing: 646개
  - boarding: 547개
  - boating: 592개
  - boiling: 115개


##### 6. train/validation/test 셋 분리 (클래스 별 균형)

In [36]:
import numpy as np
import random
from collections import defaultdict

def split_keys_by_class(data_dict, ratios=(0.7, 0.15, 0.15), seed=42):
    """Split keys into train / val / test lists, class by class.

    Args:
        data_dict: Iterable of ``(class_name, video_id)`` keys (e.g. a dict).
        ratios: ``(train, val, test)`` fractions. Train and val sizes are
            ``int(n * ratio)`` per class; test receives whatever is left.
        seed: Seed of the shuffle; the same seed is used for every class.

    Returns:
        ``(train_keys, val_keys, test_keys)`` as three lists of keys.
    """
    # Group keys by class in first-seen order.
    per_class = {}
    for key in data_dict:
        label, _ = key
        per_class.setdefault(label, []).append(key)

    train_keys, val_keys, test_keys = [], [], []
    for members in per_class.values():
        # A fresh generator per class keeps each class's shuffle independent
        # of how many classes were processed before it.
        random.Random(seed).shuffle(members)
        total = len(members)
        train_end = int(total * ratios[0])
        val_end = train_end + int(total * ratios[1])

        train_keys.extend(members[:train_end])
        val_keys.extend(members[train_end:val_end])
        test_keys.extend(members[val_end:])

    return train_keys, val_keys, test_keys

def check_split_ratio(train_dict, val_dict, test_dict):
    """Print a per-class table of split sizes and their percentages.

    Args:
        train_dict: Iterable of ``(class_name, video_id)`` keys in the train split.
        val_dict: Same, for the validation split.
        test_dict: Same, for the test split.
    """
    # tallies[class] = [train count, val count, test count]
    tallies = {}
    for slot, split in enumerate((train_dict, val_dict, test_dict)):
        for label, _ in split:
            tallies.setdefault(label, [0, 0, 0])[slot] += 1

    print(f"{'Class':35s} | Train | Val | Test | Total | Train% | Val% | Test%")
    print("-"*85)
    for label in sorted(tallies):
        n_train, n_val, n_test = tallies[label]
        n_all = n_train + n_val + n_test
        print(f"{label:35s} | {n_train:5d} | {n_val:3d} | {n_test:4d} | {n_all:5d} | {n_train/n_all:6.2%} | {n_val/n_all:5.2%} | {n_test/n_all:6.2%}")

def split_and_save_filtered(rgb_path, flow_path, save_dir=r"D:\Video-Feature\training\18frames-audio유효-split-feature"):
    """Split filtered RGB / flow features into train, val and test files.

    The split is computed once from the RGB keys and applied to both
    modalities, so the two modalities share the same split membership.

    Args:
        rgb_path: ``.npy`` file with the pickled RGB feature dict.
        flow_path: ``.npy`` file with the pickled optical-flow feature dict.
        save_dir: Directory the six ``<modality>_filtered_<split>.npy`` files
            are written to; it is created if missing.

    Raises:
        ValueError: If the two dicts do not have exactly the same keys.
    """
    print("🚀 필터된 데이터 로딩 중...")
    # Pickled dicts: only load files from a trusted source.
    rgb_all = np.load(rgb_path, allow_pickle=True).item()
    flow_all = np.load(flow_path, allow_pickle=True).item()

    # An explicit raise (not assert) so the check survives `python -O`.
    if set(rgb_all) != set(flow_all):
        raise ValueError("❌ RGB / Flow 키 불일치!")

    # Split on the RGB keys; the same keys index the flow dict.
    train_keys, val_keys, test_keys = split_keys_by_class(rgb_all)
    splits = {"train": train_keys, "val": val_keys, "test": test_keys}

    os.makedirs(save_dir, exist_ok=True)

    rgb_splits = {}
    for modality, source in (("rgb", rgb_all), ("flow", flow_all)):
        for split_name, keys in splits.items():
            subset = {k: source[k] for k in keys}
            np.save(os.path.join(save_dir, f"{modality}_filtered_{split_name}.npy"), subset)
            if modality == "rgb":
                rgb_splits[split_name] = subset

    print("✅ 저장 완료! 클래스 분포:")
    check_split_ratio(rgb_splits["train"], rgb_splits["val"], rgb_splits["test"])


In [37]:
# Split the audio-filtered RGB / optical-flow feature files and save the splits.
split_and_save_filtered(rgb_filtered, optical_filtered)

🚀 필터된 데이터 로딩 중...
✅ 저장 완료! 클래스 분포:
Class                               | Train | Val | Test | Total | Train% | Val% | Test%
-------------------------------------------------------------------------------------
adult+female+singing                |   796 | 170 |  172 |  1138 | 69.95% | 14.94% | 15.11%
adult+female+speaking               |   799 | 171 |  172 |  1142 | 69.96% | 14.97% | 15.06%
adult+male+singing                  |   604 | 129 |  130 |   863 | 69.99% | 14.95% | 15.06%
adult+male+speaking                 |  1043 | 223 |  224 |  1490 | 70.00% | 14.97% | 15.03%
applauding                          |   508 | 109 |  110 |   727 | 69.88% | 14.99% | 15.13%
ascending                           |   188 |  40 |   41 |   269 | 69.89% | 14.87% | 15.24%
asking                              |   432 |  92 |   94 |   618 | 69.90% | 14.89% | 15.21%
assembling                          |   604 | 129 |  131 |   864 | 69.91% | 14.93% | 15.16%
autographing                        |   675 | 144 |  1